Compare commits
24 Commits
codex/stor
...
codex/live
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c0d83934e | ||
|
|
98b3efa10b | ||
|
|
3b7d4f0d5b | ||
|
|
28ac70cf8f | ||
|
|
52f20c3c3d | ||
|
|
d427b89409 | ||
|
|
a8d503159f | ||
|
|
4ee3af45d7 | ||
|
|
2570c4200b | ||
|
|
96ab6af9ed | ||
|
|
1719047ef5 | ||
|
|
dedf69193d | ||
|
|
a46e4f47b3 | ||
|
|
a695eb04b9 | ||
|
|
9afac1eff7 | ||
|
|
70e4652996 | ||
|
|
caf51bc293 | ||
|
|
5c39ea2728 | ||
|
|
10eae9ad69 | ||
|
|
5a739a414d | ||
|
|
7910019ef9 | ||
|
|
1256b9df75 | ||
|
|
65d5588b57 | ||
|
|
3d0a898faa |
1
collector-service/app/__init__.py
Normal file
1
collector-service/app/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Collector service source overlay for legacy pyc-backed app."""
|
||||
545
collector-service/app/bilibili_features.py
Normal file
545
collector-service/app/bilibili_features.py
Normal file
@@ -0,0 +1,545 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Depends, HTTPException, Query
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
def _safe_json_dumps(value: Any) -> str:
|
||||
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
|
||||
|
||||
|
||||
def _first_non_empty(*values: Any) -> str:
|
||||
for value in values:
|
||||
if value is None:
|
||||
continue
|
||||
if isinstance(value, str):
|
||||
stripped = value.strip()
|
||||
if stripped:
|
||||
return stripped
|
||||
elif value not in ("", [], {}, ()):
|
||||
return str(value)
|
||||
return ""
|
||||
|
||||
|
||||
class BilibiliContentSourceCreateRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
source_kind: str = "creator_account"
|
||||
platform: str = ""
|
||||
handle: str = ""
|
||||
source_url: str = ""
|
||||
title: str = ""
|
||||
local_path: str = ""
|
||||
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class BilibiliContentSourceSyncRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
knowledge_base_id: str = ""
|
||||
assistant_id: str = ""
|
||||
content_source_id: str = ""
|
||||
platform: str = ""
|
||||
handle: str = ""
|
||||
source_url: str = ""
|
||||
title: str = ""
|
||||
analysis_model_profile_id: str = ""
|
||||
language: str = "auto"
|
||||
max_items: int = Field(default=5, ge=1, le=20)
|
||||
skip_existing: bool = True
|
||||
auto_trigger_analysis: bool = True
|
||||
|
||||
|
||||
class BilibiliReviewCreateRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
source_job_id: str = ""
|
||||
assistant_id: str = ""
|
||||
title: str = ""
|
||||
platform: str = "bilibili"
|
||||
content_type: str = "video"
|
||||
publish_url: str = ""
|
||||
published_at: str = ""
|
||||
metrics: dict[str, Any] = Field(default_factory=dict)
|
||||
verdict: str = ""
|
||||
highlights: str = ""
|
||||
next_actions: str = ""
|
||||
notes: str = ""
|
||||
|
||||
|
||||
class BilibiliReviewUpdateRequest(BaseModel):
|
||||
title: str | None = None
|
||||
platform: str | None = None
|
||||
content_type: str | None = None
|
||||
publish_url: str | None = None
|
||||
published_at: str | None = None
|
||||
metrics: dict[str, Any] | None = None
|
||||
verdict: str | None = None
|
||||
highlights: str | None = None
|
||||
next_actions: str | None = None
|
||||
notes: str | None = None
|
||||
assistant_id: str | None = None
|
||||
|
||||
|
||||
def _is_youtube_url(source_url: str) -> bool:
|
||||
lowered = source_url.strip().lower()
|
||||
return "youtube.com" in lowered or "youtu.be" in lowered
|
||||
|
||||
|
||||
def _resolve_bilibili_platform(legacy: Any, platform: str, source_url: str = "") -> str:
|
||||
if _is_youtube_url(source_url):
|
||||
raise HTTPException(status_code=400, detail="YouTube sources are not supported in the bilibili routes")
|
||||
|
||||
inferred = legacy.infer_platform_from_url(source_url) if source_url.strip() else ""
|
||||
normalized = legacy.normalize_platform_slug(platform, allow_blank=True)
|
||||
if not normalized:
|
||||
normalized = inferred or "bilibili"
|
||||
|
||||
if normalized == "youtube":
|
||||
raise HTTPException(status_code=400, detail="YouTube sources are not supported in the bilibili routes")
|
||||
if inferred and inferred not in {"bilibili", "youtube"} and not platform.strip():
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Bilibili routes only accept bilibili sources, not {inferred}",
|
||||
)
|
||||
if normalized != "bilibili":
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"Bilibili routes only accept bilibili sources, not {normalized}",
|
||||
)
|
||||
return "bilibili"
|
||||
|
||||
|
||||
def _content_source_query(legacy: Any, account_id: str, project_id: str | None = None) -> tuple[str, tuple[Any, ...]]:
|
||||
clauses = ["user_id = ?", "platform = 'bilibili'"]
|
||||
params: list[Any] = [account_id]
|
||||
if project_id is not None:
|
||||
normalized_project = project_id.strip()
|
||||
if normalized_project:
|
||||
clauses.append("project_id = ?")
|
||||
params.append(normalized_project)
|
||||
else:
|
||||
clauses.append("(project_id IS NULL OR project_id = '')")
|
||||
sql = f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC"
|
||||
return sql, tuple(params)
|
||||
|
||||
|
||||
def _job_query(
|
||||
source_id: str | None = None,
|
||||
project_id: str | None = None,
|
||||
limit: int = 50,
|
||||
) -> tuple[str, tuple[Any, ...]]:
|
||||
clauses = ["j.user_id = ?", "cs.platform = 'bilibili'"]
|
||||
params: list[Any] = []
|
||||
if source_id:
|
||||
clauses.append("j.content_source_id = ?")
|
||||
params.append(source_id)
|
||||
if project_id is not None:
|
||||
normalized_project = project_id.strip()
|
||||
if normalized_project:
|
||||
clauses.append("j.project_id = ?")
|
||||
params.append(normalized_project)
|
||||
else:
|
||||
clauses.append("(j.project_id IS NULL OR j.project_id = '')")
|
||||
sql = (
|
||||
"SELECT j.* "
|
||||
"FROM jobs j "
|
||||
"JOIN content_sources cs ON cs.id = j.content_source_id "
|
||||
f"WHERE {' AND '.join(clauses)} "
|
||||
"ORDER BY j.created_at DESC "
|
||||
"LIMIT ?"
|
||||
)
|
||||
params = [*params]
|
||||
return sql, tuple([*params, limit])
|
||||
|
||||
|
||||
def _review_query(project_id: str | None = None, limit: int = 50) -> tuple[str, tuple[Any, ...]]:
|
||||
clauses = ["r.user_id = ?", "r.platform = 'bilibili'"]
|
||||
params: list[Any] = []
|
||||
if project_id is not None:
|
||||
normalized_project = project_id.strip()
|
||||
if normalized_project:
|
||||
clauses.append("r.project_id = ?")
|
||||
params.append(normalized_project)
|
||||
else:
|
||||
clauses.append("(r.project_id IS NULL OR r.project_id = '')")
|
||||
sql = (
|
||||
"SELECT r.* "
|
||||
"FROM publish_reviews r "
|
||||
f"WHERE {' AND '.join(clauses)} "
|
||||
"ORDER BY COALESCE(NULLIF(r.published_at, ''), r.created_at) DESC, r.created_at DESC "
|
||||
"LIMIT ?"
|
||||
)
|
||||
return sql, tuple([*params, limit])
|
||||
|
||||
|
||||
def _build_sync_result(legacy: Any, row: dict[str, Any], content_source: dict[str, Any]) -> dict[str, Any]:
|
||||
payload = legacy.job_payload(row)
|
||||
payload["content_source"] = legacy.content_source_payload(content_source)
|
||||
return payload
|
||||
|
||||
|
||||
def register_bilibili_routes(app: Any, legacy: Any) -> None:
|
||||
def now() -> str:
|
||||
return legacy.utc_now()
|
||||
|
||||
def make_id(prefix: str) -> str:
|
||||
return legacy.make_id(prefix)
|
||||
|
||||
def resolve_project(account: dict[str, Any], project_id: str) -> dict[str, Any]:
|
||||
return legacy.resolve_target_project(account["id"], project_id or None, username=account["username"])
|
||||
|
||||
def resolve_kb(account: dict[str, Any], kb_id: str, project_id: str) -> dict[str, Any]:
|
||||
return legacy.resolve_target_kb(account["id"], kb_id or None, project_id, username=account["username"])
|
||||
|
||||
def resolve_assistant(account: dict[str, Any], assistant_id: str, project_id: str) -> dict[str, Any] | None:
|
||||
return legacy.resolve_target_assistant(account["id"], assistant_id or None, project_id)
|
||||
|
||||
def create_or_update_source(
|
||||
*,
|
||||
account: dict[str, Any],
|
||||
request: BilibiliContentSourceCreateRequest,
|
||||
sync_request: BilibiliContentSourceSyncRequest | None = None,
|
||||
) -> dict[str, Any]:
|
||||
source_url = _first_non_empty(request.source_url, sync_request.source_url if sync_request else "")
|
||||
_resolve_bilibili_platform(legacy, request.platform or (sync_request.platform if sync_request else ""), source_url)
|
||||
|
||||
project = resolve_project(account, request.project_id or (sync_request.project_id if sync_request else ""))
|
||||
title = _first_non_empty(request.title, sync_request.title if sync_request else "", request.handle, source_url)
|
||||
metadata: dict[str, Any] = dict(request.metadata)
|
||||
metadata.setdefault("platform", "bilibili")
|
||||
if sync_request:
|
||||
metadata.update(
|
||||
{
|
||||
"sync_mode": "recent_uploads",
|
||||
"max_items": sync_request.max_items,
|
||||
"analysis_model_profile_id": sync_request.analysis_model_profile_id,
|
||||
}
|
||||
)
|
||||
|
||||
return legacy.create_content_source(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
source_kind=(request.source_kind or "creator_account").strip(),
|
||||
platform="bilibili",
|
||||
handle=request.handle.strip(),
|
||||
source_url=source_url.strip(),
|
||||
title=title.strip(),
|
||||
local_path=request.local_path.strip(),
|
||||
metadata=metadata,
|
||||
)
|
||||
|
||||
async def sync_source(
|
||||
*,
|
||||
account: dict[str, Any],
|
||||
request: BilibiliContentSourceSyncRequest,
|
||||
content_source: dict[str, Any] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
source_row = content_source
|
||||
if request.content_source_id.strip():
|
||||
source_row = legacy.load_owned_content_source(request.content_source_id.strip(), account["id"])
|
||||
|
||||
source_url = _first_non_empty(
|
||||
request.source_url,
|
||||
(source_row or {}).get("source_url", ""),
|
||||
)
|
||||
_resolve_bilibili_platform(
|
||||
legacy,
|
||||
request.platform or (source_row or {}).get("platform", ""),
|
||||
source_url,
|
||||
)
|
||||
|
||||
project_id = request.project_id or (source_row or {}).get("project_id", "")
|
||||
project = resolve_project(account, project_id)
|
||||
kb = resolve_kb(account, request.knowledge_base_id, project["id"])
|
||||
assistant = resolve_assistant(account, request.assistant_id, project["id"])
|
||||
source_title = _first_non_empty(
|
||||
request.title,
|
||||
(source_row or {}).get("title", ""),
|
||||
request.handle,
|
||||
source_url,
|
||||
)
|
||||
|
||||
if source_row and source_row.get("project_id") and source_row["project_id"] != project["id"]:
|
||||
raise HTTPException(status_code=400, detail="Content source does not belong to the target project")
|
||||
|
||||
if not source_row:
|
||||
source_row = create_or_update_source(
|
||||
account=account,
|
||||
request=BilibiliContentSourceCreateRequest(
|
||||
project_id=project["id"],
|
||||
source_kind="creator_account",
|
||||
platform="bilibili",
|
||||
handle=request.handle.strip(),
|
||||
source_url=source_url,
|
||||
title=source_title,
|
||||
local_path="",
|
||||
metadata={
|
||||
"sync_mode": "recent_uploads",
|
||||
"max_items": request.max_items,
|
||||
"analysis_model_profile_id": request.analysis_model_profile_id,
|
||||
},
|
||||
),
|
||||
sync_request=request,
|
||||
)
|
||||
|
||||
job_row = legacy.create_job_record(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
knowledge_base_id=kb["id"],
|
||||
source_type="content_source_sync",
|
||||
line_type="content_source_sync",
|
||||
workflow_key="content_source_sync_pipeline",
|
||||
title=f"{source_title} 内容源同步",
|
||||
language=request.language,
|
||||
source_url=source_url,
|
||||
assistant_id=(assistant or {}).get("id"),
|
||||
content_source_id=source_row["id"],
|
||||
artifacts={
|
||||
"platform": "bilibili",
|
||||
"source_kind": source_row.get("source_kind", "creator_account"),
|
||||
"source_title": source_title,
|
||||
"source_url": source_url,
|
||||
"max_items": request.max_items,
|
||||
"skip_existing": request.skip_existing,
|
||||
"auto_trigger_analysis": request.auto_trigger_analysis,
|
||||
"analysis_model_profile_id": request.analysis_model_profile_id,
|
||||
},
|
||||
analysis_model_profile_id=request.analysis_model_profile_id,
|
||||
)
|
||||
legacy.update_content_source_metadata(
|
||||
source_row["id"],
|
||||
{
|
||||
"platform": "bilibili",
|
||||
"last_sync_job_id": job_row["id"],
|
||||
"last_sync_requested_at": now(),
|
||||
"max_items": request.max_items,
|
||||
"analysis_model_profile_id": request.analysis_model_profile_id,
|
||||
},
|
||||
)
|
||||
return _build_sync_result(legacy, await legacy.trigger_orchestrated_job(job_row), source_row)
|
||||
|
||||
@app.get("/v2/bilibili/content-sources")
|
||||
def list_bilibili_content_sources(
|
||||
project_id: str | None = Query(default=None),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
sql, params = _content_source_query(legacy, account["id"], project_id)
|
||||
return [legacy.content_source_payload(row) for row in legacy.db.fetch_all(sql, params)]
|
||||
|
||||
@app.post("/v2/bilibili/content-sources")
|
||||
def create_bilibili_content_source(
|
||||
request: BilibiliContentSourceCreateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
row = create_or_update_source(account=account, request=request)
|
||||
return legacy.content_source_payload(row)
|
||||
|
||||
@app.get("/v2/bilibili/content-sources/{source_id}")
|
||||
def get_bilibili_content_source(
|
||||
source_id: str,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
row = legacy.load_owned_content_source(source_id, account["id"])
|
||||
if row.get("platform") != "bilibili":
|
||||
raise HTTPException(status_code=404, detail="Bilibili content source not found")
|
||||
return legacy.content_source_payload(row)
|
||||
|
||||
@app.post("/v2/bilibili/content-sources/{source_id}/sync")
|
||||
async def sync_bilibili_content_source(
|
||||
source_id: str,
|
||||
request: BilibiliContentSourceSyncRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
row = legacy.load_owned_content_source(source_id, account["id"])
|
||||
if row.get("platform") != "bilibili":
|
||||
raise HTTPException(status_code=404, detail="Bilibili content source not found")
|
||||
return await sync_source(account=account, request=request, content_source=row)
|
||||
|
||||
@app.post("/v2/bilibili/pipelines/content-source-sync")
|
||||
async def create_bilibili_content_source_sync_job(
|
||||
request: BilibiliContentSourceSyncRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
return await sync_source(account=account, request=request)
|
||||
|
||||
@app.get("/v2/bilibili/content-sources/{source_id}/jobs")
|
||||
def list_bilibili_content_source_jobs(
|
||||
source_id: str,
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
row = legacy.load_owned_content_source(source_id, account["id"])
|
||||
if row.get("platform") != "bilibili":
|
||||
raise HTTPException(status_code=404, detail="Bilibili content source not found")
|
||||
sql, params = _job_query(source_id=source_id, limit=limit)
|
||||
rows = legacy.db.fetch_all(sql, (account["id"], *params))
|
||||
return [legacy.job_payload(item) for item in rows]
|
||||
|
||||
@app.get("/v2/bilibili/jobs")
|
||||
def list_bilibili_jobs(
|
||||
project_id: str | None = Query(default=None),
|
||||
content_source_id: str | None = Query(default=None),
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
if content_source_id:
|
||||
row = legacy.load_owned_content_source(content_source_id.strip(), account["id"])
|
||||
if row.get("platform") != "bilibili":
|
||||
raise HTTPException(status_code=404, detail="Bilibili content source not found")
|
||||
sql, params = _job_query(source_id=content_source_id.strip() if content_source_id else None, project_id=project_id, limit=limit)
|
||||
rows = legacy.db.fetch_all(sql, (account["id"], *params))
|
||||
return [legacy.job_payload(item) for item in rows]
|
||||
|
||||
@app.get("/v2/bilibili/jobs/{job_id}")
|
||||
def get_bilibili_job(
|
||||
job_id: str,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
row = legacy.load_owned_job(job_id, account["id"])
|
||||
if row.get("content_source_id"):
|
||||
source = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ? AND user_id = ?", (row["content_source_id"], account["id"]))
|
||||
if not source or source.get("platform") != "bilibili":
|
||||
raise HTTPException(status_code=404, detail="Bilibili job not found")
|
||||
return legacy.job_context_payload(row)
|
||||
|
||||
@app.get("/v2/bilibili/reviews")
|
||||
def list_bilibili_reviews(
|
||||
project_id: str | None = Query(default=None),
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
sql, params = _review_query(project_id=project_id, limit=limit)
|
||||
rows = legacy.db.fetch_all(sql, (account["id"], *params))
|
||||
return [legacy.review_payload(item) for item in rows]
|
||||
|
||||
@app.get("/v2/bilibili/reviews/{review_id}")
|
||||
def get_bilibili_review(
|
||||
review_id: str,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
row = legacy.load_owned_review(review_id, account["id"])
|
||||
if row.get("platform") != "bilibili":
|
||||
raise HTTPException(status_code=404, detail="Bilibili review not found")
|
||||
return legacy.review_payload(row)
|
||||
|
||||
@app.post("/v2/bilibili/reviews")
|
||||
def create_bilibili_review(
|
||||
request: BilibiliReviewCreateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_job = None
|
||||
if request.source_job_id.strip():
|
||||
source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"])
|
||||
if source_job.get("content_source_id"):
|
||||
source = legacy.db.fetch_one(
|
||||
"SELECT * FROM content_sources WHERE id = ? AND user_id = ?",
|
||||
(source_job["content_source_id"], account["id"]),
|
||||
)
|
||||
if not source or source.get("platform") != "bilibili":
|
||||
raise HTTPException(status_code=404, detail="Bilibili source job not found")
|
||||
normalized_platform = _resolve_bilibili_platform(legacy, request.platform, source_job.get("source_url", "") if source_job else "")
|
||||
requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "")
|
||||
project = resolve_project(account, requested_project_id)
|
||||
assistant = resolve_assistant(account, request.assistant_id, project["id"])
|
||||
review_id = make_id("review")
|
||||
title = _first_non_empty(request.title, source_job.get("title", "") if source_job else "", f"{project['name']} 复盘")
|
||||
timestamp = now()
|
||||
legacy.db.execute(
|
||||
"""
|
||||
INSERT INTO publish_reviews (
|
||||
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
|
||||
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
review_id,
|
||||
account["id"],
|
||||
project["id"],
|
||||
source_job["id"] if source_job else None,
|
||||
(assistant or {}).get("id") or None,
|
||||
title,
|
||||
normalized_platform,
|
||||
request.content_type.strip() or "video",
|
||||
request.publish_url.strip(),
|
||||
request.published_at.strip(),
|
||||
_safe_json_dumps(request.metrics),
|
||||
request.verdict.strip(),
|
||||
request.highlights.strip(),
|
||||
request.next_actions.strip(),
|
||||
request.notes.strip(),
|
||||
timestamp,
|
||||
timestamp,
|
||||
),
|
||||
)
|
||||
row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
|
||||
return legacy.review_payload(row)
|
||||
|
||||
@app.patch("/v2/bilibili/reviews/{review_id}")
|
||||
def update_bilibili_review(
|
||||
review_id: str,
|
||||
request: BilibiliReviewUpdateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
current = legacy.load_owned_review(review_id, account["id"])
|
||||
if current.get("platform") != "bilibili":
|
||||
raise HTTPException(status_code=404, detail="Bilibili review not found")
|
||||
assistant_id = current.get("assistant_id") or None
|
||||
if request.assistant_id is not None:
|
||||
assistant = resolve_assistant(account, request.assistant_id or "", current.get("project_id", ""))
|
||||
assistant_id = (assistant or {}).get("id") or None
|
||||
if request.platform is not None:
|
||||
_resolve_bilibili_platform(legacy, request.platform, current.get("publish_url", ""))
|
||||
legacy.db.execute(
|
||||
"""
|
||||
UPDATE publish_reviews
|
||||
SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?,
|
||||
metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?,
|
||||
assistant_id = ?, updated_at = ?
|
||||
WHERE id = ? AND user_id = ?
|
||||
""",
|
||||
(
|
||||
request.title if request.title is not None else current.get("title", ""),
|
||||
"bilibili",
|
||||
request.content_type if request.content_type is not None else current.get("content_type", "video"),
|
||||
request.publish_url if request.publish_url is not None else current.get("publish_url", ""),
|
||||
request.published_at if request.published_at is not None else current.get("published_at", ""),
|
||||
_safe_json_dumps(request.metrics if request.metrics is not None else legacy.parse_json_object(current.get("metrics_json") or "{}")),
|
||||
request.verdict if request.verdict is not None else current.get("verdict", ""),
|
||||
request.highlights if request.highlights is not None else current.get("highlights", ""),
|
||||
request.next_actions if request.next_actions is not None else current.get("next_actions", ""),
|
||||
request.notes if request.notes is not None else current.get("notes", ""),
|
||||
assistant_id,
|
||||
now(),
|
||||
review_id,
|
||||
account["id"],
|
||||
),
|
||||
)
|
||||
row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
|
||||
return legacy.review_payload(row)
|
||||
|
||||
@app.get("/v2/bilibili/content-sources/{source_id}/reviews")
|
||||
def list_bilibili_content_source_reviews(
|
||||
source_id: str,
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
row = legacy.load_owned_content_source(source_id, account["id"])
|
||||
if row.get("platform") != "bilibili":
|
||||
raise HTTPException(status_code=404, detail="Bilibili content source not found")
|
||||
rows = legacy.db.fetch_all(
|
||||
"""
|
||||
SELECT r.*
|
||||
FROM publish_reviews r
|
||||
JOIN jobs j ON j.id = r.source_job_id
|
||||
WHERE r.user_id = ? AND r.platform = 'bilibili' AND j.content_source_id = ?
|
||||
ORDER BY COALESCE(NULLIF(r.published_at, ''), r.created_at) DESC, r.created_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(account["id"], source_id, limit),
|
||||
)
|
||||
return [legacy.review_payload(item) for item in rows]
|
||||
|
||||
|
||||
__all__ = ["register_bilibili_routes"]
|
||||
4810
collector-service/app/core_main.py
Normal file
4810
collector-service/app/core_main.py
Normal file
File diff suppressed because it is too large
Load Diff
394
collector-service/app/database.py
Normal file
394
collector-service/app/database.py
Normal file
@@ -0,0 +1,394 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterator
|
||||
|
||||
|
||||
def utc_now() -> str:
|
||||
from datetime import datetime, timezone
|
||||
|
||||
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
|
||||
|
||||
|
||||
def dict_factory(cursor: sqlite3.Cursor, row: sqlite3.Row) -> dict[str, Any]:
|
||||
return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}
|
||||
|
||||
|
||||
class Database:
|
||||
def __init__(self, path: str) -> None:
|
||||
self.path = Path(path)
|
||||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def connect(self) -> sqlite3.Connection:
|
||||
conn = sqlite3.connect(self.path)
|
||||
conn.row_factory = dict_factory
|
||||
conn.execute("PRAGMA foreign_keys = ON")
|
||||
return conn
|
||||
|
||||
@contextmanager
|
||||
def session(self) -> Iterator[sqlite3.Connection]:
|
||||
conn = self.connect()
|
||||
try:
|
||||
yield conn
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def fetch_one(self, sql: str, params: tuple[Any, ...] = ()) -> dict[str, Any] | None:
|
||||
with self.session() as conn:
|
||||
return conn.execute(sql, params).fetchone()
|
||||
|
||||
def fetch_all(self, sql: str, params: tuple[Any, ...] = ()) -> list[dict[str, Any]]:
|
||||
with self.session() as conn:
|
||||
return list(conn.execute(sql, params).fetchall())
|
||||
|
||||
def execute(self, sql: str, params: tuple[Any, ...] = ()) -> None:
|
||||
with self.session() as conn:
|
||||
conn.execute(sql, params)
|
||||
|
||||
def table_exists(self, name: str) -> bool:
|
||||
row = self.fetch_one(
|
||||
"SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
|
||||
(name,),
|
||||
)
|
||||
return bool(row)
|
||||
|
||||
def column_exists(self, table: str, column: str) -> bool:
|
||||
with self.session() as conn:
|
||||
rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
|
||||
return any(row["name"] == column for row in rows)
|
||||
|
||||
def init_schema(self) -> None:
|
||||
schema = """
|
||||
CREATE TABLE IF NOT EXISTS accounts (
|
||||
id TEXT PRIMARY KEY,
|
||||
username TEXT NOT NULL UNIQUE,
|
||||
password_hash TEXT NOT NULL,
|
||||
password_salt TEXT NOT NULL,
|
||||
display_name TEXT NOT NULL,
|
||||
role TEXT NOT NULL,
|
||||
approval_status TEXT NOT NULL,
|
||||
approved_by TEXT,
|
||||
approved_at TEXT,
|
||||
preferred_analysis_model_id TEXT,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS auth_tokens (
|
||||
token TEXT PRIMARY KEY,
|
||||
account_id TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL,
|
||||
FOREIGN KEY(account_id) REFERENCES accounts(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS model_profiles (
|
||||
id TEXT PRIMARY KEY,
|
||||
owner_account_id TEXT,
|
||||
name TEXT NOT NULL,
|
||||
provider TEXT NOT NULL,
|
||||
base_url TEXT NOT NULL,
|
||||
api_key TEXT NOT NULL DEFAULT '',
|
||||
model_name TEXT NOT NULL,
|
||||
is_system INTEGER NOT NULL DEFAULT 0,
|
||||
is_default INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
FOREIGN KEY(owner_account_id) REFERENCES accounts(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS knowledge_bases (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
project_id TEXT,
|
||||
name TEXT NOT NULL,
|
||||
description TEXT NOT NULL DEFAULT '',
|
||||
sync_status TEXT NOT NULL DEFAULT 'ready',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS knowledge_documents (
|
||||
id TEXT PRIMARY KEY,
|
||||
knowledge_base_id TEXT NOT NULL,
|
||||
title TEXT NOT NULL,
|
||||
source_type TEXT NOT NULL,
|
||||
source_url TEXT NOT NULL DEFAULT '',
|
||||
transcript_text TEXT NOT NULL DEFAULT '',
|
||||
style_summary TEXT NOT NULL DEFAULT '',
|
||||
combined_text TEXT NOT NULL DEFAULT '',
|
||||
analysis_json TEXT NOT NULL DEFAULT '{}',
|
||||
storyboard_json TEXT NOT NULL DEFAULT '[]',
|
||||
source_artifact_json TEXT NOT NULL DEFAULT '{}',
|
||||
analysis_model_profile_id TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS assistants (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
project_id TEXT,
|
||||
name TEXT NOT NULL,
|
||||
description TEXT NOT NULL DEFAULT '',
|
||||
system_prompt TEXT NOT NULL DEFAULT '',
|
||||
generation_goal TEXT NOT NULL DEFAULT '',
|
||||
config_json TEXT NOT NULL DEFAULT '{}',
|
||||
model_profile_id TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS assistant_knowledge_bases (
|
||||
assistant_id TEXT NOT NULL,
|
||||
knowledge_base_id TEXT NOT NULL,
|
||||
PRIMARY KEY (assistant_id, knowledge_base_id),
|
||||
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
project_id TEXT,
|
||||
parent_job_id TEXT,
|
||||
assistant_id TEXT,
|
||||
knowledge_base_id TEXT NOT NULL,
|
||||
content_source_id TEXT,
|
||||
source_type TEXT NOT NULL,
|
||||
line_type TEXT NOT NULL DEFAULT 'analysis',
|
||||
workflow_key TEXT NOT NULL DEFAULT '',
|
||||
orchestrator TEXT NOT NULL DEFAULT 'n8n',
|
||||
provider_name TEXT NOT NULL DEFAULT '',
|
||||
provider_task_id TEXT NOT NULL DEFAULT '',
|
||||
source_url TEXT,
|
||||
title TEXT NOT NULL,
|
||||
language TEXT NOT NULL DEFAULT 'auto',
|
||||
status TEXT NOT NULL,
|
||||
transcript_text TEXT NOT NULL DEFAULT '',
|
||||
style_summary TEXT NOT NULL DEFAULT '',
|
||||
upload_status TEXT NOT NULL DEFAULT 'pending',
|
||||
error TEXT NOT NULL DEFAULT '',
|
||||
artifacts_json TEXT NOT NULL DEFAULT '{}',
|
||||
result_json TEXT NOT NULL DEFAULT '{}',
|
||||
analysis_model_profile_id TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL,
|
||||
FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS projects (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
description TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS content_sources (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
project_id TEXT,
|
||||
source_kind TEXT NOT NULL,
|
||||
platform TEXT NOT NULL DEFAULT '',
|
||||
handle TEXT NOT NULL DEFAULT '',
|
||||
source_url TEXT NOT NULL DEFAULT '',
|
||||
title TEXT NOT NULL DEFAULT '',
|
||||
local_path TEXT NOT NULL DEFAULT '',
|
||||
metadata_json TEXT NOT NULL DEFAULT '{}',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS publish_reviews (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
project_id TEXT,
|
||||
source_job_id TEXT,
|
||||
assistant_id TEXT,
|
||||
title TEXT NOT NULL,
|
||||
platform TEXT NOT NULL DEFAULT 'douyin',
|
||||
content_type TEXT NOT NULL DEFAULT 'video',
|
||||
publish_url TEXT NOT NULL DEFAULT '',
|
||||
published_at TEXT NOT NULL DEFAULT '',
|
||||
metrics_json TEXT NOT NULL DEFAULT '{}',
|
||||
verdict TEXT NOT NULL DEFAULT '',
|
||||
highlights TEXT NOT NULL DEFAULT '',
|
||||
next_actions TEXT NOT NULL DEFAULT '',
|
||||
notes TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
|
||||
FOREIGN KEY(source_job_id) REFERENCES jobs(id) ON DELETE SET NULL,
|
||||
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS live_recorder_sources (
|
||||
id TEXT PRIMARY KEY,
|
||||
platform TEXT NOT NULL DEFAULT '',
|
||||
source_url TEXT NOT NULL,
|
||||
remote_name TEXT NOT NULL UNIQUE,
|
||||
title TEXT NOT NULL DEFAULT '',
|
||||
metadata_json TEXT NOT NULL DEFAULT '{}',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
UNIQUE(platform, source_url)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS live_recorder_bindings (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
project_id TEXT,
|
||||
assistant_id TEXT,
|
||||
source_id TEXT NOT NULL,
|
||||
title TEXT NOT NULL DEFAULT '',
|
||||
quality TEXT NOT NULL DEFAULT '原画',
|
||||
enabled INTEGER NOT NULL DEFAULT 1,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
UNIQUE(user_id, source_id),
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
|
||||
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL,
|
||||
FOREIGN KEY(source_id) REFERENCES live_recorder_sources(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS job_events (
|
||||
id TEXT PRIMARY KEY,
|
||||
job_id TEXT NOT NULL,
|
||||
event_type TEXT NOT NULL,
|
||||
payload_json TEXT NOT NULL DEFAULT '{}',
|
||||
created_at TEXT NOT NULL,
|
||||
FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS app_updates (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
platform TEXT NOT NULL,
|
||||
channel TEXT NOT NULL,
|
||||
version_code INTEGER NOT NULL,
|
||||
version_name TEXT NOT NULL,
|
||||
min_supported_code INTEGER NOT NULL,
|
||||
apk_url TEXT NOT NULL,
|
||||
apk_sha256 TEXT NOT NULL DEFAULT '',
|
||||
notes TEXT NOT NULL DEFAULT '',
|
||||
force_update INTEGER NOT NULL DEFAULT 0,
|
||||
is_active INTEGER NOT NULL DEFAULT 1,
|
||||
published_at INTEGER NOT NULL,
|
||||
created_by TEXT NOT NULL
|
||||
);
|
||||
"""
|
||||
with self.session() as conn:
|
||||
conn.executescript(schema)
|
||||
self.migrate_schema()
|
||||
|
||||
def migrate_schema(self) -> None:
|
||||
table_columns: dict[str, dict[str, str]] = {
|
||||
"knowledge_bases": {
|
||||
"project_id": "TEXT",
|
||||
},
|
||||
"knowledge_documents": {
|
||||
"analysis_json": "TEXT NOT NULL DEFAULT '{}'",
|
||||
"storyboard_json": "TEXT NOT NULL DEFAULT '[]'",
|
||||
"source_artifact_json": "TEXT NOT NULL DEFAULT '{}'",
|
||||
},
|
||||
"assistants": {
|
||||
"project_id": "TEXT",
|
||||
"config_json": "TEXT NOT NULL DEFAULT '{}'",
|
||||
},
|
||||
"jobs": {
|
||||
"project_id": "TEXT",
|
||||
"parent_job_id": "TEXT",
|
||||
"content_source_id": "TEXT",
|
||||
"line_type": "TEXT NOT NULL DEFAULT 'analysis'",
|
||||
"workflow_key": "TEXT NOT NULL DEFAULT ''",
|
||||
"orchestrator": "TEXT NOT NULL DEFAULT 'n8n'",
|
||||
"provider_name": "TEXT NOT NULL DEFAULT ''",
|
||||
"provider_task_id": "TEXT NOT NULL DEFAULT ''",
|
||||
"result_json": "TEXT NOT NULL DEFAULT '{}'",
|
||||
},
|
||||
}
|
||||
|
||||
for table, columns in table_columns.items():
|
||||
if not self.table_exists(table):
|
||||
continue
|
||||
for column, definition in columns.items():
|
||||
if self.column_exists(table, column):
|
||||
continue
|
||||
self.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
|
||||
|
||||
self.ensure_default_projects()
|
||||
|
||||
def ensure_default_projects(self) -> None:
|
||||
if not self.table_exists("projects"):
|
||||
return
|
||||
|
||||
accounts = self.fetch_all("SELECT id, username FROM accounts ORDER BY created_at ASC")
|
||||
for account in accounts:
|
||||
project = self.fetch_one(
|
||||
"SELECT * FROM projects WHERE user_id = ? ORDER BY created_at ASC LIMIT 1",
|
||||
(account["id"],),
|
||||
)
|
||||
if not project:
|
||||
project_id = f"proj_{account['id']}"
|
||||
now = utc_now()
|
||||
self.execute(
|
||||
"""
|
||||
INSERT INTO projects (id, user_id, name, description, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
project_id,
|
||||
account["id"],
|
||||
f"{account['username']} 默认项目",
|
||||
"系统自动创建的默认项目",
|
||||
now,
|
||||
now,
|
||||
),
|
||||
)
|
||||
project = self.fetch_one("SELECT * FROM projects WHERE id = ?", (project_id,))
|
||||
|
||||
if not project:
|
||||
continue
|
||||
|
||||
if self.column_exists("knowledge_bases", "project_id"):
|
||||
self.execute(
|
||||
"""
|
||||
UPDATE knowledge_bases
|
||||
SET project_id = ?
|
||||
WHERE user_id = ? AND (project_id IS NULL OR project_id = '')
|
||||
""",
|
||||
(project["id"], account["id"]),
|
||||
)
|
||||
|
||||
if self.column_exists("assistants", "project_id"):
|
||||
self.execute(
|
||||
"""
|
||||
UPDATE assistants
|
||||
SET project_id = ?
|
||||
WHERE user_id = ? AND (project_id IS NULL OR project_id = '')
|
||||
""",
|
||||
(project["id"], account["id"]),
|
||||
)
|
||||
|
||||
if self.column_exists("jobs", "project_id"):
|
||||
self.execute(
|
||||
"""
|
||||
UPDATE jobs
|
||||
SET project_id = ?
|
||||
WHERE user_id = ? AND (project_id IS NULL OR project_id = '')
|
||||
""",
|
||||
(project["id"], account["id"]),
|
||||
)
|
||||
900
collector-service/app/domestic_platform_features.py
Normal file
900
collector-service/app/domestic_platform_features.py
Normal file
@@ -0,0 +1,900 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Body, Depends, HTTPException, Query
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class PlatformAnalysisRequest(BaseModel):
|
||||
model_profile_ids: list[str] = Field(default_factory=list)
|
||||
linked_account_ids: list[str] = Field(default_factory=list)
|
||||
include_linked_accounts: bool = True
|
||||
include_recent_similar_candidates: bool = True
|
||||
max_videos: int = Field(default=6, ge=1, le=20)
|
||||
extra_focus: str = ""
|
||||
temperature: float = 0.35
|
||||
auto_analyze_top_videos: bool = False
|
||||
top_video_analysis_count: int = Field(default=4, ge=1, le=10)
|
||||
|
||||
|
||||
class PlatformTopVideoAnalysisRequest(BaseModel):
|
||||
model_profile_id: str = ""
|
||||
top_video_count: int = Field(default=5, ge=1, le=12)
|
||||
min_score: float = 0
|
||||
temperature: float = 0.25
|
||||
|
||||
|
||||
class PlatformSimilaritySearchRequest(BaseModel):
|
||||
source_account_id: str = ""
|
||||
candidate_urls: list[str] = Field(default_factory=list)
|
||||
seed_linked_accounts: bool = True
|
||||
search_public_pages: bool = True
|
||||
model_profile_id: str = ""
|
||||
max_candidates: int = Field(default=8, ge=1, le=20)
|
||||
extra_requirements: str = ""
|
||||
|
||||
|
||||
class PlatformBenchmarkLinksRequest(BaseModel):
|
||||
target_account_ids: list[str] = Field(default_factory=list)
|
||||
target_profile_urls: list[str] = Field(default_factory=list)
|
||||
relation_type: str = "benchmark"
|
||||
note: str = ""
|
||||
search_id: str = ""
|
||||
|
||||
|
||||
class PlatformTrackingAccountRequest(BaseModel):
|
||||
tracked_account_id: str
|
||||
assistant_id: str = ""
|
||||
note: str = ""
|
||||
|
||||
|
||||
class PlatformTrackingCursorRequest(BaseModel):
|
||||
last_seen_at: str
|
||||
|
||||
|
||||
def register_domestic_platform_routes(app: Any, legacy: Any, *, platform: str, label: str) -> None:
|
||||
table_prefix = platform
|
||||
|
||||
def now() -> str:
|
||||
return legacy.utc_now()
|
||||
|
||||
def make_id(prefix: str) -> str:
|
||||
return legacy.make_id(prefix)
|
||||
|
||||
def _safe_json_dumps(value: Any) -> str:
|
||||
return json.dumps(value or {}, ensure_ascii=False)
|
||||
|
||||
def _parse_json(raw: str, fallback: Any) -> Any:
|
||||
cleaned = str(raw or "").strip()
|
||||
if not cleaned:
|
||||
return fallback
|
||||
try:
|
||||
value = json.loads(cleaned)
|
||||
return value
|
||||
except json.JSONDecodeError:
|
||||
return fallback
|
||||
|
||||
def ensure_schema() -> None:
|
||||
schema = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_reports (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
account_source_id TEXT NOT NULL,
|
||||
focus_text TEXT NOT NULL DEFAULT '',
|
||||
prompt_text TEXT NOT NULL DEFAULT '',
|
||||
context_json TEXT NOT NULL DEFAULT '{{}}',
|
||||
created_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(account_source_id) REFERENCES content_sources(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_analysis_reports_account_created
|
||||
ON {table_prefix}_analysis_reports(account_source_id, created_at DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {table_prefix}_analysis_suggestions (
|
||||
id TEXT PRIMARY KEY,
|
||||
report_id TEXT NOT NULL,
|
||||
model_profile_id TEXT NOT NULL DEFAULT '',
|
||||
model_label TEXT NOT NULL DEFAULT '',
|
||||
status TEXT NOT NULL DEFAULT 'ok',
|
||||
suggestion_text TEXT NOT NULL DEFAULT '',
|
||||
parsed_json TEXT NOT NULL DEFAULT '{{}}',
|
||||
created_at TEXT NOT NULL,
|
||||
FOREIGN KEY(report_id) REFERENCES {table_prefix}_analysis_reports(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_searches (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
source_account_id TEXT NOT NULL,
|
||||
prompt_text TEXT NOT NULL DEFAULT '',
|
||||
context_json TEXT NOT NULL DEFAULT '{{}}',
|
||||
created_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {table_prefix}_similarity_candidates (
|
||||
id TEXT PRIMARY KEY,
|
||||
search_id TEXT NOT NULL,
|
||||
candidate_account_id TEXT,
|
||||
candidate_profile_url TEXT NOT NULL DEFAULT '',
|
||||
heuristic_score REAL NOT NULL DEFAULT 0,
|
||||
agent_score REAL NOT NULL DEFAULT 0,
|
||||
rationale_text TEXT NOT NULL DEFAULT '',
|
||||
dimensions_json TEXT NOT NULL DEFAULT '{{}}',
|
||||
raw_output_json TEXT NOT NULL DEFAULT '{{}}',
|
||||
rank_index INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL,
|
||||
FOREIGN KEY(search_id) REFERENCES {table_prefix}_similarity_searches(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(candidate_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_similarity_candidates_search_rank
|
||||
ON {table_prefix}_similarity_candidates(search_id, rank_index ASC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {table_prefix}_account_relations (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
source_account_id TEXT NOT NULL,
|
||||
target_account_id TEXT,
|
||||
target_profile_url TEXT NOT NULL DEFAULT '',
|
||||
relation_type TEXT NOT NULL DEFAULT 'benchmark',
|
||||
note TEXT NOT NULL DEFAULT '',
|
||||
search_id TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(source_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(target_account_id) REFERENCES content_sources(id) ON DELETE SET NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_account_relations_source
|
||||
ON {table_prefix}_account_relations(source_account_id, created_at DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {table_prefix}_tracked_accounts (
|
||||
id TEXT PRIMARY KEY,
|
||||
user_id TEXT NOT NULL,
|
||||
tracked_account_id TEXT NOT NULL,
|
||||
assistant_id TEXT,
|
||||
note TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
UNIQUE(user_id, tracked_account_id),
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(tracked_account_id) REFERENCES content_sources(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_{table_prefix}_tracked_accounts_user_updated
|
||||
ON {table_prefix}_tracked_accounts(user_id, updated_at DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS {table_prefix}_tracking_cursors (
|
||||
user_id TEXT PRIMARY KEY,
|
||||
last_seen_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
|
||||
);
|
||||
"""
|
||||
with legacy.db.session() as conn:
|
||||
conn.executescript(schema)
|
||||
|
||||
ensure_schema()
|
||||
|
||||
@app.on_event("startup")
|
||||
def _startup_platform_schema() -> None:
|
||||
ensure_schema()
|
||||
|
||||
def _content_source_rows(user_id: str, platform_value: str, kind: str = "") -> list[dict[str, Any]]:
|
||||
rows = legacy.db.fetch_all(
|
||||
"SELECT * FROM content_sources WHERE user_id = ? AND platform = ? ORDER BY updated_at DESC, created_at DESC",
|
||||
(user_id, platform_value),
|
||||
)
|
||||
if kind:
|
||||
rows = [row for row in rows if row.get("source_kind") == kind]
|
||||
return rows
|
||||
|
||||
def _content_source_payload(row: dict[str, Any]) -> dict[str, Any]:
|
||||
return legacy.content_source_payload(row)
|
||||
|
||||
def _source_metadata(row: dict[str, Any]) -> dict[str, Any]:
|
||||
return _content_source_payload(row).get("metadata", {})
|
||||
|
||||
def _require_account(account_id: str, user_id: str) -> dict[str, Any]:
|
||||
row = legacy.db.fetch_one(
|
||||
"SELECT * FROM content_sources WHERE id = ? AND user_id = ? AND source_kind = 'creator_account' AND platform = ?",
|
||||
(account_id, user_id, platform),
|
||||
)
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail=f"{label} account not found")
|
||||
return row
|
||||
|
||||
def _linked_video_sources(account_row: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
project_id = account_row.get("project_id", "")
|
||||
rows = legacy.db.fetch_all(
|
||||
"SELECT * FROM content_sources WHERE user_id = ? AND project_id = ? AND source_kind = 'video_link' AND platform = ? ORDER BY updated_at DESC, created_at DESC",
|
||||
(account_row["user_id"], project_id, platform),
|
||||
)
|
||||
account_id = account_row["id"]
|
||||
source_url = str(account_row.get("source_url") or "").strip()
|
||||
linked: list[dict[str, Any]] = []
|
||||
for row in rows:
|
||||
metadata = _source_metadata(row)
|
||||
if metadata.get("origin_content_source_id") == account_id or metadata.get("source_account_url") == source_url:
|
||||
linked.append(row)
|
||||
return linked
|
||||
|
||||
def _jobs_for_source(source_id: str) -> list[dict[str, Any]]:
|
||||
return legacy.db.fetch_all(
|
||||
"SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC",
|
||||
(source_id,),
|
||||
)
|
||||
|
||||
def _latest_job_for_source(source_id: str) -> dict[str, Any] | None:
|
||||
return legacy.db.fetch_one(
|
||||
"SELECT * FROM jobs WHERE content_source_id = ? ORDER BY created_at DESC LIMIT 1",
|
||||
(source_id,),
|
||||
)
|
||||
|
||||
def _extract_performance_score(job_row: dict[str, Any] | None) -> float:
|
||||
if not job_row:
|
||||
return 0.0
|
||||
result_map = _parse_json(job_row.get("result_json") or "{}", {})
|
||||
artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {})
|
||||
candidates = [
|
||||
result_map.get("performance_score"),
|
||||
(result_map.get("analysis") or {}).get("performance_score"),
|
||||
(result_map.get("scores") or {}).get("performance_score"),
|
||||
artifacts_map.get("performance_score"),
|
||||
(artifacts_map.get("scores") or {}).get("performance_score"),
|
||||
]
|
||||
for value in candidates:
|
||||
try:
|
||||
return float(value)
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
return 0.0
|
||||
|
||||
def _extract_metrics(job_row: dict[str, Any] | None) -> dict[str, Any]:
|
||||
if not job_row:
|
||||
return {}
|
||||
result_map = _parse_json(job_row.get("result_json") or "{}", {})
|
||||
artifacts_map = _parse_json(job_row.get("artifacts_json") or "{}", {})
|
||||
return (
|
||||
result_map.get("metrics")
|
||||
or artifacts_map.get("metrics")
|
||||
or result_map.get("stats")
|
||||
or artifacts_map.get("stats")
|
||||
or {}
|
||||
)
|
||||
|
||||
def _video_payload(source_row: dict[str, Any]) -> dict[str, Any]:
|
||||
payload = _content_source_payload(source_row)
|
||||
metadata = payload.get("metadata", {})
|
||||
latest_job = _latest_job_for_source(source_row["id"])
|
||||
metrics = _extract_metrics(latest_job)
|
||||
tags = metadata.get("tags") or []
|
||||
if not isinstance(tags, list):
|
||||
tags = []
|
||||
return {
|
||||
"id": source_row["id"],
|
||||
"aweme_id": str(metadata.get("external_id") or source_row["id"]),
|
||||
"title": payload.get("title") or "未命名作品",
|
||||
"description": metadata.get("summary") or metadata.get("description") or (latest_job or {}).get("style_summary", ""),
|
||||
"share_url": payload.get("source_url", ""),
|
||||
"cover_url": metadata.get("cover_url") or "",
|
||||
"duration_sec": float(metadata.get("duration_sec") or 0),
|
||||
"published_at": metadata.get("published_at") or source_row.get("created_at"),
|
||||
"tags": tags,
|
||||
"content_type": metadata.get("content_type") or "video",
|
||||
"stats": {
|
||||
"play": metrics.get("play_count") or metrics.get("play") or 0,
|
||||
"like": metrics.get("like_count") or metrics.get("like") or 0,
|
||||
"comment": metrics.get("comment_count") or metrics.get("comment") or 0,
|
||||
"share": metrics.get("share_count") or metrics.get("share") or 0,
|
||||
},
|
||||
"score": {
|
||||
"performance_score": _extract_performance_score(latest_job),
|
||||
},
|
||||
"source": payload,
|
||||
"latest_job_id": (latest_job or {}).get("id", ""),
|
||||
}
|
||||
|
||||
def _account_payload(account_row: dict[str, Any]) -> dict[str, Any]:
|
||||
payload = _content_source_payload(account_row)
|
||||
metadata = payload.get("metadata", {})
|
||||
videos = [_video_payload(item) for item in _linked_video_sources(account_row)]
|
||||
play_values = [float(video["stats"].get("play") or 0) for video in videos if float(video["stats"].get("play") or 0) > 0]
|
||||
like_values = [float(video["stats"].get("like") or 0) for video in videos if float(video["stats"].get("like") or 0) > 0]
|
||||
tags = metadata.get("tags") or []
|
||||
if not isinstance(tags, list):
|
||||
tags = []
|
||||
return {
|
||||
"id": account_row["id"],
|
||||
"platform": platform,
|
||||
"profile_url": payload.get("source_url", ""),
|
||||
"canonical_profile_url": payload.get("source_url", ""),
|
||||
"handle": payload.get("handle", ""),
|
||||
"nickname": payload.get("title") or payload.get("handle") or "未命名账号",
|
||||
"signature": metadata.get("bio") or metadata.get("description") or "",
|
||||
"avatar_url": metadata.get("avatar_url") or "",
|
||||
"tags": tags,
|
||||
"keywords": metadata.get("keywords") or [],
|
||||
"sync_status": "ready" if payload.get("metadata", {}).get("last_sync_error", "") == "" else "partial",
|
||||
"video_summary": {
|
||||
"count": len(videos),
|
||||
"avg_play": sum(play_values) / len(play_values) if play_values else 0,
|
||||
"avg_like": sum(like_values) / len(like_values) if like_values else 0,
|
||||
"videos": videos[:8],
|
||||
},
|
||||
"project_id": payload.get("project_id", ""),
|
||||
"created_at": payload.get("created_at", ""),
|
||||
"updated_at": payload.get("updated_at", ""),
|
||||
}
|
||||
|
||||
def _relation_payload(row: dict[str, Any]) -> dict[str, Any]:
|
||||
target = None
|
||||
if row.get("target_account_id"):
|
||||
target = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["target_account_id"],))
|
||||
return {
|
||||
"id": row["id"],
|
||||
"source_account_id": row["source_account_id"],
|
||||
"target_account_id": row.get("target_account_id", "") or "",
|
||||
"target_profile_url": row.get("target_profile_url", ""),
|
||||
"target_nickname": (_account_payload(target)["nickname"] if target else ""),
|
||||
"relation_type": row.get("relation_type", "benchmark"),
|
||||
"note": row.get("note", ""),
|
||||
"search_id": row.get("search_id", ""),
|
||||
"created_at": row["created_at"],
|
||||
}
|
||||
|
||||
def _report_payload(row: dict[str, Any]) -> dict[str, Any]:
|
||||
suggestions = [
|
||||
{
|
||||
"id": suggestion["id"],
|
||||
"status": suggestion.get("status", "ok"),
|
||||
"model_profile_id": suggestion.get("model_profile_id", ""),
|
||||
"model_label": suggestion.get("model_label", ""),
|
||||
"suggestion_text": suggestion.get("suggestion_text", ""),
|
||||
"parsed_json": _parse_json(suggestion.get("parsed_json") or "{}", {}),
|
||||
"created_at": suggestion.get("created_at", ""),
|
||||
}
|
||||
for suggestion in legacy.db.fetch_all(
|
||||
f"SELECT * FROM {table_prefix}_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
|
||||
(row["id"],),
|
||||
)
|
||||
]
|
||||
return {
|
||||
"id": row["id"],
|
||||
"focus_text": row.get("focus_text", ""),
|
||||
"suggestions": suggestions,
|
||||
"created_at": row["created_at"],
|
||||
}
|
||||
|
||||
def _workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
|
||||
reports = legacy.db.fetch_all(
|
||||
f"SELECT * FROM {table_prefix}_analysis_reports WHERE account_source_id = ? ORDER BY created_at DESC LIMIT 6",
|
||||
(account_row["id"],),
|
||||
)
|
||||
relations = legacy.db.fetch_all(
|
||||
f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC",
|
||||
(account_row["id"],),
|
||||
)
|
||||
return {
|
||||
"account": _account_payload(account_row),
|
||||
"recent_reports": [_report_payload(row) for row in reports],
|
||||
"linked_accounts": [_relation_payload(row) for row in relations],
|
||||
}
|
||||
|
||||
async def _call_reasoning_model(user_id: str, prompt: str, *, system_prompt: str, model_profile_id: str = "", temperature: float = 0.3) -> tuple[str, dict[str, Any]]:
|
||||
profile = legacy.model_profile_for_account(user_id, model_profile_id or None)
|
||||
output = await legacy.call_model(profile, system_prompt=system_prompt, user_prompt=prompt, temperature=temperature)
|
||||
parsed = legacy.parse_json_object(output)
|
||||
return output, parsed if isinstance(parsed, dict) else {}
|
||||
|
||||
async def _create_sync_job_for_account(account_row: dict[str, Any], assistant_id: str = "") -> dict[str, Any]:
|
||||
project_id = account_row.get("project_id") or ""
|
||||
if not project_id:
|
||||
raise HTTPException(status_code=400, detail="Account source is not attached to a project")
|
||||
kb = legacy.resolve_target_kb(account_row["user_id"], None, project_id)
|
||||
source_payload = _content_source_payload(account_row)
|
||||
profile = legacy.model_profile_for_account(account_row["user_id"], None)
|
||||
job_row = legacy.create_job_record(
|
||||
account_id=account_row["user_id"],
|
||||
project_id=project_id,
|
||||
knowledge_base_id=kb["id"],
|
||||
content_source_id=account_row["id"],
|
||||
assistant_id=assistant_id or None,
|
||||
source_type="creator_account",
|
||||
line_type="analysis",
|
||||
workflow_key="content_source_sync_pipeline",
|
||||
title=f"{source_payload.get('title') or source_payload.get('handle') or label} 内容同步",
|
||||
language="auto",
|
||||
source_url=source_payload.get("source_url", ""),
|
||||
artifacts={
|
||||
"source_account_url": source_payload.get("source_url", ""),
|
||||
"platform": platform,
|
||||
"handle": source_payload.get("handle", ""),
|
||||
"max_items": int(source_payload.get("metadata", {}).get("max_items") or 5),
|
||||
"skip_existing": True,
|
||||
"auto_trigger_analysis": True,
|
||||
},
|
||||
analysis_model_profile_id=profile["id"],
|
||||
)
|
||||
queued = await legacy.trigger_orchestrated_job(job_row)
|
||||
return legacy.job_payload(queued)
|
||||
|
||||
def _tracking_cursor(user_id: str) -> dict[str, Any] | None:
|
||||
return legacy.db.fetch_one(
|
||||
f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?",
|
||||
(user_id,),
|
||||
)
|
||||
|
||||
def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]:
|
||||
existing = _tracking_cursor(user_id)
|
||||
updated_at = now()
|
||||
if existing:
|
||||
legacy.db.execute(
|
||||
f"UPDATE {table_prefix}_tracking_cursors SET last_seen_at = ?, updated_at = ? WHERE user_id = ?",
|
||||
(last_seen_at, updated_at, user_id),
|
||||
)
|
||||
else:
|
||||
legacy.db.execute(
|
||||
f"INSERT INTO {table_prefix}_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?)",
|
||||
(user_id, last_seen_at, updated_at),
|
||||
)
|
||||
return legacy.db.fetch_one(
|
||||
f"SELECT * FROM {table_prefix}_tracking_cursors WHERE user_id = ?",
|
||||
(user_id,),
|
||||
)
|
||||
|
||||
def _tracking_digest_item(tracked_row: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
|
||||
latest_job = _latest_job_for_source(video["id"])
|
||||
summary = (latest_job or {}).get("style_summary") or video.get("description") or "已发现更新内容"
|
||||
assistant = None
|
||||
if tracked_row.get("assistant_id"):
|
||||
assistant_row = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (tracked_row["assistant_id"],))
|
||||
if assistant_row:
|
||||
assistant = legacy.assistant_payload(assistant_row)
|
||||
borrowing_points = [point for point in [summary[:36], video.get("title", "")[:36]] if point]
|
||||
return {
|
||||
"tracking_id": tracked_row["id"],
|
||||
"tracked_account_id": tracked_row["tracked_account_id"],
|
||||
"tracked_account_name": _account_payload(_require_account(tracked_row["tracked_account_id"], tracked_row["user_id"]))["nickname"],
|
||||
"assistant_id": tracked_row.get("assistant_id", "") or "",
|
||||
"assistant_name": (assistant or {}).get("name", ""),
|
||||
"note": tracked_row.get("note", ""),
|
||||
"video": video,
|
||||
"summary_text": summary,
|
||||
"borrowing_points": borrowing_points[:3],
|
||||
"created_at": video.get("published_at") or now(),
|
||||
}
|
||||
|
||||
def _tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]:
|
||||
tracked_rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
|
||||
(user_id,),
|
||||
)
|
||||
cursor = _tracking_cursor(user_id)
|
||||
threshold = (since_value or (cursor or {}).get("last_seen_at") or "").strip()
|
||||
items: list[dict[str, Any]] = []
|
||||
for tracked in tracked_rows:
|
||||
account_row = _require_account(tracked["tracked_account_id"], user_id)
|
||||
for video in _account_payload(account_row)["video_summary"]["videos"]:
|
||||
published_at = str(video.get("published_at") or "")
|
||||
if threshold and published_at and published_at <= threshold:
|
||||
continue
|
||||
items.append(_tracking_digest_item(tracked, video))
|
||||
items.sort(key=lambda item: item.get("created_at", ""), reverse=True)
|
||||
return {
|
||||
"items": items[:limit],
|
||||
"tracked_accounts": [
|
||||
{
|
||||
"id": row["id"],
|
||||
"tracked_account_id": row["tracked_account_id"],
|
||||
"assistant_id": row.get("assistant_id", "") or "",
|
||||
"note": row.get("note", ""),
|
||||
"updated_at": row["updated_at"],
|
||||
}
|
||||
for row in tracked_rows
|
||||
],
|
||||
"cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
|
||||
}
|
||||
|
||||
@app.get(f"/v2/{platform}/accounts")
|
||||
def list_platform_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
|
||||
return [_account_payload(row) for row in _content_source_rows(account["id"], platform, "creator_account")]
|
||||
|
||||
@app.get(f"/v2/{platform}/accounts/{{account_id}}/workspace")
|
||||
def get_platform_account_workspace(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
|
||||
account_row = _require_account(account_id, account["id"])
|
||||
return _workspace_payload(account_row)
|
||||
|
||||
@app.get(f"/v2/{platform}/accounts/{{account_id}}/videos")
|
||||
def list_platform_account_videos(
|
||||
account_id: str,
|
||||
limit: int = Query(default=80, ge=1, le=200),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
account_row = _require_account(account_id, account["id"])
|
||||
items = [_video_payload(row) for row in _linked_video_sources(account_row)]
|
||||
items.sort(key=lambda item: (item["score"]["performance_score"], item.get("published_at") or ""), reverse=True)
|
||||
top_ids = [item["id"] for item in items if float(item["score"]["performance_score"] or 0) >= 60][:12]
|
||||
latest_ids = [item["id"] for item in sorted(items, key=lambda item: item.get("published_at") or "", reverse=True)[:12]]
|
||||
return {
|
||||
"items": items[:limit],
|
||||
"count": len(items),
|
||||
"meta": {"platform": platform, "account_id": account_id},
|
||||
"top_scored_video_ids": top_ids,
|
||||
"latest_video_ids": latest_ids,
|
||||
"high_score_threshold": 60,
|
||||
}
|
||||
|
||||
@app.post(f"/v2/{platform}/accounts/{{account_id}}/analysis")
|
||||
async def analyze_platform_account(
|
||||
account_id: str,
|
||||
request: PlatformAnalysisRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
account_row = _require_account(account_id, account["id"])
|
||||
workspace = _workspace_payload(account_row)
|
||||
context = {
|
||||
"account": workspace["account"],
|
||||
"top_videos": workspace["account"]["video_summary"]["videos"][: max(1, min(request.max_videos, 8))],
|
||||
"linked_accounts": workspace["linked_accounts"][:5],
|
||||
"extra_focus": request.extra_focus,
|
||||
}
|
||||
prompt = (
|
||||
f"请从新媒体商业化运营视角,分析这个{label}账号,输出执行摘要、可借鉴点、风险提醒和下一步动作。"
|
||||
f"\n\n输入:\n{json.dumps(context, ensure_ascii=False, indent=2)}"
|
||||
)
|
||||
output, parsed = await _call_reasoning_model(
|
||||
account["id"],
|
||||
prompt,
|
||||
system_prompt="你是新媒体账号分析顾问。尽量输出 JSON,字段包括 executive_summary、borrow_points、risks、next_actions。",
|
||||
temperature=request.temperature,
|
||||
)
|
||||
report_id = make_id(f"{platform}_report")
|
||||
legacy.db.execute(
|
||||
f"INSERT INTO {table_prefix}_analysis_reports (id, user_id, account_source_id, focus_text, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
report_id,
|
||||
account["id"],
|
||||
account_row["id"],
|
||||
request.extra_focus or "",
|
||||
prompt,
|
||||
_safe_json_dumps(context),
|
||||
now(),
|
||||
),
|
||||
)
|
||||
suggestion_id = make_id(f"{platform}_suggestion")
|
||||
profile = legacy.model_profile_for_account(account["id"], None)
|
||||
legacy.db.execute(
|
||||
f"INSERT INTO {table_prefix}_analysis_suggestions (id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
suggestion_id,
|
||||
report_id,
|
||||
profile["id"],
|
||||
f"{profile.get('name', '')} · {profile.get('model_name', '')}".strip(" ·"),
|
||||
"ok",
|
||||
output[:4000],
|
||||
_safe_json_dumps(parsed),
|
||||
now(),
|
||||
),
|
||||
)
|
||||
report_row = legacy.db.fetch_one(
|
||||
f"SELECT * FROM {table_prefix}_analysis_reports WHERE id = ?",
|
||||
(report_id,),
|
||||
)
|
||||
report_payload = _report_payload(report_row)
|
||||
return {
|
||||
"report_id": report_id,
|
||||
"account_id": account_row["id"],
|
||||
"suggestions": report_payload["suggestions"],
|
||||
"context": context,
|
||||
}
|
||||
|
||||
@app.post(f"/v2/{platform}/accounts/{{account_id}}/videos/analyze-top")
|
||||
async def analyze_platform_top_videos(
|
||||
account_id: str,
|
||||
request: PlatformTopVideoAnalysisRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
account_row = _require_account(account_id, account["id"])
|
||||
videos = [_video_payload(row) for row in _linked_video_sources(account_row)]
|
||||
ranked = [
|
||||
video for video in sorted(videos, key=lambda item: item["score"]["performance_score"], reverse=True)
|
||||
if float(video["score"]["performance_score"] or 0) >= float(request.min_score or 0)
|
||||
][: request.top_video_count]
|
||||
results: list[dict[str, Any]] = []
|
||||
for video in ranked:
|
||||
prompt = (
|
||||
f"请拆解这条{label}作品为什么值得关注,输出 summary、borrow_points、risks。"
|
||||
f"\n\n输入:\n{json.dumps(video, ensure_ascii=False, indent=2)}"
|
||||
)
|
||||
output, parsed = await _call_reasoning_model(
|
||||
account["id"],
|
||||
prompt,
|
||||
system_prompt="你是短视频内容拆解助手。尽量输出 JSON,字段包括 summary、borrow_points、risks。",
|
||||
model_profile_id=request.model_profile_id,
|
||||
temperature=request.temperature,
|
||||
)
|
||||
summary_text = str(parsed.get("summary") or parsed.get("headline_summary") or output)[:240]
|
||||
results.append(
|
||||
{
|
||||
"id": make_id(f"{platform}_va"),
|
||||
"video_id": video["id"],
|
||||
"video_title": video["title"],
|
||||
"status": "ok",
|
||||
"summary_text": summary_text,
|
||||
"parsed_json": parsed,
|
||||
"performance_score": video["score"]["performance_score"],
|
||||
"created_at": now(),
|
||||
}
|
||||
)
|
||||
return {
|
||||
"account_id": account_row["id"],
|
||||
"analyzed_count": len(results),
|
||||
"items": results,
|
||||
}
|
||||
|
||||
@app.post(f"/v2/{platform}/similar-searches")
|
||||
async def create_platform_similarity_search(
|
||||
request: PlatformSimilaritySearchRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
account_row = _require_account(request.source_account_id, account["id"])
|
||||
source_payload = _account_payload(account_row)
|
||||
candidates = [
|
||||
row for row in _content_source_rows(account["id"], platform, "creator_account")
|
||||
if row["id"] != account_row["id"]
|
||||
][: max(5, request.max_candidates)]
|
||||
ranked_candidates: list[dict[str, Any]] = []
|
||||
source_tags = set(source_payload.get("tags") or [])
|
||||
for index, row in enumerate(candidates, start=1):
|
||||
payload = _account_payload(row)
|
||||
overlap = len(source_tags.intersection(set(payload.get("tags") or [])))
|
||||
heuristic = overlap * 10 + max(0, 50 - index)
|
||||
rationale = f"与源账号同平台,标签重合 {overlap},适合作为{label}对标候选。"
|
||||
ranked_candidates.append(
|
||||
{
|
||||
"candidate_account_id": row["id"],
|
||||
"candidate_profile_url": payload.get("profile_url", ""),
|
||||
"candidate_nickname": payload.get("nickname", ""),
|
||||
"heuristic_score": float(heuristic),
|
||||
"agent_score": float(heuristic),
|
||||
"rationale_text": rationale,
|
||||
"dimensions_json": {"tag_overlap": overlap},
|
||||
}
|
||||
)
|
||||
ranked_candidates.sort(key=lambda item: item["agent_score"], reverse=True)
|
||||
ranked_candidates = ranked_candidates[: request.max_candidates]
|
||||
search_id = make_id(f"{platform}_search")
|
||||
legacy.db.execute(
|
||||
f"INSERT INTO {table_prefix}_similarity_searches (id, user_id, source_account_id, prompt_text, context_json, created_at) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
search_id,
|
||||
account["id"],
|
||||
account_row["id"],
|
||||
request.extra_requirements or "",
|
||||
_safe_json_dumps({"source_account": source_payload}),
|
||||
now(),
|
||||
),
|
||||
)
|
||||
for idx, item in enumerate(ranked_candidates):
|
||||
legacy.db.execute(
|
||||
f"""INSERT INTO {table_prefix}_similarity_candidates
|
||||
(id, search_id, candidate_account_id, candidate_profile_url, heuristic_score, agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
make_id(f"{platform}_candidate"),
|
||||
search_id,
|
||||
item.get("candidate_account_id") or None,
|
||||
item.get("candidate_profile_url", ""),
|
||||
item.get("heuristic_score", 0),
|
||||
item.get("agent_score", 0),
|
||||
item.get("rationale_text", ""),
|
||||
_safe_json_dumps(item.get("dimensions_json") or {}),
|
||||
_safe_json_dumps(item),
|
||||
idx,
|
||||
now(),
|
||||
),
|
||||
)
|
||||
return {"id": search_id, "search_id": search_id}
|
||||
|
||||
@app.get(f"/v2/{platform}/similar-searches/{{search_id}}")
|
||||
def get_platform_similarity_search(search_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
|
||||
search_row = legacy.db.fetch_one(
|
||||
f"SELECT * FROM {table_prefix}_similarity_searches WHERE id = ? AND user_id = ?",
|
||||
(search_id, account["id"]),
|
||||
)
|
||||
if not search_row:
|
||||
raise HTTPException(status_code=404, detail="Similarity search not found")
|
||||
candidate_rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM {table_prefix}_similarity_candidates WHERE search_id = ? ORDER BY rank_index ASC",
|
||||
(search_id,),
|
||||
)
|
||||
candidates = []
|
||||
for row in candidate_rows:
|
||||
payload = _parse_json(row.get("raw_output_json") or "{}", {})
|
||||
payload.setdefault("candidate_account_id", row.get("candidate_account_id", ""))
|
||||
payload.setdefault("candidate_profile_url", row.get("candidate_profile_url", ""))
|
||||
payload.setdefault("rationale_text", row.get("rationale_text", ""))
|
||||
payload.setdefault("agent_score", row.get("agent_score", 0))
|
||||
payload.setdefault("heuristic_score", row.get("heuristic_score", 0))
|
||||
candidates.append(payload)
|
||||
return {
|
||||
"id": search_row["id"],
|
||||
"search_id": search_row["id"],
|
||||
"source_account_id": search_row["source_account_id"],
|
||||
"candidates": candidates,
|
||||
"created_at": search_row["created_at"],
|
||||
}
|
||||
|
||||
@app.get(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
|
||||
def list_platform_benchmark_links(account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
|
||||
_require_account(account_id, account["id"])
|
||||
rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM {table_prefix}_account_relations WHERE source_account_id = ? ORDER BY created_at DESC",
|
||||
(account_id,),
|
||||
)
|
||||
return [_relation_payload(row) for row in rows]
|
||||
|
||||
@app.post(f"/v2/{platform}/accounts/{{account_id}}/benchmark-links")
|
||||
def create_platform_benchmark_links(
|
||||
account_id: str,
|
||||
request: PlatformBenchmarkLinksRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_account = _require_account(account_id, account["id"])
|
||||
created: list[dict[str, Any]] = []
|
||||
for target_account_id in request.target_account_ids:
|
||||
target = _require_account(target_account_id, account["id"])
|
||||
relation_id = make_id(f"{platform}_link")
|
||||
legacy.db.execute(
|
||||
f"INSERT INTO {table_prefix}_account_relations (id, user_id, source_account_id, target_account_id, target_profile_url, relation_type, note, search_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
relation_id,
|
||||
account["id"],
|
||||
source_account["id"],
|
||||
target["id"],
|
||||
target.get("source_url", ""),
|
||||
request.relation_type or "benchmark",
|
||||
request.note or "",
|
||||
request.search_id or "",
|
||||
now(),
|
||||
),
|
||||
)
|
||||
created.append(_relation_payload(legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_account_relations WHERE id = ?", (relation_id,))))
|
||||
for target_profile_url in request.target_profile_urls:
|
||||
cleaned = str(target_profile_url or "").strip()
|
||||
if not cleaned:
|
||||
continue
|
||||
relation_id = make_id(f"{platform}_link")
|
||||
legacy.db.execute(
|
||||
f"INSERT INTO {table_prefix}_account_relations (id, user_id, source_account_id, target_account_id, target_profile_url, relation_type, note, search_id, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
relation_id,
|
||||
account["id"],
|
||||
source_account["id"],
|
||||
None,
|
||||
cleaned,
|
||||
request.relation_type or "benchmark",
|
||||
request.note or "",
|
||||
request.search_id or "",
|
||||
now(),
|
||||
),
|
||||
)
|
||||
created.append(_relation_payload(legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_account_relations WHERE id = ?", (relation_id,))))
|
||||
return {"links": created}
|
||||
|
||||
@app.get(f"/v2/{platform}/tracking/accounts")
|
||||
def list_platform_tracking_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
|
||||
rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
|
||||
(account["id"],),
|
||||
)
|
||||
cursor = _tracking_cursor(account["id"])
|
||||
return {
|
||||
"items": [
|
||||
{
|
||||
"id": row["id"],
|
||||
"tracked_account_id": row["tracked_account_id"],
|
||||
"assistant_id": row.get("assistant_id", "") or "",
|
||||
"note": row.get("note", ""),
|
||||
"updated_at": row["updated_at"],
|
||||
}
|
||||
for row in rows
|
||||
],
|
||||
"cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
|
||||
}
|
||||
|
||||
@app.post(f"/v2/{platform}/tracking/accounts")
|
||||
def create_platform_tracking_account(
|
||||
request: PlatformTrackingAccountRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
tracked = _require_account(request.tracked_account_id, account["id"])
|
||||
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, tracked.get("project_id", ""))
|
||||
existing = legacy.db.fetch_one(
|
||||
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
|
||||
(account["id"], tracked["id"]),
|
||||
)
|
||||
if existing:
|
||||
legacy.db.execute(
|
||||
f"UPDATE {table_prefix}_tracked_accounts SET assistant_id = ?, note = ?, updated_at = ? WHERE id = ?",
|
||||
(((assistant or {}).get("id") or None), request.note or "", now(), existing["id"]),
|
||||
)
|
||||
row = legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_tracked_accounts WHERE id = ?", (existing["id"],))
|
||||
else:
|
||||
tracking_id = make_id(f"{platform}_track")
|
||||
legacy.db.execute(
|
||||
f"INSERT INTO {table_prefix}_tracked_accounts (id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
(
|
||||
tracking_id,
|
||||
account["id"],
|
||||
tracked["id"],
|
||||
(assistant or {}).get("id") or None,
|
||||
request.note or "",
|
||||
now(),
|
||||
now(),
|
||||
),
|
||||
)
|
||||
row = legacy.db.fetch_one(f"SELECT * FROM {table_prefix}_tracked_accounts WHERE id = ?", (tracking_id,))
|
||||
return {
|
||||
"id": row["id"],
|
||||
"tracked_account_id": row["tracked_account_id"],
|
||||
"assistant_id": row.get("assistant_id", "") or "",
|
||||
"note": row.get("note", ""),
|
||||
"updated_at": row["updated_at"],
|
||||
}
|
||||
|
||||
@app.post(f"/v2/{platform}/tracking/accounts/{{tracked_account_id}}/refresh")
|
||||
async def refresh_platform_tracked_account(tracked_account_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
|
||||
tracked_row = legacy.db.fetch_one(
|
||||
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
|
||||
(account["id"], tracked_account_id),
|
||||
)
|
||||
if not tracked_row:
|
||||
raise HTTPException(status_code=404, detail="Tracked account not found")
|
||||
account_row = _require_account(tracked_account_id, account["id"])
|
||||
queued = await _create_sync_job_for_account(account_row, assistant_id=tracked_row.get("assistant_id", "") or "")
|
||||
legacy.db.execute(
|
||||
f"UPDATE {table_prefix}_tracked_accounts SET updated_at = ? WHERE id = ?",
|
||||
(now(), tracked_row["id"]),
|
||||
)
|
||||
return {"tracking_id": tracked_row["id"], "tracked_account_id": tracked_account_id, "sync_job_id": queued["id"], "status": queued["status"]}
|
||||
|
||||
@app.post(f"/v2/{platform}/tracking/refresh")
|
||||
async def refresh_platform_tracking(account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
|
||||
tracked_rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM {table_prefix}_tracked_accounts WHERE user_id = ? ORDER BY updated_at DESC",
|
||||
(account["id"],),
|
||||
)
|
||||
refreshed = 0
|
||||
failed = 0
|
||||
items: list[dict[str, Any]] = []
|
||||
for row in tracked_rows:
|
||||
try:
|
||||
account_row = _require_account(row["tracked_account_id"], account["id"])
|
||||
queued = await _create_sync_job_for_account(account_row, assistant_id=row.get("assistant_id", "") or "")
|
||||
refreshed += 1
|
||||
items.append({"tracking_id": row["id"], "tracked_account_id": row["tracked_account_id"], "sync_job_id": queued["id"], "status": queued["status"]})
|
||||
except Exception as exc:
|
||||
failed += 1
|
||||
items.append({"tracking_id": row["id"], "tracked_account_id": row["tracked_account_id"], "error": str(exc)})
|
||||
return {"refreshed": refreshed, "failed": failed, "items": items}
|
||||
|
||||
@app.post(f"/v2/{platform}/tracking/cursor")
|
||||
def update_platform_tracking_cursor(request: PlatformTrackingCursorRequest, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
|
||||
cursor = _set_tracking_cursor(account["id"], request.last_seen_at)
|
||||
return {"last_seen_at": cursor["last_seen_at"], "updated_at": cursor["updated_at"]}
|
||||
|
||||
@app.get(f"/v2/{platform}/tracking/digest")
|
||||
def get_platform_tracking_digest(
|
||||
since: str = Query(default=""),
|
||||
limit: int = Query(default=24, ge=1, le=100),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
return _tracking_digest(account["id"], since_value=(since or "").strip(), limit=limit)
|
||||
2699
collector-service/app/douyin_features.py
Normal file
2699
collector-service/app/douyin_features.py
Normal file
File diff suppressed because it is too large
Load Diff
226
collector-service/app/integrations.py
Normal file
226
collector-service/app/integrations.py
Normal file
@@ -0,0 +1,226 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import mimetypes
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
def _join_url(base_url: str, path: str) -> str:
|
||||
base = base_url.rstrip("/")
|
||||
if path.startswith("http://") or path.startswith("https://"):
|
||||
return path
|
||||
return f"{base}/{path.lstrip('/')}"
|
||||
|
||||
|
||||
def _unwrap_response(payload: Any) -> dict[str, Any]:
|
||||
if not isinstance(payload, dict):
|
||||
return {"value": payload}
|
||||
if payload.get("success") is True and "data" in payload:
|
||||
data = payload.get("data")
|
||||
if isinstance(data, dict):
|
||||
return data
|
||||
return {"value": data}
|
||||
return payload
|
||||
|
||||
|
||||
class N8NClient:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
base_url: str,
|
||||
workflow_paths: dict[str, str],
|
||||
shared_secret: str = "",
|
||||
timeout: float = 60.0,
|
||||
) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.workflow_paths = workflow_paths
|
||||
self.shared_secret = shared_secret.strip()
|
||||
self.timeout = timeout
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
return bool(self.base_url)
|
||||
|
||||
async def trigger(self, workflow_key: str, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
workflow_path = self.workflow_paths.get(workflow_key, "").strip()
|
||||
if not workflow_path:
|
||||
raise ValueError(f"workflow path not configured for {workflow_key}")
|
||||
try:
|
||||
workflow_path = workflow_path.format(**payload)
|
||||
except KeyError:
|
||||
pass
|
||||
headers: dict[str, str] = {}
|
||||
if self.shared_secret:
|
||||
headers["X-Orchestrator-Secret"] = self.shared_secret
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.post(
|
||||
_join_url(self.base_url, workflow_path),
|
||||
json=payload,
|
||||
headers=headers,
|
||||
)
|
||||
response.raise_for_status()
|
||||
if not response.content:
|
||||
return {"triggered": True}
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
|
||||
class CutVideoClient:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
base_url: str,
|
||||
api_key: str = "",
|
||||
timeout: float = 120.0,
|
||||
upload_timeout: float = 1800.0,
|
||||
) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.api_key = api_key.strip()
|
||||
self.timeout = timeout
|
||||
self.upload_timeout = upload_timeout
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
return bool(self.base_url)
|
||||
|
||||
def _headers(self) -> dict[str, str]:
|
||||
headers: dict[str, str] = {}
|
||||
if self.api_key:
|
||||
headers["Authorization"] = f"Bearer {self.api_key}"
|
||||
return headers
|
||||
|
||||
async def submit_job(self, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.post(
|
||||
_join_url(self.base_url, "/api/jobs"),
|
||||
json=payload,
|
||||
headers=self._headers(),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
async def upload_source_file(self, source_path: Path, *, folder_name: str = "") -> dict[str, Any]:
|
||||
content_type = mimetypes.guess_type(source_path.name)[0] or "application/octet-stream"
|
||||
headers = self._headers()
|
||||
data = {"folder_name": folder_name} if folder_name else {}
|
||||
async with httpx.AsyncClient(timeout=self.upload_timeout) as client:
|
||||
with source_path.open("rb") as handle:
|
||||
response = await client.post(
|
||||
_join_url(self.base_url, "/api/uploads"),
|
||||
data=data,
|
||||
files={"files": (source_path.name, handle, content_type)},
|
||||
headers=headers,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
async def get_task(self, task_id: str) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.get(
|
||||
_join_url(self.base_url, f"/api/tasks/{task_id}"),
|
||||
headers=self._headers(),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
async def get_run(self, run_id: str) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.get(
|
||||
_join_url(self.base_url, f"/api/runs/{run_id}"),
|
||||
headers=self._headers(),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
async def list_runs(self) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.get(
|
||||
_join_url(self.base_url, "/api/runs"),
|
||||
headers=self._headers(),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
|
||||
class AsrHttpClient:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
base_url: str,
|
||||
transcribe_path: str = "/transcribe",
|
||||
field_name: str = "wav",
|
||||
timeout: float = 120.0,
|
||||
) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.transcribe_path = transcribe_path
|
||||
self.field_name = field_name.strip() or "wav"
|
||||
self.timeout = timeout
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
return bool(self.base_url)
|
||||
|
||||
async def transcribe_audio(self, audio_path: Path) -> dict[str, Any]:
|
||||
content_type = mimetypes.guess_type(audio_path.name)[0] or "application/octet-stream"
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
with audio_path.open("rb") as handle:
|
||||
response = await client.post(
|
||||
_join_url(self.base_url, self.transcribe_path),
|
||||
files={self.field_name: (audio_path.name, handle, content_type)},
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
|
||||
class HuobaoDramaClient:
|
||||
def __init__(self, *, base_url: str, timeout: float = 180.0) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.timeout = timeout
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
return bool(self.base_url)
|
||||
|
||||
async def create_drama(self, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.post(
|
||||
_join_url(self.base_url, "/api/v1/dramas"),
|
||||
json=payload,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
async def generate_image(self, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.post(
|
||||
_join_url(self.base_url, "/api/v1/images"),
|
||||
json=payload,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
async def get_image(self, image_id: str) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.get(
|
||||
_join_url(self.base_url, f"/api/v1/images/{image_id}"),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
async def generate_video(self, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.post(
|
||||
_join_url(self.base_url, "/api/v1/videos"),
|
||||
json=payload,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
|
||||
async def get_video(self, video_id: str) -> dict[str, Any]:
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.get(
|
||||
_join_url(self.base_url, f"/api/v1/videos/{video_id}"),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return _unwrap_response(response.json())
|
||||
381
collector-service/app/kuaishou_features.py
Normal file
381
collector-service/app/kuaishou_features.py
Normal file
@@ -0,0 +1,381 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Depends, HTTPException, Query
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from .core_main import (
|
||||
content_source_payload,
|
||||
create_content_source,
|
||||
create_job_record,
|
||||
job_payload,
|
||||
load_owned_content_source,
|
||||
load_owned_job,
|
||||
make_id,
|
||||
parse_json_object,
|
||||
resolve_target_assistant,
|
||||
resolve_target_kb,
|
||||
resolve_target_project,
|
||||
review_payload,
|
||||
trigger_orchestrated_job,
|
||||
utc_now,
|
||||
model_profile_for_account,
|
||||
db,
|
||||
)
|
||||
|
||||
KUAISHOU_PLATFORM = "kuaishou"
|
||||
KUAISHOU_URL_HINTS = (
|
||||
"kuaishou.com",
|
||||
"v.kuaishou.com",
|
||||
"chenzhongtech.com",
|
||||
)
|
||||
YOUTUBE_URL_HINTS = (
|
||||
"youtube.com",
|
||||
"youtu.be",
|
||||
"m.youtube.com",
|
||||
"music.youtube.com",
|
||||
)
|
||||
|
||||
|
||||
class KuaishouContentSourceCreateRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
source_kind: str = "creator_account"
|
||||
handle: str = ""
|
||||
source_url: str = ""
|
||||
title: str = ""
|
||||
local_path: str = ""
|
||||
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class KuaishouContentSourceSyncRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
knowledge_base_id: str = ""
|
||||
assistant_id: str = ""
|
||||
content_source_id: str = ""
|
||||
handle: str = ""
|
||||
source_url: str = ""
|
||||
title: str = ""
|
||||
analysis_model_profile_id: str = ""
|
||||
language: str = "auto"
|
||||
max_items: int = Field(default=5, ge=1, le=20)
|
||||
skip_existing: bool = True
|
||||
auto_trigger_analysis: bool = True
|
||||
|
||||
|
||||
class KuaishouReviewCreateRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
source_job_id: str = ""
|
||||
assistant_id: str = ""
|
||||
title: str = ""
|
||||
content_type: str = "video"
|
||||
publish_url: str = ""
|
||||
published_at: str = ""
|
||||
metrics: dict[str, Any] = Field(default_factory=dict)
|
||||
verdict: str = ""
|
||||
highlights: str = ""
|
||||
next_actions: str = ""
|
||||
notes: str = ""
|
||||
|
||||
|
||||
def _normalize_text(value: str | None) -> str:
|
||||
return str(value or "").strip()
|
||||
|
||||
|
||||
def _is_youtube_url(value: str) -> bool:
|
||||
normalized = _normalize_text(value).lower()
|
||||
return any(hint in normalized for hint in YOUTUBE_URL_HINTS)
|
||||
|
||||
|
||||
def _is_kuaishou_url(value: str) -> bool:
|
||||
normalized = _normalize_text(value).lower()
|
||||
return any(hint in normalized for hint in KUAISHOU_URL_HINTS)
|
||||
|
||||
|
||||
def _ensure_kuaishou_url(value: str) -> str:
|
||||
normalized = _normalize_text(value)
|
||||
if not normalized:
|
||||
return ""
|
||||
if _is_youtube_url(normalized):
|
||||
raise HTTPException(status_code=400, detail="YouTube URLs are not supported in the Kuaishou routes")
|
||||
return normalized
|
||||
|
||||
|
||||
def _content_source_is_kuaishou(row: dict[str, Any]) -> bool:
|
||||
if _normalize_text(row.get("platform")).lower() == KUAISHOU_PLATFORM:
|
||||
return True
|
||||
return _is_kuaishou_url(row.get("source_url", ""))
|
||||
|
||||
|
||||
def _job_is_kuaishou(row: dict[str, Any]) -> bool:
|
||||
artifacts = parse_json_object(row.get("artifacts_json") or "{}")
|
||||
source_url = _normalize_text(row.get("source_url"))
|
||||
if source_url and _is_youtube_url(source_url):
|
||||
return False
|
||||
if source_url and _is_kuaishou_url(source_url):
|
||||
return True
|
||||
if _normalize_text(artifacts.get("platform")).lower() == KUAISHOU_PLATFORM:
|
||||
return True
|
||||
content_source_id = _normalize_text(row.get("content_source_id"))
|
||||
if content_source_id:
|
||||
source_row = db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (content_source_id,))
|
||||
return bool(source_row and _content_source_is_kuaishou(source_row))
|
||||
return False
|
||||
|
||||
|
||||
def _require_owned_kuaishou_source(source_id: str, account_id: str) -> dict[str, Any]:
|
||||
row = load_owned_content_source(source_id, account_id)
|
||||
if not _content_source_is_kuaishou(row):
|
||||
raise HTTPException(status_code=400, detail="Content source does not belong to the Kuaishou route")
|
||||
return row
|
||||
|
||||
|
||||
def _list_kuaishou_jobs(account_id: str, project_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
|
||||
rows = db.fetch_all(
|
||||
"SELECT * FROM jobs WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
|
||||
(account_id, max(limit, 1) * 10),
|
||||
)
|
||||
items: list[dict[str, Any]] = []
|
||||
for row in rows:
|
||||
if project_id and _normalize_text(row.get("project_id")) != project_id:
|
||||
continue
|
||||
if _job_is_kuaishou(row):
|
||||
items.append(job_payload(row))
|
||||
if len(items) >= limit:
|
||||
break
|
||||
return items
|
||||
|
||||
|
||||
def _list_kuaishou_reviews(account_id: str, project_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
|
||||
clauses = ["user_id = ?", "platform = ?"]
|
||||
params: list[Any] = [account_id, KUAISHOU_PLATFORM]
|
||||
if project_id is not None:
|
||||
normalized = project_id.strip()
|
||||
if normalized:
|
||||
clauses.append("project_id = ?")
|
||||
params.append(normalized)
|
||||
else:
|
||||
clauses.append("(project_id IS NULL OR project_id = '')")
|
||||
sql = f"""
|
||||
SELECT * FROM publish_reviews
|
||||
WHERE {' AND '.join(clauses)}
|
||||
ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
params.append(limit)
|
||||
return [review_payload(row) for row in db.fetch_all(sql, tuple(params))]
|
||||
|
||||
|
||||
def register_kuaishou_routes(app: Any, legacy: Any) -> None:
|
||||
"""Register a small Kuaishou route set on top of the shared collector tables."""
|
||||
|
||||
@app.get("/v2/kuaishou/content-sources")
|
||||
def list_kuaishou_content_sources(
|
||||
project_id: str | None = Query(default=None),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
clauses = ["user_id = ?", "platform = ?"]
|
||||
params: list[Any] = [account["id"], KUAISHOU_PLATFORM]
|
||||
if project_id:
|
||||
resolve_target_project(account["id"], project_id, username=account["username"])
|
||||
clauses.append("project_id = ?")
|
||||
params.append(project_id)
|
||||
rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC",
|
||||
tuple(params),
|
||||
)
|
||||
return [content_source_payload(row) for row in rows]
|
||||
|
||||
@app.post("/v2/kuaishou/content-sources")
|
||||
def create_kuaishou_content_source_api(
|
||||
request: KuaishouContentSourceCreateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
project = resolve_target_project(account["id"], request.project_id or None, username=account["username"])
|
||||
source_url = _ensure_kuaishou_url(request.source_url)
|
||||
if source_url and _is_youtube_url(source_url):
|
||||
raise HTTPException(status_code=400, detail="YouTube URLs are not supported in the Kuaishou routes")
|
||||
row = create_content_source(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
source_kind=_normalize_text(request.source_kind) or "creator_account",
|
||||
platform=KUAISHOU_PLATFORM,
|
||||
handle=_normalize_text(request.handle),
|
||||
source_url=source_url,
|
||||
title=_normalize_text(request.title) or _normalize_text(request.handle) or source_url,
|
||||
local_path=_normalize_text(request.local_path),
|
||||
metadata=request.metadata,
|
||||
)
|
||||
return content_source_payload(row)
|
||||
|
||||
@app.post("/v2/kuaishou/pipelines/content-source-sync")
|
||||
async def create_kuaishou_content_source_sync_job(
|
||||
request: KuaishouContentSourceSyncRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = None
|
||||
if request.content_source_id.strip():
|
||||
source_row = _require_owned_kuaishou_source(request.content_source_id.strip(), account["id"])
|
||||
|
||||
requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "")
|
||||
project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
|
||||
kb = resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
|
||||
assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
|
||||
profile = model_profile_for_account(account["id"], request.analysis_model_profile_id or None)
|
||||
|
||||
source_url = _ensure_kuaishou_url(
|
||||
request.source_url or (source_row or {}).get("source_url", "")
|
||||
)
|
||||
if not source_url:
|
||||
raise HTTPException(status_code=400, detail="source_url or content_source_id with a Kuaishou URL is required")
|
||||
|
||||
handle = _normalize_text(request.handle or (source_row or {}).get("handle", ""))
|
||||
source_title = (
|
||||
_normalize_text(request.title)
|
||||
or (source_row or {}).get("title", "").strip()
|
||||
or handle
|
||||
or source_url
|
||||
)
|
||||
|
||||
if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
|
||||
raise HTTPException(status_code=400, detail="Content source does not belong to target project")
|
||||
|
||||
if not source_row:
|
||||
source_row = create_content_source(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
source_kind="creator_account",
|
||||
platform=KUAISHOU_PLATFORM,
|
||||
handle=handle,
|
||||
source_url=source_url,
|
||||
title=source_title,
|
||||
metadata={
|
||||
"sync_mode": "recent_uploads",
|
||||
"max_items": request.max_items,
|
||||
"analysis_model_profile_id": profile["id"],
|
||||
},
|
||||
)
|
||||
|
||||
job_row = create_job_record(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
knowledge_base_id=kb["id"],
|
||||
source_type="content_source_sync",
|
||||
line_type="content_source_sync",
|
||||
workflow_key="content_source_sync_pipeline",
|
||||
title=f"{source_title} 内容源同步",
|
||||
language=request.language,
|
||||
source_url=source_url,
|
||||
assistant_id=(assistant or {}).get("id"),
|
||||
content_source_id=source_row["id"],
|
||||
artifacts={
|
||||
"platform": KUAISHOU_PLATFORM,
|
||||
"handle": handle,
|
||||
"source_account_url": source_url,
|
||||
"source_title": source_title,
|
||||
"max_items": request.max_items,
|
||||
"skip_existing": request.skip_existing,
|
||||
"auto_trigger_analysis": request.auto_trigger_analysis,
|
||||
},
|
||||
analysis_model_profile_id=profile["id"],
|
||||
)
|
||||
legacy.update_content_source_metadata(
|
||||
source_row["id"],
|
||||
{
|
||||
"sync_mode": "recent_uploads",
|
||||
"max_items": request.max_items,
|
||||
"analysis_model_profile_id": profile["id"],
|
||||
"last_sync_job_id": job_row["id"],
|
||||
"last_sync_requested_at": utc_now(),
|
||||
},
|
||||
)
|
||||
return job_payload(await trigger_orchestrated_job(job_row))
|
||||
|
||||
@app.get("/v2/kuaishou/jobs")
|
||||
def list_kuaishou_jobs_api(
|
||||
project_id: str | None = Query(default=None),
|
||||
limit: int = Query(default=20, ge=1, le=100),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
return _list_kuaishou_jobs(account["id"], project_id=project_id, limit=limit)
|
||||
|
||||
@app.get("/v2/kuaishou/workspace")
|
||||
def get_kuaishou_workspace(
|
||||
project_id: str | None = Query(default=None),
|
||||
limit: int = Query(default=10, ge=1, le=50),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
content_sources = list_kuaishou_content_sources(project_id=project_id, account=account)
|
||||
reviews = _list_kuaishou_reviews(account["id"], project_id=project_id, limit=limit)
|
||||
jobs = _list_kuaishou_jobs(account["id"], project_id=project_id, limit=limit)
|
||||
return {
|
||||
"platform": KUAISHOU_PLATFORM,
|
||||
"project_id": project_id or "",
|
||||
"content_sources": content_sources,
|
||||
"recent_jobs": jobs,
|
||||
"recent_reviews": reviews,
|
||||
"counts": {
|
||||
"content_sources": len(content_sources),
|
||||
"jobs": len(jobs),
|
||||
"reviews": len(reviews),
|
||||
},
|
||||
}
|
||||
|
||||
@app.get("/v2/kuaishou/reviews")
|
||||
def list_kuaishou_reviews_api(
|
||||
project_id: str | None = Query(default=None),
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
return _list_kuaishou_reviews(account["id"], project_id=project_id, limit=limit)
|
||||
|
||||
@app.post("/v2/kuaishou/reviews")
|
||||
def create_kuaishou_review(
|
||||
request: KuaishouReviewCreateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_job = None
|
||||
if request.source_job_id.strip():
|
||||
source_job = load_owned_job(request.source_job_id.strip(), account["id"])
|
||||
if not _job_is_kuaishou(source_job):
|
||||
raise HTTPException(status_code=400, detail="Source job does not belong to the Kuaishou route")
|
||||
|
||||
requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "")
|
||||
project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
|
||||
assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
|
||||
review_id = make_id("review")
|
||||
title = request.title.strip() or (source_job.get("title", "") if source_job else "")
|
||||
if not title:
|
||||
title = f"{project['name']} 快手复盘"
|
||||
timestamp = utc_now()
|
||||
db.execute(
|
||||
"""
|
||||
INSERT INTO publish_reviews (
|
||||
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
|
||||
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
review_id,
|
||||
account["id"],
|
||||
project["id"],
|
||||
source_job["id"] if source_job else None,
|
||||
(assistant or {}).get("id") or None,
|
||||
title,
|
||||
KUAISHOU_PLATFORM,
|
||||
request.content_type or "video",
|
||||
_normalize_text(request.publish_url),
|
||||
_normalize_text(request.published_at),
|
||||
json.dumps(request.metrics, ensure_ascii=False),
|
||||
_normalize_text(request.verdict),
|
||||
_normalize_text(request.highlights),
|
||||
_normalize_text(request.next_actions),
|
||||
_normalize_text(request.notes),
|
||||
timestamp,
|
||||
timestamp,
|
||||
),
|
||||
)
|
||||
row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
|
||||
return review_payload(row)
|
||||
68
collector-service/app/legacy_runtime.py
Normal file
68
collector-service/app/legacy_runtime.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.machinery
|
||||
import importlib.util
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
BASE_DIR = Path(__file__).resolve().parent
|
||||
PYCACHE_DIR = BASE_DIR / "__pycache__"
|
||||
LEGACY_PYC_DIR = BASE_DIR / "_legacy_pyc"
|
||||
SUPPORTED_PYTHON = (3, 11)
|
||||
|
||||
_LEGACY_MODULE: Any | None = None
|
||||
|
||||
|
||||
def _ensure_supported_runtime() -> None:
|
||||
if sys.version_info[:2] != SUPPORTED_PYTHON:
|
||||
version = ".".join(map(str, sys.version_info[:3]))
|
||||
required = ".".join(map(str, SUPPORTED_PYTHON))
|
||||
raise RuntimeError(
|
||||
f"Legacy collector bytecode requires Python {required}. Current runtime: {version}."
|
||||
)
|
||||
|
||||
|
||||
def _ensure_package() -> None:
|
||||
package = sys.modules.get("app")
|
||||
if package is None:
|
||||
package = types.ModuleType("app")
|
||||
package.__path__ = [str(BASE_DIR)]
|
||||
sys.modules["app"] = package
|
||||
|
||||
|
||||
def _load_sourceless_module(module_name: str, pyc_path: Path) -> Any:
|
||||
loader = importlib.machinery.SourcelessFileLoader(module_name, str(pyc_path))
|
||||
spec = importlib.util.spec_from_loader(module_name, loader)
|
||||
if spec is None:
|
||||
raise RuntimeError(f"Unable to create spec for {module_name}")
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = module
|
||||
loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
def load_legacy_main() -> Any:
|
||||
global _LEGACY_MODULE
|
||||
if _LEGACY_MODULE is not None:
|
||||
return _LEGACY_MODULE
|
||||
|
||||
_ensure_supported_runtime()
|
||||
_ensure_package()
|
||||
|
||||
for name in ("database", "fastgpt", "openai_compat"):
|
||||
full_name = f"app.{name}"
|
||||
if full_name not in sys.modules:
|
||||
pyc_dir = LEGACY_PYC_DIR if (LEGACY_PYC_DIR / f"{name}.cpython-311.pyc").exists() else PYCACHE_DIR
|
||||
_load_sourceless_module(full_name, pyc_dir / f"{name}.cpython-311.pyc")
|
||||
|
||||
legacy_name = "app.main_legacy"
|
||||
if legacy_name in sys.modules:
|
||||
_LEGACY_MODULE = sys.modules[legacy_name]
|
||||
return _LEGACY_MODULE
|
||||
|
||||
main_pyc_dir = LEGACY_PYC_DIR if (LEGACY_PYC_DIR / "main.cpython-311.pyc").exists() else PYCACHE_DIR
|
||||
_LEGACY_MODULE = _load_sourceless_module(legacy_name, main_pyc_dir / "main.cpython-311.pyc")
|
||||
_LEGACY_MODULE.__package__ = "app"
|
||||
return _LEGACY_MODULE
|
||||
24
collector-service/app/main.py
Normal file
24
collector-service/app/main.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from .domestic_platform_features import register_domestic_platform_routes
|
||||
from .douyin_features import register_douyin_routes
|
||||
from .oneliner_features import register_oneliner_routes
|
||||
|
||||
try:
|
||||
from . import core_main as core
|
||||
except Exception:
|
||||
# Keep a bytecode-backed fallback so the app can still boot if the
|
||||
# recovered source baseline is incomplete in this workspace.
|
||||
from .legacy_runtime import load_legacy_main
|
||||
|
||||
core = load_legacy_main()
|
||||
|
||||
app = core.app
|
||||
|
||||
register_douyin_routes(app, core)
|
||||
register_domestic_platform_routes(app, core, platform="xiaohongshu", label="小红书")
|
||||
register_domestic_platform_routes(app, core, platform="bilibili", label="哔哩哔哩")
|
||||
register_domestic_platform_routes(app, core, platform="kuaishou", label="快手")
|
||||
register_domestic_platform_routes(app, core, platform="wechat_video", label="微信视频号")
|
||||
register_oneliner_routes(app, core)
|
||||
app.openapi_schema = None
|
||||
3908
collector-service/app/oneliner_features.py
Normal file
3908
collector-service/app/oneliner_features.py
Normal file
File diff suppressed because it is too large
Load Diff
45
collector-service/app/openai_compat.py
Normal file
45
collector-service/app/openai_compat.py
Normal file
@@ -0,0 +1,45 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
|
||||
class OpenAICompatClient:
|
||||
def __init__(self, timeout: float = 180.0) -> None:
|
||||
self.timeout = timeout
|
||||
|
||||
async def chat_completion(
|
||||
self,
|
||||
*,
|
||||
base_url: str,
|
||||
api_key: str,
|
||||
model: str,
|
||||
system_prompt: str,
|
||||
user_prompt: str,
|
||||
temperature: float = 0.7,
|
||||
) -> str:
|
||||
url = base_url.rstrip("/") + "/chat/completions"
|
||||
headers = {"Content-Type": "application/json"}
|
||||
if api_key.strip():
|
||||
headers["Authorization"] = f"Bearer {api_key.strip()}"
|
||||
payload: dict[str, Any] = {
|
||||
"model": model,
|
||||
"temperature": temperature,
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": user_prompt},
|
||||
],
|
||||
}
|
||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||
response = await client.post(url, headers=headers, json=payload)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
choices = data.get("choices") or []
|
||||
if not choices:
|
||||
return ""
|
||||
message = choices[0].get("message") or {}
|
||||
content = message.get("content") or ""
|
||||
if isinstance(content, list):
|
||||
return "\n".join(str(item.get("text", "")) for item in content if isinstance(item, dict)).strip()
|
||||
return str(content).strip()
|
||||
531
collector-service/app/wechat_video_features.py
Normal file
531
collector-service/app/wechat_video_features.py
Normal file
@@ -0,0 +1,531 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from collections import Counter
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Depends, HTTPException, Query
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
# This module is intentionally self-contained because the task only allows
|
||||
# writes to a new file. To activate it, import `register_wechat_video_routes`
|
||||
# from `app.main` and call it with `(app, core)`.
|
||||
|
||||
WECHAT_VIDEO_PLATFORM = "wechat_video"
|
||||
ACCOUNT_SOURCE_KIND = "creator_account"
|
||||
YOUTUBE_HOST_MARKERS = ("youtube.com", "youtu.be")
|
||||
|
||||
|
||||
class WechatVideoAccountSyncRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
knowledge_base_id: str = ""
|
||||
assistant_id: str = ""
|
||||
content_source_id: str = ""
|
||||
profile_url: str = ""
|
||||
handle: str = ""
|
||||
title: str = ""
|
||||
analysis_model_profile_id: str = ""
|
||||
language: str = "auto"
|
||||
max_items: int = Field(default=5, ge=1, le=20)
|
||||
skip_existing: bool = True
|
||||
auto_trigger_analysis: bool = True
|
||||
|
||||
|
||||
class WechatVideoReviewCreateRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
source_job_id: str = ""
|
||||
assistant_id: str = ""
|
||||
title: str = ""
|
||||
content_type: str = "video"
|
||||
publish_url: str = ""
|
||||
published_at: str = ""
|
||||
metrics: dict[str, Any] = Field(default_factory=dict)
|
||||
verdict: str = ""
|
||||
highlights: str = ""
|
||||
next_actions: str = ""
|
||||
notes: str = ""
|
||||
|
||||
|
||||
def register_wechat_video_routes(app: Any, legacy: Any) -> None:
|
||||
if getattr(app.state, "wechat_video_routes_registered", False):
|
||||
return
|
||||
app.state.wechat_video_routes_registered = True
|
||||
|
||||
def _account_not_found() -> HTTPException:
|
||||
return HTTPException(status_code=404, detail="WeChat Video account not found")
|
||||
|
||||
def _normalize_wechat_source_url(source_url: str) -> str:
|
||||
normalized = source_url.strip()
|
||||
if not normalized:
|
||||
return ""
|
||||
lowered = normalized.lower()
|
||||
if any(marker in lowered for marker in YOUTUBE_HOST_MARKERS):
|
||||
raise HTTPException(status_code=400, detail="YouTube is not supported by wechat_video routes")
|
||||
inferred = legacy.infer_platform_from_url(normalized)
|
||||
if inferred != WECHAT_VIDEO_PLATFORM:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="wechat_video routes only accept channels.weixin.qq.com or mp.weixin.qq.com/s URLs",
|
||||
)
|
||||
return normalized
|
||||
|
||||
def _require_owned_account(source_id: str, user_id: str) -> dict[str, Any]:
|
||||
row = legacy.load_owned_content_source(source_id, user_id)
|
||||
if row.get("platform") != WECHAT_VIDEO_PLATFORM or row.get("source_kind") != ACCOUNT_SOURCE_KIND:
|
||||
raise _account_not_found()
|
||||
return row
|
||||
|
||||
def _list_sync_job_rows(source_row: dict[str, Any], *, limit: int = 50) -> list[dict[str, Any]]:
|
||||
return legacy.db.fetch_all(
|
||||
"""
|
||||
SELECT *
|
||||
FROM jobs
|
||||
WHERE user_id = ? AND content_source_id = ? AND source_type = 'content_source_sync'
|
||||
ORDER BY created_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(source_row["user_id"], source_row["id"], max(1, limit)),
|
||||
)
|
||||
|
||||
def _list_video_job_rows(source_row: dict[str, Any], *, limit: int = 200) -> list[dict[str, Any]]:
|
||||
sync_rows = _list_sync_job_rows(source_row, limit=max(1, limit))
|
||||
if not sync_rows:
|
||||
return []
|
||||
parent_job_ids = [row["id"] for row in sync_rows]
|
||||
placeholders = ",".join("?" for _ in parent_job_ids)
|
||||
query = f"""
|
||||
SELECT *
|
||||
FROM jobs
|
||||
WHERE user_id = ? AND source_type = 'video_link' AND parent_job_id IN ({placeholders})
|
||||
ORDER BY created_at DESC
|
||||
"""
|
||||
params: tuple[Any, ...] = (source_row["user_id"], *parent_job_ids)
|
||||
return legacy.db.fetch_all(query, params)[: max(1, limit)]
|
||||
|
||||
def _dedupe_latest_video_jobs(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
||||
deduped: list[dict[str, Any]] = []
|
||||
seen_urls: set[str] = set()
|
||||
for row in rows:
|
||||
source_url = str(row.get("source_url") or "").strip()
|
||||
if not source_url or source_url in seen_urls:
|
||||
continue
|
||||
seen_urls.add(source_url)
|
||||
deduped.append(row)
|
||||
return deduped
|
||||
|
||||
def _fetch_content_source(source_id: str) -> dict[str, Any] | None:
|
||||
if not source_id:
|
||||
return None
|
||||
return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_id,))
|
||||
|
||||
def _load_related_reviews(source_row: dict[str, Any], video_rows: list[dict[str, Any]], *, limit: int = 50) -> list[dict[str, Any]]:
|
||||
candidate_rows = legacy.db.fetch_all(
|
||||
"""
|
||||
SELECT *
|
||||
FROM publish_reviews
|
||||
WHERE user_id = ? AND platform = ?
|
||||
ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC
|
||||
LIMIT 400
|
||||
""",
|
||||
(source_row["user_id"], WECHAT_VIDEO_PLATFORM),
|
||||
)
|
||||
job_ids = {row["id"] for row in video_rows}
|
||||
video_urls = {str(row.get("source_url") or "").strip() for row in video_rows if row.get("source_url")}
|
||||
results: list[dict[str, Any]] = []
|
||||
for row in candidate_rows:
|
||||
source_job_id = str(row.get("source_job_id") or "").strip()
|
||||
publish_url = str(row.get("publish_url") or "").strip()
|
||||
if source_job_id and source_job_id in job_ids:
|
||||
results.append(row)
|
||||
continue
|
||||
if publish_url and publish_url in video_urls:
|
||||
results.append(row)
|
||||
return results[: max(1, limit)]
|
||||
|
||||
def _load_related_documents(video_rows: list[dict[str, Any]], *, limit: int = 30) -> list[dict[str, Any]]:
|
||||
kb_ids = {str(row.get("knowledge_base_id") or "").strip() for row in video_rows if row.get("knowledge_base_id")}
|
||||
video_urls = {str(row.get("source_url") or "").strip() for row in video_rows if row.get("source_url")}
|
||||
documents: list[dict[str, Any]] = []
|
||||
seen_document_ids: set[str] = set()
|
||||
for kb_id in kb_ids:
|
||||
for row in legacy.db.fetch_all(
|
||||
"""
|
||||
SELECT *
|
||||
FROM knowledge_documents
|
||||
WHERE knowledge_base_id = ?
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 200
|
||||
""",
|
||||
(kb_id,),
|
||||
):
|
||||
if row["id"] in seen_document_ids:
|
||||
continue
|
||||
if str(row.get("source_url") or "").strip() not in video_urls:
|
||||
continue
|
||||
seen_document_ids.add(row["id"])
|
||||
documents.append(row)
|
||||
if len(documents) >= limit:
|
||||
return documents
|
||||
return documents
|
||||
|
||||
def _build_review_maps(review_rows: list[dict[str, Any]]) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
|
||||
by_job_id: dict[str, dict[str, Any]] = {}
|
||||
by_url: dict[str, dict[str, Any]] = {}
|
||||
for row in review_rows:
|
||||
source_job_id = str(row.get("source_job_id") or "").strip()
|
||||
publish_url = str(row.get("publish_url") or "").strip()
|
||||
if source_job_id and source_job_id not in by_job_id:
|
||||
by_job_id[source_job_id] = row
|
||||
if publish_url and publish_url not in by_url:
|
||||
by_url[publish_url] = row
|
||||
return by_job_id, by_url
|
||||
|
||||
def _build_document_map(document_rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
|
||||
by_url: dict[str, dict[str, Any]] = {}
|
||||
for row in document_rows:
|
||||
source_url = str(row.get("source_url") or "").strip()
|
||||
if source_url and source_url not in by_url:
|
||||
by_url[source_url] = row
|
||||
return by_url
|
||||
|
||||
def _build_account_payload(source_row: dict[str, Any]) -> dict[str, Any]:
|
||||
payload = legacy.content_source_payload(source_row)
|
||||
metadata = payload.get("metadata") or {}
|
||||
latest_sync_job = None
|
||||
last_sync_job_id = str(metadata.get("last_sync_job_id") or "")
|
||||
if last_sync_job_id:
|
||||
latest_sync_job = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (last_sync_job_id,))
|
||||
payload["platform_label"] = legacy.platform_label(WECHAT_VIDEO_PLATFORM)
|
||||
payload["last_sync_job_id"] = last_sync_job_id
|
||||
payload["last_sync_completed_at"] = str(metadata.get("last_sync_completed_at") or "")
|
||||
payload["last_sync_error"] = str(metadata.get("last_sync_error") or "")
|
||||
payload["last_sync_status"] = str((latest_sync_job or {}).get("status") or "")
|
||||
payload["sync_mode"] = str(metadata.get("sync_mode") or "recent_uploads")
|
||||
return payload
|
||||
|
||||
def _build_video_item(
|
||||
job_row: dict[str, Any],
|
||||
review_by_job_id: dict[str, dict[str, Any]],
|
||||
review_by_url: dict[str, dict[str, Any]],
|
||||
document_by_url: dict[str, dict[str, Any]],
|
||||
) -> dict[str, Any]:
|
||||
source_url = str(job_row.get("source_url") or "").strip()
|
||||
content_source = _fetch_content_source(str(job_row.get("content_source_id") or "").strip())
|
||||
review_row = review_by_job_id.get(job_row["id"]) or review_by_url.get(source_url)
|
||||
document_row = document_by_url.get(source_url)
|
||||
artifacts = legacy.parse_job_artifacts(job_row)
|
||||
return {
|
||||
"id": job_row["id"],
|
||||
"title": job_row.get("title", ""),
|
||||
"status": job_row.get("status", ""),
|
||||
"source_url": source_url,
|
||||
"external_id": str(artifacts.get("external_id") or ""),
|
||||
"origin_sync_job_id": str(artifacts.get("origin_sync_job_id") or ""),
|
||||
"job": legacy.job_payload(job_row),
|
||||
"content_source": legacy.content_source_payload(content_source) if content_source else None,
|
||||
"latest_review": legacy.review_payload(review_row) if review_row else None,
|
||||
"document": legacy.document_payload(document_row) if document_row else None,
|
||||
}
|
||||
|
||||
def _build_workspace_payload(source_row: dict[str, Any]) -> dict[str, Any]:
|
||||
sync_rows = _list_sync_job_rows(source_row, limit=20)
|
||||
video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200))
|
||||
review_rows = _load_related_reviews(source_row, video_rows, limit=20)
|
||||
document_rows = _load_related_documents(video_rows, limit=12)
|
||||
review_by_job_id, review_by_url = _build_review_maps(review_rows)
|
||||
document_by_url = _build_document_map(document_rows)
|
||||
status_counts = Counter(str(row.get("status") or "").strip() or "unknown" for row in video_rows)
|
||||
latest_sync = legacy.job_context_payload(sync_rows[0]) if sync_rows else None
|
||||
return {
|
||||
"account": _build_account_payload(source_row),
|
||||
"latest_sync_job": latest_sync,
|
||||
"sync_jobs": [legacy.job_payload(row) for row in sync_rows[:10]],
|
||||
"videos": {
|
||||
"total": len(video_rows),
|
||||
"status_counts": dict(status_counts),
|
||||
"items": [
|
||||
_build_video_item(row, review_by_job_id, review_by_url, document_by_url)
|
||||
for row in video_rows[:20]
|
||||
],
|
||||
},
|
||||
"reviews": [legacy.review_payload(row) for row in review_rows],
|
||||
"recent_documents": [legacy.document_payload(row) for row in document_rows],
|
||||
"stats": {
|
||||
"sync_job_count": len(sync_rows),
|
||||
"video_job_count": len(video_rows),
|
||||
"completed_video_count": status_counts.get("completed", 0),
|
||||
"failed_video_count": status_counts.get("failed", 0),
|
||||
"review_count": len(review_rows),
|
||||
"document_count": len(document_rows),
|
||||
},
|
||||
}
|
||||
|
||||
def _update_account_source(
|
||||
source_row: dict[str, Any],
|
||||
*,
|
||||
source_url: str,
|
||||
title: str,
|
||||
handle: str,
|
||||
metadata_updates: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
merged_metadata = legacy.merge_json_field(source_row.get("metadata_json") or "{}", metadata_updates)
|
||||
legacy.db.execute(
|
||||
"""
|
||||
UPDATE content_sources
|
||||
SET handle = ?, source_url = ?, title = ?, platform = ?, metadata_json = ?, updated_at = ?
|
||||
WHERE id = ? AND user_id = ?
|
||||
""",
|
||||
(
|
||||
handle,
|
||||
source_url,
|
||||
title,
|
||||
WECHAT_VIDEO_PLATFORM,
|
||||
merged_metadata,
|
||||
legacy.utc_now(),
|
||||
source_row["id"],
|
||||
source_row["user_id"],
|
||||
),
|
||||
)
|
||||
return legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],))
|
||||
|
||||
def _job_belongs_to_account(job_row: dict[str, Any], source_row: dict[str, Any]) -> bool:
|
||||
if str(job_row.get("content_source_id") or "").strip():
|
||||
content_source = _fetch_content_source(str(job_row.get("content_source_id") or "").strip())
|
||||
metadata = (legacy.content_source_payload(content_source).get("metadata") or {}) if content_source else {}
|
||||
if content_source and str(metadata.get("origin_content_source_id") or "") == source_row["id"]:
|
||||
return True
|
||||
if str(job_row.get("parent_job_id") or "").strip():
|
||||
parent_row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["parent_job_id"],))
|
||||
if parent_row and str(parent_row.get("content_source_id") or "") == source_row["id"]:
|
||||
return True
|
||||
return False
|
||||
|
||||
@app.get("/v2/wechat-video/accounts")
|
||||
def list_wechat_video_accounts(
|
||||
project_id: str | None = Query(default=None),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
clauses = ["user_id = ?", "platform = ?", "source_kind = ?"]
|
||||
params: list[Any] = [account["id"], WECHAT_VIDEO_PLATFORM, ACCOUNT_SOURCE_KIND]
|
||||
if project_id:
|
||||
project = legacy.resolve_target_project(account["id"], project_id, username=account["username"])
|
||||
clauses.append("project_id = ?")
|
||||
params.append(project["id"])
|
||||
rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY updated_at DESC",
|
||||
tuple(params),
|
||||
)
|
||||
return [_build_account_payload(row) for row in rows]
|
||||
|
||||
@app.post("/v2/wechat-video/accounts/sync")
|
||||
async def sync_wechat_video_account(
|
||||
request: WechatVideoAccountSyncRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = None
|
||||
if request.content_source_id.strip():
|
||||
source_row = _require_owned_account(request.content_source_id.strip(), account["id"])
|
||||
|
||||
source_url = _normalize_wechat_source_url(request.profile_url or (source_row or {}).get("source_url", ""))
|
||||
if not source_url:
|
||||
raise HTTPException(status_code=400, detail="profile_url or content_source_id is required")
|
||||
|
||||
requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "")
|
||||
project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
|
||||
if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
|
||||
raise HTTPException(status_code=400, detail="Content source does not belong to target project")
|
||||
|
||||
kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
|
||||
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
|
||||
profile = legacy.model_profile_for_account(account["id"], request.analysis_model_profile_id or None)
|
||||
handle = request.handle.strip() or (source_row or {}).get("handle", "").strip()
|
||||
title = request.title.strip() or (source_row or {}).get("title", "").strip() or handle or source_url
|
||||
metadata_updates = {
|
||||
"account_type": WECHAT_VIDEO_PLATFORM,
|
||||
"sync_mode": "recent_uploads",
|
||||
"max_items": request.max_items,
|
||||
"analysis_model_profile_id": profile["id"],
|
||||
"last_sync_error": "",
|
||||
}
|
||||
|
||||
if not source_row:
|
||||
source_row = legacy.create_content_source(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
source_kind=ACCOUNT_SOURCE_KIND,
|
||||
platform=WECHAT_VIDEO_PLATFORM,
|
||||
handle=handle,
|
||||
source_url=source_url,
|
||||
title=title,
|
||||
metadata=metadata_updates,
|
||||
)
|
||||
else:
|
||||
source_row = _update_account_source(
|
||||
source_row,
|
||||
source_url=source_url,
|
||||
title=title,
|
||||
handle=handle,
|
||||
metadata_updates=metadata_updates,
|
||||
)
|
||||
|
||||
job_row = legacy.create_job_record(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
knowledge_base_id=kb["id"],
|
||||
source_type="content_source_sync",
|
||||
line_type="content_source_sync",
|
||||
workflow_key="content_source_sync_pipeline",
|
||||
title=f"{title} 内容源同步",
|
||||
language=request.language,
|
||||
source_url=source_url,
|
||||
assistant_id=(assistant or {}).get("id"),
|
||||
content_source_id=source_row["id"],
|
||||
artifacts={
|
||||
"platform": WECHAT_VIDEO_PLATFORM,
|
||||
"handle": handle,
|
||||
"source_account_url": source_url,
|
||||
"source_title": title,
|
||||
"max_items": request.max_items,
|
||||
"skip_existing": request.skip_existing,
|
||||
"auto_trigger_analysis": request.auto_trigger_analysis,
|
||||
},
|
||||
analysis_model_profile_id=profile["id"],
|
||||
)
|
||||
legacy.update_content_source_metadata(
|
||||
source_row["id"],
|
||||
{
|
||||
"sync_mode": "recent_uploads",
|
||||
"max_items": request.max_items,
|
||||
"analysis_model_profile_id": profile["id"],
|
||||
"last_sync_job_id": job_row["id"],
|
||||
"last_sync_requested_at": legacy.utc_now(),
|
||||
"last_sync_error": "",
|
||||
},
|
||||
)
|
||||
queued_row = await legacy.trigger_orchestrated_job(job_row)
|
||||
source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (source_row["id"],))
|
||||
workspace = _build_workspace_payload(source_row)
|
||||
workspace["sync_job"] = legacy.job_payload(queued_row)
|
||||
return workspace
|
||||
|
||||
@app.get("/v2/wechat-video/accounts/{account_id}")
|
||||
def get_wechat_video_account(
|
||||
account_id: str,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
return _build_workspace_payload(source_row)
|
||||
|
||||
@app.get("/v2/wechat-video/accounts/{account_id}/workspace")
|
||||
def get_wechat_video_account_workspace(
|
||||
account_id: str,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
return _build_workspace_payload(source_row)
|
||||
|
||||
@app.get("/v2/wechat-video/accounts/{account_id}/videos")
|
||||
def list_wechat_video_account_videos(
|
||||
account_id: str,
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
status: str = Query(default=""),
|
||||
q: str = Query(default=""),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=max(limit * 4, 200)))
|
||||
normalized_status = status.strip().lower()
|
||||
normalized_query = q.strip().lower()
|
||||
if normalized_status:
|
||||
video_rows = [row for row in video_rows if str(row.get("status") or "").lower() == normalized_status]
|
||||
if normalized_query:
|
||||
video_rows = [
|
||||
row
|
||||
for row in video_rows
|
||||
if normalized_query in str(row.get("title") or "").lower()
|
||||
or normalized_query in str(row.get("source_url") or "").lower()
|
||||
]
|
||||
selected_rows = video_rows[:limit]
|
||||
review_rows = _load_related_reviews(source_row, selected_rows, limit=max(limit, 20))
|
||||
document_rows = _load_related_documents(selected_rows, limit=max(limit, 20))
|
||||
review_by_job_id, review_by_url = _build_review_maps(review_rows)
|
||||
document_by_url = _build_document_map(document_rows)
|
||||
return {
|
||||
"account": _build_account_payload(source_row),
|
||||
"total": len(video_rows),
|
||||
"status_counts": dict(Counter(str(row.get("status") or "").strip() or "unknown" for row in video_rows)),
|
||||
"items": [
|
||||
_build_video_item(row, review_by_job_id, review_by_url, document_by_url)
|
||||
for row in selected_rows
|
||||
],
|
||||
}
|
||||
|
||||
@app.get("/v2/wechat-video/accounts/{account_id}/reviews")
|
||||
def list_wechat_video_account_reviews(
|
||||
account_id: str,
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
video_rows = _dedupe_latest_video_jobs(_list_video_job_rows(source_row, limit=200))
|
||||
review_rows = _load_related_reviews(source_row, video_rows, limit=limit)
|
||||
return [legacy.review_payload(row) for row in review_rows]
|
||||
|
||||
@app.post("/v2/wechat-video/accounts/{account_id}/reviews")
|
||||
def create_wechat_video_review(
|
||||
account_id: str,
|
||||
request: WechatVideoReviewCreateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = _require_owned_account(account_id, account["id"])
|
||||
source_job = None
|
||||
if request.source_job_id.strip():
|
||||
source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"])
|
||||
if not _job_belongs_to_account(source_job, source_row):
|
||||
raise HTTPException(status_code=400, detail="source_job_id does not belong to the target WeChat Video account")
|
||||
|
||||
requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else source_row.get("project_id", ""))
|
||||
project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
|
||||
if source_row.get("project_id") and source_row.get("project_id") != project["id"]:
|
||||
raise HTTPException(status_code=400, detail="WeChat Video account does not belong to target project")
|
||||
|
||||
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
|
||||
publish_url = request.publish_url.strip() or (source_job.get("source_url", "") if source_job else "")
|
||||
if publish_url:
|
||||
_normalize_wechat_source_url(publish_url)
|
||||
title = request.title.strip() or (source_job.get("title", "") if source_job else "") or f"{source_row.get('title', '')} 复盘".strip()
|
||||
if not title:
|
||||
title = "微信视频号复盘"
|
||||
|
||||
review_id = legacy.make_id("review")
|
||||
timestamp = legacy.utc_now()
|
||||
legacy.db.execute(
|
||||
"""
|
||||
INSERT INTO publish_reviews (
|
||||
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
|
||||
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
review_id,
|
||||
account["id"],
|
||||
project["id"],
|
||||
source_job["id"] if source_job else None,
|
||||
(assistant or {}).get("id") or None,
|
||||
title,
|
||||
WECHAT_VIDEO_PLATFORM,
|
||||
request.content_type or "video",
|
||||
publish_url,
|
||||
request.published_at.strip(),
|
||||
json.dumps(request.metrics, ensure_ascii=False),
|
||||
request.verdict.strip(),
|
||||
request.highlights.strip(),
|
||||
request.next_actions.strip(),
|
||||
request.notes.strip(),
|
||||
timestamp,
|
||||
timestamp,
|
||||
),
|
||||
)
|
||||
row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
|
||||
return legacy.review_payload(row)
|
||||
765
collector-service/app/xiaohongshu_features.py
Normal file
765
collector-service/app/xiaohongshu_features.py
Normal file
@@ -0,0 +1,765 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from html import unescape
|
||||
from typing import Any, Iterable
|
||||
from urllib.parse import unquote
|
||||
|
||||
import httpx
|
||||
from fastapi import Depends, HTTPException, Query
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
DEFAULT_TIMEOUT = 20.0
|
||||
MAX_HTML_SEARCH_BYTES = 2_000_000
|
||||
DEFAULT_USER_AGENT = (
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
|
||||
)
|
||||
XHS_PLATFORM = "xiaohongshu"
|
||||
|
||||
|
||||
class XHSManualPageCapture(BaseModel):
|
||||
url: str = ""
|
||||
title: str = ""
|
||||
payload: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class XiaohongshuContentSourceCreateRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
source_kind: str
|
||||
handle: str = ""
|
||||
source_url: str = ""
|
||||
title: str = ""
|
||||
local_path: str = ""
|
||||
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class XiaohongshuContentSourceSyncRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
knowledge_base_id: str = ""
|
||||
assistant_id: str = ""
|
||||
content_source_id: str = ""
|
||||
source_url: str = ""
|
||||
handle: str = ""
|
||||
title: str = ""
|
||||
language: str = "auto"
|
||||
max_items: int = Field(default=5, ge=1, le=20)
|
||||
skip_existing: bool = True
|
||||
auto_trigger_analysis: bool = True
|
||||
manual_source_payload: dict[str, Any] | None = None
|
||||
manual_pages: list[XHSManualPageCapture] = Field(default_factory=list)
|
||||
discovery_note: str = ""
|
||||
|
||||
|
||||
class XiaohongshuReviewCreateRequest(BaseModel):
|
||||
project_id: str = ""
|
||||
source_job_id: str = ""
|
||||
assistant_id: str = ""
|
||||
title: str = ""
|
||||
platform: str = XHS_PLATFORM
|
||||
content_type: str = "note"
|
||||
publish_url: str = ""
|
||||
published_at: str = ""
|
||||
metrics: dict[str, Any] = Field(default_factory=dict)
|
||||
verdict: str = ""
|
||||
highlights: str = ""
|
||||
next_actions: str = ""
|
||||
notes: str = ""
|
||||
|
||||
|
||||
class XiaohongshuReviewUpdateRequest(BaseModel):
|
||||
title: str | None = None
|
||||
platform: str | None = None
|
||||
content_type: str | None = None
|
||||
publish_url: str | None = None
|
||||
published_at: str | None = None
|
||||
metrics: dict[str, Any] | None = None
|
||||
verdict: str | None = None
|
||||
highlights: str | None = None
|
||||
next_actions: str | None = None
|
||||
notes: str | None = None
|
||||
assistant_id: str | None = None
|
||||
|
||||
|
||||
def _safe_json_dumps(value: Any) -> str:
|
||||
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
|
||||
|
||||
|
||||
def _first_non_empty(*values: Any) -> str:
|
||||
for value in values:
|
||||
if value is None:
|
||||
continue
|
||||
if isinstance(value, str):
|
||||
stripped = value.strip()
|
||||
if stripped:
|
||||
return stripped
|
||||
elif value not in ("", [], {}, ()):
|
||||
return str(value)
|
||||
return ""
|
||||
|
||||
|
||||
def _dedupe_strings(values: Iterable[str]) -> list[str]:
|
||||
result: list[str] = []
|
||||
seen: set[str] = set()
|
||||
for value in values:
|
||||
item = str(value).strip()
|
||||
if not item:
|
||||
continue
|
||||
key = item.lower()
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
result.append(item)
|
||||
return result
|
||||
|
||||
|
||||
def _compact_text(value: Any, limit: int = 500) -> str:
|
||||
text = str(value or "").strip()
|
||||
if len(text) <= limit:
|
||||
return text
|
||||
return f"{text[: limit - 1]}…"
|
||||
|
||||
|
||||
def _parse_count(value: Any) -> float:
|
||||
if value is None:
|
||||
return 0.0
|
||||
if isinstance(value, (int, float)):
|
||||
return float(value)
|
||||
text = str(value).strip().lower().replace(",", "")
|
||||
if not text:
|
||||
return 0.0
|
||||
multiplier = 1.0
|
||||
if text.endswith("w") or text.endswith("万"):
|
||||
multiplier = 10_000.0
|
||||
text = text[:-1]
|
||||
elif text.endswith("亿"):
|
||||
multiplier = 100_000_000.0
|
||||
text = text[:-1]
|
||||
text = text.replace("+", "")
|
||||
match = re.search(r"-?\d+(?:\.\d+)?", text)
|
||||
if not match:
|
||||
return 0.0
|
||||
try:
|
||||
return float(match.group()) * multiplier
|
||||
except ValueError:
|
||||
return 0.0
|
||||
|
||||
|
||||
def _normalize_timestamp(value: Any) -> str | None:
|
||||
if value in (None, "", 0, "0"):
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
stripped = value.strip()
|
||||
if not stripped:
|
||||
return None
|
||||
if re.match(r"^\d{4}-\d{2}-\d{2}T", stripped):
|
||||
return stripped
|
||||
if stripped.isdigit():
|
||||
value = int(stripped)
|
||||
else:
|
||||
return stripped
|
||||
if isinstance(value, (int, float)):
|
||||
ts = float(value)
|
||||
if ts > 10_000_000_000:
|
||||
ts /= 1000.0
|
||||
try:
|
||||
return datetime.fromtimestamp(ts, tz=timezone.utc).replace(microsecond=0).isoformat()
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def _extract_hashtags(*texts: str) -> list[str]:
|
||||
tags: list[str] = []
|
||||
for text in texts:
|
||||
if not text:
|
||||
continue
|
||||
tags.extend(match.group(1) for match in re.finditer(r"#([\w\u4e00-\u9fff]+)", text))
|
||||
return _dedupe_strings(tags)
|
||||
|
||||
|
||||
def _extract_keywords(*texts: str) -> list[str]:
|
||||
candidates: list[str] = []
|
||||
for text in texts:
|
||||
if not text:
|
||||
continue
|
||||
candidates.extend(_extract_hashtags(text))
|
||||
candidates.extend(re.findall(r"[\u4e00-\u9fff]{2,8}", text))
|
||||
candidates.extend(re.findall(r"[A-Za-z][A-Za-z0-9_]{2,20}", text))
|
||||
stop_words = {
|
||||
"小红书",
|
||||
"笔记",
|
||||
"内容",
|
||||
"账号",
|
||||
"发布",
|
||||
"更多",
|
||||
"关注",
|
||||
"用户",
|
||||
"xhs",
|
||||
"xiaohongshu",
|
||||
}
|
||||
return _dedupe_strings(item for item in candidates if item.lower() not in stop_words)
|
||||
|
||||
|
||||
def _walk_json(value: Any) -> Iterable[dict[str, Any]]:
|
||||
if isinstance(value, dict):
|
||||
yield value
|
||||
for child in value.values():
|
||||
yield from _walk_json(child)
|
||||
elif isinstance(value, list):
|
||||
for child in value:
|
||||
yield from _walk_json(child)
|
||||
|
||||
|
||||
def _extract_json_objects_from_text(text: str) -> list[Any]:
|
||||
decoder = json.JSONDecoder()
|
||||
objects: list[Any] = []
|
||||
seen: set[str] = set()
|
||||
if not text:
|
||||
return objects
|
||||
|
||||
candidates = [text, unquote(text), unescape(text), unescape(unquote(text))]
|
||||
for candidate in candidates:
|
||||
snippet = candidate[:MAX_HTML_SEARCH_BYTES]
|
||||
for match in re.finditer(r"[\{\[]", snippet):
|
||||
try:
|
||||
obj, _ = decoder.raw_decode(snippet[match.start() :])
|
||||
except Exception:
|
||||
continue
|
||||
marker = _safe_json_dumps(obj)
|
||||
if marker in seen:
|
||||
continue
|
||||
seen.add(marker)
|
||||
objects.append(obj)
|
||||
if len(objects) >= 50:
|
||||
return objects
|
||||
return objects
|
||||
|
||||
|
||||
def _extract_json_blobs_from_html(html: str) -> list[dict[str, Any]]:
|
||||
blobs: list[dict[str, Any]] = []
|
||||
seen: set[str] = set()
|
||||
for attrs, content in re.findall(r"<script([^>]*)>(.*?)</script>", html, re.IGNORECASE | re.DOTALL):
|
||||
script_id_match = re.search(r'id=["\']([^"\']+)["\']', attrs, re.IGNORECASE)
|
||||
script_id = script_id_match.group(1) if script_id_match else ""
|
||||
for obj in _extract_json_objects_from_text(content.strip()):
|
||||
marker = _safe_json_dumps(obj)
|
||||
if marker in seen:
|
||||
continue
|
||||
seen.add(marker)
|
||||
blobs.append({"script_id": script_id, "payload": obj})
|
||||
return blobs
|
||||
|
||||
|
||||
async def _fetch_html(url: str, cookie: str = "") -> tuple[str, str]:
|
||||
headers = {
|
||||
"User-Agent": DEFAULT_USER_AGENT,
|
||||
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
||||
}
|
||||
if cookie.strip():
|
||||
headers["Cookie"] = cookie.strip()
|
||||
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, follow_redirects=True) as client:
|
||||
response = await client.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
return str(response.url), response.text
|
||||
|
||||
|
||||
def _note_candidate_score(value: dict[str, Any]) -> int:
|
||||
score = 0
|
||||
if any(key in value for key in ("note_id", "noteId", "id", "post_id")):
|
||||
score += 2
|
||||
if any(key in value for key in ("title", "desc", "content", "text", "note")):
|
||||
score += 2
|
||||
if any(key in value for key in ("author", "user", "owner")):
|
||||
score += 2
|
||||
if "stats" in value and isinstance(value["stats"], dict):
|
||||
score += 2
|
||||
return score
|
||||
|
||||
|
||||
def _extract_note_candidates(payload: Any) -> list[dict[str, Any]]:
|
||||
candidates: list[dict[str, Any]] = []
|
||||
for item in _walk_json(payload):
|
||||
if _note_candidate_score(item) >= 4:
|
||||
candidates.append(item)
|
||||
for key in ("author", "user", "owner"):
|
||||
child = item.get(key)
|
||||
if isinstance(child, dict) and _note_candidate_score(child) >= 3:
|
||||
candidates.append(child)
|
||||
return candidates
|
||||
|
||||
|
||||
def _normalize_note_candidate(candidate: dict[str, Any], fallback_url: str = "") -> dict[str, Any]:
|
||||
stats_source = candidate.get("stats") if isinstance(candidate.get("stats"), dict) else {}
|
||||
author = candidate.get("author") if isinstance(candidate.get("author"), dict) else {}
|
||||
if not author and isinstance(candidate.get("user"), dict):
|
||||
author = candidate["user"]
|
||||
cover = candidate.get("cover") or candidate.get("image") or candidate.get("images")
|
||||
if isinstance(cover, list) and cover:
|
||||
cover = cover[0]
|
||||
if isinstance(cover, dict):
|
||||
cover = _first_non_empty(
|
||||
cover.get("url_list", [""])[0] if isinstance(cover.get("url_list"), list) else "",
|
||||
cover.get("url"),
|
||||
)
|
||||
return {
|
||||
"note_id": _first_non_empty(candidate.get("note_id"), candidate.get("noteId"), candidate.get("id"), candidate.get("post_id")),
|
||||
"title": _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("content"), candidate.get("text")),
|
||||
"content": _first_non_empty(candidate.get("content"), candidate.get("desc"), candidate.get("text"), candidate.get("note")),
|
||||
"author_name": _first_non_empty(author.get("nickname"), author.get("name"), candidate.get("nickname")),
|
||||
"author_url": _first_non_empty(author.get("profile_url"), candidate.get("profile_url")),
|
||||
"share_url": _first_non_empty(candidate.get("share_url"), candidate.get("url"), fallback_url),
|
||||
"cover_url": _first_non_empty(cover),
|
||||
"published_at": _normalize_timestamp(candidate.get("publish_time") or candidate.get("created_at") or candidate.get("create_time")),
|
||||
"tags": _extract_hashtags(
|
||||
_first_non_empty(candidate.get("title")),
|
||||
_first_non_empty(candidate.get("desc"), candidate.get("content")),
|
||||
),
|
||||
"stats": {
|
||||
"like": _parse_count(stats_source.get("like_count") or stats_source.get("liked_count") or candidate.get("like_count")),
|
||||
"comment": _parse_count(stats_source.get("comment_count") or candidate.get("comment_count")),
|
||||
"collect": _parse_count(stats_source.get("collect_count") or candidate.get("collect_count")),
|
||||
"share": _parse_count(stats_source.get("share_count") or candidate.get("share_count")),
|
||||
},
|
||||
"raw": candidate,
|
||||
}
|
||||
|
||||
|
||||
def _extract_notes(payloads: Iterable[Any]) -> list[dict[str, Any]]:
|
||||
notes: list[dict[str, Any]] = []
|
||||
seen: set[str] = set()
|
||||
for payload in payloads:
|
||||
for candidate in _extract_note_candidates(payload):
|
||||
normalized = _normalize_note_candidate(candidate)
|
||||
dedupe_key = normalized["note_id"] or normalized["share_url"] or normalized["title"]
|
||||
if not dedupe_key or dedupe_key in seen:
|
||||
continue
|
||||
seen.add(dedupe_key)
|
||||
notes.append(normalized)
|
||||
notes.sort(
|
||||
key=lambda item: (
|
||||
item["stats"]["like"] + item["stats"]["comment"] * 3 + item["stats"]["collect"] * 2 + item["stats"]["share"] * 4
|
||||
),
|
||||
reverse=True,
|
||||
)
|
||||
return notes
|
||||
|
||||
|
||||
def _is_xhs_source_row(row: dict[str, Any]) -> bool:
|
||||
platform = str(row.get("platform", "") or "").strip().lower()
|
||||
if platform == XHS_PLATFORM:
|
||||
return True
|
||||
source_url = str(row.get("source_url", "") or "")
|
||||
normalized = source_url.strip().lower()
|
||||
return "xiaohongshu.com" in normalized or "xhslink.com" in normalized
|
||||
|
||||
|
||||
def _job_matches_platform(row: dict[str, Any], legacy: Any) -> bool:
|
||||
if row.get("content_source_id"):
|
||||
source = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["content_source_id"],))
|
||||
if source:
|
||||
return _is_xhs_source_row(source)
|
||||
source_url = str(row.get("source_url") or "")
|
||||
return "xiaohongshu.com" in source_url.lower() or "xhslink.com" in source_url.lower()
|
||||
|
||||
|
||||
def _review_matches_platform(row: dict[str, Any], legacy: Any) -> bool:
|
||||
return str(row.get("platform", "") or "").strip().lower() == XHS_PLATFORM
|
||||
|
||||
|
||||
def _normalize_platform(value: str | None) -> str:
|
||||
return str(value or "").strip().lower()
|
||||
|
||||
|
||||
def _require_xhs_platform(value: str | None) -> str:
|
||||
normalized = _normalize_platform(value or XHS_PLATFORM)
|
||||
if normalized != XHS_PLATFORM:
|
||||
raise HTTPException(status_code=400, detail="Xiaohongshu routes only support the xiaohongshu platform")
|
||||
return normalized
|
||||
|
||||
|
||||
def register_xiaohongshu_routes(app: Any, legacy: Any) -> None:
|
||||
def now() -> str:
|
||||
return legacy.utc_now()
|
||||
|
||||
def make_id(prefix: str) -> str:
|
||||
return legacy.make_id(prefix)
|
||||
|
||||
def _content_source_row_or_404(source_id: str, account_id: str) -> dict[str, Any]:
|
||||
row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ? AND user_id = ?", (source_id, account_id))
|
||||
if not row:
|
||||
raise HTTPException(status_code=404, detail="Content source not found")
|
||||
if not _is_xhs_source_row(row):
|
||||
raise HTTPException(status_code=404, detail="Content source not found")
|
||||
return row
|
||||
|
||||
def _xhs_job_payload(row: dict[str, Any]) -> dict[str, Any]:
|
||||
payload = legacy.job_payload(row)
|
||||
if row.get("content_source_id"):
|
||||
source_row = legacy.db.fetch_one("SELECT * FROM content_sources WHERE id = ?", (row["content_source_id"],))
|
||||
if source_row and _is_xhs_source_row(source_row):
|
||||
payload["content_source"] = legacy.content_source_payload(source_row)
|
||||
return payload
|
||||
|
||||
def _xhs_review_payload(row: dict[str, Any]) -> dict[str, Any]:
|
||||
payload = legacy.review_payload(row)
|
||||
if payload.get("platform", "") != XHS_PLATFORM:
|
||||
payload["platform"] = XHS_PLATFORM
|
||||
return payload
|
||||
|
||||
async def _collect_public_source(
|
||||
source_url: str,
|
||||
manual_payload: dict[str, Any] | None,
|
||||
manual_pages: list[XHSManualPageCapture],
|
||||
) -> dict[str, Any]:
|
||||
source_url = source_url.strip()
|
||||
blobs: list[dict[str, Any]] = []
|
||||
errors: list[str] = []
|
||||
|
||||
if manual_payload:
|
||||
blobs.append({"script_id": "manual_source_payload", "payload": manual_payload})
|
||||
|
||||
for page in manual_pages:
|
||||
blobs.append({
|
||||
"script_id": "manual_page_payload",
|
||||
"url": page.url,
|
||||
"title": page.title,
|
||||
"payload": page.payload,
|
||||
})
|
||||
|
||||
if source_url:
|
||||
try:
|
||||
final_url, html = await _fetch_html(source_url)
|
||||
source_url = final_url
|
||||
blobs.extend(_extract_json_blobs_from_html(html))
|
||||
except Exception as exc:
|
||||
errors.append(f"source_fetch_failed: {exc}")
|
||||
|
||||
payloads = [item["payload"] for item in blobs]
|
||||
notes = _extract_notes(payloads)
|
||||
source_title = _first_non_empty(
|
||||
manual_payload.get("title", "") if manual_payload else "",
|
||||
*(item.get("title", "") for item in notes[:3]),
|
||||
source_url,
|
||||
)
|
||||
return {
|
||||
"source_url": source_url,
|
||||
"title": source_title,
|
||||
"notes": notes,
|
||||
"raw_pages": blobs,
|
||||
"errors": errors,
|
||||
}
|
||||
|
||||
@app.get("/v2/xiaohongshu/content-sources")
|
||||
def list_content_sources(
|
||||
project_id: str | None = Query(default=None),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
clauses = ["user_id = ?", "platform = ?"]
|
||||
params: list[Any] = [account["id"], XHS_PLATFORM]
|
||||
if project_id is not None:
|
||||
normalized_project = project_id.strip()
|
||||
if normalized_project:
|
||||
clauses.append("project_id = ?")
|
||||
params.append(normalized_project)
|
||||
else:
|
||||
clauses.append("(project_id IS NULL OR project_id = '')")
|
||||
rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM content_sources WHERE {' AND '.join(clauses)} ORDER BY created_at DESC",
|
||||
tuple(params),
|
||||
)
|
||||
return [legacy.content_source_payload(row) for row in rows]
|
||||
|
||||
@app.post("/v2/xiaohongshu/content-sources")
|
||||
def create_content_source_api(
|
||||
request: XiaohongshuContentSourceCreateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
project = legacy.resolve_target_project(account["id"], request.project_id or None, username=account["username"])
|
||||
row = legacy.create_content_source(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
source_kind=request.source_kind.strip(),
|
||||
platform=XHS_PLATFORM,
|
||||
handle=request.handle.strip(),
|
||||
source_url=request.source_url.strip(),
|
||||
title=request.title.strip(),
|
||||
local_path=request.local_path.strip(),
|
||||
metadata={
|
||||
**request.metadata,
|
||||
"platform_label": "小红书",
|
||||
"platform": XHS_PLATFORM,
|
||||
},
|
||||
)
|
||||
return legacy.content_source_payload(row)
|
||||
|
||||
@app.get("/v2/xiaohongshu/content-sources/{source_id}")
|
||||
def get_content_source(source_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
|
||||
row = _content_source_row_or_404(source_id, account["id"])
|
||||
return legacy.content_source_payload(row)
|
||||
|
||||
@app.post("/v2/xiaohongshu/content-sources/sync")
|
||||
async def sync_content_source(
|
||||
request: XiaohongshuContentSourceSyncRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_row = None
|
||||
if request.content_source_id.strip():
|
||||
source_row = _content_source_row_or_404(request.content_source_id.strip(), account["id"])
|
||||
|
||||
requested_project_id = request.project_id or (source_row.get("project_id", "") if source_row else "")
|
||||
project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
|
||||
kb = legacy.resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"])
|
||||
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
|
||||
source_url = (request.source_url or (source_row or {}).get("source_url") or "").strip()
|
||||
if not source_url and not source_row:
|
||||
raise HTTPException(status_code=400, detail="source_url or content_source_id is required")
|
||||
|
||||
if source_row and source_row.get("project_id") and source_row.get("project_id") != project["id"]:
|
||||
raise HTTPException(status_code=400, detail="Content source does not belong to target project")
|
||||
|
||||
if source_row and not _is_xhs_source_row(source_row):
|
||||
raise HTTPException(status_code=400, detail="Content source is not scoped to Xiaohongshu")
|
||||
|
||||
source_kind = (source_row or {}).get("source_kind", "creator_account")
|
||||
handle = (request.handle or (source_row or {}).get("handle", "")).strip()
|
||||
source_title = (
|
||||
request.title.strip()
|
||||
or (source_row or {}).get("title", "").strip()
|
||||
or handle
|
||||
or source_url
|
||||
)
|
||||
|
||||
if not source_row:
|
||||
source_row = legacy.create_content_source(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
source_kind=source_kind or "creator_account",
|
||||
platform=XHS_PLATFORM,
|
||||
handle=handle,
|
||||
source_url=source_url,
|
||||
title=source_title,
|
||||
metadata={
|
||||
"platform": XHS_PLATFORM,
|
||||
"platform_label": "小红书",
|
||||
"sync_mode": "recent_notes",
|
||||
"max_items": request.max_items,
|
||||
},
|
||||
)
|
||||
|
||||
public_data = await _collect_public_source(source_url, request.manual_source_payload, request.manual_pages)
|
||||
note_count = len(public_data["notes"])
|
||||
top_notes = [
|
||||
{
|
||||
"note_id": item["note_id"],
|
||||
"title": _compact_text(item["title"], 120),
|
||||
"content": _compact_text(item["content"], 180),
|
||||
"author_name": item["author_name"],
|
||||
"published_at": item["published_at"],
|
||||
"stats": item["stats"],
|
||||
"tags": item["tags"][:6],
|
||||
}
|
||||
for item in public_data["notes"][: request.max_items]
|
||||
]
|
||||
|
||||
job_row = legacy.create_job_record(
|
||||
account_id=account["id"],
|
||||
project_id=project["id"],
|
||||
knowledge_base_id=kb["id"],
|
||||
source_type="content_source_sync",
|
||||
line_type="content_source_sync",
|
||||
workflow_key="content_source_sync_pipeline",
|
||||
title=f"{source_title} 内容源同步",
|
||||
language=request.language,
|
||||
source_url=source_url,
|
||||
assistant_id=(assistant or {}).get("id"),
|
||||
content_source_id=source_row["id"],
|
||||
artifacts={
|
||||
"platform": XHS_PLATFORM,
|
||||
"handle": handle,
|
||||
"source_account_url": source_url,
|
||||
"source_title": source_title,
|
||||
"skip_existing": request.skip_existing,
|
||||
"auto_trigger_analysis": request.auto_trigger_analysis,
|
||||
"max_items": request.max_items,
|
||||
"note_count": note_count,
|
||||
"top_notes": top_notes,
|
||||
"raw_pages": public_data["raw_pages"],
|
||||
"errors": public_data["errors"],
|
||||
"discovery_note": request.discovery_note.strip(),
|
||||
},
|
||||
analysis_model_profile_id="",
|
||||
)
|
||||
|
||||
legacy.update_content_source_metadata(
|
||||
source_row["id"],
|
||||
{
|
||||
"platform": XHS_PLATFORM,
|
||||
"platform_label": "小红书",
|
||||
"sync_mode": "recent_notes",
|
||||
"max_items": request.max_items,
|
||||
"note_count": note_count,
|
||||
"last_sync_job_id": job_row["id"],
|
||||
"last_sync_requested_at": now(),
|
||||
},
|
||||
)
|
||||
return legacy.job_payload(await legacy.trigger_orchestrated_job(job_row))
|
||||
|
||||
@app.get("/v2/xiaohongshu/jobs")
|
||||
def list_jobs(
|
||||
parent_job_id: str | None = Query(default=None),
|
||||
line_type: str | None = Query(default=None),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
clauses = ["user_id = ?"]
|
||||
params: list[Any] = [account["id"]]
|
||||
if parent_job_id is not None:
|
||||
normalized_parent = parent_job_id.strip()
|
||||
if normalized_parent:
|
||||
clauses.append("parent_job_id = ?")
|
||||
params.append(normalized_parent)
|
||||
else:
|
||||
clauses.append("(parent_job_id IS NULL OR parent_job_id = '')")
|
||||
if line_type:
|
||||
clauses.append("line_type = ?")
|
||||
params.append(line_type.strip())
|
||||
rows = legacy.db.fetch_all(
|
||||
f"SELECT * FROM jobs WHERE {' AND '.join(clauses)} ORDER BY created_at DESC",
|
||||
tuple(params),
|
||||
)
|
||||
return [_xhs_job_payload(row) for row in rows if _job_matches_platform(row, legacy)]
|
||||
|
||||
@app.get("/v2/xiaohongshu/jobs/{job_id}")
|
||||
def get_job(job_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> dict[str, Any]:
|
||||
row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ? AND user_id = ?", (job_id, account["id"]))
|
||||
if not row or not _job_matches_platform(row, legacy):
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
return _xhs_job_payload(row)
|
||||
|
||||
@app.get("/v2/xiaohongshu/jobs/{job_id}/events")
|
||||
def get_job_events(job_id: str, account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
|
||||
row = legacy.db.fetch_one("SELECT * FROM jobs WHERE id = ? AND user_id = ?", (job_id, account["id"]))
|
||||
if not row or not _job_matches_platform(row, legacy):
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
return [
|
||||
legacy.job_event_payload(item)
|
||||
for item in legacy.db.fetch_all("SELECT * FROM job_events WHERE job_id = ? ORDER BY created_at ASC", (job_id,))
|
||||
]
|
||||
|
||||
@app.get("/v2/xiaohongshu/reviews")
|
||||
def list_reviews(
|
||||
project_id: str | None = Query(default=None),
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> list[dict[str, Any]]:
|
||||
clauses = ["user_id = ?", "platform = ?"]
|
||||
params: list[Any] = [account["id"], XHS_PLATFORM]
|
||||
if project_id is not None:
|
||||
normalized_project = project_id.strip()
|
||||
if normalized_project:
|
||||
clauses.append("project_id = ?")
|
||||
params.append(normalized_project)
|
||||
else:
|
||||
clauses.append("(project_id IS NULL OR project_id = '')")
|
||||
sql = (
|
||||
f"SELECT * FROM publish_reviews WHERE {' AND '.join(clauses)} "
|
||||
"ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC LIMIT ?"
|
||||
)
|
||||
params.append(limit)
|
||||
return [_xhs_review_payload(row) for row in legacy.db.fetch_all(sql, tuple(params))]
|
||||
|
||||
@app.post("/v2/xiaohongshu/reviews")
|
||||
def create_review(
|
||||
request: XiaohongshuReviewCreateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
source_job = None
|
||||
if request.source_job_id.strip():
|
||||
source_job = legacy.load_owned_job(request.source_job_id.strip(), account["id"])
|
||||
if not _job_matches_platform(source_job, legacy):
|
||||
raise HTTPException(status_code=404, detail="Job not found")
|
||||
requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "")
|
||||
project = legacy.resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
|
||||
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
|
||||
review_id = make_id("review")
|
||||
title = request.title.strip() or (source_job.get("title", "") if source_job else "")
|
||||
if not title:
|
||||
title = f"{project['name']} 复盘"
|
||||
timestamp = now()
|
||||
normalized_platform = _require_xhs_platform(request.platform)
|
||||
legacy.db.execute(
|
||||
"""
|
||||
INSERT INTO publish_reviews (
|
||||
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
|
||||
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
|
||||
)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
review_id,
|
||||
account["id"],
|
||||
project["id"],
|
||||
source_job["id"] if source_job else None,
|
||||
(assistant or {}).get("id") or None,
|
||||
title,
|
||||
normalized_platform,
|
||||
request.content_type.strip() or "note",
|
||||
request.publish_url.strip(),
|
||||
request.published_at.strip(),
|
||||
_safe_json_dumps(request.metrics),
|
||||
request.verdict.strip(),
|
||||
request.highlights.strip(),
|
||||
request.next_actions.strip(),
|
||||
request.notes.strip(),
|
||||
timestamp,
|
||||
timestamp,
|
||||
),
|
||||
)
|
||||
row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
|
||||
return _xhs_review_payload(row)
|
||||
|
||||
@app.patch("/v2/xiaohongshu/reviews/{review_id}")
|
||||
def update_review(
|
||||
review_id: str,
|
||||
request: XiaohongshuReviewUpdateRequest,
|
||||
account: dict[str, Any] = Depends(legacy.require_approved),
|
||||
) -> dict[str, Any]:
|
||||
current = legacy.load_owned_review(review_id, account["id"])
|
||||
if not _review_matches_platform(current, legacy):
|
||||
raise HTTPException(status_code=404, detail="Review not found")
|
||||
assistant_id = current.get("assistant_id") or None
|
||||
if request.assistant_id is not None:
|
||||
assistant = legacy.resolve_target_assistant(account["id"], request.assistant_id or None, current.get("project_id", ""))
|
||||
assistant_id = (assistant or {}).get("id") or None
|
||||
normalized_platform = current.get("platform", XHS_PLATFORM)
|
||||
if request.platform is not None:
|
||||
normalized_platform = _require_xhs_platform(request.platform)
|
||||
legacy.db.execute(
|
||||
"""
|
||||
UPDATE publish_reviews
|
||||
SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?,
|
||||
metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?,
|
||||
assistant_id = ?, updated_at = ?
|
||||
WHERE id = ? AND user_id = ?
|
||||
""",
|
||||
(
|
||||
request.title if request.title is not None else current.get("title", ""),
|
||||
normalized_platform,
|
||||
request.content_type if request.content_type is not None else current.get("content_type", "note"),
|
||||
request.publish_url if request.publish_url is not None else current.get("publish_url", ""),
|
||||
request.published_at if request.published_at is not None else current.get("published_at", ""),
|
||||
_safe_json_dumps(request.metrics if request.metrics is not None else legacy.parse_json_object(current.get("metrics_json") or "{}")),
|
||||
request.verdict if request.verdict is not None else current.get("verdict", ""),
|
||||
request.highlights if request.highlights is not None else current.get("highlights", ""),
|
||||
request.next_actions if request.next_actions is not None else current.get("next_actions", ""),
|
||||
request.notes if request.notes is not None else current.get("notes", ""),
|
||||
assistant_id,
|
||||
now(),
|
||||
review_id,
|
||||
account["id"],
|
||||
),
|
||||
)
|
||||
row = legacy.db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
|
||||
return _xhs_review_payload(row)
|
||||
41
collector-service/run_source_overlay.sh
Executable file
41
collector-service/run_source_overlay.sh
Executable file
@@ -0,0 +1,41 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
PORT="${PORT:-18083}"
|
||||
HOST="${HOST:-127.0.0.1}"
|
||||
|
||||
# Mirror the current live collector runtime so we can verify the source overlay
|
||||
# against the same database and external integrations without touching 8081.
|
||||
export DATA_DIR="${DATA_DIR:-/Users/kris/code/StoryForge-gitea/data/collector}"
|
||||
export DATABASE_PATH="${DATABASE_PATH:-$DATA_DIR/storyforge.db}"
|
||||
export DEFAULT_EXTERNAL_BASE_URL="${DEFAULT_EXTERNAL_BASE_URL:-https://test.hyzq.net/storyforge}"
|
||||
export LOCAL_OPENAI_BASE_URL="${LOCAL_OPENAI_BASE_URL:-http://host.docker.internal:8317/v1}"
|
||||
export LOCAL_OPENAI_MODEL="${LOCAL_OPENAI_MODEL:-GLM-5}"
|
||||
export LOCAL_OPENAI_API_KEY="${LOCAL_OPENAI_API_KEY:-}"
|
||||
export YTDLP_BIN="${YTDLP_BIN:-yt-dlp}"
|
||||
export FFMPEG_BIN="${FFMPEG_BIN:-ffmpeg}"
|
||||
export WHISPER_BIN="${WHISPER_BIN:-}"
|
||||
export WHISPER_MODEL="${WHISPER_MODEL:-$DATA_DIR/models/ggml-base.en.bin}"
|
||||
export ASR_HTTP_BASE_URL="${ASR_HTTP_BASE_URL:-http://host.docker.internal:8088}"
|
||||
export ASR_HTTP_TRANSCRIBE_PATH="${ASR_HTTP_TRANSCRIBE_PATH:-/transcribe}"
|
||||
export ASR_HTTP_FIELD_NAME="${ASR_HTTP_FIELD_NAME:-wav}"
|
||||
export ASR_HTTP_TIMEOUT_SEC="${ASR_HTTP_TIMEOUT_SEC:-120}"
|
||||
export N8N_BASE_URL="${N8N_BASE_URL:-http://n8n:5678}"
|
||||
export N8N_ANALYSIS_WEBHOOK_PATH="${N8N_ANALYSIS_WEBHOOK_PATH:-/webhook/storyforge-analysis}"
|
||||
export N8N_REAL_CUT_WEBHOOK_PATH="${N8N_REAL_CUT_WEBHOOK_PATH:-/webhook/storyforge-real-cut}"
|
||||
export N8N_AI_VIDEO_WEBHOOK_PATH="${N8N_AI_VIDEO_WEBHOOK_PATH:-/webhook/storyforge-ai-video}"
|
||||
export N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH="${N8N_CONTENT_SOURCE_SYNC_WEBHOOK_PATH:-/webhook/storyforge-content-source-sync}"
|
||||
export ORCHESTRATOR_SHARED_SECRET="${ORCHESTRATOR_SHARED_SECRET:-storyforge-local-secret}"
|
||||
export CUTVIDEO_BASE_URL="${CUTVIDEO_BASE_URL:-http://192.168.31.18:7860}"
|
||||
export CUTVIDEO_API_KEY="${CUTVIDEO_API_KEY:-}"
|
||||
export CUTVIDEO_BASE_CONFIG="${CUTVIDEO_BASE_CONFIG:-example.job.yaml}"
|
||||
export CUTVIDEO_POLL_INTERVAL_SEC="${CUTVIDEO_POLL_INTERVAL_SEC:-10}"
|
||||
export CUTVIDEO_MAX_WAIT_SEC="${CUTVIDEO_MAX_WAIT_SEC:-1800}"
|
||||
export CUTVIDEO_UPLOAD_TIMEOUT_SEC="${CUTVIDEO_UPLOAD_TIMEOUT_SEC:-1800}"
|
||||
export HUOBAO_BASE_URL="${HUOBAO_BASE_URL:-http://host.docker.internal:5678}"
|
||||
export HUOBAO_POLL_INTERVAL_SEC="${HUOBAO_POLL_INTERVAL_SEC:-10}"
|
||||
export HUOBAO_MAX_WAIT_SEC="${HUOBAO_MAX_WAIT_SEC:-900}"
|
||||
|
||||
cd "$ROOT_DIR"
|
||||
exec ./.venv311/bin/python -m uvicorn app.main:app --host "$HOST" --port "$PORT"
|
||||
Reference in New Issue
Block a user