From 1719047ef52ad53a0d5310aaa429086c02e22266 Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 23 Mar 2026 09:13:37 +0800 Subject: [PATCH] feat: add domestic multi-platform workbench routes --- .../app/domestic_platform_features.py | 900 ++++++++++++++++++ collector-service/app/main.py | 5 + 2 files changed, 905 insertions(+) create mode 100644 collector-service/app/domestic_platform_features.py diff --git a/collector-service/app/domestic_platform_features.py b/collector-service/app/domestic_platform_features.py new file mode 100644 index 0000000..4bcda8d --- /dev/null +++ b/collector-service/app/domestic_platform_features.py @@ -0,0 +1,900 @@ +from __future__ import annotations + +import json +from typing import Any + +from fastapi import Body, Depends, HTTPException, Query +from pydantic import BaseModel, Field + + +class PlatformAnalysisRequest(BaseModel): + model_profile_ids: list[str] = Field(default_factory=list) + linked_account_ids: list[str] = Field(default_factory=list) + include_linked_accounts: bool = True + include_recent_similar_candidates: bool = True + max_videos: int = Field(default=6, ge=1, le=20) + extra_focus: str = "" + temperature: float = 0.35 + auto_analyze_top_videos: bool = False + top_video_analysis_count: int = Field(default=4, ge=1, le=10) + + +class PlatformTopVideoAnalysisRequest(BaseModel): + model_profile_id: str = "" + top_video_count: int = Field(default=5, ge=1, le=12) + min_score: float = 0 + temperature: float = 0.25 + + +class PlatformSimilaritySearchRequest(BaseModel): + source_account_id: str = "" + candidate_urls: list[str] = Field(default_factory=list) + seed_linked_accounts: bool = True + search_public_pages: bool = True + model_profile_id: str = "" + max_candidates: int = Field(default=8, ge=1, le=20) + extra_requirements: str = "" + + +class PlatformBenchmarkLinksRequest(BaseModel): + target_account_ids: list[str] = Field(default_factory=list) + target_profile_urls: list[str] = Field(default_factory=list) + relation_type: str = "benchmark" + note: str = "" + search_id: str = "" + + +class PlatformTrackingAccountRequest(BaseModel): + tracked_account_id: str + assistant_id: str = "" + note: str = "" + + +class PlatformTrackingCursorRequest(BaseModel): + last_seen_at: str + + +def register_domestic_platform_routes(app: Any, legacy: Any, *, platform: str, label: str) -> None: + table_prefix = platform + + def now() -> str: + return legacy.utc_now() + + def make_id(prefix: str) -> str: + return legacy.make_id(prefix) + + def _safe_json_dumps(value: Any) -> str: + return json.dumps(value or {}, ensure_ascii=False) + + def _parse_json(raw: str, fallback: Any) -> Any: + cleaned = str(raw or "").strip() + if not cleaned: + return fallback + try: + value = json.loads(cleaned) + return value + except json.JSONDecodeError: + return fallback + + def ensure_schema() -> None: + schema = f""" + CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_reports ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + account_source_id TEXT NOT NULL, + focus_text TEXT NOT NULL DEFAULT '', + prompt_text TEXT NOT NULL DEFAULT '', + context_json TEXT NOT NULL DEFAULT '{{}}', + created_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(account_source_id) REFERENCES content_sources(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_{table_prefix}_analysis_reports_account_created + ON {table_prefix}_analysis_reports(account_source_id, created_at DESC); + + CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_suggestions ( + id TEXT PRIMARY KEY, + report_id TEXT NOT NULL, + model_profile_id TEXT NOT NULL DEFAULT '', + model_label TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'ok', + suggestion_text TEXT NOT NULL DEFAULT '', + parsed_json TEXT NOT NULL DEFAULT '{{}}', + created_at TEXT NOT NULL, + FOREIGN KEY(report_id) REFERENCES {table_prefix}_analysis_reports(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_searches ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + source_account_id TEXT NOT NULL, + prompt_text TEXT NOT NULL DEFAULT '', + context_json TEXT NOT NULL DEFAULT '{{}}', + created_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_candidates ( + id TEXT PRIMARY KEY, + search_id TEXT NOT NULL, + candidate_account_id TEXT, + candidate_profile_url TEXT NOT NULL DEFAULT '', + heuristic_score REAL NOT NULL DEFAULT 0, + agent_score REAL NOT NULL DEFAULT 0, + rationale_text TEXT NOT NULL DEFAULT '', + dimensions_json TEXT NOT NULL DEFAULT '{{}}', + raw_output_json TEXT NOT NULL DEFAULT '{{}}', + rank_index INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL, + FOREIGN KEY(search_id) REFERENCES {table_prefix}_similarity_searches(id) ON DELETE CASCADE, + FOREIGN KEY(candidate_account_id) REFERENCES content_sources(id) ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS idx_{table_prefix}_similarity_candidates_search_rank + ON {table_prefix}_similarity_candidates(search_id, rank_index ASC); + + CREATE TABLE IF NOT EXISTS {table_prefix}_account_relations ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + source_account_id TEXT NOT NULL, + target_account_id TEXT, + target_profile_url TEXT NOT NULL DEFAULT '', + relation_type TEXT NOT NULL DEFAULT 'benchmark', + note TEXT NOT NULL DEFAULT '', + search_id TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE, + FOREIGN KEY(target_account_id) REFERENCES content_sources(id) ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS idx_{table_prefix}_account_relations_source + ON {table_prefix}_account_relations(source_account_id, created_at DESC); + + CREATE TABLE IF NOT EXISTS {table_prefix}_tracked_accounts ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + tracked_account_id TEXT NOT NULL, + assistant_id TEXT, + note TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, tracked_account_id), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(tracked_account_id) REFERENCES content_sources(id) ON DELETE CASCADE, + FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS idx_{table_prefix}_tracked_accounts_user_updated + ON {table_prefix}_tracked_accounts(user_id, updated_at DESC); + + CREATE TABLE IF NOT EXISTS {table_prefix}_tracking_cursors ( + user_id TEXT PRIMARY KEY, + last_seen_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE + ); + """ + with legacy.db.session() as conn: + conn.executescript(schema) + + ensure_schema() + + @app.on_event("startup") + def _startup_platform_schema() -> None: + ensure_schema() + + def _content_source_rows(user_id: str, platform_value: str, kind: str = "") -> list[dict[str, Any]]: + rows = legacy.db.fetch_all( + "SELECT * FROM content_sources WHERE user_id = ? AND platform = ? ORDER BY updated_at DESC, created_at DESC", + (user_id, platform_value), + ) + if kind: + rows = [row for row in rows if row.get("source_kind") == kind] + return rows + + def _content_source_payload(row: dict[str, Any]) -> dict[str, Any]: + return legacy.content_source_payload(row) + + def _source_metadata(row: dict[str, Any]) -> dict[str, Any]: + return _content_source_payload(row).get("metadata", {}) + + def _require_account(account_id: str, user_id: str) -> dict[str, Any]: + row = legacy.db.fetch_one( + "SELECT * FROM content_sources WHERE id = ? AND user_id = ? AND source_kind = 'creator_account' AND platform = ?", + (account_id, user_id, platform), + ) + if not row: + raise HTTPException(status_code=404, detail=f"{label} account not found") + return row + + def _linked_video_sources(account_row: dict[str, Any]) -> list[dict[str, Any]]: + project_id = account_row.get("project_id", "") + rows = legacy.db.fetch_all( + "SELECT * FROM content_sources WHERE user_id = ? AND project_id = ? AND source_kind = 'video_link' AND platform = ? ORDER BY updated_at DESC, created_at DESC", + (account_row["user_id"], project_id, platform), + ) + account_id = account_row["id"] + source_url = str(account_row.get("source_url") or "").strip() + linked: list[dict[str, Any]] = [] + for row in rows: + metadata = _source_metadata(row) + if metadata.get("origin_content_source_id") == account_id or metadata.get("source_account_url") == source_url: + linked.append(row) + return linked + + def _jobs_for_source(source_id: str) -> list[dict[str, Any]]: + return legacy.db.fetch_all( + "SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC", + (source_id,), + ) + + def _latest_job_for_source(source_id: str) -> dict[str, Any] | None: + return legacy.db.fetch_one( + "SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC LIMIT 1", + (source_id,), + ) + + def _extract_performance_score(job_row: dict[str, Any] | None) -> float: + if not job_row: + return 0.0 + result_map = _parse_json(job_row.get("result_json") or "{}", {}) + artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {}) + candidates = [ + result_map.get("performance_score"), + (result_map.get("analysis") or {}).get("performance_score"), + (result_map.get("scores") or {}).get("performance_score"), + artifacts_map.get("performance_score"), + (artifacts_map.get("scores") or {}).get("performance_score"), + ] + for value in candidates: + try: + return float(value) + except (TypeError, ValueError): + continue + return 0.0 + + def _extract_metrics(job_row: dict[str, Any] | None) -> dict[str, Any]: + if not job_row: + return {} + result_map = _parse_json(job_row.get("result_json") or "{}", {}) + artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {}) + return ( + result_map.get("metrics") + or artifacts_map.get("metrics") + or result_map.get("stats") + or artifacts_map.get("stats") + or {} + ) + + def _video_payload(source_row: dict[str, Any]) -> dict[str, Any]: + payload = _content_source_payload(source_row) + metadata = payload.get("metadata", {}) + latest_job = _latest_job_for_source(source_row["id"]) + metrics = _extract_metrics(latest_job) + tags = metadata.get("tags") or [] + if not isinstance(tags, list): + tags = [] + return { + "id": source_row["id"], + "aweme_id": str(metadata.get("external_id") or source_row["id"]), + "title": payload.get("title") or "未命名作品", + "description": metadata.get("summary") or metadata.get("description") or (latest_job or {}).get("style_summary", ""), + "share_url": payload.get("source_url", ""), + "cover_url": metadata.get("cover_url") or "", + "duration_sec": float(metadata.get("duration_sec") or 0), + "published_at": metadata.get("published_at") or source_row.get("created_at"), + "tags": tags, + "content_type": metadata.get("content_type") or "video", + "stats": { + "play": metrics.get("play_count") or metrics.get("play") or 0, + "like": metrics.get("like_count") or metrics.get("like") or 0, + "comment": metrics.get("comment_count") or metrics.get("comment") or 0, + "share": metrics.get("share_count") or metrics.get("share") or 0, + }, + "score": { + "performance_score": _extract_performance_score(latest_job), + }, + "source": payload, + "latest_job_id": (latest_job or {}).get("id", ""), + } + + def _account_payload(account_row: dict[str, Any]) -> dict[str, Any]: + payload = _content_source_payload(account_row) + metadata = payload.get("metadata", {}) + videos = [_video_payload(item) for item in _linked_video_sources(account_row)] + play_values = [float(video["stats"].get("play") or 0) for video in videos if float(video["stats"].get("play") or 0) > 0] + like_values = [float(video["stats"].get("like") or 0) for video in videos if float(video["stats"].get("like") or 0) > 0] + tags = metadata.get("tags") or [] + if not isinstance(tags, list): + tags = [] + return { + "id": account_row["id"], + "platform": platform, + "profile_url": payload.get("source_url", ""), + "canonical_profile_url": payload.get("source_url", ""), + "handle": payload.get("handle", ""), + "nickname": payload.get("title") or payload.get("handle") or "未命名账号", + "signature": metadata.get("bio") or metadata.get("description") or "", + "avatar_url": metadata.get("avatar_url") or "", + "tags": tags, + "keywords": metadata.get("keywords") or [], + "sync_status": "ready" if payload.get("metadata", {}).get("last_sync_error", "") == "" else "partial", + "video_summary": { + "count": len(videos), + "avg_play": sum(play_values) / len(play_values) if play_values else 0, + "avg_like": sum(like_values) / len(like_values) if like_values else 0, + "videos": videos[:8], + }, + "project_id": payload.get("project_id", ""), + "created_at": payload.get("created_at", ""), + "updated_at": payload.get("updated_at", ""), + } + + def _relation_payload(row: dict[str, Any]) -> dict[str, Any]: + target = None + if row.get("target_account_id"): + target = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["target_account_id"],)) + return { + "id": row["id"], + "source_account_id": row["source_account_id"], + "target_account_id": row.get("target_account_id", "") or "", + "target_profile_url": row.get("target_profile_url", ""), + "target_nickname": (_account_payload(target)["nickname"] if target else ""), + "relation_type": row.get("relation_type", "benchmark"), + "note": row.get("note", ""), + "search_id": row.get("search_id", ""), + "created_at": row["created_at"], + } + + def _report_payload(row: dict[str, Any]) -> dict[str, Any]: + suggestions = [ + { + "id": suggestion["id"], + "status": suggestion.get("status", "ok"), + "model_profile_id": suggestion.get("model_profile_id", ""), + "model_label": suggestion.get("model_label", ""), + "suggestion_text": suggestion.get("suggestion_text", ""), + "parsed_json": _parse_json(suggestion.get("parsed_json") or "{}", {}), + "created_at": suggestion.get("created_at", ""), + } + for suggestion in legacy.db.fetch_all( + f"SELECT * FROM {table_prefix}_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC", + (row["id"],), + ) + ] + return { + "id": row["id"], + "focus_text": row.get("focus_text", ""), + "suggestions": suggestions, + "created_at": row["created_at"], + } + + def _workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]: + reports = legacy.db.fetch_all( + f"SELECT * FROM {table_prefix}_analysis_reports WHERE account_source_id = ? ORDER BY created_at DESC LIMIT 6", + (account_row["id"],), + ) + relations = legacy.db.fetch_all( + f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC", + (account_row["id"],), + ) + return { + "account": _account_payload(account_row), + "recent_reports": [_report_payload(row) for row in reports], + "linked_accounts": [_relation_payload(row) for row in relations], + } + + async def _call_reasoning_model(user_id: str, prompt: str, *, system_prompt: str, model_profile_id: str = "", temperature: float = 0.3) -> tuple[str, dict[str, Any]]: + profile = legacy.model_profile_for_account(user_id, model_profile_id or None) + output = await legacy.call_model(profile, system_prompt=system_prompt, user_prompt=prompt, temperature=temperature) + parsed = legacy.parse_json_object(output) + return output, parsed if isinstance(parsed, dict) else {} + + async def _create_sync_job_for_account(account_row: dict[str, Any], assistant_id: str = "") -> dict[str, Any]: + project_id = account_row.get("project_id") or "" + if not project_id: + raise HTTPException(status_code=400, detail="Account source is not attached to a project") + kb = legacy.resolve_target_kb(account_row["user_id"], None, project_id) + source_payload = _content_source_payload(account_row) + profile = legacy.model_profile_for_account(account_row["user_id"], None) + job_row = legacy.create_job_record( + account_id=account_row["user_id"], + project_id=project_id, + knowledge_base_id=kb["id"], + content_source_id=account_row["id"], + assistant_id=assistant_id or None, + source_type="creator_account", + line_type="analysis", + workflow_key="content_source_sync_pipeline", + title=f"{source_payload.get('title') or source_payload.get('handle') or label} 内容同步", + language="auto", + source_url=source_payload.get("source_url", ""), + artifacts={ + "source_account_url": source_payload.get("source_url", ""), + "platform": platform, + "handle": source_payload.get("handle", ""), + "max_items": int(source_payload.get("metadata", {}).get("max_items") or 5), + "skip_existing": True, + "auto_trigger_analysis": True, + }, + analysis_model_profile_id=profile["id"], + ) + queued = await legacy.trigger_orchestrated_job(job_row) + return legacy.job_payload(queued) + + def _tracking_cursor(user_id: str) -> dict[str, Any] | None: + return legacy.db.fetch_one( + f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?", + (user_id,), + ) + + def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]: + existing = _tracking_cursor(user_id) + updated_at = now() + if existing: + legacy.db.execute( + f"UPDATE {table_prefix}_tracking_cursors SET last_seen_at = ?, updated_at = ? WHERE user_id = ?", + (last_seen_at, updated_at, user_id), + ) + else: + legacy.db.execute( + f"INSERT INTO {table_prefix}_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?)", + (user_id, last_seen_at, updated_at), + ) + return legacy.db.fetch_one( + f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?", + (user_id,), + ) + + def _tracking_digest_item(tracked_row: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]: + latest_job = _latest_job_for_source(video["id"]) + summary = (latest_job or {}).get("style_summary") or video.get("description") or "已发现更新内容" + assistant = None + if tracked_row.get("assistant_id"): + assistant_row = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (tracked_row["assistant_id"],)) + if assistant_row: + assistant = legacy.assistant_payload(assistant_row) + borrowing_points = [point for point in [summary[:36], video.get("title", "")[:36]] if point] + return { + "tracking_id": tracked_row["id"], + "tracked_account_id": tracked_row["tracked_account_id"], + "tracked_account_name": _account_payload(_require_account(tracked_row["tracked_account_id"], tracked_row["user_id"]))["nickname"], + "assistant_id": tracked_row.get("assistant_id", "") or "", + "assistant_name": (assistant or {}).get("name", ""), + "note": tracked_row.get("note", ""), + "video": video, + "summary_text": summary, + "borrowing_points": borrowing_points[:3], + "created_at": video.get("published_at") or now(), + } + + def _tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]: + tracked_rows = legacy.db.fetch_all( + f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC", + (user_id,), + ) + cursor = _tracking_cursor(user_id) + threshold = (since_value or (cursor or {}).get("last_seen_at") or "").strip() + items: list[dict[str, Any]] = [] + for tracked in tracked_rows: + account_row = _require_account(tracked["tracked_account_id"], user_id) + for video in _account_payload(account_row)["video_summary"]["videos"]: + published_at = str(video.get("published_at") or "") + if threshold and published_at and published_at <= threshold: + continue + items.append(_tracking_digest_item(tracked, video)) + items.sort(key=lambda item: item.get("created_at", ""), reverse=True) + return { + "items": items[:limit], + "tracked_accounts": [ + { + "id": row["id"], + "tracked_account_id": row["tracked_account_id"], + "assistant_id": row.get("assistant_id", "") or "", + "note": row.get("note", ""), + "updated_at": row["updated_at"], + } + for row in tracked_rows + ], + "cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""), + } + + @app.get(f"/v2/{platform}/accounts") + def list_platform_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]: + return [_account_payload(row) for row in _content_source_rows(account["id"], platform, "creator_account")] + + @app.get(f"/v2/{platform}/accounts/{{account_id}}/workspace") + def get_platform_account_workspace(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]: + account_row = _require_account(account_id, account["id"]) + return _workspace_payload(account_row) + + @app.get(f"/v2/{platform}/accounts/{{account_id}}/videos") + def list_platform_account_videos( + account_id: str, + limit: int = Query(default=80, ge=1, le=200), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + account_row = _require_account(account_id, account["id"]) + items = [_video_payload(row) for row in _linked_video_sources(account_row)] + items.sort(key=lambda item: (item["score"]["performance_score"], item.get("published_at") or ""), reverse=True) + top_ids = [item["id"] for item in items if float(item["score"]["performance_score"] or 0) >= 60][:12] + latest_ids = [item["id"] for item in sorted(items, key=lambda item: item.get("published_at") or "", reverse=True)[:12]] + return { + "items": items[:limit], + "count": len(items), + "meta": {"platform": platform, "account_id": account_id}, + "top_scored_video_ids": top_ids, + "latest_video_ids": latest_ids, + "high_score_threshold": 60, + } + + @app.post(f"/v2/{platform}/accounts/{{account_id}}/analysis") + async def analyze_platform_account( + account_id: str, + request: PlatformAnalysisRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + account_row = _require_account(account_id, account["id"]) + workspace = _workspace_payload(account_row) + context = { + "account": workspace["account"], + "top_videos": workspace["account"]["video_summary"]["videos"][: max(1, min(request.max_videos, 8))], + "linked_accounts": workspace["linked_accounts"][:5], + "extra_focus": request.extra_focus, + } + prompt = ( + f"请从新媒体商业化运营视角,分析这个{label}账号,输出执行摘要、可借鉴点、风险提醒和下一步动作。" + f"\n\n输入:\n{json.dumps(context, ensure_ascii=False, indent=2)}" + ) + output, parsed = await _call_reasoning_model( + account["id"], + prompt, + system_prompt="你是新媒体账号分析顾问。尽量输出 JSON,字段包括 executive_summary、borrow_points、risks、next_actions。", + temperature=request.temperature, + ) + report_id = make_id(f"{platform}_report") + legacy.db.execute( + f"INSERT INTO {table_prefix}_analysis_reports (id, user_id, account_source_id, focus_text, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + report_id, + account["id"], + account_row["id"], + request.extra_focus or "", + prompt, + _safe_json_dumps(context), + now(), + ), + ) + suggestion_id = make_id(f"{platform}_suggestion") + profile = legacy.model_profile_for_account(account["id"], None) + legacy.db.execute( + f"INSERT INTO {table_prefix}_analysis_suggestions (id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ( + suggestion_id, + report_id, + profile["id"], + f"{profile.get('name', '')} · {profile.get('model_name', '')}".strip(" ·"), + "ok", + output[:4000], + _safe_json_dumps(parsed), + now(), + ), + ) + report_row = legacy.db.fetch_one( + f"SELECT * FROM {table_prefix}_analysis_reports WHERE id = ?", + (report_id,), + ) + report_payload = _report_payload(report_row) + return { + "report_id": report_id, + "account_id": account_row["id"], + "suggestions": report_payload["suggestions"], + "context": context, + } + + @app.post(f"/v2/{platform}/accounts/{{account_id}}/videos/analyze-top") + async def analyze_platform_top_videos( + account_id: str, + request: PlatformTopVideoAnalysisRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + account_row = _require_account(account_id, account["id"]) + videos = [_video_payload(row) for row in _linked_video_sources(account_row)] + ranked = [ + video for video in sorted(videos, key=lambda item: item["score"]["performance_score"], reverse=True) + if float(video["score"]["performance_score"] or 0) >= float(request.min_score or 0) + ][: request.top_video_count] + results: list[dict[str, Any]] = [] + for video in ranked: + prompt = ( + f"请拆解这条{label}作品为什么值得关注,输出 summary、borrow_points、risks。" + f"\n\n输入:\n{json.dumps(video, ensure_ascii=False, indent=2)}" + ) + output, parsed = await _call_reasoning_model( + account["id"], + prompt, + system_prompt="你是短视频内容拆解助手。尽量输出 JSON,字段包括 summary、borrow_points、risks。", + model_profile_id=request.model_profile_id, + temperature=request.temperature, + ) + summary_text = str(parsed.get("summary") or parsed.get("headline_summary") or output)[:240] + results.append( + { + "id": make_id(f"{platform}_va"), + "video_id": video["id"], + "video_title": video["title"], + "status": "ok", + "summary_text": summary_text, + "parsed_json": parsed, + "performance_score": video["score"]["performance_score"], + "created_at": now(), + } + ) + return { + "account_id": account_row["id"], + "analyzed_count": len(results), + "items": results, + } + + @app.post(f"/v2/{platform}/similar-searches") + async def create_platform_similarity_search( + request: PlatformSimilaritySearchRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + account_row = _require_account(request.source_account_id, account["id"]) + source_payload = _account_payload(account_row) + candidates = [ + row for row in _content_source_rows(account["id"], platform, "creator_account") + if row["id"] != account_row["id"] + ][: max(5, request.max_candidates)] + ranked_candidates: list[dict[str, Any]] = [] + source_tags = set(source_payload.get("tags") or []) + for index, row in enumerate(candidates, start=1): + payload = _account_payload(row) + overlap = len(source_tags.intersection(set(payload.get("tags") or []))) + heuristic = overlap * 10 + max(0, 50 - index) + rationale = f"与源账号同平台,标签重合 {overlap},适合作为{label}对标候选。" + ranked_candidates.append( + { + "candidate_account_id": row["id"], + "candidate_profile_url": payload.get("profile_url", ""), + "candidate_nickname": payload.get("nickname", ""), + "heuristic_score": float(heuristic), + "agent_score": float(heuristic), + "rationale_text": rationale, + "dimensions_json": {"tag_overlap": overlap}, + } + ) + ranked_candidates.sort(key=lambda item: item["agent_score"], reverse=True) + ranked_candidates = ranked_candidates[: request.max_candidates] + search_id = make_id(f"{platform}_search") + legacy.db.execute( + f"INSERT INTO {table_prefix}_similarity_searches (id, user_id, source_account_id, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?)", + ( + search_id, + account["id"], + account_row["id"], + request.extra_requirements or "", + _safe_json_dumps({"source_account": source_payload}), + now(), + ), + ) + for idx, item in enumerate(ranked_candidates): + legacy.db.execute( + f"""INSERT INTO {table_prefix}_similarity_candidates + (id, search_id, candidate_account_id, candidate_profile_url, heuristic_score, agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + make_id(f"{platform}_candidate"), + search_id, + item.get("candidate_account_id") or None, + item.get("candidate_profile_url", ""), + item.get("heuristic_score", 0), + item.get("agent_score", 0), + item.get("rationale_text", ""), + _safe_json_dumps(item.get("dimensions_json") or {}), + _safe_json_dumps(item), + idx, + now(), + ), + ) + return {"id": search_id, "search_id": search_id} + + @app.get(f"/v2/{platform}/similar-searches/{{search_id}}") + def get_platform_similarity_search(search_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]: + search_row = legacy.db.fetch_one( + f"SELECT * FROM {table_prefix}_similarity_searches WHERE id = ? AND user_id = ?", + (search_id, account["id"]), + ) + if not search_row: + raise HTTPException(status_code=404, detail="Similarity search not found") + candidate_rows = legacy.db.fetch_all( + f"SELECT * FROM {table_prefix}_similarity_candidates WHERE search_id = ? ORDER BY rank_index ASC", + (search_id,), + ) + candidates = [] + for row in candidate_rows: + payload = _parse_json(row.get("raw_output_json") or "{}", {}) + payload.setdefault("candidate_account_id", row.get("candidate_account_id", "")) + payload.setdefault("candidate_profile_url", row.get("candidate_profile_url", "")) + payload.setdefault("rationale_text", row.get("rationale_text", "")) + payload.setdefault("agent_score", row.get("agent_score", 0)) + payload.setdefault("heuristic_score", row.get("heuristic_score", 0)) + candidates.append(payload) + return { + "id": search_row["id"], + "search_id": search_row["id"], + "source_account_id": search_row["source_account_id"], + "candidates": candidates, + "created_at": search_row["created_at"], + } + + @app.get(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links") + def list_platform_benchmark_links(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]: + _require_account(account_id, account["id"]) + rows = legacy.db.fetch_all( + f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC", + (account_id,), + ) + return [_relation_payload(row) for row in rows] + + @app.post(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links") + def create_platform_benchmark_links( + account_id: str, + request: PlatformBenchmarkLinksRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_account = _require_account(account_id, account["id"]) + created: list[dict[str, Any]] = [] + for target_account_id in request.target_account_ids: + target = _require_account(target_account_id, account["id"]) + relation_id = make_id(f"{platform}_link") + legacy.db.execute( + f"INSERT INTO {table_prefix}_account_relations (id, user_id, source_account_id, target_account_id, target_profile_url, relation_type, note, search_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + relation_id, + account["id"], + source_account["id"], + target["id"], + target.get("source_url", ""), + request.relation_type or "benchmark", + request.note or "", + request.search_id or "", + now(), + ), + ) + created.append(_relation_payload(legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_account_relations WHERE id = ?", (relation_id,)))) + for target_profile_url in request.target_profile_urls: + cleaned = str(target_profile_url or "").strip() + if not cleaned: + continue + relation_id = make_id(f"{platform}_link") + legacy.db.execute( + f"INSERT INTO {table_prefix}_account_relations (id, user_id, source_account_id, target_account_id, target_profile_url, relation_type, note, search_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + ( + relation_id, + account["id"], + source_account["id"], + None, + cleaned, + request.relation_type or "benchmark", + request.note or "", + request.search_id or "", + now(), + ), + ) + created.append(_relation_payload(legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_account_relations WHERE id = ?", (relation_id,)))) + return {"links": created} + + @app.get(f"/v2/{platform}/tracking/accounts") + def list_platform_tracking_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]: + rows = legacy.db.fetch_all( + f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC", + (account["id"],), + ) + cursor = _tracking_cursor(account["id"]) + return { + "items": [ + { + "id": row["id"], + "tracked_account_id": row["tracked_account_id"], + "assistant_id": row.get("assistant_id", "") or "", + "note": row.get("note", ""), + "updated_at": row["updated_at"], + } + for row in rows + ], + "cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""), + } + + @app.post(f"/v2/{platform}/tracking/accounts") + def create_platform_tracking_account( + request: PlatformTrackingAccountRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + tracked = _require_account(request.tracked_account_id, account["id"]) + assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, tracked.get("project_id", "")) + existing = legacy.db.fetch_one( + f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?", + (account["id"], tracked["id"]), + ) + if existing: + legacy.db.execute( + f"UPDATE {table_prefix}_tracked_accounts SET assistant_id = ?, note = ?, updated_at = ? WHERE id = ?", + (((assistant or {}).get("id") or None), request.note or "", now(), existing["id"]), + ) + row = legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_tracked_accounts WHERE id = ?", (existing["id"],)) + else: + tracking_id = make_id(f"{platform}_track") + legacy.db.execute( + f"INSERT INTO {table_prefix}_tracked_accounts (id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)", + ( + tracking_id, + account["id"], + tracked["id"], + (assistant or {}).get("id") or None, + request.note or "", + now(), + now(), + ), + ) + row = legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_tracked_accounts WHERE id = ?", (tracking_id,)) + return { + "id": row["id"], + "tracked_account_id": row["tracked_account_id"], + "assistant_id": row.get("assistant_id", "") or "", + "note": row.get("note", ""), + "updated_at": row["updated_at"], + } + + @app.post(f"/v2/{platform}/tracking/accounts/{{tracked_account_id}}/refresh") + async def refresh_platform_tracked_account(tracked_account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]: + tracked_row = legacy.db.fetch_one( + f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?", + (account["id"], tracked_account_id), + ) + if not tracked_row: + raise HTTPException(status_code=404, detail="Tracked account not found") + account_row = _require_account(tracked_account_id, account["id"]) + queued = await _create_sync_job_for_account(account_row, assistant_id=tracked_row.get("assistant_id", "") or "") + legacy.db.execute( + f"UPDATE {table_prefix}_tracked_accounts SET updated_at = ? WHERE id = ?", + (now(), tracked_row["id"]), + ) + return {"tracking_id": tracked_row["id"], "tracked_account_id": tracked_account_id, "sync_job_id": queued["id"], "status": queued["status"]} + + @app.post(f"/v2/{platform}/tracking/refresh") + async def refresh_platform_tracking(account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]: + tracked_rows = legacy.db.fetch_all( + f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC", + (account["id"],), + ) + refreshed = 0 + failed = 0 + items: list[dict[str, Any]] = [] + for row in tracked_rows: + try: + account_row = _require_account(row["tracked_account_id"], account["id"]) + queued = await _create_sync_job_for_account(account_row, assistant_id=row.get("assistant_id", "") or "") + refreshed += 1 + items.append({"tracking_id": row["id"], "tracked_account_id": row["tracked_account_id"], "sync_job_id": queued["id"], "status": queued["status"]}) + except Exception as exc: + failed += 1 + items.append({"tracking_id": row["id"], "tracked_account_id": row["tracked_account_id"], "error": str(exc)}) + return {"refreshed": refreshed, "failed": failed, "items": items} + + @app.post(f"/v2/{platform}/tracking/cursor") + def update_platform_tracking_cursor(request: PlatformTrackingCursorRequest, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]: + cursor = _set_tracking_cursor(account["id"], request.last_seen_at) + return {"last_seen_at": cursor["last_seen_at"], "updated_at": cursor["updated_at"]} + + @app.get(f"/v2/{platform}/tracking/digest") + def get_platform_tracking_digest( + since: str = Query(default=""), + limit: int = Query(default=24, ge=1, le=100), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + return _tracking_digest(account["id"], since_value=(since or "").strip(), limit=limit) diff --git a/collector-service/app/main.py b/collector-service/app/main.py index 88f12db..2dbbec1 100644 --- a/collector-service/app/main.py +++ b/collector-service/app/main.py @@ -1,5 +1,6 @@ from __future__ import annotations +from .domestic_platform_features import register_domestic_platform_routes from .douyin_features import register_douyin_routes try: @@ -14,3 +15,7 @@ except Exception: app = core.app register_douyin_routes(app, core) +register_domestic_platform_routes(app, core, platform="xiaohongshu", label="小红书") +register_domestic_platform_routes(app, core, platform="bilibili", label="哔哩哔哩") +register_domestic_platform_routes(app, core, platform="kuaishou", label="快手") +register_domestic_platform_routes(app, core, platform="wechat_video", label="微信视频号")