24 Commits

Author SHA1 Message Date
kris
0c0d83934e feat: add oneliner registry, quota and ops review flows 2026-03-23 16:37:28 +08:00
kris
98b3efa10b feat: extend oneliner execution workflows 2026-03-23 16:01:24 +08:00
kris
3b7d4f0d5b feat: expand oneliner execution and admin audit logs 2026-03-23 15:43:34 +08:00
kris
28ac70cf8f feat: add oneliner execution and platform validation 2026-03-23 15:31:36 +08:00
kris
52f20c3c3d feat: add oneliner control plane routes 2026-03-23 15:01:15 +08:00
kris
d427b89409 feat: secure nas storage artifacts and archive links 2026-03-23 11:53:21 +08:00
kris
a8d503159f feat: add tenant storage status endpoint 2026-03-23 10:36:23 +08:00
kris
4ee3af45d7 feat: support nas-backed tenant media storage 2026-03-23 10:29:49 +08:00
kris
2570c4200b feat: isolate live recorder assets by tenant 2026-03-23 09:59:22 +08:00
kris
96ab6af9ed feat: proxy nas live recorder controls 2026-03-23 09:21:15 +08:00
kris
1719047ef5 feat: add domestic multi-platform workbench routes 2026-03-23 09:13:37 +08:00
kris
dedf69193d collector-service: add wechat video routes 2026-03-23 09:10:34 +08:00
kris
a46e4f47b3 Add bilibili collector routes 2026-03-23 09:06:13 +08:00
kris
a695eb04b9 Add Kuaishou route module 2026-03-23 09:06:04 +08:00
kris
9afac1eff7 Add Xiaohongshu route module 2026-03-23 09:05:58 +08:00
kris
70e4652996 feat: normalize domestic platform semantics 2026-03-23 08:49:32 +08:00
kris
caf51bc293 feat: infer domestic platforms from source urls 2026-03-23 08:38:51 +08:00
kris
5c39ea2728 feat: add live top video analysis workflow 2026-03-23 08:09:37 +08:00
kris
10eae9ad69 feat: add live douyin video listing route 2026-03-23 07:51:32 +08:00
kris
5a739a414d feat: add live reviews and douyin tracking routes 2026-03-23 07:39:39 +08:00
kris
7910019ef9 feat: harden live real-cut orchestration 2026-03-23 06:54:01 +08:00
kris
1256b9df75 fix: report cutvideo upload capability 2026-03-23 05:21:39 +08:00
kris
65d5588b57 fix: align live collector integration defaults 2026-03-23 05:16:41 +08:00
kris
3d0a898faa feat: recover live collector source and local model integrations 2026-03-23 04:38:52 +08:00
15 changed files with 15338 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Collector service source overlay for legacy pyc-backed app."""

View File

@@ -0,0 +1,545 @@
from __future__ import annotations
import json
from typing import Any
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
def _safe_json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
def _first_non_empty(*values: Any) -> str:
for value in values:
if value is None:
continue
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
elif value not in ("", [], {}, ()):
return str(value)
return ""
class BilibiliContentSourceCreateRequest(BaseModel):
    """Request body for creating a bilibili content source.

    Every field is optional; text fields default to the empty string.
    """

    # Target project id; empty string means none was supplied.
    project_id: str = ""
    # Kind of source; defaults to "creator_account".
    source_kind: str = "creator_account"
    # Platform slug supplied by the client; may be left blank.
    platform: str = ""
    handle: str = ""
    source_url: str = ""
    title: str = ""
    local_path: str = ""
    # Free-form metadata; a fresh dict is created per instance.
    metadata: dict[str, Any] = Field(default_factory=dict)
class BilibiliContentSourceSyncRequest(BaseModel):
    """Request body for starting a content-source sync job.

    Every field is optional; text fields default to the empty string.
    """

    project_id: str = ""
    knowledge_base_id: str = ""
    assistant_id: str = ""
    # Id of an existing content source to sync; blank when none is referenced.
    content_source_id: str = ""
    platform: str = ""
    handle: str = ""
    source_url: str = ""
    title: str = ""
    analysis_model_profile_id: str = ""
    # Language hint; defaults to "auto".
    language: str = "auto"
    # Validated to lie within 1..20; defaults to 5.
    max_items: int = Field(default=5, ge=1, le=20)
    skip_existing: bool = True
    auto_trigger_analysis: bool = True
class BilibiliReviewCreateRequest(BaseModel):
    """Request body for creating a publish review.

    Every field is optional; text fields default to the empty string unless
    noted otherwise.
    """

    project_id: str = ""
    # Id of the job this review refers to; blank when there is none.
    source_job_id: str = ""
    assistant_id: str = ""
    title: str = ""
    # Defaults to "bilibili".
    platform: str = "bilibili"
    # Defaults to "video".
    content_type: str = "video"
    publish_url: str = ""
    published_at: str = ""
    # Free-form metrics mapping; a fresh dict is created per instance.
    metrics: dict[str, Any] = Field(default_factory=dict)
    verdict: str = ""
    highlights: str = ""
    next_actions: str = ""
    notes: str = ""
class BilibiliReviewUpdateRequest(BaseModel):
    """Request body for partially updating a publish review.

    Every field defaults to ``None``, which distinguishes "not supplied"
    from an explicit empty value.
    """

    title: str | None = None
    platform: str | None = None
    content_type: str | None = None
    publish_url: str | None = None
    published_at: str | None = None
    metrics: dict[str, Any] | None = None
    verdict: str | None = None
    highlights: str | None = None
    next_actions: str | None = None
    notes: str | None = None
    assistant_id: str | None = None
def _is_youtube_url(source_url: str) -> bool:
lowered = source_url.strip().lower()
return "youtube.com" in lowered or "youtu.be" in lowered
def _resolve_bilibili_platform(legacy: Any, platform: str, source_url: str = "") -> str:
    """Validate that a platform/URL pair refers to bilibili.

    Args:
        legacy: Object providing ``infer_platform_from_url`` and
            ``normalize_platform_slug``.
        platform: Platform slug supplied by the client; may be blank.
        source_url: Optional source URL used to infer the platform.

    Returns:
        Always the literal ``"bilibili"``.

    Raises:
        HTTPException: Status 400 when the URL or platform points at YouTube
            or at any platform other than bilibili.
    """
    youtube_detail = "YouTube sources are not supported in the bilibili routes"

    def bad_request(detail: str) -> HTTPException:
        return HTTPException(status_code=400, detail=detail)

    if _is_youtube_url(source_url):
        raise bad_request(youtube_detail)

    # Only consult the URL-based inference when a URL was actually given.
    inferred = ""
    if source_url.strip():
        inferred = legacy.infer_platform_from_url(source_url)

    # An explicit platform wins; otherwise use the inferred one, then bilibili.
    normalized = (
        legacy.normalize_platform_slug(platform, allow_blank=True)
        or inferred
        or "bilibili"
    )
    if normalized == "youtube":
        raise bad_request(youtube_detail)

    inferred_is_foreign = bool(inferred) and inferred not in {"bilibili", "youtube"}
    if inferred_is_foreign and not platform.strip():
        raise bad_request(f"Bilibili routes only accept bilibili sources, not {inferred}")

    if normalized != "bilibili":
        raise bad_request(f"Bilibili routes only accept bilibili sources, not {normalized}")

    return "bilibili"
def _content_source_query(legacy: Any, account_id: str, project_id: str | None = None) -> tuple[str, tuple[Any, ...]]:
clauses = ["user_id = ?", "platform = 'bilibili'"]
params: list[Any] = [account_id]
if project_id is not None:
normalized_project = project_id.strip()
if normalized_project:
clauses.append("project_id = ?")
params.append(normalized_project)
else:
clauses.append("(project_id IS NULL OR project_id = '')")
sql = f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC"
return sql, tuple(params)
def _job_query(
source_id: str | None = None,
project_id: str | None = None,
limit: int = 50,
) -> tuple[str, tuple[Any, ...]]:
clauses = ["j.user_id = ?", "cs.platform = 'bilibili'"]
params: list[Any] = []
if source_id:
clauses.append("j.content_source_id = ?")
params.append(source_id)
if project_id is not None:
normalized_project = project_id.strip()
if normalized_project:
clauses.append("j.project_id = ?")
params.append(normalized_project)
else:
clauses.append("(j.project_id IS NULL OR j.project_id = '')")
sql = (
"SELECT j.* "
"FROM jobs j "
"JOIN content_sources cs ON cs.id = j.content_source_id "
f"WHERE {' AND '.join(clauses)} "
"ORDER BY j.created_at DESC "
"LIMIT ?"
)
params = [*params]
return sql, tuple([*params, limit])
def _review_query(project_id: str | None = None, limit: int = 50) -> tuple[str, tuple[Any, ...]]:
clauses = ["r.user_id = ?", "r.platform = 'bilibili'"]
params: list[Any] = []
if project_id is not None:
normalized_project = project_id.strip()
if normalized_project:
clauses.append("r.project_id = ?")
params.append(normalized_project)
else:
clauses.append("(r.project_id IS NULL OR r.project_id = '')")
sql = (
"SELECT r.* "
"FROM publish_reviews r "
f"WHERE {' AND '.join(clauses)} "
"ORDER BY COALESCE(NULLIF(r.published_at, ''), r.created_at) DESC, r.created_at DESC "
"LIMIT ?"
)
return sql, tuple([*params, limit])
def _build_sync_result(legacy: Any, row: dict[str, Any], content_source: dict[str, Any]) -> dict[str, Any]:
payload = legacy.job_payload(row)
payload["content_source"] = legacy.content_source_payload(content_source)
return payload
def register_bilibili_routes(app: Any, legacy: Any) -> None:
    """Attach every ``/v2/bilibili/*`` endpoint to ``app``.

    Args:
        app: Object exposing FastAPI-style ``get``/``post``/``patch`` route
            decorators.
        legacy: Facade over the legacy application. The routes call its
            helpers (``require_approved``, ``db``, ``create_job_record``,
            ``job_payload`` and others) for authentication, persistence and
            payload shaping.
    """

    def now() -> str:
        """Return the current timestamp as produced by ``legacy.utc_now``."""
        return legacy.utc_now()

    def make_id(prefix: str) -> str:
        """Create a new identifier via ``legacy.make_id``."""
        return legacy.make_id(prefix)

    def resolve_project(account: dict[str, Any], project_id: str) -> dict[str, Any]:
        """Resolve the target project; a blank id is forwarded as ``None``."""
        return legacy.resolve_target_project(account["id"], project_id or None, username=account["username"])

    def resolve_kb(account: dict[str, Any], kb_id: str, project_id: str) -> dict[str, Any]:
        """Resolve the target knowledge base; a blank id is forwarded as ``None``."""
        return legacy.resolve_target_kb(account["id"], kb_id or None, project_id, username=account["username"])

    def resolve_assistant(account: dict[str, Any], assistant_id: str, project_id: str) -> dict[str, Any] | None:
        """Resolve the target assistant; a blank id is forwarded as ``None``."""
        return legacy.resolve_target_assistant(account["id"], assistant_id or None, project_id)

    def load_bilibili_source(source_id: str, account: dict[str, Any]) -> dict[str, Any]:
        """Load a content source owned by ``account``; 404 unless it is bilibili."""
        row = legacy.load_owned_content_source(source_id, account["id"])
        if row.get("platform") != "bilibili":
            raise HTTPException(status_code=404, detail="Bilibili content source not found")
        return row

    def ensure_bilibili_job(job: dict[str, Any], account: dict[str, Any], detail: str) -> None:
        """Raise 404 with ``detail`` when the job's content source is not bilibili.

        Jobs without a ``content_source_id`` are accepted unchanged.
        """
        if not job.get("content_source_id"):
            return
        source = legacy.db.fetch_one(
            "SELECT * FROM content_sources WHERE id = ? AND user_id = ?",
            (job["content_source_id"], account["id"]),
        )
        if not source or source.get("platform") != "bilibili":
            raise HTTPException(status_code=404, detail=detail)

    def create_or_update_source(
        *,
        account: dict[str, Any],
        request: BilibiliContentSourceCreateRequest,
        sync_request: BilibiliContentSourceSyncRequest | None = None,
    ) -> dict[str, Any]:
        """Validate the request and store it through ``legacy.create_content_source``.

        Values missing from ``request`` fall back to ``sync_request`` when
        one is given. The stored platform is always ``"bilibili"``.
        """
        source_url = _first_non_empty(request.source_url, sync_request.source_url if sync_request else "")
        _resolve_bilibili_platform(legacy, request.platform or (sync_request.platform if sync_request else ""), source_url)
        project = resolve_project(account, request.project_id or (sync_request.project_id if sync_request else ""))
        title = _first_non_empty(request.title, sync_request.title if sync_request else "", request.handle, source_url)
        metadata: dict[str, Any] = dict(request.metadata)
        metadata.setdefault("platform", "bilibili")
        if sync_request:
            metadata.update(
                {
                    "sync_mode": "recent_uploads",
                    "max_items": sync_request.max_items,
                    "analysis_model_profile_id": sync_request.analysis_model_profile_id,
                }
            )
        return legacy.create_content_source(
            account_id=account["id"],
            project_id=project["id"],
            source_kind=(request.source_kind or "creator_account").strip(),
            platform="bilibili",
            handle=request.handle.strip(),
            source_url=source_url.strip(),
            title=title.strip(),
            local_path=request.local_path.strip(),
            metadata=metadata,
        )

    async def sync_source(
        *,
        account: dict[str, Any],
        request: BilibiliContentSourceSyncRequest,
        content_source: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Create and trigger a content-source sync job.

        The source is ``content_source`` unless ``request.content_source_id``
        names another one; when neither is available a new source is created
        from the request. Returns the triggered job's payload with the
        content source embedded.
        """
        source_row = content_source
        requested_source_id = request.content_source_id.strip()
        if requested_source_id:
            # A source referenced by id in the body must pass the same
            # bilibili check as one referenced in the URL; otherwise an
            # explicit platform="bilibili" would let a foreign source through.
            source_row = load_bilibili_source(requested_source_id, account)
        source_url = _first_non_empty(
            request.source_url,
            (source_row or {}).get("source_url", ""),
        )
        _resolve_bilibili_platform(
            legacy,
            request.platform or (source_row or {}).get("platform", ""),
            source_url,
        )
        project_id = request.project_id or (source_row or {}).get("project_id", "")
        project = resolve_project(account, project_id)
        kb = resolve_kb(account, request.knowledge_base_id, project["id"])
        assistant = resolve_assistant(account, request.assistant_id, project["id"])
        source_title = _first_non_empty(
            request.title,
            (source_row or {}).get("title", ""),
            request.handle,
            source_url,
        )
        if source_row and source_row.get("project_id") and source_row["project_id"] != project["id"]:
            raise HTTPException(status_code=400, detail="Content source does not belong to the target project")
        if not source_row:
            source_row = create_or_update_source(
                account=account,
                request=BilibiliContentSourceCreateRequest(
                    project_id=project["id"],
                    source_kind="creator_account",
                    platform="bilibili",
                    handle=request.handle.strip(),
                    source_url=source_url,
                    title=source_title,
                    local_path="",
                    metadata={
                        "sync_mode": "recent_uploads",
                        "max_items": request.max_items,
                        "analysis_model_profile_id": request.analysis_model_profile_id,
                    },
                ),
                sync_request=request,
            )
        job_row = legacy.create_job_record(
            account_id=account["id"],
            project_id=project["id"],
            knowledge_base_id=kb["id"],
            source_type="content_source_sync",
            line_type="content_source_sync",
            workflow_key="content_source_sync_pipeline",
            title=f"{source_title} 内容源同步",
            language=request.language,
            source_url=source_url,
            assistant_id=(assistant or {}).get("id"),
            content_source_id=source_row["id"],
            artifacts={
                "platform": "bilibili",
                "source_kind": source_row.get("source_kind", "creator_account"),
                "source_title": source_title,
                "source_url": source_url,
                "max_items": request.max_items,
                "skip_existing": request.skip_existing,
                "auto_trigger_analysis": request.auto_trigger_analysis,
                "analysis_model_profile_id": request.analysis_model_profile_id,
            },
            analysis_model_profile_id=request.analysis_model_profile_id,
        )
        legacy.update_content_source_metadata(
            source_row["id"],
            {
                "platform": "bilibili",
                "last_sync_job_id": job_row["id"],
                "last_sync_requested_at": now(),
                "max_items": request.max_items,
                "analysis_model_profile_id": request.analysis_model_profile_id,
            },
        )
        return _build_sync_result(legacy, await legacy.trigger_orchestrated_job(job_row), source_row)

    @app.get("/v2/bilibili/content-sources")
    def list_bilibili_content_sources(
        project_id: str | None = Query(default=None),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        """List the account's bilibili content sources, newest first."""
        sql, params = _content_source_query(legacy, account["id"], project_id)
        return [legacy.content_source_payload(row) for row in legacy.db.fetch_all(sql, params)]

    @app.post("/v2/bilibili/content-sources")
    def create_bilibili_content_source(
        request: BilibiliContentSourceCreateRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Create a bilibili content source and return its payload."""
        row = create_or_update_source(account=account, request=request)
        return legacy.content_source_payload(row)

    @app.get("/v2/bilibili/content-sources/{source_id}")
    def get_bilibili_content_source(
        source_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Return one bilibili content source owned by the account."""
        return legacy.content_source_payload(load_bilibili_source(source_id, account))

    @app.post("/v2/bilibili/content-sources/{source_id}/sync")
    async def sync_bilibili_content_source(
        source_id: str,
        request: BilibiliContentSourceSyncRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Start a sync job for an existing bilibili content source."""
        row = load_bilibili_source(source_id, account)
        return await sync_source(account=account, request=request, content_source=row)

    @app.post("/v2/bilibili/pipelines/content-source-sync")
    async def create_bilibili_content_source_sync_job(
        request: BilibiliContentSourceSyncRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Start a sync job, creating the content source when none is referenced."""
        return await sync_source(account=account, request=request)

    @app.get("/v2/bilibili/content-sources/{source_id}/jobs")
    def list_bilibili_content_source_jobs(
        source_id: str,
        limit: int = Query(default=50, ge=1, le=200),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        """List the jobs of one bilibili content source, newest first."""
        load_bilibili_source(source_id, account)
        sql, params = _job_query(source_id=source_id, limit=limit)
        # _job_query leaves the leading user_id placeholder for the caller to bind.
        rows = legacy.db.fetch_all(sql, (account["id"], *params))
        return [legacy.job_payload(item) for item in rows]

    @app.get("/v2/bilibili/jobs")
    def list_bilibili_jobs(
        project_id: str | None = Query(default=None),
        content_source_id: str | None = Query(default=None),
        limit: int = Query(default=50, ge=1, le=200),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        """List bilibili jobs, optionally filtered by project and content source."""
        # Normalize once so a whitespace-only value means "no filter" rather
        # than a lookup of the empty id.
        source_filter = (content_source_id or "").strip()
        if source_filter:
            load_bilibili_source(source_filter, account)
        sql, params = _job_query(source_id=source_filter or None, project_id=project_id, limit=limit)
        rows = legacy.db.fetch_all(sql, (account["id"], *params))
        return [legacy.job_payload(item) for item in rows]

    @app.get("/v2/bilibili/jobs/{job_id}")
    def get_bilibili_job(
        job_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Return the context payload of one job owned by the account."""
        row = legacy.load_owned_job(job_id, account["id"])
        ensure_bilibili_job(row, account, "Bilibili job not found")
        return legacy.job_context_payload(row)

    @app.get("/v2/bilibili/reviews")
    def list_bilibili_reviews(
        project_id: str | None = Query(default=None),
        limit: int = Query(default=50, ge=1, le=200),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        """List the account's bilibili publish reviews."""
        sql, params = _review_query(project_id=project_id, limit=limit)
        rows = legacy.db.fetch_all(sql, (account["id"], *params))
        return [legacy.review_payload(item) for item in rows]

    @app.get("/v2/bilibili/reviews/{review_id}")
    def get_bilibili_review(
        review_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Return one bilibili publish review owned by the account."""
        row = legacy.load_owned_review(review_id, account["id"])
        if row.get("platform") != "bilibili":
            raise HTTPException(status_code=404, detail="Bilibili review not found")
        return legacy.review_payload(row)

    @app.post("/v2/bilibili/reviews")
    def create_bilibili_review(
        request: BilibiliReviewCreateRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Insert a publish review, optionally linked to a source job."""
        source_job = None
        if request.source_job_id.strip():
            source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"])
            ensure_bilibili_job(source_job, account, "Bilibili source job not found")
        # jobs.source_url is nullable, so coalesce NULL to "" before the
        # string-based platform checks run.
        source_job_url = (source_job.get("source_url") or "") if source_job else ""
        normalized_platform = _resolve_bilibili_platform(legacy, request.platform, source_job_url)
        requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "")
        project = resolve_project(account, requested_project_id)
        assistant = resolve_assistant(account, request.assistant_id, project["id"])
        review_id = make_id("review")
        title = _first_non_empty(request.title, source_job.get("title", "") if source_job else "", f"{project['name']} 复盘")
        timestamp = now()
        legacy.db.execute(
            """
            INSERT INTO publish_reviews (
                id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
                publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                review_id,
                account["id"],
                project["id"],
                source_job["id"] if source_job else None,
                (assistant or {}).get("id") or None,
                title,
                normalized_platform,
                request.content_type.strip() or "video",
                request.publish_url.strip(),
                request.published_at.strip(),
                _safe_json_dumps(request.metrics),
                request.verdict.strip(),
                request.highlights.strip(),
                request.next_actions.strip(),
                request.notes.strip(),
                timestamp,
                timestamp,
            ),
        )
        row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
        return legacy.review_payload(row)

    @app.patch("/v2/bilibili/reviews/{review_id}")
    def update_bilibili_review(
        review_id: str,
        request: BilibiliReviewUpdateRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Partially update a review; fields left as ``None`` keep their stored value."""
        current = legacy.load_owned_review(review_id, account["id"])
        if current.get("platform") != "bilibili":
            raise HTTPException(status_code=404, detail="Bilibili review not found")
        assistant_id = current.get("assistant_id") or None
        if request.assistant_id is not None:
            assistant = resolve_assistant(account, request.assistant_id or "", current.get("project_id", ""))
            assistant_id = (assistant or {}).get("id") or None
        if request.platform is not None:
            # Validation only: the stored platform is always "bilibili".
            _resolve_bilibili_platform(legacy, request.platform, current.get("publish_url", ""))
        legacy.db.execute(
            """
            UPDATE publish_reviews
            SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?,
                metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?,
                assistant_id = ?, updated_at = ?
            WHERE id = ? AND user_id = ?
            """,
            (
                request.title if request.title is not None else current.get("title", ""),
                "bilibili",
                request.content_type if request.content_type is not None else current.get("content_type", "video"),
                request.publish_url if request.publish_url is not None else current.get("publish_url", ""),
                request.published_at if request.published_at is not None else current.get("published_at", ""),
                _safe_json_dumps(request.metrics if request.metrics is not None else legacy.parse_json_object(current.get("metrics_json") or "{}")),
                request.verdict if request.verdict is not None else current.get("verdict", ""),
                request.highlights if request.highlights is not None else current.get("highlights", ""),
                request.next_actions if request.next_actions is not None else current.get("next_actions", ""),
                request.notes if request.notes is not None else current.get("notes", ""),
                assistant_id,
                now(),
                review_id,
                account["id"],
            ),
        )
        row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
        return legacy.review_payload(row)

    @app.get("/v2/bilibili/content-sources/{source_id}/reviews")
    def list_bilibili_content_source_reviews(
        source_id: str,
        limit: int = Query(default=50, ge=1, le=200),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        """List reviews whose source job belongs to the given bilibili content source."""
        load_bilibili_source(source_id, account)
        rows = legacy.db.fetch_all(
            """
            SELECT r.*
            FROM publish_reviews r
            JOIN jobs j ON j.id = r.source_job_id
            WHERE r.user_id = ? AND r.platform = 'bilibili' AND j.content_source_id = ?
            ORDER BY COALESCE(NULLIF(r.published_at, ''), r.created_at) DESC, r.created_at DESC
            LIMIT ?
            """,
            (account["id"], source_id, limit),
        )
        return [legacy.review_payload(item) for item in rows]
# Public API of this module: only the route registrar is exported.
__all__ = ["register_bilibili_routes"]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,394 @@
from __future__ import annotations
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator
def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    from datetime import datetime, timezone

    current = datetime.now(tz=timezone.utc)
    # Truncating at seconds yields the same text as zeroing the microseconds.
    return current.isoformat(timespec="seconds")
def dict_factory(cursor: sqlite3.Cursor, row: sqlite3.Row) -> dict[str, Any]:
    """Row factory that maps each column name of ``cursor`` to its value in ``row``."""
    column_names = (column[0] for column in cursor.description)
    return dict(zip(column_names, row))
class Database:
def __init__(self, path: str) -> None:
self.path = Path(path)
self.path.parent.mkdir(parents=True, exist_ok=True)
def connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.path)
conn.row_factory = dict_factory
conn.execute("PRAGMA foreign_keys = ON")
return conn
@contextmanager
def session(self) -> Iterator[sqlite3.Connection]:
conn = self.connect()
try:
yield conn
conn.commit()
finally:
conn.close()
def fetch_one(self, sql: str, params: tuple[Any, ...] = ()) -> dict[str, Any] | None:
with self.session() as conn:
return conn.execute(sql, params).fetchone()
def fetch_all(self, sql: str, params: tuple[Any, ...] = ()) -> list[dict[str, Any]]:
with self.session() as conn:
return list(conn.execute(sql, params).fetchall())
def execute(self, sql: str, params: tuple[Any, ...] = ()) -> None:
with self.session() as conn:
conn.execute(sql, params)
def table_exists(self, name: str) -> bool:
row = self.fetch_one(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
(name,),
)
return bool(row)
def column_exists(self, table: str, column: str) -> bool:
with self.session() as conn:
rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
return any(row["name"] == column for row in rows)
def init_schema(self) -> None:
schema = """
CREATE TABLE IF NOT EXISTS accounts (
id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
password_salt TEXT NOT NULL,
display_name TEXT NOT NULL,
role TEXT NOT NULL,
approval_status TEXT NOT NULL,
approved_by TEXT,
approved_at TEXT,
preferred_analysis_model_id TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS auth_tokens (
token TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS model_profiles (
id TEXT PRIMARY KEY,
owner_account_id TEXT,
name TEXT NOT NULL,
provider TEXT NOT NULL,
base_url TEXT NOT NULL,
api_key TEXT NOT NULL DEFAULT '',
model_name TEXT NOT NULL,
is_system INTEGER NOT NULL DEFAULT 0,
is_default INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(owner_account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS knowledge_bases (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT,
name TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
sync_status TEXT NOT NULL DEFAULT 'ready',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS knowledge_documents (
id TEXT PRIMARY KEY,
knowledge_base_id TEXT NOT NULL,
title TEXT NOT NULL,
source_type TEXT NOT NULL,
source_url TEXT NOT NULL DEFAULT '',
transcript_text TEXT NOT NULL DEFAULT '',
style_summary TEXT NOT NULL DEFAULT '',
combined_text TEXT NOT NULL DEFAULT '',
analysis_json TEXT NOT NULL DEFAULT '{}',
storyboard_json TEXT NOT NULL DEFAULT '[]',
source_artifact_json TEXT NOT NULL DEFAULT '{}',
analysis_model_profile_id TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS assistants (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT,
name TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
system_prompt TEXT NOT NULL DEFAULT '',
generation_goal TEXT NOT NULL DEFAULT '',
config_json TEXT NOT NULL DEFAULT '{}',
model_profile_id TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS assistant_knowledge_bases (
assistant_id TEXT NOT NULL,
knowledge_base_id TEXT NOT NULL,
PRIMARY KEY (assistant_id, knowledge_base_id),
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE CASCADE,
FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT,
parent_job_id TEXT,
assistant_id TEXT,
knowledge_base_id TEXT NOT NULL,
content_source_id TEXT,
source_type TEXT NOT NULL,
line_type TEXT NOT NULL DEFAULT 'analysis',
workflow_key TEXT NOT NULL DEFAULT '',
orchestrator TEXT NOT NULL DEFAULT 'n8n',
provider_name TEXT NOT NULL DEFAULT '',
provider_task_id TEXT NOT NULL DEFAULT '',
source_url TEXT,
title TEXT NOT NULL,
language TEXT NOT NULL DEFAULT 'auto',
status TEXT NOT NULL,
transcript_text TEXT NOT NULL DEFAULT '',
style_summary TEXT NOT NULL DEFAULT '',
upload_status TEXT NOT NULL DEFAULT 'pending',
error TEXT NOT NULL DEFAULT '',
artifacts_json TEXT NOT NULL DEFAULT '{}',
result_json TEXT NOT NULL DEFAULT '{}',
analysis_model_profile_id TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL,
FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS content_sources (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT,
source_kind TEXT NOT NULL,
platform TEXT NOT NULL DEFAULT '',
handle TEXT NOT NULL DEFAULT '',
source_url TEXT NOT NULL DEFAULT '',
title TEXT NOT NULL DEFAULT '',
local_path TEXT NOT NULL DEFAULT '',
metadata_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS publish_reviews (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT,
source_job_id TEXT,
assistant_id TEXT,
title TEXT NOT NULL,
platform TEXT NOT NULL DEFAULT 'douyin',
content_type TEXT NOT NULL DEFAULT 'video',
publish_url TEXT NOT NULL DEFAULT '',
published_at TEXT NOT NULL DEFAULT '',
metrics_json TEXT NOT NULL DEFAULT '{}',
verdict TEXT NOT NULL DEFAULT '',
highlights TEXT NOT NULL DEFAULT '',
next_actions TEXT NOT NULL DEFAULT '',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(source_job_id) REFERENCES jobs(id) ON DELETE SET NULL,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS live_recorder_sources (
id TEXT PRIMARY KEY,
platform TEXT NOT NULL DEFAULT '',
source_url TEXT NOT NULL,
remote_name TEXT NOT NULL UNIQUE,
title TEXT NOT NULL DEFAULT '',
metadata_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(platform, source_url)
);
CREATE TABLE IF NOT EXISTS live_recorder_bindings (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT,
assistant_id TEXT,
source_id TEXT NOT NULL,
title TEXT NOT NULL DEFAULT '',
quality TEXT NOT NULL DEFAULT '原画',
enabled INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, source_id),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL,
FOREIGN KEY(source_id) REFERENCES live_recorder_sources(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS job_events (
id TEXT PRIMARY KEY,
job_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS app_updates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform TEXT NOT NULL,
channel TEXT NOT NULL,
version_code INTEGER NOT NULL,
version_name TEXT NOT NULL,
min_supported_code INTEGER NOT NULL,
apk_url TEXT NOT NULL,
apk_sha256 TEXT NOT NULL DEFAULT '',
notes TEXT NOT NULL DEFAULT '',
force_update INTEGER NOT NULL DEFAULT 0,
is_active INTEGER NOT NULL DEFAULT 1,
published_at INTEGER NOT NULL,
created_by TEXT NOT NULL
);
"""
with self.session() as conn:
conn.executescript(schema)
self.migrate_schema()
def migrate_schema(self) -> None:
table_columns: dict[str, dict[str, str]] = {
"knowledge_bases": {
"project_id": "TEXT",
},
"knowledge_documents": {
"analysis_json": "TEXT NOT NULL DEFAULT '{}'",
"storyboard_json": "TEXT NOT NULL DEFAULT '[]'",
"source_artifact_json": "TEXT NOT NULL DEFAULT '{}'",
},
"assistants": {
"project_id": "TEXT",
"config_json": "TEXT NOT NULL DEFAULT '{}'",
},
"jobs": {
"project_id": "TEXT",
"parent_job_id": "TEXT",
"content_source_id": "TEXT",
"line_type": "TEXT NOT NULL DEFAULT 'analysis'",
"workflow_key": "TEXT NOT NULL DEFAULT ''",
"orchestrator": "TEXT NOT NULL DEFAULT 'n8n'",
"provider_name": "TEXT NOT NULL DEFAULT ''",
"provider_task_id": "TEXT NOT NULL DEFAULT ''",
"result_json": "TEXT NOT NULL DEFAULT '{}'",
},
}
for table, columns in table_columns.items():
if not self.table_exists(table):
continue
for column, definition in columns.items():
if self.column_exists(table, column):
continue
self.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
self.ensure_default_projects()
def ensure_default_projects(self) -> None:
    """Give every account a project and backfill empty ``project_id`` values.

    Returns immediately when the ``projects`` table does not exist. For each
    account (oldest first) the earliest project is looked up; when there is
    none, a default project with id ``proj_<account id>`` is inserted. Rows of
    ``knowledge_bases``, ``assistants`` and ``jobs`` owned by the account whose
    ``project_id`` is NULL or empty are then pointed at that project.
    """
    if not self.table_exists("projects"):
        return
    # Whether a table has a project_id column does not depend on the account,
    # so probe the schema once instead of three times per account.
    backfill_tables = [
        table
        for table in ("knowledge_bases", "assistants", "jobs")
        if self.column_exists(table, "project_id")
    ]
    accounts = self.fetch_all("SELECT id, username FROM accounts ORDER BY created_at ASC")
    for account in accounts:
        project = self.fetch_one(
            "SELECT * FROM projects WHERE user_id = ? ORDER BY created_at ASC LIMIT 1",
            (account["id"],),
        )
        if not project:
            project_id = f"proj_{account['id']}"
            now = utc_now()
            self.execute(
                """
                INSERT INTO projects (id, user_id, name, description, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    project_id,
                    account["id"],
                    f"{account['username']} 默认项目",
                    "系统自动创建的默认项目",
                    now,
                    now,
                ),
            )
            project = self.fetch_one("SELECT * FROM projects WHERE id = ?", (project_id,))
        if not project:
            # The insert did not produce a readable row; leave this account alone.
            continue
        for table in backfill_tables:
            self.execute(
                f"""
                UPDATE {table}
                SET project_id = ?
                WHERE user_id = ? AND (project_id IS NULL OR project_id = '')
                """,
                (project["id"], account["id"]),
            )

View File

@@ -0,0 +1,900 @@
from __future__ import annotations
import json
from typing import Any
from fastapi import Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field
class PlatformAnalysisRequest(BaseModel):
    # Request body of POST /v2/{platform}/accounts/{account_id}/analysis.
    # NOTE(review): the analysis handler in this module reads only max_videos,
    # extra_focus and temperature; the other fields are accepted but not used there.
    model_profile_ids: list[str] = Field(default_factory=list)
    linked_account_ids: list[str] = Field(default_factory=list)
    include_linked_accounts: bool = True
    include_recent_similar_candidates: bool = True
    # Validated to 1..20; the handler additionally caps the videos it sends at 8.
    max_videos: int = Field(default=6, ge=1, le=20)
    # Stored as the report's focus_text and included in the prompt context.
    extra_focus: str = ""
    # Forwarded as the temperature of the model call.
    temperature: float = 0.35
    auto_analyze_top_videos: bool = False
    top_video_analysis_count: int = Field(default=4, ge=1, le=10)
class PlatformTopVideoAnalysisRequest(BaseModel):
    # Request body of POST /v2/{platform}/accounts/{account_id}/videos/analyze-top.
    # Empty string is turned into None before the model profile lookup.
    model_profile_id: str = ""
    # Upper bound on how many ranked videos are analyzed (validated to 1..12).
    top_video_count: int = Field(default=5, ge=1, le=12)
    # Videos whose performance score is below this value are left out.
    min_score: float = 0
    # Forwarded as the temperature of each model call.
    temperature: float = 0.25
class PlatformSimilaritySearchRequest(BaseModel):
    # Request body of POST /v2/{platform}/similar-searches.
    # NOTE(review): the search handler in this module reads only source_account_id,
    # max_candidates and extra_requirements; the other fields are accepted but not used there.
    # Must identify one of the caller's creator accounts, otherwise the handler answers 404.
    source_account_id: str = ""
    candidate_urls: list[str] = Field(default_factory=list)
    seed_linked_accounts: bool = True
    search_public_pages: bool = True
    model_profile_id: str = ""
    # Number of ranked candidates stored for the search (validated to 1..20).
    max_candidates: int = Field(default=8, ge=1, le=20)
    # Stored as the search's prompt_text.
    extra_requirements: str = ""
class PlatformBenchmarkLinksRequest(BaseModel):
    # Request body of POST /v2/{platform}/accounts/{account_id}/benchmark-links.
    # Ids of the caller's creator accounts to link; each one is looked up before it is stored.
    target_account_ids: list[str] = Field(default_factory=list)
    # Profile URLs linked without an account id; blank entries are skipped.
    target_profile_urls: list[str] = Field(default_factory=list)
    # An empty value is stored as "benchmark".
    relation_type: str = "benchmark"
    note: str = ""
    # Stored on the relation as-is (empty string when omitted).
    search_id: str = ""
class PlatformTrackingAccountRequest(BaseModel):
    # Request body of POST /v2/{platform}/tracking/accounts.
    # Required: id of the creator account to track.
    tracked_account_id: str
    # Empty string is passed on as None when the assistant is resolved.
    assistant_id: str = ""
    note: str = ""
class PlatformTrackingCursorRequest(BaseModel):
    # NOTE(review): the route consuming this model is not visible in this part of
    # the file; presumably it stores last_seen_at as the tracking cursor — confirm.
    last_seen_at: str
def register_domestic_platform_routes(app: Any, legacy: Any, *, platform: str, label: str) -> None:
table_prefix = platform
def now() -> str:
    """Return the current timestamp as produced by ``legacy.utc_now()``."""
    return legacy.utc_now()
def make_id(prefix: str) -> str:
    """Return a new identifier for *prefix*, delegating to ``legacy.make_id``."""
    return legacy.make_id(prefix)
def _safe_json_dumps(value: Any) -> str:
    """Serialize *value* to JSON text without escaping non-ASCII characters.

    Any falsy value (``None``, ``0``, ``""``, empty containers) is written
    as an empty JSON object.
    """
    payload = value if value else {}
    return json.dumps(payload, ensure_ascii=False)
def _parse_json(raw: str, fallback: Any) -> Any:
    """Decode *raw* as JSON; return *fallback* for blank or malformed text."""
    text = str(raw or "").strip()
    if not text:
        return fallback
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError:
        decoded = fallback
    return decoded
def ensure_schema() -> None:
    """Create this platform's tables and indexes when they do not exist yet.

    Every table and index name is prefixed with ``table_prefix``. The tables
    cover analysis reports and their suggestions, similarity searches and
    their candidates, account relations, tracked accounts and the per-user
    tracking cursor. All statements use ``IF NOT EXISTS``.
    """
    # The doubled braces in the f-string produce the literal '{}' column defaults.
    schema = f"""
CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_reports (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
account_source_id TEXT NOT NULL,
focus_text TEXT NOT NULL DEFAULT '',
prompt_text TEXT NOT NULL DEFAULT '',
context_json TEXT NOT NULL DEFAULT '{{}}',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(account_source_id) REFERENCES content_sources(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_analysis_reports_account_created
ON {table_prefix}_analysis_reports(account_source_id, created_at DESC);
CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_suggestions (
id TEXT PRIMARY KEY,
report_id TEXT NOT NULL,
model_profile_id TEXT NOT NULL DEFAULT '',
model_label TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'ok',
suggestion_text TEXT NOT NULL DEFAULT '',
parsed_json TEXT NOT NULL DEFAULT '{{}}',
created_at TEXT NOT NULL,
FOREIGN KEY(report_id) REFERENCES {table_prefix}_analysis_reports(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_searches (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
source_account_id TEXT NOT NULL,
prompt_text TEXT NOT NULL DEFAULT '',
context_json TEXT NOT NULL DEFAULT '{{}}',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_candidates (
id TEXT PRIMARY KEY,
search_id TEXT NOT NULL,
candidate_account_id TEXT,
candidate_profile_url TEXT NOT NULL DEFAULT '',
heuristic_score REAL NOT NULL DEFAULT 0,
agent_score REAL NOT NULL DEFAULT 0,
rationale_text TEXT NOT NULL DEFAULT '',
dimensions_json TEXT NOT NULL DEFAULT '{{}}',
raw_output_json TEXT NOT NULL DEFAULT '{{}}',
rank_index INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
FOREIGN KEY(search_id) REFERENCES {table_prefix}_similarity_searches(id) ON DELETE CASCADE,
FOREIGN KEY(candidate_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_similarity_candidates_search_rank
ON {table_prefix}_similarity_candidates(search_id, rank_index ASC);
CREATE TABLE IF NOT EXISTS {table_prefix}_account_relations (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
source_account_id TEXT NOT NULL,
target_account_id TEXT,
target_profile_url TEXT NOT NULL DEFAULT '',
relation_type TEXT NOT NULL DEFAULT 'benchmark',
note TEXT NOT NULL DEFAULT '',
search_id TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
FOREIGN KEY(target_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_account_relations_source
ON {table_prefix}_account_relations(source_account_id, created_at DESC);
CREATE TABLE IF NOT EXISTS {table_prefix}_tracked_accounts (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
tracked_account_id TEXT NOT NULL,
assistant_id TEXT,
note TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, tracked_account_id),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(tracked_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_tracked_accounts_user_updated
ON {table_prefix}_tracked_accounts(user_id, updated_at DESC);
CREATE TABLE IF NOT EXISTS {table_prefix}_tracking_cursors (
user_id TEXT PRIMARY KEY,
last_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
"""
    # Run the whole script inside one database session.
    with legacy.db.session() as conn:
        conn.executescript(schema)
# Create the platform tables right away, while the routes are being registered.
ensure_schema()
# Run the same schema creation again when the application starts up.
@app.on_event("startup")
def _startup_platform_schema() -> None:
    """Startup hook that re-runs ``ensure_schema``."""
    ensure_schema()
def _content_source_rows(user_id: str, platform_value: str, kind: str = "") -> list[dict[str, Any]]:
    """Return the user's content sources for one platform, newest update first.

    When *kind* is non-empty, only rows whose ``source_kind`` equals it are kept.
    """
    found = legacy.db.fetch_all(
        "SELECT * FROM content_sources WHERE user_id = ? AND platform = ? ORDER BY updated_at DESC, created_at DESC",
        (user_id, platform_value),
    )
    if not kind:
        return found
    return [entry for entry in found if entry.get("source_kind") == kind]
def _content_source_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Return the payload built by ``legacy.content_source_payload`` for *row*."""
    return legacy.content_source_payload(row)
def _source_metadata(row: dict[str, Any]) -> dict[str, Any]:
    """Return the ``metadata`` entry of the row's content-source payload, or ``{}`` if absent."""
    return _content_source_payload(row).get("metadata", {})
def _require_account(account_id: str, user_id: str) -> dict[str, Any]:
    """Load a creator-account content source owned by *user_id* on this platform.

    Raises:
        HTTPException: 404 when no matching row exists.
    """
    found = legacy.db.fetch_one(
        "SELECT * FROM content_sources WHERE id = ? AND user_id = ? AND source_kind = 'creator_account' AND platform = ?",
        (account_id, user_id, platform),
    )
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"{label} account not found")
def _linked_video_sources(account_row: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the video-link sources that belong to a creator account.

    Candidates are the user's ``video_link`` sources of this platform in the
    account's project, newest update first. A candidate is linked when its
    metadata names the account as ``origin_content_source_id`` or carries
    the account's URL as ``source_account_url``.
    """
    project_id = account_row.get("project_id", "")
    rows = legacy.db.fetch_all(
        "SELECT * FROM content_sources WHERE user_id = ? AND project_id = ? AND source_kind = 'video_link' AND platform = ? ORDER BY updated_at DESC, created_at DESC",
        (account_row["user_id"], project_id, platform),
    )
    account_id = account_row["id"]
    source_url = str(account_row.get("source_url") or "").strip()
    linked: list[dict[str, Any]] = []
    for row in rows:
        metadata = _source_metadata(row)
        by_origin = metadata.get("origin_content_source_id") == account_id
        # An account without a URL must not claim videos whose stored account
        # URL is also empty, so URL matching requires a non-empty URL.
        by_url = bool(source_url) and metadata.get("source_account_url") == source_url
        if by_origin or by_url:
            linked.append(row)
    return linked
def _jobs_for_source(source_id: str) -> list[dict[str, Any]]:
    """Return every job attached to the content source, newest first."""
    job_rows = legacy.db.fetch_all(
        "SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC",
        (source_id,),
    )
    return job_rows
def _latest_job_for_source(source_id: str) -> dict[str, Any] | None:
    """Return the most recently created job of the content source, if any."""
    newest_job = legacy.db.fetch_one(
        "SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC LIMIT 1",
        (source_id,),
    )
    return newest_job
def _extract_performance_score(job_row: dict[str, Any] | None) -> float:
    """Return the job's performance score, or ``0.0`` when none is usable.

    The score is looked up, in order, at ``performance_score``,
    ``analysis.performance_score`` and ``scores.performance_score`` of
    ``result_json``, then at ``performance_score`` and
    ``scores.performance_score`` of ``artifacts_json``. The first value that
    converts to ``float`` wins.
    """
    if not job_row:
        return 0.0

    def _as_dict(value: Any) -> dict[str, Any]:
        # Stored JSON may decode to a list, string or number; treat anything
        # that is not an object as empty instead of failing on ``.get``.
        return value if isinstance(value, dict) else {}

    result_map = _as_dict(_parse_json(job_row.get("result_json") or "{}", {}))
    artifacts_map = _as_dict(_parse_json(job_row.get("artifacts_json") or "{}", {}))
    candidates = (
        result_map.get("performance_score"),
        _as_dict(result_map.get("analysis")).get("performance_score"),
        _as_dict(result_map.get("scores")).get("performance_score"),
        artifacts_map.get("performance_score"),
        _as_dict(artifacts_map.get("scores")).get("performance_score"),
    )
    for value in candidates:
        try:
            return float(value)
        except (TypeError, ValueError):
            # Missing (None) or non-numeric entry: try the next location.
            continue
    return 0.0
def _extract_metrics(job_row: dict[str, Any] | None) -> dict[str, Any]:
    """Return the metrics mapping recorded on a job, or ``{}`` when there is none.

    Looks at ``result_json.metrics``, ``artifacts_json.metrics``,
    ``result_json.stats`` and ``artifacts_json.stats`` in that order and
    returns the first non-empty mapping.
    """
    if not job_row:
        return {}
    result_map = _parse_json(job_row.get("result_json") or "{}", {})
    artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {})
    # Stored JSON may decode to something other than an object; ignore it then.
    if not isinstance(result_map, dict):
        result_map = {}
    if not isinstance(artifacts_map, dict):
        artifacts_map = {}
    candidates = (
        result_map.get("metrics"),
        artifacts_map.get("metrics"),
        result_map.get("stats"),
        artifacts_map.get("stats"),
    )
    for candidate in candidates:
        # Callers use ``.get`` on the result, so only mappings qualify.
        if candidate and isinstance(candidate, dict):
            return candidate
    return {}
def _video_payload(source_row: dict[str, Any]) -> dict[str, Any]:
    """Build the API representation of one video content source.

    Combines the content-source payload and its metadata with the stats and
    performance score taken from the source's most recent job.
    """
    source_view = _content_source_payload(source_row)
    meta = source_view.get("metadata", {})
    job = _latest_job_for_source(source_row["id"])
    job_fields = job or {}
    metrics = _extract_metrics(job)

    def _stat(counted_key: str, plain_key: str) -> Any:
        # Prefer the "*_count" key, then the bare key, then zero.
        return metrics.get(counted_key) or metrics.get(plain_key) or 0

    raw_tags = meta.get("tags") or []
    tags = raw_tags if isinstance(raw_tags, list) else []
    description = meta.get("summary") or meta.get("description") or job_fields.get("style_summary", "")
    stats = {
        "play": _stat("play_count", "play"),
        "like": _stat("like_count", "like"),
        "comment": _stat("comment_count", "comment"),
        "share": _stat("share_count", "share"),
    }
    return {
        "id": source_row["id"],
        "aweme_id": str(meta.get("external_id") or source_row["id"]),
        "title": source_view.get("title") or "未命名作品",
        "description": description,
        "share_url": source_view.get("source_url", ""),
        "cover_url": meta.get("cover_url") or "",
        "duration_sec": float(meta.get("duration_sec") or 0),
        "published_at": meta.get("published_at") or source_row.get("created_at"),
        "tags": tags,
        "content_type": meta.get("content_type") or "video",
        "stats": stats,
        "score": {"performance_score": _extract_performance_score(job)},
        "source": source_view,
        "latest_job_id": job_fields.get("id", ""),
    }
def _account_payload(account_row: dict[str, Any]) -> dict[str, Any]:
    """Build the API representation of a creator account.

    Includes a summary of the account's linked videos: their count, the
    average of the positive play and like numbers, and the first eight videos.
    """
    source_view = _content_source_payload(account_row)
    meta = source_view.get("metadata", {})
    videos = []
    for linked_row in _linked_video_sources(account_row):
        videos.append(_video_payload(linked_row))

    def _positive_stats(stat_name: str) -> list[float]:
        # Zero or missing counts are left out so they do not drag the average down.
        numbers = [float(video["stats"].get(stat_name) or 0) for video in videos]
        return [number for number in numbers if number > 0]

    def _mean(values: list[float]) -> float:
        return sum(values) / len(values) if values else 0

    play_values = _positive_stats("play")
    like_values = _positive_stats("like")
    raw_tags = meta.get("tags") or []
    tags = raw_tags if isinstance(raw_tags, list) else []
    # "ready" only when last_sync_error is absent or exactly the empty string.
    if meta.get("last_sync_error", "") == "":
        sync_status = "ready"
    else:
        sync_status = "partial"
    profile_url = source_view.get("source_url", "")
    return {
        "id": account_row["id"],
        "platform": platform,
        "profile_url": profile_url,
        "canonical_profile_url": profile_url,
        "handle": source_view.get("handle", ""),
        "nickname": source_view.get("title") or source_view.get("handle") or "未命名账号",
        "signature": meta.get("bio") or meta.get("description") or "",
        "avatar_url": meta.get("avatar_url") or "",
        "tags": tags,
        "keywords": meta.get("keywords") or [],
        "sync_status": sync_status,
        "video_summary": {
            "count": len(videos),
            "avg_play": _mean(play_values),
            "avg_like": _mean(like_values),
            "videos": videos[:8],
        },
        "project_id": source_view.get("project_id", ""),
        "created_at": source_view.get("created_at", ""),
        "updated_at": source_view.get("updated_at", ""),
    }
def _relation_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Build the API representation of one account-relation row.

    ``target_nickname`` is filled in when the relation points at a stored
    content source; otherwise it is the empty string.
    """
    target_nickname = ""
    if row.get("target_account_id"):
        target = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["target_account_id"],))
        if target:
            # Only the display name is needed. Derive it from the plain source
            # payload with the same title -> handle -> placeholder fallback the
            # account payload uses, instead of building the whole account
            # payload (which loads every linked video and its latest job).
            target_view = _content_source_payload(target)
            target_nickname = target_view.get("title") or target_view.get("handle") or "未命名账号"
    return {
        "id": row["id"],
        "source_account_id": row["source_account_id"],
        "target_account_id": row.get("target_account_id", "") or "",
        "target_profile_url": row.get("target_profile_url", ""),
        "target_nickname": target_nickname,
        "relation_type": row.get("relation_type", "benchmark"),
        "note": row.get("note", ""),
        "search_id": row.get("search_id", ""),
        "created_at": row["created_at"],
    }
def _report_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Build the API representation of an analysis report with its suggestions.

    Suggestions are listed oldest first and their ``parsed_json`` column is
    decoded (``{}`` when blank or malformed).
    """
    stored = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
        (row["id"],),
    )
    suggestions = []
    for entry in stored:
        suggestions.append(
            {
                "id": entry["id"],
                "status": entry.get("status", "ok"),
                "model_profile_id": entry.get("model_profile_id", ""),
                "model_label": entry.get("model_label", ""),
                "suggestion_text": entry.get("suggestion_text", ""),
                "parsed_json": _parse_json(entry.get("parsed_json") or "{}", {}),
                "created_at": entry.get("created_at", ""),
            }
        )
    return {
        "id": row["id"],
        "focus_text": row.get("focus_text", ""),
        "suggestions": suggestions,
        "created_at": row["created_at"],
    }
def _workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
    """Assemble the workspace view of an account.

    Contains the account payload, its six most recent analysis reports and
    all of its account relations (newest first).
    """
    account_key = (account_row["id"],)
    report_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_analysis_reports WHERE account_source_id = ? ORDER BY created_at DESC LIMIT 6",
        account_key,
    )
    relation_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC",
        account_key,
    )
    workspace: dict[str, Any] = {}
    workspace["account"] = _account_payload(account_row)
    workspace["recent_reports"] = [_report_payload(report) for report in report_rows]
    workspace["linked_accounts"] = [_relation_payload(relation) for relation in relation_rows]
    return workspace
async def _call_reasoning_model(user_id: str, prompt: str, *, system_prompt: str, model_profile_id: str = "", temperature: float = 0.3) -> tuple[str, dict[str, Any]]:
    """Run one model call and return its raw text plus the parsed JSON object.

    An empty *model_profile_id* is passed to the profile lookup as ``None``.
    The second tuple element is ``{}`` whenever the parser does not yield a dict.
    """
    profile = legacy.model_profile_for_account(user_id, model_profile_id or None)
    raw_text = await legacy.call_model(
        profile,
        system_prompt=system_prompt,
        user_prompt=prompt,
        temperature=temperature,
    )
    structured = legacy.parse_json_object(raw_text)
    if not isinstance(structured, dict):
        structured = {}
    return raw_text, structured
async def _create_sync_job_for_account(account_row: dict[str, Any], assistant_id: str = "") -> dict[str, Any]:
    """Create and trigger a content-sync job for a creator account.

    Returns the job payload of the triggered job.

    Raises:
        HTTPException: 400 when the account row has no project id.
    """
    project_id = account_row.get("project_id") or ""
    if project_id == "":
        raise HTTPException(status_code=400, detail="Account source is not attached to a project")
    owner_id = account_row["user_id"]
    kb = legacy.resolve_target_kb(owner_id, None, project_id)
    source_view = _content_source_payload(account_row)
    profile = legacy.model_profile_for_account(owner_id, None)
    account_url = source_view.get("source_url", "")
    # Job title falls back from the source title to its handle to the platform label.
    display_name = source_view.get("title") or source_view.get("handle") or label
    sync_artifacts = {
        "source_account_url": account_url,
        "platform": platform,
        "handle": source_view.get("handle", ""),
        # Use the source's configured max_items, or 5 when it is unset or zero.
        "max_items": int(source_view.get("metadata", {}).get("max_items") or 5),
        "skip_existing": True,
        "auto_trigger_analysis": True,
    }
    job_row = legacy.create_job_record(
        account_id=owner_id,
        project_id=project_id,
        knowledge_base_id=kb["id"],
        content_source_id=account_row["id"],
        assistant_id=assistant_id or None,
        source_type="creator_account",
        line_type="analysis",
        workflow_key="content_source_sync_pipeline",
        title=f"{display_name} 内容同步",
        language="auto",
        source_url=account_url,
        artifacts=sync_artifacts,
        analysis_model_profile_id=profile["id"],
    )
    triggered = await legacy.trigger_orchestrated_job(job_row)
    return legacy.job_payload(triggered)
def _tracking_cursor(user_id: str) -> dict[str, Any] | None:
    """Return the user's tracking-cursor row for this platform, if one exists."""
    cursor_row = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?",
        (user_id,),
    )
    return cursor_row
def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]:
    """Store *last_seen_at* as the user's tracking cursor and return the stored row.

    Updates the existing cursor row when there is one, otherwise inserts it.
    """
    already_stored = _tracking_cursor(user_id)
    stamp = now()
    if not already_stored:
        statement = f"INSERT INTO {table_prefix}_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?)"
        params = (user_id, last_seen_at, stamp)
    else:
        statement = f"UPDATE {table_prefix}_tracking_cursors SET last_seen_at = ?, updated_at = ? WHERE user_id = ?"
        params = (last_seen_at, stamp, user_id)
    legacy.db.execute(statement, params)
    # Read the row back so the caller sees exactly what was stored.
    stored = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?",
        (user_id,),
    )
    return stored
def _tracking_digest_item(tracked_row: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
    """Build one digest entry for a video of a tracked account.

    The summary text is the latest job's ``style_summary``, else the video
    description, else a fixed placeholder. Up to the first 36 characters of
    the summary and of the video title are offered as borrowing points.

    Raises:
        HTTPException: 404 when the tracked account can no longer be loaded.
    """
    latest_job = _latest_job_for_source(video["id"])
    summary = (latest_job or {}).get("style_summary") or video.get("description") or "已发现更新内容"
    assistant = None
    if tracked_row.get("assistant_id"):
        assistant_row = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (tracked_row["assistant_id"],))
        if assistant_row:
            assistant = legacy.assistant_payload(assistant_row)
    borrowing_points = [point for point in [summary[:36], video.get("title", "")[:36]] if point]
    # Only the display name is needed here. Derive it from the plain source
    # payload with the same title -> handle -> placeholder fallback the account
    # payload uses, rather than rebuilding the whole account payload (all
    # linked videos plus their latest jobs) once per digest item.
    tracked_view = _content_source_payload(_require_account(tracked_row["tracked_account_id"], tracked_row["user_id"]))
    tracked_name = tracked_view.get("title") or tracked_view.get("handle") or "未命名账号"
    return {
        "tracking_id": tracked_row["id"],
        "tracked_account_id": tracked_row["tracked_account_id"],
        "tracked_account_name": tracked_name,
        "assistant_id": tracked_row.get("assistant_id", "") or "",
        "assistant_name": (assistant or {}).get("name", ""),
        "note": tracked_row.get("note", ""),
        "video": video,
        "summary_text": summary,
        "borrowing_points": borrowing_points[:3],
        "created_at": video.get("published_at") or now(),
    }
def _tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]:
    """Collect digest entries for the videos of every account the user tracks.

    Videos published at or before the threshold are skipped; the threshold is
    *since_value* when given, otherwise the stored cursor's ``last_seen_at``.
    Entries are sorted newest first and cut to *limit*.
    """
    tracked_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
        (user_id,),
    )
    cursor = _tracking_cursor(user_id)
    cursor_fields = cursor or {}
    threshold = (since_value or cursor_fields.get("last_seen_at") or "").strip()
    collected: list[dict[str, Any]] = []
    for tracked in tracked_rows:
        account_row = _require_account(tracked["tracked_account_id"], user_id)
        summary_videos = _account_payload(account_row)["video_summary"]["videos"]
        for video in summary_videos:
            published_at = str(video.get("published_at") or "")
            # Keep the video unless both values are known and it is not newer
            # than the threshold (plain string comparison).
            if not threshold or not published_at or published_at > threshold:
                collected.append(_tracking_digest_item(tracked, video))
    newest_first = sorted(collected, key=lambda entry: entry.get("created_at", ""), reverse=True)
    tracked_accounts = []
    for tracked in tracked_rows:
        tracked_accounts.append(
            {
                "id": tracked["id"],
                "tracked_account_id": tracked["tracked_account_id"],
                "assistant_id": tracked.get("assistant_id", "") or "",
                "note": tracked.get("note", ""),
                "updated_at": tracked["updated_at"],
            }
        )
    return {
        "items": newest_first[:limit],
        "tracked_accounts": tracked_accounts,
        "cursor_last_seen_at": cursor_fields.get("last_seen_at", ""),
    }
@app.get(f"/v2/{platform}/accounts")
def list_platform_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
    # List the caller's creator accounts on this platform as account payloads.
    creator_rows = _content_source_rows(account["id"], platform, "creator_account")
    payloads = []
    for creator_row in creator_rows:
        payloads.append(_account_payload(creator_row))
    return payloads
@app.get(f"/v2/{platform}/accounts/{{account_id}}/workspace")
def get_platform_account_workspace(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    # Return the workspace view of one of the caller's accounts (404 when it is not theirs).
    return _workspace_payload(_require_account(account_id, account["id"]))
@app.get(f"/v2/{platform}/accounts/{{account_id}}/videos")
def list_platform_account_videos(
    account_id: str,
    limit: int = Query(default=80, ge=1, le=200),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # List an account's videos ordered by performance score, then publish time, both descending.
    # Also reports up to 12 ids scoring at least the high-score threshold and the 12 newest ids.
    high_score_threshold = 60
    account_row = _require_account(account_id, account["id"])
    videos = [_video_payload(source) for source in _linked_video_sources(account_row)]

    def _published(video: dict[str, Any]) -> Any:
        return video.get("published_at") or ""

    ranked = sorted(
        videos,
        key=lambda video: (video["score"]["performance_score"], _published(video)),
        reverse=True,
    )
    high_scorers = []
    for video in ranked:
        if float(video["score"]["performance_score"] or 0) >= high_score_threshold:
            high_scorers.append(video["id"])
    newest = sorted(ranked, key=_published, reverse=True)[:12]
    return {
        "items": ranked[:limit],
        "count": len(ranked),
        "meta": {"platform": platform, "account_id": account_id},
        "top_scored_video_ids": high_scorers[:12],
        "latest_video_ids": [video["id"] for video in newest],
        "high_score_threshold": high_score_threshold,
    }
@app.post(f"/v2/{platform}/accounts/{{account_id}}/analysis")
async def analyze_platform_account(
    account_id: str,
    request: PlatformAnalysisRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Run a model analysis of one account and persist it as a report plus one suggestion.
    user_id = account["id"]
    account_row = _require_account(account_id, user_id)
    workspace = _workspace_payload(account_row)
    account_view = workspace["account"]
    # At least one and at most eight videos go into the prompt context.
    video_cap = max(1, min(request.max_videos, 8))
    context = {
        "account": account_view,
        "top_videos": account_view["video_summary"]["videos"][:video_cap],
        "linked_accounts": workspace["linked_accounts"][:5],
        "extra_focus": request.extra_focus,
    }
    context_text = json.dumps(context, ensure_ascii=False, indent=2)
    prompt = (
        f"请从新媒体商业化运营视角,分析这个{label}账号,输出执行摘要、可借鉴点、风险提醒和下一步动作。"
        f"\n\n输入:\n{context_text}"
    )
    output, parsed = await _call_reasoning_model(
        user_id,
        prompt,
        system_prompt="你是新媒体账号分析顾问。尽量输出 JSON字段包括 executive_summary、borrow_points、risks、next_actions。",
        temperature=request.temperature,
    )
    report_id = make_id(f"{platform}_report")
    report_values = (
        report_id,
        user_id,
        account_row["id"],
        request.extra_focus or "",
        prompt,
        _safe_json_dumps(context),
        now(),
    )
    legacy.db.execute(
        f"INSERT INTO {table_prefix}_analysis_reports (id, user_id, account_source_id, focus_text, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
        report_values,
    )
    suggestion_id = make_id(f"{platform}_suggestion")
    # The suggestion is labelled with the account's default model profile.
    profile = legacy.model_profile_for_account(user_id, None)
    model_label = f"{profile.get('name', '')} · {profile.get('model_name', '')}".strip(" ·")
    suggestion_values = (
        suggestion_id,
        report_id,
        profile["id"],
        model_label,
        "ok",
        # Only the first 4000 characters of the raw model output are stored.
        output[:4000],
        _safe_json_dumps(parsed),
        now(),
    )
    legacy.db.execute(
        f"INSERT INTO {table_prefix}_analysis_suggestions (id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        suggestion_values,
    )
    stored_report = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_analysis_reports WHERE id = ?",
        (report_id,),
    )
    return {
        "report_id": report_id,
        "account_id": account_row["id"],
        "suggestions": _report_payload(stored_report)["suggestions"],
        "context": context,
    }
@app.post(f"/v2/{platform}/accounts/{{account_id}}/videos/analyze-top")
async def analyze_platform_top_videos(
    account_id: str,
    request: PlatformTopVideoAnalysisRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Ask the model to break down the account's best-scoring videos, one call per video.
    # Results are returned to the caller; this handler does not store them.
    account_row = _require_account(account_id, account["id"])
    videos = [_video_payload(source) for source in _linked_video_sources(account_row)]
    by_score = sorted(videos, key=lambda entry: entry["score"]["performance_score"], reverse=True)
    score_floor = float(request.min_score or 0)
    qualifying = []
    for entry in by_score:
        if float(entry["score"]["performance_score"] or 0) >= score_floor:
            qualifying.append(entry)
    results: list[dict[str, Any]] = []
    for video in qualifying[: request.top_video_count]:
        video_text = json.dumps(video, ensure_ascii=False, indent=2)
        prompt = (
            f"请拆解这条{label}作品为什么值得关注,输出 summary、borrow_points、risks。"
            f"\n\n输入:\n{video_text}"
        )
        output, parsed = await _call_reasoning_model(
            account["id"],
            prompt,
            system_prompt="你是短视频内容拆解助手。尽量输出 JSON字段包括 summary、borrow_points、risks。",
            model_profile_id=request.model_profile_id,
            temperature=request.temperature,
        )
        # Prefer a parsed summary; otherwise fall back to the raw output. Capped at 240 characters.
        headline = parsed.get("summary") or parsed.get("headline_summary") or output
        item = {
            "id": make_id(f"{platform}_va"),
            "video_id": video["id"],
            "video_title": video["title"],
            "status": "ok",
            "summary_text": str(headline)[:240],
            "parsed_json": parsed,
            "performance_score": video["score"]["performance_score"],
            "created_at": now(),
        }
        results.append(item)
    return {
        "account_id": account_row["id"],
        "analyzed_count": len(results),
        "items": results,
    }
@app.post(f"/v2/{platform}/similar-searches")
async def create_platform_similarity_search(
    request: PlatformSimilaritySearchRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Rank the caller's other creator accounts on this platform against a source account
    # by tag overlap, then store the search and its ranked candidates.
    user_id = account["id"]
    account_row = _require_account(request.source_account_id, user_id)
    source_payload = _account_payload(account_row)
    pool = []
    for row in _content_source_rows(user_id, platform, "creator_account"):
        if row["id"] != account_row["id"]:
            pool.append(row)
    # Score at least five accounts even when fewer candidates were requested.
    pool = pool[: max(5, request.max_candidates)]
    source_tags = set(source_payload.get("tags") or [])
    scored: list[dict[str, Any]] = []
    for position, row in enumerate(pool, start=1):
        candidate = _account_payload(row)
        overlap = len(source_tags & set(candidate.get("tags") or []))
        # Ten points per shared tag plus a bonus that shrinks with list position.
        score = float(overlap * 10 + max(0, 50 - position))
        scored.append(
            {
                "candidate_account_id": row["id"],
                "candidate_profile_url": candidate.get("profile_url", ""),
                "candidate_nickname": candidate.get("nickname", ""),
                "heuristic_score": score,
                "agent_score": score,
                "rationale_text": f"与源账号同平台,标签重合 {overlap},适合作为{label}对标候选。",
                "dimensions_json": {"tag_overlap": overlap},
            }
        )
    best = sorted(scored, key=lambda entry: entry["agent_score"], reverse=True)[: request.max_candidates]
    search_id = make_id(f"{platform}_search")
    legacy.db.execute(
        f"INSERT INTO {table_prefix}_similarity_searches (id, user_id, source_account_id, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?)",
        (
            search_id,
            user_id,
            account_row["id"],
            request.extra_requirements or "",
            _safe_json_dumps({"source_account": source_payload}),
            now(),
        ),
    )
    for rank, entry in enumerate(best):
        candidate_values = (
            make_id(f"{platform}_candidate"),
            search_id,
            entry.get("candidate_account_id") or None,
            entry["candidate_profile_url"],
            entry["heuristic_score"],
            entry["agent_score"],
            entry["rationale_text"],
            _safe_json_dumps(entry["dimensions_json"]),
            # The whole candidate dict is kept as the raw output.
            _safe_json_dumps(entry),
            rank,
            now(),
        )
        legacy.db.execute(
            f"""INSERT INTO {table_prefix}_similarity_candidates
(id, search_id, candidate_account_id, candidate_profile_url, heuristic_score, agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            candidate_values,
        )
    return {"id": search_id, "search_id": search_id}
@app.get(f"/v2/{platform}/similar-searches/{{search_id}}")
def get_platform_similarity_search(search_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    # Return one of the caller's similarity searches with its candidates in rank order (404 when missing).
    search_row = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_similarity_searches WHERE id = ? AND user_id = ?",
        (search_id, account["id"]),
    )
    if not search_row:
        raise HTTPException(status_code=404, detail="Similarity search not found")
    candidate_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_similarity_candidates WHERE search_id = ? ORDER BY rank_index ASC",
        (search_id,),
    )
    # Columns that fill in a key missing from the stored raw output, with their row defaults.
    column_defaults = (
        ("candidate_account_id", ""),
        ("candidate_profile_url", ""),
        ("rationale_text", ""),
        ("agent_score", 0),
        ("heuristic_score", 0),
    )
    candidates = []
    for row in candidate_rows:
        entry = _parse_json(row.get("raw_output_json") or "{}", {})
        for column, default in column_defaults:
            entry.setdefault(column, row.get(column, default))
        candidates.append(entry)
    found_id = search_row["id"]
    return {
        "id": found_id,
        "search_id": found_id,
        "source_account_id": search_row["source_account_id"],
        "candidates": candidates,
        "created_at": search_row["created_at"],
    }
@app.get(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
def list_platform_benchmark_links(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
    # List the relations recorded for one of the caller's accounts, newest first.
    # The lookup is only an ownership check; it raises 404 for a foreign or unknown account.
    _require_account(account_id, account["id"])
    relation_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC",
        (account_id,),
    )
    links = []
    for relation in relation_rows:
        links.append(_relation_payload(relation))
    return links
@app.post(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
def create_platform_benchmark_links(
    account_id: str,
    request: PlatformBenchmarkLinksRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Link the source account to other stored accounts and/or bare profile URLs.
    # Returns {"links": [...]}, account-id targets first, then URL targets.
    # Responds 404 when the source account or any target account id is not one
    # of the caller's accounts.
    source_account = _require_account(account_id, account["id"])
    # Resolve every target account before writing anything, so an unknown id
    # fails the request without leaving some links already inserted.
    pending: list[tuple[Any, str]] = []
    for target_account_id in request.target_account_ids:
        target = _require_account(target_account_id, account["id"])
        pending.append((target["id"], target.get("source_url", "")))
    for target_profile_url in request.target_profile_urls:
        cleaned = str(target_profile_url or "").strip()
        if cleaned:
            # URL-only targets carry no account id.
            pending.append((None, cleaned))
    created: list[dict[str, Any]] = []
    for target_id, profile_url in pending:
        relation_id = make_id(f"{platform}_link")
        legacy.db.execute(
            f"INSERT INTO {table_prefix}_account_relations (id, user_id, source_account_id, target_account_id, target_profile_url, relation_type, note, search_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                relation_id,
                account["id"],
                source_account["id"],
                target_id,
                profile_url,
                request.relation_type or "benchmark",
                request.note or "",
                request.search_id or "",
                now(),
            ),
        )
        created.append(_relation_payload(legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_account_relations WHERE id = ?", (relation_id,))))
    return {"links": created}
@app.get(f"/v2/{platform}/tracking/accounts")
def list_platform_tracking_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    # List the accounts the caller tracks (most recently updated first) with the stored cursor value.
    user_id = account["id"]
    tracked_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
        (user_id,),
    )
    cursor_fields = _tracking_cursor(user_id) or {}
    items = []
    for tracked in tracked_rows:
        items.append(
            {
                "id": tracked["id"],
                "tracked_account_id": tracked["tracked_account_id"],
                "assistant_id": tracked.get("assistant_id", "") or "",
                "note": tracked.get("note", ""),
                "updated_at": tracked["updated_at"],
            }
        )
    return {
        "items": items,
        "cursor_last_seen_at": cursor_fields.get("last_seen_at", ""),
    }
@app.post(f"/v2/{platform}/tracking/accounts")
def create_platform_tracking_account(
    request: PlatformTrackingAccountRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Start tracking one of the caller's accounts, or update the assistant and
    # note of an existing tracking row for the same account.
    # Responds 404 when the account to track is not one of the caller's accounts.
    tracked = _require_account(request.tracked_account_id, account["id"])
    assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, tracked.get("project_id", ""))
    assistant_id = (assistant or {}).get("id") or None
    note = request.note or ""
    existing = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
        (account["id"], tracked["id"]),
    )
    # Read the clock once so a new row gets identical created_at and updated_at.
    timestamp = now()
    if existing:
        tracking_id = existing["id"]
        legacy.db.execute(
            f"UPDATE {table_prefix}_tracked_accounts SET assistant_id = ?, note = ?, updated_at = ? WHERE id = ?",
            (assistant_id, note, timestamp, tracking_id),
        )
    else:
        tracking_id = make_id(f"{platform}_track")
        legacy.db.execute(
            f"INSERT INTO {table_prefix}_tracked_accounts (id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (
                tracking_id,
                account["id"],
                tracked["id"],
                assistant_id,
                note,
                timestamp,
                timestamp,
            ),
        )
    # Read the row back so the response reflects what is stored.
    row = legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_tracked_accounts WHERE id = ?", (tracking_id,))
    return {
        "id": row["id"],
        "tracked_account_id": row["tracked_account_id"],
        "assistant_id": row.get("assistant_id", "") or "",
        "note": row.get("note", ""),
        "updated_at": row["updated_at"],
    }
@app.post(f"/v2/{platform}/tracking/accounts/{{tracked_account_id}}/refresh")
async def refresh_platform_tracked_account(
    tracked_account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Queues a sync job for one tracked account of the caller and bumps the
    # tracking row's updated_at. Responds 404 when the caller does not track it.
    owner_id = account["id"]
    tracking = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
        (owner_id, tracked_account_id),
    )
    if not tracking:
        raise HTTPException(status_code=404, detail="Tracked account not found")

    target_account = _require_account(tracked_account_id, owner_id)
    preferred_assistant = tracking.get("assistant_id", "") or ""
    job = await _create_sync_job_for_account(target_account, assistant_id=preferred_assistant)

    legacy.db.execute(
        f"UPDATE {table_prefix}_tracked_accounts SET updated_at = ? WHERE id = ?",
        (now(), tracking["id"]),
    )
    return {
        "tracking_id": tracking["id"],
        "tracked_account_id": tracked_account_id,
        "sync_job_id": job["id"],
        "status": job["status"],
    }
@app.post(f"/v2/{platform}/tracking/refresh")
async def refresh_platform_tracking(
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Queues a sync job for every account the caller tracks. A failure on one
    # row is recorded in the response and does not stop the remaining rows.
    owner_id = account["id"]
    tracked_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
        (owner_id,),
    )

    succeeded = 0
    errored = 0
    outcomes: list[dict[str, Any]] = []
    for tracking in tracked_rows:
        try:
            target_account = _require_account(tracking["tracked_account_id"], owner_id)
            job = await _create_sync_job_for_account(
                target_account,
                assistant_id=tracking.get("assistant_id", "") or "",
            )
            succeeded += 1
            outcomes.append(
                {
                    "tracking_id": tracking["id"],
                    "tracked_account_id": tracking["tracked_account_id"],
                    "sync_job_id": job["id"],
                    "status": job["status"],
                }
            )
        except Exception as exc:
            # Best-effort batch: report the error text for this row and move on.
            errored += 1
            outcomes.append(
                {
                    "tracking_id": tracking["id"],
                    "tracked_account_id": tracking["tracked_account_id"],
                    "error": str(exc),
                }
            )

    return {"refreshed": succeeded, "failed": errored, "items": outcomes}
@app.post(f"/v2/{platform}/tracking/cursor")
def update_platform_tracking_cursor(
    request: PlatformTrackingCursorRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Stores the caller's new "last seen" marker and echoes the saved cursor.
    saved = _set_tracking_cursor(account["id"], request.last_seen_at)
    return {
        "last_seen_at": saved["last_seen_at"],
        "updated_at": saved["updated_at"],
    }
@app.get(f"/v2/{platform}/tracking/digest")
def get_platform_tracking_digest(
    since: str = Query(default=""),
    limit: int = Query(default=24, ge=1, le=100),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Returns the tracking digest for the caller; `since` is trimmed before it
    # is handed to the digest builder.
    since_value = (since or "").strip()
    return _tracking_digest(account["id"], since_value=since_value, limit=limit)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,226 @@
from __future__ import annotations
import mimetypes
from pathlib import Path
from typing import Any
import httpx
def _join_url(base_url: str, path: str) -> str:
base = base_url.rstrip("/")
if path.startswith("http://") or path.startswith("https://"):
return path
return f"{base}/{path.lstrip('/')}"
def _unwrap_response(payload: Any) -> dict[str, Any]:
if not isinstance(payload, dict):
return {"value": payload}
if payload.get("success") is True and "data" in payload:
data = payload.get("data")
if isinstance(data, dict):
return data
return {"value": data}
return payload
class N8NClient:
    """Minimal async client that triggers configured workflows over HTTP POST.

    Args:
        base_url: Root URL of the workflow server; trailing slashes are removed.
        workflow_paths: Mapping of workflow key to a path (or absolute URL)
            template. Templates may contain ``{name}`` placeholders that are
            filled from the trigger payload.
        shared_secret: Optional secret sent as ``X-Orchestrator-Secret``.
        timeout: Per-request timeout passed to ``httpx.AsyncClient``.
    """

    def __init__(
        self,
        *,
        base_url: str,
        workflow_paths: dict[str, str],
        shared_secret: str = "",
        timeout: float = 60.0,
    ) -> None:
        self.base_url = base_url.rstrip("/")
        self.workflow_paths = workflow_paths
        self.shared_secret = shared_secret.strip()
        self.timeout = timeout

    @property
    def enabled(self) -> bool:
        """Whether a non-empty base URL has been configured."""
        return bool(self.base_url)

    def _resolve_workflow_path(self, workflow_key: str, payload: dict[str, Any]) -> str:
        """Return the path configured for *workflow_key*, templated from *payload*.

        Templating is best-effort: if the template cannot be formatted (unknown
        placeholder, positional ``{}``, or unbalanced braces) the raw configured
        path is used instead.

        Raises:
            ValueError: If no non-blank path is configured for *workflow_key*.
        """
        template = self.workflow_paths.get(workflow_key, "").strip()
        if not template:
            raise ValueError(f"workflow path not configured for {workflow_key}")
        try:
            return template.format(**payload)
        except (KeyError, IndexError, ValueError):
            # KeyError: placeholder missing from the payload.
            # IndexError / ValueError: literal "{}" or a stray brace in the
            # configured path. None of these should abort the trigger.
            return template

    async def trigger(self, workflow_key: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST *payload* as JSON to the workflow registered under *workflow_key*.

        Returns ``{"triggered": True}`` for an empty response body, otherwise
        the decoded and unwrapped JSON response.

        Raises:
            ValueError: If no path is configured for *workflow_key*.
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        workflow_path = self._resolve_workflow_path(workflow_key, payload)
        headers: dict[str, str] = {}
        if self.shared_secret:
            headers["X-Orchestrator-Secret"] = self.shared_secret
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                _join_url(self.base_url, workflow_path),
                json=payload,
                headers=headers,
            )
            response.raise_for_status()
            if not response.content:
                return {"triggered": True}
            return _unwrap_response(response.json())
class CutVideoClient:
    """Async HTTP client for the cut-video service's jobs, uploads, tasks and runs API.

    Every call opens a short-lived ``httpx.AsyncClient``, raises on HTTP error
    statuses and returns the unwrapped JSON body.
    """

    def __init__(
        self,
        *,
        base_url: str,
        api_key: str = "",
        timeout: float = 120.0,
        upload_timeout: float = 1800.0,
    ) -> None:
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key.strip()
        self.timeout = timeout
        # Uploads get their own, much longer timeout.
        self.upload_timeout = upload_timeout

    @property
    def enabled(self) -> bool:
        """Whether a non-empty base URL has been configured."""
        return bool(self.base_url)

    def _headers(self) -> dict[str, str]:
        """Return the bearer-auth header when an API key is set, else no headers."""
        if not self.api_key:
            return {}
        return {"Authorization": f"Bearer {self.api_key}"}

    async def _get_json(self, path: str) -> dict[str, Any]:
        """GET *path* relative to the base URL and return the unwrapped JSON."""
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.get(_join_url(self.base_url, path), headers=self._headers())
            reply.raise_for_status()
            return _unwrap_response(reply.json())

    async def submit_job(self, payload: dict[str, Any]) -> dict[str, Any]:
        """POST *payload* as JSON to ``/api/jobs``."""
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.post(
                _join_url(self.base_url, "/api/jobs"),
                json=payload,
                headers=self._headers(),
            )
            reply.raise_for_status()
            return _unwrap_response(reply.json())

    async def upload_source_file(self, source_path: Path, *, folder_name: str = "") -> dict[str, Any]:
        """Upload *source_path* as multipart form data to ``/api/uploads``.

        ``folder_name`` is sent as a form field only when it is non-empty. The
        content type is guessed from the file name and defaults to
        ``application/octet-stream``.
        """
        guessed_type, _ = mimetypes.guess_type(source_path.name)
        content_type = guessed_type or "application/octet-stream"
        auth_headers = self._headers()
        form_fields = {"folder_name": folder_name} if folder_name else {}
        async with httpx.AsyncClient(timeout=self.upload_timeout) as http:
            with source_path.open("rb") as stream:
                reply = await http.post(
                    _join_url(self.base_url, "/api/uploads"),
                    data=form_fields,
                    files={"files": (source_path.name, stream, content_type)},
                    headers=auth_headers,
                )
            reply.raise_for_status()
            return _unwrap_response(reply.json())

    async def get_task(self, task_id: str) -> dict[str, Any]:
        """Fetch ``/api/tasks/{task_id}``."""
        return await self._get_json(f"/api/tasks/{task_id}")

    async def get_run(self, run_id: str) -> dict[str, Any]:
        """Fetch ``/api/runs/{run_id}``."""
        return await self._get_json(f"/api/runs/{run_id}")

    async def list_runs(self) -> dict[str, Any]:
        """Fetch ``/api/runs``."""
        return await self._get_json("/api/runs")
class AsrHttpClient:
    """Async client that uploads an audio file to an HTTP transcription endpoint."""

    def __init__(
        self,
        *,
        base_url: str,
        transcribe_path: str = "/transcribe",
        field_name: str = "wav",
        timeout: float = 120.0,
    ) -> None:
        self.base_url = base_url.rstrip("/")
        self.transcribe_path = transcribe_path
        # A blank multipart field name falls back to "wav".
        cleaned_field = field_name.strip()
        self.field_name = cleaned_field if cleaned_field else "wav"
        self.timeout = timeout

    @property
    def enabled(self) -> bool:
        """Whether a non-empty base URL has been configured."""
        return bool(self.base_url)

    async def transcribe_audio(self, audio_path: Path) -> dict[str, Any]:
        """POST *audio_path* as a multipart file and return the unwrapped JSON reply.

        The content type is guessed from the file name and defaults to
        ``application/octet-stream``.
        """
        guessed_type, _ = mimetypes.guess_type(audio_path.name)
        content_type = guessed_type or "application/octet-stream"
        endpoint = _join_url(self.base_url, self.transcribe_path)
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            with audio_path.open("rb") as stream:
                reply = await http.post(
                    endpoint,
                    files={self.field_name: (audio_path.name, stream, content_type)},
                )
            reply.raise_for_status()
            return _unwrap_response(reply.json())
class HuobaoDramaClient:
    """Async client for the drama service's ``/api/v1`` dramas, images and videos endpoints.

    Each call opens a short-lived ``httpx.AsyncClient``, raises on HTTP error
    statuses and returns the unwrapped JSON body.
    """

    def __init__(self, *, base_url: str, timeout: float = 180.0) -> None:
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout

    @property
    def enabled(self) -> bool:
        """Whether a non-empty base URL has been configured."""
        return bool(self.base_url)

    async def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST *payload* as JSON to *path* and return the unwrapped reply."""
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.post(_join_url(self.base_url, path), json=payload)
            reply.raise_for_status()
            return _unwrap_response(reply.json())

    async def _get_json(self, path: str) -> dict[str, Any]:
        """GET *path* and return the unwrapped reply."""
        async with httpx.AsyncClient(timeout=self.timeout) as http:
            reply = await http.get(_join_url(self.base_url, path))
            reply.raise_for_status()
            return _unwrap_response(reply.json())

    async def create_drama(self, payload: dict[str, Any]) -> dict[str, Any]:
        """POST *payload* to ``/api/v1/dramas``."""
        return await self._post_json("/api/v1/dramas", payload)

    async def generate_image(self, payload: dict[str, Any]) -> dict[str, Any]:
        """POST *payload* to ``/api/v1/images``."""
        return await self._post_json("/api/v1/images", payload)

    async def get_image(self, image_id: str) -> dict[str, Any]:
        """Fetch ``/api/v1/images/{image_id}``."""
        return await self._get_json(f"/api/v1/images/{image_id}")

    async def generate_video(self, payload: dict[str, Any]) -> dict[str, Any]:
        """POST *payload* to ``/api/v1/videos``."""
        return await self._post_json("/api/v1/videos", payload)

    async def get_video(self, video_id: str) -> dict[str, Any]:
        """Fetch ``/api/v1/videos/{video_id}``."""
        return await self._get_json(f"/api/v1/videos/{video_id}")

View File

@@ -0,0 +1,381 @@
from __future__ import annotations
import json
from typing import Any
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
from .core_main import (
content_source_payload,
create_content_source,
create_job_record,
job_payload,
load_owned_content_source,
load_owned_job,
make_id,
parse_json_object,
resolve_target_assistant,
resolve_target_kb,
resolve_target_project,
review_payload,
trigger_orchestrated_job,
utc_now,
model_profile_for_account,
db,
)
# Platform identifier stored on content sources, job artifacts and reviews.
KUAISHOU_PLATFORM = "kuaishou"

# Substrings that mark a URL as a Kuaishou link (matched case-insensitively
# by the helpers in this module).
KUAISHOU_URL_HINTS = ("kuaishou.com", "v.kuaishou.com", "chenzhongtech.com")

# Substrings that mark a URL as a YouTube link, which these routes reject.
YOUTUBE_URL_HINTS = ("youtube.com", "youtu.be", "m.youtube.com", "music.youtube.com")
class KuaishouContentSourceCreateRequest(BaseModel):
    # Request body for POST /v2/kuaishou/content-sources. Every field is
    # optional; text fields default to empty strings.

    # Target project; passed as None to the project resolver when blank.
    project_id: str = ""
    source_kind: str = "creator_account"

    # Identity of the source.
    handle: str = ""
    source_url: str = ""
    title: str = ""
    local_path: str = ""

    # Free-form metadata forwarded as-is to the stored content source.
    metadata: dict[str, Any] = Field(default_factory=dict)
class KuaishouContentSourceSyncRequest(BaseModel):
    # Request body for POST /v2/kuaishou/pipelines/content-source-sync.

    # Targets; blank values are passed as None to the resolver helpers.
    project_id: str = ""
    knowledge_base_id: str = ""
    assistant_id: str = ""

    # Either reference an existing content source or describe one inline.
    content_source_id: str = ""
    handle: str = ""
    source_url: str = ""
    title: str = ""

    # Sync and analysis options recorded on the created job.
    analysis_model_profile_id: str = ""
    language: str = "auto"
    max_items: int = Field(default=5, ge=1, le=20)
    skip_existing: bool = True
    auto_trigger_analysis: bool = True
class KuaishouReviewCreateRequest(BaseModel):
    # Request body for POST /v2/kuaishou/reviews.

    # Links to existing records; all optional.
    project_id: str = ""
    source_job_id: str = ""
    assistant_id: str = ""

    # What was published.
    title: str = ""
    content_type: str = "video"
    publish_url: str = ""
    published_at: str = ""

    # Stored JSON-encoded in the review row.
    metrics: dict[str, Any] = Field(default_factory=dict)

    # Free-text review notes.
    verdict: str = ""
    highlights: str = ""
    next_actions: str = ""
    notes: str = ""
def _normalize_text(value: str | None) -> str:
return str(value or "").strip()
def _is_youtube_url(value: str) -> bool:
    """Return True when *value* contains any YouTube URL hint (case-insensitive)."""
    lowered = _normalize_text(value).lower()
    for hint in YOUTUBE_URL_HINTS:
        if hint in lowered:
            return True
    return False
def _is_kuaishou_url(value: str) -> bool:
    """Return True when *value* contains any Kuaishou URL hint (case-insensitive)."""
    lowered = _normalize_text(value).lower()
    for hint in KUAISHOU_URL_HINTS:
        if hint in lowered:
            return True
    return False
def _ensure_kuaishou_url(value: str) -> str:
    """Return the trimmed URL, rejecting YouTube links.

    A blank value yields ``""``. Only YouTube URLs are rejected here; the
    function does not check that the URL matches a Kuaishou hint.

    Raises:
        HTTPException: 400 when the URL looks like a YouTube link.
    """
    cleaned = _normalize_text(value)
    if cleaned and _is_youtube_url(cleaned):
        raise HTTPException(status_code=400, detail="YouTube URLs are not supported in the Kuaishou routes")
    return cleaned
def _content_source_is_kuaishou(row: dict[str, Any]) -> bool:
    """Return True when the row's platform is Kuaishou or its URL looks like Kuaishou."""
    declared_platform = _normalize_text(row.get("platform")).lower()
    return declared_platform == KUAISHOU_PLATFORM or _is_kuaishou_url(row.get("source_url", ""))
def _job_is_kuaishou(row: dict[str, Any]) -> bool:
    """Decide whether a job row belongs to the Kuaishou routes.

    Checks, in order: the job's own URL (YouTube means no, Kuaishou means yes),
    the ``platform`` recorded in the job's artifacts, and finally the content
    source the job references (one extra database lookup).
    """
    artifacts = parse_json_object(row.get("artifacts_json") or "{}")
    job_url = _normalize_text(row.get("source_url"))

    if job_url:
        if _is_youtube_url(job_url):
            return False
        if _is_kuaishou_url(job_url):
            return True

    if _normalize_text(artifacts.get("platform")).lower() == KUAISHOU_PLATFORM:
        return True

    linked_source_id = _normalize_text(row.get("content_source_id"))
    if not linked_source_id:
        return False

    linked_source = db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (linked_source_id,))
    return bool(linked_source and _content_source_is_kuaishou(linked_source))
def _require_owned_kuaishou_source(source_id: str, account_id: str) -> dict[str, Any]:
    """Load a content source owned by *account_id* and require it to be Kuaishou.

    Raises:
        HTTPException: 400 when the loaded source is not a Kuaishou source.
    """
    source = load_owned_content_source(source_id, account_id)
    if _content_source_is_kuaishou(source):
        return source
    raise HTTPException(status_code=400, detail="Content source does not belong to the Kuaishou route")
def _list_kuaishou_jobs(account_id: str, project_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
    """Return up to *limit* of the account's newest Kuaishou jobs as payloads.

    Args:
        account_id: Owner of the jobs.
        project_id: When truthy, only jobs of this project are considered.
        limit: Maximum number of payloads to return.

    The platform cannot be filtered in SQL (it is derived per row by
    ``_job_is_kuaishou``), so ten times *limit* candidate rows are fetched and
    filtered in Python. The project filter, however, is applied in SQL so that
    the candidate window is not consumed by jobs of other projects.
    """
    clauses = ["user_id = ?"]
    params: list[Any] = [account_id]
    if project_id:
        clauses.append("project_id = ?")
        params.append(project_id)
    params.append(max(limit, 1) * 10)

    rows = db.fetch_all(
        f"SELECT * FROM jobs WHERE {' AND '.join(clauses)} ORDER BY created_at DESC LIMIT ?",
        tuple(params),
    )
    items: list[dict[str, Any]] = []
    for row in rows:
        if _job_is_kuaishou(row):
            items.append(job_payload(row))
        if len(items) >= limit:
            break
    return items
def _list_kuaishou_reviews(account_id: str, project_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
    """Return the account's Kuaishou publish reviews, newest first.

    ``project_id`` semantics: ``None`` applies no project filter, a blank
    string selects reviews without a project, and any other value selects that
    project. Ordering uses ``published_at`` when set, else ``created_at``.
    """
    filters = ["user_id = ?", "platform = ?"]
    bindings: list[Any] = [account_id, KUAISHOU_PLATFORM]

    if project_id is not None:
        wanted_project = project_id.strip()
        if not wanted_project:
            filters.append("(project_id IS NULL OR project_id = '')")
        else:
            filters.append("project_id = ?")
            bindings.append(wanted_project)

    sql = f"""
        SELECT * FROM publish_reviews
        WHERE {' AND '.join(filters)}
        ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC
        LIMIT ?
    """
    bindings.append(limit)
    review_rows = db.fetch_all(sql, tuple(bindings))
    return [review_payload(review) for review in review_rows]
def register_kuaishou_routes(app: Any, legacy: Any) -> None:
    """Register a small Kuaishou route set on top of the shared collector tables.

    Args:
        app: Application object whose ``get``/``post`` decorators register routes.
        legacy: Core module object; this function uses its ``require_approved``
            dependency, its ``db`` handle and ``update_content_source_metadata``.

    Route handlers are documented with comments rather than docstrings.
    """

    @app.get("/v2/kuaishou/content-sources")
    def list_kuaishou_content_sources(
        project_id: str | None = Query(default=None),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        # Lists the caller's Kuaishou content sources, newest first, optionally
        # restricted to one project (which is resolved for the caller first).
        clauses = ["user_id = ?", "platform = ?"]
        params: list[Any] = [account["id"], KUAISHOU_PLATFORM]
        if project_id:
            resolve_target_project(account["id"], project_id, username=account["username"])
            clauses.append("project_id = ?")
            params.append(project_id)
        rows = legacy.db.fetch_all(
            f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC",
            tuple(params),
        )
        return [content_source_payload(row) for row in rows]

    @app.post("/v2/kuaishou/content-sources")
    def create_kuaishou_content_source_api(
        request: KuaishouContentSourceCreateRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        # Creates a Kuaishou content source in the resolved project.
        project = resolve_target_project(account["id"], request.project_id or None, username=account["username"])
        # _ensure_kuaishou_url already answers 400 for YouTube links, so no
        # second YouTube check is needed afterwards.
        source_url = _ensure_kuaishou_url(request.source_url)
        row = create_content_source(
            account_id=account["id"],
            project_id=project["id"],
            source_kind=_normalize_text(request.source_kind) or "creator_account",
            platform=KUAISHOU_PLATFORM,
            handle=_normalize_text(request.handle),
            source_url=source_url,
            # Title falls back to the handle, then to the URL.
            title=_normalize_text(request.title) or _normalize_text(request.handle) or source_url,
            local_path=_normalize_text(request.local_path),
            metadata=request.metadata,
        )
        return content_source_payload(row)

    @app.post("/v2/kuaishou/pipelines/content-source-sync")
    async def create_kuaishou_content_source_sync_job(
        request: KuaishouContentSourceSyncRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        # Creates (and triggers) a content-source sync job, creating the content
        # source on the fly when the request does not reference an existing one.
        source_row = None
        if request.content_source_id.strip():
            source_row = _require_owned_kuaishou_source(request.content_source_id.strip(), account["id"])
        # An explicit project wins; otherwise inherit the source's project.
        requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "")
        project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
        kb = resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
        assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
        profile = model_profile_for_account(account["id"], request.analysis_model_profile_id or None)
        source_url = _ensure_kuaishou_url(
            request.source_url or (source_row or {}).get("source_url", "")
        )
        if not source_url:
            raise HTTPException(status_code=400, detail="source_url or content_source_id with a Kuaishou URL is required")
        handle = _normalize_text(request.handle or (source_row or {}).get("handle", ""))
        source_title = (
            _normalize_text(request.title)
            # _normalize_text tolerates a NULL title on the stored source.
            or _normalize_text((source_row or {}).get("title"))
            or handle
            or source_url
        )
        if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
            raise HTTPException(status_code=400, detail="Content source does not belong to target project")
        if not source_row:
            source_row = create_content_source(
                account_id=account["id"],
                project_id=project["id"],
                source_kind="creator_account",
                platform=KUAISHOU_PLATFORM,
                handle=handle,
                source_url=source_url,
                title=source_title,
                metadata={
                    "sync_mode": "recent_uploads",
                    "max_items": request.max_items,
                    "analysis_model_profile_id": profile["id"],
                },
            )
        job_row = create_job_record(
            account_id=account["id"],
            project_id=project["id"],
            knowledge_base_id=kb["id"],
            source_type="content_source_sync",
            line_type="content_source_sync",
            workflow_key="content_source_sync_pipeline",
            title=f"{source_title} 内容源同步",
            language=request.language,
            source_url=source_url,
            assistant_id=(assistant or {}).get("id"),
            content_source_id=source_row["id"],
            artifacts={
                "platform": KUAISHOU_PLATFORM,
                "handle": handle,
                "source_account_url": source_url,
                "source_title": source_title,
                "max_items": request.max_items,
                "skip_existing": request.skip_existing,
                "auto_trigger_analysis": request.auto_trigger_analysis,
            },
            analysis_model_profile_id=profile["id"],
        )
        # Remember the latest sync request on the content source itself.
        legacy.update_content_source_metadata(
            source_row["id"],
            {
                "sync_mode": "recent_uploads",
                "max_items": request.max_items,
                "analysis_model_profile_id": profile["id"],
                "last_sync_job_id": job_row["id"],
                "last_sync_requested_at": utc_now(),
            },
        )
        return job_payload(await trigger_orchestrated_job(job_row))

    @app.get("/v2/kuaishou/jobs")
    def list_kuaishou_jobs_api(
        project_id: str | None = Query(default=None),
        limit: int = Query(default=20, ge=1, le=100),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        # Lists the caller's newest Kuaishou jobs.
        return _list_kuaishou_jobs(account["id"], project_id=project_id, limit=limit)

    @app.get("/v2/kuaishou/workspace")
    def get_kuaishou_workspace(
        project_id: str | None = Query(default=None),
        limit: int = Query(default=10, ge=1, le=50),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        # Aggregates content sources, recent jobs and recent reviews in one
        # response. The counts describe the returned lists, not table totals.
        content_sources = list_kuaishou_content_sources(project_id=project_id, account=account)
        reviews = _list_kuaishou_reviews(account["id"], project_id=project_id, limit=limit)
        jobs = _list_kuaishou_jobs(account["id"], project_id=project_id, limit=limit)
        return {
            "platform": KUAISHOU_PLATFORM,
            "project_id": project_id or "",
            "content_sources": content_sources,
            "recent_jobs": jobs,
            "recent_reviews": reviews,
            "counts": {
                "content_sources": len(content_sources),
                "jobs": len(jobs),
                "reviews": len(reviews),
            },
        }

    @app.get("/v2/kuaishou/reviews")
    def list_kuaishou_reviews_api(
        project_id: str | None = Query(default=None),
        limit: int = Query(default=50, ge=1, le=200),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        # Lists the caller's Kuaishou publish reviews.
        return _list_kuaishou_reviews(account["id"], project_id=project_id, limit=limit)

    @app.post("/v2/kuaishou/reviews")
    def create_kuaishou_review(
        request: KuaishouReviewCreateRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        # Stores a publish review, optionally linked to one of the caller's
        # Kuaishou jobs, and returns the stored review.
        source_job = None
        if request.source_job_id.strip():
            source_job = load_owned_job(request.source_job_id.strip(), account["id"])
            if not _job_is_kuaishou(source_job):
                raise HTTPException(status_code=400, detail="Source job does not belong to the Kuaishou route")
        # An explicit project wins; otherwise inherit the job's project.
        requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "")
        project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
        assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
        review_id = make_id("review")
        # Title falls back to the job title, then to a project-based default.
        title = request.title.strip() or (source_job.get("title", "") if source_job else "")
        if not title:
            title = f"{project['name']} 快手复盘"
        timestamp = utc_now()
        db.execute(
            """
            INSERT INTO publish_reviews (
                id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
                publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                review_id,
                account["id"],
                project["id"],
                source_job["id"] if source_job else None,
                (assistant or {}).get("id") or None,
                title,
                KUAISHOU_PLATFORM,
                request.content_type or "video",
                _normalize_text(request.publish_url),
                _normalize_text(request.published_at),
                json.dumps(request.metrics, ensure_ascii=False),
                _normalize_text(request.verdict),
                _normalize_text(request.highlights),
                _normalize_text(request.next_actions),
                _normalize_text(request.notes),
                timestamp,
                timestamp,
            ),
        )
        row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
        return review_payload(row)

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
import importlib.machinery
import importlib.util
import sys
import types
from pathlib import Path
from typing import Any
BASE_DIR = Path(__file__).resolve().parent
PYCACHE_DIR = BASE_DIR / "__pycache__"
LEGACY_PYC_DIR = BASE_DIR / "_legacy_pyc"
SUPPORTED_PYTHON = (3, 11)
_LEGACY_MODULE: Any | None = None
def _ensure_supported_runtime() -> None:
    """Raise ``RuntimeError`` unless the interpreter's major.minor equals ``SUPPORTED_PYTHON``."""
    if sys.version_info[:2] == SUPPORTED_PYTHON:
        return
    running = ".".join(str(part) for part in sys.version_info[:3])
    wanted = ".".join(str(part) for part in SUPPORTED_PYTHON)
    raise RuntimeError(
        f"Legacy collector bytecode requires Python {wanted}. Current runtime: {running}."
    )
def _ensure_package() -> None:
    """Make sure an ``app`` package entry exists in ``sys.modules``.

    When none is registered, a synthetic module whose ``__path__`` points at
    ``BASE_DIR`` is installed under the name ``app``.
    """
    if sys.modules.get("app") is not None:
        return
    synthetic = types.ModuleType("app")
    synthetic.__path__ = [str(BASE_DIR)]
    sys.modules["app"] = synthetic
def _load_sourceless_module(module_name: str, pyc_path: Path) -> Any:
loader = importlib.machinery.SourcelessFileLoader(module_name, str(pyc_path))
spec = importlib.util.spec_from_loader(module_name, loader)
if spec is None:
raise RuntimeError(f"Unable to create spec for {module_name}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
loader.exec_module(module)
return module
def load_legacy_main() -> Any:
    """Load the legacy main module from bytecode, once, and return it.

    The result is cached in ``_LEGACY_MODULE``. Before loading, the runtime
    version is verified and an ``app`` package entry is ensured. The modules
    ``app.database``, ``app.fastgpt`` and ``app.openai_compat`` are loaded
    first unless already present in ``sys.modules``. Each ``.pyc`` is taken
    from ``LEGACY_PYC_DIR`` when it exists there, otherwise from
    ``PYCACHE_DIR``. The main bytecode is registered as ``app.main_legacy``.
    """
    global _LEGACY_MODULE
    if _LEGACY_MODULE is not None:
        return _LEGACY_MODULE

    _ensure_supported_runtime()
    _ensure_package()

    def _locate_pyc(stem: str) -> Path:
        # Prefer the dedicated legacy directory, else fall back to __pycache__.
        filename = f"{stem}.cpython-311.pyc"
        if (LEGACY_PYC_DIR / filename).exists():
            return LEGACY_PYC_DIR / filename
        return PYCACHE_DIR / filename

    for dependency in ("database", "fastgpt", "openai_compat"):
        qualified = f"app.{dependency}"
        if qualified in sys.modules:
            continue
        _load_sourceless_module(qualified, _locate_pyc(dependency))

    legacy_name = "app.main_legacy"
    if legacy_name in sys.modules:
        # Already loaded elsewhere; adopt it as the cached module.
        _LEGACY_MODULE = sys.modules[legacy_name]
        return _LEGACY_MODULE

    _LEGACY_MODULE = _load_sourceless_module(legacy_name, _locate_pyc("main"))
    _LEGACY_MODULE.__package__ = "app"
    return _LEGACY_MODULE

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from .domestic_platform_features import register_domestic_platform_routes
from .douyin_features import register_douyin_routes
from .oneliner_features import register_oneliner_routes
try:
    from . import core_main as core
except Exception:
    # Keep a bytecode-backed fallback so the app can still boot if the
    # recovered source baseline is incomplete in this workspace. Log the
    # original failure first, so a genuine bug in core_main is not silently
    # hidden behind the fallback.
    import logging

    logging.getLogger(__name__).warning(
        "Importing core_main failed; falling back to the legacy bytecode runtime",
        exc_info=True,
    )
    from .legacy_runtime import load_legacy_main

    core = load_legacy_main()

app = core.app

# Attach the platform-specific route sets to the shared application object.
register_douyin_routes(app, core)
register_domestic_platform_routes(app, core, platform="xiaohongshu", label="小红书")
register_domestic_platform_routes(app, core, platform="bilibili", label="哔哩哔哩")
register_domestic_platform_routes(app, core, platform="kuaishou", label="快手")
register_domestic_platform_routes(app, core, platform="wechat_video", label="微信视频号")
register_oneliner_routes(app, core)

# Clear any cached OpenAPI schema after the routes above have been added.
app.openapi_schema = None

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,45 @@
from __future__ import annotations
from typing import Any
import httpx
class OpenAICompatClient:
    """Minimal async client for OpenAI-compatible ``/chat/completions`` endpoints."""

    def __init__(self, timeout: float = 180.0) -> None:
        self.timeout = timeout

    @staticmethod
    def _extract_content(data: Any) -> str:
        """Return the stripped text of the first choice in a decoded response.

        Returns ``""`` when the response is not a dict, has no choices, or the
        first choice carries no message content. List-style content is joined
        with newlines from the ``text`` value of each dict part; a missing or
        ``None`` text contributes an empty string.
        """
        if not isinstance(data, dict):
            return ""
        choices = data.get("choices") or []
        if not isinstance(choices, list) or not choices:
            return ""
        first_choice = choices[0]
        if not isinstance(first_choice, dict):
            return ""
        message = first_choice.get("message") or {}
        if not isinstance(message, dict):
            return ""
        content = message.get("content") or ""
        if isinstance(content, list):
            parts: list[str] = []
            for item in content:
                if not isinstance(item, dict):
                    continue
                text = item.get("text")
                # Avoid rendering a null text part as the literal "None".
                parts.append("" if text is None else str(text))
            return "\n".join(parts).strip()
        return str(content).strip()

    async def chat_completion(
        self,
        *,
        base_url: str,
        api_key: str,
        model: str,
        system_prompt: str,
        user_prompt: str,
        temperature: float = 0.7,
    ) -> str:
        """Send one system+user exchange and return the assistant's text.

        Args:
            base_url: API root; ``/chat/completions`` is appended to it.
            api_key: Bearer token; no Authorization header is sent when blank.
            model: Model name placed in the request body.
            system_prompt: Content of the system message.
            user_prompt: Content of the user message.
            temperature: Sampling temperature placed in the request body.

        Returns:
            The stripped text of the first choice, or ``""`` if there is none.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        url = base_url.rstrip("/") + "/chat/completions"
        headers = {"Content-Type": "application/json"}
        if api_key.strip():
            headers["Authorization"] = f"Bearer {api_key.strip()}"
        payload: dict[str, Any] = {
            "model": model,
            "temperature": temperature,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
        }
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(url, headers=headers, json=payload)
            response.raise_for_status()
            data = response.json()
        return self._extract_content(data)

View File

@@ -0,0 +1,531 @@
from __future__ import annotations
import json
from collections import Counter
from typing import Any
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
# This module is intentionally self-contained because the task only allows
# writes to a new file. To activate it, import `register_wechat_video_routes`
# from `app.main` and call it with `(app, core)`.
# Platform identifier used on content sources and publish reviews.
WECHAT_VIDEO_PLATFORM = "wechat_video"
# Content sources of this kind are treated as accounts by these routes.
ACCOUNT_SOURCE_KIND = "creator_account"
# URL substrings (matched on the lower-cased URL) that identify YouTube links.
YOUTUBE_HOST_MARKERS = ("youtube.com", "youtu.be")
class WechatVideoAccountSyncRequest(BaseModel):
    # Request body describing a WeChat Video account sync. Text fields default
    # to empty strings.

    # Optional targets.
    project_id: str = ""
    knowledge_base_id: str = ""
    assistant_id: str = ""

    # Either reference an existing content source or describe the account.
    content_source_id: str = ""
    profile_url: str = ""
    handle: str = ""
    title: str = ""

    # Sync and analysis options.
    analysis_model_profile_id: str = ""
    language: str = "auto"
    max_items: int = Field(default=5, ge=1, le=20)
    skip_existing: bool = True
    auto_trigger_analysis: bool = True
class WechatVideoReviewCreateRequest(BaseModel):
    # Request body describing a WeChat Video publish review.

    # Optional links to existing records.
    project_id: str = ""
    source_job_id: str = ""
    assistant_id: str = ""

    # What was published.
    title: str = ""
    content_type: str = "video"
    publish_url: str = ""
    published_at: str = ""

    # Arbitrary metric values supplied by the client.
    metrics: dict[str, Any] = Field(default_factory=dict)

    # Free-text review notes.
    verdict: str = ""
    highlights: str = ""
    next_actions: str = ""
    notes: str = ""
def register_wechat_video_routes(app: Any, legacy: Any) -> None:
if getattr(app.state, "wechat_video_routes_registered", False):
return
app.state.wechat_video_routes_registered = True
def _account_not_found() -> HTTPException:
    """Build (without raising) the 404 error used for unknown WeChat Video accounts."""
    error = HTTPException(status_code=404, detail="WeChat Video account not found")
    return error
def _normalize_wechat_source_url(source_url: str) -> str:
    """Return the trimmed URL after checking it is a WeChat Video URL.

    A blank URL yields ``""`` without validation.

    Raises:
        HTTPException: 400 when the URL contains a YouTube marker, or when
            ``legacy.infer_platform_from_url`` does not classify it as
            ``wechat_video``.
    """
    candidate = source_url.strip()
    if not candidate:
        return ""

    haystack = candidate.lower()
    for marker in YOUTUBE_HOST_MARKERS:
        if marker in haystack:
            raise HTTPException(status_code=400, detail="YouTube is not supported by wechat_video routes")

    if legacy.infer_platform_from_url(candidate) == WECHAT_VIDEO_PLATFORM:
        return candidate
    raise HTTPException(
        status_code=400,
        detail="wechat_video routes only accept channels.weixin.qq.com or mp.weixin.qq.com/s URLs",
    )
def _require_owned_account(source_id: str, user_id: str) -> dict[str, Any]:
    """Load a content source owned by *user_id* and require it to be a WeChat Video account.

    Raises:
        HTTPException: 404 when the source has a different platform or is not
            of the account source kind.
    """
    source = legacy.load_owned_content_source(source_id, user_id)
    is_wechat = source.get("platform") == WECHAT_VIDEO_PLATFORM
    is_account = source.get("source_kind") == ACCOUNT_SOURCE_KIND
    if is_wechat and is_account:
        return source
    raise _account_not_found()
def _list_sync_job_rows(source_row: dict[str, Any], *, limit: int = 50) -> list[dict[str, Any]]:
    """Return the newest ``content_source_sync`` jobs of the given content source.

    At least one row is requested even when *limit* is below 1.
    """
    row_cap = max(1, limit)
    sql = """
        SELECT *
        FROM jobs
        WHERE user_id = ? AND content_source_id = ? AND source_type = 'content_source_sync'
        ORDER BY created_at DESC
        LIMIT ?
    """
    return legacy.db.fetch_all(sql, (source_row["user_id"], source_row["id"], row_cap))
def _list_video_job_rows(source_row: dict[str, Any], *, limit: int = 200) -> list[dict[str, Any]]:
    """Return the newest ``video_link`` jobs spawned by the source's sync jobs.

    Args:
        source_row: Content source row; its ``user_id`` and ``id`` are used.
        limit: Maximum number of rows (values below 1 are treated as 1). The
            same cap is applied to the number of parent sync jobs considered.

    Returns:
        Child job rows, newest first; empty when the source has no sync jobs.
    """
    row_cap = max(1, limit)
    sync_rows = _list_sync_job_rows(source_row, limit=row_cap)
    if not sync_rows:
        return []
    parent_job_ids = [row["id"] for row in sync_rows]
    # One "?" per parent id; only placeholders are interpolated into the SQL.
    placeholders = ",".join("?" for _ in parent_job_ids)
    # The cap is applied in SQL so no more than row_cap rows are transferred.
    query = f"""
        SELECT *
        FROM jobs
        WHERE user_id = ? AND source_type = 'video_link' AND parent_job_id IN ({placeholders})
        ORDER BY created_at DESC
        LIMIT ?
    """
    params: tuple[Any, ...] = (source_row["user_id"], *parent_job_ids, row_cap)
    return legacy.db.fetch_all(query, params)
def _dedupe_latest_video_jobs(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
deduped: list[dict[str, Any]] = []
seen_urls: set[str] = set()
for row in rows:
source_url = str(row.get("source_url") or "").strip()
if not source_url or source_url in seen_urls:
continue
seen_urls.add(source_url)
deduped.append(row)
return deduped
def _fetch_content_source(source_id: str) -> dict[str, Any] | None:
    """Fetch a content source row by id; a falsy id returns ``None`` without a query."""
    if source_id:
        return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,))
    return None
def _load_related_reviews(source_row: dict[str, Any], video_rows: list[dict[str, Any]], *, limit: int = 50) -> list[dict[str, Any]]:
    """Return the owner's WeChat Video reviews that relate to the given video jobs.

    Up to 400 of the owner's newest reviews are scanned. A review is related
    when its ``source_job_id`` is one of the video jobs or its ``publish_url``
    equals one of their source URLs. At most ``max(1, limit)`` are returned.
    """
    recent_reviews = legacy.db.fetch_all(
        """
        SELECT *
        FROM publish_reviews
        WHERE user_id = ? AND platform = ?
        ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC
        LIMIT 400
        """,
        (source_row["user_id"], WECHAT_VIDEO_PLATFORM),
    )
    known_job_ids = {job["id"] for job in video_rows}
    known_urls = {str(job.get("source_url") or "").strip() for job in video_rows if job.get("source_url")}

    def _is_related(review: dict[str, Any]) -> bool:
        job_ref = str(review.get("source_job_id") or "").strip()
        if job_ref and job_ref in known_job_ids:
            return True
        url = str(review.get("publish_url") or "").strip()
        return bool(url) and url in known_urls

    related = [review for review in recent_reviews if _is_related(review)]
    return related[: max(1, limit)]
def _load_related_documents(video_rows: list[dict[str, Any]], *, limit: int = 30) -> list[dict[str, Any]]:
kb_ids = {str(row.get("knowledge_base_id") or "").strip() for row in video_rows if row.get("knowledge_base_id")}
video_urls = {str(row.get("source_url") or "").strip() for row in video_rows if row.get("source_url")}
documents: list[dict[str, Any]] = []
seen_document_ids: set[str] = set()
for kb_id in kb_ids:
for row in legacy.db.fetch_all(
"""
SELECT *
FROM knowledge_documents
WHERE knowledge_base_id = ?
ORDER BY created_at DESC
LIMIT 200
""",
(kb_id,),
):
if row["id"] in seen_document_ids:
continue
if str(row.get("source_url") or "").strip() not in video_urls:
continue
seen_document_ids.add(row["id"])
documents.append(row)
if len(documents) >= limit:
return documents
return documents
def _build_review_maps(review_rows: list[dict[str, Any]]) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
by_job_id: dict[str, dict[str, Any]] = {}
by_url: dict[str, dict[str, Any]] = {}
for row in review_rows:
source_job_id = str(row.get("source_job_id") or "").strip()
publish_url = str(row.get("publish_url") or "").strip()
if source_job_id and source_job_id not in by_job_id:
by_job_id[source_job_id] = row
if publish_url and publish_url not in by_url:
by_url[publish_url] = row
return by_job_id, by_url
def _build_document_map(document_rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
by_url: dict[str, dict[str, Any]] = {}
for row in document_rows:
source_url = str(row.get("source_url") or "").strip()
if source_url and source_url not in by_url:
by_url[source_url] = row
return by_url
def _build_account_payload(source_row: dict[str, Any]) -> dict[str, Any]:
    """Build the API payload for an account row, enriched with sync details.

    Starts from ``legacy.content_source_payload`` and adds the platform
    label plus the ``last_sync_*`` / ``sync_mode`` fields read from the
    payload's ``metadata``. ``last_sync_status`` is the ``status`` of the
    job named by ``last_sync_job_id`` (empty when there is no such job).
    """
    payload = legacy.content_source_payload(source_row)
    metadata = payload.get("metadata") or {}
    sync_job_id = str(metadata.get("last_sync_job_id") or "")
    sync_job = (
        legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (sync_job_id,))
        if sync_job_id
        else None
    )
    payload["platform_label"] = legacy.platform_label(WECHAT_VIDEO_PLATFORM)
    payload["last_sync_job_id"] = sync_job_id
    for field in ("last_sync_completed_at", "last_sync_error"):
        payload[field] = str(metadata.get(field) or "")
    payload["last_sync_status"] = str((sync_job or {}).get("status") or "")
    # Accounts without a stored mode are reported as "recent_uploads".
    payload["sync_mode"] = str(metadata.get("sync_mode") or "recent_uploads")
    return payload
def _build_video_item(
    job_row: dict[str, Any],
    review_by_job_id: dict[str, dict[str, Any]],
    review_by_url: dict[str, dict[str, Any]],
    document_by_url: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the list item for one video job.

    The review is looked up by job id first and by source URL second; the
    document is looked up by source URL only. The job's own content source
    is fetched per item.

    Args:
        job_row: Video job row.
        review_by_job_id: Reviews keyed by ``source_job_id``.
        review_by_url: Reviews keyed by ``publish_url``.
        document_by_url: Documents keyed by ``source_url``.
    """
    url = str(job_row.get("source_url") or "").strip()
    linked_source_id = str(job_row.get("content_source_id") or "").strip()
    linked_source = _fetch_content_source(linked_source_id)
    review = review_by_job_id.get(job_row["id"])
    if not review:
        review = review_by_url.get(url)
    document = document_by_url.get(url)
    artifacts = legacy.parse_job_artifacts(job_row)

    item: dict[str, Any] = {
        "id": job_row["id"],
        "title": job_row.get("title", ""),
        "status": job_row.get("status", ""),
        "source_url": url,
        "external_id": str(artifacts.get("external_id") or ""),
        "origin_sync_job_id": str(artifacts.get("origin_sync_job_id") or ""),
        "job": legacy.job_payload(job_row),
    }
    item["content_source"] = legacy.content_source_payload(linked_source) if linked_source else None
    item["latest_review"] = legacy.review_payload(review) if review else None
    item["document"] = legacy.document_payload(document) if document else None
    return item
def _build_workspace_payload(source_row: dict[str, Any]) -> dict[str, Any]:
    """Build the full workspace view for one account.

    Combines the account payload, up to 20 sync jobs (10 are listed), the
    de-duplicated video jobs (20 are itemised), related reviews and
    documents, and a ``stats`` summary derived from those collections.
    """
    sync_rows = _list_sync_job_rows(source_row, limit=20)
    video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200))
    review_rows = _load_related_reviews(source_row, video_rows, limit=20)
    document_rows = _load_related_documents(video_rows, limit=12)
    review_by_job_id, review_by_url = _build_review_maps(review_rows)
    document_by_url = _build_document_map(document_rows)

    # Blank statuses are grouped under "unknown".
    status_tally: Counter = Counter()
    for row in video_rows:
        status_tally[str(row.get("status") or "").strip() or "unknown"] += 1

    latest_sync = None
    if sync_rows:
        latest_sync = legacy.job_context_payload(sync_rows[0])

    account_payload = _build_account_payload(source_row)
    sync_job_payloads = [legacy.job_payload(row) for row in sync_rows[:10]]
    video_items = [
        _build_video_item(row, review_by_job_id, review_by_url, document_by_url)
        for row in video_rows[:20]
    ]
    review_payloads = [legacy.review_payload(row) for row in review_rows]
    document_payloads = [legacy.document_payload(row) for row in document_rows]

    return {
        "account": account_payload,
        "latest_sync_job": latest_sync,
        "sync_jobs": sync_job_payloads,
        "videos": {
            "total": len(video_rows),
            "status_counts": dict(status_tally),
            "items": video_items,
        },
        "reviews": review_payloads,
        "recent_documents": document_payloads,
        "stats": {
            "sync_job_count": len(sync_rows),
            "video_job_count": len(video_rows),
            "completed_video_count": status_tally.get("completed", 0),
            "failed_video_count": status_tally.get("failed", 0),
            "review_count": len(review_rows),
            "document_count": len(document_rows),
        },
    }
def _update_account_source(
    source_row: dict[str, Any],
    *,
    source_url: str,
    title: str,
    handle: str,
    metadata_updates: dict[str, Any],
) -> dict[str, Any]:
    """Overwrite an account row's identity fields and merge new metadata.

    ``metadata_updates`` is merged into the row's existing ``metadata_json``
    through ``legacy.merge_json_field``; the platform is forced to
    ``WECHAT_VIDEO_PLATFORM``. The UPDATE is scoped to the row's id and
    ``user_id``.

    Returns:
        The row as re-read from ``content_sources`` after the update.
    """
    source_id = source_row["id"]
    metadata_json = legacy.merge_json_field(source_row.get("metadata_json") or "{}", metadata_updates)
    update_sql = """
UPDATE content_sources
SET handle = ?, source_url = ?, title = ?, platform = ?, metadata_json = ?, updated_at = ?
WHERE id = ? AND user_id = ?
"""
    update_params = (
        handle,
        source_url,
        title,
        WECHAT_VIDEO_PLATFORM,
        metadata_json,
        legacy.utc_now(),
        source_id,
        source_row["user_id"],
    )
    legacy.db.execute(update_sql, update_params)
    return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,))
def _job_belongs_to_account(job_row: dict[str, Any], source_row: dict[str, Any]) -> bool:
if str(job_row.get("content_source_id") or "").strip():
content_source = _fetch_content_source(str(job_row.get("content_source_id") or "").strip())
metadata = (legacy.content_source_payload(content_source).get("metadata") or {}) if content_source else {}
if content_source and str(metadata.get("origin_content_source_id") or "") == source_row["id"]:
return True
if str(job_row.get("parent_job_id") or "").strip():
parent_row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["parent_job_id"],))
if parent_row and str(parent_row.get("content_source_id") or "") == source_row["id"]:
return True
return False
@app.get("/v2/wechat-video/accounts")
def list_wechat_video_accounts(
    project_id: str | None = Query(default=None),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    """List the caller's WeChat Video account sources, most recently updated first.

    When ``project_id`` is given it is resolved through
    ``legacy.resolve_target_project`` and the listing is narrowed to that
    project.
    """
    conditions: list[tuple[str, Any]] = [
        ("user_id = ?", account["id"]),
        ("platform = ?", WECHAT_VIDEO_PLATFORM),
        ("source_kind = ?", ACCOUNT_SOURCE_KIND),
    ]
    if project_id:
        project = legacy.resolve_target_project(account["id"], project_id, username=account["username"])
        conditions.append(("project_id = ?", project["id"]))

    # Clause text is fixed; only the bound values come from the request.
    clauses = [clause for clause, _ in conditions]
    params = tuple(value for _, value in conditions)
    rows = legacy.db.fetch_all(
        f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY updated_at DESC",
        params,
    )
    return [_build_account_payload(row) for row in rows]
@app.post("/v2/wechat-video/accounts/sync")
async def sync_wechat_video_account(
    request: WechatVideoAccountSyncRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Create or refresh a WeChat Video account source and queue a sync job.

    Steps:
      1. Load the existing account row when ``content_source_id`` is given.
      2. Resolve the profile URL, project, knowledge base, assistant and
         model profile.
      3. Create the account row, or update the existing one.
      4. Create a ``content_source_sync`` job, record it in the account
         metadata and hand it to ``legacy.trigger_orchestrated_job``.

    Returns:
        The account workspace payload plus ``sync_job`` (the queued job).

    Raises:
        HTTPException: 400 when no profile URL can be determined, or when
            the existing account belongs to a different project than the
            resolved target project.
    """
    source_row = None
    if request.content_source_id.strip():
        source_row = _require_owned_account(request.content_source_id.strip(), account["id"])
    # Snapshot of the stored values; columns may be NULL, hence the
    # `or ""` guards below instead of relying on .get() defaults.
    existing: dict[str, Any] = source_row or {}

    source_url = _normalize_wechat_source_url(request.profile_url or existing.get("source_url") or "")
    if not source_url:
        raise HTTPException(status_code=400, detail="profile_url or content_source_id is required")

    requested_project_id = request.project_id or existing.get("project_id") or ""
    project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
    if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
        raise HTTPException(status_code=400, detail="Content source does not belong to target project")

    kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
    assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
    profile = legacy.model_profile_for_account(account["id"], request.analysis_model_profile_id or None)

    # Request values win; stored values are the fallback. A stored None no
    # longer raises AttributeError on .strip().
    handle = request.handle.strip() or str(existing.get("handle") or "").strip()
    title = request.title.strip() or str(existing.get("title") or "").strip() or handle or source_url

    metadata_updates = {
        "account_type": WECHAT_VIDEO_PLATFORM,
        "sync_mode": "recent_uploads",
        "max_items": request.max_items,
        "analysis_model_profile_id": profile["id"],
        "last_sync_error": "",
    }
    if not source_row:
        source_row = legacy.create_content_source(
            account_id=account["id"],
            project_id=project["id"],
            source_kind=ACCOUNT_SOURCE_KIND,
            platform=WECHAT_VIDEO_PLATFORM,
            handle=handle,
            source_url=source_url,
            title=title,
            metadata=metadata_updates,
        )
    else:
        source_row = _update_account_source(
            source_row,
            source_url=source_url,
            title=title,
            handle=handle,
            metadata_updates=metadata_updates,
        )

    job_row = legacy.create_job_record(
        account_id=account["id"],
        project_id=project["id"],
        knowledge_base_id=kb["id"],
        source_type="content_source_sync",
        line_type="content_source_sync",
        workflow_key="content_source_sync_pipeline",
        title=f"{title} 内容源同步",
        language=request.language,
        source_url=source_url,
        assistant_id=(assistant or {}).get("id"),
        content_source_id=source_row["id"],
        artifacts={
            "platform": WECHAT_VIDEO_PLATFORM,
            "handle": handle,
            "source_account_url": source_url,
            "source_title": title,
            "max_items": request.max_items,
            "skip_existing": request.skip_existing,
            "auto_trigger_analysis": request.auto_trigger_analysis,
        },
        analysis_model_profile_id=profile["id"],
    )
    # Record which job this sync request produced before it is queued.
    legacy.update_content_source_metadata(
        source_row["id"],
        {
            "sync_mode": "recent_uploads",
            "max_items": request.max_items,
            "analysis_model_profile_id": profile["id"],
            "last_sync_job_id": job_row["id"],
            "last_sync_requested_at": legacy.utc_now(),
            "last_sync_error": "",
        },
    )
    queued_row = await legacy.trigger_orchestrated_job(job_row)

    # Re-read the row so the workspace reflects the metadata written above.
    source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],))
    workspace = _build_workspace_payload(source_row)
    workspace["sync_job"] = legacy.job_payload(queued_row)
    return workspace
@app.get("/v2/wechat-video/accounts/{account_id}")
def get_wechat_video_account(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Return the workspace payload for one account owned by the caller.

    Ownership is enforced by ``_require_owned_account``.
    """
    return _build_workspace_payload(_require_owned_account(account_id, account["id"]))
@app.get("/v2/wechat-video/accounts/{account_id}/workspace")
def get_wechat_video_account_workspace(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Return the workspace payload for one account owned by the caller.

    Produces the same payload as the plain account route, under the
    explicit ``/workspace`` path.
    """
    owned_row = _require_owned_account(account_id, account["id"])
    workspace = _build_workspace_payload(owned_row)
    return workspace
@app.get("/v2/wechat-video/accounts/{account_id}/videos")
def list_wechat_video_account_videos(
    account_id: str,
    limit: int = Query(default=50, ge=1, le=200),
    status: str = Query(default=""),
    q: str = Query(default=""),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """List an account's video jobs with optional status and text filters.

    Args:
        account_id: Account (content source) id; must belong to the caller.
        limit: Number of items returned (1-200).
        status: Case-insensitive exact match on the job status.
        q: Case-insensitive substring matched against title and source URL.

    Returns:
        ``account``, ``total`` and ``status_counts`` (both computed over all
        rows that passed the filters) and ``items`` (the first ``limit``).
    """
    source_row = _require_owned_account(account_id, account["id"])
    # Over-fetch so de-duplication and filtering still leave enough rows.
    candidates = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=max(limit * 4, 200)))
    wanted_status = status.strip().lower()
    needle = q.strip().lower()

    def _passes_filters(row: dict[str, Any]) -> bool:
        if wanted_status and str(row.get("status") or "").lower() != wanted_status:
            return False
        if not needle:
            return True
        searchable = (str(row.get("title") or "").lower(), str(row.get("source_url") or "").lower())
        return any(needle in field for field in searchable)

    video_rows = [row for row in candidates if _passes_filters(row)]
    page_rows = video_rows[:limit]

    related_cap = max(limit, 20)
    review_rows = _load_related_reviews(source_row, page_rows, limit=related_cap)
    document_rows = _load_related_documents(page_rows, limit=related_cap)
    review_by_job_id, review_by_url = _build_review_maps(review_rows)
    document_by_url = _build_document_map(document_rows)

    account_payload = _build_account_payload(source_row)
    status_counts = Counter(str(row.get("status") or "").strip() or "unknown" for row in video_rows)
    items = [
        _build_video_item(row, review_by_job_id, review_by_url, document_by_url)
        for row in page_rows
    ]
    return {
        "account": account_payload,
        "total": len(video_rows),
        "status_counts": dict(status_counts),
        "items": items,
    }
@app.get("/v2/wechat-video/accounts/{account_id}/reviews")
def list_wechat_video_account_reviews(
    account_id: str,
    limit: int = Query(default=50, ge=1, le=200),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    """List reviews related to the account's video jobs.

    Reviews are matched against the account's de-duplicated video jobs (at
    most 200 job rows are considered) and capped at ``limit``.
    """
    source_row = _require_owned_account(account_id, account["id"])
    job_rows = _list_video_job_rows(source_row, limit=200)
    video_rows = _dedupe_latest_video_jobs(job_rows)
    related = _load_related_reviews(source_row, video_rows, limit=limit)
    return [legacy.review_payload(review) for review in related]
@app.post("/v2/wechat-video/accounts/{account_id}/reviews")
def create_wechat_video_review(
    account_id: str,
    request: WechatVideoReviewCreateRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Create a publish review attached to a WeChat Video account.

    The review may reference one of the account's jobs through
    ``source_job_id``; project, publish URL and title then fall back to
    that job's values when the request leaves them empty.

    Title fallback order: request title, job title, ``"<account title> 复盘"``,
    and finally ``"微信视频号复盘"`` when the account has no title either.

    Raises:
        HTTPException: 400 when ``source_job_id`` does not belong to the
            account, or the account belongs to a different project than the
            resolved target project.
    """
    source_row = _require_owned_account(account_id, account["id"])
    source_job = None
    if request.source_job_id.strip():
        source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"])
        if not _job_belongs_to_account(source_job, source_row):
            raise HTTPException(status_code=400, detail="source_job_id does not belong to the target WeChat Video account")

    requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else source_row.get("project_id", ""))
    project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
    if source_row.get("project_id") and source_row.get("project_id") != project["id"]:
        raise HTTPException(status_code=400, detail="WeChat Video account does not belong to target project")
    assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])

    publish_url = request.publish_url.strip() or (source_job.get("source_url", "") if source_job else "")
    if publish_url:
        # NOTE(review): the normalized value is discarded, so this call can
        # only act as validation (if the helper raises on bad input) and the
        # URL is stored as typed. Confirm whether the normalized form should
        # be stored instead; the helper is not visible from this block.
        _normalize_wechat_source_url(publish_url)

    # The account title is checked on its own: formatting it straight into
    # the f-string used to yield "复盘" / "None 复盘" for an empty or NULL
    # title and made the generic default below unreachable.
    account_title = str(source_row.get("title") or "").strip()
    title = (
        request.title.strip()
        or str((source_job or {}).get("title") or "").strip()
        or (f"{account_title} 复盘" if account_title else "微信视频号复盘")
    )

    review_id = legacy.make_id("review")
    timestamp = legacy.utc_now()
    legacy.db.execute(
        """
INSERT INTO publish_reviews (
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
        (
            review_id,
            account["id"],
            project["id"],
            source_job["id"] if source_job else None,
            (assistant or {}).get("id") or None,
            title,
            WECHAT_VIDEO_PLATFORM,
            request.content_type or "video",
            publish_url,
            request.published_at.strip(),
            json.dumps(request.metrics, ensure_ascii=False),
            request.verdict.strip(),
            request.highlights.strip(),
            request.next_actions.strip(),
            request.notes.strip(),
            timestamp,
            timestamp,
        ),
    )
    row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
    return legacy.review_payload(row)

View File

@@ -0,0 +1,765 @@
from __future__ import annotations
import json
import re
from datetime import datetime, timezone
from html import unescape
from typing import Any, Iterable
from urllib.parse import unquote
import httpx
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
# Timeout value handed to the httpx client that downloads source pages.
DEFAULT_TIMEOUT = 20.0
# Cap on how much of each text variant is scanned for embedded JSON.
# NOTE: it is applied as a string slice, i.e. it counts characters, not bytes.
MAX_HTML_SEARCH_BYTES = 2_000_000
# Browser-like User-Agent header sent with outbound page requests.
DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
)
# Platform identifier written to and compared against content sources and reviews.
XHS_PLATFORM = "xiaohongshu"
# One manually captured page supplied with a sync request. The sync helper
# forwards url/title/payload as a "manual_page_payload" blob and scans the
# payload for note candidates.
class XHSManualPageCapture(BaseModel):
    url: str = ""
    title: str = ""
    # Arbitrary JSON captured from the page; defaults to an empty dict.
    payload: dict[str, Any] = Field(default_factory=dict)
# Request body for creating a Xiaohongshu content source. Only source_kind is
# required; the create route strips the string fields and always stores the
# source with the xiaohongshu platform.
class XiaohongshuContentSourceCreateRequest(BaseModel):
    # Empty means "let the server resolve the target project".
    project_id: str = ""
    source_kind: str
    handle: str = ""
    source_url: str = ""
    title: str = ""
    local_path: str = ""
    # Caller metadata; the create route adds/overrides "platform" and
    # "platform_label" on top of it.
    metadata: dict[str, Any] = Field(default_factory=dict)
# Request body for syncing a Xiaohongshu content source. Either an existing
# content_source_id or a source_url has to be provided (enforced by the sync
# route, not by this model).
class XiaohongshuContentSourceSyncRequest(BaseModel):
    project_id: str = ""
    knowledge_base_id: str = ""
    assistant_id: str = ""
    # Existing source to re-sync; empty means a new source is created.
    content_source_id: str = ""
    source_url: str = ""
    handle: str = ""
    title: str = ""
    language: str = "auto"
    # Validated to the range 1..20.
    max_items: int = Field(default=5, ge=1, le=20)
    skip_existing: bool = True
    auto_trigger_analysis: bool = True
    # Optional caller-captured JSON, used alongside whatever is fetched from
    # source_url.
    manual_source_payload: dict[str, Any] | None = None
    manual_pages: list[XHSManualPageCapture] = Field(default_factory=list)
    # NOTE(review): not read anywhere in the visible part of the sync route.
    discovery_note: str = ""
# Request body for creating a publish review on the Xiaohongshu routes.
# Every field is optional; platform defaults to XHS_PLATFORM and content_type
# to "note".
class XiaohongshuReviewCreateRequest(BaseModel):
    project_id: str = ""
    source_job_id: str = ""
    assistant_id: str = ""
    title: str = ""
    platform: str = XHS_PLATFORM
    content_type: str = "note"
    publish_url: str = ""
    published_at: str = ""
    # Free-form metrics mapping; defaults to an empty dict.
    metrics: dict[str, Any] = Field(default_factory=dict)
    verdict: str = ""
    highlights: str = ""
    next_actions: str = ""
    notes: str = ""
# Request body for updating a publish review. Every field defaults to None.
# NOTE(review): presumably None means "leave this column unchanged" - confirm
# against the update route, which is not visible from here.
class XiaohongshuReviewUpdateRequest(BaseModel):
    title: str | None = None
    platform: str | None = None
    content_type: str | None = None
    publish_url: str | None = None
    published_at: str | None = None
    metrics: dict[str, Any] | None = None
    verdict: str | None = None
    highlights: str | None = None
    next_actions: str | None = None
    notes: str | None = None
    assistant_id: str | None = None
def _safe_json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
def _first_non_empty(*values: Any) -> str:
for value in values:
if value is None:
continue
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
elif value not in ("", [], {}, ()):
return str(value)
return ""
def _dedupe_strings(values: Iterable[str]) -> list[str]:
result: list[str] = []
seen: set[str] = set()
for value in values:
item = str(value).strip()
if not item:
continue
key = item.lower()
if key in seen:
continue
seen.add(key)
result.append(item)
return result
def _compact_text(value: Any, limit: int = 500) -> str:
text = str(value or "").strip()
if len(text) <= limit:
return text
return f"{text[: limit - 1]}"
def _parse_count(value: Any) -> float:
if value is None:
return 0.0
if isinstance(value, (int, float)):
return float(value)
text = str(value).strip().lower().replace(",", "")
if not text:
return 0.0
multiplier = 1.0
if text.endswith("w") or text.endswith(""):
multiplier = 10_000.0
text = text[:-1]
elif text.endswith("亿"):
multiplier = 100_000_000.0
text = text[:-1]
text = text.replace("+", "")
match = re.search(r"-?\d+(?:\.\d+)?", text)
if not match:
return 0.0
try:
return float(match.group()) * multiplier
except ValueError:
return 0.0
def _normalize_timestamp(value: Any) -> str | None:
if value in (None, "", 0, "0"):
return None
if isinstance(value, str):
stripped = value.strip()
if not stripped:
return None
if re.match(r"^\d{4}-\d{2}-\d{2}T", stripped):
return stripped
if stripped.isdigit():
value = int(stripped)
else:
return stripped
if isinstance(value, (int, float)):
ts = float(value)
if ts > 10_000_000_000:
ts /= 1000.0
try:
return datetime.fromtimestamp(ts, tz=timezone.utc).replace(microsecond=0).isoformat()
except Exception:
return None
return None
def _extract_hashtags(*texts: str) -> list[str]:
    """Collect ``#tag`` names from the given texts, without the leading ``#``.

    Falsy texts are skipped. The result is de-duplicated
    case-insensitively, in order of first appearance.
    """
    tag_pattern = re.compile(r"#([\w\u4e00-\u9fff]+)")
    found = [tag for text in texts if text for tag in tag_pattern.findall(text)]
    return _dedupe_strings(found)
def _extract_keywords(*texts: str) -> list[str]:
    """Collect keyword candidates from the given texts.

    For each non-empty text, in this order: hashtags, runs of 2-8 CJK
    characters, and ASCII words of 3-21 characters starting with a letter.
    Platform boilerplate words are removed (compared lower-cased) and the
    remainder is de-duplicated case-insensitively.
    """
    stop_words = {
        "小红书",
        "笔记",
        "内容",
        "账号",
        "发布",
        "更多",
        "关注",
        "用户",
        "xhs",
        "xiaohongshu",
    }
    collected: list[str] = []
    for text in texts:
        if text:
            collected += _extract_hashtags(text)
            collected += re.findall(r"[\u4e00-\u9fff]{2,8}", text)
            collected += re.findall(r"[A-Za-z][A-Za-z0-9_]{2,20}", text)
    kept = [word for word in collected if word.lower() not in stop_words]
    return _dedupe_strings(kept)
def _walk_json(value: Any) -> Iterable[dict[str, Any]]:
if isinstance(value, dict):
yield value
for child in value.values():
yield from _walk_json(child)
elif isinstance(value, list):
for child in value:
yield from _walk_json(child)
def _extract_json_objects_from_text(text: str) -> list[Any]:
decoder = json.JSONDecoder()
objects: list[Any] = []
seen: set[str] = set()
if not text:
return objects
candidates = [text, unquote(text), unescape(text), unescape(unquote(text))]
for candidate in candidates:
snippet = candidate[:MAX_HTML_SEARCH_BYTES]
for match in re.finditer(r"[\{\[]", snippet):
try:
obj, _ = decoder.raw_decode(snippet[match.start() :])
except Exception:
continue
marker = _safe_json_dumps(obj)
if marker in seen:
continue
seen.add(marker)
objects.append(obj)
if len(objects) >= 50:
return objects
return objects
def _extract_json_blobs_from_html(html: str) -> list[dict[str, Any]]:
blobs: list[dict[str, Any]] = []
seen: set[str] = set()
for attrs, content in re.findall(r"<script([^>]*)>(.*?)</script>", html, re.IGNORECASE | re.DOTALL):
script_id_match = re.search(r'id=["\']([^"\']+)["\']', attrs, re.IGNORECASE)
script_id = script_id_match.group(1) if script_id_match else ""
for obj in _extract_json_objects_from_text(content.strip()):
marker = _safe_json_dumps(obj)
if marker in seen:
continue
seen.add(marker)
blobs.append({"script_id": script_id, "payload": obj})
return blobs
async def _fetch_html(url: str, cookie: str = "") -> tuple[str, str]:
    """Download a page, following redirects.

    Args:
        url: Address to fetch.
        cookie: Optional ``Cookie`` header value; sent only when non-blank.

    Returns:
        ``(final_url, body_text)`` where ``final_url`` is the response URL
        after redirects.

    Raises:
        Whatever ``response.raise_for_status()`` raises for error statuses,
        plus any transport error from the httpx client.
    """
    request_headers = {
        "User-Agent": DEFAULT_USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    cookie_value = cookie.strip()
    if cookie_value:
        request_headers["Cookie"] = cookie_value

    client = httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, follow_redirects=True)
    async with client:
        response = await client.get(url, headers=request_headers)
        response.raise_for_status()
        final_url = str(response.url)
        body = response.text
    return final_url, body
def _note_candidate_score(value: dict[str, Any]) -> int:
score = 0
if any(key in value for key in ("note_id", "noteId", "id", "post_id")):
score += 2
if any(key in value for key in ("title", "desc", "content", "text", "note")):
score += 2
if any(key in value for key in ("author", "user", "owner")):
score += 2
if "stats" in value and isinstance(value["stats"], dict):
score += 2
return score
def _extract_note_candidates(payload: Any) -> list[dict[str, Any]]:
    """Collect the dicts inside *payload* that look like note records.

    Every dict in the payload is visited. It becomes a candidate when its
    ``_note_candidate_score`` is at least 4; dicts under its ``author`` /
    ``user`` / ``owner`` keys are additionally checked against a threshold
    of 3 (scores only take even values, so this is the same cut-off in
    practice).

    Each dict object is returned once, at the position where it was first
    selected. The tree walk reaches the author-like children again on its
    own, which would otherwise list them a second time.
    """
    candidates: list[dict[str, Any]] = []
    # Identity, not equality: two distinct dicts with equal content are both
    # kept, while the very same object is only listed once.
    emitted_ids: set[int] = set()

    def _add(node: dict[str, Any]) -> None:
        if id(node) not in emitted_ids:
            emitted_ids.add(id(node))
            candidates.append(node)

    for item in _walk_json(payload):
        if _note_candidate_score(item) >= 4:
            _add(item)
        for key in ("author", "user", "owner"):
            child = item.get(key)
            if isinstance(child, dict) and _note_candidate_score(child) >= 3:
                _add(child)
    return candidates
def _normalize_note_candidate(candidate: dict[str, Any], fallback_url: str = "") -> dict[str, Any]:
    """Map a raw note-like dict onto the common note shape.

    Args:
        candidate: Raw dict picked by ``_extract_note_candidates``.
        fallback_url: Used as ``share_url`` when the candidate carries
            neither ``share_url`` nor ``url``.

    Returns:
        A dict with ``note_id``, ``title``, ``content``, ``author_name``,
        ``author_url``, ``share_url``, ``cover_url``, ``published_at``,
        ``tags``, ``stats`` (like/comment/collect/share as floats) and
        ``raw`` (the untouched candidate).
    """
    raw_stats = candidate.get("stats")
    stats_source = raw_stats if isinstance(raw_stats, dict) else {}

    # First non-empty dict among the author-like keys. "owner" is included
    # because candidate scoring treats it as an author key as well.
    author: dict[str, Any] = {}
    for author_key in ("author", "user", "owner"):
        possible_author = candidate.get(author_key)
        if isinstance(possible_author, dict) and possible_author:
            author = possible_author
            break

    cover = candidate.get("cover") or candidate.get("image") or candidate.get("images")
    if isinstance(cover, list) and cover:
        cover = cover[0]
    if isinstance(cover, dict):
        url_list = cover.get("url_list")
        # An empty url_list must not raise; fall through to cover["url"].
        first_listed = url_list[0] if isinstance(url_list, list) and url_list else ""
        cover = _first_non_empty(first_listed, cover.get("url"))

    return {
        "note_id": _first_non_empty(candidate.get("note_id"), candidate.get("noteId"), candidate.get("id"), candidate.get("post_id")),
        "title": _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("content"), candidate.get("text")),
        "content": _first_non_empty(candidate.get("content"), candidate.get("desc"), candidate.get("text"), candidate.get("note")),
        "author_name": _first_non_empty(author.get("nickname"), author.get("name"), candidate.get("nickname")),
        "author_url": _first_non_empty(author.get("profile_url"), candidate.get("profile_url")),
        "share_url": _first_non_empty(candidate.get("share_url"), candidate.get("url"), fallback_url),
        "cover_url": _first_non_empty(cover),
        "published_at": _normalize_timestamp(candidate.get("publish_time") or candidate.get("created_at") or candidate.get("create_time")),
        "tags": _extract_hashtags(
            _first_non_empty(candidate.get("title")),
            _first_non_empty(candidate.get("desc"), candidate.get("content")),
        ),
        "stats": {
            "like": _parse_count(stats_source.get("like_count") or stats_source.get("liked_count") or candidate.get("like_count")),
            "comment": _parse_count(stats_source.get("comment_count") or candidate.get("comment_count")),
            "collect": _parse_count(stats_source.get("collect_count") or candidate.get("collect_count")),
            "share": _parse_count(stats_source.get("share_count") or candidate.get("share_count")),
        },
        "raw": candidate,
    }
def _extract_notes(payloads: Iterable[Any]) -> list[dict[str, Any]]:
notes: list[dict[str, Any]] = []
seen: set[str] = set()
for payload in payloads:
for candidate in _extract_note_candidates(payload):
normalized = _normalize_note_candidate(candidate)
dedupe_key = normalized["note_id"] or normalized["share_url"] or normalized["title"]
if not dedupe_key or dedupe_key in seen:
continue
seen.add(dedupe_key)
notes.append(normalized)
notes.sort(
key=lambda item: (
item["stats"]["like"] + item["stats"]["comment"] * 3 + item["stats"]["collect"] * 2 + item["stats"]["share"] * 4
),
reverse=True,
)
return notes
def _is_xhs_source_row(row: dict[str, Any]) -> bool:
    """Tell whether a content-source row belongs to Xiaohongshu.

    True when the row's ``platform`` equals ``XHS_PLATFORM``
    (case-insensitive), or when its ``source_url`` contains one of the
    Xiaohongshu domains.
    """
    if str(row.get("platform", "") or "").strip().lower() == XHS_PLATFORM:
        return True
    url = str(row.get("source_url", "") or "").strip().lower()
    return any(domain in url for domain in ("xiaohongshu.com", "xhslink.com"))
def _job_matches_platform(row: dict[str, Any], legacy: Any) -> bool:
if row.get("content_source_id"):
source = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["content_source_id"],))
if source:
return _is_xhs_source_row(source)
source_url = str(row.get("source_url") or "")
return "xiaohongshu.com" in source_url.lower() or "xhslink.com" in source_url.lower()
def _review_matches_platform(row: dict[str, Any], legacy: Any) -> bool:
    """Tell whether a review row's ``platform`` is Xiaohongshu (case-insensitive).

    ``legacy`` is accepted but not used.
    """
    platform = row.get("platform", "") or ""
    return str(platform).strip().lower() == XHS_PLATFORM
def _normalize_platform(value: str | None) -> str:
return str(value or "").strip().lower()
def _require_xhs_platform(value: str | None) -> str:
    """Validate that *value* names the Xiaohongshu platform.

    A falsy value is treated as ``XHS_PLATFORM``.

    Returns:
        The normalized platform name.

    Raises:
        HTTPException: 400 when any other platform is given.
    """
    platform = _normalize_platform(value or XHS_PLATFORM)
    if platform == XHS_PLATFORM:
        return platform
    raise HTTPException(status_code=400, detail="Xiaohongshu routes only support the xiaohongshu platform")
def register_xiaohongshu_routes(app: Any, legacy: Any) -> None:
def now() -> str:
    """Return the current timestamp as produced by ``legacy.utc_now()``."""
    return legacy.utc_now()
def make_id(prefix: str) -> str:
    """Return a new identifier from ``legacy.make_id`` for the given prefix."""
    return legacy.make_id(prefix)
def _content_source_row_or_404(source_id: str, account_id: str) -> dict[str, Any]:
    """Load a Xiaohongshu content source owned by the account.

    Raises:
        HTTPException: 404 when the row does not exist for this account or
            is not a Xiaohongshu source. Both cases share one message.
    """
    row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ? AND user_id = ?", (source_id, account_id))
    if row and _is_xhs_source_row(row):
        return row
    raise HTTPException(status_code=404, detail="Content source not found")
def _xhs_job_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Build a job payload, attaching its content source when it is a Xiaohongshu one.

    ``content_source`` is only added when the job links to a source that
    exists and passes ``_is_xhs_source_row``.
    """
    payload = legacy.job_payload(row)
    source_id = row.get("content_source_id")
    if not source_id:
        return payload
    source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,))
    if source_row and _is_xhs_source_row(source_row):
        payload["content_source"] = legacy.content_source_payload(source_row)
    return payload
def _xhs_review_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Build a review payload whose ``platform`` is always ``XHS_PLATFORM``."""
    payload = legacy.review_payload(row)
    current_platform = payload.get("platform", "")
    if current_platform == XHS_PLATFORM:
        return payload
    payload["platform"] = XHS_PLATFORM
    return payload
async def _collect_public_source(
    source_url: str,
    manual_payload: dict[str, Any] | None,
    manual_pages: list[XHSManualPageCapture],
) -> dict[str, Any]:
    """Gather note data from manual captures and, if possible, the live page.

    Manual payloads are always used. The page at ``source_url`` is fetched
    on a best-effort basis: a failure is recorded in ``errors`` instead of
    being raised.

    Returns:
        ``source_url`` (the post-redirect URL when the fetch succeeded),
        ``title`` (manual title, else one of the first three note titles,
        else the URL), ``notes``, ``raw_pages`` (all collected blobs) and
        ``errors``.
    """
    source_url = source_url.strip()
    errors: list[str] = []

    blobs: list[dict[str, Any]] = []
    if manual_payload:
        blobs.append({"script_id": "manual_source_payload", "payload": manual_payload})
    blobs.extend(
        {
            "script_id": "manual_page_payload",
            "url": page.url,
            "title": page.title,
            "payload": page.payload,
        }
        for page in manual_pages
    )

    if source_url:
        # NOTE(review): the URL comes from the request and is fetched
        # server-side without a host check - consider restricting it to the
        # Xiaohongshu domains.
        try:
            final_url, html = await _fetch_html(source_url)
            source_url = final_url
            blobs.extend(_extract_json_blobs_from_html(html))
        except Exception as exc:
            errors.append(f"source_fetch_failed: {exc}")

    notes = _extract_notes([blob["payload"] for blob in blobs])
    manual_title = manual_payload.get("title", "") if manual_payload else ""
    leading_note_titles = [note.get("title", "") for note in notes[:3]]
    source_title = _first_non_empty(manual_title, *leading_note_titles, source_url)

    return {
        "source_url": source_url,
        "title": source_title,
        "notes": notes,
        "raw_pages": blobs,
        "errors": errors,
    }
@app.get("/v2/xiaohongshu/content-sources")
def list_content_sources(
    project_id: str | None = Query(default=None),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    """List the caller's Xiaohongshu content sources, newest first.

    ``project_id`` handling:
      * omitted - no project filter;
      * blank - only sources without a project;
      * otherwise - only sources of that project.
    """
    clauses = ["user_id = ?", "platform = ?"]
    params: list[Any] = [account["id"], XHS_PLATFORM]
    if project_id is not None:
        wanted_project = project_id.strip()
        if not wanted_project:
            clauses.append("(project_id IS NULL OR project_id = '')")
        else:
            clauses.append("project_id = ?")
            params.append(wanted_project)

    rows = legacy.db.fetch_all(
        f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC",
        tuple(params),
    )
    return [legacy.content_source_payload(row) for row in rows]
@app.post("/v2/xiaohongshu/content-sources")
def create_content_source_api(
    request: XiaohongshuContentSourceCreateRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Create a Xiaohongshu content source in the resolved target project.

    String fields are stripped. The platform is always ``XHS_PLATFORM``, and
    the stored metadata is the caller's metadata with ``platform_label`` and
    ``platform`` set on top of it.
    """
    project = legacy.resolve_target_project(account["id"], request.project_id or None, username=account["username"])

    # Copy first so the request object is left untouched; the platform keys
    # override anything the caller sent under the same names.
    source_metadata = dict(request.metadata)
    source_metadata["platform_label"] = "小红书"
    source_metadata["platform"] = XHS_PLATFORM

    created_row = legacy.create_content_source(
        account_id=account["id"],
        project_id=project["id"],
        source_kind=request.source_kind.strip(),
        platform=XHS_PLATFORM,
        handle=request.handle.strip(),
        source_url=request.source_url.strip(),
        title=request.title.strip(),
        local_path=request.local_path.strip(),
        metadata=source_metadata,
    )
    return legacy.content_source_payload(created_row)
@app.get("/v2/xiaohongshu/content-sources/{source_id}")
def get_content_source(source_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    """Return one Xiaohongshu content source owned by the caller (404 otherwise)."""
    return legacy.content_source_payload(_content_source_row_or_404(source_id, account["id"]))
@app.post("/v2/xiaohongshu/content-sources/sync")
async def sync_content_source(
    request: XiaohongshuContentSourceSyncRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Collect public notes for a Xiaohongshu source and start a sync job.

    Steps, in order:
      1. Load the referenced content source (when ``content_source_id`` is given).
      2. Resolve the target project, knowledge base and assistant.
      3. Validate the source (URL present, same project, Xiaohongshu-scoped).
      4. Create a content source on the fly when none was referenced.
      5. Collect public data via ``_collect_public_source``.
      6. Record a ``content_source_sync`` job, stamp the source metadata and
         hand the job to ``legacy.trigger_orchestrated_job``.

    Returns:
        The job payload produced by ``legacy.job_payload``.

    Raises:
        HTTPException(400): neither ``source_url`` nor ``content_source_id``
            was supplied, the source belongs to another project, or the
            source is not scoped to Xiaohongshu.
    """
    content_source_id = request.content_source_id.strip()
    source_row = (
        _content_source_row_or_404(content_source_id, account["id"])
        if content_source_id
        else None
    )
    # Stored values used as fallbacks; empty when no source was referenced.
    stored = source_row or {}

    requested_project_id = request.project_id or stored.get("project_id") or ""
    project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
    kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
    assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])

    source_url = (request.source_url or stored.get("source_url") or "").strip()
    if not source_url and not source_row:
        raise HTTPException(status_code=400, detail="source_url or content_source_id is required")
    if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
        raise HTTPException(status_code=400, detail="Content source does not belong to target project")
    if source_row and not _is_xhs_source_row(source_row):
        raise HTTPException(status_code=400, detail="Content source is not scoped to Xiaohongshu")

    # ``dict.get(key, "")`` still returns None when the column is present but
    # NULL, so normalise with ``or ""`` before calling ``.strip()`` — the same
    # guard the source_url line above already uses.
    handle = (request.handle or stored.get("handle") or "").strip()
    source_title = (
        request.title.strip()
        or (stored.get("title") or "").strip()
        or handle
        or source_url
    )

    if not source_row:
        # No existing source was referenced: register one for this URL. The
        # kind is always the default here because there is no stored row to
        # inherit it from.
        source_row = legacy.create_content_source(
            account_id=account["id"],
            project_id=project["id"],
            source_kind="creator_account",
            platform=XHS_PLATFORM,
            handle=handle,
            source_url=source_url,
            title=source_title,
            metadata={
                "platform": XHS_PLATFORM,
                "platform_label": "小红书",
                "sync_mode": "recent_notes",
                "max_items": request.max_items,
            },
        )

    public_data = await _collect_public_source(source_url, request.manual_source_payload, request.manual_pages)
    # note_count reflects everything collected; top_notes is capped at max_items.
    note_count = len(public_data["notes"])
    top_notes = [
        {
            "note_id": item["note_id"],
            "title": _compact_text(item["title"], 120),
            "content": _compact_text(item["content"], 180),
            "author_name": item["author_name"],
            "published_at": item["published_at"],
            "stats": item["stats"],
            "tags": item["tags"][:6],
        }
        for item in public_data["notes"][: request.max_items]
    ]

    job_row = legacy.create_job_record(
        account_id=account["id"],
        project_id=project["id"],
        knowledge_base_id=kb["id"],
        source_type="content_source_sync",
        line_type="content_source_sync",
        workflow_key="content_source_sync_pipeline",
        title=f"{source_title} 内容源同步",
        language=request.language,
        source_url=source_url,
        assistant_id=(assistant or {}).get("id"),
        content_source_id=source_row["id"],
        artifacts={
            "platform": XHS_PLATFORM,
            "handle": handle,
            "source_account_url": source_url,
            "source_title": source_title,
            "skip_existing": request.skip_existing,
            "auto_trigger_analysis": request.auto_trigger_analysis,
            "max_items": request.max_items,
            "note_count": note_count,
            "top_notes": top_notes,
            "raw_pages": public_data["raw_pages"],
            "errors": public_data["errors"],
            "discovery_note": request.discovery_note.strip(),
        },
        analysis_model_profile_id="",
    )

    # Record on the source which job last synced it and when it was requested.
    legacy.update_content_source_metadata(
        source_row["id"],
        {
            "platform": XHS_PLATFORM,
            "platform_label": "小红书",
            "sync_mode": "recent_notes",
            "max_items": request.max_items,
            "note_count": note_count,
            "last_sync_job_id": job_row["id"],
            "last_sync_requested_at": now(),
        },
    )
    return legacy.job_payload(await legacy.trigger_orchestrated_job(job_row))
@app.get("/v2/xiaohongshu/jobs")
def list_jobs(
    parent_job_id: str | None = Query(default=None),
    line_type: str | None = Query(default=None),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    """List the caller's jobs that match the Xiaohongshu platform, newest first.

    ``parent_job_id`` semantics:
      * omitted          -> no parent filter;
      * blank/whitespace -> only jobs with a NULL or empty parent;
      * anything else    -> only children of that (stripped) job id.

    ``line_type`` is applied only when it is a non-empty string.
    """
    filters = ["user_id = ?"]
    bind_values: list[Any] = [account["id"]]

    if parent_job_id is not None:
        parent = parent_job_id.strip()
        if not parent:
            filters.append("(parent_job_id IS NULL OR parent_job_id = '')")
        else:
            filters.append("parent_job_id = ?")
            bind_values.append(parent)

    if line_type:
        filters.append("line_type = ?")
        bind_values.append(line_type.strip())

    query = f"SELECT * FROM jobs WHERE {' AND '.join(filters)} ORDER BY created_at DESC"

    # The platform check is done in Python after the query, not in SQL.
    payloads: list[dict[str, Any]] = []
    for record in legacy.db.fetch_all(query, tuple(bind_values)):
        if _job_matches_platform(record, legacy):
            payloads.append(_xhs_job_payload(record))
    return payloads
@app.get("/v2/xiaohongshu/jobs/{job_id}")
def get_job(
    job_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Return one job owned by the caller, provided it matches the Xiaohongshu platform.

    Raises:
        HTTPException(404): the job does not exist for this account or does
            not pass the platform check.
    """
    owned_job = legacy.db.fetch_one(
        "SELECT * FROM jobs WHERE id = ? AND user_id = ?",
        (job_id, account["id"]),
    )
    if owned_job and _job_matches_platform(owned_job, legacy):
        return _xhs_job_payload(owned_job)
    # Missing rows and non-matching rows get the same 404 response.
    raise HTTPException(status_code=404, detail="Job not found")
@app.get("/v2/xiaohongshu/jobs/{job_id}/events")
def get_job_events(
    job_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    """Return the events of one caller-owned Xiaohongshu job, oldest first.

    Raises:
        HTTPException(404): the job does not exist for this account or does
            not pass the platform check.
    """
    owned_job = legacy.db.fetch_one(
        "SELECT * FROM jobs WHERE id = ? AND user_id = ?",
        (job_id, account["id"]),
    )
    if owned_job and _job_matches_platform(owned_job, legacy):
        # Ownership was verified on the job row, so events are keyed by job id only.
        event_rows = legacy.db.fetch_all(
            "SELECT * FROM job_events WHERE job_id = ? ORDER BY created_at ASC",
            (job_id,),
        )
        return [legacy.job_event_payload(event) for event in event_rows]
    raise HTTPException(status_code=404, detail="Job not found")
@app.get("/v2/xiaohongshu/reviews")
def list_reviews(
    project_id: str | None = Query(default=None),
    limit: int = Query(default=50, ge=1, le=200),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    """List the caller's Xiaohongshu publish reviews, capped at ``limit`` rows.

    Rows are ordered by ``published_at`` (falling back to ``created_at`` when
    it is empty), then by ``created_at``, both descending.

    ``project_id`` semantics:
      * omitted          -> no project filter;
      * blank/whitespace -> only reviews with a NULL or empty project;
      * anything else    -> only reviews of that (stripped) project id.
    """
    filters = ["user_id = ?", "platform = ?"]
    bind_values: list[Any] = [account["id"], XHS_PLATFORM]

    if project_id is not None:
        wanted_project = project_id.strip()
        if not wanted_project:
            filters.append("(project_id IS NULL OR project_id = '')")
        else:
            filters.append("project_id = ?")
            bind_values.append(wanted_project)

    query = (
        f"SELECT * FROM publish_reviews WHERE {' AND '.join(filters)} "
        "ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC LIMIT ?"
    )
    # LIMIT is the final placeholder, so its value is bound last.
    bind_values.append(limit)

    review_rows = legacy.db.fetch_all(query, tuple(bind_values))
    return [_xhs_review_payload(record) for record in review_rows]
@app.post("/v2/xiaohongshu/reviews")
def create_review(
    request: XiaohongshuReviewCreateRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Insert a publish review for the caller and return its payload.

    When ``source_job_id`` is given, the job is loaded for this account and
    must pass the Xiaohongshu platform check; its project and title then act
    as fallbacks for the review. The title finally falls back to
    "<project name> 复盘". The requested platform is validated through
    ``_require_xhs_platform``.

    Raises:
        HTTPException(404): the referenced source job does not pass the
            platform check.
    """
    linked_job = None
    linked_job_id = request.source_job_id.strip()
    if linked_job_id:
        linked_job = legacy.load_owned_job(linked_job_id, account["id"])
        if not _job_matches_platform(linked_job, legacy):
            raise HTTPException(status_code=404, detail="Job not found")
    # Fallback values inherited from the linked job; empty when there is none.
    job_defaults = linked_job or {}

    wanted_project_id = request.project_id.strip() or job_defaults.get("project_id", "")
    project = legacy.resolve_target_project(account["id"], wanted_project_id or None, username=account["username"])
    assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])

    review_id = make_id("review")
    title = (
        request.title.strip()
        or job_defaults.get("title", "")
        or f"{project['name']} 复盘"
    )
    created_at = now()
    platform = _require_xhs_platform(request.platform)

    # Values are listed in the same order as the INSERT column list.
    insert_values = (
        review_id,
        account["id"],
        project["id"],
        linked_job["id"] if linked_job else None,
        (assistant or {}).get("id") or None,
        title,
        platform,
        request.content_type.strip() or "note",
        request.publish_url.strip(),
        request.published_at.strip(),
        _safe_json_dumps(request.metrics),
        request.verdict.strip(),
        request.highlights.strip(),
        request.next_actions.strip(),
        request.notes.strip(),
        created_at,  # created_at
        created_at,  # updated_at starts equal to created_at
    )
    legacy.db.execute(
        """
        INSERT INTO publish_reviews (
            id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
            publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        insert_values,
    )
    stored = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
    return _xhs_review_payload(stored)
@app.patch("/v2/xiaohongshu/reviews/{review_id}")
def update_review(
    review_id: str,
    request: XiaohongshuReviewUpdateRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Partially update a caller-owned Xiaohongshu review and return its payload.

    Every request field left as ``None`` keeps the value currently stored;
    any other value (including an empty string) replaces it as-is. A supplied
    ``assistant_id`` is re-resolved against the review's project, and a
    supplied ``platform`` is validated through ``_require_xhs_platform``.

    Raises:
        HTTPException(404): the review does not pass the platform check.
    """
    current = legacy.load_owned_review(review_id, account["id"])
    if not _review_matches_platform(current, legacy):
        raise HTTPException(status_code=404, detail="Review not found")

    def _patched(new_value: Any, column: str, default: Any) -> Any:
        # None means "field not supplied": keep what is stored.
        if new_value is not None:
            return new_value
        return current.get(column, default)

    if request.assistant_id is None:
        assistant_id = current.get("assistant_id") or None
    else:
        resolved = legacy.resolve_target_assistant(
            account["id"], request.assistant_id or None, current.get("project_id", "")
        )
        assistant_id = (resolved or {}).get("id") or None

    if request.platform is None:
        platform = current.get("platform", XHS_PLATFORM)
    else:
        platform = _require_xhs_platform(request.platform)

    if request.metrics is not None:
        metrics = request.metrics
    else:
        # Stored metrics are JSON text; decode so they are re-serialised uniformly.
        metrics = legacy.parse_json_object(current.get("metrics_json") or "{}")

    # Values are listed in the order of the placeholders in the statement below.
    update_values = (
        _patched(request.title, "title", ""),
        platform,
        _patched(request.content_type, "content_type", "note"),
        _patched(request.publish_url, "publish_url", ""),
        _patched(request.published_at, "published_at", ""),
        _safe_json_dumps(metrics),
        _patched(request.verdict, "verdict", ""),
        _patched(request.highlights, "highlights", ""),
        _patched(request.next_actions, "next_actions", ""),
        _patched(request.notes, "notes", ""),
        assistant_id,
        now(),
        review_id,
        account["id"],
    )
    legacy.db.execute(
        """
        UPDATE publish_reviews
        SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?,
            metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?,
            assistant_id = ?, updated_at = ?
        WHERE id = ? AND user_id = ?
        """,
        update_values,
    )
    refreshed = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
    return _xhs_review_payload(refreshed)

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env bash
# Launch the collector app with uvicorn from this script's directory, filling
# in a default for every setting that is unset or empty in the caller's
# environment.
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Listen address; these two are only passed to uvicorn, not exported.
: "${PORT:=18083}"
: "${HOST:=127.0.0.1}"

# Mirror the current live collector runtime so we can verify the source overlay
# against the same database and external integrations without touching 8081.

# --- storage (DATA_DIR must be set first: later defaults are derived from it)
: "${DATA_DIR:=/Users/kris/code/StoryForge-gitea/data/collector}"
: "${DATABASE_PATH:=$DATA_DIR/storyforge.db}"
: "${DEFAULT_EXTERNAL_BASE_URL:=https://test.hyzq.net/storyforge}"
export DATA_DIR DATABASE_PATH DEFAULT_EXTERNAL_BASE_URL

# --- local OpenAI-compatible endpoint
: "${LOCAL_OPENAI_BASE_URL:=http://host.docker.internal:8317/v1}"
: "${LOCAL_OPENAI_MODEL:=GLM-5}"
: "${LOCAL_OPENAI_API_KEY:=}"
export LOCAL_OPENAI_BASE_URL LOCAL_OPENAI_MODEL LOCAL_OPENAI_API_KEY

# --- media tooling and speech recognition
: "${YTDLP_BIN:=yt-dlp}"
: "${FFMPEG_BIN:=ffmpeg}"
: "${WHISPER_BIN:=}"
: "${WHISPER_MODEL:=$DATA_DIR/models/ggml-base.en.bin}"
: "${ASR_HTTP_BASE_URL:=http://host.docker.internal:8088}"
: "${ASR_HTTP_TRANSCRIBE_PATH:=/transcribe}"
: "${ASR_HTTP_FIELD_NAME:=wav}"
: "${ASR_HTTP_TIMEOUT_SEC:=120}"
export YTDLP_BIN FFMPEG_BIN WHISPER_BIN WHISPER_MODEL
export ASR_HTTP_BASE_URL ASR_HTTP_TRANSCRIBE_PATH ASR_HTTP_FIELD_NAME ASR_HTTP_TIMEOUT_SEC

# --- n8n orchestration webhooks
: "${N8N_BASE_URL:=http://n8n:5678}"
: "${N8N_ANALYSIS_WEBHOOK_PATH:=/webhook/storyforge-analysis}"
: "${N8N_REAL_CUT_WEBHOOK_PATH:=/webhook/storyforge-real-cut}"
: "${N8N_AI_VIDEO_WEBHOOK_PATH:=/webhook/storyforge-ai-video}"
: "${N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH:=/webhook/storyforge-content-source-sync}"
: "${ORCHESTRATOR_SHARED_SECRET:=storyforge-local-secret}"
export N8N_BASE_URL N8N_ANALYSIS_WEBHOOK_PATH N8N_REAL_CUT_WEBHOOK_PATH
export N8N_AI_VIDEO_WEBHOOK_PATH N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH
export ORCHESTRATOR_SHARED_SECRET

# --- cutvideo service
: "${CUTVIDEO_BASE_URL:=http://192.168.31.18:7860}"
: "${CUTVIDEO_API_KEY:=}"
: "${CUTVIDEO_BASE_CONFIG:=example.job.yaml}"
: "${CUTVIDEO_POLL_INTERVAL_SEC:=10}"
: "${CUTVIDEO_MAX_WAIT_SEC:=1800}"
: "${CUTVIDEO_UPLOAD_TIMEOUT_SEC:=1800}"
export CUTVIDEO_BASE_URL CUTVIDEO_API_KEY CUTVIDEO_BASE_CONFIG
export CUTVIDEO_POLL_INTERVAL_SEC CUTVIDEO_MAX_WAIT_SEC CUTVIDEO_UPLOAD_TIMEOUT_SEC

# --- huobao service
: "${HUOBAO_BASE_URL:=http://host.docker.internal:5678}"
: "${HUOBAO_POLL_INTERVAL_SEC:=10}"
: "${HUOBAO_MAX_WAIT_SEC:=900}"
export HUOBAO_BASE_URL HUOBAO_POLL_INTERVAL_SEC HUOBAO_MAX_WAIT_SEC

# Run from the script directory so the relative virtualenv path resolves;
# exec replaces this shell with the server process.
cd "$ROOT_DIR"
exec ./.venv311/bin/python -m uvicorn app.main:app --host "$HOST" --port "$PORT"