From a46e4f47b35c8e2071516cf9c01dd53489c7701f Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 23 Mar 2026 09:06:13 +0800 Subject: [PATCH] Add bilibili collector routes --- collector-service/app/bilibili_features.py | 545 +++++++++++++++++++++ 1 file changed, 545 insertions(+) create mode 100644 collector-service/app/bilibili_features.py diff --git a/collector-service/app/bilibili_features.py b/collector-service/app/bilibili_features.py new file mode 100644 index 0000000..062e03e --- /dev/null +++ b/collector-service/app/bilibili_features.py @@ -0,0 +1,545 @@ +from __future__ import annotations + +import json +from typing import Any + +from fastapi import Depends, HTTPException, Query +from pydantic import BaseModel, Field + + +def _safe_json_dumps(value: Any) -> str: + return json.dumps(value, ensure_ascii=False, separators=(",", ":")) + + +def _first_non_empty(*values: Any) -> str: + for value in values: + if value is None: + continue + if isinstance(value, str): + stripped = value.strip() + if stripped: + return stripped + elif value not in ("", [], {}, ()): + return str(value) + return "" + + +class BilibiliContentSourceCreateRequest(BaseModel): + project_id: str = "" + source_kind: str = "creator_account" + platform: str = "" + handle: str = "" + source_url: str = "" + title: str = "" + local_path: str = "" + metadata: dict[str, Any] = Field(default_factory=dict) + + +class BilibiliContentSourceSyncRequest(BaseModel): + project_id: str = "" + knowledge_base_id: str = "" + assistant_id: str = "" + content_source_id: str = "" + platform: str = "" + handle: str = "" + source_url: str = "" + title: str = "" + analysis_model_profile_id: str = "" + language: str = "auto" + max_items: int = Field(default=5, ge=1, le=20) + skip_existing: bool = True + auto_trigger_analysis: bool = True + + +class BilibiliReviewCreateRequest(BaseModel): + project_id: str = "" + source_job_id: str = "" + assistant_id: str = "" + title: str = "" + platform: str = "bilibili" + content_type: str = "video" + publish_url: str = "" + published_at: str = "" + metrics: dict[str, Any] = Field(default_factory=dict) + verdict: str = "" + highlights: str = "" + next_actions: str = "" + notes: str = "" + + +class BilibiliReviewUpdateRequest(BaseModel): + title: str | None = None + platform: str | None = None + content_type: str | None = None + publish_url: str | None = None + published_at: str | None = None + metrics: dict[str, Any] | None = None + verdict: str | None = None + highlights: str | None = None + next_actions: str | None = None + notes: str | None = None + assistant_id: str | None = None + + +def _is_youtube_url(source_url: str) -> bool: + lowered = source_url.strip().lower() + return "youtube.com" in lowered or "youtu.be" in lowered + + +def _resolve_bilibili_platform(legacy: Any, platform: str, source_url: str = "") -> str: + if _is_youtube_url(source_url): + raise HTTPException(status_code=400, detail="YouTube sources are not supported in the bilibili routes") + + inferred = legacy.infer_platform_from_url(source_url) if source_url.strip() else "" + normalized = legacy.normalize_platform_slug(platform, allow_blank=True) + if not normalized: + normalized = inferred or "bilibili" + + if normalized == "youtube": + raise HTTPException(status_code=400, detail="YouTube sources are not supported in the bilibili routes") + if inferred and inferred not in {"bilibili", "youtube"} and not platform.strip(): + raise HTTPException( + status_code=400, + detail=f"Bilibili routes only accept bilibili sources, not {inferred}", + ) + if normalized != "bilibili": + raise HTTPException( + status_code=400, + detail=f"Bilibili routes only accept bilibili sources, not {normalized}", + ) + return "bilibili" + + +def _content_source_query(legacy: Any, account_id: str, project_id: str | None = None) -> tuple[str, tuple[Any, ...]]: + clauses = ["user_id = ?", "platform = 'bilibili'"] + params: list[Any] = [account_id] + if project_id is not None: + normalized_project = project_id.strip() + if normalized_project: + clauses.append("project_id = ?") + params.append(normalized_project) + else: + clauses.append("(project_id IS NULL OR project_id = '')") + sql = f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC" + return sql, tuple(params) + + +def _job_query( + source_id: str | None = None, + project_id: str | None = None, + limit: int = 50, +) -> tuple[str, tuple[Any, ...]]: + clauses = ["j.user_id = ?", "cs.platform = 'bilibili'"] + params: list[Any] = [] + if source_id: + clauses.append("j.content_source_id = ?") + params.append(source_id) + if project_id is not None: + normalized_project = project_id.strip() + if normalized_project: + clauses.append("j.project_id = ?") + params.append(normalized_project) + else: + clauses.append("(j.project_id IS NULL OR j.project_id = '')") + sql = ( + "SELECT j.* " + "FROM jobs j " + "JOIN content_sources cs ON cs.id = j.content_source_id " + f"WHERE {' AND '.join(clauses)} " + "ORDER BY j.created_at DESC " + "LIMIT ?" + ) + params = [*params] + return sql, tuple([*params, limit]) + + +def _review_query(project_id: str | None = None, limit: int = 50) -> tuple[str, tuple[Any, ...]]: + clauses = ["r.user_id = ?", "r.platform = 'bilibili'"] + params: list[Any] = [] + if project_id is not None: + normalized_project = project_id.strip() + if normalized_project: + clauses.append("r.project_id = ?") + params.append(normalized_project) + else: + clauses.append("(r.project_id IS NULL OR r.project_id = '')") + sql = ( + "SELECT r.* " + "FROM publish_reviews r " + f"WHERE {' AND '.join(clauses)} " + "ORDER BY COALESCE(NULLIF(r.published_at, ''), r.created_at) DESC, r.created_at DESC " + "LIMIT ?" + ) + return sql, tuple([*params, limit]) + + +def _build_sync_result(legacy: Any, row: dict[str, Any], content_source: dict[str, Any]) -> dict[str, Any]: + payload = legacy.job_payload(row) + payload["content_source"] = legacy.content_source_payload(content_source) + return payload + + +def register_bilibili_routes(app: Any, legacy: Any) -> None: + def now() -> str: + return legacy.utc_now() + + def make_id(prefix: str) -> str: + return legacy.make_id(prefix) + + def resolve_project(account: dict[str, Any], project_id: str) -> dict[str, Any]: + return legacy.resolve_target_project(account["id"], project_id or None, username=account["username"]) + + def resolve_kb(account: dict[str, Any], kb_id: str, project_id: str) -> dict[str, Any]: + return legacy.resolve_target_kb(account["id"], kb_id or None, project_id, username=account["username"]) + + def resolve_assistant(account: dict[str, Any], assistant_id: str, project_id: str) -> dict[str, Any] | None: + return legacy.resolve_target_assistant(account["id"], assistant_id or None, project_id) + + def create_or_update_source( + *, + account: dict[str, Any], + request: BilibiliContentSourceCreateRequest, + sync_request: BilibiliContentSourceSyncRequest | None = None, + ) -> dict[str, Any]: + source_url = _first_non_empty(request.source_url, sync_request.source_url if sync_request else "") + _resolve_bilibili_platform(legacy, request.platform or (sync_request.platform if sync_request else ""), source_url) + + project = resolve_project(account, request.project_id or (sync_request.project_id if sync_request else "")) + title = _first_non_empty(request.title, sync_request.title if sync_request else "", request.handle, source_url) + metadata: dict[str, Any] = dict(request.metadata) + metadata.setdefault("platform", "bilibili") + if sync_request: + metadata.update( + { + "sync_mode": "recent_uploads", + "max_items": sync_request.max_items, + "analysis_model_profile_id": sync_request.analysis_model_profile_id, + } + ) + + return legacy.create_content_source( + account_id=account["id"], + project_id=project["id"], + source_kind=(request.source_kind or "creator_account").strip(), + platform="bilibili", + handle=request.handle.strip(), + source_url=source_url.strip(), + title=title.strip(), + local_path=request.local_path.strip(), + metadata=metadata, + ) + + async def sync_source( + *, + account: dict[str, Any], + request: BilibiliContentSourceSyncRequest, + content_source: dict[str, Any] | None = None, + ) -> dict[str, Any]: + source_row = content_source + if request.content_source_id.strip(): + source_row = legacy.load_owned_content_source(request.content_source_id.strip(), account["id"]) + + source_url = _first_non_empty( + request.source_url, + (source_row or {}).get("source_url", ""), + ) + _resolve_bilibili_platform( + legacy, + request.platform or (source_row or {}).get("platform", ""), + source_url, + ) + + project_id = request.project_id or (source_row or {}).get("project_id", "") + project = resolve_project(account, project_id) + kb = resolve_kb(account, request.knowledge_base_id, project["id"]) + assistant = resolve_assistant(account, request.assistant_id, project["id"]) + source_title = _first_non_empty( + request.title, + (source_row or {}).get("title", ""), + request.handle, + source_url, + ) + + if source_row and source_row.get("project_id") and source_row["project_id"] != project["id"]: + raise HTTPException(status_code=400, detail="Content source does not belong to the target project") + + if not source_row: + source_row = create_or_update_source( + account=account, + request=BilibiliContentSourceCreateRequest( + project_id=project["id"], + source_kind="creator_account", + platform="bilibili", + handle=request.handle.strip(), + source_url=source_url, + title=source_title, + local_path="", + metadata={ + "sync_mode": "recent_uploads", + "max_items": request.max_items, + "analysis_model_profile_id": request.analysis_model_profile_id, + }, + ), + sync_request=request, + ) + + job_row = legacy.create_job_record( + account_id=account["id"], + project_id=project["id"], + knowledge_base_id=kb["id"], + source_type="content_source_sync", + line_type="content_source_sync", + workflow_key="content_source_sync_pipeline", + title=f"{source_title} 内容源同步", + language=request.language, + source_url=source_url, + assistant_id=(assistant or {}).get("id"), + content_source_id=source_row["id"], + artifacts={ + "platform": "bilibili", + "source_kind": source_row.get("source_kind", "creator_account"), + "source_title": source_title, + "source_url": source_url, + "max_items": request.max_items, + "skip_existing": request.skip_existing, + "auto_trigger_analysis": request.auto_trigger_analysis, + "analysis_model_profile_id": request.analysis_model_profile_id, + }, + analysis_model_profile_id=request.analysis_model_profile_id, + ) + legacy.update_content_source_metadata( + source_row["id"], + { + "platform": "bilibili", + "last_sync_job_id": job_row["id"], + "last_sync_requested_at": now(), + "max_items": request.max_items, + "analysis_model_profile_id": request.analysis_model_profile_id, + }, + ) + return _build_sync_result(legacy, await legacy.trigger_orchestrated_job(job_row), source_row) + + @app.get("/v2/bilibili/content-sources") + def list_bilibili_content_sources( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + sql, params = _content_source_query(legacy, account["id"], project_id) + return [legacy.content_source_payload(row) for row in legacy.db.fetch_all(sql, params)] + + @app.post("/v2/bilibili/content-sources") + def create_bilibili_content_source( + request: BilibiliContentSourceCreateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + row = create_or_update_source(account=account, request=request) + return legacy.content_source_payload(row) + + @app.get("/v2/bilibili/content-sources/{source_id}") + def get_bilibili_content_source( + source_id: str, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + row = legacy.load_owned_content_source(source_id, account["id"]) + if row.get("platform") != "bilibili": + raise HTTPException(status_code=404, detail="Bilibili content source not found") + return legacy.content_source_payload(row) + + @app.post("/v2/bilibili/content-sources/{source_id}/sync") + async def sync_bilibili_content_source( + source_id: str, + request: BilibiliContentSourceSyncRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + row = legacy.load_owned_content_source(source_id, account["id"]) + if row.get("platform") != "bilibili": + raise HTTPException(status_code=404, detail="Bilibili content source not found") + return await sync_source(account=account, request=request, content_source=row) + + @app.post("/v2/bilibili/pipelines/content-source-sync") + async def create_bilibili_content_source_sync_job( + request: BilibiliContentSourceSyncRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + return await sync_source(account=account, request=request) + + @app.get("/v2/bilibili/content-sources/{source_id}/jobs") + def list_bilibili_content_source_jobs( + source_id: str, + limit: int = Query(default=50, ge=1, le=200), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + row = legacy.load_owned_content_source(source_id, account["id"]) + if row.get("platform") != "bilibili": + raise HTTPException(status_code=404, detail="Bilibili content source not found") + sql, params = _job_query(source_id=source_id, limit=limit) + rows = legacy.db.fetch_all(sql, (account["id"], *params)) + return [legacy.job_payload(item) for item in rows] + + @app.get("/v2/bilibili/jobs") + def list_bilibili_jobs( + project_id: str | None = Query(default=None), + content_source_id: str | None = Query(default=None), + limit: int = Query(default=50, ge=1, le=200), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + if content_source_id: + row = legacy.load_owned_content_source(content_source_id.strip(), account["id"]) + if row.get("platform") != "bilibili": + raise HTTPException(status_code=404, detail="Bilibili content source not found") + sql, params = _job_query(source_id=content_source_id.strip() if content_source_id else None, project_id=project_id, limit=limit) + rows = legacy.db.fetch_all(sql, (account["id"], *params)) + return [legacy.job_payload(item) for item in rows] + + @app.get("/v2/bilibili/jobs/{job_id}") + def get_bilibili_job( + job_id: str, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + row = legacy.load_owned_job(job_id, account["id"]) + if row.get("content_source_id"): + source = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ? AND user_id = ?", (row["content_source_id"], account["id"])) + if not source or source.get("platform") != "bilibili": + raise HTTPException(status_code=404, detail="Bilibili job not found") + return legacy.job_context_payload(row) + + @app.get("/v2/bilibili/reviews") + def list_bilibili_reviews( + project_id: str | None = Query(default=None), + limit: int = Query(default=50, ge=1, le=200), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + sql, params = _review_query(project_id=project_id, limit=limit) + rows = legacy.db.fetch_all(sql, (account["id"], *params)) + return [legacy.review_payload(item) for item in rows] + + @app.get("/v2/bilibili/reviews/{review_id}") + def get_bilibili_review( + review_id: str, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + row = legacy.load_owned_review(review_id, account["id"]) + if row.get("platform") != "bilibili": + raise HTTPException(status_code=404, detail="Bilibili review not found") + return legacy.review_payload(row) + + @app.post("/v2/bilibili/reviews") + def create_bilibili_review( + request: BilibiliReviewCreateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + source_job = None + if request.source_job_id.strip(): + source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"]) + if source_job.get("content_source_id"): + source = legacy.db.fetch_one( + "SELECT * FROM content_sources WHERE id = ? AND user_id = ?", + (source_job["content_source_id"], account["id"]), + ) + if not source or source.get("platform") != "bilibili": + raise HTTPException(status_code=404, detail="Bilibili source job not found") + normalized_platform = _resolve_bilibili_platform(legacy, request.platform, source_job.get("source_url", "") if source_job else "") + requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "") + project = resolve_project(account, requested_project_id) + assistant = resolve_assistant(account, request.assistant_id, project["id"]) + review_id = make_id("review") + title = _first_non_empty(request.title, source_job.get("title", "") if source_job else "", f"{project['name']} 复盘") + timestamp = now() + legacy.db.execute( + """ + INSERT INTO publish_reviews ( + id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type, + publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + review_id, + account["id"], + project["id"], + source_job["id"] if source_job else None, + (assistant or {}).get("id") or None, + title, + normalized_platform, + request.content_type.strip() or "video", + request.publish_url.strip(), + request.published_at.strip(), + _safe_json_dumps(request.metrics), + request.verdict.strip(), + request.highlights.strip(), + request.next_actions.strip(), + request.notes.strip(), + timestamp, + timestamp, + ), + ) + row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return legacy.review_payload(row) + + @app.patch("/v2/bilibili/reviews/{review_id}") + def update_bilibili_review( + review_id: str, + request: BilibiliReviewUpdateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + current = legacy.load_owned_review(review_id, account["id"]) + if current.get("platform") != "bilibili": + raise HTTPException(status_code=404, detail="Bilibili review not found") + assistant_id = current.get("assistant_id") or None + if request.assistant_id is not None: + assistant = resolve_assistant(account, request.assistant_id or "", current.get("project_id", "")) + assistant_id = (assistant or {}).get("id") or None + if request.platform is not None: + _resolve_bilibili_platform(legacy, request.platform, current.get("publish_url", "")) + legacy.db.execute( + """ + UPDATE publish_reviews + SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?, + metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?, + assistant_id = ?, updated_at = ? + WHERE id = ? AND user_id = ? + """, + ( + request.title if request.title is not None else current.get("title", ""), + "bilibili", + request.content_type if request.content_type is not None else current.get("content_type", "video"), + request.publish_url if request.publish_url is not None else current.get("publish_url", ""), + request.published_at if request.published_at is not None else current.get("published_at", ""), + _safe_json_dumps(request.metrics if request.metrics is not None else legacy.parse_json_object(current.get("metrics_json") or "{}")), + request.verdict if request.verdict is not None else current.get("verdict", ""), + request.highlights if request.highlights is not None else current.get("highlights", ""), + request.next_actions if request.next_actions is not None else current.get("next_actions", ""), + request.notes if request.notes is not None else current.get("notes", ""), + assistant_id, + now(), + review_id, + account["id"], + ), + ) + row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + return legacy.review_payload(row) + + @app.get("/v2/bilibili/content-sources/{source_id}/reviews") + def list_bilibili_content_source_reviews( + source_id: str, + limit: int = Query(default=50, ge=1, le=200), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> list[dict[str, Any]]: + row = legacy.load_owned_content_source(source_id, account["id"]) + if row.get("platform") != "bilibili": + raise HTTPException(status_code=404, detail="Bilibili content source not found") + rows = legacy.db.fetch_all( + """ + SELECT r.* + FROM publish_reviews r + JOIN jobs j ON j.id = r.source_job_id + WHERE r.user_id = ? AND r.platform = 'bilibili' AND j.content_source_id = ? + ORDER BY COALESCE(NULLIF(r.published_at, ''), r.created_at) DESC, r.created_at DESC + LIMIT ? + """, + (account["id"], source_id, limit), + ) + return [legacy.review_payload(item) for item in rows] + + +__all__ = ["register_bilibili_routes"]