Files
storyforge/collector-service/app/domestic_platform_features.py

949 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations

import json
import math
from typing import Any

from fastapi import Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field
class PlatformAnalysisRequest(BaseModel):
    """Request body for the account-analysis endpoint."""

    # Model profile ids supplied by the client.
    model_profile_ids: list[str] = Field(default_factory=list)
    linked_account_ids: list[str] = Field(default_factory=list)
    include_linked_accounts: bool = True
    include_recent_similar_candidates: bool = True
    # Validated to the inclusive range 1..20.
    max_videos: int = Field(6, ge=1, le=20)
    extra_focus: str = ""
    temperature: float = 0.35
    auto_analyze_top_videos: bool = False
    # Validated to the inclusive range 1..10.
    top_video_analysis_count: int = Field(4, ge=1, le=10)
class PlatformTopVideoAnalysisRequest(BaseModel):
    """Request body for the top-video analysis endpoint."""

    # Empty string: no explicit profile requested.
    model_profile_id: str = ""
    # Validated to the inclusive range 1..12.
    top_video_count: int = Field(5, ge=1, le=12)
    min_score: float = 0
    temperature: float = 0.25
class PlatformSimilaritySearchRequest(BaseModel):
    """Request body for creating a similar-account search."""

    source_account_id: str = ""
    candidate_urls: list[str] = Field(default_factory=list)
    seed_linked_accounts: bool = True
    search_public_pages: bool = True
    model_profile_id: str = ""
    # Validated to the inclusive range 1..20.
    max_candidates: int = Field(8, ge=1, le=20)
    extra_requirements: str = ""
class PlatformBenchmarkLinksRequest(BaseModel):
    """Request body for linking benchmark accounts to a source account."""

    # Ids of existing creator-account sources to link.
    target_account_ids: list[str] = Field(default_factory=list)
    # Raw profile URLs to link without a matching content source.
    target_profile_urls: list[str] = Field(default_factory=list)
    relation_type: str = "benchmark"
    note: str = ""
    search_id: str = ""
class PlatformTrackingAccountRequest(BaseModel):
    """Request body for starting (or updating) tracking of an account."""

    # Required: the creator-account source to track.
    tracked_account_id: str
    assistant_id: str = ""
    note: str = ""
class PlatformTrackingCursorRequest(BaseModel):
    """Request body that moves the caller's tracking cursor."""

    # Required timestamp string marking what the caller has already seen.
    last_seen_at: str
def register_domestic_platform_routes(app: Any, legacy: Any, *, platform: str, label: str) -> None:
table_prefix = platform
def now() -> str:
    """Return the current timestamp string produced by ``legacy.utc_now``."""
    timestamp = legacy.utc_now()
    return timestamp
def make_id(prefix: str) -> str:
    """Return a new identifier for *prefix*, delegating to ``legacy.make_id``."""
    new_id = legacy.make_id(prefix)
    return new_id
def _safe_json_dumps(value: Any) -> str:
return json.dumps(value or {}, ensure_ascii=False)
def _parse_json(raw: str, fallback: Any) -> Any:
cleaned = str(raw or "").strip()
if not cleaned:
return fallback
try:
value = json.loads(cleaned)
return value
except json.JSONDecodeError:
return fallback
def ensure_schema() -> None:
    """Create this platform's tables and indexes when they are missing.

    Every statement uses ``IF NOT EXISTS``, so the script is idempotent. It is
    invoked at route-registration time and again from the startup hook. All
    object names start with ``table_prefix`` (the platform key), so each
    registered platform gets its own tables.

    Besides the tables, the script creates indexes for the read paths: reports
    by account, suggestions by report (``_report_payload``), similarity
    searches by source account (``_workspace_payload``), candidates by search,
    relations by source and tracked accounts by user.
    """
    schema = f"""
    CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_reports (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        account_source_id TEXT NOT NULL,
        focus_text TEXT NOT NULL DEFAULT '',
        prompt_text TEXT NOT NULL DEFAULT '',
        context_json TEXT NOT NULL DEFAULT '{{}}',
        created_at TEXT NOT NULL,
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(account_source_id) REFERENCES content_sources(id) ON DELETE CASCADE
    );
    CREATE INDEX IF NOT EXISTS idx_{table_prefix}_analysis_reports_account_created
        ON {table_prefix}_analysis_reports(account_source_id, created_at DESC);
    CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_suggestions (
        id TEXT PRIMARY KEY,
        report_id TEXT NOT NULL,
        model_profile_id TEXT NOT NULL DEFAULT '',
        model_label TEXT NOT NULL DEFAULT '',
        status TEXT NOT NULL DEFAULT 'ok',
        suggestion_text TEXT NOT NULL DEFAULT '',
        parsed_json TEXT NOT NULL DEFAULT '{{}}',
        created_at TEXT NOT NULL,
        FOREIGN KEY(report_id) REFERENCES {table_prefix}_analysis_reports(id) ON DELETE CASCADE
    );
    CREATE INDEX IF NOT EXISTS idx_{table_prefix}_analysis_suggestions_report_created
        ON {table_prefix}_analysis_suggestions(report_id, created_at ASC);
    CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_searches (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        source_account_id TEXT NOT NULL,
        prompt_text TEXT NOT NULL DEFAULT '',
        context_json TEXT NOT NULL DEFAULT '{{}}',
        created_at TEXT NOT NULL,
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE
    );
    CREATE INDEX IF NOT EXISTS idx_{table_prefix}_similarity_searches_source_created
        ON {table_prefix}_similarity_searches(source_account_id, created_at DESC);
    CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_candidates (
        id TEXT PRIMARY KEY,
        search_id TEXT NOT NULL,
        candidate_account_id TEXT,
        candidate_profile_url TEXT NOT NULL DEFAULT '',
        heuristic_score REAL NOT NULL DEFAULT 0,
        agent_score REAL NOT NULL DEFAULT 0,
        rationale_text TEXT NOT NULL DEFAULT '',
        dimensions_json TEXT NOT NULL DEFAULT '{{}}',
        raw_output_json TEXT NOT NULL DEFAULT '{{}}',
        rank_index INTEGER NOT NULL DEFAULT 0,
        created_at TEXT NOT NULL,
        FOREIGN KEY(search_id) REFERENCES {table_prefix}_similarity_searches(id) ON DELETE CASCADE,
        FOREIGN KEY(candidate_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
    );
    CREATE INDEX IF NOT EXISTS idx_{table_prefix}_similarity_candidates_search_rank
        ON {table_prefix}_similarity_candidates(search_id, rank_index ASC);
    CREATE TABLE IF NOT EXISTS {table_prefix}_account_relations (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        source_account_id TEXT NOT NULL,
        target_account_id TEXT,
        target_profile_url TEXT NOT NULL DEFAULT '',
        relation_type TEXT NOT NULL DEFAULT 'benchmark',
        note TEXT NOT NULL DEFAULT '',
        search_id TEXT NOT NULL DEFAULT '',
        created_at TEXT NOT NULL,
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
        FOREIGN KEY(target_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
    );
    CREATE INDEX IF NOT EXISTS idx_{table_prefix}_account_relations_source
        ON {table_prefix}_account_relations(source_account_id, created_at DESC);
    CREATE TABLE IF NOT EXISTS {table_prefix}_tracked_accounts (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        tracked_account_id TEXT NOT NULL,
        assistant_id TEXT,
        note TEXT NOT NULL DEFAULT '',
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        UNIQUE(user_id, tracked_account_id),
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(tracked_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
        FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
    );
    CREATE INDEX IF NOT EXISTS idx_{table_prefix}_tracked_accounts_user_updated
        ON {table_prefix}_tracked_accounts(user_id, updated_at DESC);
    CREATE TABLE IF NOT EXISTS {table_prefix}_tracking_cursors (
        user_id TEXT PRIMARY KEY,
        last_seen_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
    );
    """
    with legacy.db.session() as conn:
        # executescript runs the whole multi-statement DDL script in one call.
        conn.executescript(schema)
# Bootstrap eagerly so the tables exist as soon as the routes are registered.
ensure_schema()


@app.on_event("startup")
def _startup_platform_schema() -> None:
    """Startup hook: re-run the idempotent schema bootstrap."""
    return ensure_schema()
def _content_source_rows(user_id: str, platform_value: str, kind: str = "") -> list[dict[str, Any]]:
    """Return the user's content sources on *platform_value*, newest first.

    Ordering is ``updated_at DESC, created_at DESC``. When *kind* is non-empty,
    only rows whose ``source_kind`` equals it are returned. The filter runs in
    SQL, so rows of other kinds are never loaded into Python.
    """
    query = "SELECT * FROM content_sources WHERE user_id = ? AND platform = ?"
    params: tuple[Any, ...] = (user_id, platform_value)
    if kind:
        query += " AND source_kind = ?"
        params += (kind,)
    query += " ORDER BY updated_at DESC, created_at DESC"
    return legacy.db.fetch_all(query, params)
def _content_source_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Serialize a ``content_sources`` row via ``legacy.content_source_payload``."""
    serializer = legacy.content_source_payload
    return serializer(row)
def _source_metadata(row: dict[str, Any]) -> dict[str, Any]:
    """Return the ``metadata`` entry of a source's payload (``{}`` when the key is absent)."""
    source_payload = _content_source_payload(row)
    return source_payload.get("metadata", {})
def _require_account(account_id: str, user_id: str) -> dict[str, Any]:
    """Load a ``creator_account`` source on this platform owned by *user_id*.

    Raises:
        HTTPException: 404 when no such row exists for this user and platform.
    """
    row = legacy.db.fetch_one(
        "SELECT * FROM content_sources WHERE id = ? AND user_id = ? AND source_kind = 'creator_account' AND platform = ?",
        (account_id, user_id, platform),
    )
    if row:
        return row
    raise HTTPException(status_code=404, detail=f"{label} account not found")
def _linked_video_sources(account_row: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the ``video_link`` sources attributed to *account_row*.

    Candidates are the owner's ``video_link`` sources on this platform in the
    same project, newest first. A video is attributed to the account when its
    metadata names the account by id (``origin_content_source_id``) or by
    profile URL (``source_account_url``).

    URL matching is skipped when the account has no URL. Otherwise every video
    carrying an empty ``source_account_url`` would be attributed to it.
    """
    project_id = account_row.get("project_id", "")
    rows = legacy.db.fetch_all(
        "SELECT * FROM content_sources WHERE user_id = ? AND project_id = ? AND source_kind = 'video_link' AND platform = ? ORDER BY updated_at DESC, created_at DESC",
        (account_row["user_id"], project_id, platform),
    )
    account_id = account_row["id"]
    source_url = str(account_row.get("source_url") or "").strip()
    linked: list[dict[str, Any]] = []
    for row in rows:
        metadata = _source_metadata(row)
        if metadata.get("origin_content_source_id") == account_id:
            linked.append(row)
        elif source_url and metadata.get("source_account_url") == source_url:
            linked.append(row)
    return linked
def _jobs_for_source(source_id: str) -> list[dict[str, Any]]:
    """Return every job row recorded for *source_id*, newest first."""
    sql = "SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC"
    return legacy.db.fetch_all(sql, (source_id,))
def _latest_job_for_source(source_id: str) -> dict[str, Any] | None:
    """Return the most recently created job for *source_id*, or ``None``."""
    sql = "SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC LIMIT 1"
    return legacy.db.fetch_one(sql, (source_id,))
def _extract_performance_score(job_row: dict[str, Any] | None) -> float:
    """Return the first usable ``performance_score`` recorded on a job.

    The lookup order is:

    1. ``result.performance_score``
    2. ``result.analysis.performance_score``
    3. ``result.scores.performance_score``
    4. ``artifacts.performance_score``
    5. ``artifacts.scores.performance_score``

    A value is usable when it converts to a finite float. NaN or infinity
    would poison score sorting and thresholds, so those values are skipped.
    Containers that are not JSON objects are treated as empty instead of
    raising. Returns ``0.0`` when nothing usable is found or *job_row* is falsy.
    """
    if not job_row:
        return 0.0

    def _as_dict(value: Any) -> dict[str, Any]:
        # Stored JSON is free-form; only objects can hold the nested keys.
        return value if isinstance(value, dict) else {}

    result_map = _as_dict(_parse_json(job_row.get("result_json") or "{}", {}))
    artifacts_map = _as_dict(_parse_json(job_row.get("artifacts_json") or "{}", {}))
    candidates = (
        result_map.get("performance_score"),
        _as_dict(result_map.get("analysis")).get("performance_score"),
        _as_dict(result_map.get("scores")).get("performance_score"),
        artifacts_map.get("performance_score"),
        _as_dict(artifacts_map.get("scores")).get("performance_score"),
    )
    for value in candidates:
        try:
            score = float(value)
        except (TypeError, ValueError):
            continue
        if math.isfinite(score):
            return score
    return 0.0
def _extract_metrics(job_row: dict[str, Any] | None) -> dict[str, Any]:
    """Return the metrics mapping recorded on a job, or ``{}``.

    Checks ``result.metrics``, ``artifacts.metrics``, ``result.stats`` and
    ``artifacts.stats`` in that order and returns the first non-empty dict.

    Non-dict values are skipped, whether the whole column holds a JSON list or
    a scalar or list sits under one of those keys. Callers such as
    ``_video_payload`` can therefore always call ``.get`` on the result.
    """
    if not job_row:
        return {}
    result_map = _parse_json(job_row.get("result_json") or "{}", {})
    artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {})
    if not isinstance(result_map, dict):
        result_map = {}
    if not isinstance(artifacts_map, dict):
        artifacts_map = {}
    for candidate in (
        result_map.get("metrics"),
        artifacts_map.get("metrics"),
        result_map.get("stats"),
        artifacts_map.get("stats"),
    ):
        if isinstance(candidate, dict) and candidate:
            return candidate
    return {}
def _video_payload(source_row: dict[str, Any]) -> dict[str, Any]:
    """Build the API representation of one ``video_link`` source.

    Combines the source's payload and metadata with data from its latest job
    (metrics, performance score, id, and ``style_summary`` as a last-resort
    description). Missing values fall back to empty strings or zeros.
    """
    source = _content_source_payload(source_row)
    meta = source.get("metadata", {})
    job = _latest_job_for_source(source_row["id"])
    job_fields = job or {}
    metrics = _extract_metrics(job)
    raw_tags = meta.get("tags") or []
    tags = raw_tags if isinstance(raw_tags, list) else []

    def _metric(long_key: str, short_key: str) -> Any:
        # Prefer the "<name>_count" key, then the bare name, then 0.
        return metrics.get(long_key) or metrics.get(short_key) or 0

    return {
        "id": source_row["id"],
        "aweme_id": str(meta.get("external_id") or source_row["id"]),
        "title": source.get("title") or "未命名作品",
        "description": meta.get("summary") or meta.get("description") or job_fields.get("style_summary", ""),
        "share_url": source.get("source_url", ""),
        "cover_url": meta.get("cover_url") or "",
        "duration_sec": float(meta.get("duration_sec") or 0),
        "published_at": meta.get("published_at") or source_row.get("created_at"),
        "tags": tags,
        "content_type": meta.get("content_type") or "video",
        "stats": {
            "play": _metric("play_count", "play"),
            "like": _metric("like_count", "like"),
            "comment": _metric("comment_count", "comment"),
            "share": _metric("share_count", "share"),
        },
        "score": {"performance_score": _extract_performance_score(job)},
        "source": source,
        "latest_job_id": job_fields.get("id", ""),
    }
def _account_payload(account_row: dict[str, Any]) -> dict[str, Any]:
    """Build the API representation of a creator account.

    ``video_summary`` counts and averages over every linked video (see
    ``_linked_video_sources``) but embeds only the first 8 video payloads.
    The play/like averages ignore videos whose stat is missing or zero.

    ``tags`` and ``keywords`` are normalized to lists. ``sync_status`` is
    ``"partial"`` only when metadata carries a non-empty ``last_sync_error``;
    a missing, empty or null error counts as a clean sync.
    """
    payload = _content_source_payload(account_row)
    metadata = payload.get("metadata", {})
    videos = [_video_payload(item) for item in _linked_video_sources(account_row)]

    def _positive_values(stat: str) -> list[float]:
        # Convert each video's stat once; keep only positive readings.
        values = (float(video["stats"].get(stat) or 0) for video in videos)
        return [value for value in values if value > 0]

    def _as_list(value: Any) -> list[Any]:
        # Metadata is free-form; anything that is not a list becomes [].
        return value if isinstance(value, list) else []

    play_values = _positive_values("play")
    like_values = _positive_values("like")
    return {
        "id": account_row["id"],
        "platform": platform,
        "profile_url": payload.get("source_url", ""),
        "canonical_profile_url": payload.get("source_url", ""),
        "handle": payload.get("handle", ""),
        "nickname": payload.get("title") or payload.get("handle") or "未命名账号",
        "signature": metadata.get("bio") or metadata.get("description") or "",
        "avatar_url": metadata.get("avatar_url") or "",
        "tags": _as_list(metadata.get("tags")),
        "keywords": _as_list(metadata.get("keywords")),
        "sync_status": "partial" if metadata.get("last_sync_error") else "ready",
        "video_summary": {
            "count": len(videos),
            "avg_play": sum(play_values) / len(play_values) if play_values else 0,
            "avg_like": sum(like_values) / len(like_values) if like_values else 0,
            "videos": videos[:8],
        },
        "project_id": payload.get("project_id", ""),
        "created_at": payload.get("created_at", ""),
        "updated_at": payload.get("updated_at", ""),
    }
def _relation_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Serialize an ``*_account_relations`` row.

    The target nickname comes from the target's content-source payload alone:
    title, then handle, then a placeholder. This is the same fallback chain
    the account payload uses. Building a full account payload here would also
    load every linked video and its latest job just to read that one field.
    """
    target_nickname = ""
    target_account_id = row.get("target_account_id")
    if target_account_id:
        target = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (target_account_id,))
        if target:
            target_payload = _content_source_payload(target)
            target_nickname = target_payload.get("title") or target_payload.get("handle") or "未命名账号"
    return {
        "id": row["id"],
        "source_account_id": row["source_account_id"],
        "target_account_id": target_account_id or "",
        "target_profile_url": row.get("target_profile_url", ""),
        "target_nickname": target_nickname,
        "relation_type": row.get("relation_type", "benchmark"),
        "note": row.get("note", ""),
        "search_id": row.get("search_id", ""),
        "created_at": row["created_at"],
    }
def _model_profile_payload(row: dict[str, Any]) -> dict[str, Any]:
return {
"id": row["id"],
"name": row["name"],
"model_name": row["model_name"],
"base_url": row["base_url"],
"is_default": bool(row.get("is_default", 0)),
}
def _report_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Serialize an analysis report together with its suggestions (oldest first).

    ``model_profile_ids`` and ``linked_account_ids`` are read from optional
    ``model_profile_ids_json`` / ``linked_account_ids_json`` columns. The
    reports table created by ``ensure_schema`` has neither column, so
    ``model_profile_ids`` would otherwise always be ``[]``.

    When no stored ids exist, ``model_profile_ids`` falls back to the
    distinct, non-empty profile ids of the report's own suggestions, in
    first-seen order.
    """
    suggestion_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
        (row["id"],),
    )
    suggestions = [
        {
            "id": suggestion["id"],
            "status": suggestion.get("status", "ok"),
            "model_profile_id": suggestion.get("model_profile_id", ""),
            "model_label": suggestion.get("model_label", ""),
            "suggestion_text": suggestion.get("suggestion_text", ""),
            "parsed_json": _parse_json(suggestion.get("parsed_json") or "{}", {}),
            "created_at": suggestion.get("created_at", ""),
        }
        for suggestion in suggestion_rows
    ]
    model_profile_ids = _parse_json(row.get("model_profile_ids_json") or "[]", [])
    if not model_profile_ids:
        model_profile_ids = list(
            dict.fromkeys(item["model_profile_id"] for item in suggestions if item["model_profile_id"])
        )
    return {
        "id": row["id"],
        "focus_text": row.get("focus_text", ""),
        "model_profile_ids": model_profile_ids,
        "linked_account_ids": _parse_json(row.get("linked_account_ids_json") or "[]", []),
        "suggestions": suggestions,
        "created_at": row["created_at"],
    }
def _workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
    """Assemble the workspace view for one creator account.

    The view contains:

    - the account payload;
    - its 6 newest analysis reports;
    - every benchmark relation, newest first;
    - the 5 newest similarity searches;
    - the model profiles the owner can use: rows with a NULL
      ``owner_account_id`` plus the owner's own, default first.

    Both snapshot slots are always ``None``.
    """
    source_id = account_row["id"]
    report_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_analysis_reports WHERE account_source_id = ? ORDER BY created_at DESC LIMIT 6",
        (source_id,),
    )
    relation_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC",
        (source_id,),
    )
    search_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_similarity_searches WHERE source_account_id = ? ORDER BY created_at DESC LIMIT 5",
        (source_id,),
    )
    profiles_sql = (
        "\n"
        "SELECT *\n"
        "FROM model_profiles\n"
        "WHERE owner_account_id IS NULL OR owner_account_id = ?\n"
        "ORDER BY is_default DESC, created_at ASC\n"
    )
    workspace: dict[str, Any] = {}
    workspace["account"] = _account_payload(account_row)
    workspace["latest_public_snapshot"] = None
    workspace["latest_creator_snapshot"] = None
    workspace["recent_reports"] = list(map(_report_payload, report_rows))
    workspace["linked_accounts"] = list(map(_relation_payload, relation_rows))
    workspace["recent_similarity_searches"] = [
        {
            "id": search["id"],
            "prompt_text": search.get("prompt_text", ""),
            "context": _parse_json(search.get("context_json") or "{}", {}),
            "created_at": search["created_at"],
        }
        for search in search_rows
    ]
    profile_rows = legacy.db.fetch_all(profiles_sql, (account_row["user_id"],))
    workspace["available_model_profiles"] = list(map(_model_profile_payload, profile_rows))
    return workspace
async def _call_reasoning_model(user_id: str, prompt: str, *, system_prompt: str, model_profile_id: str = "", temperature: float = 0.3) -> tuple[str, dict[str, Any]]:
    """Run *prompt* against a model profile and return ``(raw_text, parsed)``.

    An empty *model_profile_id* is passed to ``legacy.model_profile_for_account``
    as ``None`` (presumably selecting the user's default profile; confirm in
    legacy). ``parsed`` is ``{}`` unless ``legacy.parse_json_object`` yields a dict.
    """
    requested_profile = model_profile_id if model_profile_id else None
    profile = legacy.model_profile_for_account(user_id, requested_profile)
    raw_output = await legacy.call_model(profile, system_prompt=system_prompt, user_prompt=prompt, temperature=temperature)
    candidate = legacy.parse_json_object(raw_output)
    if not isinstance(candidate, dict):
        candidate = {}
    return raw_output, candidate
async def _create_sync_job_for_account(account_row: dict[str, Any], assistant_id: str = "") -> dict[str, Any]:
    """Create and trigger a content-sync job for an account source.

    Returns ``legacy.job_payload`` of the queued job.

    Raises:
        HTTPException: 400 when the account source has no project.
    """
    owner_id = account_row["user_id"]
    project_id = account_row.get("project_id") or ""
    if not project_id:
        raise HTTPException(status_code=400, detail="Account source is not attached to a project")
    knowledge_base = legacy.resolve_target_kb(owner_id, None, project_id)
    source = _content_source_payload(account_row)
    analysis_profile = legacy.model_profile_for_account(owner_id, None)
    display_name = source.get("title") or source.get("handle") or label
    source_url = source.get("source_url", "")
    job_row = legacy.create_job_record(
        account_id=owner_id,
        project_id=project_id,
        knowledge_base_id=knowledge_base["id"],
        content_source_id=account_row["id"],
        assistant_id=assistant_id or None,
        source_type="creator_account",
        line_type="analysis",
        workflow_key="content_source_sync_pipeline",
        title=f"{display_name} 内容同步",
        language="auto",
        source_url=source_url,
        artifacts={
            "source_account_url": source_url,
            "platform": platform,
            "handle": source.get("handle", ""),
            # Defaults to 5 when metadata has no max_items.
            "max_items": int(source.get("metadata", {}).get("max_items") or 5),
            "skip_existing": True,
            "auto_trigger_analysis": True,
        },
        analysis_model_profile_id=analysis_profile["id"],
    )
    queued_job = await legacy.trigger_orchestrated_job(job_row)
    return legacy.job_payload(queued_job)
def _tracking_cursor(user_id: str) -> dict[str, Any] | None:
    """Return the user's tracking-cursor row for this platform, or ``None``."""
    query = f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?"
    return legacy.db.fetch_one(query, (user_id,))
def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]:
    """Create or move the user's tracking cursor and return the stored row.

    Uses a single UPSERT on the ``user_id`` primary key. The previous
    read-then-insert pair let two concurrent first-time requests both attempt
    the INSERT and fail with an integrity error.
    """
    legacy.db.execute(
        f"INSERT INTO {table_prefix}_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?) "
        "ON CONFLICT(user_id) DO UPDATE SET "
        "last_seen_at = excluded.last_seen_at, updated_at = excluded.updated_at",
        (user_id, last_seen_at, now()),
    )
    return _tracking_cursor(user_id)
def _tracked_account_payload(tracked_row: dict[str, Any]) -> dict[str, Any]:
    """Serialize a tracked-account row, embedding the assistant name and account.

    Raises:
        HTTPException: 404 (from ``_require_account``) when the tracked source
        is no longer a creator account on this platform owned by the user.
    """
    assistant_id = tracked_row.get("assistant_id")
    assistant_name = ""
    if assistant_id:
        assistant_row = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (tracked_row["assistant_id"],))
        assistant_name = legacy.assistant_payload(assistant_row).get("name", "") if assistant_row else ""
    account_row = _require_account(tracked_row["tracked_account_id"], tracked_row["user_id"])
    return {
        "id": tracked_row["id"],
        "platform": platform,
        "tracked_account_id": tracked_row["tracked_account_id"],
        "assistant_id": assistant_id or "",
        "assistant_name": assistant_name,
        "note": tracked_row.get("note", ""),
        "created_at": tracked_row.get("created_at", ""),
        "updated_at": tracked_row["updated_at"],
        "account": _account_payload(account_row),
    }
def _list_tracked_accounts(user_id: str) -> list[dict[str, Any]]:
    """Return the user's tracked accounts as payloads, most recently updated first."""
    query = f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC"
    tracked_rows = legacy.db.fetch_all(query, (user_id,))
    return list(map(_tracked_account_payload, tracked_rows))
def _tracking_digest_item(tracked_item: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
    """Build one digest entry for a new video of a tracked account.

    The summary prefers the latest job's ``style_summary``, then the video
    description, then a placeholder. Borrowing points are 36-character
    prefixes of the summary and the title, with empty ones dropped. An entry
    is high-value when its performance score is at least 60.
    """
    job = _latest_job_for_source(video["id"])
    summary = (job or {}).get("style_summary") or video.get("description") or "已发现更新内容"
    title = video.get("title", "")
    borrowing_points = [snippet for snippet in (summary[:36], title[:36]) if snippet]
    score = float(video.get("score", {}).get("performance_score") or 0)
    account_info = tracked_item["account"]
    return {
        "tracking_id": tracked_item["id"],
        "platform": platform,
        "tracked_account_id": tracked_item["tracked_account_id"],
        "tracked_account_name": account_info["nickname"],
        "assistant_id": tracked_item.get("assistant_id", "") or "",
        "assistant_name": tracked_item.get("assistant_name", ""),
        "note": tracked_item.get("note", ""),
        "account": account_info,
        "video": video,
        "summary": summary,
        "summary_text": summary,
        "borrowing_points": borrowing_points[:3],
        "is_high_value": score >= 60,
        "created_at": video.get("published_at") or now(),
    }
def _tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]:
    """Collect digest entries for videos published after a threshold.

    The threshold is *since_value* or, failing that, the stored cursor's
    ``last_seen_at``. Videos without a ``published_at`` are always included.
    Timestamps are compared as strings. Entries are sorted newest first and
    cut to *limit*.
    """
    tracked_items = _list_tracked_accounts(user_id)
    cursor_row = _tracking_cursor(user_id) or {}
    threshold = (since_value or cursor_row.get("last_seen_at") or "").strip()

    def _is_new(video: dict[str, Any]) -> bool:
        published_at = str(video.get("published_at") or "")
        return not (threshold and published_at and published_at <= threshold)

    items = [
        _tracking_digest_item(tracked, video)
        for tracked in tracked_items
        for video in tracked["account"]["video_summary"]["videos"]
        if _is_new(video)
    ]
    items.sort(key=lambda entry: entry.get("created_at", ""), reverse=True)
    return {
        "generated_at": now(),
        "since": threshold,
        "items": items[:limit],
        "tracked_accounts": tracked_items,
        "cursor_last_seen_at": cursor_row.get("last_seen_at", ""),
    }
@app.get(f"/v2/{platform}/accounts")
def list_platform_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
    """List the caller's creator accounts on this platform, most recently updated first."""
    creator_rows = _content_source_rows(account["id"], platform, "creator_account")
    return list(map(_account_payload, creator_rows))
@app.get(f"/v2/{platform}/accounts/{{account_id}}/workspace")
def get_platform_account_workspace(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    """Return the workspace bundle for one of the caller's accounts (404 otherwise)."""
    return _workspace_payload(_require_account(account_id, account["id"]))
@app.get(f"/v2/{platform}/accounts/{{account_id}}/analysis-reports")
def list_platform_analysis_reports(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
    """Return the account's six most recent analysis reports, newest first.

    Queries the reports directly. The previous version assembled the whole
    workspace (account, every linked video with its latest job, relations,
    searches and model profiles) only to discard everything except
    ``recent_reports``.
    """
    account_row = _require_account(account_id, account["id"])
    report_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_analysis_reports WHERE account_source_id = ? ORDER BY created_at DESC LIMIT 6",
        (account_row["id"],),
    )
    return [_report_payload(row) for row in report_rows]
@app.get(f"/v2/{platform}/accounts/{{account_id}}/snapshots")
def list_platform_snapshots(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
    """No snapshots are kept for this platform: check ownership, then return ``[]``."""
    _require_account(account_id, account["id"])
    snapshots: list[dict[str, Any]] = []
    return snapshots
@app.get(f"/v2/{platform}/accounts/{{account_id}}/creator-fields")
def get_platform_creator_fields(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    """Creator-center fields are not available: after the ownership check this always returns 404."""
    _require_account(account_id, account["id"])
    not_found = HTTPException(status_code=404, detail="No creator-center snapshot found")
    raise not_found
@app.get(f"/v2/{platform}/accounts/{{account_id}}/videos")
def list_platform_account_videos(
    account_id: str,
    limit: int = Query(default=80, ge=1, le=200),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """List an account's linked videos, best performance score first.

    Also returns up to 12 ids scoring at or above the high-score threshold
    (60) and the 12 most recently published ids.
    """
    account_row = _require_account(account_id, account["id"])
    high_score_threshold = 60
    videos = [_video_payload(row) for row in _linked_video_sources(account_row)]

    def _published(item: dict[str, Any]) -> Any:
        return item.get("published_at") or ""

    videos.sort(key=lambda item: (item["score"]["performance_score"], _published(item)), reverse=True)
    top_ids = [
        video["id"] for video in videos
        if float(video["score"]["performance_score"] or 0) >= high_score_threshold
    ][:12]
    newest = sorted(videos, key=_published, reverse=True)[:12]
    latest_ids = [video["id"] for video in newest]
    return {
        "items": videos[:limit],
        "count": len(videos),
        "meta": {"platform": platform, "account_id": account_id},
        "top_scored_video_ids": top_ids,
        "latest_video_ids": latest_ids,
        "high_score_threshold": high_score_threshold,
    }
@app.post(f"/v2/{platform}/accounts/{{account_id}}/analysis")
async def analyze_platform_account(
    account_id: str,
    request: PlatformAnalysisRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Analyze one creator account with an LLM and persist the result as a report.

    The prompt context holds:

    - the account payload;
    - up to ``max_videos`` of its videos, capped at 8 (the size of
      ``video_summary.videos``);
    - up to 5 linked benchmark accounts, only when ``include_linked_accounts``
      is true;
    - the caller's ``extra_focus``.

    Each distinct requested model profile produces one suggestion row under a
    single report. When no profile is requested, one call is made without a
    profile id, as before. All model calls finish before anything is written,
    so a failing call leaves no half-populated report.

    ``include_recent_similar_candidates`` and ``auto_analyze_top_videos`` are
    not consulted here, so ``top_video_analyses`` is always empty.
    """
    account_row = _require_account(account_id, account["id"])
    workspace = _workspace_payload(account_row)
    account_summary = workspace["account"]
    video_limit = max(1, min(request.max_videos, 8))
    context = {
        "account": account_summary,
        "top_videos": account_summary["video_summary"]["videos"][:video_limit],
        "linked_accounts": workspace["linked_accounts"][:5] if request.include_linked_accounts else [],
        "extra_focus": request.extra_focus,
    }
    prompt = (
        f"请从新媒体商业化运营视角,分析这个{label}账号,输出执行摘要、可借鉴点、风险提醒和下一步动作。"
        f"\n\n输入:\n{json.dumps(context, ensure_ascii=False, indent=2)}"
    )
    system_prompt = "你是新媒体账号分析顾问。尽量输出 JSON字段包括 executive_summary、borrow_points、risks、next_actions。"
    # Distinct, non-blank ids in request order; [""] means "no explicit profile".
    requested_ids = [profile_id.strip() for profile_id in request.model_profile_ids if profile_id.strip()]
    profile_ids = list(dict.fromkeys(requested_ids)) or [""]

    generations: list[tuple[dict[str, Any], str, dict[str, Any]]] = []
    for profile_id in profile_ids:
        output, parsed = await _call_reasoning_model(
            account["id"],
            prompt,
            system_prompt=system_prompt,
            model_profile_id=profile_id,
            temperature=request.temperature,
        )
        profile = legacy.model_profile_for_account(account["id"], profile_id or None)
        generations.append((profile, output, parsed))

    report_id = make_id(f"{platform}_report")
    created_at = now()
    legacy.db.execute(
        f"INSERT INTO {table_prefix}_analysis_reports (id, user_id, account_source_id, focus_text, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
        (
            report_id,
            account["id"],
            account_row["id"],
            request.extra_focus or "",
            prompt,
            _safe_json_dumps(context),
            created_at,
        ),
    )
    for profile, output, parsed in generations:
        legacy.db.execute(
            f"INSERT INTO {table_prefix}_analysis_suggestions (id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (
                make_id(f"{platform}_suggestion"),
                report_id,
                profile["id"],
                f"{profile.get('name', '')} · {profile.get('model_name', '')}".strip(" ·"),
                "ok",
                output[:4000],
                _safe_json_dumps(parsed),
                now(),
            ),
        )
    report_row = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_analysis_reports WHERE id = ?",
        (report_id,),
    )
    report_payload = _report_payload(report_row)
    return {
        "report_id": report_id,
        "account_id": account_row["id"],
        # Same timestamp that was stored, not a fresh now().
        "created_at": created_at,
        "suggestions": report_payload["suggestions"],
        "context": context,
        "top_video_analyses": [],
    }
@app.post(f"/v2/{platform}/accounts/{{account_id}}/videos/analyze-top")
async def analyze_platform_top_videos(
    account_id: str,
    request: PlatformTopVideoAnalysisRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Run a per-video LLM breakdown for the account's best-scoring videos.

    Videos are ranked by performance score, filtered by ``min_score`` and
    limited to ``top_video_count``. Each one is analyzed sequentially. The
    results are returned only and are not persisted here.
    """
    account_row = _require_account(account_id, account["id"])
    videos = [_video_payload(row) for row in _linked_video_sources(account_row)]
    floor = float(request.min_score or 0)
    by_score = sorted(videos, key=lambda item: item["score"]["performance_score"], reverse=True)
    eligible = [video for video in by_score if float(video["score"]["performance_score"] or 0) >= floor]
    selected = eligible[: request.top_video_count]
    analyses: list[dict[str, Any]] = []
    for video in selected:
        prompt = (
            f"请拆解这条{label}作品为什么值得关注,输出 summary、borrow_points、risks。"
            f"\n\n输入:\n{json.dumps(video, ensure_ascii=False, indent=2)}"
        )
        output, parsed = await _call_reasoning_model(
            account["id"],
            prompt,
            system_prompt="你是短视频内容拆解助手。尽量输出 JSON字段包括 summary、borrow_points、risks。",
            model_profile_id=request.model_profile_id,
            temperature=request.temperature,
        )
        headline = parsed.get("summary") or parsed.get("headline_summary") or output
        analyses.append(
            {
                "id": make_id(f"{platform}_va"),
                "video_id": video["id"],
                "video_title": video["title"],
                "status": "ok",
                "summary_text": str(headline)[:240],
                "parsed_json": parsed,
                "performance_score": video["score"]["performance_score"],
                "created_at": now(),
            }
        )
    return {
        "account_id": account_row["id"],
        "analyzed_count": len(analyses),
        "items": analyses,
    }
@app.post(f"/v2/{platform}/similar-searches")
async def create_platform_similarity_search(
    request: PlatformSimilaritySearchRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Rank the caller's other accounts as benchmark candidates and store the search.

    Only the first ``max(5, max_candidates)`` other creator accounts, most
    recently updated first, are scored. The score is
    ``10 * tag_overlap + max(0, 50 - position)``. The top ``max_candidates``
    are persisted with their rank.

    ``candidate_urls``, ``seed_linked_accounts``, ``search_public_pages`` and
    ``model_profile_id`` are not read here.
    """
    account_row = _require_account(request.source_account_id, account["id"])
    source_payload = _account_payload(account_row)
    pool_size = max(5, request.max_candidates)
    others = [
        row for row in _content_source_rows(account["id"], platform, "creator_account")
        if row["id"] != account_row["id"]
    ]
    source_tags = set(source_payload.get("tags") or [])
    scored: list[dict[str, Any]] = []
    for position, row in enumerate(others[:pool_size], start=1):
        candidate = _account_payload(row)
        overlap = len(source_tags.intersection(set(candidate.get("tags") or [])))
        score = float(overlap * 10 + max(0, 50 - position))
        scored.append(
            {
                "candidate_account_id": row["id"],
                "candidate_profile_url": candidate.get("profile_url", ""),
                "candidate_nickname": candidate.get("nickname", ""),
                "heuristic_score": score,
                "agent_score": score,
                "rationale_text": f"与源账号同平台,标签重合 {overlap},适合作为{label}对标候选。",
                "dimensions_json": {"tag_overlap": overlap},
            }
        )
    scored.sort(key=lambda item: item["agent_score"], reverse=True)
    shortlisted = scored[: request.max_candidates]

    search_id = make_id(f"{platform}_search")
    legacy.db.execute(
        f"INSERT INTO {table_prefix}_similarity_searches (id, user_id, source_account_id, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?)",
        (
            search_id,
            account["id"],
            account_row["id"],
            request.extra_requirements or "",
            _safe_json_dumps({"source_account": source_payload}),
            now(),
        ),
    )
    candidate_sql = (
        f"INSERT INTO {table_prefix}_similarity_candidates\n"
        "(id, search_id, candidate_account_id, candidate_profile_url, heuristic_score, agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at)\n"
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    for rank, item in enumerate(shortlisted):
        legacy.db.execute(
            candidate_sql,
            (
                make_id(f"{platform}_candidate"),
                search_id,
                item.get("candidate_account_id") or None,
                item.get("candidate_profile_url", ""),
                item.get("heuristic_score", 0),
                item.get("agent_score", 0),
                item.get("rationale_text", ""),
                _safe_json_dumps(item.get("dimensions_json") or {}),
                _safe_json_dumps(item),
                rank,
                now(),
            ),
        )
    return {"id": search_id, "search_id": search_id}
@app.get(f"/v2/{platform}/similar-searches/{{search_id}}")
def get_platform_similarity_search(search_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    """Return a stored similarity search with its candidates in rank order.

    Each candidate starts from its stored ``raw_output_json`` snapshot. A
    snapshot that is not a JSON object is treated as ``{}`` instead of
    crashing. ``candidate_account_id`` is always taken from the column, which
    ``ON DELETE SET NULL`` clears when the candidate source is deleted (if
    SQLite foreign keys are enforced). The JSON snapshot keeps the old id, so
    it cannot be trusted for this field. The other column values only fill
    keys the snapshot lacks.

    Raises:
        HTTPException: 404 when the search does not exist for this user.
    """
    search_row = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_similarity_searches WHERE id = ? AND user_id = ?",
        (search_id, account["id"]),
    )
    if not search_row:
        raise HTTPException(status_code=404, detail="Similarity search not found")
    candidate_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_similarity_candidates WHERE search_id = ? ORDER BY rank_index ASC",
        (search_id,),
    )
    candidates: list[dict[str, Any]] = []
    for row in candidate_rows:
        payload = _parse_json(row.get("raw_output_json") or "{}", {})
        if not isinstance(payload, dict):
            payload = {}
        payload["candidate_account_id"] = row.get("candidate_account_id") or ""
        payload.setdefault("candidate_profile_url", row.get("candidate_profile_url", ""))
        payload.setdefault("rationale_text", row.get("rationale_text", ""))
        payload.setdefault("agent_score", row.get("agent_score", 0))
        payload.setdefault("heuristic_score", row.get("heuristic_score", 0))
        candidates.append(payload)
    return {
        "id": search_row["id"],
        "search_id": search_row["id"],
        "source_account_id": search_row["source_account_id"],
        "candidates": candidates,
        "created_at": search_row["created_at"],
    }
@app.get(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
def list_platform_benchmark_links(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
    """List the benchmark relations of one of the caller's accounts, newest first."""
    _require_account(account_id, account["id"])
    query = f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC"
    relation_rows = legacy.db.fetch_all(query, (account_id,))
    return list(map(_relation_payload, relation_rows))
@app.post(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
def create_platform_benchmark_links(
    account_id: str,
    request: PlatformBenchmarkLinksRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Link a source account to benchmark accounts and/or raw profile URLs.

    Every ``target_account_ids`` entry is resolved before anything is written.
    An unknown id therefore returns 404 without leaving a partial batch of
    relations behind. Blank URLs are skipped.

    A target without a ``source_url`` is stored with an empty profile URL.
    Passing ``None`` would violate the column's NOT NULL constraint.

    Returns ``{"links": [...]}``: account links first, then URL links, each in
    request order.
    """
    source_account = _require_account(account_id, account["id"])
    targets = [_require_account(target_id, account["id"]) for target_id in request.target_account_ids]
    relation_type = request.relation_type or "benchmark"
    note = request.note or ""
    search_id = request.search_id or ""

    def _insert_relation(target_account_id: str | None, target_profile_url: str) -> dict[str, Any]:
        # Insert one relation row and return its serialized payload.
        relation_id = make_id(f"{platform}_link")
        legacy.db.execute(
            f"INSERT INTO {table_prefix}_account_relations (id, user_id, source_account_id, target_account_id, target_profile_url, relation_type, note, search_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                relation_id,
                account["id"],
                source_account["id"],
                target_account_id,
                target_profile_url,
                relation_type,
                note,
                search_id,
                now(),
            ),
        )
        return _relation_payload(
            legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_account_relations WHERE id = ?", (relation_id,))
        )

    created = [_insert_relation(target["id"], target.get("source_url") or "") for target in targets]
    for target_profile_url in request.target_profile_urls:
        cleaned = str(target_profile_url or "").strip()
        if cleaned:
            created.append(_insert_relation(None, cleaned))
    return {"links": created}
@app.get(f"/v2/{platform}/tracking/accounts")
def list_platform_tracking_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    """Return the caller's tracked accounts together with their digest cursor.

    ``cursor_last_seen_at`` is the stored cursor's ``last_seen_at``. It is
    ``""`` when no cursor row exists or the row lacks that field.
    """
    user_id = account["id"]
    cursor_row = _tracking_cursor(user_id)
    tracked_items = _list_tracked_accounts(user_id)
    last_seen = cursor_row.get("last_seen_at", "") if cursor_row else ""
    return {"items": tracked_items, "cursor_last_seen_at": last_seen}
@app.post(f"/v2/{platform}/tracking/accounts")
def create_platform_tracking_account(
    request: PlatformTrackingAccountRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Start tracking one of the caller's accounts, or update an existing tracking entry.

    The entry is keyed on ``(user_id, tracked_account_id)``:

    * If a row already exists, its ``assistant_id``, ``note`` and
      ``updated_at`` are overwritten.
    * Otherwise a new row is inserted.

    The assistant comes from ``legacy.resolve_target_assistant``, which is
    given the requested id (``None`` when blank) and the tracked account's
    ``project_id``. A missing result is stored as NULL.

    Returns:
        ``_tracked_account_payload`` of the row as re-read after the write.
    """
    user_id = account["id"]
    tracked = _require_account(request.tracked_account_id, user_id)
    assistant = legacy.resolve_target_assistant(user_id, request.assistant_id or None, tracked.get("project_id", ""))
    assistant_id = (assistant or {}).get("id") or None
    note = request.note or ""
    # One timestamp per request: a fresh row must get identical
    # created_at / updated_at values, not two separate now() readings.
    timestamp = now()

    existing = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
        (user_id, tracked["id"]),
    )
    if existing:
        tracking_id = existing["id"]
        legacy.db.execute(
            f"UPDATE {table_prefix}_tracked_accounts SET assistant_id = ?, note = ?, updated_at = ? WHERE id = ?",
            (assistant_id, note, timestamp, tracking_id),
        )
    else:
        tracking_id = make_id(f"{platform}_track")
        legacy.db.execute(
            f"INSERT INTO {table_prefix}_tracked_accounts (id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
            (tracking_id, user_id, tracked["id"], assistant_id, note, timestamp, timestamp),
        )
    row = legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_tracked_accounts WHERE id = ?", (tracking_id,))
    return _tracked_account_payload(row)
@app.post(f"/v2/{platform}/tracking/accounts/{{tracked_account_id}}/refresh")
async def refresh_platform_tracked_account(tracked_account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    """Queue a sync job for one tracked account and bump its tracking ``updated_at``.

    Raises:
        HTTPException(404): the caller has no tracking row for this account.

    Returns:
        The tracking id, the tracked account id, and the queued job's id and
        status.
    """
    user_id = account["id"]
    tracking = legacy.db.fetch_one(
        f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
        (user_id, tracked_account_id),
    )
    if not tracking:
        raise HTTPException(status_code=404, detail="Tracked account not found")

    target_account = _require_account(tracked_account_id, user_id)
    linked_assistant = tracking.get("assistant_id", "") or ""
    job = await _create_sync_job_for_account(target_account, assistant_id=linked_assistant)

    tracking_id = tracking["id"]
    legacy.db.execute(
        f"UPDATE {table_prefix}_tracked_accounts SET updated_at = ? WHERE id = ?",
        (now(), tracking_id),
    )
    response = {"tracking_id": tracking_id, "tracked_account_id": tracked_account_id}
    response["sync_job_id"] = job["id"]
    response["status"] = job["status"]
    return response
@app.post(f"/v2/{platform}/tracking/refresh")
async def refresh_platform_tracking(account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    """Queue a sync job for every account the caller tracks (best-effort batch).

    Rows are processed sequentially in ``updated_at DESC`` order. A failure on
    one row does not abort the batch: that row is counted in ``failed`` and
    reported in ``items`` with an ``error`` message instead of a job id.

    Error reporting:

    * ``HTTPException`` reports its ``detail``.
    * Any other exception is logged with traceback and reported via ``str(exc)``.

    Returns:
        ``{"refreshed": int, "failed": int, "items": [...]}``.
    """
    import logging  # local import: the module's import header is outside this block

    user_id = account["id"]
    tracked_rows = legacy.db.fetch_all(
        f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
        (user_id,),
    )
    refreshed = 0
    failed = 0
    items: list[dict[str, Any]] = []
    for row in tracked_rows:
        base_item = {"tracking_id": row["id"], "tracked_account_id": row["tracked_account_id"]}
        try:
            account_row = _require_account(row["tracked_account_id"], user_id)
            queued = await _create_sync_job_for_account(account_row, assistant_id=row.get("assistant_id", "") or "")
            job_item = {**base_item, "sync_job_id": queued["id"], "status": queued["status"]}
        except HTTPException as exc:
            # Expected per-row failure (e.g. the tracked account is no longer
            # accessible). Report the readable detail rather than str(exc),
            # whose format depends on the Starlette version.
            failed += 1
            items.append({**base_item, "error": str(exc.detail)})
        except Exception as exc:
            # Unexpected failure: keep the batch best-effort, but leave a trace
            # instead of silently swallowing it.
            logging.getLogger(__name__).exception(
                "Failed to refresh tracked %s account %s", platform, row["tracked_account_id"]
            )
            failed += 1
            items.append({**base_item, "error": str(exc)})
        else:
            refreshed += 1
            items.append(job_item)
    return {"refreshed": refreshed, "failed": failed, "items": items}
@app.post(f"/v2/{platform}/tracking/cursor")
def update_platform_tracking_cursor(request: PlatformTrackingCursorRequest, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
    """Store the caller's digest cursor and echo back the persisted values.

    ``request.last_seen_at`` is passed to ``_set_tracking_cursor`` unchanged.
    The response holds the saved row's ``last_seen_at`` and ``updated_at``.
    """
    saved_cursor = _set_tracking_cursor(account["id"], request.last_seen_at)
    return {field: saved_cursor[field] for field in ("last_seen_at", "updated_at")}
@app.get(f"/v2/{platform}/tracking/digest")
def get_platform_tracking_digest(
    since: str = Query(default=""),
    limit: int = Query(default=24, ge=1, le=100),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Return the caller's tracking digest, delegating to ``_tracking_digest``.

    ``since`` is whitespace-stripped before being forwarded (empty when
    omitted). ``limit`` is bounded to 1..100 by the query validator.
    """
    user_id = account["id"]
    since_value = (since or "").strip()
    return _tracking_digest(user_id, since_value=since_value, limit=limit)