collector-service: add wechat video routes
This commit is contained in:
531
collector-service/app/wechat_video_features.py
Normal file
531
collector-service/app/wechat_video_features.py
Normal file
@@ -0,0 +1,531 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from collections import Counter
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Depends, HTTPException, Query
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# This module is intentionally self-contained because the task only allows
|
||||
# writes to a new file. To activate it, import `register_wechat_video_routes`
|
||||
# from `app.main` and call it with `(app, core)`.
|
||||
|
||||
WECHAT_VIDEO_PLATFORM = "wechat_video"
|
||||
ACCOUNT_SOURCE_KIND = "creator_account"
|
||||
YOUTUBE_HOST_MARKERS = ("youtube.com", "youtu.be")
|
||||
|
||||
|
||||
class WechatVideoAccountSyncRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
knowledge_base_id: str = ""
|
||||
assistant_id: str = ""
|
||||
content_source_id: str = ""
|
||||
profile_url: str = ""
|
||||
handle: str = ""
|
||||
title: str = ""
|
||||
analysis_model_profile_id: str = ""
|
||||
language: str = "auto"
|
||||
max_items: int = Field(default=5, ge=1, le=20)
|
||||
skip_existing: bool = True
|
||||
auto_trigger_analysis: bool = True
|
||||
|
||||
|
||||
class WechatVideoReviewCreateRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
source_job_id: str = ""
|
||||
assistant_id: str = ""
|
||||
title: str = ""
|
||||
content_type: str = "video"
|
||||
publish_url: str = ""
|
||||
published_at: str = ""
|
||||
metrics: dict[str, Any] = Field(default_factory=dict)
|
||||
verdict: str = ""
|
||||
highlights: str = ""
|
||||
next_actions: str = ""
|
||||
notes: str = ""
|
||||
|
||||
|
||||
def register_wechat_video_routes(app: Any, legacy: Any) -> None:
|
||||
if getattr(app.state, "wechat_video_routes_registered", False):
|
||||
return
|
||||
app.state.wechat_video_routes_registered = True
|
||||
|
||||
def _account_not_found() -> HTTPException:
|
||||
return HTTPException(status_code=404, detail="WeChat Video account not found")
|
||||
|
||||
def _normalize_wechat_source_url(source_url: str) -> str:
|
||||
normalized = source_url.strip()
|
||||
if not normalized:
|
||||
return ""
|
||||
lowered = normalized.lower()
|
||||
if any(marker in lowered for marker in YOUTUBE_HOST_MARKERS):
|
||||
raise HTTPException(status_code=400, detail="YouTube is not supported by wechat_video routes")
|
||||
inferred = legacy.infer_platform_from_url(normalized)
|
||||
if inferred != WECHAT_VIDEO_PLATFORM:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="wechat_video routes only accept channels.weixin.qq.com or mp.weixin.qq.com/s URLs",
|
||||
)
|
||||
return normalized
|
||||
|
||||
def _require_owned_account(source_id: str, user_id: str) -> dict[str, Any]:
|
||||
row = legacy.load_owned_content_source(source_id, user_id)
|
||||
if row.get("platform") != WECHAT_VIDEO_PLATFORM or row.get("source_kind") != ACCOUNT_SOURCE_KIND:
|
||||
raise _account_not_found()
|
||||
return row
|
||||
|
||||
def _list_sync_job_rows(source_row: dict[str, Any], *, limit: int = 50) -> list[dict[str, Any]]:
|
||||
return legacy.db.fetch_all(
|
||||
"""
|
||||
SELECT *
|
||||
FROM jobs
|
||||
WHERE user_id = ? AND content_source_id = ? AND source_type = 'content_source_sync'
|
||||
ORDER BY created_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(source_row["user_id"], source_row["id"], max(1, limit)),
|
||||
)
|
||||
|
||||
def _list_video_job_rows(source_row: dict[str, Any], *, limit: int = 200) -> list[dict[str, Any]]:
|
||||
sync_rows = _list_sync_job_rows(source_row, limit=max(1, limit))
|
||||
if not sync_rows:
|
||||
return []
|
||||
parent_job_ids = [row["id"] for row in sync_rows]
|
||||
placeholders = ",".join("?" for _ in parent_job_ids)
|
||||
query = f"""
|
||||
SELECT *
|
||||
FROM jobs
|
||||
WHERE user_id = ? AND source_type = 'video_link' AND parent_job_id IN ({placeholders})
|
||||
ORDER BY created_at DESC
|
||||
"""
|
||||
params: tuple[Any, ...] = (source_row["user_id"], *parent_job_ids)
|
||||
return legacy.db.fetch_all(query, params)[: max(1, limit)]
|
||||
|
||||
def _dedupe_latest_video_jobs(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
deduped: list[dict[str, Any]] = []
|
||||
seen_urls: set[str] = set()
|
||||
for row in rows:
|
||||
source_url = str(row.get("source_url") or "").strip()
|
||||
if not source_url or source_url in seen_urls:
|
||||
continue
|
||||
seen_urls.add(source_url)
|
||||
deduped.append(row)
|
||||
return deduped
|
||||
|
||||
def _fetch_content_source(source_id: str) -> dict[str, Any] | None:
|
||||
if not source_id:
|
||||
return None
|
||||
return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,))
|
||||
|
||||
def _load_related_reviews(source_row: dict[str, Any], video_rows: list[dict[str, Any]], *, limit: int = 50) -> list[dict[str, Any]]:
|
||||
candidate_rows = legacy.db.fetch_all(
|
||||
"""
|
||||
SELECT *
|
||||
FROM publish_reviews
|
||||
WHERE user_id = ? AND platform = ?
|
||||
ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC
|
||||
LIMIT 400
|
||||
""",
|
||||
(source_row["user_id"], WECHAT_VIDEO_PLATFORM),
|
||||
)
|
||||
job_ids = {row["id"] for row in video_rows}
|
||||
video_urls = {str(row.get("source_url") or "").strip() for row in video_rows if row.get("source_url")}
|
||||
results: list[dict[str, Any]] = []
|
||||
for row in candidate_rows:
|
||||
source_job_id = str(row.get("source_job_id") or "").strip()
|
||||
publish_url = str(row.get("publish_url") or "").strip()
|
||||
if source_job_id and source_job_id in job_ids:
|
||||
results.append(row)
|
||||
continue
|
||||
if publish_url and publish_url in video_urls:
|
||||
results.append(row)
|
||||
return results[: max(1, limit)]
|
||||
|
||||
def _load_related_documents(video_rows: list[dict[str, Any]], *, limit: int = 30) -> list[dict[str, Any]]:
|
||||
kb_ids = {str(row.get("knowledge_base_id") or "").strip() for row in video_rows if row.get("knowledge_base_id")}
|
||||
video_urls = {str(row.get("source_url") or "").strip() for row in video_rows if row.get("source_url")}
|
||||
documents: list[dict[str, Any]] = []
|
||||
seen_document_ids: set[str] = set()
|
||||
for kb_id in kb_ids:
|
||||
for row in legacy.db.fetch_all(
|
||||
"""
|
||||
SELECT *
|
||||
FROM knowledge_documents
|
||||
WHERE knowledge_base_id = ?
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 200
|
||||
""",
|
||||
(kb_id,),
|
||||
):
|
||||
if row["id"] in seen_document_ids:
|
||||
continue
|
||||
if str(row.get("source_url") or "").strip() not in video_urls:
|
||||
continue
|
||||
seen_document_ids.add(row["id"])
|
||||
documents.append(row)
|
||||
if len(documents) >= limit:
|
||||
return documents
|
||||
return documents
|
||||
|
||||
def _build_review_maps(review_rows: list[dict[str, Any]]) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
|
||||
by_job_id: dict[str, dict[str, Any]] = {}
|
||||
by_url: dict[str, dict[str, Any]] = {}
|
||||
for row in review_rows:
|
||||
source_job_id = str(row.get("source_job_id") or "").strip()
|
||||
publish_url = str(row.get("publish_url") or "").strip()
|
||||
if source_job_id and source_job_id not in by_job_id:
|
||||
by_job_id[source_job_id] = row
|
||||
if publish_url and publish_url not in by_url:
|
||||
by_url[publish_url] = row
|
||||
return by_job_id, by_url
|
||||
|
||||
def _build_document_map(document_rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
|
||||
by_url: dict[str, dict[str, Any]] = {}
|
||||
for row in document_rows:
|
||||
source_url = str(row.get("source_url") or "").strip()
|
||||
if source_url and source_url not in by_url:
|
||||
by_url[source_url] = row
|
||||
return by_url
|
||||
|
||||
def _build_account_payload(source_row: dict[str, Any]) -> dict[str, Any]:
|
||||
payload = legacy.content_source_payload(source_row)
|
||||
metadata = payload.get("metadata") or {}
|
||||
latest_sync_job = None
|
||||
last_sync_job_id = str(metadata.get("last_sync_job_id") or "")
|
||||
if last_sync_job_id:
|
||||
latest_sync_job = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (last_sync_job_id,))
|
||||
payload["platform_label"] = legacy.platform_label(WECHAT_VIDEO_PLATFORM)
|
||||
payload["last_sync_job_id"] = last_sync_job_id
|
||||
payload["last_sync_completed_at"] = str(metadata.get("last_sync_completed_at") or "")
|
||||
payload["last_sync_error"] = str(metadata.get("last_sync_error") or "")
|
||||
payload["last_sync_status"] = str((latest_sync_job or {}).get("status") or "")
|
||||
payload["sync_mode"] = str(metadata.get("sync_mode") or "recent_uploads")
|
||||
return payload
|
||||
|
||||
def _build_video_item(
|
||||
job_row: dict[str, Any],
|
||||
review_by_job_id: dict[str, dict[str, Any]],
|
||||
review_by_url: dict[str, dict[str, Any]],
|
||||
document_by_url: dict[str, dict[str, Any]],
|
||||
) -> dict[str, Any]:
|
||||
source_url = str(job_row.get("source_url") or "").strip()
|
||||
content_source = _fetch_content_source(str(job_row.get("content_source_id") or "").strip())
|
||||
review_row = review_by_job_id.get(job_row["id"]) or review_by_url.get(source_url)
|
||||
document_row = document_by_url.get(source_url)
|
||||
artifacts = legacy.parse_job_artifacts(job_row)
|
||||
return {
|
||||
"id": job_row["id"],
|
||||
"title": job_row.get("title", ""),
|
||||
"status": job_row.get("status", ""),
|
||||
"source_url": source_url,
|
||||
"external_id": str(artifacts.get("external_id") or ""),
|
||||
"origin_sync_job_id": str(artifacts.get("origin_sync_job_id") or ""),
|
||||
"job": legacy.job_payload(job_row),
|
||||
"content_source": legacy.content_source_payload(content_source) if content_source else None,
|
||||
"latest_review": legacy.review_payload(review_row) if review_row else None,
|
||||
"document": legacy.document_payload(document_row) if document_row else None,
|
||||
}
|
||||
|
||||
def _build_workspace_payload(source_row: dict[str, Any]) -> dict[str, Any]:
|
||||
sync_rows = _list_sync_job_rows(source_row, limit=20)
|
||||
video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200))
|
||||
review_rows = _load_related_reviews(source_row, video_rows, limit=20)
|
||||
document_rows = _load_related_documents(video_rows, limit=12)
|
||||
review_by_job_id, review_by_url = _build_review_maps(review_rows)
|
||||
document_by_url = _build_document_map(document_rows)
|
||||
status_counts = Counter(str(row.get("status") or "").strip() or "unknown" for row in video_rows)
|
||||
latest_sync = legacy.job_context_payload(sync_rows[0]) if sync_rows else None
|
||||
return {
|
||||
"account": _build_account_payload(source_row),
|
||||
"latest_sync_job": latest_sync,
|
||||
"sync_jobs": [legacy.job_payload(row) for row in sync_rows[:10]],
|
||||
"videos": {
|
||||
"total": len(video_rows),
|
||||
"status_counts": dict(status_counts),
|
||||
"items": [
|
||||
_build_video_item(row, review_by_job_id, review_by_url, document_by_url)
|
||||
for row in video_rows[:20]
|
||||
],
|
||||
},
|
||||
"reviews": [legacy.review_payload(row) for row in review_rows],
|
||||
"recent_documents": [legacy.document_payload(row) for row in document_rows],
|
||||
"stats": {
|
||||
"sync_job_count": len(sync_rows),
|
||||
"video_job_count": len(video_rows),
|
||||
"completed_video_count": status_counts.get("completed", 0),
|
||||
"failed_video_count": status_counts.get("failed", 0),
|
||||
"review_count": len(review_rows),
|
||||
"document_count": len(document_rows),
|
||||
},
|
||||
}
|
||||
|
||||
def _update_account_source(
|
||||
source_row: dict[str, Any],
|
||||
*,
|
||||
source_url: str,
|
||||
title: str,
|
||||
handle: str,
|
||||
metadata_updates: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
merged_metadata = legacy.merge_json_field(source_row.get("metadata_json") or "{}", metadata_updates)
|
||||
legacy.db.execute(
|
||||
"""
|
||||
UPDATE content_sources
|
||||
SET handle = ?, source_url = ?, title = ?, platform = ?, metadata_json = ?, updated_at = ?
|
||||
WHERE id = ? AND user_id = ?
|
||||
""",
|
||||
(
|
||||
handle,
|
||||
source_url,
|
||||
title,
|
||||
WECHAT_VIDEO_PLATFORM,
|
||||
merged_metadata,
|
||||
legacy.utc_now(),
|
||||
source_row["id"],
|
||||
source_row["user_id"],
|
||||
),
|
||||
)
|
||||
return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],))
|
||||
|
||||
def _job_belongs_to_account(job_row: dict[str, Any], source_row: dict[str, Any]) -> bool:
|
||||
if str(job_row.get("content_source_id") or "").strip():
|
||||
content_source = _fetch_content_source(str(job_row.get("content_source_id") or "").strip())
|
||||
metadata = (legacy.content_source_payload(content_source).get("metadata") or {}) if content_source else {}
|
||||
if content_source and str(metadata.get("origin_content_source_id") or "") == source_row["id"]:
|
||||
return True
|
||||
if str(job_row.get("parent_job_id") or "").strip():
|
||||
parent_row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["parent_job_id"],))
|
||||
if parent_row and str(parent_row.get("content_source_id") or "") == source_row["id"]:
|
||||
return True
|
||||
return False
|
||||
|
||||
@app.get("/v2/wechat-video/accounts")
|
||||
def list_wechat_video_accounts(
|
||||
project_id: str | None = Query(default=None),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
clauses = ["user_id = ?", "platform = ?", "source_kind = ?"]
|
||||
params: list[Any] = [account["id"], WECHAT_VIDEO_PLATFORM, ACCOUNT_SOURCE_KIND]
|
||||
if project_id:
|
||||
project = legacy.resolve_target_project(account["id"], project_id, username=account["username"])
|
||||
clauses.append("project_id = ?")
|
||||
params.append(project["id"])
|
||||
rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY updated_at DESC",
|
||||
tuple(params),
|
||||
)
|
||||
return [_build_account_payload(row) for row in rows]
|
||||
|
||||
@app.post("/v2/wechat-video/accounts/sync")
|
||||
async def sync_wechat_video_account(
|
||||
request: WechatVideoAccountSyncRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = None
|
||||
if request.content_source_id.strip():
|
||||
source_row = _require_owned_account(request.content_source_id.strip(), account["id"])
|
||||
|
||||
source_url = _normalize_wechat_source_url(request.profile_url or (source_row or {}).get("source_url", ""))
|
||||
if not source_url:
|
||||
raise HTTPException(status_code=400, detail="profile_url or content_source_id is required")
|
||||
|
||||
requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "")
|
||||
project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
|
||||
if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
|
||||
raise HTTPException(status_code=400, detail="Content source does not belong to target project")
|
||||
|
||||
kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
|
||||
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
|
||||
profile = legacy.model_profile_for_account(account["id"], request.analysis_model_profile_id or None)
|
||||
handle = request.handle.strip() or (source_row or {}).get("handle", "").strip()
|
||||
title = request.title.strip() or (source_row or {}).get("title", "").strip() or handle or source_url
|
||||
metadata_updates = {
|
||||
"account_type": WECHAT_VIDEO_PLATFORM,
|
||||
"sync_mode": "recent_uploads",
|
||||
"max_items": request.max_items,
|
||||
"analysis_model_profile_id": profile["id"],
|
||||
"last_sync_error": "",
|
||||
}
|
||||
|
||||
if not source_row:
|
||||
source_row = legacy.create_content_source(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
source_kind=ACCOUNT_SOURCE_KIND,
|
||||
platform=WECHAT_VIDEO_PLATFORM,
|
||||
handle=handle,
|
||||
source_url=source_url,
|
||||
title=title,
|
||||
metadata=metadata_updates,
|
||||
)
|
||||
else:
|
||||
source_row = _update_account_source(
|
||||
source_row,
|
||||
source_url=source_url,
|
||||
title=title,
|
||||
handle=handle,
|
||||
metadata_updates=metadata_updates,
|
||||
)
|
||||
|
||||
job_row = legacy.create_job_record(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
knowledge_base_id=kb["id"],
|
||||
source_type="content_source_sync",
|
||||
line_type="content_source_sync",
|
||||
workflow_key="content_source_sync_pipeline",
|
||||
title=f"{title} 内容源同步",
|
||||
language=request.language,
|
||||
source_url=source_url,
|
||||
assistant_id=(assistant or {}).get("id"),
|
||||
content_source_id=source_row["id"],
|
||||
artifacts={
|
||||
"platform": WECHAT_VIDEO_PLATFORM,
|
||||
"handle": handle,
|
||||
"source_account_url": source_url,
|
||||
"source_title": title,
|
||||
"max_items": request.max_items,
|
||||
"skip_existing": request.skip_existing,
|
||||
"auto_trigger_analysis": request.auto_trigger_analysis,
|
||||
},
|
||||
analysis_model_profile_id=profile["id"],
|
||||
)
|
||||
legacy.update_content_source_metadata(
|
||||
source_row["id"],
|
||||
{
|
||||
"sync_mode": "recent_uploads",
|
||||
"max_items": request.max_items,
|
||||
"analysis_model_profile_id": profile["id"],
|
||||
"last_sync_job_id": job_row["id"],
|
||||
"last_sync_requested_at": legacy.utc_now(),
|
||||
"last_sync_error": "",
|
||||
},
|
||||
)
|
||||
queued_row = await legacy.trigger_orchestrated_job(job_row)
|
||||
source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],))
|
||||
workspace = _build_workspace_payload(source_row)
|
||||
workspace["sync_job"] = legacy.job_payload(queued_row)
|
||||
return workspace
|
||||
|
||||
@app.get("/v2/wechat-video/accounts/{account_id}")
|
||||
def get_wechat_video_account(
|
||||
account_id: str,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
return _build_workspace_payload(source_row)
|
||||
|
||||
@app.get("/v2/wechat-video/accounts/{account_id}/workspace")
|
||||
def get_wechat_video_account_workspace(
|
||||
account_id: str,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
return _build_workspace_payload(source_row)
|
||||
|
||||
@app.get("/v2/wechat-video/accounts/{account_id}/videos")
|
||||
def list_wechat_video_account_videos(
|
||||
account_id: str,
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
status: str = Query(default=""),
|
||||
q: str = Query(default=""),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=max(limit * 4, 200)))
|
||||
normalized_status = status.strip().lower()
|
||||
normalized_query = q.strip().lower()
|
||||
if normalized_status:
|
||||
video_rows = [row for row in video_rows if str(row.get("status") or "").lower() == normalized_status]
|
||||
if normalized_query:
|
||||
video_rows = [
|
||||
row
|
||||
for row in video_rows
|
||||
if normalized_query in str(row.get("title") or "").lower()
|
||||
or normalized_query in str(row.get("source_url") or "").lower()
|
||||
]
|
||||
selected_rows = video_rows[:limit]
|
||||
review_rows = _load_related_reviews(source_row, selected_rows, limit=max(limit, 20))
|
||||
document_rows = _load_related_documents(selected_rows, limit=max(limit, 20))
|
||||
review_by_job_id, review_by_url = _build_review_maps(review_rows)
|
||||
document_by_url = _build_document_map(document_rows)
|
||||
return {
|
||||
"account": _build_account_payload(source_row),
|
||||
"total": len(video_rows),
|
||||
"status_counts": dict(Counter(str(row.get("status") or "").strip() or "unknown" for row in video_rows)),
|
||||
"items": [
|
||||
_build_video_item(row, review_by_job_id, review_by_url, document_by_url)
|
||||
for row in selected_rows
|
||||
],
|
||||
}
|
||||
|
||||
@app.get("/v2/wechat-video/accounts/{account_id}/reviews")
|
||||
def list_wechat_video_account_reviews(
|
||||
account_id: str,
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200))
|
||||
review_rows = _load_related_reviews(source_row, video_rows, limit=limit)
|
||||
return [legacy.review_payload(row) for row in review_rows]
|
||||
|
||||
@app.post("/v2/wechat-video/accounts/{account_id}/reviews")
|
||||
def create_wechat_video_review(
|
||||
account_id: str,
|
||||
request: WechatVideoReviewCreateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
source_job = None
|
||||
if request.source_job_id.strip():
|
||||
source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"])
|
||||
if not _job_belongs_to_account(source_job, source_row):
|
||||
raise HTTPException(status_code=400, detail="source_job_id does not belong to the target WeChat Video account")
|
||||
|
||||
requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else source_row.get("project_id", ""))
|
||||
project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
|
||||
if source_row.get("project_id") and source_row.get("project_id") != project["id"]:
|
||||
raise HTTPException(status_code=400, detail="WeChat Video account does not belong to target project")
|
||||
|
||||
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
|
||||
publish_url = request.publish_url.strip() or (source_job.get("source_url", "") if source_job else "")
|
||||
if publish_url:
|
||||
_normalize_wechat_source_url(publish_url)
|
||||
title = request.title.strip() or (source_job.get("title", "") if source_job else "") or f"{source_row.get('title', '')} 复盘".strip()
|
||||
if not title:
|
||||
title = "微信视频号复盘"
|
||||
|
||||
review_id = legacy.make_id("review")
|
||||
timestamp = legacy.utc_now()
|
||||
legacy.db.execute(
|
||||
"""
|
||||
INSERT INTO publish_reviews (
|
||||
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
|
||||
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
review_id,
|
||||
account["id"],
|
||||
project["id"],
|
||||
source_job["id"] if source_job else None,
|
||||
(assistant or {}).get("id") or None,
|
||||
title,
|
||||
WECHAT_VIDEO_PLATFORM,
|
||||
request.content_type or "video",
|
||||
publish_url,
|
||||
request.published_at.strip(),
|
||||
json.dumps(request.metrics, ensure_ascii=False),
|
||||
request.verdict.strip(),
|
||||
request.highlights.strip(),
|
||||
request.next_actions.strip(),
|
||||
request.notes.strip(),
|
||||
timestamp,
|
||||
timestamp,
|
||||
),
|
||||
)
|
||||
row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
|
||||
return legacy.review_payload(row)
|
||||
Reference in New Issue
Block a user