from __future__ import annotations

import json
from collections import Counter
from typing import Any

from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field

# This module is intentionally self-contained because the task only allows
# writes to a new file. To activate it, import `register_wechat_video_routes`
# from `app.main` and call it with `(app, core)`.

WECHAT_VIDEO_PLATFORM = "wechat_video"
ACCOUNT_SOURCE_KIND = "creator_account"
YOUTUBE_HOST_MARKERS = ("youtube.com", "youtu.be")


class WechatVideoAccountSyncRequest(BaseModel):
    """Request body for ``POST /v2/wechat-video/accounts/sync``.

    Either ``profile_url`` or ``content_source_id`` must resolve to a usable
    source URL; every other field is optional and falls back to the values
    stored on the existing content source (when one is referenced).
    """

    project_id: str = ""
    knowledge_base_id: str = ""
    assistant_id: str = ""
    # Id of an already registered account to re-sync; empty creates a new one.
    content_source_id: str = ""
    profile_url: str = ""
    handle: str = ""
    title: str = ""
    analysis_model_profile_id: str = ""
    language: str = "auto"
    # Forwarded to the sync job artifacts; validated to the 1..20 range.
    max_items: int = Field(default=5, ge=1, le=20)
    skip_existing: bool = True
    auto_trigger_analysis: bool = True


class WechatVideoReviewCreateRequest(BaseModel):
    """Request body for ``POST /v2/wechat-video/accounts/{account_id}/reviews``."""

    project_id: str = ""
    # Optional job the review refers to; must belong to the target account.
    source_job_id: str = ""
    assistant_id: str = ""
    title: str = ""
    content_type: str = "video"
    publish_url: str = ""
    published_at: str = ""
    # Stored as JSON text in the ``metrics_json`` column.
    metrics: dict[str, Any] = Field(default_factory=dict)
    verdict: str = ""
    highlights: str = ""
    next_actions: str = ""
    notes: str = ""


def register_wechat_video_routes(app: Any, legacy: Any) -> None:
    """Attach the ``/v2/wechat-video`` routes to *app*.

    Args:
        app: The application object. It must expose ``state`` and the
            ``get`` / ``post`` route decorators (a FastAPI app, presumably).
        legacy: The core module/object that provides the database handle
            (``legacy.db``), auth dependency (``legacy.require_approved``) and
            the payload/lookup helpers used below.

    The call is idempotent: a flag on ``app.state`` prevents the routes from
    being registered twice.
    """
    if getattr(app.state, "wechat_video_routes_registered", False):
        return
    app.state.wechat_video_routes_registered = True

    def _account_not_found() -> HTTPException:
        """Build (not raise) the 404 used for missing or mismatched accounts."""
        return HTTPException(status_code=404, detail="WeChat Video account not found")

    def _normalize_wechat_source_url(source_url: str) -> str:
        """Strip *source_url* and verify it is a WeChat Video URL.

        Returns the stripped URL, or ``""`` when the input is blank.

        Raises:
            HTTPException: 400 when the URL contains a YouTube host marker or
                when ``legacy.infer_platform_from_url`` does not classify it
                as ``wechat_video``.
        """
        normalized = source_url.strip()
        if not normalized:
            return ""
        lowered = normalized.lower()
        if any(marker in lowered for marker in YOUTUBE_HOST_MARKERS):
            raise HTTPException(status_code=400, detail="YouTube is not supported by wechat_video routes")
        inferred = legacy.infer_platform_from_url(normalized)
        if inferred != WECHAT_VIDEO_PLATFORM:
            raise HTTPException(
                status_code=400,
                detail="wechat_video routes only accept channels.weixin.qq.com or mp.weixin.qq.com/s URLs",
            )
        return normalized

    def _require_owned_account(source_id: str, user_id: str) -> dict[str, Any]:
        """Load a content source and ensure it is a WeChat Video creator account.

        Ownership is delegated to ``legacy.load_owned_content_source``
        (presumably it raises when the row is missing or not owned — confirm
        against its implementation).

        Raises:
            HTTPException: 404 when the row's platform or source kind does
                not match this module.
        """
        row = legacy.load_owned_content_source(source_id, user_id)
        if row.get("platform") != WECHAT_VIDEO_PLATFORM or row.get("source_kind") != ACCOUNT_SOURCE_KIND:
            raise _account_not_found()
        return row

    def _list_sync_job_rows(source_row: dict[str, Any], *, limit: int = 50) -> list[dict[str, Any]]:
        """Return the newest ``content_source_sync`` jobs for the account."""
        return legacy.db.fetch_all(
            """
            SELECT * FROM jobs
            WHERE user_id = ? AND content_source_id = ? AND source_type = 'content_source_sync'
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (source_row["user_id"], source_row["id"], max(1, limit)),
        )

    def _list_video_job_rows(source_row: dict[str, Any], *, limit: int = 200) -> list[dict[str, Any]]:
        """Return the newest ``video_link`` jobs spawned by the account's sync jobs.

        At most ``max(1, limit)`` rows are returned, newest first. The same
        bound is used for the number of parent sync jobs that are considered.
        """
        row_limit = max(1, limit)
        sync_rows = _list_sync_job_rows(source_row, limit=row_limit)
        if not sync_rows:
            return []
        parent_job_ids = [row["id"] for row in sync_rows]
        # Only "?" markers are interpolated into the SQL text; values stay bound.
        placeholders = ",".join("?" for _ in parent_job_ids)
        # The cap is applied in SQL so the database does not ship rows that
        # would be discarded immediately afterwards.
        query = f"""
            SELECT * FROM jobs
            WHERE user_id = ? AND source_type = 'video_link' AND parent_job_id IN ({placeholders})
            ORDER BY created_at DESC
            LIMIT ?
        """
        params: tuple[Any, ...] = (source_row["user_id"], *parent_job_ids, row_limit)
        return list(legacy.db.fetch_all(query, params))

    def _dedupe_latest_video_jobs(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Keep the first row per stripped ``source_url``; drop rows without one.

        Callers pass rows ordered newest-first, so the first occurrence is the
        latest job for that URL.
        """
        deduped: list[dict[str, Any]] = []
        seen_urls: set[str] = set()
        for row in rows:
            source_url = str(row.get("source_url") or "").strip()
            if not source_url or source_url in seen_urls:
                continue
            seen_urls.add(source_url)
            deduped.append(row)
        return deduped

    def _fetch_content_source(source_id: str) -> dict[str, Any] | None:
        """Fetch a content source by id, or ``None`` for an empty id."""
        if not source_id:
            return None
        return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,))

    def _load_related_reviews(
        source_row: dict[str, Any],
        video_rows: list[dict[str, Any]],
        *,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        """Return the user's WeChat Video reviews that reference *video_rows*.

        A review matches when its ``source_job_id`` is one of the video job
        ids or its ``publish_url`` equals one of the video source URLs. Only
        the 400 most recent reviews of the user are scanned.
        """
        candidate_rows = legacy.db.fetch_all(
            """
            SELECT * FROM publish_reviews
            WHERE user_id = ? AND platform = ?
            ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC
            LIMIT 400
            """,
            (source_row["user_id"], WECHAT_VIDEO_PLATFORM),
        )
        job_ids = {row["id"] for row in video_rows}
        video_urls = {str(row.get("source_url") or "").strip() for row in video_rows if row.get("source_url")}
        results: list[dict[str, Any]] = []
        for row in candidate_rows:
            source_job_id = str(row.get("source_job_id") or "").strip()
            publish_url = str(row.get("publish_url") or "").strip()
            if source_job_id and source_job_id in job_ids:
                results.append(row)
                continue
            if publish_url and publish_url in video_urls:
                results.append(row)
        return results[: max(1, limit)]

    def _load_related_documents(video_rows: list[dict[str, Any]], *, limit: int = 30) -> list[dict[str, Any]]:
        """Return knowledge documents whose ``source_url`` matches a video job.

        For every knowledge base referenced by *video_rows* the 200 newest
        documents are scanned; collection stops once *limit* documents have
        been gathered.
        """
        kb_ids = {str(row.get("knowledge_base_id") or "").strip() for row in video_rows if row.get("knowledge_base_id")}
        video_urls = {str(row.get("source_url") or "").strip() for row in video_rows if row.get("source_url")}
        documents: list[dict[str, Any]] = []
        seen_document_ids: set[str] = set()
        for kb_id in kb_ids:
            for row in legacy.db.fetch_all(
                """
                SELECT * FROM knowledge_documents
                WHERE knowledge_base_id = ?
                ORDER BY created_at DESC
                LIMIT 200
                """,
                (kb_id,),
            ):
                if row["id"] in seen_document_ids:
                    continue
                if str(row.get("source_url") or "").strip() not in video_urls:
                    continue
                seen_document_ids.add(row["id"])
                documents.append(row)
                if len(documents) >= limit:
                    return documents
        return documents

    def _build_review_maps(
        review_rows: list[dict[str, Any]],
    ) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
        """Index reviews by job id and by publish URL (first occurrence wins)."""
        by_job_id: dict[str, dict[str, Any]] = {}
        by_url: dict[str, dict[str, Any]] = {}
        for row in review_rows:
            source_job_id = str(row.get("source_job_id") or "").strip()
            publish_url = str(row.get("publish_url") or "").strip()
            if source_job_id and source_job_id not in by_job_id:
                by_job_id[source_job_id] = row
            if publish_url and publish_url not in by_url:
                by_url[publish_url] = row
        return by_job_id, by_url

    def _build_document_map(document_rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
        """Index documents by stripped ``source_url`` (first occurrence wins)."""
        by_url: dict[str, dict[str, Any]] = {}
        for row in document_rows:
            source_url = str(row.get("source_url") or "").strip()
            if source_url and source_url not in by_url:
                by_url[source_url] = row
        return by_url

    def _build_account_payload(source_row: dict[str, Any]) -> dict[str, Any]:
        """Serialize an account row and add the sync-related summary fields.

        The last sync status is read from the job referenced by the
        ``last_sync_job_id`` metadata entry, when there is one.
        """
        payload = legacy.content_source_payload(source_row)
        metadata = payload.get("metadata") or {}
        latest_sync_job = None
        last_sync_job_id = str(metadata.get("last_sync_job_id") or "")
        if last_sync_job_id:
            latest_sync_job = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (last_sync_job_id,))
        payload["platform_label"] = legacy.platform_label(WECHAT_VIDEO_PLATFORM)
        payload["last_sync_job_id"] = last_sync_job_id
        payload["last_sync_completed_at"] = str(metadata.get("last_sync_completed_at") or "")
        payload["last_sync_error"] = str(metadata.get("last_sync_error") or "")
        payload["last_sync_status"] = str((latest_sync_job or {}).get("status") or "")
        payload["sync_mode"] = str(metadata.get("sync_mode") or "recent_uploads")
        return payload

    def _build_video_item(
        job_row: dict[str, Any],
        review_by_job_id: dict[str, dict[str, Any]],
        review_by_url: dict[str, dict[str, Any]],
        document_by_url: dict[str, dict[str, Any]],
    ) -> dict[str, Any]:
        """Build the API item for one video job.

        The review is looked up by job id first and by source URL second; the
        document is looked up by source URL only.
        """
        source_url = str(job_row.get("source_url") or "").strip()
        content_source = _fetch_content_source(str(job_row.get("content_source_id") or "").strip())
        review_row = review_by_job_id.get(job_row["id"]) or review_by_url.get(source_url)
        document_row = document_by_url.get(source_url)
        artifacts = legacy.parse_job_artifacts(job_row)
        return {
            "id": job_row["id"],
            "title": job_row.get("title", ""),
            "status": job_row.get("status", ""),
            "source_url": source_url,
            "external_id": str(artifacts.get("external_id") or ""),
            "origin_sync_job_id": str(artifacts.get("origin_sync_job_id") or ""),
            "job": legacy.job_payload(job_row),
            "content_source": legacy.content_source_payload(content_source) if content_source else None,
            "latest_review": legacy.review_payload(review_row) if review_row else None,
            "document": legacy.document_payload(document_row) if document_row else None,
        }

    def _build_workspace_payload(source_row: dict[str, Any]) -> dict[str, Any]:
        """Assemble the account workspace: account, sync jobs, videos, reviews, documents and stats."""
        sync_rows = _list_sync_job_rows(source_row, limit=20)
        video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200))
        review_rows = _load_related_reviews(source_row, video_rows, limit=20)
        document_rows = _load_related_documents(video_rows, limit=12)
        review_by_job_id, review_by_url = _build_review_maps(review_rows)
        document_by_url = _build_document_map(document_rows)
        status_counts = Counter(str(row.get("status") or "").strip() or "unknown" for row in video_rows)
        latest_sync = legacy.job_context_payload(sync_rows[0]) if sync_rows else None
        return {
            "account": _build_account_payload(source_row),
            "latest_sync_job": latest_sync,
            "sync_jobs": [legacy.job_payload(row) for row in sync_rows[:10]],
            "videos": {
                "total": len(video_rows),
                "status_counts": dict(status_counts),
                "items": [
                    _build_video_item(row, review_by_job_id, review_by_url, document_by_url)
                    for row in video_rows[:20]
                ],
            },
            "reviews": [legacy.review_payload(row) for row in review_rows],
            "recent_documents": [legacy.document_payload(row) for row in document_rows],
            "stats": {
                "sync_job_count": len(sync_rows),
                "video_job_count": len(video_rows),
                "completed_video_count": status_counts.get("completed", 0),
                "failed_video_count": status_counts.get("failed", 0),
                "review_count": len(review_rows),
                "document_count": len(document_rows),
            },
        }

    def _update_account_source(
        source_row: dict[str, Any],
        *,
        source_url: str,
        title: str,
        handle: str,
        metadata_updates: dict[str, Any],
    ) -> dict[str, Any]:
        """Persist new handle/URL/title/metadata on an account and return the fresh row."""
        merged_metadata = legacy.merge_json_field(source_row.get("metadata_json") or "{}", metadata_updates)
        legacy.db.execute(
            """
            UPDATE content_sources
            SET handle = ?, source_url = ?, title = ?, platform = ?, metadata_json = ?, updated_at = ?
            WHERE id = ? AND user_id = ?
            """,
            (
                handle,
                source_url,
                title,
                WECHAT_VIDEO_PLATFORM,
                merged_metadata,
                legacy.utc_now(),
                source_row["id"],
                source_row["user_id"],
            ),
        )
        return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],))

    def _job_belongs_to_account(job_row: dict[str, Any], source_row: dict[str, Any]) -> bool:
        """Tell whether *job_row* was produced for the account *source_row*.

        A job belongs to the account when either its own content source
        records the account in ``origin_content_source_id`` metadata, or its
        parent job is attached to the account.
        """
        if str(job_row.get("content_source_id") or "").strip():
            content_source = _fetch_content_source(str(job_row.get("content_source_id") or "").strip())
            metadata = (legacy.content_source_payload(content_source).get("metadata") or {}) if content_source else {}
            if content_source and str(metadata.get("origin_content_source_id") or "") == source_row["id"]:
                return True
        if str(job_row.get("parent_job_id") or "").strip():
            parent_row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["parent_job_id"],))
            if parent_row and str(parent_row.get("content_source_id") or "") == source_row["id"]:
                return True
        return False

    @app.get("/v2/wechat-video/accounts")
    def list_wechat_video_accounts(
        project_id: str | None = Query(default=None),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        """List the caller's WeChat Video accounts, optionally limited to one project."""
        clauses = ["user_id = ?", "platform = ?", "source_kind = ?"]
        params: list[Any] = [account["id"], WECHAT_VIDEO_PLATFORM, ACCOUNT_SOURCE_KIND]
        if project_id:
            project = legacy.resolve_target_project(account["id"], project_id, username=account["username"])
            clauses.append("project_id = ?")
            params.append(project["id"])
        # Only fixed clause strings are joined into the SQL; values stay bound.
        rows = legacy.db.fetch_all(
            f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY updated_at DESC",
            tuple(params),
        )
        return [_build_account_payload(row) for row in rows]

    @app.post("/v2/wechat-video/accounts/sync")
    async def sync_wechat_video_account(
        request: WechatVideoAccountSyncRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Create or update an account, queue a sync job and return the workspace.

        Raises:
            HTTPException: 400 when no source URL can be determined, when the
                URL is not a WeChat Video URL, or when the referenced content
                source belongs to a different project than the target one.
        """
        source_row = None
        if request.content_source_id.strip():
            source_row = _require_owned_account(request.content_source_id.strip(), account["id"])
        # Snapshot of the stored values used as fallbacks. Columns may hold
        # NULL, so every value is coerced through `str(... or "")` before use.
        existing = source_row or {}
        source_url = _normalize_wechat_source_url(request.profile_url or str(existing.get("source_url") or ""))
        if not source_url:
            raise HTTPException(status_code=400, detail="profile_url or content_source_id is required")
        requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "")
        project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
        if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
            raise HTTPException(status_code=400, detail="Content source does not belong to target project")
        kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
        assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
        profile = legacy.model_profile_for_account(account["id"], request.analysis_model_profile_id or None)
        handle = request.handle.strip() or str(existing.get("handle") or "").strip()
        title = request.title.strip() or str(existing.get("title") or "").strip() or handle or source_url
        metadata_updates = {
            "account_type": WECHAT_VIDEO_PLATFORM,
            "sync_mode": "recent_uploads",
            "max_items": request.max_items,
            "analysis_model_profile_id": profile["id"],
            "last_sync_error": "",
        }
        if not source_row:
            source_row = legacy.create_content_source(
                account_id=account["id"],
                project_id=project["id"],
                source_kind=ACCOUNT_SOURCE_KIND,
                platform=WECHAT_VIDEO_PLATFORM,
                handle=handle,
                source_url=source_url,
                title=title,
                metadata=metadata_updates,
            )
        else:
            source_row = _update_account_source(
                source_row,
                source_url=source_url,
                title=title,
                handle=handle,
                metadata_updates=metadata_updates,
            )
        job_row = legacy.create_job_record(
            account_id=account["id"],
            project_id=project["id"],
            knowledge_base_id=kb["id"],
            source_type="content_source_sync",
            line_type="content_source_sync",
            workflow_key="content_source_sync_pipeline",
            title=f"{title} 内容源同步",
            language=request.language,
            source_url=source_url,
            assistant_id=(assistant or {}).get("id"),
            content_source_id=source_row["id"],
            artifacts={
                "platform": WECHAT_VIDEO_PLATFORM,
                "handle": handle,
                "source_account_url": source_url,
                "source_title": title,
                "max_items": request.max_items,
                "skip_existing": request.skip_existing,
                "auto_trigger_analysis": request.auto_trigger_analysis,
            },
            analysis_model_profile_id=profile["id"],
        )
        legacy.update_content_source_metadata(
            source_row["id"],
            {
                "sync_mode": "recent_uploads",
                "max_items": request.max_items,
                "analysis_model_profile_id": profile["id"],
                "last_sync_job_id": job_row["id"],
                "last_sync_requested_at": legacy.utc_now(),
                "last_sync_error": "",
            },
        )
        queued_row = await legacy.trigger_orchestrated_job(job_row)
        # Re-read the row so the workspace reflects the metadata written above.
        source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],))
        workspace = _build_workspace_payload(source_row)
        workspace["sync_job"] = legacy.job_payload(queued_row)
        return workspace

    @app.get("/v2/wechat-video/accounts/{account_id}")
    def get_wechat_video_account(
        account_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Return the workspace payload for one owned account."""
        source_row = _require_owned_account(account_id, account["id"])
        return _build_workspace_payload(source_row)

    @app.get("/v2/wechat-video/accounts/{account_id}/workspace")
    def get_wechat_video_account_workspace(
        account_id: str,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Return the workspace payload for one owned account (same result as the bare account route)."""
        source_row = _require_owned_account(account_id, account["id"])
        return _build_workspace_payload(source_row)

    @app.get("/v2/wechat-video/accounts/{account_id}/videos")
    def list_wechat_video_account_videos(
        account_id: str,
        limit: int = Query(default=50, ge=1, le=200),
        status: str = Query(default=""),
        q: str = Query(default=""),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """List an account's de-duplicated video jobs with optional filters.

        ``status`` is matched case-insensitively against the job status; ``q``
        is a case-insensitive substring match on title or source URL.
        ``total`` and ``status_counts`` describe the filtered set, ``items``
        its first *limit* entries.
        """
        source_row = _require_owned_account(account_id, account["id"])
        # Over-fetch so that de-duplication and filtering still leave enough rows.
        video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=max(limit * 4, 200)))
        normalized_status = status.strip().lower()
        normalized_query = q.strip().lower()
        if normalized_status:
            # Strip the stored status as well, matching how status_counts keys are built.
            video_rows = [
                row for row in video_rows
                if str(row.get("status") or "").strip().lower() == normalized_status
            ]
        if normalized_query:
            video_rows = [
                row
                for row in video_rows
                if normalized_query in str(row.get("title") or "").lower()
                or normalized_query in str(row.get("source_url") or "").lower()
            ]
        selected_rows = video_rows[:limit]
        review_rows = _load_related_reviews(source_row, selected_rows, limit=max(limit, 20))
        document_rows = _load_related_documents(selected_rows, limit=max(limit, 20))
        review_by_job_id, review_by_url = _build_review_maps(review_rows)
        document_by_url = _build_document_map(document_rows)
        return {
            "account": _build_account_payload(source_row),
            "total": len(video_rows),
            "status_counts": dict(Counter(str(row.get("status") or "").strip() or "unknown" for row in video_rows)),
            "items": [
                _build_video_item(row, review_by_job_id, review_by_url, document_by_url)
                for row in selected_rows
            ],
        }

    @app.get("/v2/wechat-video/accounts/{account_id}/reviews")
    def list_wechat_video_account_reviews(
        account_id: str,
        limit: int = Query(default=50, ge=1, le=200),
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> list[dict[str, Any]]:
        """List reviews linked to the account's video jobs, newest first."""
        source_row = _require_owned_account(account_id, account["id"])
        video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200))
        review_rows = _load_related_reviews(source_row, video_rows, limit=limit)
        return [legacy.review_payload(row) for row in review_rows]

    @app.post("/v2/wechat-video/accounts/{account_id}/reviews")
    def create_wechat_video_review(
        account_id: str,
        request: WechatVideoReviewCreateRequest,
        account: dict[str, Any] = Depends(legacy.require_approved),
    ) -> dict[str, Any]:
        """Insert a publish review for the account and return its payload.

        Raises:
            HTTPException: 400 when ``source_job_id`` does not belong to the
                account, when the account belongs to a different project than
                the target one, or when the publish URL is not a WeChat Video
                URL.
        """
        source_row = _require_owned_account(account_id, account["id"])
        source_job = None
        if request.source_job_id.strip():
            source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"])
            if not _job_belongs_to_account(source_job, source_row):
                raise HTTPException(status_code=400, detail="source_job_id does not belong to the target WeChat Video account")
        requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else source_row.get("project_id", ""))
        project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
        if source_row.get("project_id") and source_row.get("project_id") != project["id"]:
            raise HTTPException(status_code=400, detail="WeChat Video account does not belong to target project")
        assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
        job_defaults = source_job or {}
        # Job columns may be NULL; coerce before stripping. The stripped value
        # is what gets stored, so later URL matching against reviews works.
        publish_url = request.publish_url.strip() or str(job_defaults.get("source_url") or "").strip()
        if publish_url:
            publish_url = _normalize_wechat_source_url(publish_url)
        # Title precedence: explicit request > job title > "<account title> 复盘"
        # > generic default. The account-based fallback is only used when the
        # account actually has a title, so the generic default stays reachable
        # and a NULL title never renders as the text "None".
        account_title = str(source_row.get("title") or "").strip()
        title = (
            request.title.strip()
            or str(job_defaults.get("title") or "").strip()
            or (f"{account_title} 复盘" if account_title else "微信视频号复盘")
        )
        review_id = legacy.make_id("review")
        timestamp = legacy.utc_now()
        legacy.db.execute(
            """
            INSERT INTO publish_reviews (
                id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
                publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes,
                created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                review_id,
                account["id"],
                project["id"],
                source_job["id"] if source_job else None,
                (assistant or {}).get("id") or None,
                title,
                WECHAT_VIDEO_PLATFORM,
                request.content_type or "video",
                publish_url,
                request.published_at.strip(),
                json.dumps(request.metrics, ensure_ascii=False),
                request.verdict.strip(),
                request.highlights.strip(),
                request.next_actions.strip(),
                request.notes.strip(),
                timestamp,
                timestamp,
            ),
        )
        row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
        return legacy.review_payload(row)