diff --git a/collector-service/app/wechat_video_features.py b/collector-service/app/wechat_video_features.py new file mode 100644 index 0000000..c00c061 --- /dev/null +++ b/collector-service/app/wechat_video_features.py @@ -0,0 +1,531 @@ +from __future__ import annotations + +import json +from collections import Counter +from typing import Any + +from fastapi import Depends, HTTPException, Query +from pydantic import BaseModel, Field + +# This module is intentionally self-contained because the task only allows +# writes to a new file. To activate it, import `register_wechat_video_routes` +# from `app.main` and call it with `(app, core)`. + +WECHAT_VIDEO_PLATFORM = "wechat_video" +ACCOUNT_SOURCE_KIND = "creator_account" +YOUTUBE_HOST_MARKERS = ("youtube.com", "youtu.be") + + +class WechatVideoAccountSyncRequest(BaseModel): + project_id: str = "" + knowledge_base_id: str = "" + assistant_id: str = "" + content_source_id: str = "" + profile_url: str = "" + handle: str = "" + title: str = "" + analysis_model_profile_id: str = "" + language: str = "auto" + max_items: int = Field(default=5, ge=1, le=20) + skip_existing: bool = True + auto_trigger_analysis: bool = True + + +class WechatVideoReviewCreateRequest(BaseModel): + project_id: str = "" + source_job_id: str = "" + assistant_id: str = "" + title: str = "" + content_type: str = "video" + publish_url: str = "" + published_at: str = "" + metrics: dict[str, Any] = Field(default_factory=dict) + verdict: str = "" + highlights: str = "" + next_actions: str = "" + notes: str = "" + + +def register_wechat_video_routes(app: Any, legacy: Any) -> None: + if getattr(app.state, "wechat_video_routes_registered", False): + return + app.state.wechat_video_routes_registered = True + + def _account_not_found() -> HTTPException: + return HTTPException(status_code=404, detail="WeChat Video account not found") + + def _normalize_wechat_source_url(source_url: str) -> str: + normalized = source_url.strip() + if not normalized: + return "" + lowered = normalized.lower() + if any(marker in lowered for marker in YOUTUBE_HOST_MARKERS): + raise HTTPException(status_code=400, detail="YouTube is not supported by wechat_video routes") + inferred = legacy.infer_platform_from_url(normalized) + if inferred != WECHAT_VIDEO_PLATFORM: + raise HTTPException( + status_code=400, + detail="wechat_video routes only accept channels.weixin.qq.com or mp.weixin.qq.com/s URLs", + ) + return normalized + + def _require_owned_account(source_id: str, user_id: str) -> dict[str, Any]: + row = legacy.load_owned_content_source(source_id, user_id) + if row.get("platform") != WECHAT_VIDEO_PLATFORM or row.get("source_kind") != ACCOUNT_SOURCE_KIND: + raise _account_not_found() + return row + + def _list_sync_job_rows(source_row: dict[str, Any], *, limit: int = 50) -> list[dict[str, Any]]: + return legacy.db.fetch_all( + """ + SELECT * + FROM jobs + WHERE user_id = ? AND content_source_id = ? AND source_type = 'content_source_sync' + ORDER BY created_at DESC + LIMIT ? + """, + (source_row["user_id"], source_row["id"], max(1, limit)), + ) + + def _list_video_job_rows(source_row: dict[str, Any], *, limit: int = 200) -> list[dict[str, Any]]: + sync_rows = _list_sync_job_rows(source_row, limit=max(1, limit)) + if not sync_rows: + return [] + parent_job_ids = [row["id"] for row in sync_rows] + placeholders = ",".join("?" for _ in parent_job_ids) + query = f""" + SELECT * + FROM jobs + WHERE user_id = ? AND source_type = 'video_link' AND parent_job_id IN ({placeholders}) + ORDER BY created_at DESC + """ + params: tuple[Any, ...] = (source_row["user_id"], *parent_job_ids) + return legacy.db.fetch_all(query, params)[: max(1, limit)] + + def _dedupe_latest_video_jobs(rows: list[dict[str, Any]]) -> list[dict[str, Any]]: + deduped: list[dict[str, Any]] = [] + seen_urls: set[str] = set() + for row in rows: + source_url = str(row.get("source_url") or "").strip() + if not source_url or source_url in seen_urls: + continue + seen_urls.add(source_url) + deduped.append(row) + return deduped + + def _fetch_content_source(source_id: str) -> dict[str, Any] | None: + if not source_id: + return None + return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,)) + + def _load_related_reviews(source_row: dict[str, Any], video_rows: list[dict[str, Any]], *, limit: int = 50) -> list[dict[str, Any]]: + candidate_rows = legacy.db.fetch_all( + """ + SELECT * + FROM publish_reviews + WHERE user_id = ? AND platform = ? + ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC + LIMIT 400 + """, + (source_row["user_id"], WECHAT_VIDEO_PLATFORM), + ) + job_ids = {row["id"] for row in video_rows} + video_urls = {str(row.get("source_url") or "").strip() for row in video_rows if row.get("source_url")} + results: list[dict[str, Any]] = [] + for row in candidate_rows: + source_job_id = str(row.get("source_job_id") or "").strip() + publish_url = str(row.get("publish_url") or "").strip() + if source_job_id and source_job_id in job_ids: + results.append(row) + continue + if publish_url and publish_url in video_urls: + results.append(row) + return results[: max(1, limit)] + + def _load_related_documents(video_rows: list[dict[str, Any]], *, limit: int = 30) -> list[dict[str, Any]]: + kb_ids = {str(row.get("knowledge_base_id") or "").strip() for row in video_rows if row.get("knowledge_base_id")} + video_urls = {str(row.get("source_url") or "").strip() for row in video_rows if row.get("source_url")} + documents: list[dict[str, Any]] = [] + seen_document_ids: set[str] = set() + for kb_id in kb_ids: + for row in legacy.db.fetch_all( + """ + SELECT * + FROM knowledge_documents + WHERE knowledge_base_id = ? + ORDER BY created_at DESC + LIMIT 200 + """, + (kb_id,), + ): + if row["id"] in seen_document_ids: + continue + if str(row.get("source_url") or "").strip() not in video_urls: + continue + seen_document_ids.add(row["id"]) + documents.append(row) + if len(documents) >= limit: + return documents + return documents + + def _build_review_maps(review_rows: list[dict[str, Any]]) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]: + by_job_id: dict[str, dict[str, Any]] = {} + by_url: dict[str, dict[str, Any]] = {} + for row in review_rows: + source_job_id = str(row.get("source_job_id") or "").strip() + publish_url = str(row.get("publish_url") or "").strip() + if source_job_id and source_job_id not in by_job_id: + by_job_id[source_job_id] = row + if publish_url and publish_url not in by_url: + by_url[publish_url] = row + return by_job_id, by_url + + def _build_document_map(document_rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: + by_url: dict[str, dict[str, Any]] = {} + for row in document_rows: + source_url = str(row.get("source_url") or "").strip() + if source_url and source_url not in by_url: + by_url[source_url] = row + return by_url + + def _build_account_payload(source_row: dict[str, Any]) -> dict[str, Any]: + payload = legacy.content_source_payload(source_row) + metadata = payload.get("metadata") or {} + latest_sync_job = None + last_sync_job_id = str(metadata.get("last_sync_job_id") or "") + if last_sync_job_id: + latest_sync_job = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (last_sync_job_id,)) + payload["platform_label"] = legacy.platform_label(WECHAT_VIDEO_PLATFORM) + payload["last_sync_job_id"] = last_sync_job_id + payload["last_sync_completed_at"] = str(metadata.get("last_sync_completed_at") or "") + payload["last_sync_error"] = str(metadata.get("last_sync_error") or "") + payload["last_sync_status"] = str((latest_sync_job or {}).get("status") or "") + payload["sync_mode"] = str(metadata.get("sync_mode") or "recent_uploads") + return payload + + def _build_video_item( + job_row: dict[str, Any], + review_by_job_id: dict[str, dict[str, Any]], + review_by_url: dict[str, dict[str, Any]], + document_by_url: dict[str, dict[str, Any]], + ) -> dict[str, Any]: + source_url = str(job_row.get("source_url") or "").strip() + content_source = _fetch_content_source(str(job_row.get("content_source_id") or "").strip()) + review_row = review_by_job_id.get(job_row["id"]) or review_by_url.get(source_url) + document_row = document_by_url.get(source_url) + artifacts = legacy.parse_job_artifacts(job_row) + return { + "id": job_row["id"], + "title": job_row.get("title", ""), + "status": job_row.get("status", ""), + "source_url": source_url, + "external_id": str(artifacts.get("external_id") or ""), + "origin_sync_job_id": str(artifacts.get("origin_sync_job_id") or ""), + "job": legacy.job_payload(job_row), + "content_source": legacy.content_source_payload(content_source) if content_source else None, + "latest_review": legacy.review_payload(review_row) if review_row else None, + "document": legacy.document_payload(document_row) if document_row else None, + } + + def _build_workspace_payload(source_row: dict[str, Any]) -> dict[str, Any]: + sync_rows = _list_sync_job_rows(source_row, limit=20) + video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200)) + review_rows = _load_related_reviews(source_row, video_rows, limit=20) + document_rows = _load_related_documents(video_rows, limit=12) + review_by_job_id, review_by_url = _build_review_maps(review_rows) + document_by_url = _build_document_map(document_rows) + status_counts = Counter(str(row.get("status") or "").strip() or "unknown" for row in video_rows) + latest_sync = legacy.job_context_payload(sync_rows[0]) if sync_rows else None + return { + "account": _build_account_payload(source_row), + "latest_sync_job": latest_sync, + "sync_jobs": [legacy.job_payload(row) for row in sync_rows[:10]], + "videos": { + "total": len(video_rows), + "status_counts": dict(status_counts), + "items": [ + _build_video_item(row, review_by_job_id, review_by_url, document_by_url) + for row in video_rows[:20] + ], + }, + "reviews": [legacy.review_payload(row) for row in review_rows], + "recent_documents": [legacy.document_payload(row) for row in document_rows], + "stats": { + "sync_job_count": len(sync_rows), + "video_job_count": len(video_rows), + "completed_video_count": status_counts.get("completed", 0), + "failed_video_count": status_counts.get("failed", 0), + "review_count": len(review_rows), + "document_count": len(document_rows), + }, + } + + def _update_account_source( + source_row: dict[str, Any], + *, + source_url: str, + title: str, + handle: str, + metadata_updates: dict[str, Any], + ) -> dict[str, Any]: + merged_metadata = legacy.merge_json_field(source_row.get("metadata_json") or "{}", metadata_updates) + legacy.db.execute( + """ + UPDATE content_sources + SET handle = ?, source_url = ?, title = ?, platform = ?, metadata_json = ?, updated_at = ? + WHERE id = ? AND user_id = ? + """, + ( + handle, + source_url, + title, + WECHAT_VIDEO_PLATFORM, + merged_metadata, + legacy.utc_now(), + source_row["id"], + source_row["user_id"], + ), + ) + return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],)) + + def _job_belongs_to_account(job_row: dict[str, Any], source_row: dict[str, Any]) -> bool: + if str(job_row.get("content_source_id") or "").strip(): + content_source = _fetch_content_source(str(job_row.get("content_source_id") or "").strip()) + metadata = (legacy.content_source_payload(content_source).get("metadata") or {}) if content_source else {} + if content_source and str(metadata.get("origin_content_source_id") or "") == source_row["id"]: + return True + if str(job_row.get("parent_job_id") or "").strip(): + parent_row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["parent_job_id"],)) + if parent_row and str(parent_row.get("content_source_id") or "") == source_row["id"]: + return True + return False + + @app.get("/v2/wechat-video/accounts") + def list_wechat_video_accounts( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + clauses = ["user_id = ?", "platform = ?", "source_kind = ?"] + params: list[Any] = [account["id"], WECHAT_VIDEO_PLATFORM, ACCOUNT_SOURCE_KIND] + if project_id: + project = legacy.resolve_target_project(account["id"], project_id, username=account["username"]) + clauses.append("project_id = ?") + params.append(project["id"]) + rows = legacy.db.fetch_all( + f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY updated_at DESC", + tuple(params), + ) + return [_build_account_payload(row) for row in rows] + + @app.post("/v2/wechat-video/accounts/sync") + async def sync_wechat_video_account( + request: WechatVideoAccountSyncRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_row = None + if request.content_source_id.strip(): + source_row = _require_owned_account(request.content_source_id.strip(), account["id"]) + + source_url = _normalize_wechat_source_url(request.profile_url or (source_row or {}).get("source_url", "")) + if not source_url: + raise HTTPException(status_code=400, detail="profile_url or content_source_id is required") + + requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "") + project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]: + raise HTTPException(status_code=400, detail="Content source does not belong to target project") + + kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"]) + assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + profile = legacy.model_profile_for_account(account["id"], request.analysis_model_profile_id or None) + handle = request.handle.strip() or (source_row or {}).get("handle", "").strip() + title = request.title.strip() or (source_row or {}).get("title", "").strip() or handle or source_url + metadata_updates = { + "account_type": WECHAT_VIDEO_PLATFORM, + "sync_mode": "recent_uploads", + "max_items": request.max_items, + "analysis_model_profile_id": profile["id"], + "last_sync_error": "", + } + + if not source_row: + source_row = legacy.create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind=ACCOUNT_SOURCE_KIND, + platform=WECHAT_VIDEO_PLATFORM, + handle=handle, + source_url=source_url, + title=title, + metadata=metadata_updates, + ) + else: + source_row = _update_account_source( + source_row, + source_url=source_url, + title=title, + handle=handle, + metadata_updates=metadata_updates, + ) + + job_row = legacy.create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="content_source_sync", + line_type="content_source_sync", + workflow_key="content_source_sync_pipeline", + title=f"{title} 内容源同步", + language=request.language, + source_url=source_url, + assistant_id=(assistant or {}).get("id"), + content_source_id=source_row["id"], + artifacts={ + "platform": WECHAT_VIDEO_PLATFORM, + "handle": handle, + "source_account_url": source_url, + "source_title": title, + "max_items": request.max_items, + "skip_existing": request.skip_existing, + "auto_trigger_analysis": request.auto_trigger_analysis, + }, + analysis_model_profile_id=profile["id"], + ) + legacy.update_content_source_metadata( + source_row["id"], + { + "sync_mode": "recent_uploads", + "max_items": request.max_items, + "analysis_model_profile_id": profile["id"], + "last_sync_job_id": job_row["id"], + "last_sync_requested_at": legacy.utc_now(), + "last_sync_error": "", + }, + ) + queued_row = await legacy.trigger_orchestrated_job(job_row) + source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],)) + workspace = _build_workspace_payload(source_row) + workspace["sync_job"] = legacy.job_payload(queued_row) + return workspace + + @app.get("/v2/wechat-video/accounts/{account_id}") + def get_wechat_video_account( + account_id: str, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_row = _require_owned_account(account_id, account["id"]) + return _build_workspace_payload(source_row) + + @app.get("/v2/wechat-video/accounts/{account_id}/workspace") + def get_wechat_video_account_workspace( + account_id: str, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_row = _require_owned_account(account_id, account["id"]) + return _build_workspace_payload(source_row) + + @app.get("/v2/wechat-video/accounts/{account_id}/videos") + def list_wechat_video_account_videos( + account_id: str, + limit: int = Query(default=50, ge=1, le=200), + status: str = Query(default=""), + q: str = Query(default=""), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_row = _require_owned_account(account_id, account["id"]) + video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=max(limit * 4, 200))) + normalized_status = status.strip().lower() + normalized_query = q.strip().lower() + if normalized_status: + video_rows = [row for row in video_rows if str(row.get("status") or "").lower() == normalized_status] + if normalized_query: + video_rows = [ + row + for row in video_rows + if normalized_query in str(row.get("title") or "").lower() + or normalized_query in str(row.get("source_url") or "").lower() + ] + selected_rows = video_rows[:limit] + review_rows = _load_related_reviews(source_row, selected_rows, limit=max(limit, 20)) + document_rows = _load_related_documents(selected_rows, limit=max(limit, 20)) + review_by_job_id, review_by_url = _build_review_maps(review_rows) + document_by_url = _build_document_map(document_rows) + return { + "account": _build_account_payload(source_row), + "total": len(video_rows), + "status_counts": dict(Counter(str(row.get("status") or "").strip() or "unknown" for row in video_rows)), + "items": [ + _build_video_item(row, review_by_job_id, review_by_url, document_by_url) + for row in selected_rows + ], + } + + @app.get("/v2/wechat-video/accounts/{account_id}/reviews") + def list_wechat_video_account_reviews( + account_id: str, + limit: int = Query(default=50, ge=1, le=200), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + source_row = _require_owned_account(account_id, account["id"]) + video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200)) + review_rows = _load_related_reviews(source_row, video_rows, limit=limit) + return [legacy.review_payload(row) for row in review_rows] + + @app.post("/v2/wechat-video/accounts/{account_id}/reviews") + def create_wechat_video_review( + account_id: str, + request: WechatVideoReviewCreateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_row = _require_owned_account(account_id, account["id"]) + source_job = None + if request.source_job_id.strip(): + source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"]) + if not _job_belongs_to_account(source_job, source_row): + raise HTTPException(status_code=400, detail="source_job_id does not belong to the target WeChat Video account") + + requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else source_row.get("project_id", "")) + project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + if source_row.get("project_id") and source_row.get("project_id") != project["id"]: + raise HTTPException(status_code=400, detail="WeChat Video account does not belong to target project") + + assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + publish_url = request.publish_url.strip() or (source_job.get("source_url", "") if source_job else "") + if publish_url: + _normalize_wechat_source_url(publish_url) + title = request.title.strip() or (source_job.get("title", "") if source_job else "") or f"{source_row.get('title', '')} 复盘".strip() + if not title: + title = "微信视频号复盘" + + review_id = legacy.make_id("review") + timestamp = legacy.utc_now() + legacy.db.execute( + """ + INSERT INTO publish_reviews ( + id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type, + publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + review_id, + account["id"], + project["id"], + source_job["id"] if source_job else None, + (assistant or {}).get("id") or None, + title, + WECHAT_VIDEO_PLATFORM, + request.content_type or "video", + publish_url, + request.published_at.strip(), + json.dumps(request.metrics, ensure_ascii=False), + request.verdict.strip(), + request.highlights.strip(), + request.next_actions.strip(), + request.notes.strip(), + timestamp, + timestamp, + ), + ) + row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return legacy.review_payload(row)