"""Per-platform creator-account routes: analysis, similarity, benchmarks, tracking.

``register_domestic_platform_routes`` attaches a family of ``/v2/{platform}/...``
endpoints to an application object and creates the platform-prefixed tables
those endpoints read and write.
"""

from __future__ import annotations

import json
from typing import Any

from fastapi import Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field


# NOTE: the request models and route handlers below are documented with
# comments rather than docstrings so the generated OpenAPI descriptions are
# left exactly as they were.


# Body of POST /v2/{platform}/accounts/{account_id}/analysis.
# NOTE(review): only include_linked_accounts, max_videos, extra_focus and
# temperature are consulted by the handler in this file.
class PlatformAnalysisRequest(BaseModel):
    model_profile_ids: list[str] = Field(default_factory=list)
    linked_account_ids: list[str] = Field(default_factory=list)
    include_linked_accounts: bool = True
    include_recent_similar_candidates: bool = True
    max_videos: int = Field(default=6, ge=1, le=20)
    extra_focus: str = ""
    temperature: float = 0.35
    auto_analyze_top_videos: bool = False
    top_video_analysis_count: int = Field(default=4, ge=1, le=10)


# Body of POST /v2/{platform}/accounts/{account_id}/videos/analyze-top.
class PlatformTopVideoAnalysisRequest(BaseModel):
    model_profile_id: str = ""
    top_video_count: int = Field(default=5, ge=1, le=12)
    min_score: float = 0
    temperature: float = 0.25


# Body of POST /v2/{platform}/similar-searches.
# NOTE(review): only source_account_id, max_candidates and extra_requirements
# are consulted by the handler in this file.
class PlatformSimilaritySearchRequest(BaseModel):
    source_account_id: str = ""
    candidate_urls: list[str] = Field(default_factory=list)
    seed_linked_accounts: bool = True
    search_public_pages: bool = True
    model_profile_id: str = ""
    max_candidates: int = Field(default=8, ge=1, le=20)
    extra_requirements: str = ""


# Body of POST /v2/{platform}/accounts/{account_id}/benchmark-links.
class PlatformBenchmarkLinksRequest(BaseModel):
    target_account_ids: list[str] = Field(default_factory=list)
    target_profile_urls: list[str] = Field(default_factory=list)
    relation_type: str = "benchmark"
    note: str = ""
    search_id: str = ""


# Body of POST /v2/{platform}/tracking/accounts.
class PlatformTrackingAccountRequest(BaseModel):
    tracked_account_id: str
    assistant_id: str = ""
    note: str = ""


# Body of POST /v2/{platform}/tracking/cursor.
class PlatformTrackingCursorRequest(BaseModel):
    last_seen_at: str


def register_domestic_platform_routes(app: Any, legacy: Any, *, platform: str, label: str) -> None:
    """Create the platform tables and register the ``/v2/{platform}/...`` routes.

    Args:
        app: Application object exposing ``get``/``post``/``on_event``
            decorators (used as a FastAPI app).
        legacy: Object providing the shared services used here: ``db``
            (``session``/``fetch_one``/``fetch_all``/``execute``), ``utc_now``,
            ``make_id``, ``require_approved``, payload builders and the
            model/job helpers.
        platform: Platform key. It is used in URL paths, as the ``platform``
            column filter, and is interpolated verbatim into table and index
            names, so it must be a trusted SQL-safe identifier.
        label: Human-readable platform name used in prompts and error details.

    Side effects:
        Runs the schema script immediately and again on the app's startup
        event; registers all route handlers on ``app``.
    """
    # Interpolated into SQL identifiers below; see the ``platform`` note above.
    table_prefix = platform

    def now() -> str:
        """Return the current timestamp as produced by ``legacy.utc_now``."""
        return legacy.utc_now()

    def make_id(prefix: str) -> str:
        """Return a new identifier from ``legacy.make_id`` for ``prefix``."""
        return legacy.make_id(prefix)

    def _safe_json_dumps(value: Any) -> str:
        """Serialize ``value`` to JSON; any falsy value is stored as ``{}``."""
        return json.dumps(value or {}, ensure_ascii=False)

    def _parse_json(raw: str, fallback: Any) -> Any:
        """Parse ``raw`` as JSON, returning ``fallback`` when blank or invalid."""
        cleaned = str(raw or "").strip()
        if not cleaned:
            return fallback
        try:
            return json.loads(cleaned)
        except json.JSONDecodeError:
            return fallback

    def _as_dict(value: Any) -> dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def _parse_json_object(raw: Any) -> dict[str, Any]:
        """Parse ``raw`` as a JSON object; anything else becomes ``{}``.

        Valid JSON that is not an object (a list, a string, a number) would
        otherwise break the ``.get`` calls made on the result.
        """
        return _as_dict(_parse_json(raw, {}))

    def _to_float(value: Any, default: float = 0.0) -> float:
        """Convert ``value`` to float, returning ``default`` when not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def ensure_schema() -> None:
        """Create the platform-prefixed tables and indexes if they are missing."""
        schema = f"""
        CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_reports (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            account_source_id TEXT NOT NULL,
            focus_text TEXT NOT NULL DEFAULT '',
            prompt_text TEXT NOT NULL DEFAULT '',
            context_json TEXT NOT NULL DEFAULT '{{}}',
            created_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(account_source_id) REFERENCES content_sources(id) ON DELETE CASCADE
        );
        CREATE INDEX IF NOT EXISTS idx_{table_prefix}_analysis_reports_account_created
            ON {table_prefix}_analysis_reports(account_source_id, created_at DESC);

        CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_suggestions (
            id TEXT PRIMARY KEY,
            report_id TEXT NOT NULL,
            model_profile_id TEXT NOT NULL DEFAULT '',
            model_label TEXT NOT NULL DEFAULT '',
            status TEXT NOT NULL DEFAULT 'ok',
            suggestion_text TEXT NOT NULL DEFAULT '',
            parsed_json TEXT NOT NULL DEFAULT '{{}}',
            created_at TEXT NOT NULL,
            FOREIGN KEY(report_id) REFERENCES {table_prefix}_analysis_reports(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_searches (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            source_account_id TEXT NOT NULL,
            prompt_text TEXT NOT NULL DEFAULT '',
            context_json TEXT NOT NULL DEFAULT '{{}}',
            created_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_candidates (
            id TEXT PRIMARY KEY,
            search_id TEXT NOT NULL,
            candidate_account_id TEXT,
            candidate_profile_url TEXT NOT NULL DEFAULT '',
            heuristic_score REAL NOT NULL DEFAULT 0,
            agent_score REAL NOT NULL DEFAULT 0,
            rationale_text TEXT NOT NULL DEFAULT '',
            dimensions_json TEXT NOT NULL DEFAULT '{{}}',
            raw_output_json TEXT NOT NULL DEFAULT '{{}}',
            rank_index INTEGER NOT NULL DEFAULT 0,
            created_at TEXT NOT NULL,
            FOREIGN KEY(search_id) REFERENCES {table_prefix}_similarity_searches(id) ON DELETE CASCADE,
            FOREIGN KEY(candidate_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
        );
        CREATE INDEX IF NOT EXISTS idx_{table_prefix}_similarity_candidates_search_rank
            ON {table_prefix}_similarity_candidates(search_id, rank_index ASC);

        CREATE TABLE IF NOT EXISTS {table_prefix}_account_relations (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            source_account_id TEXT NOT NULL,
            target_account_id TEXT,
            target_profile_url TEXT NOT NULL DEFAULT '',
            relation_type TEXT NOT NULL DEFAULT 'benchmark',
            note TEXT NOT NULL DEFAULT '',
            search_id TEXT NOT NULL DEFAULT '',
            created_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
            FOREIGN KEY(target_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
        );
        CREATE INDEX IF NOT EXISTS idx_{table_prefix}_account_relations_source
            ON {table_prefix}_account_relations(source_account_id, created_at DESC);

        CREATE TABLE IF NOT EXISTS {table_prefix}_tracked_accounts (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            tracked_account_id TEXT NOT NULL,
            assistant_id TEXT,
            note TEXT NOT NULL DEFAULT '',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            UNIQUE(user_id, tracked_account_id),
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(tracked_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
            FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
        );
        CREATE INDEX IF NOT EXISTS idx_{table_prefix}_tracked_accounts_user_updated
            ON {table_prefix}_tracked_accounts(user_id, updated_at DESC);

        CREATE TABLE IF NOT EXISTS {table_prefix}_tracking_cursors (
            user_id TEXT PRIMARY KEY,
            last_seen_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
        );
        """
        with legacy.db.session() as conn:
            conn.executescript(schema)

    # Run once at registration time so the tables exist before any request,
    # and again on startup; every statement is IF NOT EXISTS, so this is safe.
    ensure_schema()

    @app.on_event("startup")
    def _startup_platform_schema() -> None:
        ensure_schema()

    def _content_source_rows(user_id: str, platform_value: str, kind: str = "") -> list[dict[str, Any]]:
        """Return a user's content sources for a platform, newest first.

        When ``kind`` is non-empty only rows with that ``source_kind`` are
        returned; the filter is applied in SQL rather than after loading
        every row.
        """
        sql = "SELECT * FROM content_sources WHERE user_id = ? AND platform = ?"
        params: list[Any] = [user_id, platform_value]
        if kind:
            sql += " AND source_kind = ?"
            params.append(kind)
        sql += " ORDER BY updated_at DESC, created_at DESC"
        return legacy.db.fetch_all(sql, tuple(params))

    def _content_source_payload(row: dict[str, Any]) -> dict[str, Any]:
        """Delegate to ``legacy.content_source_payload``."""
        return legacy.content_source_payload(row)

    def _source_metadata(row: dict[str, Any]) -> dict[str, Any]:
        """Return the ``metadata`` mapping of a content-source payload."""
        return _as_dict(_content_source_payload(row).get("metadata"))

    def _account_nickname(payload: dict[str, Any]) -> str:
        """Return the display name for an account payload (title, then handle)."""
        return payload.get("title") or payload.get("handle") or "未命名账号"

    def _require_account(account_id: str, user_id: str) -> dict[str, Any]:
        """Return the user's creator-account row on this platform.

        Raises:
            HTTPException: 404 when no matching row exists.
        """
        row = legacy.db.fetch_one(
            "SELECT * FROM content_sources WHERE id = ? AND user_id = ? "
            "AND source_kind = 'creator_account' AND platform = ?",
            (account_id, user_id, platform),
        )
        if not row:
            raise HTTPException(status_code=404, detail=f"{label} account not found")
        return row

    def _linked_video_sources(account_row: dict[str, Any]) -> list[dict[str, Any]]:
        """Return the video sources in the account's project that belong to it.

        A video belongs to the account when its metadata names the account id
        as ``origin_content_source_id`` or carries the account's URL as
        ``source_account_url``.
        """
        project_id = account_row.get("project_id", "")
        rows = legacy.db.fetch_all(
            "SELECT * FROM content_sources WHERE user_id = ? AND project_id = ? "
            "AND source_kind = 'video_link' AND platform = ? "
            "ORDER BY updated_at DESC, created_at DESC",
            (account_row["user_id"], project_id, platform),
        )
        account_id = account_row["id"]
        source_url = str(account_row.get("source_url") or "").strip()
        linked: list[dict[str, Any]] = []
        for row in rows:
            metadata = _source_metadata(row)
            if metadata.get("origin_content_source_id") == account_id:
                linked.append(row)
            # Only match by URL when the account actually has one; otherwise
            # an empty account URL would claim every video whose
            # source_account_url is also empty.
            elif source_url and metadata.get("source_account_url") == source_url:
                linked.append(row)
        return linked

    def _jobs_for_source(source_id: str) -> list[dict[str, Any]]:
        """Return all jobs for a content source, newest first."""
        return legacy.db.fetch_all(
            "SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC",
            (source_id,),
        )

    def _latest_job_for_source(source_id: str) -> dict[str, Any] | None:
        """Return the most recently created job for a content source, if any."""
        return legacy.db.fetch_one(
            "SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC LIMIT 1",
            (source_id,),
        )

    def _extract_performance_score(job_row: dict[str, Any] | None) -> float:
        """Return the first numeric ``performance_score`` found in a job row.

        The score is looked up, in order, at the top level of ``result_json``,
        under its ``analysis`` and ``scores`` keys, then at the top level of
        ``artifacts_json`` and under its ``scores`` key. Returns ``0.0`` when
        there is no job or no numeric candidate.
        """
        if not job_row:
            return 0.0
        result_map = _parse_json_object(job_row.get("result_json"))
        artifacts_map = _parse_json_object(job_row.get("artifacts_json"))
        candidates = (
            result_map.get("performance_score"),
            _as_dict(result_map.get("analysis")).get("performance_score"),
            _as_dict(result_map.get("scores")).get("performance_score"),
            artifacts_map.get("performance_score"),
            _as_dict(artifacts_map.get("scores")).get("performance_score"),
        )
        for value in candidates:
            try:
                return float(value)
            except (TypeError, ValueError):
                continue
        return 0.0

    def _extract_metrics(job_row: dict[str, Any] | None) -> dict[str, Any]:
        """Return the first non-empty metrics mapping stored on a job row.

        Lookup order: ``result.metrics``, ``artifacts.metrics``,
        ``result.stats``, ``artifacts.stats``. Non-dict values are skipped so
        callers can always use ``.get`` on the result.
        """
        if not job_row:
            return {}
        result_map = _parse_json_object(job_row.get("result_json"))
        artifacts_map = _parse_json_object(job_row.get("artifacts_json"))
        for key in ("metrics", "stats"):
            for container in (result_map, artifacts_map):
                candidate = container.get(key)
                if candidate and isinstance(candidate, dict):
                    return candidate
        return {}

    def _video_payload(source_row: dict[str, Any]) -> dict[str, Any]:
        """Build the API representation of one video content source."""
        payload = _content_source_payload(source_row)
        metadata = _as_dict(payload.get("metadata"))
        latest_job = _latest_job_for_source(source_row["id"])
        metrics = _extract_metrics(latest_job)
        tags = metadata.get("tags") or []
        if not isinstance(tags, list):
            tags = []
        return {
            "id": source_row["id"],
            "aweme_id": str(metadata.get("external_id") or source_row["id"]),
            "title": payload.get("title") or "未命名作品",
            "description": metadata.get("summary")
            or metadata.get("description")
            or (latest_job or {}).get("style_summary", ""),
            "share_url": payload.get("source_url", ""),
            "cover_url": metadata.get("cover_url") or "",
            # Scraped metadata may hold non-numeric text; fall back to 0.
            "duration_sec": _to_float(metadata.get("duration_sec")),
            "published_at": metadata.get("published_at") or source_row.get("created_at"),
            "tags": tags,
            "content_type": metadata.get("content_type") or "video",
            "stats": {
                "play": metrics.get("play_count") or metrics.get("play") or 0,
                "like": metrics.get("like_count") or metrics.get("like") or 0,
                "comment": metrics.get("comment_count") or metrics.get("comment") or 0,
                "share": metrics.get("share_count") or metrics.get("share") or 0,
            },
            "score": {
                "performance_score": _extract_performance_score(latest_job),
            },
            "source": payload,
            "latest_job_id": (latest_job or {}).get("id", ""),
        }

    def _account_payload(account_row: dict[str, Any]) -> dict[str, Any]:
        """Build the API representation of a creator account.

        Includes a summary of its linked videos: the count, the average play
        and like counts over videos with a positive value, and the first
        eight video payloads.
        """
        payload = _content_source_payload(account_row)
        metadata = _as_dict(payload.get("metadata"))
        videos = [_video_payload(item) for item in _linked_video_sources(account_row)]
        # Convert each stat once; non-numeric values count as 0 and are
        # therefore left out of the averages.
        play_values = [
            value for value in (_to_float(video["stats"].get("play")) for video in videos) if value > 0
        ]
        like_values = [
            value for value in (_to_float(video["stats"].get("like")) for video in videos) if value > 0
        ]
        tags = metadata.get("tags") or []
        if not isinstance(tags, list):
            tags = []
        return {
            "id": account_row["id"],
            "platform": platform,
            "profile_url": payload.get("source_url", ""),
            "canonical_profile_url": payload.get("source_url", ""),
            "handle": payload.get("handle", ""),
            "nickname": _account_nickname(payload),
            "signature": metadata.get("bio") or metadata.get("description") or "",
            "avatar_url": metadata.get("avatar_url") or "",
            "tags": tags,
            "keywords": metadata.get("keywords") or [],
            # Any falsy value (missing, "", None) means no sync error.
            "sync_status": "partial" if metadata.get("last_sync_error") else "ready",
            "video_summary": {
                "count": len(videos),
                "avg_play": sum(play_values) / len(play_values) if play_values else 0,
                "avg_like": sum(like_values) / len(like_values) if like_values else 0,
                "videos": videos[:8],
            },
            "project_id": payload.get("project_id", ""),
            "created_at": payload.get("created_at", ""),
            "updated_at": payload.get("updated_at", ""),
        }

    def _relation_payload(row: dict[str, Any]) -> dict[str, Any]:
        """Build the API representation of one account-relation row."""
        target_nickname = ""
        target_id = row.get("target_account_id")
        if target_id:
            target = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (target_id,))
            if target:
                # Only the display name is needed, so avoid building the full
                # account payload (which loads every linked video and job).
                target_nickname = _account_nickname(_content_source_payload(target))
        return {
            "id": row["id"],
            "source_account_id": row["source_account_id"],
            "target_account_id": row.get("target_account_id", "") or "",
            "target_profile_url": row.get("target_profile_url", ""),
            "target_nickname": target_nickname,
            "relation_type": row.get("relation_type", "benchmark"),
            "note": row.get("note", ""),
            "search_id": row.get("search_id", ""),
            "created_at": row["created_at"],
        }

    def _model_profile_payload(row: dict[str, Any]) -> dict[str, Any]:
        """Build the API representation of a model profile row."""
        return {
            "id": row["id"],
            "name": row["name"],
            "model_name": row["model_name"],
            "base_url": row["base_url"],
            "is_default": bool(row.get("is_default", 0)),
        }

    def _report_payload(row: dict[str, Any]) -> dict[str, Any]:
        """Build the API representation of a report and its suggestions."""
        suggestion_rows = legacy.db.fetch_all(
            f"SELECT * FROM {table_prefix}_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
            (row["id"],),
        )
        suggestions = [
            {
                "id": suggestion["id"],
                "status": suggestion.get("status", "ok"),
                "model_profile_id": suggestion.get("model_profile_id", ""),
                "model_label": suggestion.get("model_label", ""),
                "suggestion_text": suggestion.get("suggestion_text", ""),
                "parsed_json": _parse_json(suggestion.get("parsed_json") or "{}", {}),
                "created_at": suggestion.get("created_at", ""),
            }
            for suggestion in suggestion_rows
        ]
        return {
            "id": row["id"],
            "focus_text": row.get("focus_text", ""),
            # NOTE(review): ensure_schema() in this file does not create the
            # *_ids_json columns, so these resolve to [] unless the table
            # gained them elsewhere.
            "model_profile_ids": _parse_json(row.get("model_profile_ids_json") or "[]", []),
            "linked_account_ids": _parse_json(row.get("linked_account_ids_json") or "[]", []),
            "suggestions": suggestions,
            "created_at": row["created_at"],
        }

    def _recent_reports(account_source_id: str) -> list[dict[str, Any]]:
        """Return payloads for the six newest analysis reports of an account."""
        rows = legacy.db.fetch_all(
            f"SELECT * FROM {table_prefix}_analysis_reports WHERE account_source_id = ? "
            "ORDER BY created_at DESC LIMIT 6",
            (account_source_id,),
        )
        return [_report_payload(row) for row in rows]

    def _workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
        """Build the workspace view for an account.

        Combines the account payload, its recent reports, its relations, its
        five newest similarity searches and the model profiles that are
        global or owned by the account's user.
        """
        relations = legacy.db.fetch_all(
            f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC",
            (account_row["id"],),
        )
        recent_searches = legacy.db.fetch_all(
            f"SELECT * FROM {table_prefix}_similarity_searches WHERE source_account_id = ? "
            "ORDER BY created_at DESC LIMIT 5",
            (account_row["id"],),
        )
        model_profiles = legacy.db.fetch_all(
            """
            SELECT * FROM model_profiles
            WHERE owner_account_id IS NULL OR owner_account_id = ?
            ORDER BY is_default DESC, created_at ASC
            """,
            (account_row["user_id"],),
        )
        return {
            "account": _account_payload(account_row),
            "latest_public_snapshot": None,
            "latest_creator_snapshot": None,
            "recent_reports": _recent_reports(account_row["id"]),
            "linked_accounts": [_relation_payload(row) for row in relations],
            "recent_similarity_searches": [
                {
                    "id": row["id"],
                    "prompt_text": row.get("prompt_text", ""),
                    "context": _parse_json(row.get("context_json") or "{}", {}),
                    "created_at": row["created_at"],
                }
                for row in recent_searches
            ],
            "available_model_profiles": [_model_profile_payload(row) for row in model_profiles],
        }

    async def _call_reasoning_model(
        user_id: str,
        prompt: str,
        *,
        system_prompt: str,
        model_profile_id: str = "",
        temperature: float = 0.3,
    ) -> tuple[str, dict[str, Any]]:
        """Call the user's model profile and return ``(raw_output, parsed)``.

        ``parsed`` is the result of ``legacy.parse_json_object`` when that is
        a dict, otherwise an empty dict. An empty ``model_profile_id`` is
        passed to ``legacy.model_profile_for_account`` as ``None``.
        """
        profile = legacy.model_profile_for_account(user_id, model_profile_id or None)
        output = await legacy.call_model(
            profile,
            system_prompt=system_prompt,
            user_prompt=prompt,
            temperature=temperature,
        )
        parsed = legacy.parse_json_object(output)
        return output, parsed if isinstance(parsed, dict) else {}

    async def _create_sync_job_for_account(account_row: dict[str, Any], assistant_id: str = "") -> dict[str, Any]:
        """Create and trigger a content-sync job for an account.

        Returns:
            ``legacy.job_payload`` of the triggered job.

        Raises:
            HTTPException: 400 when the account has no ``project_id``.
        """
        project_id = account_row.get("project_id") or ""
        if not project_id:
            raise HTTPException(status_code=400, detail="Account source is not attached to a project")
        kb = legacy.resolve_target_kb(account_row["user_id"], None, project_id)
        source_payload = _content_source_payload(account_row)
        profile = legacy.model_profile_for_account(account_row["user_id"], None)
        # A malformed max_items in metadata should not block the sync.
        try:
            max_items = int(_as_dict(source_payload.get("metadata")).get("max_items") or 5)
        except (TypeError, ValueError):
            max_items = 5
        job_row = legacy.create_job_record(
            account_id=account_row["user_id"],
            project_id=project_id,
            knowledge_base_id=kb["id"],
            content_source_id=account_row["id"],
            assistant_id=assistant_id or None,
            source_type="creator_account",
            line_type="analysis",
            workflow_key="content_source_sync_pipeline",
            title=f"{source_payload.get('title') or source_payload.get('handle') or label} 内容同步",
            language="auto",
            source_url=source_payload.get("source_url", ""),
            artifacts={
                "source_account_url": source_payload.get("source_url", ""),
                "platform": platform,
                "handle": source_payload.get("handle", ""),
                "max_items": max_items,
                "skip_existing": True,
                "auto_trigger_analysis": True,
            },
            analysis_model_profile_id=profile["id"],
        )
        queued = await legacy.trigger_orchestrated_job(job_row)
        return legacy.job_payload(queued)

    def _tracking_cursor(user_id: str) -> dict[str, Any] | None:
        """Return the user's tracking-cursor row, if one exists."""
        return legacy.db.fetch_one(
            f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?",
            (user_id,),
        )

    def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]:
        """Insert or update the user's tracking cursor and return the stored row."""
        existing = _tracking_cursor(user_id)
        updated_at = now()
        if existing:
            legacy.db.execute(
                f"UPDATE {table_prefix}_tracking_cursors SET last_seen_at = ?, updated_at = ? WHERE user_id = ?",
                (last_seen_at, updated_at, user_id),
            )
        else:
            legacy.db.execute(
                f"INSERT INTO {table_prefix}_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?)",
                (user_id, last_seen_at, updated_at),
            )
        return legacy.db.fetch_one(
            f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?",
            (user_id,),
        )

    def _tracked_account_payload(tracked_row: dict[str, Any]) -> dict[str, Any]:
        """Build the API representation of a tracked-account row.

        Raises:
            HTTPException: 404 when the tracked account no longer resolves to
                a creator account of this user on this platform.
        """
        assistant_name = ""
        if tracked_row.get("assistant_id"):
            assistant_row = legacy.db.fetch_one(
                "SELECT * FROM assistants WHERE id = ?",
                (tracked_row["assistant_id"],),
            )
            if assistant_row:
                assistant_name = legacy.assistant_payload(assistant_row).get("name", "")
        account_row = _require_account(tracked_row["tracked_account_id"], tracked_row["user_id"])
        return {
            "id": tracked_row["id"],
            "platform": platform,
            "tracked_account_id": tracked_row["tracked_account_id"],
            "assistant_id": tracked_row.get("assistant_id", "") or "",
            "assistant_name": assistant_name,
            "note": tracked_row.get("note", ""),
            "created_at": tracked_row.get("created_at", ""),
            "updated_at": tracked_row["updated_at"],
            "account": _account_payload(account_row),
        }

    def _list_tracked_accounts(user_id: str) -> list[dict[str, Any]]:
        """Return payloads for the user's tracked accounts, newest update first.

        Rows whose account can no longer be resolved are skipped so a single
        stale tracking row does not turn the whole listing into a 404.
        """
        rows = legacy.db.fetch_all(
            f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
            (user_id,),
        )
        payloads: list[dict[str, Any]] = []
        for row in rows:
            try:
                payloads.append(_tracked_account_payload(row))
            except HTTPException as exc:
                if exc.status_code != 404:
                    raise
        return payloads

    def _tracking_digest_item(tracked_item: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
        """Build one digest entry for a video of a tracked account."""
        latest_job = _latest_job_for_source(video["id"])
        # str() guards the slices below against a non-string stored value.
        summary = str((latest_job or {}).get("style_summary") or video.get("description") or "已发现更新内容")
        title = str(video.get("title") or "")
        borrowing_points = [point for point in (summary[:36], title[:36]) if point]
        performance_score = _to_float(_as_dict(video.get("score")).get("performance_score"))
        return {
            "tracking_id": tracked_item["id"],
            "platform": platform,
            "tracked_account_id": tracked_item["tracked_account_id"],
            "tracked_account_name": tracked_item["account"]["nickname"],
            "assistant_id": tracked_item.get("assistant_id", "") or "",
            "assistant_name": tracked_item.get("assistant_name", ""),
            "note": tracked_item.get("note", ""),
            "account": tracked_item["account"],
            "video": video,
            "summary": summary,
            "summary_text": summary,
            "borrowing_points": borrowing_points[:3],
            "is_high_value": performance_score >= 60,
            "created_at": video.get("published_at") or now(),
        }

    def _tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]:
        """Collect recent videos of all tracked accounts into a digest.

        Videos whose ``published_at`` is at or before the threshold are left
        out. The threshold is ``since_value`` or, when empty, the stored
        cursor. Timestamps are compared as strings, so this presumably relies
        on a uniformly formatted, lexicographically sortable timestamp.
        """
        tracked_items = _list_tracked_accounts(user_id)
        cursor = _tracking_cursor(user_id)
        threshold = (since_value or (cursor or {}).get("last_seen_at") or "").strip()
        items: list[dict[str, Any]] = []
        for tracked in tracked_items:
            for video in tracked["account"]["video_summary"]["videos"]:
                published_at = str(video.get("published_at") or "")
                if threshold and published_at and published_at <= threshold:
                    continue
                items.append(_tracking_digest_item(tracked, video))
        items.sort(key=lambda item: item.get("created_at", ""), reverse=True)
        return {
            "generated_at": now(),
            "since": threshold,
            "items": items[:limit],
            "tracked_accounts": tracked_items,
            "cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
        }

    def _insert_relation(
        user_id: str,
        source_account_id: str,
        target_account_id: str | None,
        target_profile_url: str,
        request: PlatformBenchmarkLinksRequest,
    ) -> dict[str, Any]:
        """Insert one account relation and return its payload."""
        relation_id = make_id(f"{platform}_link")
        legacy.db.execute(
            f"INSERT INTO {table_prefix}_account_relations "
            "(id, user_id, source_account_id, target_account_id, target_profile_url, "
            "relation_type, note, search_id, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                relation_id,
                user_id,
                source_account_id,
                target_account_id,
                target_profile_url,
                request.relation_type or "benchmark",
                request.note or "",
                request.search_id or "",
                now(),
            ),
        )
        row = legacy.db.fetch_one(
            f"SELECT * FROM {table_prefix}_account_relations WHERE id = ?",
            (relation_id,),
        )
        return _relation_payload(row)

    # List the caller's creator accounts on this platform.
    @app.get(f"/v2/{platform}/accounts")
    def list_platform_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
        return [_account_payload(row) for row in _content_source_rows(account["id"], platform, "creator_account")]

    # Return the full workspace view of one account.
    @app.get(f"/v2/{platform}/accounts/{{account_id}}/workspace")
    def get_platform_account_workspace(
        account_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        account_row = _require_account(account_id, account["id"])
        return _workspace_payload(account_row)

    # Return the recent analysis reports of one account. Queried directly
    # instead of building the whole workspace just to read one key.
    @app.get(f"/v2/{platform}/accounts/{{account_id}}/analysis-reports")
    def list_platform_analysis_reports(
        account_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        account_row = _require_account(account_id, account["id"])
        return _recent_reports(account_row["id"])

    # Always returns an empty list after checking the account exists.
    @app.get(f"/v2/{platform}/accounts/{{account_id}}/snapshots")
    def list_platform_snapshots(
        account_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        _require_account(account_id, account["id"])
        return []

    # Always answers 404 after checking the account exists.
    @app.get(f"/v2/{platform}/accounts/{{account_id}}/creator-fields")
    def get_platform_creator_fields(
        account_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        _require_account(account_id, account["id"])
        raise HTTPException(status_code=404, detail="No creator-center snapshot found")

    # List an account's videos ordered by performance score, then publish time.
    @app.get(f"/v2/{platform}/accounts/{{account_id}}/videos")
    def list_platform_account_videos(
        account_id: str,
        limit: int = Query(default=80, ge=1, le=200),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        account_row = _require_account(account_id, account["id"])
        items = [_video_payload(row) for row in _linked_video_sources(account_row)]
        items.sort(
            key=lambda item: (item["score"]["performance_score"], item.get("published_at") or ""),
            reverse=True,
        )
        top_ids = [item["id"] for item in items if float(item["score"]["performance_score"] or 0) >= 60][:12]
        newest_first = sorted(items, key=lambda item: item.get("published_at") or "", reverse=True)
        latest_ids = [item["id"] for item in newest_first[:12]]
        return {
            "items": items[:limit],
            "count": len(items),
            "meta": {"platform": platform, "account_id": account_id},
            "top_scored_video_ids": top_ids,
            "latest_video_ids": latest_ids,
            "high_score_threshold": 60,
        }

    # Run a model analysis of an account and persist the report + suggestion.
    @app.post(f"/v2/{platform}/accounts/{{account_id}}/analysis")
    async def analyze_platform_account(
        account_id: str,
        request: PlatformAnalysisRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        account_row = _require_account(account_id, account["id"])
        workspace = _workspace_payload(account_row)
        # The account payload carries at most eight videos, hence the cap.
        video_limit = max(1, min(request.max_videos, 8))
        context = {
            "account": workspace["account"],
            "top_videos": workspace["account"]["video_summary"]["videos"][:video_limit],
            # Honour the request flag; it defaults to True.
            "linked_accounts": workspace["linked_accounts"][:5] if request.include_linked_accounts else [],
            "extra_focus": request.extra_focus,
        }
        prompt = (
            f"请从新媒体商业化运营视角,分析这个{label}账号,输出执行摘要、可借鉴点、风险提醒和下一步动作。"
            f"\n\n输入:\n{json.dumps(context, ensure_ascii=False, indent=2)}"
        )
        # NOTE(review): request.model_profile_ids is not consulted; the call
        # and the stored label both use the user's default profile.
        output, parsed = await _call_reasoning_model(
            account["id"],
            prompt,
            system_prompt="你是新媒体账号分析顾问。尽量输出 JSON,字段包括 executive_summary、borrow_points、risks、next_actions。",
            temperature=request.temperature,
        )
        # One timestamp for the stored rows and the response keeps them in sync.
        created_at = now()
        report_id = make_id(f"{platform}_report")
        legacy.db.execute(
            f"INSERT INTO {table_prefix}_analysis_reports "
            "(id, user_id, account_source_id, focus_text, prompt_text, context_json, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            (
                report_id,
                account["id"],
                account_row["id"],
                request.extra_focus or "",
                prompt,
                _safe_json_dumps(context),
                created_at,
            ),
        )
        suggestion_id = make_id(f"{platform}_suggestion")
        profile = legacy.model_profile_for_account(account["id"], None)
        legacy.db.execute(
            f"INSERT INTO {table_prefix}_analysis_suggestions "
            "(id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (
                suggestion_id,
                report_id,
                profile["id"],
                f"{profile.get('name', '')} · {profile.get('model_name', '')}".strip(" ·"),
                "ok",
                output[:4000],
                _safe_json_dumps(parsed),
                created_at,
            ),
        )
        report_row = legacy.db.fetch_one(
            f"SELECT * FROM {table_prefix}_analysis_reports WHERE id = ?",
            (report_id,),
        )
        report_payload = _report_payload(report_row)
        return {
            "report_id": report_id,
            "account_id": account_row["id"],
            "created_at": created_at,
            "suggestions": report_payload["suggestions"],
            "context": context,
            "top_video_analyses": [],
        }

    # Ask the model to break down the account's best-scoring videos.
    # Results are returned to the caller and not persisted here.
    @app.post(f"/v2/{platform}/accounts/{{account_id}}/videos/analyze-top")
    async def analyze_platform_top_videos(
        account_id: str,
        request: PlatformTopVideoAnalysisRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        account_row = _require_account(account_id, account["id"])
        videos = [_video_payload(row) for row in _linked_video_sources(account_row)]
        min_score = float(request.min_score or 0)
        by_score = sorted(videos, key=lambda item: item["score"]["performance_score"], reverse=True)
        ranked = [
            video for video in by_score if float(video["score"]["performance_score"] or 0) >= min_score
        ][: request.top_video_count]
        results: list[dict[str, Any]] = []
        for video in ranked:
            prompt = (
                f"请拆解这条{label}作品为什么值得关注,输出 summary、borrow_points、risks。"
                f"\n\n输入:\n{json.dumps(video, ensure_ascii=False, indent=2)}"
            )
            output, parsed = await _call_reasoning_model(
                account["id"],
                prompt,
                system_prompt="你是短视频内容拆解助手。尽量输出 JSON,字段包括 summary、borrow_points、risks。",
                model_profile_id=request.model_profile_id,
                temperature=request.temperature,
            )
            summary_text = str(parsed.get("summary") or parsed.get("headline_summary") or output)[:240]
            results.append(
                {
                    "id": make_id(f"{platform}_va"),
                    "video_id": video["id"],
                    "video_title": video["title"],
                    "status": "ok",
                    "summary_text": summary_text,
                    "parsed_json": parsed,
                    "performance_score": video["score"]["performance_score"],
                    "created_at": now(),
                }
            )
        return {
            "account_id": account_row["id"],
            "analyzed_count": len(results),
            "items": results,
        }

    # Rank the caller's other accounts on this platform against a source
    # account by tag overlap and list position, and persist the result.
    @app.post(f"/v2/{platform}/similar-searches")
    async def create_platform_similarity_search(
        request: PlatformSimilaritySearchRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        account_row = _require_account(request.source_account_id, account["id"])
        source_payload = _account_payload(account_row)
        candidates = [
            row
            for row in _content_source_rows(account["id"], platform, "creator_account")
            if row["id"] != account_row["id"]
        ][: max(5, request.max_candidates)]
        ranked_candidates: list[dict[str, Any]] = []
        source_tags = set(source_payload.get("tags") or [])
        for index, row in enumerate(candidates, start=1):
            payload = _account_payload(row)
            overlap = len(source_tags.intersection(set(payload.get("tags") or [])))
            # Ten points per shared tag plus a bonus that shrinks with the
            # candidate's position in the (recency-ordered) list.
            heuristic = overlap * 10 + max(0, 50 - index)
            rationale = f"与源账号同平台,标签重合 {overlap},适合作为{label}对标候选。"
            ranked_candidates.append(
                {
                    "candidate_account_id": row["id"],
                    "candidate_profile_url": payload.get("profile_url", ""),
                    "candidate_nickname": payload.get("nickname", ""),
                    "heuristic_score": float(heuristic),
                    "agent_score": float(heuristic),
                    "rationale_text": rationale,
                    "dimensions_json": {"tag_overlap": overlap},
                }
            )
        ranked_candidates.sort(key=lambda item: item["agent_score"], reverse=True)
        ranked_candidates = ranked_candidates[: request.max_candidates]
        search_id = make_id(f"{platform}_search")
        legacy.db.execute(
            f"INSERT INTO {table_prefix}_similarity_searches "
            "(id, user_id, source_account_id, prompt_text, context_json, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (
                search_id,
                account["id"],
                account_row["id"],
                request.extra_requirements or "",
                _safe_json_dumps({"source_account": source_payload}),
                now(),
            ),
        )
        for idx, item in enumerate(ranked_candidates):
            legacy.db.execute(
                f"""INSERT INTO {table_prefix}_similarity_candidates
                (id, search_id, candidate_account_id, candidate_profile_url, heuristic_score,
                 agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    make_id(f"{platform}_candidate"),
                    search_id,
                    item.get("candidate_account_id") or None,
                    item.get("candidate_profile_url", ""),
                    item.get("heuristic_score", 0),
                    item.get("agent_score", 0),
                    item.get("rationale_text", ""),
                    _safe_json_dumps(item.get("dimensions_json") or {}),
                    _safe_json_dumps(item),
                    idx,
                    now(),
                ),
            )
        return {"id": search_id, "search_id": search_id}

    # Return a stored similarity search with its ranked candidates.
    @app.get(f"/v2/{platform}/similar-searches/{{search_id}}")
    def get_platform_similarity_search(
        search_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        search_row = legacy.db.fetch_one(
            f"SELECT * FROM {table_prefix}_similarity_searches WHERE id = ? AND user_id = ?",
            (search_id, account["id"]),
        )
        if not search_row:
            raise HTTPException(status_code=404, detail="Similarity search not found")
        candidate_rows = legacy.db.fetch_all(
            f"SELECT * FROM {table_prefix}_similarity_candidates WHERE search_id = ? ORDER BY rank_index ASC",
            (search_id,),
        )
        candidates = []
        for row in candidate_rows:
            # The raw output is the primary source; column values only fill
            # in keys it lacks. A non-object raw value becomes {}.
            payload = _parse_json_object(row.get("raw_output_json"))
            payload.setdefault("candidate_account_id", row.get("candidate_account_id", ""))
            payload.setdefault("candidate_profile_url", row.get("candidate_profile_url", ""))
            payload.setdefault("rationale_text", row.get("rationale_text", ""))
            payload.setdefault("agent_score", row.get("agent_score", 0))
            payload.setdefault("heuristic_score", row.get("heuristic_score", 0))
            candidates.append(payload)
        return {
            "id": search_row["id"],
            "search_id": search_row["id"],
            "source_account_id": search_row["source_account_id"],
            "candidates": candidates,
            "created_at": search_row["created_at"],
        }

    # List the benchmark relations of one account, newest first.
    @app.get(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
    def list_platform_benchmark_links(
        account_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        _require_account(account_id, account["id"])
        rows = legacy.db.fetch_all(
            f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC",
            (account_id,),
        )
        return [_relation_payload(row) for row in rows]

    # Create relations from one account to other accounts and/or bare URLs.
    @app.post(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
    def create_platform_benchmark_links(
        account_id: str,
        request: PlatformBenchmarkLinksRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        source_account = _require_account(account_id, account["id"])
        # Resolve every target before writing anything so an unknown id
        # yields a 404 without leaving earlier links half-created.
        targets = [_require_account(target_id, account["id"]) for target_id in request.target_account_ids]
        created: list[dict[str, Any]] = []
        for target in targets:
            created.append(
                _insert_relation(
                    account["id"],
                    source_account["id"],
                    target["id"],
                    target.get("source_url", ""),
                    request,
                )
            )
        for target_profile_url in request.target_profile_urls:
            cleaned = str(target_profile_url or "").strip()
            if not cleaned:
                continue
            created.append(_insert_relation(account["id"], source_account["id"], None, cleaned, request))
        return {"links": created}

    # List tracked accounts together with the stored cursor.
    @app.get(f"/v2/{platform}/tracking/accounts")
    def list_platform_tracking_accounts(
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        cursor = _tracking_cursor(account["id"])
        return {
            "items": _list_tracked_accounts(account["id"]),
            "cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
        }

    # Start tracking an account, or update assistant/note if already tracked.
    @app.post(f"/v2/{platform}/tracking/accounts")
    def create_platform_tracking_account(
        request: PlatformTrackingAccountRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        tracked = _require_account(request.tracked_account_id, account["id"])
        assistant = legacy.resolve_target_assistant(
            account["id"],
            request.assistant_id or None,
            tracked.get("project_id", ""),
        )
        assistant_id = (assistant or {}).get("id") or None
        existing = legacy.db.fetch_one(
            f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
            (account["id"], tracked["id"]),
        )
        if existing:
            tracking_id = existing["id"]
            legacy.db.execute(
                f"UPDATE {table_prefix}_tracked_accounts SET assistant_id = ?, note = ?, updated_at = ? WHERE id = ?",
                (assistant_id, request.note or "", now(), tracking_id),
            )
        else:
            tracking_id = make_id(f"{platform}_track")
            timestamp = now()
            legacy.db.execute(
                f"INSERT INTO {table_prefix}_tracked_accounts "
                "(id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (
                    tracking_id,
                    account["id"],
                    tracked["id"],
                    assistant_id,
                    request.note or "",
                    timestamp,
                    timestamp,
                ),
            )
        row = legacy.db.fetch_one(
            f"SELECT * FROM {table_prefix}_tracked_accounts WHERE id = ?",
            (tracking_id,),
        )
        return _tracked_account_payload(row)

    # Queue a sync job for one tracked account.
    @app.post(f"/v2/{platform}/tracking/accounts/{{tracked_account_id}}/refresh")
    async def refresh_platform_tracked_account(
        tracked_account_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        tracked_row = legacy.db.fetch_one(
            f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
            (account["id"], tracked_account_id),
        )
        if not tracked_row:
            raise HTTPException(status_code=404, detail="Tracked account not found")
        account_row = _require_account(tracked_account_id, account["id"])
        queued = await _create_sync_job_for_account(
            account_row,
            assistant_id=tracked_row.get("assistant_id", "") or "",
        )
        legacy.db.execute(
            f"UPDATE {table_prefix}_tracked_accounts SET updated_at = ? WHERE id = ?",
            (now(), tracked_row["id"]),
        )
        return {
            "tracking_id": tracked_row["id"],
            "tracked_account_id": tracked_account_id,
            "sync_job_id": queued["id"],
            "status": queued["status"],
        }

    # Queue sync jobs for every tracked account; failures are reported per
    # item instead of aborting the batch.
    @app.post(f"/v2/{platform}/tracking/refresh")
    async def refresh_platform_tracking(
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        tracked_rows = legacy.db.fetch_all(
            f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
            (account["id"],),
        )
        refreshed = 0
        failed = 0
        items: list[dict[str, Any]] = []
        for row in tracked_rows:
            try:
                account_row = _require_account(row["tracked_account_id"], account["id"])
                queued = await _create_sync_job_for_account(
                    account_row,
                    assistant_id=row.get("assistant_id", "") or "",
                )
            except Exception as exc:  # deliberate best-effort: record and continue
                failed += 1
                items.append(
                    {
                        "tracking_id": row["id"],
                        "tracked_account_id": row["tracked_account_id"],
                        "error": str(exc),
                    }
                )
            else:
                refreshed += 1
                items.append(
                    {
                        "tracking_id": row["id"],
                        "tracked_account_id": row["tracked_account_id"],
                        "sync_job_id": queued["id"],
                        "status": queued["status"],
                    }
                )
        return {"refreshed": refreshed, "failed": failed, "items": items}

    # Store the caller's "last seen" position for the tracking digest.
    @app.post(f"/v2/{platform}/tracking/cursor")
    def update_platform_tracking_cursor(
        request: PlatformTrackingCursorRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        cursor = _set_tracking_cursor(account["id"], request.last_seen_at)
        return {"last_seen_at": cursor["last_seen_at"], "updated_at": cursor["updated_at"]}

    # Return the digest of new videos from tracked accounts.
    @app.get(f"/v2/{platform}/tracking/digest")
    def get_platform_tracking_digest(
        since: str = Query(default=""),
        limit: int = Query(default=24, ge=1, le=100),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        return _tracking_digest(account["id"], since_value=(since or "").strip(), limit=limit)