Files
storyforge/collector-service/app/wechat_video_features.py

532 lines
24 KiB
Python

from __future__ import annotations
import json
from collections import Counter
from typing import Any
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
# This module is intentionally self-contained because the task only allows
# writes to a new file. To activate it, import `register_wechat_video_routes`
# from this module inside `app.main` and call it with `(app, core)`.
# Platform identifier compared against / written to content sources and reviews.
WECHAT_VIDEO_PLATFORM = "wechat_video"

# `source_kind` value a content source must carry to count as an account here.
ACCOUNT_SOURCE_KIND = "creator_account"

# Substrings that cause a URL to be rejected as YouTube before platform inference.
YOUTUBE_HOST_MARKERS = (
    "youtube.com",
    "youtu.be",
)
class WechatVideoAccountSyncRequest(BaseModel):
    """Request body for ``POST /v2/wechat-video/accounts/sync``.

    Every string field defaults to ``""``; the sync route treats a blank
    value as "not provided" and falls back to the existing content source
    (when ``content_source_id`` is given) or to resolver defaults.
    """

    # Target project; when blank the route uses the existing source's project.
    project_id: str = ""
    # Knowledge base handed to the KB resolver (blank is passed as None).
    knowledge_base_id: str = ""
    # Assistant handed to the assistant resolver (blank is passed as None).
    assistant_id: str = ""
    # Existing account source to re-sync; blank means a new source is created.
    content_source_id: str = ""
    # Account URL; validated as a wechat_video URL by the route.
    profile_url: str = ""
    # Account handle; falls back to the stored handle when blank.
    handle: str = ""
    # Display title; falls back to stored title, then handle, then the URL.
    title: str = ""
    # Model profile handed to the profile resolver (blank is passed as None).
    analysis_model_profile_id: str = ""
    # Forwarded unchanged as the sync job's language.
    language: str = "auto"
    # Validated to the range 1..20; stored in source metadata and job artifacts.
    max_items: int = Field(default=5, ge=1, le=20)
    # Forwarded unchanged in the sync job's artifacts.
    skip_existing: bool = True
    # Forwarded unchanged in the sync job's artifacts.
    auto_trigger_analysis: bool = True
class WechatVideoReviewCreateRequest(BaseModel):
    """Request body for ``POST /v2/wechat-video/accounts/{account_id}/reviews``.

    Blank strings mean "not provided"; the route derives project, publish URL
    and title from the referenced job or account when they are omitted.
    """

    # Target project; falls back to the job's, then the account's project.
    project_id: str = ""
    # Optional job the review refers to; must belong to the target account.
    source_job_id: str = ""
    # Assistant handed to the assistant resolver (blank is passed as None).
    assistant_id: str = ""
    # Review title; falls back to the job title, then an account-based title.
    title: str = ""
    # Stored as-is; an empty value is replaced with "video" by the route.
    content_type: str = "video"
    # Published URL; falls back to the job's source URL and is validated.
    publish_url: str = ""
    # Stored after stripping; the route does not parse or validate it.
    published_at: str = ""
    # Serialized with json.dumps into the `metrics_json` column.
    metrics: dict[str, Any] = Field(default_factory=dict)
    # Free-text fields below are stripped and stored verbatim.
    verdict: str = ""
    highlights: str = ""
    next_actions: str = ""
    notes: str = ""
def register_wechat_video_routes(app: Any, legacy: Any) -> None:
    """Attach the ``/v2/wechat-video`` routes to *app*, at most once.

    Args:
        app: Application object exposing ``state`` and route decorators.
        legacy: Object providing the shared helpers (database access,
            resolvers, payload builders) that the routes call.

    A flag on ``app.state`` records that registration already ran, so a
    repeated call returns immediately without registering anything again.
    """
    already_registered = getattr(app.state, "wechat_video_routes_registered", False)
    if already_registered:
        return
    app.state.wechat_video_routes_registered = True
def _account_not_found() -> HTTPException:
    """Build (without raising) the 404 error for an unknown WeChat Video account."""
    error = HTTPException(status_code=404, detail="WeChat Video account not found")
    return error
def _normalize_wechat_source_url(source_url: str) -> str:
    """Strip *source_url* and verify it is a wechat_video URL.

    Returns:
        The stripped URL, or ``""`` when the input is blank (blank input is
        not treated as an error here; callers decide what to do with it).

    Raises:
        HTTPException: 400 when the URL contains a YouTube host marker, or
            when ``legacy.infer_platform_from_url`` does not classify it as
            ``wechat_video``.
    """
    candidate = source_url.strip()
    if candidate == "":
        return ""

    # Reject YouTube explicitly so the caller gets a specific error message.
    lowered_candidate = candidate.lower()
    for marker in YOUTUBE_HOST_MARKERS:
        if marker in lowered_candidate:
            raise HTTPException(status_code=400, detail="YouTube is not supported by wechat_video routes")

    if legacy.infer_platform_from_url(candidate) == WECHAT_VIDEO_PLATFORM:
        return candidate
    raise HTTPException(
        status_code=400,
        detail="wechat_video routes only accept channels.weixin.qq.com or mp.weixin.qq.com/s URLs",
    )
def _require_owned_account(source_id: str, user_id: str) -> dict[str, Any]:
    """Load a content source and insist it is a WeChat Video creator account.

    The row comes from ``legacy.load_owned_content_source`` (presumably that
    helper enforces ownership itself; confirm against its definition).

    Raises:
        HTTPException: 404 when the row's platform or source kind does not
            match the WeChat Video account constants.
    """
    source = legacy.load_owned_content_source(source_id, user_id)
    if source.get("platform") == WECHAT_VIDEO_PLATFORM and source.get("source_kind") == ACCOUNT_SOURCE_KIND:
        return source
    raise _account_not_found()
def _list_sync_job_rows(source_row: dict[str, Any], *, limit: int = 50) -> list[dict[str, Any]]:
    """Return the account's ``content_source_sync`` jobs, newest first.

    Args:
        source_row: Content-source row; its ``user_id`` and ``id`` scope the query.
        limit: Maximum rows to return; values below 1 are raised to 1.
    """
    sql = """
SELECT *
FROM jobs
WHERE user_id = ? AND content_source_id = ? AND source_type = 'content_source_sync'
ORDER BY created_at DESC
LIMIT ?
"""
    bindings = (source_row["user_id"], source_row["id"], max(1, limit))
    return legacy.db.fetch_all(sql, bindings)
def _list_video_job_rows(source_row: dict[str, Any], *, limit: int = 200) -> list[dict[str, Any]]:
    """Return ``video_link`` jobs spawned by the account's sync jobs, newest first.

    The parent set is the account's most recent sync jobs (at most *limit* of
    them); child jobs are matched through ``parent_job_id``.

    Args:
        source_row: Content-source row; its ``user_id`` and ``id`` scope the query.
        limit: Maximum rows to return; values below 1 are raised to 1.

    Returns:
        At most *limit* job rows, or ``[]`` when the account has no sync jobs.
    """
    row_cap = max(1, limit)
    sync_rows = _list_sync_job_rows(source_row, limit=row_cap)
    if not sync_rows:
        return []
    parent_job_ids = [row["id"] for row in sync_rows]
    # Only "?" markers are interpolated; every value is still bound as a parameter.
    placeholders = ",".join("?" for _ in parent_job_ids)
    # The cap is applied in SQL so the database never returns rows that would
    # just be sliced away afterwards.
    query = f"""
SELECT *
FROM jobs
WHERE user_id = ? AND source_type = 'video_link' AND parent_job_id IN ({placeholders})
ORDER BY created_at DESC
LIMIT ?
"""
    params: tuple[Any, ...] = (source_row["user_id"], *parent_job_ids, row_cap)
    return legacy.db.fetch_all(query, params)
def _dedupe_latest_video_jobs(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
deduped: list[dict[str, Any]] = []
seen_urls: set[str] = set()
for row in rows:
source_url = str(row.get("source_url") or "").strip()
if not source_url or source_url in seen_urls:
continue
seen_urls.add(source_url)
deduped.append(row)
return deduped
def _fetch_content_source(source_id: str) -> dict[str, Any] | None:
    """Look up a content source by id; a blank id short-circuits to ``None``.

    No ownership filter is applied here, only the id.
    """
    if source_id:
        return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,))
    return None
def _load_related_reviews(source_row: dict[str, Any], video_rows: list[dict[str, Any]], *, limit: int = 50) -> list[dict[str, Any]]:
    """Return the user's wechat_video reviews that relate to *video_rows*.

    A review is related when its ``source_job_id`` is one of the video job ids
    or its ``publish_url`` equals one of the video source URLs. Only the 400
    most recent reviews of the user on this platform are considered.

    Args:
        source_row: Content-source row; only ``user_id`` is used.
        video_rows: Video job rows supplying the ids and URLs to match.
        limit: Maximum reviews to return; values below 1 are raised to 1.
    """
    candidates = legacy.db.fetch_all(
        """
SELECT *
FROM publish_reviews
WHERE user_id = ? AND platform = ?
ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC
LIMIT 400
""",
        (source_row["user_id"], WECHAT_VIDEO_PLATFORM),
    )
    known_job_ids = {job["id"] for job in video_rows}
    known_urls = {str(job.get("source_url") or "").strip() for job in video_rows if job.get("source_url")}

    def _is_related(review: dict[str, Any]) -> bool:
        linked_job_id = str(review.get("source_job_id") or "").strip()
        if linked_job_id and linked_job_id in known_job_ids:
            return True
        linked_url = str(review.get("publish_url") or "").strip()
        return bool(linked_url) and linked_url in known_urls

    related = [review for review in candidates if _is_related(review)]
    return related[: max(1, limit)]
def _load_related_documents(video_rows: list[dict[str, Any]], *, limit: int = 30) -> list[dict[str, Any]]:
    """Return knowledge documents whose ``source_url`` matches a video job URL.

    For every knowledge base referenced by *video_rows*, the 200 most recent
    documents are scanned and those whose stripped ``source_url`` equals a
    video job's stripped ``source_url`` are collected.

    Args:
        video_rows: Video job rows supplying knowledge-base ids and URLs.
        limit: Collection stops as soon as this many documents are gathered.

    Returns:
        Matching document rows, each document id at most once.
    """
    kb_ids = {str(row.get("knowledge_base_id") or "").strip() for row in video_rows}
    video_urls = {str(row.get("source_url") or "").strip() for row in video_rows}
    # Whitespace-only values normalize to "" and must not take part in
    # matching; otherwise every document with a blank source_url would match.
    kb_ids.discard("")
    video_urls.discard("")
    if not video_urls:
        return []

    documents: list[dict[str, Any]] = []
    seen_document_ids: set[str] = set()
    # Sorted so that a limit-truncated result does not depend on set ordering.
    for kb_id in sorted(kb_ids):
        for row in legacy.db.fetch_all(
            """
SELECT *
FROM knowledge_documents
WHERE knowledge_base_id = ?
ORDER BY created_at DESC
LIMIT 200
""",
            (kb_id,),
        ):
            if row["id"] in seen_document_ids:
                continue
            if str(row.get("source_url") or "").strip() not in video_urls:
                continue
            seen_document_ids.add(row["id"])
            documents.append(row)
            if len(documents) >= limit:
                return documents
    return documents
def _build_review_maps(review_rows: list[dict[str, Any]]) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
by_job_id: dict[str, dict[str, Any]] = {}
by_url: dict[str, dict[str, Any]] = {}
for row in review_rows:
source_job_id = str(row.get("source_job_id") or "").strip()
publish_url = str(row.get("publish_url") or "").strip()
if source_job_id and source_job_id not in by_job_id:
by_job_id[source_job_id] = row
if publish_url and publish_url not in by_url:
by_url[publish_url] = row
return by_job_id, by_url
def _build_document_map(document_rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
by_url: dict[str, dict[str, Any]] = {}
for row in document_rows:
source_url = str(row.get("source_url") or "").strip()
if source_url and source_url not in by_url:
by_url[source_url] = row
return by_url
def _build_account_payload(source_row: dict[str, Any]) -> dict[str, Any]:
    """Build the API payload for an account, enriched with sync bookkeeping.

    Starts from ``legacy.content_source_payload`` and adds the platform label
    plus last-sync fields read from the payload's ``metadata``. The status of
    the last sync job is looked up from the ``jobs`` table when a job id is
    recorded; otherwise it is reported as ``""``.
    """
    payload = legacy.content_source_payload(source_row)
    metadata = payload.get("metadata") or {}

    def _meta_text(key: str, fallback: str = "") -> str:
        return str(metadata.get(key) or fallback)

    last_sync_job_id = _meta_text("last_sync_job_id")
    latest_sync_job = (
        legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (last_sync_job_id,))
        if last_sync_job_id
        else None
    )
    extras = {
        "platform_label": legacy.platform_label(WECHAT_VIDEO_PLATFORM),
        "last_sync_job_id": last_sync_job_id,
        "last_sync_completed_at": _meta_text("last_sync_completed_at"),
        "last_sync_error": _meta_text("last_sync_error"),
        "last_sync_status": str((latest_sync_job or {}).get("status") or ""),
        "sync_mode": _meta_text("sync_mode", "recent_uploads"),
    }
    for key, value in extras.items():
        payload[key] = value
    return payload
def _build_video_item(
    job_row: dict[str, Any],
    review_by_job_id: dict[str, dict[str, Any]],
    review_by_url: dict[str, dict[str, Any]],
    document_by_url: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the API item for one video job.

    The review is looked up by job id first and by source URL second; the
    document is looked up by source URL. The job's own content source is
    fetched from the database when the job references one.

    Args:
        job_row: The video job row.
        review_by_job_id: Reviews indexed by ``source_job_id``.
        review_by_url: Reviews indexed by ``publish_url``.
        document_by_url: Knowledge documents indexed by ``source_url``.
    """
    url = str(job_row.get("source_url") or "").strip()
    linked_source_id = str(job_row.get("content_source_id") or "").strip()
    linked_source = _fetch_content_source(linked_source_id)

    review = review_by_job_id.get(job_row["id"])
    if not review:
        review = review_by_url.get(url)
    document = document_by_url.get(url)
    artifacts = legacy.parse_job_artifacts(job_row)

    item: dict[str, Any] = {
        "id": job_row["id"],
        "title": job_row.get("title", ""),
        "status": job_row.get("status", ""),
        "source_url": url,
        "external_id": str(artifacts.get("external_id") or ""),
        "origin_sync_job_id": str(artifacts.get("origin_sync_job_id") or ""),
    }
    item["job"] = legacy.job_payload(job_row)
    item["content_source"] = legacy.content_source_payload(linked_source) if linked_source else None
    item["latest_review"] = legacy.review_payload(review) if review else None
    item["document"] = legacy.document_payload(document) if document else None
    return item
def _build_workspace_payload(source_row: dict[str, Any]) -> dict[str, Any]:
    """Build the full workspace view for one WeChat Video account.

    Combines the account payload, up to 20 recent sync jobs (10 of them
    listed), deduplicated video jobs (20 listed, all counted), related
    reviews (at most 20), related documents (at most 12) and summary stats.
    """
    sync_jobs = _list_sync_job_rows(source_row, limit=20)
    videos = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200))
    reviews = _load_related_reviews(source_row, videos, limit=20)
    documents = _load_related_documents(videos, limit=12)
    review_by_job_id, review_by_url = _build_review_maps(reviews)
    document_by_url = _build_document_map(documents)

    # Blank or missing statuses are tallied under "unknown".
    statuses = [str(video.get("status") or "").strip() or "unknown" for video in videos]
    status_counts = Counter(statuses)

    latest_sync = None
    if sync_jobs:
        latest_sync = legacy.job_context_payload(sync_jobs[0])

    account_payload = _build_account_payload(source_row)
    sync_payloads = [legacy.job_payload(job) for job in sync_jobs[:10]]
    video_items = [
        _build_video_item(video, review_by_job_id, review_by_url, document_by_url)
        for video in videos[:20]
    ]
    review_payloads = [legacy.review_payload(review) for review in reviews]
    document_payloads = [legacy.document_payload(document) for document in documents]

    return {
        "account": account_payload,
        "latest_sync_job": latest_sync,
        "sync_jobs": sync_payloads,
        "videos": {
            "total": len(videos),
            "status_counts": dict(status_counts),
            "items": video_items,
        },
        "reviews": review_payloads,
        "recent_documents": document_payloads,
        "stats": {
            "sync_job_count": len(sync_jobs),
            "video_job_count": len(videos),
            "completed_video_count": status_counts.get("completed", 0),
            "failed_video_count": status_counts.get("failed", 0),
            "review_count": len(reviews),
            "document_count": len(documents),
        },
    }
def _update_account_source(
    source_row: dict[str, Any],
    *,
    source_url: str,
    title: str,
    handle: str,
    metadata_updates: dict[str, Any],
) -> dict[str, Any]:
    """Persist new handle/URL/title/metadata on an account and reload the row.

    ``metadata_updates`` is merged into the stored ``metadata_json`` through
    ``legacy.merge_json_field``; the platform column is set to wechat_video
    and ``updated_at`` to the current ``legacy.utc_now()`` value.

    Returns:
        The content-source row as read back after the update.
    """
    account_id = source_row["id"]
    merged_metadata = legacy.merge_json_field(source_row.get("metadata_json") or "{}", metadata_updates)
    update_sql = """
UPDATE content_sources
SET handle = ?, source_url = ?, title = ?, platform = ?, metadata_json = ?, updated_at = ?
WHERE id = ? AND user_id = ?
"""
    bindings = (
        handle,
        source_url,
        title,
        WECHAT_VIDEO_PLATFORM,
        merged_metadata,
        legacy.utc_now(),
        account_id,
        source_row["user_id"],
    )
    legacy.db.execute(update_sql, bindings)
    return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (account_id,))
def _job_belongs_to_account(job_row: dict[str, Any], source_row: dict[str, Any]) -> bool:
    """Tell whether *job_row* can be traced back to the account *source_row*.

    Two links are accepted:
      * the job's content source carries ``origin_content_source_id`` equal to
        the account id in its payload metadata, or
      * the job's parent job has ``content_source_id`` equal to the account id.
    """
    linked_source_id = str(job_row.get("content_source_id") or "").strip()
    if linked_source_id:
        linked_source = _fetch_content_source(linked_source_id)
        if linked_source:
            metadata = legacy.content_source_payload(linked_source).get("metadata") or {}
            if str(metadata.get("origin_content_source_id") or "") == source_row["id"]:
                return True

    if str(job_row.get("parent_job_id") or "").strip():
        parent = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["parent_job_id"],))
        return bool(parent) and str(parent.get("content_source_id") or "") == source_row["id"]
    return False
@app.get("/v2/wechat-video/accounts")
def list_wechat_video_accounts(
    project_id: str | None = Query(default=None),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    """List the caller's WeChat Video accounts, most recently updated first.

    When ``project_id`` is supplied it is resolved through
    ``legacy.resolve_target_project`` and the listing is narrowed to that
    project.
    """
    clauses = ["user_id = ?", "platform = ?", "source_kind = ?"]
    bindings: list[Any] = [account["id"], WECHAT_VIDEO_PLATFORM, ACCOUNT_SOURCE_KIND]
    if project_id:
        project = legacy.resolve_target_project(account["id"], project_id, username=account["username"])
        clauses += ["project_id = ?"]
        bindings += [project["id"]]

    # Only fixed clause text is interpolated; all values are bound parameters.
    sql = f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY updated_at DESC"
    accounts: list[dict[str, Any]] = []
    for source in legacy.db.fetch_all(sql, tuple(bindings)):
        accounts.append(_build_account_payload(source))
    return accounts
@app.post("/v2/wechat-video/accounts/sync")
async def sync_wechat_video_account(
    request: WechatVideoAccountSyncRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Create or refresh a WeChat Video account source and queue a sync job.

    Steps: resolve the existing source (if ``content_source_id`` is given),
    validate the account URL, resolve project / knowledge base / assistant /
    model profile, create or update the content source, create a
    ``content_source_sync`` job, record it in the source metadata, trigger the
    job, and return the account workspace with the queued job under
    ``"sync_job"``.

    Raises:
        HTTPException: 400 when no usable URL is available, the URL is not a
            wechat_video URL, or the source belongs to a different project;
            404 when ``content_source_id`` is not a WeChat Video account.
    """
    source_row = None
    content_source_id = request.content_source_id.strip()
    if content_source_id:
        source_row = _require_owned_account(content_source_id, account["id"])

    # Stored columns may be NULL, and `.get(key, "")` only covers *missing*
    # keys, so every stored value is coerced with `str(... or "")`.
    existing = source_row or {}
    # Strip before the fallback so a whitespace-only profile_url does not
    # mask the URL already stored on the source.
    source_url = _normalize_wechat_source_url(
        request.profile_url.strip() or str(existing.get("source_url") or "")
    )
    if not source_url:
        raise HTTPException(status_code=400, detail="profile_url or content_source_id is required")

    requested_project_id = request.project_id or existing.get("project_id") or ""
    project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
    if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
        raise HTTPException(status_code=400, detail="Content source does not belong to target project")
    kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
    assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
    profile = legacy.model_profile_for_account(account["id"], request.analysis_model_profile_id or None)

    handle = request.handle.strip() or str(existing.get("handle") or "").strip()
    title = request.title.strip() or str(existing.get("title") or "").strip() or handle or source_url
    metadata_updates = {
        "account_type": WECHAT_VIDEO_PLATFORM,
        "sync_mode": "recent_uploads",
        "max_items": request.max_items,
        "analysis_model_profile_id": profile["id"],
        "last_sync_error": "",
    }
    if not source_row:
        source_row = legacy.create_content_source(
            account_id=account["id"],
            project_id=project["id"],
            source_kind=ACCOUNT_SOURCE_KIND,
            platform=WECHAT_VIDEO_PLATFORM,
            handle=handle,
            source_url=source_url,
            title=title,
            metadata=metadata_updates,
        )
    else:
        source_row = _update_account_source(
            source_row,
            source_url=source_url,
            title=title,
            handle=handle,
            metadata_updates=metadata_updates,
        )

    job_row = legacy.create_job_record(
        account_id=account["id"],
        project_id=project["id"],
        knowledge_base_id=kb["id"],
        source_type="content_source_sync",
        line_type="content_source_sync",
        workflow_key="content_source_sync_pipeline",
        title=f"{title} 内容源同步",
        language=request.language,
        source_url=source_url,
        assistant_id=(assistant or {}).get("id"),
        content_source_id=source_row["id"],
        artifacts={
            "platform": WECHAT_VIDEO_PLATFORM,
            "handle": handle,
            "source_account_url": source_url,
            "source_title": title,
            "max_items": request.max_items,
            "skip_existing": request.skip_existing,
            "auto_trigger_analysis": request.auto_trigger_analysis,
        },
        analysis_model_profile_id=profile["id"],
    )
    # Record the new job on the source before triggering it.
    legacy.update_content_source_metadata(
        source_row["id"],
        {
            "sync_mode": "recent_uploads",
            "max_items": request.max_items,
            "analysis_model_profile_id": profile["id"],
            "last_sync_job_id": job_row["id"],
            "last_sync_requested_at": legacy.utc_now(),
            "last_sync_error": "",
        },
    )
    queued_row = await legacy.trigger_orchestrated_job(job_row)

    # Reload so the workspace reflects the metadata written above.
    source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],))
    workspace = _build_workspace_payload(source_row)
    workspace["sync_job"] = legacy.job_payload(queued_row)
    return workspace
@app.get("/v2/wechat-video/accounts/{account_id}")
def get_wechat_video_account(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Return the workspace payload for one of the caller's WeChat Video accounts.

    Raises:
        HTTPException: 404 when the source is not a WeChat Video account.
    """
    owned_source = _require_owned_account(account_id, account["id"])
    workspace = _build_workspace_payload(owned_source)
    return workspace
@app.get("/v2/wechat-video/accounts/{account_id}/workspace")
def get_wechat_video_account_workspace(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Return the account workspace; same payload as the plain account route.

    Raises:
        HTTPException: 404 when the source is not a WeChat Video account.
    """
    owned_source = _require_owned_account(account_id, account["id"])
    workspace = _build_workspace_payload(owned_source)
    return workspace
@app.get("/v2/wechat-video/accounts/{account_id}/videos")
def list_wechat_video_account_videos(
    account_id: str,
    limit: int = Query(default=50, ge=1, le=200),
    status: str = Query(default=""),
    q: str = Query(default=""),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """List an account's deduplicated video jobs with optional filtering.

    Args:
        account_id: Content-source id of the account.
        limit: Number of items to return (1..200).
        status: Case-insensitive exact match on the job status; blank disables it.
        q: Case-insensitive substring matched against title and source URL.

    Returns:
        The account payload, the number of rows that passed the filters, the
        status tally of those rows, and the first *limit* items.
    """
    source_row = _require_owned_account(account_id, account["id"])
    # Over-fetch so filtering still leaves enough rows to fill a page.
    fetch_cap = max(limit * 4, 200)
    candidates = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=fetch_cap))
    wanted_status = status.strip().lower()
    needle = q.strip().lower()

    def _passes_filters(row: dict[str, Any]) -> bool:
        if wanted_status and str(row.get("status") or "").lower() != wanted_status:
            return False
        if not needle:
            return True
        return (
            needle in str(row.get("title") or "").lower()
            or needle in str(row.get("source_url") or "").lower()
        )

    matching = [row for row in candidates if _passes_filters(row)]
    page = matching[:limit]

    reviews = _load_related_reviews(source_row, page, limit=max(limit, 20))
    documents = _load_related_documents(page, limit=max(limit, 20))
    review_by_job_id, review_by_url = _build_review_maps(reviews)
    document_by_url = _build_document_map(documents)

    account_payload = _build_account_payload(source_row)
    status_tally = Counter(str(row.get("status") or "").strip() or "unknown" for row in matching)
    items = [
        _build_video_item(row, review_by_job_id, review_by_url, document_by_url)
        for row in page
    ]
    return {
        "account": account_payload,
        "total": len(matching),
        "status_counts": dict(status_tally),
        "items": items,
    }
@app.get("/v2/wechat-video/accounts/{account_id}/reviews")
def list_wechat_video_account_reviews(
    account_id: str,
    limit: int = Query(default=50, ge=1, le=200),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    """List reviews related to the account's video jobs (at most *limit*).

    Reviews are matched against up to 200 deduplicated video jobs of the
    account, by job id or by publish URL.
    """
    source_row = _require_owned_account(account_id, account["id"])
    video_jobs = _list_video_job_rows(source_row, limit=200)
    unique_videos = _dedupe_latest_video_jobs(video_jobs)
    related_reviews = _load_related_reviews(source_row, unique_videos, limit=limit)
    payloads: list[dict[str, Any]] = []
    for review in related_reviews:
        payloads.append(legacy.review_payload(review))
    return payloads
@app.post("/v2/wechat-video/accounts/{account_id}/reviews")
def create_wechat_video_review(
    account_id: str,
    request: WechatVideoReviewCreateRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Insert a publish review for a WeChat Video account and return its payload.

    Project, publish URL and title fall back to the referenced job (when
    ``source_job_id`` is given) and then to the account.

    Raises:
        HTTPException: 404 when the account is not a WeChat Video account;
            400 when the job does not belong to the account, the account is
            in a different project, or the publish URL is not a wechat_video
            URL.
    """
    source_row = _require_owned_account(account_id, account["id"])
    source_job = None
    source_job_id = request.source_job_id.strip()
    if source_job_id:
        source_job = legacy.load_owned_job(source_job_id, account["id"])
        if not _job_belongs_to_account(source_job, source_row):
            raise HTTPException(status_code=400, detail="source_job_id does not belong to the target WeChat Video account")

    requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else source_row.get("project_id", ""))
    project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
    if source_row.get("project_id") and source_row.get("project_id") != project["id"]:
        raise HTTPException(status_code=400, detail="WeChat Video account does not belong to target project")
    assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])

    # Stored job columns may be NULL; coerce to str so publish_url is always text.
    job_fields = source_job or {}
    publish_url = request.publish_url.strip() or str(job_fields.get("source_url") or "").strip()
    if publish_url:
        publish_url = _normalize_wechat_source_url(publish_url)

    # Build the account-based fallback only from a real title. Formatting an
    # empty or NULL title first would yield "复盘" / "None 复盘" and make the
    # generic default below unreachable.
    account_title = str(source_row.get("title") or "").strip()
    title = (
        request.title.strip()
        or str(job_fields.get("title") or "").strip()
        or (f"{account_title} 复盘" if account_title else "微信视频号复盘")
    )

    review_id = legacy.make_id("review")
    timestamp = legacy.utc_now()
    legacy.db.execute(
        """
INSERT INTO publish_reviews (
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
        (
            review_id,
            account["id"],
            project["id"],
            source_job["id"] if source_job else None,
            (assistant or {}).get("id") or None,
            title,
            WECHAT_VIDEO_PLATFORM,
            request.content_type or "video",
            publish_url,
            request.published_at.strip(),
            json.dumps(request.metrics, ensure_ascii=False),
            request.verdict.strip(),
            request.highlights.strip(),
            request.next_actions.strip(),
            request.notes.strip(),
            timestamp,
            timestamp,
        ),
    )
    row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
    return legacy.review_payload(row)