Add bilibili collector routes

This commit is contained in:
kris
2026-03-23 09:06:13 +08:00
parent a695eb04b9
commit a46e4f47b3

View File

@@ -0,0 +1,545 @@
from __future__ import annotations
import json
from typing import Any
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
def _safe_json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
def _first_non_empty(*values: Any) -> str:
for value in values:
if value is None:
continue
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
elif value not in ("", [], {}, ()):
return str(value)
return ""
# Request body for creating a bilibili content source.
class BilibiliContentSourceCreateRequest(BaseModel):
    # Target project id; a blank value is forwarded to the project resolver as None.
    project_id: str = ""
    # Kind of source; a blank value falls back to "creator_account" on creation.
    source_kind: str = "creator_account"
    # Optional platform slug. It is only validated: the stored platform is always "bilibili".
    platform: str = ""
    # Creator handle; also used as a title fallback.
    handle: str = ""
    # URL of the source; also used for platform validation.
    source_url: str = ""
    # Display title; when blank, the handle and then the URL are used instead.
    title: str = ""
    # Local path, stripped and forwarded to the source-creation helper.
    local_path: str = ""
    # Free-form metadata copied onto the created source.
    metadata: dict[str, Any] = Field(default_factory=dict)
# Request body for starting a content-source sync job.
class BilibiliContentSourceSyncRequest(BaseModel):
    # Target project id; when blank, the project of the stored source is used.
    project_id: str = ""
    # Knowledge base for the job; a blank value is forwarded to the resolver as None.
    knowledge_base_id: str = ""
    # Assistant for the job; a blank value is forwarded to the resolver as None.
    assistant_id: str = ""
    # Id of an existing content source to sync; blank means none was named.
    content_source_id: str = ""
    # Optional platform slug; validated to be bilibili.
    platform: str = ""
    # Creator handle; used as a title fallback and stored on a newly created source.
    handle: str = ""
    # Source URL; when blank, the URL of the stored source is used.
    source_url: str = ""
    # Title; when blank, falls back to the stored title, the handle, then the URL.
    title: str = ""
    # Recorded on the job record, in its artifacts and in the source metadata.
    analysis_model_profile_id: str = ""
    # Language passed to the job record.
    language: str = "auto"
    # Number of items to sync; constrained to 1..20.
    max_items: int = Field(default=5, ge=1, le=20)
    # Recorded in the job artifacts.
    skip_existing: bool = True
    # Recorded in the job artifacts.
    auto_trigger_analysis: bool = True
# Request body for creating a publish review on the bilibili routes.
class BilibiliReviewCreateRequest(BaseModel):
    # Target project id; when blank, the project of the source job is used.
    project_id: str = ""
    # Optional id of the job this review refers to.
    source_job_id: str = ""
    # Assistant for the review; a blank value is forwarded to the resolver as None.
    assistant_id: str = ""
    # Title; when blank, falls back to the source job title, then "<project name> 复盘".
    title: str = ""
    # Platform slug; validated to be bilibili.
    platform: str = "bilibili"
    # Content type; a blank value is stored as "video".
    content_type: str = "video"
    # Stored stripped.
    publish_url: str = ""
    # Stored stripped; the list queries sort on it when it is non-empty.
    published_at: str = ""
    # Serialized to JSON and stored in the metrics_json column.
    metrics: dict[str, Any] = Field(default_factory=dict)
    # Free-text fields below are stored stripped.
    verdict: str = ""
    highlights: str = ""
    next_actions: str = ""
    notes: str = ""
# Request body for partially updating a review.
# A field left as None keeps the value currently stored on the review.
class BilibiliReviewUpdateRequest(BaseModel):
    title: str | None = None
    # Only validated when supplied; the stored platform is always "bilibili".
    platform: str | None = None
    content_type: str | None = None
    publish_url: str | None = None
    published_at: str | None = None
    # Replaces the stored metrics object as a whole when supplied.
    metrics: dict[str, Any] | None = None
    verdict: str | None = None
    highlights: str | None = None
    next_actions: str | None = None
    notes: str | None = None
    # When supplied (even blank), the assistant is re-resolved for the review's project.
    assistant_id: str | None = None
def _is_youtube_url(source_url: str) -> bool:
lowered = source_url.strip().lower()
return "youtube.com" in lowered or "youtu.be" in lowered
def _resolve_bilibili_platform(legacy: Any, platform: str, source_url: str = "") -> str:
    """Validate that a platform/URL pair refers to bilibili and return ``"bilibili"``.

    Args:
        legacy: Object providing ``infer_platform_from_url`` and
            ``normalize_platform_slug``.
        platform: Platform slug supplied by the caller; blank or ``None``
            means "not specified".
        source_url: URL used to infer the platform; blank or ``None`` skips
            inference.

    Returns:
        Always the literal ``"bilibili"``.

    Raises:
        HTTPException: 400 when the URL or platform points at YouTube or at
            any other non-bilibili platform.
    """
    # Callers pass values read with ``row.get(key, "")``; if the stored column
    # is NULL that yields None, which would crash on ``.strip()`` below.
    platform = platform or ""
    source_url = source_url or ""
    youtube_detail = "YouTube sources are not supported in the bilibili routes"
    if _is_youtube_url(source_url):
        raise HTTPException(status_code=400, detail=youtube_detail)
    inferred = legacy.infer_platform_from_url(source_url) if source_url.strip() else ""
    normalized = legacy.normalize_platform_slug(platform, allow_blank=True)
    if not normalized:
        # No explicit platform: trust the URL, defaulting to bilibili.
        normalized = inferred or "bilibili"
    if normalized == "youtube":
        raise HTTPException(status_code=400, detail=youtube_detail)
    # NOTE(review): when a platform is given explicitly, a URL that infers to a
    # different non-YouTube platform is not rejected here - confirm intended.
    if inferred and inferred not in {"bilibili", "youtube"} and not platform.strip():
        raise HTTPException(
            status_code=400,
            detail=f"Bilibili routes only accept bilibili sources, not {inferred}",
        )
    if normalized != "bilibili":
        raise HTTPException(
            status_code=400,
            detail=f"Bilibili routes only accept bilibili sources, not {normalized}",
        )
    return "bilibili"
def _content_source_query(legacy: Any, account_id: str, project_id: str | None = None) -> tuple[str, tuple[Any, ...]]:
clauses = ["user_id = ?", "platform = 'bilibili'"]
params: list[Any] = [account_id]
if project_id is not None:
normalized_project = project_id.strip()
if normalized_project:
clauses.append("project_id = ?")
params.append(normalized_project)
else:
clauses.append("(project_id IS NULL OR project_id = '')")
sql = f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC"
return sql, tuple(params)
def _job_query(
source_id: str | None = None,
project_id: str | None = None,
limit: int = 50,
) -> tuple[str, tuple[Any, ...]]:
clauses = ["j.user_id = ?", "cs.platform = 'bilibili'"]
params: list[Any] = []
if source_id:
clauses.append("j.content_source_id = ?")
params.append(source_id)
if project_id is not None:
normalized_project = project_id.strip()
if normalized_project:
clauses.append("j.project_id = ?")
params.append(normalized_project)
else:
clauses.append("(j.project_id IS NULL OR j.project_id = '')")
sql = (
"SELECT j.* "
"FROM jobs j "
"JOIN content_sources cs ON cs.id = j.content_source_id "
f"WHERE {' AND '.join(clauses)} "
"ORDER BY j.created_at DESC "
"LIMIT ?"
)
params = [*params]
return sql, tuple([*params, limit])
def _review_query(project_id: str | None = None, limit: int = 50) -> tuple[str, tuple[Any, ...]]:
clauses = ["r.user_id = ?", "r.platform = 'bilibili'"]
params: list[Any] = []
if project_id is not None:
normalized_project = project_id.strip()
if normalized_project:
clauses.append("r.project_id = ?")
params.append(normalized_project)
else:
clauses.append("(r.project_id IS NULL OR r.project_id = '')")
sql = (
"SELECT r.* "
"FROM publish_reviews r "
f"WHERE {' AND '.join(clauses)} "
"ORDER BY COALESCE(NULLIF(r.published_at, ''), r.created_at) DESC, r.created_at DESC "
"LIMIT ?"
)
return sql, tuple([*params, limit])
def _build_sync_result(legacy: Any, row: dict[str, Any], content_source: dict[str, Any]) -> dict[str, Any]:
payload = legacy.job_payload(row)
payload["content_source"] = legacy.content_source_payload(content_source)
return payload
def register_bilibili_routes(app: Any, legacy: Any) -> None:
    """Register the ``/v2/bilibili`` content-source, job and review routes on *app*.

    Args:
        app: Application object exposing the ``get``/``post``/``patch`` route
            decorators used below.
        legacy: Object supplying the shared helpers the routes call (database
            access, ownership checks, payload builders, job orchestration).
    """
def now() -> str:
    """Return the current timestamp string as produced by ``legacy.utc_now()``."""
    timestamp = legacy.utc_now()
    return timestamp
def make_id(prefix: str) -> str:
    """Return a new identifier for *prefix*, delegating to ``legacy.make_id``."""
    new_id = legacy.make_id(prefix)
    return new_id
def resolve_project(account: dict[str, Any], project_id: str) -> dict[str, Any]:
    """Resolve the target project for *account* via ``legacy.resolve_target_project``.

    A falsy *project_id* is forwarded as ``None``.
    """
    requested_id = project_id if project_id else None
    return legacy.resolve_target_project(
        account["id"],
        requested_id,
        username=account["username"],
    )
def resolve_kb(account: dict[str, Any], kb_id: str, project_id: str) -> dict[str, Any]:
    """Resolve the target knowledge base via ``legacy.resolve_target_kb``.

    A falsy *kb_id* is forwarded as ``None``; *project_id* is passed through unchanged.
    """
    requested_id = kb_id if kb_id else None
    return legacy.resolve_target_kb(
        account["id"],
        requested_id,
        project_id,
        username=account["username"],
    )
def resolve_assistant(account: dict[str, Any], assistant_id: str, project_id: str) -> dict[str, Any] | None:
    """Resolve the target assistant via ``legacy.resolve_target_assistant``.

    A falsy *assistant_id* is forwarded as ``None``. The result may be ``None``.
    """
    requested_id = assistant_id if assistant_id else None
    return legacy.resolve_target_assistant(account["id"], requested_id, project_id)
def create_or_update_source(
    *,
    account: dict[str, Any],
    request: BilibiliContentSourceCreateRequest,
    sync_request: BilibiliContentSourceSyncRequest | None = None,
) -> dict[str, Any]:
    """Validate the request as a bilibili source and hand it to ``legacy.create_content_source``.

    Blank fields on *request* fall back to the matching field of
    *sync_request* when one is supplied. Whether an existing source is
    updated rather than created is decided inside
    ``legacy.create_content_source`` and cannot be seen from here.

    Args:
        account: Authenticated account row; ``id`` and ``username`` are read.
        request: Source fields supplied by the caller.
        sync_request: Optional sync parameters; also recorded in the metadata.

    Returns:
        Whatever ``legacy.create_content_source`` returns (the source row).

    Raises:
        HTTPException: 400 when the platform or URL is not bilibili.
    """
    fallback_platform = sync_request.platform if sync_request else ""
    fallback_project_id = sync_request.project_id if sync_request else ""
    fallback_url = sync_request.source_url if sync_request else ""
    fallback_title = sync_request.title if sync_request else ""
    fallback_handle = sync_request.handle if sync_request else ""

    # _first_non_empty returns stripped text, so no further stripping is needed.
    source_url = _first_non_empty(request.source_url, fallback_url)
    _resolve_bilibili_platform(legacy, request.platform or fallback_platform, source_url)
    project = resolve_project(account, request.project_id or fallback_project_id)
    # The handle now honours the sync_request fallback like every other field.
    handle = _first_non_empty(request.handle, fallback_handle)
    title = _first_non_empty(request.title, fallback_title, handle, source_url)

    metadata: dict[str, Any] = dict(request.metadata)
    # Force the key: the stored platform column is always "bilibili", so
    # caller-supplied metadata must not be able to contradict it.
    metadata["platform"] = "bilibili"
    if sync_request:
        metadata.update(
            {
                "sync_mode": "recent_uploads",
                "max_items": sync_request.max_items,
                "analysis_model_profile_id": sync_request.analysis_model_profile_id,
            }
        )
    return legacy.create_content_source(
        account_id=account["id"],
        project_id=project["id"],
        source_kind=(request.source_kind or "creator_account").strip(),
        platform="bilibili",
        handle=handle,
        source_url=source_url,
        title=title,
        local_path=request.local_path.strip(),
        metadata=metadata,
    )
async def sync_source(
    *,
    account: dict[str, Any],
    request: BilibiliContentSourceSyncRequest,
    content_source: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Create and trigger a content-source sync job for a bilibili source.

    The source is *content_source* when given, otherwise the one named by
    ``request.content_source_id``. If neither is available, a new source is
    created from the request fields.

    Args:
        account: Authenticated account row; ``id`` and ``username`` are read.
        request: Sync parameters.
        content_source: Source row already loaded by the caller, if any.

    Returns:
        The triggered job's payload with a ``content_source`` entry added.

    Raises:
        HTTPException: 400 when ``request.content_source_id`` disagrees with
            *content_source*, when the platform or URL is not bilibili, or
            when the source belongs to a different project; 404 when the
            source is not a bilibili source.
    """
    source_row = content_source
    requested_source_id = request.content_source_id.strip()
    if requested_source_id:
        if source_row is None:
            source_row = legacy.load_owned_content_source(requested_source_id, account["id"])
        elif str(source_row.get("id")) != requested_source_id:
            # The caller already validated one source; the request body must
            # not silently swap in a different one.
            raise HTTPException(
                status_code=400,
                detail="content_source_id does not match the content source being synced",
            )
    if source_row and source_row.get("platform") != "bilibili":
        # Without this, an explicit platform="bilibili" in the request would
        # let a source of another platform pass the validation below.
        raise HTTPException(status_code=404, detail="Bilibili content source not found")

    stored = source_row or {}
    source_url = _first_non_empty(request.source_url, stored.get("source_url"))
    # "or ''" guards against a NULL column coming back as None.
    _resolve_bilibili_platform(legacy, request.platform or stored.get("platform") or "", source_url)
    project = resolve_project(account, request.project_id or stored.get("project_id") or "")
    if stored.get("project_id") and stored["project_id"] != project["id"]:
        raise HTTPException(status_code=400, detail="Content source does not belong to the target project")
    kb = resolve_kb(account, request.knowledge_base_id, project["id"])
    assistant = resolve_assistant(account, request.assistant_id, project["id"])
    source_title = _first_non_empty(
        request.title,
        stored.get("title"),
        request.handle,
        source_url,
    )

    if not source_row:
        source_row = create_or_update_source(
            account=account,
            request=BilibiliContentSourceCreateRequest(
                project_id=project["id"],
                source_kind="creator_account",
                platform="bilibili",
                handle=request.handle.strip(),
                source_url=source_url,
                title=source_title,
                local_path="",
                metadata={
                    "sync_mode": "recent_uploads",
                    "max_items": request.max_items,
                    "analysis_model_profile_id": request.analysis_model_profile_id,
                },
            ),
            sync_request=request,
        )

    job_row = legacy.create_job_record(
        account_id=account["id"],
        project_id=project["id"],
        knowledge_base_id=kb["id"],
        source_type="content_source_sync",
        line_type="content_source_sync",
        workflow_key="content_source_sync_pipeline",
        title=f"{source_title} 内容源同步",
        language=request.language,
        source_url=source_url,
        assistant_id=(assistant or {}).get("id"),
        content_source_id=source_row["id"],
        artifacts={
            "platform": "bilibili",
            "source_kind": source_row.get("source_kind", "creator_account"),
            "source_title": source_title,
            "source_url": source_url,
            "max_items": request.max_items,
            "skip_existing": request.skip_existing,
            "auto_trigger_analysis": request.auto_trigger_analysis,
            "analysis_model_profile_id": request.analysis_model_profile_id,
        },
        analysis_model_profile_id=request.analysis_model_profile_id,
    )
    # Record the latest sync request on the source before the job is triggered.
    legacy.update_content_source_metadata(
        source_row["id"],
        {
            "platform": "bilibili",
            "last_sync_job_id": job_row["id"],
            "last_sync_requested_at": now(),
            "max_items": request.max_items,
            "analysis_model_profile_id": request.analysis_model_profile_id,
        },
    )
    triggered_row = await legacy.trigger_orchestrated_job(job_row)
    return _build_sync_result(legacy, triggered_row, source_row)
@app.get("/v2/bilibili/content-sources")
def list_bilibili_content_sources(
    project_id: str | None = Query(default=None),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    # List the caller's bilibili content sources, newest first. A project_id
    # narrows the list; a blank one selects sources that have no project.
    query, bindings = _content_source_query(legacy, account["id"], project_id)
    sources = legacy.db.fetch_all(query, bindings)
    payloads = []
    for source in sources:
        payloads.append(legacy.content_source_payload(source))
    return payloads
@app.post("/v2/bilibili/content-sources")
def create_bilibili_content_source(
    request: BilibiliContentSourceCreateRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Create a bilibili content source from the request body and return its payload.
    created_source = create_or_update_source(account=account, request=request)
    payload = legacy.content_source_payload(created_source)
    return payload
@app.get("/v2/bilibili/content-sources/{source_id}")
def get_bilibili_content_source(
    source_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Return one content source owned by the caller; sources of any other
    # platform are reported as not found.
    source = legacy.load_owned_content_source(source_id, account["id"])
    if source.get("platform") == "bilibili":
        return legacy.content_source_payload(source)
    raise HTTPException(status_code=404, detail="Bilibili content source not found")
@app.post("/v2/bilibili/content-sources/{source_id}/sync")
async def sync_bilibili_content_source(
    source_id: str,
    request: BilibiliContentSourceSyncRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Start a sync job for an existing bilibili content source owned by the caller.
    source = legacy.load_owned_content_source(source_id, account["id"])
    is_bilibili = source.get("platform") == "bilibili"
    if not is_bilibili:
        raise HTTPException(status_code=404, detail="Bilibili content source not found")
    result = await sync_source(account=account, request=request, content_source=source)
    return result
@app.post("/v2/bilibili/pipelines/content-source-sync")
async def create_bilibili_content_source_sync_job(
    request: BilibiliContentSourceSyncRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Start a sync job described entirely by the request body; no source row
    # is pre-loaded by this route.
    result = await sync_source(account=account, request=request)
    return result
@app.get("/v2/bilibili/content-sources/{source_id}/jobs")
def list_bilibili_content_source_jobs(
    source_id: str,
    limit: int = Query(default=50, ge=1, le=200),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    # List the jobs of one bilibili content source owned by the caller, newest first.
    source = legacy.load_owned_content_source(source_id, account["id"])
    if source.get("platform") != "bilibili":
        raise HTTPException(status_code=404, detail="Bilibili content source not found")
    query, bindings = _job_query(source_id=source_id, limit=limit)
    # _job_query leaves the leading user_id placeholder for the caller to bind.
    jobs = legacy.db.fetch_all(query, (account["id"],) + bindings)
    payloads = []
    for job in jobs:
        payloads.append(legacy.job_payload(job))
    return payloads
@app.get("/v2/bilibili/jobs")
def list_bilibili_jobs(
    project_id: str | None = Query(default=None),
    content_source_id: str | None = Query(default=None),
    limit: int = Query(default=50, ge=1, le=200),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    # List the caller's jobs attached to bilibili content sources, optionally
    # narrowed by project and/or content source.
    source_filter = None
    if content_source_id:
        source_filter = content_source_id.strip()
        source = legacy.load_owned_content_source(source_filter, account["id"])
        if source.get("platform") != "bilibili":
            raise HTTPException(status_code=404, detail="Bilibili content source not found")
    query, bindings = _job_query(source_id=source_filter, project_id=project_id, limit=limit)
    # _job_query leaves the leading user_id placeholder for the caller to bind.
    jobs = legacy.db.fetch_all(query, (account["id"],) + bindings)
    payloads = []
    for job in jobs:
        payloads.append(legacy.job_payload(job))
    return payloads
@app.get("/v2/bilibili/jobs/{job_id}")
def get_bilibili_job(
    job_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Return the context payload of one job owned by the caller.
    # NOTE(review): a job without a content_source_id is returned with no
    # platform check at all - confirm that is intended.
    job = legacy.load_owned_job(job_id, account["id"])
    linked_source_id = job.get("content_source_id")
    if linked_source_id:
        source = legacy.db.fetch_one(
            "SELECT * FROM content_sources WHERE id = ? AND user_id = ?",
            (linked_source_id, account["id"]),
        )
        source_is_bilibili = bool(source) and source.get("platform") == "bilibili"
        if not source_is_bilibili:
            raise HTTPException(status_code=404, detail="Bilibili job not found")
    return legacy.job_context_payload(job)
@app.get("/v2/bilibili/reviews")
def list_bilibili_reviews(
    project_id: str | None = Query(default=None),
    limit: int = Query(default=50, ge=1, le=200),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    # List the caller's bilibili reviews, optionally narrowed to one project.
    query, bindings = _review_query(project_id=project_id, limit=limit)
    # _review_query leaves the leading user_id placeholder for the caller to bind.
    reviews = legacy.db.fetch_all(query, (account["id"],) + bindings)
    payloads = []
    for review in reviews:
        payloads.append(legacy.review_payload(review))
    return payloads
@app.get("/v2/bilibili/reviews/{review_id}")
def get_bilibili_review(
    review_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Return one review owned by the caller; reviews of any other platform
    # are reported as not found.
    review = legacy.load_owned_review(review_id, account["id"])
    if review.get("platform") == "bilibili":
        return legacy.review_payload(review)
    raise HTTPException(status_code=404, detail="Bilibili review not found")
@app.post("/v2/bilibili/reviews")
def create_bilibili_review(
    request: BilibiliReviewCreateRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Create a publish review for the caller and return its payload.
    #
    # When source_job_id is given, the job must be owned by the caller and,
    # if it is linked to a content source, that source must be a bilibili
    # one (404 otherwise). The platform is validated against the job's source
    # URL (400 when it is not bilibili). Project and title fall back to the
    # source job's values when the request leaves them blank.
    source_job = None
    requested_job_id = request.source_job_id.strip()
    if requested_job_id:
        source_job = legacy.load_owned_job(requested_job_id, account["id"])
        linked_source_id = source_job.get("content_source_id")
        if linked_source_id:
            source = legacy.db.fetch_one(
                "SELECT * FROM content_sources WHERE id = ? AND user_id = ?",
                (linked_source_id, account["id"]),
            )
            if not source or source.get("platform") != "bilibili":
                raise HTTPException(status_code=404, detail="Bilibili source job not found")

    job_fields = source_job or {}
    # "or ''" matters: a NULL source_url would come back as None (not as the
    # .get() default) and crash the string handling in the platform check.
    normalized_platform = _resolve_bilibili_platform(
        legacy,
        request.platform,
        job_fields.get("source_url") or "",
    )
    requested_project_id = request.project_id.strip() or job_fields.get("project_id") or ""
    project = resolve_project(account, requested_project_id)
    assistant = resolve_assistant(account, request.assistant_id, project["id"])
    review_id = make_id("review")
    title = _first_non_empty(request.title, job_fields.get("title"), f"{project['name']} 复盘")
    timestamp = now()
    legacy.db.execute(
        """
        INSERT INTO publish_reviews (
            id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
            publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            review_id,
            account["id"],
            project["id"],
            source_job["id"] if source_job else None,
            (assistant or {}).get("id") or None,
            title,
            normalized_platform,
            request.content_type.strip() or "video",
            request.publish_url.strip(),
            request.published_at.strip(),
            _safe_json_dumps(request.metrics),
            request.verdict.strip(),
            request.highlights.strip(),
            request.next_actions.strip(),
            request.notes.strip(),
            timestamp,
            timestamp,
        ),
    )
    row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
    return legacy.review_payload(row)
@app.patch("/v2/bilibili/reviews/{review_id}")
def update_bilibili_review(
    review_id: str,
    request: BilibiliReviewUpdateRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    # Partially update a bilibili review owned by the caller and return its payload.
    #
    # Fields left as None keep their stored value. Supplied text is stripped
    # and a blank content_type becomes "video", matching the create route.
    # The stored platform is always "bilibili"; a supplied platform is only
    # validated (400 when it is not bilibili). 404 when the review is not a
    # bilibili review.
    current = legacy.load_owned_review(review_id, account["id"])
    if current.get("platform") != "bilibili":
        raise HTTPException(status_code=404, detail="Bilibili review not found")

    def merged_text(new_value: str | None, column: str) -> Any:
        # None means "not supplied": keep whatever is stored.
        if new_value is None:
            return current.get(column, "")
        return new_value.strip()

    assistant_id = current.get("assistant_id") or None
    if request.assistant_id is not None:
        assistant = resolve_assistant(account, request.assistant_id or "", current.get("project_id", ""))
        assistant_id = (assistant or {}).get("id") or None

    publish_url = merged_text(request.publish_url, "publish_url")
    if request.platform is not None:
        # Validate against the URL that will be stored, not the one being
        # replaced; "or ''" guards against a NULL column coming back as None.
        _resolve_bilibili_platform(legacy, request.platform, publish_url or "")

    if request.content_type is None:
        content_type = current.get("content_type", "video")
    else:
        content_type = request.content_type.strip() or "video"

    if request.metrics is not None:
        metrics = request.metrics
    else:
        metrics = legacy.parse_json_object(current.get("metrics_json") or "{}")

    legacy.db.execute(
        """
        UPDATE publish_reviews
        SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?,
            metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?,
            assistant_id = ?, updated_at = ?
        WHERE id = ? AND user_id = ?
        """,
        (
            merged_text(request.title, "title"),
            "bilibili",
            content_type,
            publish_url,
            merged_text(request.published_at, "published_at"),
            _safe_json_dumps(metrics),
            merged_text(request.verdict, "verdict"),
            merged_text(request.highlights, "highlights"),
            merged_text(request.next_actions, "next_actions"),
            merged_text(request.notes, "notes"),
            assistant_id,
            now(),
            review_id,
            account["id"],
        ),
    )
    row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
    return legacy.review_payload(row)
@app.get("/v2/bilibili/content-sources/{source_id}/reviews")
def list_bilibili_content_source_reviews(
    source_id: str,
    limit: int = Query(default=50, ge=1, le=200),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> list[dict[str, Any]]:
    # List the caller's bilibili reviews whose source job belongs to the
    # given content source, ordered by publish time (creation time when the
    # publish time is empty), newest first.
    source = legacy.load_owned_content_source(source_id, account["id"])
    if source.get("platform") != "bilibili":
        raise HTTPException(status_code=404, detail="Bilibili content source not found")
    bindings = (account["id"], source_id, limit)
    reviews = legacy.db.fetch_all(
        """
        SELECT r.*
        FROM publish_reviews r
        JOIN jobs j ON j.id = r.source_job_id
        WHERE r.user_id = ? AND r.platform = 'bilibili' AND j.content_source_id = ?
        ORDER BY COALESCE(NULLIF(r.published_at, ''), r.created_at) DESC, r.created_at DESC
        LIMIT ?
        """,
        bindings,
    )
    payloads = []
    for review in reviews:
        payloads.append(legacy.review_payload(review))
    return payloads
# Public API of this module: only the route-registration entry point is exported.
__all__ = ["register_bilibili_routes"]