feat: add live reviews and douyin tracking routes

This commit is contained in:
kris
2026-03-23 07:39:39 +08:00
parent 7910019ef9
commit 5a739a414d
3 changed files with 547 additions and 1 deletions

View File

@@ -252,6 +252,36 @@ class AiVideoJobRequest(BaseModel):
duration: int = 5
class ReviewCreateRequest(BaseModel):
project_id: str = ""
source_job_id: str = ""
assistant_id: str = ""
title: str = ""
platform: str = "douyin"
content_type: str = "video"
publish_url: str = ""
published_at: str = ""
metrics: dict[str, Any] = Field(default_factory=dict)
verdict: str = ""
highlights: str = ""
next_actions: str = ""
notes: str = ""
class ReviewUpdateRequest(BaseModel):
title: str | None = None
platform: str | None = None
content_type: str | None = None
publish_url: str | None = None
published_at: str | None = None
metrics: dict[str, Any] | None = None
verdict: str | None = None
highlights: str | None = None
next_actions: str | None = None
notes: str | None = None
assistant_id: str | None = None
class InternalStepRequest(BaseModel):
job_id: str = ""
jobId: str = ""
@@ -522,6 +552,41 @@ def assistant_payload(row: dict[str, Any]) -> dict[str, Any]:
}
def review_payload(row: dict[str, Any]) -> dict[str, Any]:
metrics = parse_json_object(row.get("metrics_json") or "{}")
source_job = None
assistant = None
if row.get("source_job_id"):
source_job_row = db.fetch_one("SELECT * FROM jobs WHERE id = ?", (row["source_job_id"],))
if source_job_row:
source_job = job_payload(source_job_row)
if row.get("assistant_id"):
assistant_row = db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],))
if assistant_row:
assistant = assistant_payload(assistant_row)
return {
"id": row["id"],
"user_id": row["user_id"],
"project_id": row.get("project_id", ""),
"source_job_id": row.get("source_job_id", ""),
"assistant_id": row.get("assistant_id", ""),
"title": row.get("title", ""),
"platform": row.get("platform", "douyin"),
"content_type": row.get("content_type", "video"),
"publish_url": row.get("publish_url", ""),
"published_at": row.get("published_at", ""),
"metrics": metrics,
"verdict": row.get("verdict", ""),
"highlights": row.get("highlights", ""),
"next_actions": row.get("next_actions", ""),
"notes": row.get("notes", ""),
"source_job": source_job,
"assistant": assistant,
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def document_payload(row: dict[str, Any]) -> dict[str, Any]:
analysis_map = parse_json_object(row.get("analysis_json") or "{}")
source_artifacts = parse_json_object(row.get("source_artifact_json") or "{}")
@@ -2027,6 +2092,107 @@ def list_knowledge_documents(knowledge_base_id: str, account: dict[str, Any] = D
return [document_payload(row) for row in rows]
@app.get("/v2/reviews")
def list_reviews(
project_id: str | None = Query(default=None),
limit: int = Query(default=50, ge=1, le=200),
account: dict[str, Any] = Depends(require_approved),
) -> list[dict[str, Any]]:
clauses = ["user_id = ?"]
params: list[Any] = [account["id"]]
if project_id is not None:
normalized_project = project_id.strip()
if normalized_project:
clauses.append("project_id = ?")
params.append(normalized_project)
else:
clauses.append("(project_id IS NULL OR project_id = '')")
sql = f"SELECT * FROM publish_reviews WHERE {' AND '.join(clauses)} ORDER BY COALESCE(NULLIF(published_at, ''), created_at) DESC, created_at DESC LIMIT ?"
params.append(limit)
return [review_payload(row) for row in db.fetch_all(sql, tuple(params))]
@app.post("/v2/reviews")
def create_review(request: ReviewCreateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
source_job = None
if request.source_job_id.strip():
source_job = load_owned_job(request.source_job_id.strip(), account["id"])
requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "")
project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"])
assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"])
review_id = make_id("review")
title = request.title.strip() or (source_job.get("title", "") if source_job else "")
if not title:
title = f"{project['name']} 复盘"
timestamp = utc_now()
db.execute(
"""
INSERT INTO publish_reviews (
id, user_id, project_id, source_job_id, assistant_id, title, platform, content_type,
publish_url, published_at, metrics_json, verdict, highlights, next_actions, notes, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
review_id,
account["id"],
project["id"],
source_job["id"] if source_job else None,
(assistant or {}).get("id") or None,
title,
request.platform or "douyin",
request.content_type or "video",
request.publish_url.strip(),
request.published_at.strip(),
json.dumps(request.metrics, ensure_ascii=False),
request.verdict.strip(),
request.highlights.strip(),
request.next_actions.strip(),
request.notes.strip(),
timestamp,
timestamp,
),
)
row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
return review_payload(row)
@app.patch("/v2/reviews/{review_id}")
def update_review(review_id: str, request: ReviewUpdateRequest, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]:
current = load_owned_review(review_id, account["id"])
assistant_id = current.get("assistant_id") or None
if request.assistant_id is not None:
assistant = resolve_target_assistant(account["id"], request.assistant_id or None, current.get("project_id", ""))
assistant_id = (assistant or {}).get("id") or None
db.execute(
"""
UPDATE publish_reviews
SET title = ?, platform = ?, content_type = ?, publish_url = ?, published_at = ?,
metrics_json = ?, verdict = ?, highlights = ?, next_actions = ?, notes = ?,
assistant_id = ?, updated_at = ?
WHERE id = ? AND user_id = ?
""",
(
request.title if request.title is not None else current.get("title", ""),
request.platform if request.platform is not None else current.get("platform", "douyin"),
request.content_type if request.content_type is not None else current.get("content_type", "video"),
request.publish_url if request.publish_url is not None else current.get("publish_url", ""),
request.published_at if request.published_at is not None else current.get("published_at", ""),
json.dumps(request.metrics if request.metrics is not None else parse_json_object(current.get("metrics_json") or "{}"), ensure_ascii=False),
request.verdict if request.verdict is not None else current.get("verdict", ""),
request.highlights if request.highlights is not None else current.get("highlights", ""),
request.next_actions if request.next_actions is not None else current.get("next_actions", ""),
request.notes if request.notes is not None else current.get("notes", ""),
assistant_id,
utc_now(),
review_id,
account["id"],
),
)
row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,))
return review_payload(row)
@app.get("/v2/explore/jobs")
def list_jobs(
parent_job_id: str | None = Query(default=None),
@@ -2452,6 +2618,13 @@ def load_owned_job(job_id: str, account_id: str) -> dict[str, Any]:
return row
def load_owned_review(review_id: str, account_id: str) -> dict[str, Any]:
row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ? AND user_id = ?", (review_id, account_id))
if not row:
raise HTTPException(status_code=404, detail="Review not found")
return row
def load_owned_content_source(source_id: str, account_id: str) -> dict[str, Any]:
row = db.fetch_one("SELECT * FROM content_sources WHERE id = ? AND user_id = ?", (source_id, account_id))
if not row:

View File

@@ -211,6 +211,30 @@ class Database:
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS publish_reviews (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT,
source_job_id TEXT,
assistant_id TEXT,
title TEXT NOT NULL,
platform TEXT NOT NULL DEFAULT 'douyin',
content_type TEXT NOT NULL DEFAULT 'video',
publish_url TEXT NOT NULL DEFAULT '',
published_at TEXT NOT NULL DEFAULT '',
metrics_json TEXT NOT NULL DEFAULT '{}',
verdict TEXT NOT NULL DEFAULT '',
highlights TEXT NOT NULL DEFAULT '',
next_actions TEXT NOT NULL DEFAULT '',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(source_job_id) REFERENCES jobs(id) ON DELETE SET NULL,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS job_events (
id TEXT PRIMARY KEY,
job_id TEXT NOT NULL,

View File

@@ -4,7 +4,7 @@ import asyncio
import json
import re
from collections import Counter
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from html import unescape
from typing import Any, Iterable
from urllib.parse import quote, unquote
@@ -71,6 +71,16 @@ class DouyinBenchmarkLinkRequest(BaseModel):
search_id: str = ""
class DouyinTrackedAccountRequest(BaseModel):
tracked_account_id: str = ""
assistant_id: str = ""
note: str = ""
class DouyinTrackingCursorRequest(BaseModel):
last_seen_at: str = ""
def _safe_json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
@@ -743,6 +753,30 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
CREATE INDEX IF NOT EXISTS idx_douyin_account_relations_source
ON douyin_account_relations(source_account_id, created_at DESC);
CREATE TABLE IF NOT EXISTS douyin_tracked_accounts (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
tracked_account_id TEXT NOT NULL,
assistant_id TEXT,
note TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, tracked_account_id),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(tracked_account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_douyin_tracked_accounts_user_updated
ON douyin_tracked_accounts(user_id, updated_at DESC);
CREATE TABLE IF NOT EXISTS douyin_tracking_cursors (
user_id TEXT PRIMARY KEY,
last_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
"""
with legacy.db.session() as conn:
conn.executescript(schema)
@@ -1179,6 +1213,190 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
})
return payloads
def _load_owned_assistant(assistant_id: str, user_id: str) -> dict[str, Any] | None:
if not str(assistant_id or "").strip():
return None
row = legacy.db.fetch_one(
"SELECT * FROM assistants WHERE id = ? AND user_id = ?",
(assistant_id, user_id)
)
if not row:
raise HTTPException(status_code=404, detail="Assistant not found")
return row
def _parse_iso_datetime(value: str | None) -> datetime | None:
text = str(value or "").strip()
if not text:
return None
try:
normalized = text.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
except Exception:
return None
def _get_tracking_cursor(user_id: str) -> dict[str, Any] | None:
return legacy.db.fetch_one(
"SELECT * FROM douyin_tracking_cursors WHERE user_id = ?",
(user_id,)
)
def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]:
existing = _get_tracking_cursor(user_id)
timestamp = _first_non_empty(last_seen_at, now())
updated_at = now()
if existing:
legacy.db.execute(
"UPDATE douyin_tracking_cursors SET last_seen_at = ?, updated_at = ? WHERE user_id = ?",
(timestamp, updated_at, user_id)
)
else:
legacy.db.execute(
"INSERT INTO douyin_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?)",
(user_id, timestamp, updated_at)
)
return legacy.db.fetch_one("SELECT * FROM douyin_tracking_cursors WHERE user_id = ?", (user_id,))
def _list_tracked_accounts(user_id: str) -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
"""
SELECT track.*,
assistant.name AS assistant_name
FROM douyin_tracked_accounts track
LEFT JOIN assistants assistant ON assistant.id = track.assistant_id
WHERE track.user_id = ?
ORDER BY track.updated_at DESC
""",
(user_id,)
)
payloads: list[dict[str, Any]] = []
for row in rows:
account_row = _require_owned_account(row["tracked_account_id"], user_id)
account_payload = _build_account_payload(account_row, include_recent_videos=6)
payloads.append({
"id": row["id"],
"tracked_account_id": row["tracked_account_id"],
"assistant_id": row.get("assistant_id", "") or "",
"assistant_name": row.get("assistant_name", "") or "",
"note": row.get("note", "") or "",
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"account": account_payload
})
return payloads
def _extract_tracking_borrowing_points(video: dict[str, Any]) -> list[str]:
stats = video.get("stats", {}) or {}
tags = video.get("tags", []) or []
candidates: list[str] = []
play_count = int(stats.get("play") or 0)
like_count = int(stats.get("like") or 0)
comment_count = int(stats.get("comment") or 0)
share_count = int(stats.get("share") or 0)
if like_count >= 100:
candidates.append("点赞明显更高,适合借标题切口和开头表达。")
if comment_count >= 20:
candidates.append("评论互动活跃,可借提问句和争议点设计。")
if share_count >= 10:
candidates.append("分享意愿较强,可借观点浓度和传播句式。")
if play_count >= 5000:
candidates.append("播放信号较强,值得拆成同题材复用模板。")
if tags:
candidates.append(f"标签集中在 {', '.join(tags[:3])},适合做系列化选题。")
deduped: list[str] = []
seen: set[str] = set()
for item in candidates:
normalized = _compact_text(item, 80)
if not normalized or normalized in seen:
continue
seen.add(normalized)
deduped.append(normalized)
return deduped[:4]
def _build_tracking_digest_item(tracked_item: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
stats = video.get("stats", {}) or {}
summary = video.get("description") or video.get("title") or "暂无摘要"
borrowing_points = _extract_tracking_borrowing_points(video)
high_value = int(stats.get("like") or 0) >= 100 or int(stats.get("play") or 0) >= 5000 or bool(borrowing_points)
return {
"tracking_id": tracked_item["id"],
"tracked_account_id": tracked_item["tracked_account_id"],
"assistant_id": tracked_item["assistant_id"],
"assistant_name": tracked_item["assistant_name"],
"account": tracked_item["account"],
"video": video,
"summary": _compact_text(summary, 160),
"borrowing_points": borrowing_points,
"is_high_value": high_value,
}
def _build_tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]:
tracked_accounts = _list_tracked_accounts(user_id)
cursor = _get_tracking_cursor(user_id)
since_dt = _parse_iso_datetime(since_value) if since_value else None
if since_dt is None and cursor:
since_dt = _parse_iso_datetime(cursor.get("last_seen_at"))
if since_dt is None:
since_dt = (datetime.now(timezone.utc) - timedelta(days=3)).replace(microsecond=0)
items: list[dict[str, Any]] = []
for tracked in tracked_accounts:
account_payload = tracked.get("account", {}) or {}
for video in account_payload.get("video_summary", {}).get("videos", []):
published_at = _parse_iso_datetime(video.get("published_at"))
if published_at is None or published_at <= since_dt:
continue
items.append(_build_tracking_digest_item(tracked, video))
items.sort(
key=lambda item: _parse_iso_datetime(item["video"].get("published_at")) or datetime.fromtimestamp(0, tz=timezone.utc),
reverse=True
)
return {
"generated_at": now(),
"since": since_dt.isoformat(),
"cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
"tracked_accounts": tracked_accounts,
"items": items[: max(1, min(limit, 100))]
}
async def _refresh_tracked_account_workspace(
owner: dict[str, Any],
tracked_account_id: str,
discovery_note: str = "tracking_refresh"
) -> dict[str, Any]:
account_row = _require_owned_account(tracked_account_id, owner["id"])
profile_url = _first_non_empty(
account_row.get("canonical_profile_url"),
account_row.get("profile_url")
)
if not profile_url:
raise HTTPException(status_code=400, detail="Tracked account has no profile_url to refresh")
request = DouyinAccountSyncRequest(
profile_url=profile_url,
discovery_note=discovery_note
)
public_data = await _collect_public_profile(profile_url, None)
creator_data = await _collect_creator_center_pages([], "", [])
if not public_data.get("profile", {}).get("canonical_profile_url"):
public_data["profile"]["canonical_profile_url"] = profile_url
if public_data["errors"]:
raise HTTPException(
status_code=502,
detail={
"message": "刷新对标账号失败",
"public_errors": public_data["errors"],
"creator_errors": creator_data["errors"],
},
)
refreshed_account = _upsert_account(owner, public_data["profile"], request, public_data, creator_data)
return {
"account": _build_account_payload(refreshed_account, include_recent_videos=6),
"sync_errors": public_data["errors"] + creator_data["errors"],
"public_video_count": len(public_data.get("videos", [])),
"creator_page_count": len(creator_data.get("pages", [])),
}
def _build_workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
account_payload = _build_account_payload(account_row)
latest_public_snapshot = legacy.db.fetch_one(
@@ -1978,3 +2196,134 @@ def register_douyin_routes(app: Any, legacy: Any) -> None:
"relation_ids": linked_ids,
"links": _list_linked_accounts(account_row)
}
@app.get("/v2/douyin/tracking/accounts")
def list_douyin_tracked_accounts(
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
cursor = _get_tracking_cursor(account["id"])
return {
"cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
"items": _list_tracked_accounts(account["id"])
}
@app.post("/v2/douyin/tracking/accounts")
def create_douyin_tracked_account(
request: DouyinTrackedAccountRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
tracked_account = _require_owned_account(request.tracked_account_id, account["id"])
assistant = _load_owned_assistant(request.assistant_id, account["id"])
existing = legacy.db.fetch_one(
"SELECT * FROM douyin_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
(account["id"], tracked_account["id"])
)
updated_at = now()
if existing:
legacy.db.execute(
"""
UPDATE douyin_tracked_accounts
SET assistant_id = ?, note = ?, updated_at = ?
WHERE id = ?
""",
((assistant or {}).get("id"), request.note.strip(), updated_at, existing["id"])
)
else:
legacy.db.execute(
"""
INSERT INTO douyin_tracked_accounts (
id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(make_id("dytrack"), account["id"], tracked_account["id"], (assistant or {}).get("id"), request.note.strip(), updated_at, updated_at)
)
return {
"tracked_account_id": tracked_account["id"],
"assistant_id": (assistant or {}).get("id", ""),
"items": _list_tracked_accounts(account["id"])
}
@app.post("/v2/douyin/tracking/accounts/{tracked_account_id}/refresh")
async def refresh_douyin_tracked_account(
tracked_account_id: str,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(tracked_account_id, account["id"])
account_payload = _build_account_payload(account_row, include_recent_videos=6)
try:
refreshed = await _refresh_tracked_account_workspace(account, tracked_account_id)
return {
"success": True,
"tracked_account_id": tracked_account_id,
"account": refreshed.get("account", {}),
"sync_errors": refreshed.get("sync_errors", []),
"public_video_count": refreshed.get("public_video_count", 0),
"creator_page_count": refreshed.get("creator_page_count", 0)
}
except HTTPException as exc:
detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)}
return {
"success": False,
"tracked_account_id": tracked_account_id,
"account": account_payload,
"message": detail.get("message") or str(exc.detail),
"detail": detail,
"sync_errors": detail.get("public_errors", []) + detail.get("creator_errors", [])
}
@app.post("/v2/douyin/tracking/refresh")
async def refresh_all_douyin_tracked_accounts(
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
tracked_accounts = _list_tracked_accounts(account["id"])
items: list[dict[str, Any]] = []
errors: list[dict[str, Any]] = []
for tracked in tracked_accounts:
try:
refreshed = await _refresh_tracked_account_workspace(account, tracked["tracked_account_id"])
items.append({
"tracking_id": tracked["id"],
"tracked_account_id": tracked["tracked_account_id"],
"nickname": (refreshed.get("account") or {}).get("nickname", ""),
"sync_errors": refreshed.get("sync_errors", []),
"public_video_count": refreshed.get("public_video_count", 0)
})
except HTTPException as exc:
errors.append({
"tracking_id": tracked["id"],
"tracked_account_id": tracked["tracked_account_id"],
"message": str(exc.detail)
})
except Exception as exc:
errors.append({
"tracking_id": tracked["id"],
"tracked_account_id": tracked["tracked_account_id"],
"message": str(exc)
})
return {
"tracked_count": len(tracked_accounts),
"refreshed": len(items),
"failed": len(errors),
"items": items,
"errors": errors
}
@app.post("/v2/douyin/tracking/cursor")
def update_douyin_tracking_cursor(
request: DouyinTrackingCursorRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
cursor = _set_tracking_cursor(account["id"], request.last_seen_at)
return {
"user_id": account["id"],
"last_seen_at": cursor["last_seen_at"],
"updated_at": cursor["updated_at"]
}
@app.get("/v2/douyin/tracking/digest")
def get_douyin_tracking_digest(
since: str | None = None,
limit: int = 24,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
return _build_tracking_digest(account["id"], since_value=(since or "").strip(), limit=limit)