feat: add live top video analysis workflow

This commit is contained in:
kris
2026-03-23 08:09:37 +08:00
parent 10eae9ad69
commit 5c39ea2728

View File

@@ -50,6 +50,15 @@ class DouyinAccountAnalysisRequest(BaseModel):
max_videos: int = 12
extra_focus: str = ""
temperature: float = 0.35
auto_analyze_top_videos: bool = True
top_video_analysis_count: int = 6
class DouyinTopVideoAnalysisRequest(BaseModel):
model_profile_id: str | None = None
top_video_count: int = 6
min_score: float = 45.0
temperature: float = 0.25
class DouyinSimilarSearchRequest(BaseModel):
@@ -1754,6 +1763,17 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
}
suggestions = await asyncio.gather(*[_analyze_with_model(profile) for profile in profiles])
top_video_analyses: list[dict[str, Any]] = []
if request.auto_analyze_top_videos and profiles:
top_video_analyses = await _run_top_video_analyses(
account_row,
owner,
profiles[0],
top_video_count=request.top_video_analysis_count,
min_score=45.0,
source_type="account_analysis_followup",
temperature=min(max(request.temperature, 0.1), 0.4)
)
legacy.db.execute(
"UPDATE douyin_accounts SET last_analysis_at = ?, updated_at = ? WHERE id = ?",
(now(), now(), account_row["id"])
@@ -1762,9 +1782,100 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
"report_id": report_id,
"created_at": created_at,
"context": analysis_context,
"suggestions": suggestions
"suggestions": suggestions,
"top_video_analyses": top_video_analyses
}
async def _run_top_video_analyses(
account_row: dict[str, Any],
owner: dict[str, Any],
profile: dict[str, Any],
*,
top_video_count: int = 6,
min_score: float = 45.0,
source_type: str = "top_score_auto",
temperature: float = 0.25
) -> list[dict[str, Any]]:
raw_videos = _list_videos(account_row["id"], limit=max(top_video_count * 3, 24))
ranked_videos = [
video for video in (_workspace_video_payload(item) for item in raw_videos)
if float(video.get("score", {}).get("performance_score") or 0) >= float(min_score)
]
ranked_videos.sort(key=lambda item: _video_sort_key(item, "score"), reverse=True)
ranked_videos = ranked_videos[: max(1, min(top_video_count, 12))]
if not ranked_videos:
return []
account_payload = _build_account_payload(account_row, include_recent_videos=8)
system_prompt = (
"你是商业化短视频拆解顾问。你要针对单条作品给出可用于商业化运营的复盘。"
"请返回严格 JSON 对象字段包含headline_summary、hook_breakdown、"
"structure_breakdown、commercial_angle、replication_plan、operator_actions、"
"risk_notes、scores。scores 里包含 hook、retention、conversion、commercial范围 0-100。"
)
async def _analyze_video(video: dict[str, Any]) -> dict[str, Any]:
prompt_context = {
"account": {
"id": account_payload["id"],
"nickname": account_payload["nickname"],
"signature": account_payload["signature"],
"tags": account_payload["tags"][:12]
},
"video": {
"id": video["id"],
"aweme_id": video["aweme_id"],
"title": video["title"],
"description": video["description"],
"published_at": video["published_at"],
"tags": video["tags"],
"stats": video["stats"],
"score": video["score"]
}
}
user_prompt = (
"请从商业化运营视角拆解这条作品。重点回答:为什么值得关注、"
"适合承接什么产品或服务、下一步怎么复刻、运营动作怎么排。"
f"\n\n输入上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
)
try:
output = await legacy.call_model(
profile,
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=temperature
)
parsed = _try_parse_agent_json(output)
status = "ok"
except Exception as exc:
output = str(exc)
parsed = {}
status = "error"
if not isinstance(parsed, dict):
parsed = {}
summary = _first_non_empty(
parsed.get("headline_summary"),
parsed.get("summary"),
parsed.get("commercial_angle"),
output
)
return {
"id": make_id("dyva"),
"account_id": account_row["id"],
"video_id": video["id"],
"model_profile_id": profile["id"],
"model_label": _build_model_label(profile),
"source_type": source_type,
"status": status,
"summary": summary,
"analysis_json": parsed,
"video": video,
"created_at": now()
}
return await asyncio.gather(*[_analyze_video(video) for video in ranked_videos])
async def _prepare_similarity_source(
owner: dict[str, Any],
request: DouyinSimilarSearchRequest
@@ -2209,6 +2320,101 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
account_row = _require_owned_account(account_id, account["id"])
return _build_workspace_payload(account_row)["recent_reports"]
@app.post("/v2/douyin/accounts/{account_id}/videos/analyze-top")
async def analyze_douyin_top_videos(
account_id: str,
request: DouyinTopVideoAnalysisRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
profile = legacy.model_profile_for_account(account["id"], request.model_profile_id)
raw_videos = _list_videos(account_row["id"], limit=max(int(request.top_video_count or 6) * 3, 24))
items = [_workspace_video_payload(video) for video in raw_videos]
ranked_videos = [
item for item in sorted(items, key=lambda entry: _video_sort_key(entry, "score"), reverse=True)
if float(item.get("score", {}).get("performance_score") or 0) >= float(request.min_score or 0)
][: max(1, min(int(request.top_video_count or 6), 12))]
if not ranked_videos:
return {
"account_id": account_row["id"],
"model_profile_id": profile["id"],
"analyzed_count": 0,
"items": []
}
account_payload = _build_account_payload(account_row, include_recent_videos=8)
system_prompt = (
"你是商业化短视频拆解顾问。请从 hook、结构、转化承接、可复刻动作四个维度"
"拆解单条作品,并尽量返回 JSON。"
)
async def _analyze_video(video: dict[str, Any]) -> dict[str, Any]:
prompt_context = {
"account": {
"id": account_payload["id"],
"nickname": account_payload["nickname"],
"signature": account_payload["signature"],
"tags": account_payload["tags"][:10]
},
"video": {
"id": video["id"],
"aweme_id": video["aweme_id"],
"title": video["title"],
"description": video["description"],
"published_at": video["published_at"],
"tags": video["tags"],
"stats": video["stats"],
"score": video["score"]
}
}
user_prompt = (
"请从商业化运营视角拆解这条作品,回答它为什么值得关注、适合承接什么、"
"下一步如何复刻,并给出 3 条可执行动作。"
f"\n\n输入上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
)
try:
output = await legacy.call_model(
profile,
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=request.temperature
)
parsed = _try_parse_agent_json(output)
status = "ok"
except Exception as exc:
output = str(exc)
parsed = {}
status = "error"
summary_text = ""
if isinstance(parsed, dict):
summary_text = str(
parsed.get("headline_summary")
or parsed.get("summary")
or parsed.get("commercial_angle")
or ""
)
summary_text = _compact_text(summary_text or output, 240)
return {
"id": make_id("dyva"),
"video_id": video["id"],
"video_title": video["title"],
"status": status,
"summary_text": summary_text,
"parsed_json": parsed if isinstance(parsed, dict) else {},
"performance_score": float(video.get("score", {}).get("performance_score") or 0),
"model_profile_id": profile["id"],
"model_label": _build_model_label(profile),
"created_at": now()
}
results = await asyncio.gather(*[_analyze_video(video) for video in ranked_videos])
return {
"account_id": account_row["id"],
"model_profile_id": profile["id"],
"analyzed_count": len(results),
"items": results
}
@app.post("/v2/douyin/accounts/{account_id}/analysis")
async def analyze_douyin_account(
account_id: str,
@@ -2218,6 +2424,30 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
account_row = _require_owned_account(account_id, account["id"])
return await _run_account_analysis(account_row, account, request)
@app.post("/v2/douyin/accounts/{account_id}/videos/analyze-top")
async def analyze_douyin_top_videos(
account_id: str,
request: DouyinTopVideoAnalysisRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
profile = legacy.model_profile_for_account(account["id"], request.model_profile_id)
items = await _run_top_video_analyses(
account_row,
account,
profile,
top_video_count=request.top_video_count,
min_score=request.min_score,
source_type="manual_top_video_refresh",
temperature=request.temperature
)
return {
"account_id": account_row["id"],
"model_profile_id": profile["id"],
"analyzed_count": len(items),
"items": items
}
@app.post("/v2/douyin/similar-searches")
async def create_douyin_similarity_search(
request: DouyinSimilarSearchRequest,