Add Kuaishou route module

This commit is contained in:
kris
2026-03-23 09:06:04 +08:00
parent 9afac1eff7
commit a695eb04b9

View File

@@ -0,0 +1,381 @@
from __future__ import annotations
import json
from typing import Any
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
from .core_main import (
content_source_payload,
create_content_source,
create_job_record,
job_payload,
load_owned_content_source,
load_owned_job,
make_id,
parse_json_object,
resolve_target_assistant,
resolve_target_kb,
resolve_target_project,
review_payload,
trigger_orchestrated_job,
utc_now,
model_profile_for_account,
db,
)
KUAISHOU_PLATFORM = "kuaishou"
KUAISHOU_URL_HINTS = (
"kuaishou.com",
"v.kuaishou.com",
"chenzhongtech.com",
)
YOUTUBE_URL_HINTS = (
"youtube.com",
"youtu.be",
"m.youtube.com",
"music.youtube.com",
)
class KuaishouContentSourceCreateRequest(BaseModel):
project_id: str = ""
source_kind: str = "creator_account"
handle: str = ""
source_url: str = ""
title: str = ""
local_path: str = ""
metadata: dict[str, Any] = Field(default_factory=dict)
class KuaishouContentSourceSyncRequest(BaseModel):
project_id: str = ""
knowledge_base_id: str = ""
assistant_id: str = ""
content_source_id: str = ""
handle: str = ""
source_url: str = ""
title: str = ""
analysis_model_profile_id: str = ""
language: str = "auto"
max_items: int = Field(default=5, ge=1, le=20)
skip_existing: bool = True
auto_trigger_analysis: bool = True
class KuaishouReviewCreateRequest(BaseModel):
project_id: str = ""
source_job_id: str = ""
assistant_id: str = ""
title: str = ""
content_type: str = "video"
publish_url: str = ""
published_at: str = ""
metrics: dict[str, Any] = Field(default_factory=dict)
verdict: str = ""
highlights: str = ""
next_actions: str = ""
notes: str = ""
def _normalize_text(value: str | None) -> str:
return str(value or "").strip()
def _is_youtube_url(value: str) -> bool:
normalized = _normalize_text(value).lower()
return any(hint in normalized for hint in YOUTUBE_URL_HINTS)
def _is_kuaishou_url(value: str) -> bool:
normalized = _normalize_text(value).lower()
return any(hint in normalized for hint in KUAISHOU_URL_HINTS)
def _ensure_kuaishou_url(value: str) -> str:
normalized = _normalize_text(value)
if not normalized:
return ""
if _is_youtube_url(normalized):
raise HTTPException(status_code=400, detail="YouTube URLs are not supported in the Kuaishou routes")
return normalized
def _content_source_is_kuaishou(row: dict[str, Any]) -> bool:
if _normalize_text(row.get("platform")).lower() == KUAISHOU_PLATFORM:
return True
return _is_kuaishou_url(row.get("source_url", ""))
def _job_is_kuaishou(row: dict[str, Any]) -> bool:
artifacts = parse_json_object(row.get("artifacts_json") or "{}")
source_url = _normalize_text(row.get("source_url"))
if source_url and _is_youtube_url(source_url):
return False
if source_url and _is_kuaishou_url(source_url):
return True
if _normalize_text(artifacts.get("platform")).lower() == KUAISHOU_PLATFORM:
return True
content_source_id = _normalize_text(row.get("content_source_id"))
if content_source_id:
source_row = db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (content_source_id,))
return bool(source_row and _content_source_is_kuaishou(source_row))
return False
def _require_owned_kuaishou_source(source_id: str, account_id: str) -> dict[str, Any]:
row = load_owned_content_source(source_id, account_id)
if not _content_source_is_kuaishou(row):
raise HTTPException(status_code=400, detail="Content source does not belong to the Kuaishou route")
return row
def _list_kuaishou_jobs(account_id: str, project_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
rows = db.fetch_all(
"SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
(account_id, max(limit, 1) * 10),
)
items: list[dict[str, Any]] = []
for row in rows:
if project_id and _normalize_text(row.get("project_id")) != project_id:
continue
if _job_is_kuaishou(row):
items.append(job_payload(row))
if len(items) >= limit:
break
return items
def _list_kuaishou_reviews(account_id: str, project_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
clauses = ["user_id = ?", "platform = ?"]
params: list[Any] = [account_id, KUAISHOU_PLATFORM]
if project_id is not None:
normalized = project_id.strip()
if normalized:
clauses.append("project_id = ?")
params.append(normalized)
else:
clauses.append("(project_id IS NULL OR project_id = '')")
sql = f"""
SELECT * FROM publish_reviews
WHERE {' AND '.join(clauses)}
ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC
LIMIT ?
"""
params.append(limit)
return [review_payload(row) for row in db.fetch_all(sql, tuple(params))]
def register_kuaishou_routes(app: Any, legacy: Any) -> None:
"""Register a small Kuaishou route set on top of the shared collector tables."""
@app.get("/v2/kuaishou/content-sources")
def list_kuaishou_content_sources(
project_id: str | None = Query(default=None),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
clauses = ["user_id = ?", "platform = ?"]
params: list[Any] = [account["id"], KUAISHOU_PLATFORM]
if project_id:
resolve_target_project(account["id"], project_id, username=account["username"])
clauses.append("project_id = ?")
params.append(project_id)
rows = legacy.db.fetch_all(
f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC",
tuple(params),
)
return [content_source_payload(row) for row in rows]
@app.post("/v2/kuaishou/content-sources")
def create_kuaishou_content_source_api(
request: KuaishouContentSourceCreateRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
project = resolve_target_project(account["id"], request.project_id or None, username=account["username"])
source_url = _ensure_kuaishou_url(request.source_url)
if source_url and _is_youtube_url(source_url):
raise HTTPException(status_code=400, detail="YouTube URLs are not supported in the Kuaishou routes")
row = create_content_source(
account_id=account["id"],
project_id=project["id"],
source_kind=_normalize_text(request.source_kind) or "creator_account",
platform=KUAISHOU_PLATFORM,
handle=_normalize_text(request.handle),
source_url=source_url,
title=_normalize_text(request.title) or _normalize_text(request.handle) or source_url,
local_path=_normalize_text(request.local_path),
metadata=request.metadata,
)
return content_source_payload(row)
@app.post("/v2/kuaishou/pipelines/content-source-sync")
async def create_kuaishou_content_source_sync_job(
request: KuaishouContentSourceSyncRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
source_row = None
if request.content_source_id.strip():
source_row = _require_owned_kuaishou_source(request.content_source_id.strip(), account["id"])
requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "")
project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
kb = resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
profile = model_profile_for_account(account["id"], request.analysis_model_profile_id or None)
source_url = _ensure_kuaishou_url(
request.source_url or (source_row or {}).get("source_url", "")
)
if not source_url:
raise HTTPException(status_code=400, detail="source_url or content_source_id with a Kuaishou URL is required")
handle = _normalize_text(request.handle or (source_row or {}).get("handle", ""))
source_title = (
_normalize_text(request.title)
or (source_row or {}).get("title", "").strip()
or handle
or source_url
)
if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
raise HTTPException(status_code=400, detail="Content source does not belong to target project")
if not source_row:
source_row = create_content_source(
account_id=account["id"],
project_id=project["id"],
source_kind="creator_account",
platform=KUAISHOU_PLATFORM,
handle=handle,
source_url=source_url,
title=source_title,
metadata={
"sync_mode": "recent_uploads",
"max_items": request.max_items,
"analysis_model_profile_id": profile["id"],
},
)
job_row = create_job_record(
account_id=account["id"],
project_id=project["id"],
knowledge_base_id=kb["id"],
source_type="content_source_sync",
line_type="content_source_sync",
workflow_key="content_source_sync_pipeline",
title=f"{source_title} 内容源同步",
language=request.language,
source_url=source_url,
assistant_id=(assistant or {}).get("id"),
content_source_id=source_row["id"],
artifacts={
"platform": KUAISHOU_PLATFORM,
"handle": handle,
"source_account_url": source_url,
"source_title": source_title,
"max_items": request.max_items,
"skip_existing": request.skip_existing,
"auto_trigger_analysis": request.auto_trigger_analysis,
},
analysis_model_profile_id=profile["id"],
)
legacy.update_content_source_metadata(
source_row["id"],
{
"sync_mode": "recent_uploads",
"max_items": request.max_items,
"analysis_model_profile_id": profile["id"],
"last_sync_job_id": job_row["id"],
"last_sync_requested_at": utc_now(),
},
)
return job_payload(await trigger_orchestrated_job(job_row))
@app.get("/v2/kuaishou/jobs")
def list_kuaishou_jobs_api(
project_id: str | None = Query(default=None),
limit: int = Query(default=20, ge=1, le=100),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
return _list_kuaishou_jobs(account["id"], project_id=project_id, limit=limit)
@app.get("/v2/kuaishou/workspace")
def get_kuaishou_workspace(
project_id: str | None = Query(default=None),
limit: int = Query(default=10, ge=1, le=50),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
content_sources = list_kuaishou_content_sources(project_id=project_id, account=account)
reviews = _list_kuaishou_reviews(account["id"], project_id=project_id, limit=limit)
jobs = _list_kuaishou_jobs(account["id"], project_id=project_id, limit=limit)
return {
"platform": KUAISHOU_PLATFORM,
"project_id": project_id or "",
"content_sources": content_sources,
"recent_jobs": jobs,
"recent_reviews": reviews,
"counts": {
"content_sources": len(content_sources),
"jobs": len(jobs),
"reviews": len(reviews),
},
}
@app.get("/v2/kuaishou/reviews")
def list_kuaishou_reviews_api(
project_id: str | None = Query(default=None),
limit: int = Query(default=50, ge=1, le=200),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
return _list_kuaishou_reviews(account["id"], project_id=project_id, limit=limit)
@app.post("/v2/kuaishou/reviews")
def create_kuaishou_review(
request: KuaishouReviewCreateRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
source_job = None
if request.source_job_id.strip():
source_job = load_owned_job(request.source_job_id.strip(), account["id"])
if not _job_is_kuaishou(source_job):
raise HTTPException(status_code=400, detail="Source job does not belong to the Kuaishou route")
requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "")
project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
review_id = make_id("review")
title = request.title.strip() or (source_job.get("title", "") if source_job else "")
if not title:
title = f"{project['name']} 快手复盘"
timestamp = utc_now()
db.execute(
"""
INSERT INTO publish_reviews (
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
review_id,
account["id"],
project["id"],
source_job["id"] if source_job else None,
(assistant or {}).get("id") or None,
title,
KUAISHOU_PLATFORM,
request.content_type or "video",
_normalize_text(request.publish_url),
_normalize_text(request.published_at),
json.dumps(request.metrics, ensure_ascii=False),
_normalize_text(request.verdict),
_normalize_text(request.highlights),
_normalize_text(request.next_actions),
_normalize_text(request.notes),
timestamp,
timestamp,
),
)
row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
return review_payload(row)