From a695eb04b959dde2311feea7f614977250825f8f Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 23 Mar 2026 09:06:04 +0800 Subject: [PATCH] Add Kuaishou route module --- collector-service/app/kuaishou_features.py | 381 +++++++++++++++++++++ 1 file changed, 381 insertions(+) create mode 100644 collector-service/app/kuaishou_features.py diff --git a/collector-service/app/kuaishou_features.py b/collector-service/app/kuaishou_features.py new file mode 100644 index 0000000..be081cd --- /dev/null +++ b/collector-service/app/kuaishou_features.py @@ -0,0 +1,381 @@ +from __future__ import annotations + +import json +from typing import Any + +from fastapi import Depends, HTTPException, Query +from pydantic import BaseModel, Field + +from .core_main import ( + content_source_payload, + create_content_source, + create_job_record, + job_payload, + load_owned_content_source, + load_owned_job, + make_id, + parse_json_object, + resolve_target_assistant, + resolve_target_kb, + resolve_target_project, + review_payload, + trigger_orchestrated_job, + utc_now, + model_profile_for_account, + db, +) + +KUAISHOU_PLATFORM = "kuaishou" +KUAISHOU_URL_HINTS = ( + "kuaishou.com", + "v.kuaishou.com", + "chenzhongtech.com", +) +YOUTUBE_URL_HINTS = ( + "youtube.com", + "youtu.be", + "m.youtube.com", + "music.youtube.com", +) + + +class KuaishouContentSourceCreateRequest(BaseModel): + project_id: str = "" + source_kind: str = "creator_account" + handle: str = "" + source_url: str = "" + title: str = "" + local_path: str = "" + metadata: dict[str, Any] = Field(default_factory=dict) + + +class KuaishouContentSourceSyncRequest(BaseModel): + project_id: str = "" + knowledge_base_id: str = "" + assistant_id: str = "" + content_source_id: str = "" + handle: str = "" + source_url: str = "" + title: str = "" + analysis_model_profile_id: str = "" + language: str = "auto" + max_items: int = Field(default=5, ge=1, le=20) + skip_existing: bool = True + auto_trigger_analysis: bool = True + + +class KuaishouReviewCreateRequest(BaseModel): + project_id: str = "" + source_job_id: str = "" + assistant_id: str = "" + title: str = "" + content_type: str = "video" + publish_url: str = "" + published_at: str = "" + metrics: dict[str, Any] = Field(default_factory=dict) + verdict: str = "" + highlights: str = "" + next_actions: str = "" + notes: str = "" + + +def _normalize_text(value: str | None) -> str: + return str(value or "").strip() + + +def _is_youtube_url(value: str) -> bool: + normalized = _normalize_text(value).lower() + return any(hint in normalized for hint in YOUTUBE_URL_HINTS) + + +def _is_kuaishou_url(value: str) -> bool: + normalized = _normalize_text(value).lower() + return any(hint in normalized for hint in KUAISHOU_URL_HINTS) + + +def _ensure_kuaishou_url(value: str) -> str: + normalized = _normalize_text(value) + if not normalized: + return "" + if _is_youtube_url(normalized): + raise HTTPException(status_code=400, detail="YouTube URLs are not supported in the Kuaishou routes") + return normalized + + +def _content_source_is_kuaishou(row: dict[str, Any]) -> bool: + if _normalize_text(row.get("platform")).lower() == KUAISHOU_PLATFORM: + return True + return _is_kuaishou_url(row.get("source_url", "")) + + +def _job_is_kuaishou(row: dict[str, Any]) -> bool: + artifacts = parse_json_object(row.get("artifacts_json") or "{}") + source_url = _normalize_text(row.get("source_url")) + if source_url and _is_youtube_url(source_url): + return False + if source_url and _is_kuaishou_url(source_url): + return True + if _normalize_text(artifacts.get("platform")).lower() == KUAISHOU_PLATFORM: + return True + content_source_id = _normalize_text(row.get("content_source_id")) + if content_source_id: + source_row = db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (content_source_id,)) + return bool(source_row and _content_source_is_kuaishou(source_row)) + return False + + +def _require_owned_kuaishou_source(source_id: str, account_id: str) -> dict[str, Any]: + row = load_owned_content_source(source_id, account_id) + if not _content_source_is_kuaishou(row): + raise HTTPException(status_code=400, detail="Content source does not belong to the Kuaishou route") + return row + + +def _list_kuaishou_jobs(account_id: str, project_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]: + rows = db.fetch_all( + "SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC LIMIT ?", + (account_id, max(limit, 1) * 10), + ) + items: list[dict[str, Any]] = [] + for row in rows: + if project_id and _normalize_text(row.get("project_id")) != project_id: + continue + if _job_is_kuaishou(row): + items.append(job_payload(row)) + if len(items) >= limit: + break + return items + + +def _list_kuaishou_reviews(account_id: str, project_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]: + clauses = ["user_id = ?", "platform = ?"] + params: list[Any] = [account_id, KUAISHOU_PLATFORM] + if project_id is not None: + normalized = project_id.strip() + if normalized: + clauses.append("project_id = ?") + params.append(normalized) + else: + clauses.append("(project_id IS NULL OR project_id = '')") + sql = f""" + SELECT * FROM publish_reviews + WHERE {' AND '.join(clauses)} + ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC + LIMIT ? + """ + params.append(limit) + return [review_payload(row) for row in db.fetch_all(sql, tuple(params))] + + +def register_kuaishou_routes(app: Any, legacy: Any) -> None: + """Register a small Kuaishou route set on top of the shared collector tables.""" + + @app.get("/v2/kuaishou/content-sources") + def list_kuaishou_content_sources( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + clauses = ["user_id = ?", "platform = ?"] + params: list[Any] = [account["id"], KUAISHOU_PLATFORM] + if project_id: + resolve_target_project(account["id"], project_id, username=account["username"]) + clauses.append("project_id = ?") + params.append(project_id) + rows = legacy.db.fetch_all( + f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC", + tuple(params), + ) + return [content_source_payload(row) for row in rows] + + @app.post("/v2/kuaishou/content-sources") + def create_kuaishou_content_source_api( + request: KuaishouContentSourceCreateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = resolve_target_project(account["id"], request.project_id or None, username=account["username"]) + source_url = _ensure_kuaishou_url(request.source_url) + if source_url and _is_youtube_url(source_url): + raise HTTPException(status_code=400, detail="YouTube URLs are not supported in the Kuaishou routes") + row = create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind=_normalize_text(request.source_kind) or "creator_account", + platform=KUAISHOU_PLATFORM, + handle=_normalize_text(request.handle), + source_url=source_url, + title=_normalize_text(request.title) or _normalize_text(request.handle) or source_url, + local_path=_normalize_text(request.local_path), + metadata=request.metadata, + ) + return content_source_payload(row) + + @app.post("/v2/kuaishou/pipelines/content-source-sync") + async def create_kuaishou_content_source_sync_job( + request: KuaishouContentSourceSyncRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_row = None + if request.content_source_id.strip(): + source_row = _require_owned_kuaishou_source(request.content_source_id.strip(), account["id"]) + + requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "") + project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + kb = resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"]) + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + profile = model_profile_for_account(account["id"], request.analysis_model_profile_id or None) + + source_url = _ensure_kuaishou_url( + request.source_url or (source_row or {}).get("source_url", "") + ) + if not source_url: + raise HTTPException(status_code=400, detail="source_url or content_source_id with a Kuaishou URL is required") + + handle = _normalize_text(request.handle or (source_row or {}).get("handle", "")) + source_title = ( + _normalize_text(request.title) + or (source_row or {}).get("title", "").strip() + or handle + or source_url + ) + + if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]: + raise HTTPException(status_code=400, detail="Content source does not belong to target project") + + if not source_row: + source_row = create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind="creator_account", + platform=KUAISHOU_PLATFORM, + handle=handle, + source_url=source_url, + title=source_title, + metadata={ + "sync_mode": "recent_uploads", + "max_items": request.max_items, + "analysis_model_profile_id": profile["id"], + }, + ) + + job_row = create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="content_source_sync", + line_type="content_source_sync", + workflow_key="content_source_sync_pipeline", + title=f"{source_title} 内容源同步", + language=request.language, + source_url=source_url, + assistant_id=(assistant or {}).get("id"), + content_source_id=source_row["id"], + artifacts={ + "platform": KUAISHOU_PLATFORM, + "handle": handle, + "source_account_url": source_url, + "source_title": source_title, + "max_items": request.max_items, + "skip_existing": request.skip_existing, + "auto_trigger_analysis": request.auto_trigger_analysis, + }, + analysis_model_profile_id=profile["id"], + ) + legacy.update_content_source_metadata( + source_row["id"], + { + "sync_mode": "recent_uploads", + "max_items": request.max_items, + "analysis_model_profile_id": profile["id"], + "last_sync_job_id": job_row["id"], + "last_sync_requested_at": utc_now(), + }, + ) + return job_payload(await trigger_orchestrated_job(job_row)) + + @app.get("/v2/kuaishou/jobs") + def list_kuaishou_jobs_api( + project_id: str | None = Query(default=None), + limit: int = Query(default=20, ge=1, le=100), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + return _list_kuaishou_jobs(account["id"], project_id=project_id, limit=limit) + + @app.get("/v2/kuaishou/workspace") + def get_kuaishou_workspace( + project_id: str | None = Query(default=None), + limit: int = Query(default=10, ge=1, le=50), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + content_sources = list_kuaishou_content_sources(project_id=project_id, account=account) + reviews = _list_kuaishou_reviews(account["id"], project_id=project_id, limit=limit) + jobs = _list_kuaishou_jobs(account["id"], project_id=project_id, limit=limit) + return { + "platform": KUAISHOU_PLATFORM, + "project_id": project_id or "", + "content_sources": content_sources, + "recent_jobs": jobs, + "recent_reviews": reviews, + "counts": { + "content_sources": len(content_sources), + "jobs": len(jobs), + "reviews": len(reviews), + }, + } + + @app.get("/v2/kuaishou/reviews") + def list_kuaishou_reviews_api( + project_id: str | None = Query(default=None), + limit: int = Query(default=50, ge=1, le=200), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + return _list_kuaishou_reviews(account["id"], project_id=project_id, limit=limit) + + @app.post("/v2/kuaishou/reviews") + def create_kuaishou_review( + request: KuaishouReviewCreateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_job = None + if request.source_job_id.strip(): + source_job = load_owned_job(request.source_job_id.strip(), account["id"]) + if not _job_is_kuaishou(source_job): + raise HTTPException(status_code=400, detail="Source job does not belong to the Kuaishou route") + + requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "") + project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) + assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + review_id = make_id("review") + title = request.title.strip() or (source_job.get("title", "") if source_job else "") + if not title: + title = f"{project['name']} 快手复盘" + timestamp = utc_now() + db.execute( + """ + INSERT INTO publish_reviews ( + id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type, + publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + review_id, + account["id"], + project["id"], + source_job["id"] if source_job else None, + (assistant or {}).get("id") or None, + title, + KUAISHOU_PLATFORM, + request.content_type or "video", + _normalize_text(request.publish_url), + _normalize_text(request.published_at), + json.dumps(request.metrics, ensure_ascii=False), + _normalize_text(request.verdict), + _normalize_text(request.highlights), + _normalize_text(request.next_actions), + _normalize_text(request.notes), + timestamp, + timestamp, + ), + ) + row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return review_payload(row)