diff --git a/collector-service/app/douyin_features.py b/collector-service/app/douyin_features.py
index b79df24..d6105e0 100644
--- a/collector-service/app/douyin_features.py
+++ b/collector-service/app/douyin_features.py
@@ -50,6 +50,17 @@ class DouyinAccountAnalysisRequest(BaseModel):
     max_videos: int = 12
     extra_focus: str = ""
     temperature: float = 0.35
+    auto_analyze_top_videos: bool = True
+    top_video_analysis_count: int = 6
+
+
+class DouyinTopVideoAnalysisRequest(BaseModel):
+    # Body of POST /v2/douyin/accounts/{account_id}/videos/analyze-top.
+    # top_video_count is clamped to 1..12 by _run_top_video_analyses.
+    model_profile_id: str | None = None
+    top_video_count: int = 6
+    min_score: float = 45.0
+    temperature: float = 0.25
 
 
 class DouyinSimilarSearchRequest(BaseModel):
@@ -1754,6 +1765,18 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
             }
 
         suggestions = await asyncio.gather(*[_analyze_with_model(profile) for profile in profiles])
+        # Follow-up breakdowns use only the first profile; temperature is clamped to 0.1-0.4.
+        top_video_analyses: list[dict[str, Any]] = []
+        if request.auto_analyze_top_videos and profiles:
+            top_video_analyses = await _run_top_video_analyses(
+                account_row,
+                owner,
+                profiles[0],
+                top_video_count=request.top_video_analysis_count,
+                min_score=45.0,
+                source_type="account_analysis_followup",
+                temperature=min(max(request.temperature, 0.1), 0.4)
+            )
         legacy.db.execute(
             "UPDATE douyin_accounts SET last_analysis_at = ?, updated_at = ? WHERE id = ?",
             (now(), now(), account_row["id"])
@@ -1762,9 +1785,112 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
             "report_id": report_id,
             "created_at": created_at,
             "context": analysis_context,
-            "suggestions": suggestions
+            "suggestions": suggestions,
+            "top_video_analyses": top_video_analyses
         }
 
+    async def _run_top_video_analyses(
+        account_row: dict[str, Any],
+        owner: dict[str, Any],
+        profile: dict[str, Any],
+        *,
+        top_video_count: int = 6,
+        min_score: float = 45.0,
+        source_type: str = "top_score_auto",
+        temperature: float = 0.25
+    ) -> list[dict[str, Any]]:
+        """Break down the account's best-scoring videos with one model profile.
+
+        Videos scoring below ``min_score`` are dropped; the rest are ranked by
+        ``_video_sort_key(item, "score")`` and the top ``top_video_count``
+        (clamped to 1..12) are analysed concurrently. A failed model call does
+        not raise: its item carries ``status="error"`` and the exception text
+        as summary fallback. Returns ``[]`` when no video qualifies.
+        ``owner`` is currently unused.
+        """
+        # Clamp once so an oversized request cannot inflate the fetch limit.
+        selected_count = max(1, min(top_video_count, 12))
+        raw_videos = _list_videos(account_row["id"], limit=max(selected_count * 3, 24))
+        ranked_videos = [
+            video for video in (_workspace_video_payload(item) for item in raw_videos)
+            if float(video.get("score", {}).get("performance_score") or 0) >= float(min_score)
+        ]
+        ranked_videos.sort(key=lambda item: _video_sort_key(item, "score"), reverse=True)
+        ranked_videos = ranked_videos[:selected_count]
+        if not ranked_videos:
+            return []
+
+        account_payload = _build_account_payload(account_row, include_recent_videos=8)
+        system_prompt = (
+            "你是商业化短视频拆解顾问。你要针对单条作品给出可用于商业化运营的复盘。"
+            "请返回严格 JSON 对象,字段包含:headline_summary、hook_breakdown、"
+            "structure_breakdown、commercial_angle、replication_plan、operator_actions、"
+            "risk_notes、scores。scores 里包含 hook、retention、conversion、commercial,范围 0-100。"
+        )
+
+        async def _analyze_video(video: dict[str, Any]) -> dict[str, Any]:
+            """Analyse a single video; model failures are reported via status="error"."""
+            prompt_context = {
+                "account": {
+                    "id": account_payload["id"],
+                    "nickname": account_payload["nickname"],
+                    "signature": account_payload["signature"],
+                    "tags": account_payload["tags"][:12]
+                },
+                "video": {
+                    "id": video["id"],
+                    "aweme_id": video["aweme_id"],
+                    "title": video["title"],
+                    "description": video["description"],
+                    "published_at": video["published_at"],
+                    "tags": video["tags"],
+                    "stats": video["stats"],
+                    "score": video["score"]
+                }
+            }
+            user_prompt = (
+                "请从商业化运营视角拆解这条作品。重点回答:为什么值得关注、"
+                "适合承接什么产品或服务、下一步怎么复刻、运营动作怎么排。"
+                f"\n\n输入上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
+            )
+            try:
+                output = await legacy.call_model(
+                    profile,
+                    system_prompt=system_prompt,
+                    user_prompt=user_prompt,
+                    temperature=temperature
+                )
+                parsed = _try_parse_agent_json(output)
+                status = "ok"
+            except Exception as exc:
+                output = str(exc)
+                parsed = {}
+                status = "error"
+
+            if not isinstance(parsed, dict):
+                parsed = {}
+            summary = _first_non_empty(
+                parsed.get("headline_summary"),
+                parsed.get("summary"),
+                parsed.get("commercial_angle"),
+                output
+            )
+            return {
+                "id": make_id("dyva"),
+                "account_id": account_row["id"],
+                "video_id": video["id"],
+                "model_profile_id": profile["id"],
+                "model_label": _build_model_label(profile),
+                "source_type": source_type,
+                "status": status,
+                "summary": summary,
+                "analysis_json": parsed,
+                "video": video,
+                "created_at": now()
+            }
+
+        return await asyncio.gather(*[_analyze_video(video) for video in ranked_videos])
+
     async def _prepare_similarity_source(
         owner: dict[str, Any],
         request: DouyinSimilarSearchRequest
@@ -2218,6 +2344,32 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
         account_row = _require_owned_account(account_id, account["id"])
         return await _run_account_analysis(account_row, account, request)
 
+    # Manual refresh of the top-video breakdowns. Shares _run_top_video_analyses
+    # with the account-analysis follow-up so both return the same item shape.
+    @app.post("/v2/douyin/accounts/{account_id}/videos/analyze-top")
+    async def analyze_douyin_top_videos(
+        account_id: str,
+        request: DouyinTopVideoAnalysisRequest,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        account_row = _require_owned_account(account_id, account["id"])
+        profile = legacy.model_profile_for_account(account["id"], request.model_profile_id)
+        items = await _run_top_video_analyses(
+            account_row,
+            account,
+            profile,
+            top_video_count=request.top_video_count,
+            min_score=request.min_score,
+            source_type="manual_top_video_refresh",
+            temperature=request.temperature
+        )
+        return {
+            "account_id": account_row["id"],
+            "model_profile_id": profile["id"],
+            "analyzed_count": len(items),
+            "items": items
+        }
+
     @app.post("/v2/douyin/similar-searches")
     async def create_douyin_similarity_search(
         request: DouyinSimilarSearchRequest,