Files
storyforge/collector-service/app/douyin_features.py

2610 lines
107 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import json
import re
from collections import Counter
from datetime import datetime, timedelta, timezone
from html import unescape
from typing import Any, Iterable
from urllib.parse import quote, unquote
import httpx
from fastapi import Depends, HTTPException
from pydantic import BaseModel, Field
# Creator-center pages used as the default value of
# DouyinAccountSyncRequest.creator_center_urls.
DEFAULT_CREATOR_CENTER_URLS = [
    "https://creator.douyin.com/creator-micro/home",
    "https://creator.douyin.com/creator-micro/data",
    "https://creator.douyin.com/creator-micro/content/manage"
]
# Timeout value handed to httpx.AsyncClient in _fetch_html.
DEFAULT_TIMEOUT = 20.0
# Upper bound on how much text _extract_json_objects_from_text scans per
# candidate. It is applied as a str slice, so it counts characters, not bytes.
MAX_HTML_SEARCH_BYTES = 2_000_000
# User-Agent header sent with every request made by _fetch_html.
DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
)
# One manually captured creator-center page. Instances are the element type of
# DouyinAccountSyncRequest.manual_creator_pages and are read by
# _collect_creator_center_pages, which wraps `payload` in a single blob tagged
# "manual_creator_payload".
class ManualPageCapture(BaseModel):
    # URL stored alongside the captured page.
    url: str = ""
    # Title stored alongside the captured page.
    title: str = ""
    # Arbitrary JSON-like payload captured from the page.
    payload: dict[str, Any] = Field(default_factory=dict)
# Request body describing one account sync. The route handler that consumes it
# is not visible in this part of the file; the notes below only cover uses
# that are.
class DouyinAccountSyncRequest(BaseModel):
    profile_url: str = ""
    # NOTE(review): presumably forwarded as the `cookie` argument of
    # _collect_creator_center_pages -- confirm against the route handler.
    session_cookie: str = ""
    # Defaults to a fresh copy of DEFAULT_CREATOR_CENTER_URLS per instance.
    creator_center_urls: list[str] = Field(default_factory=lambda: list(DEFAULT_CREATOR_CENTER_URLS))
    manual_profile_payload: dict[str, Any] | None = None
    manual_creator_pages: list[ManualPageCapture] = Field(default_factory=list)
    # Each entry is normalized with _normalize_video_candidate in
    # _persist_snapshots_and_videos.
    manual_work_payloads: list[dict[str, Any]] = Field(default_factory=list)
    # Stored inside raw_profile_json by _upsert_account.
    discovery_note: str = ""
# Request body for an account analysis run. The consuming handler is outside
# this part of the file, so field semantics beyond names and defaults are not
# documented here.
class DouyinAccountAnalysisRequest(BaseModel):
    model_profile_ids: list[str] = Field(default_factory=list)
    linked_account_ids: list[str] = Field(default_factory=list)
    include_linked_accounts: bool = True
    include_recent_similar_candidates: bool = True
    max_videos: int = 12
    extra_focus: str = ""
    temperature: float = 0.35
    auto_analyze_top_videos: bool = True
    top_video_analysis_count: int = 6
# Request body for analysing an account's top videos. The consuming handler is
# outside this part of the file.
class DouyinTopVideoAnalysisRequest(BaseModel):
    model_profile_id: str | None = None
    top_video_count: int = 6
    # NOTE(review): scale of this threshold is not visible here -- confirm
    # against the scoring code that reads it.
    min_score: float = 45.0
    temperature: float = 0.25
# Request body for a similar-account search. The consuming handler is outside
# this part of the file.
class DouyinSimilarSearchRequest(BaseModel):
    source_account_id: str | None = None
    profile_url: str | None = None
    candidate_urls: list[str] = Field(default_factory=list)
    seed_linked_accounts: bool = True
    search_public_pages: bool = True
    model_profile_id: str | None = None
    max_candidates: int = 10
    extra_requirements: str = ""
# Request body for linking accounts to a source account. The defaults mirror
# the column defaults of the douyin_account_relations table
# (relation_type 'benchmark', empty note and search_id).
class DouyinBenchmarkLinkRequest(BaseModel):
    target_account_ids: list[str] = Field(default_factory=list)
    target_profile_urls: list[str] = Field(default_factory=list)
    relation_type: str = "benchmark"
    note: str = ""
    search_id: str = ""
# Request body for tracking an account. Field names match columns of the
# douyin_tracked_accounts table; the consuming handler is outside this part of
# the file.
class DouyinTrackedAccountRequest(BaseModel):
    tracked_account_id: str = ""
    assistant_id: str = ""
    note: str = ""
# Request body carrying a tracking cursor position. The field name matches the
# last_seen_at column of the douyin_tracking_cursors table.
class DouyinTrackingCursorRequest(BaseModel):
    # NOTE(review): expected timestamp format is not visible here -- confirm.
    last_seen_at: str = ""
def _safe_json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
def _safe_json_loads(value: str | None, fallback: Any) -> Any:
if not value:
return fallback
try:
return json.loads(value)
except Exception:
return fallback
def _first_non_empty(*values: Any) -> str:
for value in values:
if value is None:
continue
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
elif value not in ("", [], {}, ()):
return str(value)
return ""
def _dedupe_strings(values: Iterable[str]) -> list[str]:
result: list[str] = []
seen: set[str] = set()
for value in values:
item = value.strip()
if not item:
continue
key = item.lower()
if key in seen:
continue
seen.add(key)
result.append(item)
return result
def _compact_text(value: Any, limit: int = 500) -> str:
text = str(value or "").strip()
if len(text) <= limit:
return text
return f"{text[: limit - 1]}"
def _parse_count(value: Any) -> float:
if value is None:
return 0.0
if isinstance(value, (int, float)):
return float(value)
text = str(value).strip().lower().replace(",", "")
if not text:
return 0.0
multiplier = 1.0
if text.endswith("w") or text.endswith(""):
multiplier = 10_000.0
text = text[:-1]
elif text.endswith("亿"):
multiplier = 100_000_000.0
text = text[:-1]
text = text.replace("+", "")
match = re.search(r"-?\d+(?:\.\d+)?", text)
if not match:
return 0.0
try:
return float(match.group()) * multiplier
except ValueError:
return 0.0
def _normalize_timestamp(value: Any) -> str | None:
if value in (None, "", 0, "0"):
return None
if isinstance(value, str):
stripped = value.strip()
if not stripped:
return None
if re.match(r"^\d{4}-\d{2}-\d{2}T", stripped):
return stripped
if stripped.isdigit():
value = int(stripped)
else:
return stripped
if isinstance(value, (int, float)):
ts = float(value)
if ts > 10_000_000_000:
ts /= 1000.0
try:
return datetime.fromtimestamp(ts, tz=timezone.utc).replace(microsecond=0).isoformat()
except Exception:
return None
return None
def _extract_hashtags(*texts: str) -> list[str]:
    """Collect ``#tag`` names (without the ``#``) from every non-empty text.

    Results are de-duplicated by ``_dedupe_strings``.
    """
    hashtag_pattern = r"#([\w\u4e00-\u9fff]+)"
    found = [
        tag
        for text in texts
        if text
        for tag in re.findall(hashtag_pattern, text)
    ]
    return _dedupe_strings(found)
def _extract_keywords(*texts: str) -> list[str]:
    """Harvest keyword candidates from the given texts.

    For each non-empty text, collects in order: hashtags, runs of 2-8 CJK
    characters, and ASCII identifiers of 3-21 characters. A fixed stop-word
    list is filtered out (case-insensitively) before de-duplication.
    """
    ignored = frozenset({
        "视频",
        "作品",
        "抖音",
        "账号",
        "内容",
        "发布",
        "更多",
        "关注",
        "用户",
        "douyin",
        "profile"
    })
    harvested: list[str] = []
    for text in filter(None, texts):
        harvested += _extract_hashtags(text)
        harvested += re.findall(r"[\u4e00-\u9fff]{2,8}", text)
        harvested += re.findall(r"[A-Za-z][A-Za-z0-9_]{2,20}", text)
    return _dedupe_strings(word for word in harvested if word.lower() not in ignored)
def _flatten_json(value: Any, prefix: str = "") -> list[tuple[str, str, str]]:
rows: list[tuple[str, str, str]] = []
if isinstance(value, dict):
for key, child in value.items():
next_prefix = f"{prefix}.{key}" if prefix else str(key)
rows.extend(_flatten_json(child, next_prefix))
elif isinstance(value, list):
for index, child in enumerate(value):
next_prefix = f"{prefix}[{index}]"
rows.extend(_flatten_json(child, next_prefix))
else:
field_type = type(value).__name__
rows.append((prefix or "$", field_type, _compact_text(value, 2000)))
return rows
def _walk_json(value: Any) -> Iterable[dict[str, Any]]:
if isinstance(value, dict):
yield value
for child in value.values():
yield from _walk_json(child)
elif isinstance(value, list):
for child in value:
yield from _walk_json(child)
def _extract_json_objects_from_text(text: str) -> list[Any]:
decoder = json.JSONDecoder()
objects: list[Any] = []
seen: set[str] = set()
if not text:
return objects
candidates = [text, unquote(text), unescape(text), unescape(unquote(text))]
for candidate in candidates:
snippet = candidate[:MAX_HTML_SEARCH_BYTES]
for match in re.finditer(r"[\{\[]", snippet):
try:
obj, _ = decoder.raw_decode(snippet[match.start() :])
except Exception:
continue
marker = _safe_json_dumps(obj)
if marker in seen:
continue
seen.add(marker)
objects.append(obj)
if len(objects) >= 50:
return objects
return objects
def _extract_json_blobs_from_html(html: str) -> list[dict[str, Any]]:
blobs: list[dict[str, Any]] = []
seen: set[str] = set()
for attrs, content in re.findall(r"<script([^>]*)>(.*?)</script>", html, re.IGNORECASE | re.DOTALL):
script_id_match = re.search(r'id=["\']([^"\']+)["\']', attrs, re.IGNORECASE)
script_id = script_id_match.group(1) if script_id_match else ""
for obj in _extract_json_objects_from_text(content.strip()):
marker = _safe_json_dumps(obj)
if marker in seen:
continue
seen.add(marker)
blobs.append({
"script_id": script_id,
"payload": obj
})
return blobs
def _profile_candidate_score(value: dict[str, Any]) -> int:
score = 0
interesting_keys = {
"nickname",
"signature",
"sec_uid",
"secUid",
"uid",
"unique_id",
"short_id",
"aweme_count",
"following_count",
"follower_count",
"total_favorited"
}
score += sum(1 for key in interesting_keys if key in value)
if "author" in value and isinstance(value["author"], dict):
score += 2
return score
def _video_candidate_score(value: dict[str, Any]) -> int:
score = 0
if "statistics" in value and isinstance(value["statistics"], dict):
score += 3
if "aweme_id" in value or "item_id" in value:
score += 2
if "desc" in value or "title" in value:
score += 1
return score
def _extract_profile_candidates(payload: Any) -> list[dict[str, Any]]:
    """Collect dicts inside ``payload`` that score at least 3 as a profile.

    A qualifying nested ``author`` dict is appended right after its parent is
    examined, in addition to being visited by the walk itself.
    """
    minimum_score = 3
    matches: list[dict[str, Any]] = []
    for node in _walk_json(payload):
        if _profile_candidate_score(node) >= minimum_score:
            matches.append(node)
        author = node.get("author")
        if isinstance(author, dict) and _profile_candidate_score(author) >= minimum_score:
            matches.append(author)
    return matches
def _extract_video_candidates(payload: Any) -> list[dict[str, Any]]:
    """Collect dicts inside ``payload`` that score at least 3 as a video."""
    return [node for node in _walk_json(payload) if _video_candidate_score(node) >= 3]
def _normalize_profile_candidate(candidate: dict[str, Any], fallback_url: str = "") -> dict[str, Any]:
    """Map a raw profile-like dict onto the module's normalized profile shape.

    Args:
        candidate: Raw dict taken from scraped or manually supplied JSON.
        fallback_url: Used for ``profile_url`` / ``canonical_profile_url``
            when the candidate carries neither ``share_url`` nor
            ``profile_url``.

    Returns:
        A dict with identity fields, ``stats`` (followers, following, likes,
        videos as floats), de-duplicated ``tags`` and the untouched ``raw``
        candidate.
    """
    stats_source = candidate.get("statistics") if isinstance(candidate.get("statistics"), dict) else {}
    avatar = candidate.get("avatar_medium") or candidate.get("avatar_thumb") or candidate.get("avatar_url")
    if isinstance(avatar, dict):
        url_list = avatar.get("url_list")
        # Guard against an empty url_list, which would raise IndexError.
        first_url = url_list[0] if isinstance(url_list, list) and url_list else ""
        avatar = _first_non_empty(first_url, avatar.get("url"))
    signature = _first_non_empty(
        candidate.get("signature"),
        candidate.get("desc"),
        candidate.get("bio"),
        candidate.get("description")
    )
    nickname = _first_non_empty(candidate.get("nickname"), candidate.get("name"), candidate.get("author_name"))
    canonical_url = _first_non_empty(
        candidate.get("share_url"),
        candidate.get("profile_url"),
        fallback_url
    )
    raw_tags = candidate.get("tags")
    if isinstance(raw_tags, str):
        # A bare string is one tag; iterating it would yield single characters.
        raw_tags = [raw_tags]
    elif not isinstance(raw_tags, (list, tuple)):
        # None or any other shape: nothing usable.
        raw_tags = []
    return {
        "nickname": nickname,
        "signature": signature,
        "profile_url": canonical_url,
        "canonical_profile_url": canonical_url,
        "sec_uid": _first_non_empty(candidate.get("sec_uid"), candidate.get("secUid")),
        "douyin_uid": _first_non_empty(candidate.get("uid")),
        "douyin_id": _first_non_empty(candidate.get("unique_id"), candidate.get("short_id"), candidate.get("douyin_id")),
        "avatar_url": _first_non_empty(avatar),
        "stats": {
            "followers": _parse_count(candidate.get("follower_count") or stats_source.get("follower_count")),
            "following": _parse_count(candidate.get("following_count") or stats_source.get("following_count")),
            "likes": _parse_count(candidate.get("total_favorited") or stats_source.get("total_favorited")),
            "videos": _parse_count(candidate.get("aweme_count") or stats_source.get("aweme_count"))
        },
        "tags": _dedupe_strings(
            _extract_hashtags(signature, nickname)
            + [str(tag) for tag in raw_tags if isinstance(tag, (str, int, float))]
        ),
        "raw": candidate
    }
def _pick_best_profile(candidates: list[dict[str, Any]], fallback_url: str = "") -> dict[str, Any]:
    """Normalize every candidate and return the most complete one.

    Completeness is weighted: nickname 4, sec_uid 3, signature 2, non-zero
    follower count 1. Ties go to the earliest candidate. With no candidates,
    an empty profile built around ``fallback_url`` is returned.
    """

    def completeness(profile: dict[str, Any]) -> int:
        weighted_fields = (
            (4, profile["nickname"]),
            (3, profile["sec_uid"]),
            (2, profile["signature"]),
            (1, profile["stats"]["followers"]),
        )
        return sum(weight for weight, present in weighted_fields if present)

    normalized = [
        _normalize_profile_candidate(raw, fallback_url=fallback_url)
        for raw in candidates
    ]
    if not normalized:
        return _normalize_profile_candidate({}, fallback_url=fallback_url)
    # max() returns the first maximal element, matching a strict ">" scan.
    return max(normalized, key=completeness)
def _normalize_video_candidate(candidate: dict[str, Any]) -> dict[str, Any]:
    """Map a raw video-like dict onto the module's normalized video shape.

    Args:
        candidate: Raw dict taken from scraped JSON or from a manually
            supplied work payload.

    Returns:
        A dict with ``aweme_id``, ``title``, ``description``, ``share_url``,
        ``cover_url``, ``duration_sec``, ``published_at``, ``tags``,
        ``stats`` (play, like, comment, share, collect as floats) and the
        untouched ``raw`` candidate. A duration above 1000 is treated as
        milliseconds and divided by 1000; an unparsable duration becomes 0.0.
    """
    stats_source = candidate.get("statistics") if isinstance(candidate.get("statistics"), dict) else {}
    video_source = candidate.get("video") if isinstance(candidate.get("video"), dict) else {}
    title = _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("share_title"))
    description = _first_non_empty(candidate.get("desc"), candidate.get("title"), candidate.get("text"))
    cover = candidate.get("cover") or video_source.get("cover")
    if isinstance(cover, dict):
        url_list = cover.get("url_list")
        # Guard against an empty url_list, which would raise IndexError.
        first_url = url_list[0] if isinstance(url_list, list) and url_list else ""
        cover = _first_non_empty(first_url, cover.get("url"))
    # Payloads can be user supplied, so tolerate non-numeric durations
    # instead of failing the whole normalization.
    raw_duration = candidate.get("duration") or video_source.get("duration") or 0
    try:
        duration = float(raw_duration)
    except (TypeError, ValueError):
        duration = 0.0
    duration_sec = duration / 1000.0 if duration > 1000 else duration
    return {
        "aweme_id": _first_non_empty(candidate.get("aweme_id"), candidate.get("item_id"), candidate.get("group_id")),
        "title": title,
        "description": description,
        "share_url": _first_non_empty(candidate.get("share_url")),
        "cover_url": _first_non_empty(cover),
        "duration_sec": duration_sec,
        "published_at": _normalize_timestamp(candidate.get("create_time") or candidate.get("publish_time")),
        "tags": _extract_hashtags(title, description),
        "stats": {
            "play": _parse_count(stats_source.get("play_count") or candidate.get("play_count")),
            "like": _parse_count(stats_source.get("digg_count") or candidate.get("digg_count")),
            "comment": _parse_count(stats_source.get("comment_count") or candidate.get("comment_count")),
            "share": _parse_count(stats_source.get("share_count") or candidate.get("share_count")),
            "collect": _parse_count(stats_source.get("collect_count") or candidate.get("collect_count"))
        },
        "raw": candidate
    }
def _extract_videos(payloads: Iterable[Any]) -> list[dict[str, Any]]:
videos: list[dict[str, Any]] = []
seen: set[str] = set()
for payload in payloads:
for candidate in _extract_video_candidates(payload):
normalized = _normalize_video_candidate(candidate)
dedupe_key = normalized["aweme_id"] or normalized["share_url"] or normalized["title"]
if not dedupe_key or dedupe_key in seen:
continue
seen.add(dedupe_key)
videos.append(normalized)
videos.sort(
key=lambda item: (
item["stats"]["play"] + item["stats"]["like"] + item["stats"]["comment"] * 4 + item["stats"]["share"] * 6
),
reverse=True
)
return videos
async def _fetch_html(url: str, cookie: str = "") -> tuple[str, str]:
    """GET ``url`` and return ``(final_url, body_text)``.

    Redirects are followed. A non-blank ``cookie`` is sent as the ``Cookie``
    header. Error status codes raise via ``response.raise_for_status()``.
    """
    session_cookie = cookie.strip()
    request_headers = {
        "User-Agent": DEFAULT_USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"
    }
    if session_cookie:
        request_headers["Cookie"] = session_cookie
    async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, follow_redirects=True) as http:
        reply = await http.get(url, headers=request_headers)
        reply.raise_for_status()
        final_url = str(reply.url)
        return final_url, reply.text
async def _discover_profile_urls_from_search(keywords: list[str], limit: int = 8) -> list[str]:
urls: list[str] = []
seen: set[str] = set()
for keyword in keywords[:3]:
search_url = f"https://www.douyin.com/search/{quote(keyword)}?type=user"
try:
_, html = await _fetch_html(search_url)
except Exception:
continue
for match in re.findall(r'href=["\']([^"\']+/user/[^"\']+)["\']', html):
if match.startswith("/"):
match = f"https://www.douyin.com{match}"
cleaned = match.split("?")[0]
if cleaned in seen:
continue
seen.add(cleaned)
urls.append(cleaned)
if len(urls) >= limit:
return urls
return urls
def _summarize_videos(videos: list[dict[str, Any]], limit: int = 8) -> dict[str, Any]:
selected = videos[:limit]
if not selected:
return {
"count": 0,
"top_tags": [],
"avg_play": 0.0,
"avg_like": 0.0,
"avg_comment": 0.0,
"avg_share": 0.0,
"videos": []
}
count = len(selected)
avg_play = sum(item["stats"]["play"] for item in selected) / count
avg_like = sum(item["stats"]["like"] for item in selected) / count
avg_comment = sum(item["stats"]["comment"] for item in selected) / count
avg_share = sum(item["stats"]["share"] for item in selected) / count
tag_counter = Counter(tag for item in selected for tag in item.get("tags", []))
return {
"count": len(videos),
"top_tags": [tag for tag, _ in tag_counter.most_common(8)],
"avg_play": round(avg_play, 2),
"avg_like": round(avg_like, 2),
"avg_comment": round(avg_comment, 2),
"avg_share": round(avg_share, 2),
"videos": [
{
"aweme_id": item["aweme_id"],
"title": _compact_text(item["title"], 120),
"description": _compact_text(item["description"], 180),
"tags": item["tags"][:6],
"published_at": item["published_at"],
"stats": item["stats"]
}
for item in selected
]
}
def _jaccard(left: Iterable[str], right: Iterable[str]) -> float:
left_set = {item.strip().lower() for item in left if item.strip()}
right_set = {item.strip().lower() for item in right if item.strip()}
if not left_set and not right_set:
return 0.0
intersection = len(left_set & right_set)
union = len(left_set | right_set)
return intersection / union if union else 0.0
def _quality_score(account_payload: dict[str, Any]) -> float:
stats = account_payload.get("profile_stats", {})
followers = float(stats.get("followers") or 0)
video_summary = account_payload.get("video_summary", {})
avg_play = float(video_summary.get("avg_play") or 0)
avg_like = float(video_summary.get("avg_like") or 0)
avg_comment = float(video_summary.get("avg_comment") or 0)
avg_share = float(video_summary.get("avg_share") or 0)
base = followers / 10_000.0
engagement = avg_like / 1000.0 + avg_comment / 300.0 + avg_share / 200.0 + avg_play / 5000.0
return round(base + engagement, 3)
def _heuristic_similarity(source_payload: dict[str, Any], candidate_payload: dict[str, Any]) -> dict[str, Any]:
    """Score how similar a candidate account is to the source account.

    Uses Jaccard overlap of keywords (weight 55), top video tags (weight 20)
    and keywords extracted from the signatures (weight 10), plus the
    candidate's quality score capped at 15.
    """

    def top_tags(payload: dict[str, Any]) -> list[str]:
        return payload.get("video_summary", {}).get("top_tags", [])

    keyword_overlap = _jaccard(
        source_payload.get("keywords", []),
        candidate_payload.get("keywords", [])
    )
    tags_overlap = _jaccard(top_tags(source_payload), top_tags(candidate_payload))
    bio_overlap = _jaccard(
        _extract_keywords(source_payload.get("signature", "")),
        _extract_keywords(candidate_payload.get("signature", ""))
    )
    candidate_quality = _quality_score(candidate_payload)
    combined = keyword_overlap * 55 + tags_overlap * 20 + bio_overlap * 10 + min(candidate_quality, 15)
    return {
        "topic_overlap": round(keyword_overlap, 3),
        "tag_overlap": round(tags_overlap, 3),
        "signature_overlap": round(bio_overlap, 3),
        "quality_score": candidate_quality,
        "heuristic_score": round(combined, 2)
    }
def _build_model_label(profile: dict[str, Any]) -> str:
    """Return a label for a model profile: its name, else model_name, else base_url."""
    label_sources = ("name", "model_name", "base_url")
    return _first_non_empty(*(profile.get(key) for key in label_sources))
def _try_parse_agent_json(text: str) -> Any:
stripped = text.strip()
if not stripped:
return {}
try:
return json.loads(stripped)
except Exception:
pass
objects = _extract_json_objects_from_text(stripped)
return objects[0] if objects else {}
def register_douyin_routes(app: Any, legacy: Any) -> None:
def now() -> str:
    """Return ``legacy.utc_now()``.

    NOTE(review): presumably the current UTC time as a string -- confirm
    against the ``legacy`` module.
    """
    return legacy.utc_now()
def make_id(prefix: str) -> str:
    """Return ``legacy.make_id(prefix)``; used for new primary-key values.

    NOTE(review): the id format is decided by ``legacy.make_id`` and is not
    visible here.
    """
    return legacy.make_id(prefix)
def ensure_schema() -> None:
    """Create the Douyin feature tables and indexes if they do not exist.

    Every statement uses ``IF NOT EXISTS``, so the script can be run
    repeatedly. Tables created: douyin_accounts, douyin_account_snapshots,
    douyin_snapshot_fields, douyin_videos, douyin_analysis_reports,
    douyin_analysis_suggestions, douyin_similarity_searches,
    douyin_similarity_candidates, douyin_account_relations,
    douyin_tracked_accounts and douyin_tracking_cursors.

    The foreign keys reference ``accounts`` and ``assistants`` tables that
    this script does not create; they must be defined elsewhere.
    """
    # Columns ending in _json hold JSON text and default to '[]' or '{}'.
    schema = """
CREATE TABLE IF NOT EXISTS douyin_accounts (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
profile_url TEXT NOT NULL DEFAULT '',
canonical_profile_url TEXT NOT NULL DEFAULT '',
sec_uid TEXT NOT NULL DEFAULT '',
douyin_uid TEXT NOT NULL DEFAULT '',
douyin_id TEXT NOT NULL DEFAULT '',
nickname TEXT NOT NULL DEFAULT '',
signature TEXT NOT NULL DEFAULT '',
avatar_url TEXT NOT NULL DEFAULT '',
tags_json TEXT NOT NULL DEFAULT '[]',
profile_stats_json TEXT NOT NULL DEFAULT '{}',
raw_profile_json TEXT NOT NULL DEFAULT '{}',
source_mode TEXT NOT NULL DEFAULT 'public',
sync_status TEXT NOT NULL DEFAULT 'pending',
last_public_sync_at TEXT,
last_creator_sync_at TEXT,
last_analysis_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_updated
ON douyin_accounts(user_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_sec_uid
ON douyin_accounts(user_id, sec_uid);
CREATE TABLE IF NOT EXISTS douyin_account_snapshots (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
snapshot_type TEXT NOT NULL,
source_url TEXT NOT NULL DEFAULT '',
raw_payload_json TEXT NOT NULL DEFAULT '{}',
summary_json TEXT NOT NULL DEFAULT '{}',
field_count INTEGER NOT NULL DEFAULT 0,
collected_at TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_snapshots_account_collected
ON douyin_account_snapshots(account_id, collected_at DESC);
CREATE TABLE IF NOT EXISTS douyin_snapshot_fields (
snapshot_id TEXT NOT NULL,
field_path TEXT NOT NULL,
field_type TEXT NOT NULL DEFAULT 'string',
field_value_text TEXT NOT NULL DEFAULT '',
PRIMARY KEY(snapshot_id, field_path),
FOREIGN KEY(snapshot_id) REFERENCES douyin_account_snapshots(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS douyin_videos (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
aweme_id TEXT NOT NULL DEFAULT '',
title TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
share_url TEXT NOT NULL DEFAULT '',
cover_url TEXT NOT NULL DEFAULT '',
duration_sec REAL NOT NULL DEFAULT 0,
published_at TEXT,
tags_json TEXT NOT NULL DEFAULT '[]',
stats_json TEXT NOT NULL DEFAULT '{}',
raw_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_updated
ON douyin_videos(account_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_aweme
ON douyin_videos(account_id, aweme_id);
CREATE TABLE IF NOT EXISTS douyin_analysis_reports (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
user_id TEXT NOT NULL,
focus_text TEXT NOT NULL DEFAULT '',
model_profile_ids_json TEXT NOT NULL DEFAULT '[]',
linked_account_ids_json TEXT NOT NULL DEFAULT '[]',
prompt_text TEXT NOT NULL DEFAULT '',
context_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_analysis_reports_account_created
ON douyin_analysis_reports(account_id, created_at DESC);
CREATE TABLE IF NOT EXISTS douyin_analysis_suggestions (
id TEXT PRIMARY KEY,
report_id TEXT NOT NULL,
model_profile_id TEXT NOT NULL DEFAULT '',
model_label TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'ok',
suggestion_text TEXT NOT NULL DEFAULT '',
parsed_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(report_id) REFERENCES douyin_analysis_reports(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_analysis_suggestions_report
ON douyin_analysis_suggestions(report_id, created_at ASC);
CREATE TABLE IF NOT EXISTS douyin_similarity_searches (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
source_account_id TEXT,
source_profile_url TEXT NOT NULL DEFAULT '',
keywords_json TEXT NOT NULL DEFAULT '[]',
prompt_text TEXT NOT NULL DEFAULT '',
context_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_douyin_similarity_searches_user_created
ON douyin_similarity_searches(user_id, created_at DESC);
CREATE TABLE IF NOT EXISTS douyin_similarity_candidates (
id TEXT PRIMARY KEY,
search_id TEXT NOT NULL,
candidate_account_id TEXT,
candidate_profile_url TEXT NOT NULL DEFAULT '',
heuristic_score REAL NOT NULL DEFAULT 0,
agent_score REAL NOT NULL DEFAULT 0,
rationale_text TEXT NOT NULL DEFAULT '',
dimensions_json TEXT NOT NULL DEFAULT '{}',
raw_output_json TEXT NOT NULL DEFAULT '{}',
rank_index INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
FOREIGN KEY(search_id) REFERENCES douyin_similarity_searches(id) ON DELETE CASCADE,
FOREIGN KEY(candidate_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_douyin_similarity_candidates_search_rank
ON douyin_similarity_candidates(search_id, rank_index ASC);
CREATE TABLE IF NOT EXISTS douyin_account_relations (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
source_account_id TEXT NOT NULL,
target_account_id TEXT,
target_profile_url TEXT NOT NULL DEFAULT '',
relation_type TEXT NOT NULL DEFAULT 'benchmark',
note TEXT NOT NULL DEFAULT '',
search_id TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
FOREIGN KEY(target_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_douyin_account_relations_source
ON douyin_account_relations(source_account_id, created_at DESC);
CREATE TABLE IF NOT EXISTS douyin_tracked_accounts (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
tracked_account_id TEXT NOT NULL,
assistant_id TEXT,
note TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, tracked_account_id),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(tracked_account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_douyin_tracked_accounts_user_updated
ON douyin_tracked_accounts(user_id, updated_at DESC);
CREATE TABLE IF NOT EXISTS douyin_tracking_cursors (
user_id TEXT PRIMARY KEY,
last_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
"""
    # Run the whole script through one session obtained from legacy.db.
    with legacy.db.session() as conn:
        conn.executescript(schema)
# Create the schema as soon as the routes are registered ...
ensure_schema()
# ... and once more from the application's "startup" event. ensure_schema only
# issues IF NOT EXISTS statements, so the repeated call is harmless.
@app.on_event("startup")
def _startup_douyin_schema() -> None:
    """Startup hook that (re)runs ``ensure_schema``."""
    ensure_schema()
def _require_owned_account(account_id: str, user_id: str) -> dict[str, Any]:
    """Load the Douyin account ``account_id`` belonging to ``user_id``.

    Raises:
        HTTPException: 404 when no such row exists for that user.
    """
    owned = legacy.db.fetch_one(
        "SELECT * FROM douyin_accounts WHERE id = ? AND user_id = ?",
        (account_id, user_id)
    )
    if owned:
        return owned
    raise HTTPException(status_code=404, detail="Douyin account not found")
def _fetch_model_profiles(account_id: str) -> list[dict[str, Any]]:
    """List model profiles that are unowned or owned by ``account_id``.

    Rows are ordered with defaults first, then by creation time ascending.
    """
    query = """
SELECT *
FROM model_profiles
WHERE owner_account_id IS NULL OR owner_account_id = ?
ORDER BY is_default DESC, created_at ASC
"""
    params = (account_id,)
    return legacy.db.fetch_all(query, params)
def _resolve_model_profiles(account: dict[str, Any], requested_ids: list[str]) -> list[dict[str, Any]]:
    """Resolve the model profiles to use for ``account``.

    With no ``requested_ids`` every available profile is returned; otherwise
    the matching profiles are returned in the requested order.

    Raises:
        HTTPException: 400 when no profile is available at all, 404 when any
            requested id is not among the available profiles.
    """
    available = _fetch_model_profiles(account["id"])
    if not available:
        raise HTTPException(status_code=400, detail="No available model profiles")
    if not requested_ids:
        return available
    by_id: dict[Any, dict[str, Any]] = {}
    for row in available:
        by_id[row["id"]] = row
    unknown = [wanted for wanted in requested_ids if wanted not in by_id]
    if unknown:
        raise HTTPException(status_code=404, detail=f"Unknown model profiles: {', '.join(unknown)}")
    return [by_id[wanted] for wanted in requested_ids]
async def _collect_public_profile(profile_url: str, manual_payload: dict[str, Any] | None) -> dict[str, Any]:
    """Gather profile and video data from a manual payload and/or the public page.

    The manual payload (if any) becomes the first blob. When ``profile_url``
    is non-blank the page is fetched and its embedded JSON blobs are
    appended; a fetch failure is recorded in ``errors`` rather than raised.

    Returns:
        Dict with ``profile``, ``videos``, ``raw_pages``, ``errors`` and
        ``source_url`` (the post-redirect URL when the fetch succeeded).
    """
    resolved_url = profile_url.strip()
    failures: list[str] = []
    page_blobs: list[dict[str, Any]] = (
        [{"script_id": "manual_profile_payload", "payload": manual_payload}]
        if manual_payload
        else []
    )
    if resolved_url:
        try:
            resolved_url, page_html = await _fetch_html(resolved_url)
            page_blobs.extend(_extract_json_blobs_from_html(page_html))
        except Exception as exc:
            failures.append(f"public_profile_fetch_failed: {exc}")
    blob_payloads = [blob["payload"] for blob in page_blobs]
    profile_candidates: list[dict[str, Any]] = []
    for blob_payload in blob_payloads:
        profile_candidates.extend(_extract_profile_candidates(blob_payload))
    best_profile = _pick_best_profile(profile_candidates, fallback_url=resolved_url)
    return {
        "profile": best_profile,
        "videos": _extract_videos(blob_payloads),
        "raw_pages": page_blobs,
        "errors": failures,
        "source_url": resolved_url
    }
async def _collect_creator_center_pages(
    urls: list[str],
    cookie: str,
    manual_pages: list[ManualPageCapture]
) -> dict[str, Any]:
    """Collect creator-center pages from manual captures and live fetches.

    Manual captures come first, each wrapped as a single blob. The ``urls``
    are fetched (one after another) only when ``cookie`` is non-blank; each
    failure is recorded in ``errors`` and does not stop the remaining URLs.

    Returns:
        ``{"pages": [...], "errors": [...]}``.
    """
    collected = [
        {
            "url": capture.url,
            "title": capture.title,
            "blobs": [{"script_id": "manual_creator_payload", "payload": capture.payload}]
        }
        for capture in manual_pages
    ]
    failures: list[str] = []
    if not cookie.strip():
        return {"pages": collected, "errors": failures}
    for url in urls:
        try:
            landed_url, page_html = await _fetch_html(url, cookie=cookie)
            collected.append({
                "url": landed_url,
                "title": "",
                "blobs": _extract_json_blobs_from_html(page_html)
            })
        except Exception as exc:
            failures.append(f"creator_center_fetch_failed[{url}]: {exc}")
    return {"pages": collected, "errors": failures}
def _upsert_account(
    owner: dict[str, Any],
    profile: dict[str, Any],
    sync_request: DouyinAccountSyncRequest,
    public_data: dict[str, Any],
    creator_data: dict[str, Any]
) -> dict[str, Any]:
    """Insert or update the owner's Douyin account row, then persist its data.

    An existing row is looked up for this owner by ``sec_uid``, then
    ``douyin_id``, then ``canonical_profile_url`` (first non-empty match
    wins). After the write, snapshots and videos are stored via
    ``_persist_snapshots_and_videos`` and the row is re-read.

    Returns:
        The account row as loaded after all writes.
    """
    owner_id = owner["id"]
    existing: dict[str, Any] | None = None
    # The column names come from this fixed tuple, so interpolating them into
    # the SQL text cannot be influenced by request data.
    for column in ("sec_uid", "douyin_id", "canonical_profile_url"):
        wanted = profile.get(column, "")
        if not wanted:
            continue
        existing = legacy.db.fetch_one(
            f"SELECT * FROM douyin_accounts WHERE user_id = ? AND {column} = ? LIMIT 1",
            (owner_id, wanted)
        )
        if existing:
            break
    if existing:
        account_id = existing["id"]
        created_at = existing["created_at"]
    else:
        account_id = make_id("dyacct")
        created_at = now()
    updated_at = now()
    tags = _dedupe_strings(profile.get("tags", []) + _extract_keywords(profile.get("nickname", ""), profile.get("signature", "")))
    profile_stats = profile.get("stats", {})
    has_public_pages = bool(public_data["raw_pages"])
    has_creator_pages = bool(creator_data["pages"])
    source_mode = "creator_center" if has_creator_pages else "public"
    sync_status = "partial" if public_data["errors"] or creator_data["errors"] else "ready"
    # Column values shared, in this order, by the UPDATE and the INSERT.
    shared_values = (
        profile.get("profile_url", ""),
        profile.get("canonical_profile_url", ""),
        profile.get("sec_uid", ""),
        profile.get("douyin_uid", ""),
        profile.get("douyin_id", ""),
        profile.get("nickname", ""),
        profile.get("signature", ""),
        profile.get("avatar_url", ""),
        _safe_json_dumps(tags),
        _safe_json_dumps(profile_stats),
        _safe_json_dumps({
            "profile": profile.get("raw", {}),
            "discovery_note": sync_request.discovery_note
        }),
        source_mode,
        sync_status,
    )
    if existing:
        # Keep the previous sync timestamp when this run produced no data
        # for that source.
        public_synced_at = now() if has_public_pages else existing.get("last_public_sync_at")
        creator_synced_at = now() if has_creator_pages else existing.get("last_creator_sync_at")
        legacy.db.execute(
            """
UPDATE douyin_accounts
SET profile_url = ?, canonical_profile_url = ?, sec_uid = ?, douyin_uid = ?, douyin_id = ?,
nickname = ?, signature = ?, avatar_url = ?, tags_json = ?, profile_stats_json = ?,
raw_profile_json = ?, source_mode = ?, sync_status = ?, last_public_sync_at = ?,
last_creator_sync_at = ?, updated_at = ?
WHERE id = ?
""",
            shared_values + (public_synced_at, creator_synced_at, updated_at, account_id)
        )
    else:
        public_synced_at = now() if has_public_pages else None
        creator_synced_at = now() if has_creator_pages else None
        legacy.db.execute(
            """
INSERT INTO douyin_accounts (
id, user_id, profile_url, canonical_profile_url, sec_uid, douyin_uid, douyin_id,
nickname, signature, avatar_url, tags_json, profile_stats_json, raw_profile_json,
source_mode, sync_status, last_public_sync_at, last_creator_sync_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (account_id, owner_id)
            + shared_values
            + (public_synced_at, creator_synced_at, created_at, updated_at)
        )
    stored_row = _require_owned_account(account_id, owner_id)
    _persist_snapshots_and_videos(stored_row, public_data, creator_data, sync_request)
    # Re-read so the caller gets the row as it stands after persistence.
    return _require_owned_account(account_id, owner_id)
def _persist_snapshot(
    account_row: dict[str, Any],
    snapshot_type: str,
    source_url: str,
    payload: Any,
    summary: dict[str, Any]
) -> str:
    """Store one snapshot row plus one row per flattened payload field.

    The same timestamp is written to ``collected_at`` and ``created_at``.
    Field rows use ``INSERT OR REPLACE`` keyed on (snapshot_id, field_path).

    Returns:
        The id of the new snapshot.
    """
    snapshot_id = make_id("dysnap")
    stamp = now()
    field_rows = _flatten_json(payload)
    snapshot_values = (
        snapshot_id,
        account_row["id"],
        snapshot_type,
        source_url,
        _safe_json_dumps(payload),
        _safe_json_dumps(summary),
        len(field_rows),
        stamp,
        stamp
    )
    legacy.db.execute(
        """
INSERT INTO douyin_account_snapshots (
id, account_id, snapshot_type, source_url, raw_payload_json, summary_json,
field_count, collected_at, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
        snapshot_values
    )
    for path, type_name, text_value in field_rows:
        legacy.db.execute(
            """
INSERT OR REPLACE INTO douyin_snapshot_fields (
snapshot_id, field_path, field_type, field_value_text
) VALUES (?, ?, ?, ?)
""",
            (snapshot_id, path, type_name, text_value)
        )
    return snapshot_id
def _persist_snapshots_and_videos(
    account_row: dict[str, Any],
    public_data: dict[str, Any],
    creator_data: dict[str, Any],
    sync_request: DouyinAccountSyncRequest
) -> None:
    """Persist the snapshots of one sync run and upsert the collected videos.

    Steps, in order:
      1. If public pages were captured, store one ``public_profile`` snapshot.
      2. Store one ``creator_center`` snapshot per creator-center page.
      3. Normalize ``sync_request.manual_work_payloads`` and append them to
         ``public_data["videos"]`` (the caller's list is mutated in place).
      4. De-duplicate the videos within this batch and insert/update
         ``douyin_videos`` rows.

    Videos are matched against existing rows by ``aweme_id`` when present.
    When a video has no ``aweme_id`` the lookup falls back to ``share_url``
    (the same precedence the in-batch de-duplication key uses); without this
    fallback such videos were inserted again on every sync.

    Args:
        account_row: Account record; only ``account_row["id"]`` is read here.
        public_data: Expected keys: ``raw_pages``, ``errors``, ``source_url``,
            ``videos`` and ``profile``.
        creator_data: Expected key: ``pages`` (each with ``url``, ``title``
            and ``blobs``).
        sync_request: The originating sync request.
    """
    account_id = account_row["id"]

    if public_data["raw_pages"]:
        public_payload = {
            "pages": public_data["raw_pages"],
            "errors": public_data["errors"],
            "source_url": public_data["source_url"]
        }
        _persist_snapshot(
            account_row,
            "public_profile",
            public_data["source_url"],
            public_payload,
            {
                "video_count": len(public_data["videos"]),
                "nickname": public_data["profile"].get("nickname", ""),
                "tags": public_data["profile"].get("tags", [])
            }
        )

    for page in creator_data["pages"]:
        payload = {
            "title": page["title"],
            "blobs": page["blobs"]
        }
        _persist_snapshot(
            account_row,
            "creator_center",
            page["url"],
            payload,
            {
                "blob_count": len(page["blobs"]),
                "field_count": len(_flatten_json(payload))
            }
        )

    for manual_video in sync_request.manual_work_payloads:
        public_data["videos"].append(_normalize_video_candidate(manual_video))

    # First occurrence wins; videos with no usable key at all are dropped.
    deduped: dict[str, dict[str, Any]] = {}
    for video in public_data["videos"]:
        key = video["aweme_id"] or video["share_url"] or video["title"]
        if key:
            deduped.setdefault(key, video)

    for video in deduped.values():
        existing = None
        if video["aweme_id"]:
            existing = legacy.db.fetch_one(
                "SELECT id FROM douyin_videos WHERE account_id = ? AND aweme_id = ? LIMIT 1",
                (account_id, video["aweme_id"])
            )
        elif video["share_url"]:
            # No aweme_id: match on share_url so repeated syncs update the
            # stored row instead of inserting a duplicate.
            existing = legacy.db.fetch_one(
                "SELECT id FROM douyin_videos WHERE account_id = ? AND share_url = ? LIMIT 1",
                (account_id, video["share_url"])
            )
        video_id = existing["id"] if existing else make_id("dyvideo")
        timestamp = now()
        if existing:
            legacy.db.execute(
                """
                UPDATE douyin_videos
                SET title = ?, description = ?, share_url = ?, cover_url = ?, duration_sec = ?,
                published_at = ?, tags_json = ?, stats_json = ?, raw_json = ?, updated_at = ?
                WHERE id = ?
                """,
                (
                    video["title"],
                    video["description"],
                    video["share_url"],
                    video["cover_url"],
                    video["duration_sec"],
                    video["published_at"],
                    _safe_json_dumps(video["tags"]),
                    _safe_json_dumps(video["stats"]),
                    _safe_json_dumps(video["raw"]),
                    timestamp,
                    video_id
                )
            )
        else:
            legacy.db.execute(
                """
                INSERT INTO douyin_videos (
                id, account_id, aweme_id, title, description, share_url, cover_url,
                duration_sec, published_at, tags_json, stats_json, raw_json, created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    video_id,
                    account_id,
                    video["aweme_id"],
                    video["title"],
                    video["description"],
                    video["share_url"],
                    video["cover_url"],
                    video["duration_sec"],
                    video["published_at"],
                    _safe_json_dumps(video["tags"]),
                    _safe_json_dumps(video["stats"]),
                    _safe_json_dumps(video["raw"]),
                    timestamp,
                    timestamp
                )
            )
def _list_videos(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
    """Return stored videos of an account, newest first, with JSON columns decoded.

    Rows are ordered by ``COALESCE(published_at, updated_at)`` descending and
    then by ``updated_at`` descending.

    Args:
        account_id: Id of the owning ``douyin_accounts`` row.
        limit: Maximum number of rows to return.

    Returns:
        One dict per video; ``tags``, ``stats`` and ``raw`` are decoded from
        their ``*_json`` columns.
    """
    video_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_videos
        WHERE account_id = ?
        ORDER BY COALESCE(published_at, updated_at) DESC, updated_at DESC
        LIMIT ?
        """,
        (account_id, limit)
    )
    return [
        {
            "id": record["id"],
            "aweme_id": record["aweme_id"],
            "title": record["title"],
            "description": record["description"],
            "share_url": record["share_url"],
            "cover_url": record["cover_url"],
            "duration_sec": record["duration_sec"],
            "published_at": record["published_at"],
            "tags": _safe_json_loads(record["tags_json"], []),
            "stats": _safe_json_loads(record["stats_json"], {}),
            "raw": _safe_json_loads(record["raw_json"], {})
        }
        for record in video_rows
    ]
def _build_account_payload(account_row: dict[str, Any], include_recent_videos: int = 8) -> dict[str, Any]:
    """Assemble the API-facing summary of one stored account.

    Args:
        account_row: A ``douyin_accounts`` row.
        include_recent_videos: How many videos the video summary is limited
            to. At least 12 videos are always loaded before summarizing.

    Returns:
        A dict with identity fields, decoded profile stats and tags, up to 18
        de-duplicated keywords, the sync status and the video summary.
    """
    recent_videos = _list_videos(account_row["id"], limit=max(include_recent_videos, 12))
    account_tags = _safe_json_loads(account_row["tags_json"], [])
    stats = _safe_json_loads(account_row["profile_stats_json"], {})
    summary = _summarize_videos(recent_videos, limit=include_recent_videos)

    # Keyword sources, in priority order: tags, nickname/signature keywords,
    # top video tags, then video titles.
    profile_keywords = _extract_keywords(account_row["nickname"], account_row["signature"])
    video_titles = [entry["title"] for entry in summary["videos"]]
    keyword_pool = _dedupe_strings(
        account_tags + profile_keywords + summary["top_tags"] + video_titles
    )

    display_url = account_row["canonical_profile_url"] or account_row["profile_url"]
    return {
        "id": account_row["id"],
        "nickname": account_row["nickname"],
        "signature": account_row["signature"],
        "profile_url": display_url,
        "avatar_url": account_row["avatar_url"],
        "sec_uid": account_row["sec_uid"],
        "douyin_id": account_row["douyin_id"],
        "profile_stats": stats,
        "tags": account_tags,
        "keywords": keyword_pool[:18],
        "sync_status": account_row["sync_status"],
        "video_summary": summary
    }
def _video_content_type(video: dict[str, Any]) -> str:
raw = video.get("raw") if isinstance(video.get("raw"), dict) else {}
if raw.get("images") or raw.get("image_infos") or raw.get("is_multi_content"):
return "image_text"
return "video"
def _video_performance_score(video: dict[str, Any]) -> float:
stats = video.get("stats") if isinstance(video.get("stats"), dict) else {}
play = float(stats.get("play") or 0)
like = float(stats.get("like") or 0)
comment = float(stats.get("comment") or 0)
share = float(stats.get("share") or 0)
collect = float(stats.get("collect") or 0)
score = (
min(play / 10000.0, 6.0) * 8.0
+ min(like / 1000.0, 6.0) * 7.0
+ min(comment / 200.0, 6.0) * 4.0
+ min(share / 100.0, 6.0) * 4.0
+ min(collect / 100.0, 6.0) * 3.0
)
return round(min(100.0, score), 1)
def _workspace_video_payload(video: dict[str, Any]) -> dict[str, Any]:
    """Normalize a video record into the shape used by workspace views.

    Falsy fields are replaced with defaults, ``title`` and ``description``
    fall back to each other, and the content type and performance score are
    computed from the record.
    """
    tag_list = video.get("tags")
    if not isinstance(tag_list, list):
        tag_list = []
    stat_map = video.get("stats")
    if not isinstance(stat_map, dict):
        stat_map = {}

    title = video.get("title")
    description = video.get("description")
    return {
        "id": video.get("id") or video.get("aweme_id") or "",
        "aweme_id": video.get("aweme_id") or "",
        "title": title or description or "未命名作品",
        "description": description or title or "",
        "share_url": video.get("share_url") or "",
        "cover_url": video.get("cover_url") or "",
        "duration_sec": video.get("duration_sec") or 0,
        "published_at": video.get("published_at") or "",
        "tags": tag_list,
        "stats": stat_map,
        "content_type": _video_content_type(video),
        "score": {
            "performance_score": _video_performance_score(video)
        }
    }
def _video_sort_key(video: dict[str, Any], sort_by: str) -> tuple[Any, ...]:
stats = video.get("stats") if isinstance(video.get("stats"), dict) else {}
normalized = (sort_by or "score").strip().lower()
if normalized == "latest":
return (video.get("published_at") or "", video.get("id") or "")
if normalized == "play":
return (float(stats.get("play") or 0), video.get("published_at") or "")
if normalized == "like":
return (float(stats.get("like") or 0), video.get("published_at") or "")
if normalized == "comment":
return (float(stats.get("comment") or 0), video.get("published_at") or "")
return (float(video.get("score", {}).get("performance_score") or 0), video.get("published_at") or "")
def _list_linked_accounts(account_row: dict[str, Any]) -> list[dict[str, Any]]:
    """List the relations that start at this account, newest first.

    Each relation is LEFT JOINed with its target account, so the ``target_*``
    columns are NULL when the target row does not exist. Those NULLs are
    normalized to ``""`` here: ``row.get(key, "")`` only applies its default
    when the key is absent, not when the stored value is ``None``.

    Args:
        account_row: Source account; only ``account_row["id"]`` is read.

    Returns:
        One dict per relation with the relation fields and the joined
        target-account summary (nickname, signature, stats, tags).
    """
    relation_rows = legacy.db.fetch_all(
        """
        SELECT rel.*, target.nickname AS target_nickname, target.signature AS target_signature,
        target.canonical_profile_url AS target_canonical_profile_url, target.profile_stats_json AS target_profile_stats_json,
        target.tags_json AS target_tags_json
        FROM douyin_account_relations rel
        LEFT JOIN douyin_accounts target ON target.id = rel.target_account_id
        WHERE rel.source_account_id = ?
        ORDER BY rel.created_at DESC
        """,
        (account_row["id"],)
    )
    return [
        {
            "relation_id": row["id"],
            "relation_type": row["relation_type"],
            "note": row["note"],
            "search_id": row["search_id"],
            "created_at": row["created_at"],
            "target_account_id": row["target_account_id"],
            # Prefer the URL stored on the relation, then the joined account's.
            "target_profile_url": (
                row["target_profile_url"]
                or row.get("target_canonical_profile_url")
                or ""
            ),
            "target_nickname": row.get("target_nickname") or "",
            "target_signature": row.get("target_signature") or "",
            "target_profile_stats": _safe_json_loads(row.get("target_profile_stats_json"), {}),
            "target_tags": _safe_json_loads(row.get("target_tags_json"), [])
        }
        for row in relation_rows
    ]
def _load_owned_assistant(assistant_id: str, user_id: str) -> dict[str, Any] | None:
    """Fetch an assistant row that belongs to ``user_id``.

    Returns:
        ``None`` when ``assistant_id`` is empty or whitespace-only, otherwise
        the matching ``assistants`` row.

    Raises:
        HTTPException: 404 when a non-empty id matches no row of this user.
    """
    normalized_id = str(assistant_id or "").strip()
    if normalized_id == "":
        return None
    # The lookup uses the id as given; the stripped form only gates the query.
    assistant_row = legacy.db.fetch_one(
        "SELECT * FROM assistants WHERE id = ? AND user_id = ?",
        (assistant_id, user_id)
    )
    if assistant_row:
        return assistant_row
    raise HTTPException(status_code=404, detail="Assistant not found")
def _parse_iso_datetime(value: str | None) -> datetime | None:
text = str(value or "").strip()
if not text:
return None
try:
normalized = text.replace("Z", "+00:00")
parsed = datetime.fromisoformat(normalized)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
except Exception:
return None
def _get_tracking_cursor(user_id: str) -> dict[str, Any] | None:
    """Return the ``douyin_tracking_cursors`` row of a user, if one exists."""
    cursor_row = legacy.db.fetch_one(
        "SELECT * FROM douyin_tracking_cursors WHERE user_id = ?",
        (user_id,)
    )
    return cursor_row
def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]:
    """Create or update the tracking cursor of a user and return the stored row.

    Args:
        user_id: Owner of the cursor.
        last_seen_at: New cursor position; when empty, the current time from
            ``now()`` is used instead.

    Returns:
        The cursor row re-read from the database after the write.
    """
    current = _get_tracking_cursor(user_id)
    seen_at = _first_non_empty(last_seen_at, now())
    touched_at = now()
    if current:
        statement = "UPDATE douyin_tracking_cursors SET last_seen_at = ?, updated_at = ? WHERE user_id = ?"
        params = (seen_at, touched_at, user_id)
    else:
        statement = "INSERT INTO douyin_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?)"
        params = (user_id, seen_at, touched_at)
    legacy.db.execute(statement, params)
    return legacy.db.fetch_one("SELECT * FROM douyin_tracking_cursors WHERE user_id = ?", (user_id,))
def _list_tracked_accounts(user_id: str) -> list[dict[str, Any]]:
    """List the accounts a user tracks, most recently updated first.

    Each entry carries the tracking metadata, the joined assistant name and
    the tracked account's payload limited to 6 recent videos.
    """
    tracking_rows = legacy.db.fetch_all(
        """
        SELECT track.*,
        assistant.name AS assistant_name
        FROM douyin_tracked_accounts track
        LEFT JOIN assistants assistant ON assistant.id = track.assistant_id
        WHERE track.user_id = ?
        ORDER BY track.updated_at DESC
        """,
        (user_id,)
    )

    def to_payload(tracking_row: dict[str, Any]) -> dict[str, Any]:
        # NOTE(review): _require_owned_account presumably raises when the
        # account is missing or not owned by this user - confirm.
        owned_account = _require_owned_account(tracking_row["tracked_account_id"], user_id)
        return {
            "id": tracking_row["id"],
            "tracked_account_id": tracking_row["tracked_account_id"],
            "assistant_id": tracking_row.get("assistant_id", "") or "",
            "assistant_name": tracking_row.get("assistant_name", "") or "",
            "note": tracking_row.get("note", "") or "",
            "created_at": tracking_row["created_at"],
            "updated_at": tracking_row["updated_at"],
            "account": _build_account_payload(owned_account, include_recent_videos=6)
        }

    return [to_payload(tracking_row) for tracking_row in tracking_rows]
def _extract_tracking_borrowing_points(video: dict[str, Any]) -> list[str]:
    """Derive up to four "worth borrowing" hints from a video's stats and tags.

    A hint is produced for each metric that reaches its threshold (likes
    >= 100, comments >= 20, shares >= 10, plays >= 5000), plus one hint naming
    the first three tags when the video has any. Hints are compacted to 80
    characters and de-duplicated in order.
    """
    stat_map = video.get("stats", {}) or {}
    tag_list = video.get("tags", []) or []
    counts = {
        metric: int(stat_map.get(metric) or 0)
        for metric in ("play", "like", "comment", "share")
    }
    # (metric, threshold, hint) in the order the hints should appear.
    rules = (
        ("like", 100, "点赞明显更高,适合借标题切口和开头表达。"),
        ("comment", 20, "评论互动活跃,可借提问句和争议点设计。"),
        ("share", 10, "分享意愿较强,可借观点浓度和传播句式。"),
        ("play", 5000, "播放信号较强,值得拆成同题材复用模板。"),
    )
    hints = [hint for metric, threshold, hint in rules if counts[metric] >= threshold]
    if tag_list:
        hints.append(f"标签集中在 {', '.join(tag_list[:3])},适合做系列化选题。")

    compacted = [_compact_text(hint, 80) for hint in hints]
    # dict.fromkeys keeps the first occurrence of each hint, in order.
    unique_hints = list(dict.fromkeys(text for text in compacted if text))
    return unique_hints[:4]
def _build_tracking_digest_item(tracked_item: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
    """Combine a tracked-account entry and one of its videos into a digest item.

    The item is flagged ``is_high_value`` when the video has at least 100
    likes, at least 5000 plays, or any borrowing point. ``summary`` and
    ``summary_text`` carry the same 160-character compacted text.
    """
    stat_map = video.get("stats", {}) or {}
    source_text = video.get("description") or video.get("title") or "暂无摘要"
    points = _extract_tracking_borrowing_points(video)
    is_high_value = (
        int(stat_map.get("like") or 0) >= 100
        or int(stat_map.get("play") or 0) >= 5000
        or bool(points)
    )
    short_summary = _compact_text(source_text, 160)
    tracked_account = tracked_item["account"]
    return {
        "tracking_id": tracked_item["id"],
        "platform": "douyin",
        "tracked_account_id": tracked_item["tracked_account_id"],
        "tracked_account_name": tracked_account["nickname"],
        "assistant_id": tracked_item["assistant_id"],
        "assistant_name": tracked_item["assistant_name"],
        "note": tracked_item.get("note", ""),
        "account": tracked_account,
        "video": video,
        "summary": short_summary,
        "summary_text": short_summary,
        "borrowing_points": points,
        "is_high_value": is_high_value,
        # Videos without a publish time are stamped with the current time.
        "created_at": video.get("published_at") or now(),
    }
def _build_tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]:
    """Collect videos of tracked accounts published after a cut-off time.

    The cut-off is resolved in this order: a parsable ``since_value``, the
    user's stored cursor ``last_seen_at``, and finally "three days ago".

    Args:
        user_id: Owner of the tracked accounts.
        since_value: Optional ISO timestamp overriding the stored cursor.
        limit: Maximum number of items; clamped to the range 1..100.

    Returns:
        A dict with the generation time, the resolved cut-off, the tracked
        accounts, the stored cursor value and the items, newest first.
    """
    tracked_accounts = _list_tracked_accounts(user_id)
    cursor = _get_tracking_cursor(user_id)

    threshold = _parse_iso_datetime(since_value) if since_value else None
    if threshold is None and cursor:
        threshold = _parse_iso_datetime(cursor.get("last_seen_at"))
    if threshold is None:
        three_days_ago = datetime.now(timezone.utc) - timedelta(days=3)
        threshold = three_days_ago.replace(microsecond=0)

    digest_items: list[dict[str, Any]] = []
    for tracked in tracked_accounts:
        summary = (tracked.get("account", {}) or {}).get("video_summary", {})
        for video in summary.get("videos", []):
            published = _parse_iso_datetime(video.get("published_at"))
            # Only videos with a known publish time strictly after the cut-off.
            if published is not None and published > threshold:
                digest_items.append(_build_tracking_digest_item(tracked, video))

    def published_or_epoch(item: dict[str, Any]) -> datetime:
        parsed = _parse_iso_datetime(item["video"].get("published_at"))
        return parsed or datetime.fromtimestamp(0, tz=timezone.utc)

    digest_items.sort(key=published_or_epoch, reverse=True)
    item_cap = max(1, min(limit, 100))
    return {
        "generated_at": now(),
        "since": threshold.isoformat(),
        "tracked_accounts": tracked_accounts,
        "cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
        "items": digest_items[:item_cap]
    }
async def _refresh_tracked_account_workspace(
    owner: dict[str, Any],
    tracked_account_id: str,
    discovery_note: str = "tracking_refresh"
) -> dict[str, Any]:
    """Re-collect the public profile of a tracked account and store the result.

    Only the public profile is fetched; the creator-center collector is
    called with empty inputs.

    Args:
        owner: Authenticated user row; ``owner["id"]`` is used for ownership.
        tracked_account_id: Id of the account to refresh.
        discovery_note: Note stored with the sync request.

    Returns:
        The refreshed account payload, the combined sync errors and the
        public video / creator page counts.

    Raises:
        HTTPException: 400 when the account has no profile URL; 502 when the
            public collection reported any error.
    """
    account_row = _require_owned_account(tracked_account_id, owner["id"])
    target_url = _first_non_empty(
        account_row.get("canonical_profile_url"),
        account_row.get("profile_url")
    )
    if not target_url:
        raise HTTPException(status_code=400, detail="Tracked account has no profile_url to refresh")

    sync_request = DouyinAccountSyncRequest(
        profile_url=target_url,
        discovery_note=discovery_note
    )
    public_data = await _collect_public_profile(target_url, None)
    creator_data = await _collect_creator_center_pages([], "", [])

    # Keep the account addressable even if the page exposed no canonical URL.
    collected_canonical = public_data.get("profile", {}).get("canonical_profile_url")
    if not collected_canonical:
        public_data["profile"]["canonical_profile_url"] = target_url

    if public_data["errors"]:
        failure_detail = {
            "message": "刷新对标账号失败",
            "public_errors": public_data["errors"],
            "creator_errors": creator_data["errors"],
        }
        raise HTTPException(status_code=502, detail=failure_detail)

    refreshed_account = _upsert_account(owner, public_data["profile"], sync_request, public_data, creator_data)
    return {
        "account": _build_account_payload(refreshed_account, include_recent_videos=6),
        "sync_errors": public_data["errors"] + creator_data["errors"],
        "public_video_count": len(public_data.get("videos", [])),
        "creator_page_count": len(creator_data.get("pages", [])),
    }
def _build_workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
    """Assemble the full workspace view of one account.

    The payload combines the account summary, the latest public and
    creator-center snapshots (or ``None``), linked accounts, the five most
    recent analysis reports with their suggestions, the five most recent
    similarity searches and the model profiles of the account's user.
    """
    account_id = account_row["id"]

    def snapshot_brief(snapshot_row: dict[str, Any] | None) -> dict[str, Any] | None:
        # Metadata and summary only; the raw payload is not included.
        if not snapshot_row:
            return None
        return {
            "id": snapshot_row["id"],
            "source_url": snapshot_row["source_url"],
            "field_count": snapshot_row["field_count"],
            "collected_at": snapshot_row["collected_at"],
            "summary": _safe_json_loads(snapshot_row["summary_json"], {})
        }

    account_payload = _build_account_payload(account_row)
    public_snapshot_row = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'public_profile'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        (account_id,)
    )
    creator_snapshot_row = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'creator_center'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        (account_id,)
    )
    report_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_analysis_reports
        WHERE account_id = ?
        ORDER BY created_at DESC
        LIMIT 5
        """,
        (account_id,)
    )

    recent_reports: list[dict[str, Any]] = []
    for report_row in report_rows:
        suggestion_rows = legacy.db.fetch_all(
            "SELECT * FROM douyin_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
            (report_row["id"],)
        )
        suggestion_payloads = []
        for suggestion_row in suggestion_rows:
            suggestion_payloads.append({
                "id": suggestion_row["id"],
                "model_profile_id": suggestion_row["model_profile_id"],
                "model_label": suggestion_row["model_label"],
                "status": suggestion_row["status"],
                "suggestion_text": suggestion_row["suggestion_text"],
                "parsed_json": _safe_json_loads(suggestion_row["parsed_json"], {})
            })
        recent_reports.append({
            "id": report_row["id"],
            "focus_text": report_row["focus_text"],
            "model_profile_ids": _safe_json_loads(report_row["model_profile_ids_json"], []),
            "linked_account_ids": _safe_json_loads(report_row["linked_account_ids_json"], []),
            "created_at": report_row["created_at"],
            "suggestions": suggestion_payloads
        })

    search_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_similarity_searches
        WHERE source_account_id = ?
        ORDER BY created_at DESC
        LIMIT 5
        """,
        (account_id,)
    )

    linked_accounts = _list_linked_accounts(account_row)
    recent_searches = [
        {
            "id": search_row["id"],
            "keywords": _safe_json_loads(search_row["keywords_json"], []),
            "created_at": search_row["created_at"]
        }
        for search_row in search_rows
    ]
    model_profiles = [
        {
            "id": profile_row["id"],
            "name": profile_row["name"],
            "model_name": profile_row["model_name"],
            "base_url": profile_row["base_url"],
            "is_default": bool(profile_row["is_default"])
        }
        for profile_row in _fetch_model_profiles(account_row["user_id"])
    ]

    return {
        "account": account_payload,
        "latest_public_snapshot": snapshot_brief(public_snapshot_row),
        "latest_creator_snapshot": snapshot_brief(creator_snapshot_row),
        "linked_accounts": linked_accounts,
        "recent_reports": recent_reports,
        "recent_similarity_searches": recent_searches,
        "available_model_profiles": model_profiles
    }
def _list_snapshots(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
    """Return snapshot metadata for an account, most recently collected first.

    Only metadata and the decoded summary are returned; the raw payload is
    left out.
    """
    snapshot_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE account_id = ?
        ORDER BY collected_at DESC
        LIMIT ?
        """,
        (account_id, limit)
    )
    listing: list[dict[str, Any]] = []
    for snapshot_row in snapshot_rows:
        listing.append({
            "id": snapshot_row["id"],
            "snapshot_type": snapshot_row["snapshot_type"],
            "source_url": snapshot_row["source_url"],
            "field_count": snapshot_row["field_count"],
            "collected_at": snapshot_row["collected_at"],
            "summary": _safe_json_loads(snapshot_row["summary_json"], {})
        })
    return listing
def _get_snapshot_detail(snapshot_id: str, account_id: str) -> dict[str, Any]:
    """Load one snapshot of an account with its raw payload and field index.

    Raises:
        HTTPException: 404 when no snapshot with this id exists for the
            given account.
    """
    snapshot_row = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE id = ? AND account_id = ?
        LIMIT 1
        """,
        (snapshot_id, account_id)
    )
    if snapshot_row is None or not snapshot_row:
        raise HTTPException(status_code=404, detail="Snapshot not found")

    field_rows = legacy.db.fetch_all(
        """
        SELECT field_path, field_type, field_value_text
        FROM douyin_snapshot_fields
        WHERE snapshot_id = ?
        ORDER BY field_path ASC
        """,
        (snapshot_id,)
    )
    detail = {
        "id": snapshot_row["id"],
        "snapshot_type": snapshot_row["snapshot_type"],
        "source_url": snapshot_row["source_url"],
        "field_count": snapshot_row["field_count"],
        "collected_at": snapshot_row["collected_at"],
        "summary": _safe_json_loads(snapshot_row["summary_json"], {}),
        "raw_payload": _safe_json_loads(snapshot_row["raw_payload_json"], {}),
        "fields": field_rows
    }
    return detail
async def _run_account_analysis(
    account_row: dict[str, Any],
    owner: dict[str, Any],
    request: DouyinAccountAnalysisRequest
) -> dict[str, Any]:
    """Run a multi-model growth analysis for one account and persist the report.

    Steps:
      1. Build the target payload and the benchmark payloads from explicitly
         requested and (optionally) linked accounts.
      2. When no benchmark is available and the request allows it, fall back
         to the top three candidates of the latest similarity search.
      3. Store a ``douyin_analysis_reports`` row, query every resolved model
         profile concurrently and store one suggestion row per model.
      4. Optionally run per-video analyses with the first model profile.
      5. Stamp ``last_analysis_at`` on the account.

    Fallback candidates are an optional enrichment taken from an older
    search, so candidates whose joined account row is missing or belongs to
    another user are skipped instead of being passed to
    ``_require_owned_account`` (which would presumably abort the whole
    analysis). Explicitly requested and linked account ids are still
    validated strictly.

    Args:
        account_row: The account being analysed.
        owner: Authenticated user row; ``owner["id"]`` is used for ownership.
        request: Analysis options (models, benchmarks, focus, temperature).

    Returns:
        Dict with ``report_id``, ``created_at``, the analysis ``context``,
        the per-model ``suggestions`` and ``top_video_analyses``.
    """
    target_payload = _build_account_payload(
        account_row,
        include_recent_videos=max(4, min(request.max_videos, 12))
    )
    linked_rows = _list_linked_accounts(account_row)
    linked_account_ids = list(request.linked_account_ids)
    if request.include_linked_accounts:
        linked_account_ids.extend(
            row["target_account_id"] for row in linked_rows if row.get("target_account_id")
        )
    linked_account_ids = _dedupe_strings(linked_account_ids)

    benchmark_payloads: list[dict[str, Any]] = []
    for linked_account_id in linked_account_ids:
        linked_row = _require_owned_account(linked_account_id, owner["id"])
        benchmark_payloads.append(_build_account_payload(linked_row, include_recent_videos=6))

    if request.include_recent_similar_candidates and not benchmark_payloads:
        latest_search = legacy.db.fetch_one(
            """
            SELECT *
            FROM douyin_similarity_searches
            WHERE source_account_id = ?
            ORDER BY created_at DESC
            LIMIT 1
            """,
            (account_row["id"],)
        )
        if latest_search:
            candidate_rows = legacy.db.fetch_all(
                """
                SELECT cand.*, acct.user_id AS account_user_id
                FROM douyin_similarity_candidates cand
                LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
                WHERE cand.search_id = ?
                ORDER BY cand.rank_index ASC
                LIMIT 3
                """,
                (latest_search["id"],)
            )
            owner_id_text = str(owner["id"])
            for candidate_row in candidate_rows:
                candidate_account_id = candidate_row.get("candidate_account_id")
                if not candidate_account_id:
                    continue
                # account_user_id is NULL when the account row no longer
                # exists; skip stale or foreign candidates.
                if str(candidate_row.get("account_user_id") or "") != owner_id_text:
                    continue
                linked_candidate = _require_owned_account(candidate_account_id, owner["id"])
                benchmark_payloads.append(_build_account_payload(linked_candidate, include_recent_videos=6))

    profiles = _resolve_model_profiles(owner, request.model_profile_ids)
    system_prompt = (
        "你是资深抖音增长顾问。你会基于账号画像、创作者中心字段、作品表现和对标账号内容,"
        "给出可执行的优化建议。请始终返回 JSON 对象,包含这些字段:"
        "summary、strengths、weaknesses、benchmark_insights、content_plan、"
        "growth_actions、deep_search_hypotheses。每个数组字段请给出 3-6 条中文建议。"
    )
    latest_creator_summary = legacy.db.fetch_one(
        """
        SELECT summary_json
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'creator_center'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        (account_row["id"],)
    ) or {}
    analysis_context = {
        "target_account": target_payload,
        "benchmark_accounts": benchmark_payloads[:6],
        "focus": request.extra_focus,
        "creator_center_snapshot_summary": _safe_json_loads(
            latest_creator_summary.get("summary_json"),
            {}
        )
    }
    user_prompt = (
        "请分析以下抖音账号,并分别给出内容方向、选题结构、互动增长、账号定位和对标拆解建议。"
        "如果提供了对标账号,要重点指出可借鉴但不应直接照搬的部分。"
        f"\n\n输入上下文:\n{json.dumps(analysis_context, ensure_ascii=False, indent=2)}"
    )

    report_id = make_id("dyreport")
    created_at = now()
    legacy.db.execute(
        """
        INSERT INTO douyin_analysis_reports (
        id, account_id, user_id, focus_text, model_profile_ids_json,
        linked_account_ids_json, prompt_text, context_json, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            report_id,
            account_row["id"],
            owner["id"],
            request.extra_focus,
            _safe_json_dumps([profile["id"] for profile in profiles]),
            _safe_json_dumps(linked_account_ids),
            user_prompt,
            _safe_json_dumps(analysis_context),
            created_at
        )
    )

    async def _analyze_with_model(profile: dict[str, Any]) -> dict[str, Any]:
        # One model failing must not fail the report: the error text is
        # stored as the suggestion with status "error".
        try:
            output = await legacy.call_model(
                profile,
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                temperature=request.temperature
            )
            parsed = _try_parse_agent_json(output)
            status = "ok"
        except Exception as exc:
            output = str(exc)
            parsed = {}
            status = "error"
        suggestion_id = make_id("dysady")
        model_label = _build_model_label(profile)
        legacy.db.execute(
            """
            INSERT INTO douyin_analysis_suggestions (
            id, report_id, model_profile_id, model_label, status,
            suggestion_text, parsed_json, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                suggestion_id,
                report_id,
                profile["id"],
                model_label,
                status,
                output,
                _safe_json_dumps(parsed),
                now()
            )
        )
        return {
            "id": suggestion_id,
            "model_profile_id": profile["id"],
            "model_label": model_label,
            "status": status,
            "suggestion_text": output,
            "parsed_json": parsed
        }

    suggestions = await asyncio.gather(*[_analyze_with_model(profile) for profile in profiles])

    top_video_analyses: list[dict[str, Any]] = []
    if request.auto_analyze_top_videos and profiles:
        top_video_analyses = await _run_top_video_analyses(
            account_row,
            owner,
            profiles[0],
            top_video_count=request.top_video_analysis_count,
            min_score=45.0,
            source_type="account_analysis_followup",
            # The follow-up runs in a narrower temperature band (0.1 - 0.4).
            temperature=min(max(request.temperature, 0.1), 0.4)
        )

    # A single timestamp keeps last_analysis_at and updated_at equal.
    finished_at = now()
    legacy.db.execute(
        "UPDATE douyin_accounts SET last_analysis_at = ?, updated_at = ? WHERE id = ?",
        (finished_at, finished_at, account_row["id"])
    )
    return {
        "report_id": report_id,
        "created_at": created_at,
        "context": analysis_context,
        "suggestions": suggestions,
        "top_video_analyses": top_video_analyses
    }
async def _run_top_video_analyses(
    account_row: dict[str, Any],
    owner: dict[str, Any],
    profile: dict[str, Any],
    *,
    top_video_count: int = 6,
    min_score: float = 45.0,
    source_type: str = "top_score_auto",
    temperature: float = 0.25
) -> list[dict[str, Any]]:
    """Ask one model for a commercial breakdown of the account's best videos.

    Videos are loaded, scored, filtered by ``min_score``, sorted by score and
    cut to at most ``top_video_count`` (clamped to 1..12). Each remaining
    video is analysed concurrently. Nothing is written to the database here.

    Args:
        account_row: Account whose videos are analysed.
        owner: Authenticated user row (not read in this function).
        profile: Model profile passed to ``legacy.call_model``.
        top_video_count: Desired number of videos to analyse.
        min_score: Minimum performance score a video needs to qualify.
        source_type: Label copied into every result.
        temperature: Sampling temperature passed to the model.

    Returns:
        One result dict per analysed video; an empty list when no video
        qualifies. A failed model call yields ``status == "error"`` with the
        exception text as summary.
    """
    stored_videos = _list_videos(account_row["id"], limit=max(top_video_count * 3, 24))
    scored_videos = [_workspace_video_payload(entry) for entry in stored_videos]
    qualifying = [
        candidate for candidate in scored_videos
        if float(candidate.get("score", {}).get("performance_score") or 0) >= float(min_score)
    ]
    selection_size = max(1, min(top_video_count, 12))
    selected = sorted(
        qualifying,
        key=lambda entry: _video_sort_key(entry, "score"),
        reverse=True
    )[:selection_size]
    if not selected:
        return []

    account_payload = _build_account_payload(account_row, include_recent_videos=8)
    system_prompt = (
        "你是商业化短视频拆解顾问。你要针对单条作品给出可用于商业化运营的复盘。"
        "请返回严格 JSON 对象字段包含headline_summary、hook_breakdown、"
        "structure_breakdown、commercial_angle、replication_plan、operator_actions、"
        "risk_notes、scores。scores 里包含 hook、retention、conversion、commercial范围 0-100。"
    )
    account_context = {
        "id": account_payload["id"],
        "nickname": account_payload["nickname"],
        "signature": account_payload["signature"],
        "tags": account_payload["tags"][:12]
    }
    video_context_keys = (
        "id", "aweme_id", "title", "description", "published_at", "tags", "stats", "score"
    )

    async def _analyze_video(video: dict[str, Any]) -> dict[str, Any]:
        prompt_context = {
            "account": account_context,
            "video": {key: video[key] for key in video_context_keys}
        }
        user_prompt = (
            "请从商业化运营视角拆解这条作品。重点回答:为什么值得关注、"
            "适合承接什么产品或服务、下一步怎么复刻、运营动作怎么排。"
            f"\n\n输入上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
        )
        try:
            model_output = await legacy.call_model(
                profile,
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                temperature=temperature
            )
            analysis = _try_parse_agent_json(model_output)
            outcome = "ok"
        except Exception as exc:
            model_output = str(exc)
            analysis = {}
            outcome = "error"
        if not isinstance(analysis, dict):
            analysis = {}
        # The raw model output is the last-resort summary.
        headline = _first_non_empty(
            analysis.get("headline_summary"),
            analysis.get("summary"),
            analysis.get("commercial_angle"),
            model_output
        )
        return {
            "id": make_id("dyva"),
            "account_id": account_row["id"],
            "video_id": video["id"],
            "model_profile_id": profile["id"],
            "model_label": _build_model_label(profile),
            "source_type": source_type,
            "status": outcome,
            "summary": headline,
            "analysis_json": analysis,
            "video": video,
            "created_at": now()
        }

    pending = [_analyze_video(video) for video in selected]
    return await asyncio.gather(*pending)
async def _prepare_similarity_source(
    owner: dict[str, Any],
    request: DouyinSimilarSearchRequest
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Resolve the source account of a similarity search.

    With ``request.source_account_id`` the stored account is used. Otherwise
    the public profile at ``request.profile_url`` is collected and turned
    into an ad-hoc payload whose ``id`` is empty; nothing is persisted.

    Returns:
        ``(account_row, payload)``; ``account_row`` is ``None`` for an
        ad-hoc profile.

    Raises:
        HTTPException: 400 when neither id nor URL is given, or when the
            profile page yields neither a nickname nor videos.
    """
    if request.source_account_id:
        stored_row = _require_owned_account(request.source_account_id, owner["id"])
        return stored_row, _build_account_payload(stored_row)

    requested_url = request.profile_url or ""
    if not requested_url.strip():
        raise HTTPException(status_code=400, detail="source_account_id or profile_url is required")

    public_data = await _collect_public_profile(requested_url, None)
    profile = public_data["profile"]
    if not profile.get("nickname") and not public_data["videos"]:
        raise HTTPException(status_code=400, detail="Unable to parse the shared Douyin profile page")

    adhoc_payload = {
        "id": "",
        "nickname": profile.get("nickname", ""),
        "signature": profile.get("signature", ""),
        "profile_url": profile.get("canonical_profile_url", "") or request.profile_url,
        "avatar_url": profile.get("avatar_url", ""),
        "sec_uid": profile.get("sec_uid", ""),
        "douyin_id": profile.get("douyin_id", ""),
        "profile_stats": profile.get("stats", {}),
        "tags": profile.get("tags", []),
        "video_summary": _summarize_videos(public_data["videos"], limit=6)
    }
    video_summary = adhoc_payload["video_summary"]
    adhoc_payload["keywords"] = _dedupe_strings(
        adhoc_payload["tags"]
        + _extract_keywords(adhoc_payload["nickname"], adhoc_payload["signature"])
        + video_summary["top_tags"]
        + [entry["title"] for entry in video_summary["videos"]]
    )
    return None, adhoc_payload
async def _fetch_or_create_candidate(owner: dict[str, Any], profile_url: str) -> dict[str, Any] | None:
    """Return the user's stored account for a profile URL, creating it if needed.

    An existing row matching the URL (canonical or original) is returned
    as-is. Otherwise the public profile is collected and upserted.

    Returns:
        The account row, or ``None`` when the collected page yields neither
        a nickname nor any video.
    """
    known_row = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_accounts
        WHERE user_id = ? AND (canonical_profile_url = ? OR profile_url = ?)
        LIMIT 1
        """,
        (owner["id"], profile_url, profile_url)
    )
    if known_row:
        return known_row

    public_data = await _collect_public_profile(profile_url, None)
    collected_profile = public_data["profile"]
    has_identity = collected_profile.get("nickname") or public_data["videos"]
    if not has_identity:
        return None

    raw_video_payloads = [entry["raw"] for entry in public_data["videos"]]
    candidate_request = DouyinAccountSyncRequest(
        profile_url=profile_url,
        manual_work_payloads=raw_video_payloads
    )
    # No creator-center data exists for third-party candidate accounts.
    empty_creator_data = {"pages": [], "errors": []}
    return _upsert_account(owner, collected_profile, candidate_request, public_data, empty_creator_data)
async def _run_similarity_search(owner: dict[str, Any], request: DouyinSimilarSearchRequest) -> dict[str, Any]:
    """Find accounts similar to a source account and persist the ranked result.

    Candidates come from the user's other stored accounts, optionally from
    linked accounts, from ``request.candidate_urls`` and, when enabled, from
    public search discovery. They are ranked heuristically, cut to
    ``max(3, request.max_candidates)`` and passed to the model for scoring.

    The model output is untrusted. A dict wrapper (``items`` / ``candidates``)
    is unwrapped, and array elements that are not JSON objects are discarded.
    If nothing usable remains, or the model call fails, the heuristic ranking
    is stored instead.

    Args:
        owner: Authenticated user row; ``owner["id"]`` is used for ownership.
        request: Search options (source, candidate URLs, model profile, ...).

    Returns:
        Dict with ``search_id``, ``source_account``, ``model_profile``,
        ``raw_model_output`` and the saved ``candidates``.
    """
    source_account_row, source_payload = await _prepare_similarity_source(owner, request)
    profile = legacy.model_profile_for_account(owner["id"], request.model_profile_id)

    existing_accounts = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_accounts
        WHERE user_id = ?
        ORDER BY updated_at DESC
        """,
        (owner["id"],)
    )
    candidate_rows: list[dict[str, Any]] = []
    seen_urls: set[str] = set()
    source_id = source_account_row["id"] if source_account_row else ""
    for row in existing_accounts:
        if row["id"] == source_id:
            continue
        candidate_rows.append(row)
        seen_urls.add(row["canonical_profile_url"] or row["profile_url"])

    if request.seed_linked_accounts and source_account_row:
        for linked in _list_linked_accounts(source_account_row):
            candidate_url = linked.get("target_profile_url", "")
            if not candidate_url or candidate_url in seen_urls:
                continue
            seen_urls.add(candidate_url)
            if linked.get("target_account_id"):
                candidate_rows.append(_require_owned_account(linked["target_account_id"], owner["id"]))

    candidate_urls = _dedupe_strings(request.candidate_urls)
    if request.search_public_pages:
        discovered = await _discover_profile_urls_from_search(source_payload.get("keywords", []), limit=6)
        candidate_urls.extend(discovered)
        candidate_urls = _dedupe_strings(candidate_urls)
    for candidate_url in candidate_urls:
        if candidate_url in seen_urls or candidate_url == source_payload.get("profile_url"):
            continue
        candidate_row = await _fetch_or_create_candidate(owner, candidate_url)
        if candidate_row:
            candidate_rows.append(candidate_row)
            seen_urls.add(candidate_url)

    candidate_payloads: list[dict[str, Any]] = []
    seen_account_ids: set[str] = set()
    for row in candidate_rows:
        if row["id"] in seen_account_ids:
            continue
        seen_account_ids.add(row["id"])
        payload = _build_account_payload(row, include_recent_videos=6)
        payload["heuristics"] = _heuristic_similarity(source_payload, payload)
        candidate_payloads.append(payload)
    candidate_payloads.sort(key=lambda item: item["heuristics"]["heuristic_score"], reverse=True)
    candidate_payloads = candidate_payloads[: max(3, request.max_candidates)]

    search_id = make_id("dysearch")
    prompt_context = {
        "source_account": source_payload,
        "candidate_accounts": candidate_payloads,
        "extra_requirements": request.extra_requirements
    }
    prompt = (
        "请从候选账号中筛选与目标账号内容风格、题材、受众和互动逻辑最相似,且整体质量更高的账号。"
        "请返回 JSON 数组,每项包含 candidate_account_id、candidate_profile_url、score、"
        "rationale、similar_dimensions、optimization_value。score 范围 0-100。"
        f"\n\n上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
    )
    # The search is recorded even when there are no candidates.
    legacy.db.execute(
        """
        INSERT INTO douyin_similarity_searches (
        id, user_id, source_account_id, source_profile_url, keywords_json,
        prompt_text, context_json, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            search_id,
            owner["id"],
            source_account_row["id"] if source_account_row else None,
            source_payload.get("profile_url", ""),
            _safe_json_dumps(source_payload.get("keywords", [])),
            prompt,
            _safe_json_dumps(prompt_context),
            now()
        )
    )
    model_profile_info = {
        "id": profile["id"],
        "label": _build_model_label(profile)
    }
    if not candidate_payloads:
        return {
            "search_id": search_id,
            "source_account": source_payload,
            "model_profile": model_profile_info,
            "raw_model_output": "No candidate accounts available. Sync more Douyin accounts or provide candidate_urls.",
            "candidates": []
        }

    system_prompt = (
        "你是抖音相似账号发现专家。你要根据内容主题、标签、风格、更新频率、互动表现和商业化潜力,"
        "挑选最值得对标的账号。返回严格 JSON 数组。"
    )
    try:
        output = await legacy.call_model(profile, system_prompt=system_prompt, user_prompt=prompt, temperature=0.2)
        parsed = _try_parse_agent_json(output)
    except Exception as exc:
        # A failed model call degrades to the heuristic fallback below.
        output = str(exc)
        parsed = []

    candidate_map = {
        payload["id"]: payload for payload in candidate_payloads if payload["id"]
    }
    if isinstance(parsed, dict):
        parsed = parsed.get("items") or parsed.get("candidates") or []
    if isinstance(parsed, list):
        # Drop elements that are not JSON objects (e.g. bare strings);
        # they would break the .get() calls below.
        parsed = [item for item in parsed if isinstance(item, dict)]

    saved_candidates: list[dict[str, Any]] = []
    if not isinstance(parsed, list) or not parsed:
        parsed = [
            {
                "candidate_account_id": payload["id"],
                "candidate_profile_url": payload["profile_url"],
                "score": payload["heuristics"]["heuristic_score"],
                "rationale": "Fallback to heuristic similarity because model output was unavailable or unparsable.",
                "similar_dimensions": [
                    {
                        "topic_overlap": payload["heuristics"]["topic_overlap"],
                        "tag_overlap": payload["heuristics"]["tag_overlap"],
                        "quality_score": payload["heuristics"]["quality_score"]
                    }
                ],
                "optimization_value": "可作为候选对标账号进一步人工确认。"
            }
            for payload in candidate_payloads
        ]

    for index, item in enumerate(parsed, start=1):
        candidate_account_id = _first_non_empty(item.get("candidate_account_id"))
        candidate_profile_url = _first_non_empty(item.get("candidate_profile_url"))
        # Match the model's answer to a known candidate: by id, then by URL.
        payload = candidate_map.get(candidate_account_id)
        if not payload:
            payload = next(
                (candidate for candidate in candidate_payloads if candidate["profile_url"] == candidate_profile_url),
                None
            )
        candidate_id = make_id("dycand")
        heuristic_score = payload["heuristics"]["heuristic_score"] if payload else 0
        score = _parse_count(item.get("score"))
        rationale = _first_non_empty(item.get("rationale"), item.get("reason"), item.get("summary"))
        dimensions = item.get("similar_dimensions") or item.get("dimensions") or {}
        raw_output = {
            "model_output": item,
            "candidate_payload": payload or {}
        }
        legacy.db.execute(
            """
            INSERT INTO douyin_similarity_candidates (
            id, search_id, candidate_account_id, candidate_profile_url, heuristic_score,
            agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                candidate_id,
                search_id,
                payload["id"] if payload else candidate_account_id or None,
                payload["profile_url"] if payload else candidate_profile_url,
                heuristic_score,
                score,
                rationale,
                _safe_json_dumps(dimensions),
                _safe_json_dumps(raw_output),
                index,
                now()
            )
        )
        saved_candidates.append({
            "id": candidate_id,
            "candidate_account_id": payload["id"] if payload else candidate_account_id,
            "candidate_profile_url": payload["profile_url"] if payload else candidate_profile_url,
            "candidate_nickname": payload["nickname"] if payload else "",
            "heuristic_score": heuristic_score,
            "agent_score": score,
            "rationale_text": rationale,
            "dimensions": dimensions,
            "rank_index": index
        })

    return {
        "search_id": search_id,
        "source_account": source_payload,
        "model_profile": model_profile_info,
        "raw_model_output": output,
        "candidates": saved_candidates
    }
@app.get("/v2/douyin/accounts")
def list_douyin_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
    """List the caller's Douyin accounts, most recently updated first.

    Rows are selected by ``user_id`` equal to the authenticated account's id
    and each row is converted with ``_build_account_payload``.
    """
    owner_id = account["id"]
    account_rows = legacy.db.fetch_all(
        """
SELECT *
FROM douyin_accounts
WHERE user_id = ?
ORDER BY updated_at DESC
""",
        (owner_id,)
    )
    payloads: list[dict[str, Any]] = []
    for account_row in account_rows:
        payloads.append(_build_account_payload(account_row))
    return payloads
@app.post("/v2/douyin/accounts/sync")
async def sync_douyin_account(
    request: DouyinAccountSyncRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Collect public and creator-center data for an account and store it.

    Returns the workspace payload of the upserted account with an extra
    ``sync_errors`` list (public errors followed by creator-center errors).

    Raises:
        HTTPException: 400 when the request carries neither a profile URL,
            a manual profile payload nor manual creator pages, and 400 when
            collection yields no nickname, no videos and no creator pages.
    """
    has_any_source = (
        request.profile_url.strip()
        or request.manual_profile_payload
        or request.manual_creator_pages
    )
    if not has_any_source:
        raise HTTPException(
            status_code=400,
            detail="profile_url、manual_profile_payload 或 manual_creator_pages 至少需要传一个"
        )

    public_data = await _collect_public_profile(request.profile_url, request.manual_profile_payload)
    creator_data = await _collect_creator_center_pages(
        request.creator_center_urls,
        request.session_cookie,
        request.manual_creator_pages
    )

    profile = public_data["profile"]
    extracted_something = profile.get("nickname") or public_data["videos"] or creator_data["pages"]
    if not extracted_something:
        raise HTTPException(status_code=400, detail="No Douyin profile or creator-center data could be extracted")

    stored_row = _upsert_account(account, profile, request, public_data, creator_data)
    result = _build_workspace_payload(stored_row)
    result["sync_errors"] = public_data["errors"] + creator_data["errors"]
    return result
@app.get("/v2/douyin/accounts/{account_id}")
def get_douyin_account(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return the workspace payload for one account resolved for the caller."""
    owned_row = _require_owned_account(account_id, account["id"])
    workspace = _build_workspace_payload(owned_row)
    return workspace
@app.get("/v2/douyin/accounts/{account_id}/snapshots")
def list_douyin_account_snapshots(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
    """Return the snapshots recorded for an account resolved for the caller."""
    owned_row = _require_owned_account(account_id, account["id"])
    owned_account_id = owned_row["id"]
    return _list_snapshots(owned_account_id)
@app.get("/v2/douyin/accounts/{account_id}/snapshots/{snapshot_id}")
def get_douyin_account_snapshot(
    account_id: str,
    snapshot_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return one snapshot's detail, looked up together with its account id."""
    owned_row = _require_owned_account(account_id, account["id"])
    owned_account_id = owned_row["id"]
    return _get_snapshot_detail(snapshot_id, owned_account_id)
@app.get("/v2/douyin/accounts/{account_id}/creator-fields")
def get_douyin_creator_fields(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return the detail of the account's newest ``creator_center`` snapshot.

    Raises:
        HTTPException: 404 when the account has no creator-center snapshot.
    """
    owned_row = _require_owned_account(account_id, account["id"])
    owned_account_id = owned_row["id"]
    newest = legacy.db.fetch_one(
        """
SELECT id
FROM douyin_account_snapshots
WHERE account_id = ? AND snapshot_type = 'creator_center'
ORDER BY collected_at DESC
LIMIT 1
""",
        (owned_account_id,)
    )
    if newest:
        return _get_snapshot_detail(newest["id"], owned_account_id)
    raise HTTPException(status_code=404, detail="No creator-center snapshot found")
@app.get("/v2/douyin/accounts/{account_id}/workspace")
def get_douyin_account_workspace(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return the workspace payload built from the caller's account row."""
    owned_row = _require_owned_account(account_id, account["id"])
    workspace = _build_workspace_payload(owned_row)
    return workspace
@app.get("/v2/douyin/accounts/{account_id}/videos")
def list_douyin_account_videos(
    account_id: str,
    limit: int = 200,
    sort_by: str = "score",
    scope: str = "all",
    content_type: str = "all",
    q: str = "",
    tag: str = "",
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """List an account's videos with optional scope, type, text and tag filters.

    Args:
        account_id: Account whose videos are listed.
        limit: Maximum number of items returned, clamped to 1..1000; at
            least 24 videos are always loaded so the top/latest id lists
            are computed over a useful window.
        sort_by: Sort mode forwarded to ``_video_sort_key`` (descending).
        scope: ``"top"`` keeps only the top-scored ids, ``"latest"`` only the
            latest ids; any other value keeps everything.
        content_type: ``"video"`` or ``"image_text"`` to filter; other values
            disable the filter.
        q: Case-insensitive substring matched against title, description,
            aweme id and tags.
        tag: Case-insensitive substring matched against each tag.

    Returns:
        A dict echoing the normalized filters plus ``meta`` counters, the
        ``top_scored_video_ids`` / ``latest_video_ids`` lists and ``items``.
    """
    account_row = _require_owned_account(account_id, account["id"])
    raw_videos = _list_videos(account_row["id"], limit=max(limit, 24))
    items = [_workspace_video_payload(video) for video in raw_videos]
    item_map = {item["id"]: item for item in items}
    high_score_threshold = 60.0

    def _performance_score(entry: dict[str, Any]) -> float:
        # `or {}` / `or 0` also cover keys that are present but hold None,
        # which a `.get(key, default)` fallback would let through.
        return float((entry.get("score") or {}).get("performance_score") or 0)

    def _tag_strings(entry: dict[str, Any]) -> list[str]:
        # Same None-tolerance as the title/description fields below.
        return [str(tag_item) for tag_item in entry.get("tags") or []]

    # Rank by score once; both the threshold list and its fallback reuse it.
    ranked_by_score = sorted(items, key=lambda entry: _video_sort_key(entry, "score"), reverse=True)
    top_scored_video_ids = [
        item["id"] for item in ranked_by_score
        if _performance_score(item) >= high_score_threshold
    ]
    if not top_scored_video_ids:
        # Nothing reached the threshold: fall back to the five best-ranked.
        top_scored_video_ids = [item["id"] for item in ranked_by_score[:5]]
    latest_video_ids = [
        item["id"]
        for item in sorted(items, key=lambda entry: _video_sort_key(entry, "latest"), reverse=True)[:12]
    ]

    normalized_scope = (scope or "all").strip().lower()
    if normalized_scope == "top":
        items = [item_map[video_id] for video_id in top_scored_video_ids if video_id in item_map]
    elif normalized_scope == "latest":
        items = [item_map[video_id] for video_id in latest_video_ids if video_id in item_map]

    normalized_content_type = (content_type or "all").strip().lower()
    if normalized_content_type in {"video", "image_text"}:
        items = [
            item for item in items
            if str(item.get("content_type") or "video").strip().lower() == normalized_content_type
        ]

    query_text = (q or "").strip().lower()
    if query_text:
        items = [
            item for item in items
            if query_text in " ".join(
                [
                    str(item.get("title") or ""),
                    str(item.get("description") or ""),
                    str(item.get("aweme_id") or ""),
                    *_tag_strings(item)
                ]
            ).lower()
        ]

    tag_text = (tag or "").strip().lower()
    if tag_text:
        items = [
            item for item in items
            if any(tag_text in tag_value.lower() for tag_value in _tag_strings(item))
        ]

    normalized_sort = (sort_by or "score").strip().lower()
    items.sort(key=lambda item: _video_sort_key(item, normalized_sort), reverse=True)
    return {
        "account_id": account_row["id"],
        "sort_by": normalized_sort,
        "scope": normalized_scope,
        "content_type": normalized_content_type,
        "query": q,
        "tag": tag,
        "high_score_threshold": high_score_threshold,
        "meta": {
            "source": "fastgpt-live-fallback",
            "total": len(raw_videos),
            "filtered": len(items)
        },
        "top_scored_video_ids": top_scored_video_ids,
        "latest_video_ids": latest_video_ids,
        "items": items[: max(1, min(limit, 1000))]
    }
@app.get("/v2/douyin/accounts/{account_id}/analysis-reports")
def list_douyin_analysis_reports(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
    """Return the ``recent_reports`` entry of the account's workspace payload."""
    owned_row = _require_owned_account(account_id, account["id"])
    workspace = _build_workspace_payload(owned_row)
    return workspace["recent_reports"]
@app.post("/v2/douyin/accounts/{account_id}/analysis")
async def analyze_douyin_account(
    account_id: str,
    request: DouyinAccountAnalysisRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Run an account analysis and return whatever ``_run_account_analysis`` yields."""
    owned_row = _require_owned_account(account_id, account["id"])
    analysis = await _run_account_analysis(owned_row, account, request)
    return analysis
@app.post("/v2/douyin/accounts/{account_id}/videos/analyze-top")
async def analyze_douyin_top_videos(
    account_id: str,
    request: DouyinTopVideoAnalysisRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Analyze an account's top videos on demand.

    The model profile is resolved from ``request.model_profile_id`` for the
    caller, and the run is tagged with source type
    ``manual_top_video_refresh``.

    Returns:
        The account id, the model profile id used, the number of analyses
        produced and the analyses themselves.
    """
    owned_row = _require_owned_account(account_id, account["id"])
    model_profile = legacy.model_profile_for_account(account["id"], request.model_profile_id)
    analyses = await _run_top_video_analyses(
        owned_row,
        account,
        model_profile,
        top_video_count=request.top_video_count,
        min_score=request.min_score,
        source_type="manual_top_video_refresh",
        temperature=request.temperature
    )
    response: dict[str, Any] = {}
    response["account_id"] = owned_row["id"]
    response["model_profile_id"] = model_profile["id"]
    response["analyzed_count"] = len(analyses)
    response["items"] = analyses
    return response
@app.post("/v2/douyin/similar-searches")
async def create_douyin_similarity_search(
    request: DouyinSimilarSearchRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Start a similarity search for the caller and return its result."""
    search_result = await _run_similarity_search(account, request)
    return search_result
@app.get("/v2/douyin/similar-searches/{search_id}")
def get_douyin_similarity_search(
    search_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return a stored similarity search together with its ranked candidates.

    The search is looked up by id and the caller's user id, so searches owned
    by other users are reported as missing.

    Raises:
        HTTPException: 404 when no matching search exists for the caller.
    """
    search_row = legacy.db.fetch_one(
        "SELECT * FROM douyin_similarity_searches WHERE id = ? AND user_id = ?",
        (search_id, account["id"])
    )
    if not search_row:
        raise HTTPException(status_code=404, detail="Similarity search not found")
    candidates = legacy.db.fetch_all(
        """
SELECT cand.*, acct.nickname AS candidate_nickname
FROM douyin_similarity_candidates cand
LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
WHERE cand.search_id = ?
ORDER BY cand.rank_index ASC
""",
        (search_id,)
    )
    return {
        "id": search_row["id"],
        "source_account_id": search_row["source_account_id"],
        "source_profile_url": search_row["source_profile_url"],
        "keywords": _safe_json_loads(search_row["keywords_json"], []),
        "context": _safe_json_loads(search_row["context_json"], {}),
        "created_at": search_row["created_at"],
        "candidates": [
            {
                "id": row["id"],
                "candidate_account_id": row["candidate_account_id"],
                "candidate_profile_url": row["candidate_profile_url"],
                # The LEFT JOIN yields a NULL nickname for candidates without
                # a stored account; the column is then present with value
                # None, so a `.get(key, "")` default would not apply.
                "candidate_nickname": row.get("candidate_nickname") or "",
                "heuristic_score": row["heuristic_score"],
                "agent_score": row["agent_score"],
                "rationale_text": row["rationale_text"],
                "dimensions": _safe_json_loads(row["dimensions_json"], {}),
                "rank_index": row["rank_index"]
            }
            for row in candidates
        ]
    }
@app.get("/v2/douyin/accounts/{account_id}/benchmark-links")
def list_douyin_benchmark_links(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
    """Return the accounts linked to the given account for the caller."""
    owned_row = _require_owned_account(account_id, account["id"])
    links = _list_linked_accounts(owned_row)
    return links
@app.post("/v2/douyin/accounts/{account_id}/benchmark-links")
def create_douyin_benchmark_links(
    account_id: str,
    request: DouyinBenchmarkLinkRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Create relation rows from one account to target accounts and/or URLs.

    Targets given by id are resolved through ``_require_owned_account`` and
    stored with their canonical (or plain) profile URL; targets given only by
    URL are stored with a NULL ``target_account_id``.

    Returns:
        ``saved`` (number of rows inserted), ``relation_ids`` (their ids, id
        targets first) and ``links`` (the refreshed linked-account list).
    """
    account_row = _require_owned_account(account_id, account["id"])

    # Resolve every id target before writing anything, so a failing ownership
    # check cannot leave a half-saved batch behind. Repeated ids are dropped
    # (order kept), mirroring the de-duplication applied to the URL targets.
    targets: list[tuple[Any, Any]] = []
    for target_account_id in dict.fromkeys(request.target_account_ids):
        target_row = _require_owned_account(target_account_id, account["id"])
        targets.append(
            (target_row["id"], target_row["canonical_profile_url"] or target_row["profile_url"])
        )
    # URL-only targets carry no account id; None is bound as SQL NULL.
    targets.extend(
        (None, target_profile_url)
        for target_profile_url in _dedupe_strings(request.target_profile_urls)
    )

    linked_ids: list[str] = []
    for target_id, target_profile_url in targets:
        relation_id = make_id("dyrel")
        legacy.db.execute(
            """
INSERT INTO douyin_account_relations (
id, user_id, source_account_id, target_account_id, target_profile_url,
relation_type, note, search_id, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (
                relation_id,
                account["id"],
                account_row["id"],
                target_id,
                target_profile_url,
                request.relation_type,
                request.note,
                request.search_id,
                now()
            )
        )
        linked_ids.append(relation_id)
    return {
        "saved": len(linked_ids),
        "relation_ids": linked_ids,
        "links": _list_linked_accounts(account_row)
    }
@app.get("/v2/douyin/tracking/accounts")
def list_douyin_tracked_accounts(
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return the caller's tracked accounts plus the tracking cursor position.

    ``cursor_last_seen_at`` is an empty string when no cursor is stored.
    """
    user_id = account["id"]
    cursor = _get_tracking_cursor(user_id)
    last_seen_at = cursor.get("last_seen_at", "") if cursor else ""
    tracked_items = _list_tracked_accounts(user_id)
    return {
        "cursor_last_seen_at": last_seen_at,
        "items": tracked_items
    }
@app.post("/v2/douyin/tracking/accounts")
def create_douyin_tracked_account(
    request: DouyinTrackedAccountRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Start tracking an account, or update the existing tracking row.

    One row per (user, tracked account) is kept: when a row already exists
    its assistant, note and ``updated_at`` are overwritten, otherwise a new
    row is inserted with identical created/updated timestamps.

    Returns:
        The tracked account id, the assistant id (``""`` when no assistant
        was loaded) and the refreshed tracked-account list.
    """
    user_id = account["id"]
    tracked_account = _require_owned_account(request.tracked_account_id, user_id)
    assistant = _load_owned_assistant(request.assistant_id, user_id)
    assistant_fields = assistant or {}
    existing = legacy.db.fetch_one(
        "SELECT * FROM douyin_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
        (user_id, tracked_account["id"])
    )
    updated_at = now()
    stored_assistant_id = assistant_fields.get("id")
    note = request.note.strip()
    if not existing:
        legacy.db.execute(
            """
INSERT INTO douyin_tracked_accounts (
id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
            (
                make_id("dytrack"),
                user_id,
                tracked_account["id"],
                stored_assistant_id,
                note,
                updated_at,
                updated_at
            )
        )
    else:
        legacy.db.execute(
            """
UPDATE douyin_tracked_accounts
SET assistant_id = ?, note = ?, updated_at = ?
WHERE id = ?
""",
            (stored_assistant_id, note, updated_at, existing["id"])
        )
    return {
        "tracked_account_id": tracked_account["id"],
        "assistant_id": assistant_fields.get("id", ""),
        "items": _list_tracked_accounts(user_id)
    }
@app.post("/v2/douyin/tracking/accounts/{tracked_account_id}/refresh")
async def refresh_douyin_tracked_account(
    tracked_account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Refresh one tracked account and report the outcome without raising.

    An ``HTTPException`` raised by the refresh is converted into a
    ``success: False`` response that carries the account payload captured
    before the refresh, the error message, the normalized detail dict and
    the combined public/creator error lists. Other exceptions propagate.
    """
    account_row = _require_owned_account(tracked_account_id, account["id"])
    # Captured up front so the failure response can still describe the account.
    fallback_payload = _build_account_payload(account_row, include_recent_videos=6)
    try:
        refreshed = await _refresh_tracked_account_workspace(account, tracked_account_id)
    except HTTPException as exc:
        if isinstance(exc.detail, dict):
            detail = exc.detail
        else:
            detail = {"message": str(exc.detail)}
        combined_errors = detail.get("public_errors", []) + detail.get("creator_errors", [])
        return {
            "success": False,
            "tracked_account_id": tracked_account_id,
            "account": fallback_payload,
            "message": detail.get("message") or str(exc.detail),
            "detail": detail,
            "sync_errors": combined_errors
        }
    return {
        "success": True,
        "tracked_account_id": tracked_account_id,
        "account": refreshed.get("account", {}),
        "sync_errors": refreshed.get("sync_errors", []),
        "public_video_count": refreshed.get("public_video_count", 0),
        "creator_page_count": refreshed.get("creator_page_count", 0)
    }
@app.post("/v2/douyin/tracking/refresh")
async def refresh_all_douyin_tracked_accounts(
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Refresh every tracked account of the caller, one after another.

    A failure on one account is recorded in ``errors`` and does not stop the
    remaining refreshes.

    Returns:
        ``tracked_count``, ``refreshed`` and ``failed`` counters plus the
        per-account ``items`` (successes) and ``errors`` (failures). Errors
        caused by an ``HTTPException`` also carry its normalized ``detail``.
    """
    tracked_accounts = _list_tracked_accounts(account["id"])
    items: list[dict[str, Any]] = []
    errors: list[dict[str, Any]] = []
    for tracked in tracked_accounts:
        try:
            refreshed = await _refresh_tracked_account_workspace(account, tracked["tracked_account_id"])
            items.append({
                "tracking_id": tracked["id"],
                "tracked_account_id": tracked["tracked_account_id"],
                "nickname": (refreshed.get("account") or {}).get("nickname", ""),
                "sync_errors": refreshed.get("sync_errors", []),
                "public_video_count": refreshed.get("public_video_count", 0)
            })
        except HTTPException as exc:
            # Structured details are dicts; report their "message" field the
            # way the single-account refresh endpoint does, instead of the
            # repr of the whole dict.
            detail = exc.detail if isinstance(exc.detail, dict) else {"message": str(exc.detail)}
            errors.append({
                "tracking_id": tracked["id"],
                "tracked_account_id": tracked["tracked_account_id"],
                "message": detail.get("message") or str(exc.detail),
                "detail": detail
            })
        except Exception as exc:
            # Deliberate best effort: one failing account must not abort the batch.
            errors.append({
                "tracking_id": tracked["id"],
                "tracked_account_id": tracked["tracked_account_id"],
                "message": str(exc)
            })
    return {
        "tracked_count": len(tracked_accounts),
        "refreshed": len(items),
        "failed": len(errors),
        "items": items,
        "errors": errors
    }
@app.post("/v2/douyin/tracking/cursor")
def update_douyin_tracking_cursor(
    request: DouyinTrackingCursorRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Store the caller's tracking cursor and echo the stored values."""
    user_id = account["id"]
    stored_cursor = _set_tracking_cursor(user_id, request.last_seen_at)
    response: dict[str, Any] = {"user_id": user_id}
    response["last_seen_at"] = stored_cursor["last_seen_at"]
    response["updated_at"] = stored_cursor["updated_at"]
    return response
@app.get("/v2/douyin/tracking/digest")
def get_douyin_tracking_digest(
    since: str | None = None,
    limit: int = 24,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Build the tracking digest for the caller.

    ``since`` is stripped of surrounding whitespace; a missing or empty value
    is passed on as an empty string.
    """
    since_value = since.strip() if since else ""
    digest = _build_tracking_digest(account["id"], since_value=since_value, limit=limit)
    return digest