feat: add domestic multi-platform workbench routes

This commit is contained in:
kris
2026-03-23 09:13:37 +08:00
parent dedf69193d
commit 1719047ef5
2 changed files with 905 additions and 0 deletions

View File

@@ -0,0 +1,900 @@
from __future__ import annotations
import json
from typing import Any
from fastapi import Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field
class PlatformAnalysisRequest(BaseModel):
model_profile_ids: list[str] = Field(default_factory=list)
linked_account_ids: list[str] = Field(default_factory=list)
include_linked_accounts: bool = True
include_recent_similar_candidates: bool = True
max_videos: int = Field(default=6, ge=1, le=20)
extra_focus: str = ""
temperature: float = 0.35
auto_analyze_top_videos: bool = False
top_video_analysis_count: int = Field(default=4, ge=1, le=10)
class PlatformTopVideoAnalysisRequest(BaseModel):
model_profile_id: str = ""
top_video_count: int = Field(default=5, ge=1, le=12)
min_score: float = 0
temperature: float = 0.25
class PlatformSimilaritySearchRequest(BaseModel):
source_account_id: str = ""
candidate_urls: list[str] = Field(default_factory=list)
seed_linked_accounts: bool = True
search_public_pages: bool = True
model_profile_id: str = ""
max_candidates: int = Field(default=8, ge=1, le=20)
extra_requirements: str = ""
class PlatformBenchmarkLinksRequest(BaseModel):
target_account_ids: list[str] = Field(default_factory=list)
target_profile_urls: list[str] = Field(default_factory=list)
relation_type: str = "benchmark"
note: str = ""
search_id: str = ""
class PlatformTrackingAccountRequest(BaseModel):
tracked_account_id: str
assistant_id: str = ""
note: str = ""
class PlatformTrackingCursorRequest(BaseModel):
last_seen_at: str
def register_domestic_platform_routes(app: Any, legacy: Any, *, platform: str, label: str) -> None:
table_prefix = platform
def now() -> str:
return legacy.utc_now()
def make_id(prefix: str) -> str:
return legacy.make_id(prefix)
def _safe_json_dumps(value: Any) -> str:
return json.dumps(value or {}, ensure_ascii=False)
def _parse_json(raw: str, fallback: Any) -> Any:
cleaned = str(raw or "").strip()
if not cleaned:
return fallback
try:
value = json.loads(cleaned)
return value
except json.JSONDecodeError:
return fallback
def ensure_schema() -> None:
schema = f"""
CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_reports (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
account_source_id TEXT NOT NULL,
focus_text TEXT NOT NULL DEFAULT '',
prompt_text TEXT NOT NULL DEFAULT '',
context_json TEXT NOT NULL DEFAULT '{{}}',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(account_source_id) REFERENCES content_sources(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_analysis_reports_account_created
ON {table_prefix}_analysis_reports(account_source_id, created_at DESC);
CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_suggestions (
id TEXT PRIMARY KEY,
report_id TEXT NOT NULL,
model_profile_id TEXT NOT NULL DEFAULT '',
model_label TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'ok',
suggestion_text TEXT NOT NULL DEFAULT '',
parsed_json TEXT NOT NULL DEFAULT '{{}}',
created_at TEXT NOT NULL,
FOREIGN KEY(report_id) REFERENCES {table_prefix}_analysis_reports(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_searches (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
source_account_id TEXT NOT NULL,
prompt_text TEXT NOT NULL DEFAULT '',
context_json TEXT NOT NULL DEFAULT '{{}}',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_candidates (
id TEXT PRIMARY KEY,
search_id TEXT NOT NULL,
candidate_account_id TEXT,
candidate_profile_url TEXT NOT NULL DEFAULT '',
heuristic_score REAL NOT NULL DEFAULT 0,
agent_score REAL NOT NULL DEFAULT 0,
rationale_text TEXT NOT NULL DEFAULT '',
dimensions_json TEXT NOT NULL DEFAULT '{{}}',
raw_output_json TEXT NOT NULL DEFAULT '{{}}',
rank_index INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
FOREIGN KEY(search_id) REFERENCES {table_prefix}_similarity_searches(id) ON DELETE CASCADE,
FOREIGN KEY(candidate_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_similarity_candidates_search_rank
ON {table_prefix}_similarity_candidates(search_id, rank_index ASC);
CREATE TABLE IF NOT EXISTS {table_prefix}_account_relations (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
source_account_id TEXT NOT NULL,
target_account_id TEXT,
target_profile_url TEXT NOT NULL DEFAULT '',
relation_type TEXT NOT NULL DEFAULT 'benchmark',
note TEXT NOT NULL DEFAULT '',
search_id TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
FOREIGN KEY(target_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_account_relations_source
ON {table_prefix}_account_relations(source_account_id, created_at DESC);
CREATE TABLE IF NOT EXISTS {table_prefix}_tracked_accounts (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
tracked_account_id TEXT NOT NULL,
assistant_id TEXT,
note TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, tracked_account_id),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(tracked_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_tracked_accounts_user_updated
ON {table_prefix}_tracked_accounts(user_id, updated_at DESC);
CREATE TABLE IF NOT EXISTS {table_prefix}_tracking_cursors (
user_id TEXT PRIMARY KEY,
last_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
"""
with legacy.db.session() as conn:
conn.executescript(schema)
ensure_schema()
@app.on_event("startup")
def _startup_platform_schema() -> None:
ensure_schema()
def _content_source_rows(user_id: str, platform_value: str, kind: str = "") -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
"SELECT * FROM content_sources WHERE user_id = ? AND platform = ? ORDER BY updated_at DESC, created_at DESC",
(user_id, platform_value),
)
if kind:
rows = [row for row in rows if row.get("source_kind") == kind]
return rows
def _content_source_payload(row: dict[str, Any]) -> dict[str, Any]:
return legacy.content_source_payload(row)
def _source_metadata(row: dict[str, Any]) -> dict[str, Any]:
return _content_source_payload(row).get("metadata", {})
def _require_account(account_id: str, user_id: str) -> dict[str, Any]:
row = legacy.db.fetch_one(
"SELECT * FROM content_sources WHERE id = ? AND user_id = ? AND source_kind = 'creator_account' AND platform = ?",
(account_id, user_id, platform),
)
if not row:
raise HTTPException(status_code=404, detail=f"{label} account not found")
return row
def _linked_video_sources(account_row: dict[str, Any]) -> list[dict[str, Any]]:
project_id = account_row.get("project_id", "")
rows = legacy.db.fetch_all(
"SELECT * FROM content_sources WHERE user_id = ? AND project_id = ? AND source_kind = 'video_link' AND platform = ? ORDER BY updated_at DESC, created_at DESC",
(account_row["user_id"], project_id, platform),
)
account_id = account_row["id"]
source_url = str(account_row.get("source_url") or "").strip()
linked: list[dict[str, Any]] = []
for row in rows:
metadata = _source_metadata(row)
if metadata.get("origin_content_source_id") == account_id or metadata.get("source_account_url") == source_url:
linked.append(row)
return linked
def _jobs_for_source(source_id: str) -> list[dict[str, Any]]:
return legacy.db.fetch_all(
"SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC",
(source_id,),
)
def _latest_job_for_source(source_id: str) -> dict[str, Any] | None:
return legacy.db.fetch_one(
"SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC LIMIT 1",
(source_id,),
)
def _extract_performance_score(job_row: dict[str, Any] | None) -> float:
if not job_row:
return 0.0
result_map = _parse_json(job_row.get("result_json") or "{}", {})
artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {})
candidates = [
result_map.get("performance_score"),
(result_map.get("analysis") or {}).get("performance_score"),
(result_map.get("scores") or {}).get("performance_score"),
artifacts_map.get("performance_score"),
(artifacts_map.get("scores") or {}).get("performance_score"),
]
for value in candidates:
try:
return float(value)
except (TypeError, ValueError):
continue
return 0.0
def _extract_metrics(job_row: dict[str, Any] | None) -> dict[str, Any]:
if not job_row:
return {}
result_map = _parse_json(job_row.get("result_json") or "{}", {})
artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {})
return (
result_map.get("metrics")
or artifacts_map.get("metrics")
or result_map.get("stats")
or artifacts_map.get("stats")
or {}
)
def _video_payload(source_row: dict[str, Any]) -> dict[str, Any]:
payload = _content_source_payload(source_row)
metadata = payload.get("metadata", {})
latest_job = _latest_job_for_source(source_row["id"])
metrics = _extract_metrics(latest_job)
tags = metadata.get("tags") or []
if not isinstance(tags, list):
tags = []
return {
"id": source_row["id"],
"aweme_id": str(metadata.get("external_id") or source_row["id"]),
"title": payload.get("title") or "未命名作品",
"description": metadata.get("summary") or metadata.get("description") or (latest_job or {}).get("style_summary", ""),
"share_url": payload.get("source_url", ""),
"cover_url": metadata.get("cover_url") or "",
"duration_sec": float(metadata.get("duration_sec") or 0),
"published_at": metadata.get("published_at") or source_row.get("created_at"),
"tags": tags,
"content_type": metadata.get("content_type") or "video",
"stats": {
"play": metrics.get("play_count") or metrics.get("play") or 0,
"like": metrics.get("like_count") or metrics.get("like") or 0,
"comment": metrics.get("comment_count") or metrics.get("comment") or 0,
"share": metrics.get("share_count") or metrics.get("share") or 0,
},
"score": {
"performance_score": _extract_performance_score(latest_job),
},
"source": payload,
"latest_job_id": (latest_job or {}).get("id", ""),
}
def _account_payload(account_row: dict[str, Any]) -> dict[str, Any]:
payload = _content_source_payload(account_row)
metadata = payload.get("metadata", {})
videos = [_video_payload(item) for item in _linked_video_sources(account_row)]
play_values = [float(video["stats"].get("play") or 0) for video in videos if float(video["stats"].get("play") or 0) > 0]
like_values = [float(video["stats"].get("like") or 0) for video in videos if float(video["stats"].get("like") or 0) > 0]
tags = metadata.get("tags") or []
if not isinstance(tags, list):
tags = []
return {
"id": account_row["id"],
"platform": platform,
"profile_url": payload.get("source_url", ""),
"canonical_profile_url": payload.get("source_url", ""),
"handle": payload.get("handle", ""),
"nickname": payload.get("title") or payload.get("handle") or "未命名账号",
"signature": metadata.get("bio") or metadata.get("description") or "",
"avatar_url": metadata.get("avatar_url") or "",
"tags": tags,
"keywords": metadata.get("keywords") or [],
"sync_status": "ready" if payload.get("metadata", {}).get("last_sync_error", "") == "" else "partial",
"video_summary": {
"count": len(videos),
"avg_play": sum(play_values) / len(play_values) if play_values else 0,
"avg_like": sum(like_values) / len(like_values) if like_values else 0,
"videos": videos[:8],
},
"project_id": payload.get("project_id", ""),
"created_at": payload.get("created_at", ""),
"updated_at": payload.get("updated_at", ""),
}
def _relation_payload(row: dict[str, Any]) -> dict[str, Any]:
target = None
if row.get("target_account_id"):
target = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["target_account_id"],))
return {
"id": row["id"],
"source_account_id": row["source_account_id"],
"target_account_id": row.get("target_account_id", "") or "",
"target_profile_url": row.get("target_profile_url", ""),
"target_nickname": (_account_payload(target)["nickname"] if target else ""),
"relation_type": row.get("relation_type", "benchmark"),
"note": row.get("note", ""),
"search_id": row.get("search_id", ""),
"created_at": row["created_at"],
}
def _report_payload(row: dict[str, Any]) -> dict[str, Any]:
suggestions = [
{
"id": suggestion["id"],
"status": suggestion.get("status", "ok"),
"model_profile_id": suggestion.get("model_profile_id", ""),
"model_label": suggestion.get("model_label", ""),
"suggestion_text": suggestion.get("suggestion_text", ""),
"parsed_json": _parse_json(suggestion.get("parsed_json") or "{}", {}),
"created_at": suggestion.get("created_at", ""),
}
for suggestion in legacy.db.fetch_all(
f"SELECT * FROM {table_prefix}_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
(row["id"],),
)
]
return {
"id": row["id"],
"focus_text": row.get("focus_text", ""),
"suggestions": suggestions,
"created_at": row["created_at"],
}
def _workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
reports = legacy.db.fetch_all(
f"SELECT * FROM {table_prefix}_analysis_reports WHERE account_source_id = ? ORDER BY created_at DESC LIMIT 6",
(account_row["id"],),
)
relations = legacy.db.fetch_all(
f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC",
(account_row["id"],),
)
return {
"account": _account_payload(account_row),
"recent_reports": [_report_payload(row) for row in reports],
"linked_accounts": [_relation_payload(row) for row in relations],
}
async def _call_reasoning_model(user_id: str, prompt: str, *, system_prompt: str, model_profile_id: str = "", temperature: float = 0.3) -> tuple[str, dict[str, Any]]:
profile = legacy.model_profile_for_account(user_id, model_profile_id or None)
output = await legacy.call_model(profile, system_prompt=system_prompt, user_prompt=prompt, temperature=temperature)
parsed = legacy.parse_json_object(output)
return output, parsed if isinstance(parsed, dict) else {}
async def _create_sync_job_for_account(account_row: dict[str, Any], assistant_id: str = "") -> dict[str, Any]:
project_id = account_row.get("project_id") or ""
if not project_id:
raise HTTPException(status_code=400, detail="Account source is not attached to a project")
kb = legacy.resolve_target_kb(account_row["user_id"], None, project_id)
source_payload = _content_source_payload(account_row)
profile = legacy.model_profile_for_account(account_row["user_id"], None)
job_row = legacy.create_job_record(
account_id=account_row["user_id"],
project_id=project_id,
knowledge_base_id=kb["id"],
content_source_id=account_row["id"],
assistant_id=assistant_id or None,
source_type="creator_account",
line_type="analysis",
workflow_key="content_source_sync_pipeline",
title=f"{source_payload.get('title') or source_payload.get('handle') or label} 内容同步",
language="auto",
source_url=source_payload.get("source_url", ""),
artifacts={
"source_account_url": source_payload.get("source_url", ""),
"platform": platform,
"handle": source_payload.get("handle", ""),
"max_items": int(source_payload.get("metadata", {}).get("max_items") or 5),
"skip_existing": True,
"auto_trigger_analysis": True,
},
analysis_model_profile_id=profile["id"],
)
queued = await legacy.trigger_orchestrated_job(job_row)
return legacy.job_payload(queued)
def _tracking_cursor(user_id: str) -> dict[str, Any] | None:
return legacy.db.fetch_one(
f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?",
(user_id,),
)
def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]:
existing = _tracking_cursor(user_id)
updated_at = now()
if existing:
legacy.db.execute(
f"UPDATE {table_prefix}_tracking_cursors SET last_seen_at = ?, updated_at = ? WHERE user_id = ?",
(last_seen_at, updated_at, user_id),
)
else:
legacy.db.execute(
f"INSERT INTO {table_prefix}_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?)",
(user_id, last_seen_at, updated_at),
)
return legacy.db.fetch_one(
f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?",
(user_id,),
)
def _tracking_digest_item(tracked_row: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
latest_job = _latest_job_for_source(video["id"])
summary = (latest_job or {}).get("style_summary") or video.get("description") or "已发现更新内容"
assistant = None
if tracked_row.get("assistant_id"):
assistant_row = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (tracked_row["assistant_id"],))
if assistant_row:
assistant = legacy.assistant_payload(assistant_row)
borrowing_points = [point for point in [summary[:36], video.get("title", "")[:36]] if point]
return {
"tracking_id": tracked_row["id"],
"tracked_account_id": tracked_row["tracked_account_id"],
"tracked_account_name": _account_payload(_require_account(tracked_row["tracked_account_id"], tracked_row["user_id"]))["nickname"],
"assistant_id": tracked_row.get("assistant_id", "") or "",
"assistant_name": (assistant or {}).get("name", ""),
"note": tracked_row.get("note", ""),
"video": video,
"summary_text": summary,
"borrowing_points": borrowing_points[:3],
"created_at": video.get("published_at") or now(),
}
def _tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]:
tracked_rows = legacy.db.fetch_all(
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
(user_id,),
)
cursor = _tracking_cursor(user_id)
threshold = (since_value or (cursor or {}).get("last_seen_at") or "").strip()
items: list[dict[str, Any]] = []
for tracked in tracked_rows:
account_row = _require_account(tracked["tracked_account_id"], user_id)
for video in _account_payload(account_row)["video_summary"]["videos"]:
published_at = str(video.get("published_at") or "")
if threshold and published_at and published_at <= threshold:
continue
items.append(_tracking_digest_item(tracked, video))
items.sort(key=lambda item: item.get("created_at", ""), reverse=True)
return {
"items": items[:limit],
"tracked_accounts": [
{
"id": row["id"],
"tracked_account_id": row["tracked_account_id"],
"assistant_id": row.get("assistant_id", "") or "",
"note": row.get("note", ""),
"updated_at": row["updated_at"],
}
for row in tracked_rows
],
"cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
}
@app.get(f"/v2/{platform}/accounts")
def list_platform_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
return [_account_payload(row) for row in _content_source_rows(account["id"], platform, "creator_account")]
@app.get(f"/v2/{platform}/accounts/{{account_id}}/workspace")
def get_platform_account_workspace(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
account_row = _require_account(account_id, account["id"])
return _workspace_payload(account_row)
@app.get(f"/v2/{platform}/accounts/{{account_id}}/videos")
def list_platform_account_videos(
account_id: str,
limit: int = Query(default=80, ge=1, le=200),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
account_row = _require_account(account_id, account["id"])
items = [_video_payload(row) for row in _linked_video_sources(account_row)]
items.sort(key=lambda item: (item["score"]["performance_score"], item.get("published_at") or ""), reverse=True)
top_ids = [item["id"] for item in items if float(item["score"]["performance_score"] or 0) >= 60][:12]
latest_ids = [item["id"] for item in sorted(items, key=lambda item: item.get("published_at") or "", reverse=True)[:12]]
return {
"items": items[:limit],
"count": len(items),
"meta": {"platform": platform, "account_id": account_id},
"top_scored_video_ids": top_ids,
"latest_video_ids": latest_ids,
"high_score_threshold": 60,
}
@app.post(f"/v2/{platform}/accounts/{{account_id}}/analysis")
async def analyze_platform_account(
account_id: str,
request: PlatformAnalysisRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
account_row = _require_account(account_id, account["id"])
workspace = _workspace_payload(account_row)
context = {
"account": workspace["account"],
"top_videos": workspace["account"]["video_summary"]["videos"][: max(1, min(request.max_videos, 8))],
"linked_accounts": workspace["linked_accounts"][:5],
"extra_focus": request.extra_focus,
}
prompt = (
f"请从新媒体商业化运营视角,分析这个{label}账号,输出执行摘要、可借鉴点、风险提醒和下一步动作。"
f"\n\n输入:\n{json.dumps(context, ensure_ascii=False, indent=2)}"
)
output, parsed = await _call_reasoning_model(
account["id"],
prompt,
system_prompt="你是新媒体账号分析顾问。尽量输出 JSON字段包括 executive_summary、borrow_points、risks、next_actions。",
temperature=request.temperature,
)
report_id = make_id(f"{platform}_report")
legacy.db.execute(
f"INSERT INTO {table_prefix}_analysis_reports (id, user_id, account_source_id, focus_text, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
(
report_id,
account["id"],
account_row["id"],
request.extra_focus or "",
prompt,
_safe_json_dumps(context),
now(),
),
)
suggestion_id = make_id(f"{platform}_suggestion")
profile = legacy.model_profile_for_account(account["id"], None)
legacy.db.execute(
f"INSERT INTO {table_prefix}_analysis_suggestions (id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(
suggestion_id,
report_id,
profile["id"],
f"{profile.get('name', '')} · {profile.get('model_name', '')}".strip(" ·"),
"ok",
output[:4000],
_safe_json_dumps(parsed),
now(),
),
)
report_row = legacy.db.fetch_one(
f"SELECT * FROM {table_prefix}_analysis_reports WHERE id = ?",
(report_id,),
)
report_payload = _report_payload(report_row)
return {
"report_id": report_id,
"account_id": account_row["id"],
"suggestions": report_payload["suggestions"],
"context": context,
}
@app.post(f"/v2/{platform}/accounts/{{account_id}}/videos/analyze-top")
async def analyze_platform_top_videos(
account_id: str,
request: PlatformTopVideoAnalysisRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
account_row = _require_account(account_id, account["id"])
videos = [_video_payload(row) for row in _linked_video_sources(account_row)]
ranked = [
video for video in sorted(videos, key=lambda item: item["score"]["performance_score"], reverse=True)
if float(video["score"]["performance_score"] or 0) >= float(request.min_score or 0)
][: request.top_video_count]
results: list[dict[str, Any]] = []
for video in ranked:
prompt = (
f"请拆解这条{label}作品为什么值得关注,输出 summary、borrow_points、risks。"
f"\n\n输入:\n{json.dumps(video, ensure_ascii=False, indent=2)}"
)
output, parsed = await _call_reasoning_model(
account["id"],
prompt,
system_prompt="你是短视频内容拆解助手。尽量输出 JSON字段包括 summary、borrow_points、risks。",
model_profile_id=request.model_profile_id,
temperature=request.temperature,
)
summary_text = str(parsed.get("summary") or parsed.get("headline_summary") or output)[:240]
results.append(
{
"id": make_id(f"{platform}_va"),
"video_id": video["id"],
"video_title": video["title"],
"status": "ok",
"summary_text": summary_text,
"parsed_json": parsed,
"performance_score": video["score"]["performance_score"],
"created_at": now(),
}
)
return {
"account_id": account_row["id"],
"analyzed_count": len(results),
"items": results,
}
@app.post(f"/v2/{platform}/similar-searches")
async def create_platform_similarity_search(
request: PlatformSimilaritySearchRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
account_row = _require_account(request.source_account_id, account["id"])
source_payload = _account_payload(account_row)
candidates = [
row for row in _content_source_rows(account["id"], platform, "creator_account")
if row["id"] != account_row["id"]
][: max(5, request.max_candidates)]
ranked_candidates: list[dict[str, Any]] = []
source_tags = set(source_payload.get("tags") or [])
for index, row in enumerate(candidates, start=1):
payload = _account_payload(row)
overlap = len(source_tags.intersection(set(payload.get("tags") or [])))
heuristic = overlap * 10 + max(0, 50 - index)
rationale = f"与源账号同平台,标签重合 {overlap},适合作为{label}对标候选。"
ranked_candidates.append(
{
"candidate_account_id": row["id"],
"candidate_profile_url": payload.get("profile_url", ""),
"candidate_nickname": payload.get("nickname", ""),
"heuristic_score": float(heuristic),
"agent_score": float(heuristic),
"rationale_text": rationale,
"dimensions_json": {"tag_overlap": overlap},
}
)
ranked_candidates.sort(key=lambda item: item["agent_score"], reverse=True)
ranked_candidates = ranked_candidates[: request.max_candidates]
search_id = make_id(f"{platform}_search")
legacy.db.execute(
f"INSERT INTO {table_prefix}_similarity_searches (id, user_id, source_account_id, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(
search_id,
account["id"],
account_row["id"],
request.extra_requirements or "",
_safe_json_dumps({"source_account": source_payload}),
now(),
),
)
for idx, item in enumerate(ranked_candidates):
legacy.db.execute(
f"""INSERT INTO {table_prefix}_similarity_candidates
(id, search_id, candidate_account_id, candidate_profile_url, heuristic_score, agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
make_id(f"{platform}_candidate"),
search_id,
item.get("candidate_account_id") or None,
item.get("candidate_profile_url", ""),
item.get("heuristic_score", 0),
item.get("agent_score", 0),
item.get("rationale_text", ""),
_safe_json_dumps(item.get("dimensions_json") or {}),
_safe_json_dumps(item),
idx,
now(),
),
)
return {"id": search_id, "search_id": search_id}
@app.get(f"/v2/{platform}/similar-searches/{{search_id}}")
def get_platform_similarity_search(search_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
search_row = legacy.db.fetch_one(
f"SELECT * FROM {table_prefix}_similarity_searches WHERE id = ? AND user_id = ?",
(search_id, account["id"]),
)
if not search_row:
raise HTTPException(status_code=404, detail="Similarity search not found")
candidate_rows = legacy.db.fetch_all(
f"SELECT * FROM {table_prefix}_similarity_candidates WHERE search_id = ? ORDER BY rank_index ASC",
(search_id,),
)
candidates = []
for row in candidate_rows:
payload = _parse_json(row.get("raw_output_json") or "{}", {})
payload.setdefault("candidate_account_id", row.get("candidate_account_id", ""))
payload.setdefault("candidate_profile_url", row.get("candidate_profile_url", ""))
payload.setdefault("rationale_text", row.get("rationale_text", ""))
payload.setdefault("agent_score", row.get("agent_score", 0))
payload.setdefault("heuristic_score", row.get("heuristic_score", 0))
candidates.append(payload)
return {
"id": search_row["id"],
"search_id": search_row["id"],
"source_account_id": search_row["source_account_id"],
"candidates": candidates,
"created_at": search_row["created_at"],
}
@app.get(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
def list_platform_benchmark_links(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
_require_account(account_id, account["id"])
rows = legacy.db.fetch_all(
f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC",
(account_id,),
)
return [_relation_payload(row) for row in rows]
@app.post(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
def create_platform_benchmark_links(
account_id: str,
request: PlatformBenchmarkLinksRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
source_account = _require_account(account_id, account["id"])
created: list[dict[str, Any]] = []
for target_account_id in request.target_account_ids:
target = _require_account(target_account_id, account["id"])
relation_id = make_id(f"{platform}_link")
legacy.db.execute(
f"INSERT INTO {table_prefix}_account_relations (id, user_id, source_account_id, target_account_id, target_profile_url, relation_type, note, search_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
relation_id,
account["id"],
source_account["id"],
target["id"],
target.get("source_url", ""),
request.relation_type or "benchmark",
request.note or "",
request.search_id or "",
now(),
),
)
created.append(_relation_payload(legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_account_relations WHERE id = ?", (relation_id,))))
for target_profile_url in request.target_profile_urls:
cleaned = str(target_profile_url or "").strip()
if not cleaned:
continue
relation_id = make_id(f"{platform}_link")
legacy.db.execute(
f"INSERT INTO {table_prefix}_account_relations (id, user_id, source_account_id, target_account_id, target_profile_url, relation_type, note, search_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
relation_id,
account["id"],
source_account["id"],
None,
cleaned,
request.relation_type or "benchmark",
request.note or "",
request.search_id or "",
now(),
),
)
created.append(_relation_payload(legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_account_relations WHERE id = ?", (relation_id,))))
return {"links": created}
@app.get(f"/v2/{platform}/tracking/accounts")
def list_platform_tracking_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
rows = legacy.db.fetch_all(
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
(account["id"],),
)
cursor = _tracking_cursor(account["id"])
return {
"items": [
{
"id": row["id"],
"tracked_account_id": row["tracked_account_id"],
"assistant_id": row.get("assistant_id", "") or "",
"note": row.get("note", ""),
"updated_at": row["updated_at"],
}
for row in rows
],
"cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
}
@app.post(f"/v2/{platform}/tracking/accounts")
def create_platform_tracking_account(
request: PlatformTrackingAccountRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
tracked = _require_account(request.tracked_account_id, account["id"])
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, tracked.get("project_id", ""))
existing = legacy.db.fetch_one(
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
(account["id"], tracked["id"]),
)
if existing:
legacy.db.execute(
f"UPDATE {table_prefix}_tracked_accounts SET assistant_id = ?, note = ?, updated_at = ? WHERE id = ?",
(((assistant or {}).get("id") or None), request.note or "", now(), existing["id"]),
)
row = legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_tracked_accounts WHERE id = ?", (existing["id"],))
else:
tracking_id = make_id(f"{platform}_track")
legacy.db.execute(
f"INSERT INTO {table_prefix}_tracked_accounts (id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
(
tracking_id,
account["id"],
tracked["id"],
(assistant or {}).get("id") or None,
request.note or "",
now(),
now(),
),
)
row = legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_tracked_accounts WHERE id = ?", (tracking_id,))
return {
"id": row["id"],
"tracked_account_id": row["tracked_account_id"],
"assistant_id": row.get("assistant_id", "") or "",
"note": row.get("note", ""),
"updated_at": row["updated_at"],
}
@app.post(f"/v2/{platform}/tracking/accounts/{{tracked_account_id}}/refresh")
async def refresh_platform_tracked_account(tracked_account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
tracked_row = legacy.db.fetch_one(
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
(account["id"], tracked_account_id),
)
if not tracked_row:
raise HTTPException(status_code=404, detail="Tracked account not found")
account_row = _require_account(tracked_account_id, account["id"])
queued = await _create_sync_job_for_account(account_row, assistant_id=tracked_row.get("assistant_id", "") or "")
legacy.db.execute(
f"UPDATE {table_prefix}_tracked_accounts SET updated_at = ? WHERE id = ?",
(now(), tracked_row["id"]),
)
return {"tracking_id": tracked_row["id"], "tracked_account_id": tracked_account_id, "sync_job_id": queued["id"], "status": queued["status"]}
@app.post(f"/v2/{platform}/tracking/refresh")
async def refresh_platform_tracking(account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
tracked_rows = legacy.db.fetch_all(
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
(account["id"],),
)
refreshed = 0
failed = 0
items: list[dict[str, Any]] = []
for row in tracked_rows:
try:
account_row = _require_account(row["tracked_account_id"], account["id"])
queued = await _create_sync_job_for_account(account_row, assistant_id=row.get("assistant_id", "") or "")
refreshed += 1
items.append({"tracking_id": row["id"], "tracked_account_id": row["tracked_account_id"], "sync_job_id": queued["id"], "status": queued["status"]})
except Exception as exc:
failed += 1
items.append({"tracking_id": row["id"], "tracked_account_id": row["tracked_account_id"], "error": str(exc)})
return {"refreshed": refreshed, "failed": failed, "items": items}
@app.post(f"/v2/{platform}/tracking/cursor")
def update_platform_tracking_cursor(request: PlatformTrackingCursorRequest, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
cursor = _set_tracking_cursor(account["id"], request.last_seen_at)
return {"last_seen_at": cursor["last_seen_at"], "updated_at": cursor["updated_at"]}
@app.get(f"/v2/{platform}/tracking/digest")
def get_platform_tracking_digest(
since: str = Query(default=""),
limit: int = Query(default=24, ge=1, le=100),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
return _tracking_digest(account["id"], since_value=(since or "").strip(), limit=limit)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from .domestic_platform_features import register_domestic_platform_routes
from .douyin_features import register_douyin_routes
try:
@@ -14,3 +15,7 @@ except Exception:
app = core.app
register_douyin_routes(app, core)
register_domestic_platform_routes(app, core, platform="xiaohongshu", label="小红书")
register_domestic_platform_routes(app, core, platform="bilibili", label="哔哩哔哩")
register_domestic_platform_routes(app, core, platform="kuaishou", label="快手")
register_domestic_platform_routes(app, core, platform="wechat_video", label="微信视频号")