Files
storyforge/collector-service/app/douyin_features.py
2026-03-22 12:11:15 +08:00

3396 lines
144 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import json
import math
import re
from collections import Counter
from datetime import datetime, timedelta, timezone
from html import unescape
from typing import Any, Iterable
from urllib.parse import quote, unquote
import httpx
from fastapi import Depends, HTTPException
from pydantic import BaseModel, Field
from starlette.concurrency import run_in_threadpool
DEFAULT_CREATOR_CENTER_URLS = [
"https://creator.douyin.com/creator-micro/home",
"https://creator.douyin.com/creator-micro/data",
"https://creator.douyin.com/creator-micro/content/manage"
]
DEFAULT_TIMEOUT = 20.0
MAX_HTML_SEARCH_BYTES = 2_000_000
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
)
class ManualPageCapture(BaseModel):
url: str = ""
title: str = ""
payload: dict[str, Any] = Field(default_factory=dict)
class DouyinAccountSyncRequest(BaseModel):
profile_url: str = ""
session_cookie: str = ""
creator_center_urls: list[str] = Field(default_factory=lambda: list(DEFAULT_CREATOR_CENTER_URLS))
allow_creator_center_profile_fallback: bool = False
compact_response: bool = False
manual_profile_payload: dict[str, Any] | None = None
manual_creator_pages: list[ManualPageCapture] = Field(default_factory=list)
manual_work_payloads: list[dict[str, Any]] = Field(default_factory=list)
discovery_note: str = ""
class DouyinAccountAnalysisRequest(BaseModel):
model_profile_ids: list[str] = Field(default_factory=list)
linked_account_ids: list[str] = Field(default_factory=list)
include_linked_accounts: bool = True
include_recent_similar_candidates: bool = True
max_videos: int = 12
extra_focus: str = ""
temperature: float = 0.35
auto_analyze_top_videos: bool = True
top_video_analysis_count: int = 6
class DouyinTopVideoAnalysisRequest(BaseModel):
model_profile_id: str | None = None
top_video_count: int = 6
min_score: float = 45.0
temperature: float = 0.25
class DouyinSimilarSearchRequest(BaseModel):
source_account_id: str | None = None
profile_url: str | None = None
candidate_urls: list[str] = Field(default_factory=list)
seed_linked_accounts: bool = True
search_public_pages: bool = True
model_profile_id: str | None = None
max_candidates: int = 10
extra_requirements: str = ""
class DouyinBenchmarkLinkRequest(BaseModel):
target_account_ids: list[str] = Field(default_factory=list)
target_profile_urls: list[str] = Field(default_factory=list)
relation_type: str = "benchmark"
note: str = ""
search_id: str = ""
class DouyinTrackedAccountRequest(BaseModel):
tracked_account_id: str = ""
assistant_id: str = ""
note: str = ""
class DouyinTrackingCursorRequest(BaseModel):
last_seen_at: str = ""
def _safe_json_dumps(value: Any) -> str:
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
def _safe_json_loads(value: str | None, fallback: Any) -> Any:
if not value:
return fallback
try:
return json.loads(value)
except Exception:
return fallback
def _first_non_empty(*values: Any) -> str:
for value in values:
if value is None:
continue
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
elif value not in ("", [], {}, ()):
return str(value)
return ""
def _dedupe_strings(values: Iterable[str]) -> list[str]:
result: list[str] = []
seen: set[str] = set()
for value in values:
item = value.strip()
if not item:
continue
key = item.lower()
if key in seen:
continue
seen.add(key)
result.append(item)
return result
def _compact_text(value: Any, limit: int = 500) -> str:
text = str(value or "").strip()
if len(text) <= limit:
return text
return f"{text[: limit - 1]}"
def _parse_count(value: Any) -> float:
if value is None:
return 0.0
if isinstance(value, (int, float)):
return float(value)
text = str(value).strip().lower().replace(",", "")
if not text:
return 0.0
multiplier = 1.0
if text.endswith("w") or text.endswith(""):
multiplier = 10_000.0
text = text[:-1]
elif text.endswith("亿"):
multiplier = 100_000_000.0
text = text[:-1]
text = text.replace("+", "")
match = re.search(r"-?\d+(?:\.\d+)?", text)
if not match:
return 0.0
try:
return float(match.group()) * multiplier
except ValueError:
return 0.0
def _normalize_timestamp(value: Any) -> str | None:
if value in (None, "", 0, "0"):
return None
if isinstance(value, str):
stripped = value.strip()
if not stripped:
return None
if re.match(r"^\d{4}-\d{2}-\d{2}T", stripped):
return stripped
if stripped.isdigit():
value = int(stripped)
else:
return stripped
if isinstance(value, (int, float)):
ts = float(value)
if ts > 10_000_000_000:
ts /= 1000.0
try:
return datetime.fromtimestamp(ts, tz=timezone.utc).replace(microsecond=0).isoformat()
except Exception:
return None
return None
def _parse_iso_datetime(value: Any) -> datetime | None:
text = str(value or "").strip()
if not text:
return None
normalized = text.replace("Z", "+00:00")
try:
parsed = datetime.fromisoformat(normalized)
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _extract_hashtags(*texts: str) -> list[str]:
tags: list[str] = []
for text in texts:
if not text:
continue
tags.extend(match.group(1) for match in re.finditer(r"#([\w\u4e00-\u9fff]+)", text))
return _dedupe_strings(tags)
def _extract_keywords(*texts: str) -> list[str]:
candidates: list[str] = []
for text in texts:
if not text:
continue
candidates.extend(_extract_hashtags(text))
candidates.extend(re.findall(r"[\u4e00-\u9fff]{2,8}", text))
candidates.extend(re.findall(r"[A-Za-z][A-Za-z0-9_]{2,20}", text))
stop_words = {
"视频",
"作品",
"抖音",
"账号",
"内容",
"发布",
"更多",
"关注",
"用户",
"douyin",
"profile"
}
filtered = [item for item in candidates if item.lower() not in stop_words]
return _dedupe_strings(filtered)
def _video_score_breakdown(video: dict[str, Any]) -> dict[str, Any]:
stats = video.get("stats", {}) or {}
play = float(stats.get("play") or 0)
like = float(stats.get("like") or 0)
comment = float(stats.get("comment") or 0)
share = float(stats.get("share") or 0)
collect = float(stats.get("collect") or 0)
published_dt = _parse_iso_datetime(video.get("published_at"))
if published_dt:
age_days = max(0.0, (datetime.now(timezone.utc) - published_dt).total_seconds() / 86400.0)
else:
age_days = 999.0
if play > 0:
rate_denominator = play
else:
rate_denominator = max(
like * 18.0,
comment * 70.0,
share * 95.0,
collect * 55.0,
1000.0
)
engagement_rate = (like + comment * 2.2 + share * 4.2 + collect * 3.0) / max(rate_denominator, 1.0)
share_rate = share / max(rate_denominator, 1.0)
collect_rate = collect / max(rate_denominator, 1.0)
comment_rate = comment / max(rate_denominator, 1.0)
like_rate = like / max(rate_denominator, 1.0)
volume_component = min(36.0, math.log10(play + 1.0) * 9.0)
interaction_component = min(28.0, engagement_rate * 100.0)
spread_component = min(18.0, (share_rate * 1200.0) + (collect_rate * 700.0))
freshness_component = max(0.0, 18.0 - min(age_days, 36.0) * 0.5)
baseline_component = 6.0 if play > 0 or like > 0 else 0.0
performance_score = round(
min(100.0, volume_component + interaction_component + spread_component + freshness_component + baseline_component),
2
)
popularity_score = round(
min(
100.0,
math.log10(play + 1.0) * 24.0
+ math.log10(like + 1.0) * 22.0
+ math.log10(comment + 1.0) * 20.0
+ math.log10(share + 1.0) * 18.0
+ math.log10(collect + 1.0) * 16.0
),
2
)
commercial_score = round(
min(
100.0,
performance_score * 0.58
+ min(24.0, share_rate * 2200.0)
+ min(18.0, collect_rate * 2000.0)
+ min(12.0, comment_rate * 900.0)
),
2
)
signals: list[str] = []
if share_rate >= 0.01:
signals.append("分享率高,具备扩散和二次传播潜力")
if collect_rate >= 0.008:
signals.append("收藏率高,适合沉淀模板、知识产品或私域承接")
if like_rate >= 0.05:
signals.append("点赞率突出,说明钩子与情绪价值有效")
if comment_rate >= 0.01:
signals.append("评论率较高,适合做互动运营和评论区转化")
if age_days <= 14 and play >= 10_000:
signals.append("近期作品仍有较高播放,说明题材仍在窗口期")
if not signals:
signals.append("当前数据中性,需要结合转化目标继续验证")
return {
"performance_score": performance_score,
"popularity_score": popularity_score,
"commercial_score": commercial_score,
"engagement_rate": round(engagement_rate, 4),
"share_rate": round(share_rate, 4),
"collect_rate": round(collect_rate, 4),
"comment_rate": round(comment_rate, 4),
"like_rate": round(like_rate, 4),
"age_days": round(age_days if age_days < 999 else 0.0, 1) if published_dt else None,
"components": {
"volume": round(volume_component, 2),
"interaction": round(interaction_component, 2),
"spread": round(spread_component, 2),
"freshness": round(freshness_component, 2),
"baseline": round(baseline_component, 2)
},
"signals": signals[:4]
}
def _flatten_json(value: Any, prefix: str = "") -> list[tuple[str, str, str]]:
rows: list[tuple[str, str, str]] = []
if isinstance(value, dict):
for key, child in value.items():
next_prefix = f"{prefix}.{key}" if prefix else str(key)
rows.extend(_flatten_json(child, next_prefix))
elif isinstance(value, list):
for index, child in enumerate(value):
next_prefix = f"{prefix}[{index}]"
rows.extend(_flatten_json(child, next_prefix))
else:
field_type = type(value).__name__
rows.append((prefix or "$", field_type, _compact_text(value, 2000)))
return rows
def _walk_json(value: Any) -> Iterable[dict[str, Any]]:
if isinstance(value, dict):
yield value
for child in value.values():
yield from _walk_json(child)
elif isinstance(value, list):
for child in value:
yield from _walk_json(child)
def _extract_json_objects_from_text(text: str) -> list[Any]:
decoder = json.JSONDecoder()
objects: list[Any] = []
seen: set[str] = set()
if not text:
return objects
candidates = [text, unquote(text), unescape(text), unescape(unquote(text))]
for candidate in candidates:
snippet = candidate[:MAX_HTML_SEARCH_BYTES]
for match in re.finditer(r"[\{\[]", snippet):
try:
obj, _ = decoder.raw_decode(snippet[match.start() :])
except Exception:
continue
marker = _safe_json_dumps(obj)
if marker in seen:
continue
seen.add(marker)
objects.append(obj)
if len(objects) >= 50:
return objects
return objects
def _extract_json_blobs_from_html(html: str) -> list[dict[str, Any]]:
blobs: list[dict[str, Any]] = []
seen: set[str] = set()
for attrs, content in re.findall(r"<script([^>]*)>(.*?)</script>", html, re.IGNORECASE | re.DOTALL):
script_id_match = re.search(r'id=["\']([^"\']+)["\']', attrs, re.IGNORECASE)
script_id = script_id_match.group(1) if script_id_match else ""
for obj in _extract_json_objects_from_text(content.strip()):
marker = _safe_json_dumps(obj)
if marker in seen:
continue
seen.add(marker)
blobs.append({
"script_id": script_id,
"payload": obj
})
return blobs
def _profile_candidate_score(value: dict[str, Any]) -> int:
score = 0
interesting_keys = {
"nickname",
"signature",
"sec_uid",
"secUid",
"uid",
"unique_id",
"short_id",
"aweme_count",
"following_count",
"follower_count",
"total_favorited"
}
score += sum(1 for key in interesting_keys if key in value)
if "author" in value and isinstance(value["author"], dict):
score += 2
return score
def _video_candidate_score(value: dict[str, Any]) -> int:
score = 0
if "statistics" in value and isinstance(value["statistics"], dict):
score += 3
if "aweme_id" in value or "item_id" in value:
score += 2
if "desc" in value or "title" in value:
score += 1
return score
def _extract_profile_candidates(payload: Any) -> list[dict[str, Any]]:
candidates: list[dict[str, Any]] = []
for item in _walk_json(payload):
if _profile_candidate_score(item) >= 3:
candidates.append(item)
if "author" in item and isinstance(item["author"], dict) and _profile_candidate_score(item["author"]) >= 3:
candidates.append(item["author"])
return candidates
def _extract_video_candidates(payload: Any) -> list[dict[str, Any]]:
candidates: list[dict[str, Any]] = []
for item in _walk_json(payload):
if _video_candidate_score(item) >= 3:
candidates.append(item)
return candidates
def _normalize_profile_candidate(candidate: dict[str, Any], fallback_url: str = "") -> dict[str, Any]:
stats_source = candidate.get("statistics") if isinstance(candidate.get("statistics"), dict) else {}
avatar = candidate.get("avatar_medium") or candidate.get("avatar_thumb") or candidate.get("avatar_url")
if isinstance(avatar, dict):
avatar = _first_non_empty(
avatar.get("url_list", [""])[0] if isinstance(avatar.get("url_list"), list) else "",
avatar.get("url")
)
signature = _first_non_empty(
candidate.get("signature"),
candidate.get("desc"),
candidate.get("bio"),
candidate.get("description")
)
nickname = _first_non_empty(candidate.get("nickname"), candidate.get("name"), candidate.get("author_name"))
canonical_url = _first_non_empty(
candidate.get("share_url"),
candidate.get("profile_url"),
fallback_url
)
return {
"nickname": nickname,
"signature": signature,
"profile_url": canonical_url,
"canonical_profile_url": canonical_url,
"sec_uid": _first_non_empty(candidate.get("sec_uid"), candidate.get("secUid")),
"douyin_uid": _first_non_empty(candidate.get("uid")),
"douyin_id": _first_non_empty(candidate.get("unique_id"), candidate.get("short_id"), candidate.get("douyin_id")),
"avatar_url": _first_non_empty(avatar),
"stats": {
"followers": _parse_count(candidate.get("follower_count") or stats_source.get("follower_count")),
"following": _parse_count(candidate.get("following_count") or stats_source.get("following_count")),
"likes": _parse_count(candidate.get("total_favorited") or stats_source.get("total_favorited")),
"videos": _parse_count(candidate.get("aweme_count") or stats_source.get("aweme_count"))
},
"tags": _dedupe_strings(
_extract_hashtags(signature, nickname)
+ [str(tag) for tag in candidate.get("tags", []) if isinstance(tag, (str, int, float))]
),
"raw": candidate
}
def _pick_best_profile(candidates: list[dict[str, Any]], fallback_url: str = "") -> dict[str, Any]:
best: dict[str, Any] | None = None
best_score = -1
for candidate in candidates:
normalized = _normalize_profile_candidate(candidate, fallback_url=fallback_url)
score = 0
score += 4 if normalized["nickname"] else 0
score += 3 if normalized["sec_uid"] else 0
score += 2 if normalized["signature"] else 0
score += 1 if normalized["stats"]["followers"] else 0
if score > best_score:
best = normalized
best_score = score
return best or _normalize_profile_candidate({}, fallback_url=fallback_url)
def _normalize_video_candidate(candidate: dict[str, Any]) -> dict[str, Any]:
def _collect_image_urls(node: Any) -> list[str]:
urls: list[str] = []
def _visit(value: Any) -> None:
if isinstance(value, str):
text = value.strip()
if text.startswith("http"):
urls.append(text)
return
if isinstance(value, list):
for item in value[:20]:
_visit(item)
return
if not isinstance(value, dict):
return
for key in ("url", "download_url", "origin_url", "display_url", "cover_url"):
target = value.get(key)
if isinstance(target, str) and target.strip().startswith("http"):
urls.append(target.strip())
url_list = value.get("url_list")
if isinstance(url_list, list):
for item in url_list[:5]:
_visit(item)
for key in ("image", "images", "cover", "display_image", "origin_image"):
child = value.get(key)
if child not in (None, "", [], {}):
_visit(child)
_visit(node)
return _dedupe_strings(urls)
stats_source = candidate.get("statistics") if isinstance(candidate.get("statistics"), dict) else {}
video_source = candidate.get("video") if isinstance(candidate.get("video"), dict) else {}
title = _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("share_title"))
description = _first_non_empty(candidate.get("desc"), candidate.get("title"), candidate.get("text"))
cover = candidate.get("cover") or video_source.get("cover")
image_urls = _collect_image_urls(
[
candidate.get("images"),
candidate.get("image_infos"),
candidate.get("image_list"),
candidate.get("slides"),
candidate.get("photos"),
candidate.get("photo"),
candidate.get("image_post_info"),
]
)
if isinstance(cover, dict):
cover = _first_non_empty(
cover.get("url_list", [""])[0] if isinstance(cover.get("url_list"), list) else "",
cover.get("url")
)
duration_raw = float(candidate.get("duration") or video_source.get("duration") or 0)
duration_sec = duration_raw / 1000.0 if duration_raw > 1000 else duration_raw
has_video_media = bool(video_source) or duration_sec > 0.3
aweme_type = str(candidate.get("aweme_type") or "")
looks_like_image_text = bool(image_urls) and (not has_video_media or aweme_type in {"51", "55", "61", "68", "122", "150"})
content_type = "image_text" if looks_like_image_text else "video"
return {
"aweme_id": _first_non_empty(candidate.get("aweme_id"), candidate.get("item_id"), candidate.get("group_id")),
"title": title,
"description": description,
"share_url": _first_non_empty(candidate.get("share_url")),
"cover_url": _first_non_empty(cover, image_urls[0] if image_urls else ""),
"duration_sec": duration_sec,
"published_at": _normalize_timestamp(candidate.get("create_time") or candidate.get("publish_time")),
"tags": _extract_hashtags(title, description),
"content_type": content_type,
"content_type_label": "图文" if content_type == "image_text" else "视频",
"image_count": len(image_urls),
"stats": {
"play": _parse_count(stats_source.get("play_count") or candidate.get("play_count")),
"like": _parse_count(stats_source.get("digg_count") or candidate.get("digg_count")),
"comment": _parse_count(stats_source.get("comment_count") or candidate.get("comment_count")),
"share": _parse_count(stats_source.get("share_count") or candidate.get("share_count")),
"collect": _parse_count(stats_source.get("collect_count") or candidate.get("collect_count"))
},
"raw": candidate
}
def _extract_videos(payloads: Iterable[Any]) -> list[dict[str, Any]]:
videos: list[dict[str, Any]] = []
seen: set[str] = set()
for payload in payloads:
for candidate in _extract_video_candidates(payload):
normalized = _normalize_video_candidate(candidate)
dedupe_key = normalized["aweme_id"] or normalized["share_url"] or normalized["title"]
if not dedupe_key or dedupe_key in seen:
continue
seen.add(dedupe_key)
videos.append(normalized)
videos.sort(
key=lambda item: (
item["stats"]["play"] + item["stats"]["like"] + item["stats"]["comment"] * 4 + item["stats"]["share"] * 6
),
reverse=True
)
return videos
def _merge_profile_payload(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]:
if not overlay:
return base
if not base or not base.get("nickname"):
return overlay
merged = dict(base)
merged["nickname"] = base.get("nickname") or overlay.get("nickname", "")
merged["signature"] = base.get("signature") or overlay.get("signature", "")
merged["profile_url"] = base.get("profile_url") or overlay.get("profile_url", "")
merged["canonical_profile_url"] = base.get("canonical_profile_url") or overlay.get("canonical_profile_url", "")
merged["sec_uid"] = base.get("sec_uid") or overlay.get("sec_uid", "")
merged["douyin_uid"] = base.get("douyin_uid") or overlay.get("douyin_uid", "")
merged["douyin_id"] = base.get("douyin_id") or overlay.get("douyin_id", "")
merged["avatar_url"] = base.get("avatar_url") or overlay.get("avatar_url", "")
merged["tags"] = _dedupe_strings(base.get("tags", []) + overlay.get("tags", []))
merged["stats"] = {
"followers": float(base.get("stats", {}).get("followers") or overlay.get("stats", {}).get("followers") or 0),
"following": float(base.get("stats", {}).get("following") or overlay.get("stats", {}).get("following") or 0),
"likes": float(base.get("stats", {}).get("likes") or overlay.get("stats", {}).get("likes") or 0),
"videos": float(base.get("stats", {}).get("videos") or overlay.get("stats", {}).get("videos") or 0),
}
if not merged.get("raw"):
merged["raw"] = overlay.get("raw", {})
return merged
def _extract_creator_payloads(creator_data: dict[str, Any]) -> list[Any]:
payloads: list[Any] = []
for page in creator_data.get("pages", []):
for blob in page.get("blobs", []):
payload = blob.get("payload")
if payload not in (None, "", [], {}):
payloads.append(payload)
return payloads
def _profile_identity_value(profile: dict[str, Any], field_name: str) -> str:
value = str(profile.get(field_name, "") or "").strip()
if not value:
return ""
if field_name in {"profile_url", "canonical_profile_url"}:
return _normalize_profile_url_input(value)
return value
def _profiles_appear_same(left: dict[str, Any], right: dict[str, Any]) -> bool:
if not left or not right:
return False
for field_name in ("sec_uid", "douyin_uid", "douyin_id", "canonical_profile_url", "profile_url"):
left_value = _profile_identity_value(left, field_name)
right_value = _profile_identity_value(right, field_name)
if left_value and right_value and left_value == right_value:
return True
return False
def _normalize_profile_url_input(value: str) -> str:
text = str(value or "").strip()
if not text:
return ""
match = re.search(r"https?://[^\s]+", text)
if match:
text = match.group(0)
text = text.strip().strip(",。;;、,)")
if text.startswith("www.douyin.com/") or text.startswith("douyin.com/"):
text = f"https://{text}"
return text
def _looks_like_douyin_anti_bot_page(html: str) -> bool:
markers = (
"window.byted_acrawler.init",
"__ac_signature",
"__ac_nonce",
"window.location.reload()"
)
return any(marker in html for marker in markers)
async def _fetch_html(url: str, cookie: str = "") -> tuple[str, str]:
headers = {
"User-Agent": DEFAULT_USER_AGENT,
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"
}
if cookie.strip():
headers["Cookie"] = cookie.strip()
async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, follow_redirects=True) as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
return str(response.url), response.text
async def _discover_profile_urls_from_search(keywords: list[str], limit: int = 8) -> list[str]:
urls: list[str] = []
seen: set[str] = set()
for keyword in keywords[:3]:
search_url = f"https://www.douyin.com/search/{quote(keyword)}?type=user"
try:
_, html = await _fetch_html(search_url)
except Exception:
continue
for match in re.findall(r'href=["\']([^"\']+/user/[^"\']+)["\']', html):
if match.startswith("/"):
match = f"https://www.douyin.com{match}"
cleaned = match.split("?")[0]
if cleaned in seen:
continue
seen.add(cleaned)
urls.append(cleaned)
if len(urls) >= limit:
return urls
return urls
def _summarize_videos(videos: list[dict[str, Any]], limit: int = 8) -> dict[str, Any]:
selected = videos[:limit]
if not selected:
return {
"count": 0,
"top_tags": [],
"avg_play": 0.0,
"avg_like": 0.0,
"avg_comment": 0.0,
"avg_share": 0.0,
"videos": []
}
count = len(selected)
avg_play = sum(item["stats"]["play"] for item in selected) / count
avg_like = sum(item["stats"]["like"] for item in selected) / count
avg_comment = sum(item["stats"]["comment"] for item in selected) / count
avg_share = sum(item["stats"]["share"] for item in selected) / count
tag_counter = Counter(tag for item in selected for tag in item.get("tags", []))
return {
"count": len(videos),
"top_tags": [tag for tag, _ in tag_counter.most_common(8)],
"avg_play": round(avg_play, 2),
"avg_like": round(avg_like, 2),
"avg_comment": round(avg_comment, 2),
"avg_share": round(avg_share, 2),
"videos": [
{
"aweme_id": item["aweme_id"],
"title": _compact_text(item["title"], 120),
"description": _compact_text(item["description"], 180),
"tags": item["tags"][:6],
"published_at": item["published_at"],
"stats": item["stats"]
}
for item in selected
]
}
def _jaccard(left: Iterable[str], right: Iterable[str]) -> float:
left_set = {item.strip().lower() for item in left if item.strip()}
right_set = {item.strip().lower() for item in right if item.strip()}
if not left_set and not right_set:
return 0.0
intersection = len(left_set & right_set)
union = len(left_set | right_set)
return intersection / union if union else 0.0
def _quality_score(account_payload: dict[str, Any]) -> float:
stats = account_payload.get("profile_stats", {})
followers = float(stats.get("followers") or 0)
video_summary = account_payload.get("video_summary", {})
avg_play = float(video_summary.get("avg_play") or 0)
avg_like = float(video_summary.get("avg_like") or 0)
avg_comment = float(video_summary.get("avg_comment") or 0)
avg_share = float(video_summary.get("avg_share") or 0)
base = followers / 10_000.0
engagement = avg_like / 1000.0 + avg_comment / 300.0 + avg_share / 200.0 + avg_play / 5000.0
return round(base + engagement, 3)
def _heuristic_similarity(source_payload: dict[str, Any], candidate_payload: dict[str, Any]) -> dict[str, Any]:
source_keywords = source_payload.get("keywords", [])
candidate_keywords = candidate_payload.get("keywords", [])
topic_overlap = _jaccard(source_keywords, candidate_keywords)
tag_overlap = _jaccard(
source_payload.get("video_summary", {}).get("top_tags", []),
candidate_payload.get("video_summary", {}).get("top_tags", [])
)
source_signature = source_payload.get("signature", "")
candidate_signature = candidate_payload.get("signature", "")
signature_overlap = _jaccard(_extract_keywords(source_signature), _extract_keywords(candidate_signature))
quality = _quality_score(candidate_payload)
score = round(topic_overlap * 55 + tag_overlap * 20 + signature_overlap * 10 + min(quality, 15), 2)
return {
"topic_overlap": round(topic_overlap, 3),
"tag_overlap": round(tag_overlap, 3),
"signature_overlap": round(signature_overlap, 3),
"quality_score": quality,
"heuristic_score": score
}
def _build_model_label(profile: dict[str, Any]) -> str:
return _first_non_empty(profile.get("name"), profile.get("model_name"), profile.get("base_url"))
def _try_parse_agent_json(text: str) -> Any:
stripped = text.strip()
if not stripped:
return {}
try:
return json.loads(stripped)
except Exception:
pass
objects = _extract_json_objects_from_text(stripped)
return objects[0] if objects else {}
def register_douyin_routes(app: Any, legacy: Any) -> None:
def now() -> str:
return legacy.utc_now()
def make_id(prefix: str) -> str:
return legacy.make_id(prefix)
def ensure_schema() -> None:
schema = """
CREATE TABLE IF NOT EXISTS douyin_accounts (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
profile_url TEXT NOT NULL DEFAULT '',
canonical_profile_url TEXT NOT NULL DEFAULT '',
sec_uid TEXT NOT NULL DEFAULT '',
douyin_uid TEXT NOT NULL DEFAULT '',
douyin_id TEXT NOT NULL DEFAULT '',
nickname TEXT NOT NULL DEFAULT '',
signature TEXT NOT NULL DEFAULT '',
avatar_url TEXT NOT NULL DEFAULT '',
tags_json TEXT NOT NULL DEFAULT '[]',
profile_stats_json TEXT NOT NULL DEFAULT '{}',
raw_profile_json TEXT NOT NULL DEFAULT '{}',
source_mode TEXT NOT NULL DEFAULT 'public',
sync_status TEXT NOT NULL DEFAULT 'pending',
last_public_sync_at TEXT,
last_creator_sync_at TEXT,
last_analysis_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_updated
ON douyin_accounts(user_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_sec_uid
ON douyin_accounts(user_id, sec_uid);
CREATE TABLE IF NOT EXISTS douyin_account_snapshots (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
snapshot_type TEXT NOT NULL,
source_url TEXT NOT NULL DEFAULT '',
raw_payload_json TEXT NOT NULL DEFAULT '{}',
summary_json TEXT NOT NULL DEFAULT '{}',
field_count INTEGER NOT NULL DEFAULT 0,
collected_at TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_snapshots_account_collected
ON douyin_account_snapshots(account_id, collected_at DESC);
CREATE TABLE IF NOT EXISTS douyin_snapshot_fields (
snapshot_id TEXT NOT NULL,
field_path TEXT NOT NULL,
field_type TEXT NOT NULL DEFAULT 'string',
field_value_text TEXT NOT NULL DEFAULT '',
PRIMARY KEY(snapshot_id, field_path),
FOREIGN KEY(snapshot_id) REFERENCES douyin_account_snapshots(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS douyin_videos (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
aweme_id TEXT NOT NULL DEFAULT '',
title TEXT NOT NULL DEFAULT '',
description TEXT NOT NULL DEFAULT '',
share_url TEXT NOT NULL DEFAULT '',
cover_url TEXT NOT NULL DEFAULT '',
duration_sec REAL NOT NULL DEFAULT 0,
published_at TEXT,
tags_json TEXT NOT NULL DEFAULT '[]',
stats_json TEXT NOT NULL DEFAULT '{}',
raw_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_updated
ON douyin_videos(account_id, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_aweme
ON douyin_videos(account_id, aweme_id);
CREATE TABLE IF NOT EXISTS douyin_video_analyses (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
user_id TEXT NOT NULL,
video_id TEXT NOT NULL,
report_id TEXT NOT NULL DEFAULT '',
model_profile_id TEXT NOT NULL DEFAULT '',
model_label TEXT NOT NULL DEFAULT '',
source_type TEXT NOT NULL DEFAULT 'top_score_auto',
status TEXT NOT NULL DEFAULT 'ok',
performance_score REAL NOT NULL DEFAULT 0,
commercial_score REAL NOT NULL DEFAULT 0,
hook_score REAL NOT NULL DEFAULT 0,
retention_score REAL NOT NULL DEFAULT 0,
conversion_score REAL NOT NULL DEFAULT 0,
summary_text TEXT NOT NULL DEFAULT '',
suggestion_text TEXT NOT NULL DEFAULT '',
parsed_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(video_id) REFERENCES douyin_videos(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_video_analyses_video_created
ON douyin_video_analyses(video_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_douyin_video_analyses_account_created
ON douyin_video_analyses(account_id, created_at DESC);
CREATE TABLE IF NOT EXISTS douyin_analysis_reports (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
user_id TEXT NOT NULL,
focus_text TEXT NOT NULL DEFAULT '',
model_profile_ids_json TEXT NOT NULL DEFAULT '[]',
linked_account_ids_json TEXT NOT NULL DEFAULT '[]',
prompt_text TEXT NOT NULL DEFAULT '',
context_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_analysis_reports_account_created
ON douyin_analysis_reports(account_id, created_at DESC);
CREATE TABLE IF NOT EXISTS douyin_analysis_suggestions (
id TEXT PRIMARY KEY,
report_id TEXT NOT NULL,
model_profile_id TEXT NOT NULL DEFAULT '',
model_label TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'ok',
suggestion_text TEXT NOT NULL DEFAULT '',
parsed_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(report_id) REFERENCES douyin_analysis_reports(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_douyin_analysis_suggestions_report
ON douyin_analysis_suggestions(report_id, created_at ASC);
CREATE TABLE IF NOT EXISTS douyin_similarity_searches (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
source_account_id TEXT,
source_profile_url TEXT NOT NULL DEFAULT '',
keywords_json TEXT NOT NULL DEFAULT '[]',
prompt_text TEXT NOT NULL DEFAULT '',
context_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_douyin_similarity_searches_user_created
ON douyin_similarity_searches(user_id, created_at DESC);
CREATE TABLE IF NOT EXISTS douyin_similarity_candidates (
id TEXT PRIMARY KEY,
search_id TEXT NOT NULL,
candidate_account_id TEXT,
candidate_profile_url TEXT NOT NULL DEFAULT '',
heuristic_score REAL NOT NULL DEFAULT 0,
agent_score REAL NOT NULL DEFAULT 0,
rationale_text TEXT NOT NULL DEFAULT '',
dimensions_json TEXT NOT NULL DEFAULT '{}',
raw_output_json TEXT NOT NULL DEFAULT '{}',
rank_index INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
FOREIGN KEY(search_id) REFERENCES douyin_similarity_searches(id) ON DELETE CASCADE,
FOREIGN KEY(candidate_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_douyin_similarity_candidates_search_rank
ON douyin_similarity_candidates(search_id, rank_index ASC);
CREATE TABLE IF NOT EXISTS douyin_account_relations (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
source_account_id TEXT NOT NULL,
target_account_id TEXT,
target_profile_url TEXT NOT NULL DEFAULT '',
relation_type TEXT NOT NULL DEFAULT 'benchmark',
note TEXT NOT NULL DEFAULT '',
search_id TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
FOREIGN KEY(target_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_douyin_account_relations_source
ON douyin_account_relations(source_account_id, created_at DESC);
CREATE TABLE IF NOT EXISTS douyin_tracked_accounts (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
tracked_account_id TEXT NOT NULL,
assistant_id TEXT,
note TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, tracked_account_id),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(tracked_account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_douyin_tracked_accounts_user_updated
ON douyin_tracked_accounts(user_id, updated_at DESC);
CREATE TABLE IF NOT EXISTS douyin_tracking_cursors (
user_id TEXT PRIMARY KEY,
last_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
"""
with legacy.db.session() as conn:
conn.executescript(schema)
ensure_schema()
@app.on_event("startup")
def _startup_douyin_schema() -> None:
ensure_schema()
def _require_owned_account(account_id: str, user_id: str) -> dict[str, Any]:
row = legacy.db.fetch_one(
"SELECT * FROM douyin_accounts WHERE id = ? AND user_id = ?",
(account_id, user_id)
)
if not row:
raise HTTPException(status_code=404, detail="Douyin account not found")
return row
def _fetch_model_profiles(account_id: str) -> list[dict[str, Any]]:
return legacy.db.fetch_all(
"""
SELECT *
FROM model_profiles
WHERE owner_account_id IS NULL OR owner_account_id = ?
ORDER BY is_default DESC, created_at ASC
""",
(account_id,)
)
def _resolve_model_profiles(account: dict[str, Any], requested_ids: list[str]) -> list[dict[str, Any]]:
profiles = _fetch_model_profiles(account["id"])
if not profiles:
raise HTTPException(status_code=400, detail="No available model profiles")
if not requested_ids:
return profiles
profile_map = {row["id"]: row for row in profiles}
missing = [profile_id for profile_id in requested_ids if profile_id not in profile_map]
if missing:
raise HTTPException(status_code=404, detail=f"Unknown model profiles: {', '.join(missing)}")
return [profile_map[profile_id] for profile_id in requested_ids]
async def _collect_public_profile(profile_url: str, manual_payload: dict[str, Any] | None) -> dict[str, Any]:
source_url = _normalize_profile_url_input(profile_url)
blobs: list[dict[str, Any]] = []
errors: list[str] = []
if manual_payload:
blobs.append({"script_id": "manual_profile_payload", "payload": manual_payload})
if source_url:
try:
final_url, html = await _fetch_html(source_url)
source_url = final_url
if not html.strip():
errors.append("public_profile_empty_html")
elif _looks_like_douyin_anti_bot_page(html):
errors.append("public_profile_anti_bot_challenge")
elif not blobs:
blobs.extend(_extract_json_blobs_from_html(html))
if not blobs:
errors.append("public_profile_no_json_blobs")
else:
blobs.extend(_extract_json_blobs_from_html(html))
if not blobs:
errors.append("public_profile_no_json_blobs")
except Exception as exc:
errors.append(f"public_profile_fetch_failed: {exc}")
payloads = [item["payload"] for item in blobs]
profile = _pick_best_profile(
[candidate for payload in payloads for candidate in _extract_profile_candidates(payload)],
fallback_url=source_url
)
videos = _extract_videos(payloads)
if source_url and not profile.get("nickname") and not videos and not errors:
if not blobs:
errors.append("public_profile_no_json_blobs")
errors.append("public_profile_no_candidates")
return {
"profile": profile,
"videos": videos,
"raw_pages": blobs,
"errors": errors,
"source_url": source_url
}
async def _collect_creator_center_pages(
urls: list[str],
cookie: str,
manual_pages: list[ManualPageCapture]
) -> dict[str, Any]:
pages: list[dict[str, Any]] = []
errors: list[str] = []
for page in manual_pages:
pages.append({
"url": page.url,
"title": page.title,
"blobs": [{"script_id": "manual_creator_payload", "payload": page.payload}]
})
if cookie.strip():
for url in urls:
try:
final_url, html = await _fetch_html(url, cookie=cookie)
pages.append({
"url": final_url,
"title": "",
"blobs": _extract_json_blobs_from_html(html)
})
except Exception as exc:
errors.append(f"creator_center_fetch_failed[{url}]: {exc}")
return {"pages": pages, "errors": errors}
def _upsert_account(
owner: dict[str, Any],
profile: dict[str, Any],
sync_request: DouyinAccountSyncRequest,
public_data: dict[str, Any],
creator_data: dict[str, Any]
) -> dict[str, Any]:
lookup_candidates = [
("sec_uid", profile.get("sec_uid", "")),
("douyin_id", profile.get("douyin_id", "")),
("canonical_profile_url", profile.get("canonical_profile_url", ""))
]
existing: dict[str, Any] | None = None
for field_name, field_value in lookup_candidates:
if not field_value:
continue
existing = legacy.db.fetch_one(
f"SELECT * FROM douyin_accounts WHERE user_id = ? AND {field_name} = ? LIMIT 1",
(owner["id"], field_value)
)
if existing:
break
account_id = existing["id"] if existing else make_id("dyacct")
created_at = existing["created_at"] if existing else now()
updated_at = now()
tags = _dedupe_strings(profile.get("tags", []) + _extract_keywords(profile.get("nickname", ""), profile.get("signature", "")))
profile_stats = profile.get("stats", {})
source_mode = "creator_center" if creator_data["pages"] else "public"
sync_status = "partial" if public_data["errors"] or creator_data["errors"] else "ready"
if existing:
legacy.db.execute(
"""
UPDATE douyin_accounts
SET profile_url = ?, canonical_profile_url = ?, sec_uid = ?, douyin_uid = ?, douyin_id = ?,
nickname = ?, signature = ?, avatar_url = ?, tags_json = ?, profile_stats_json = ?,
raw_profile_json = ?, source_mode = ?, sync_status = ?, last_public_sync_at = ?,
last_creator_sync_at = ?, updated_at = ?
WHERE id = ?
""",
(
profile.get("profile_url", ""),
profile.get("canonical_profile_url", ""),
profile.get("sec_uid", ""),
profile.get("douyin_uid", ""),
profile.get("douyin_id", ""),
profile.get("nickname", ""),
profile.get("signature", ""),
profile.get("avatar_url", ""),
_safe_json_dumps(tags),
_safe_json_dumps(profile_stats),
_safe_json_dumps({
"profile": profile.get("raw", {}),
"discovery_note": sync_request.discovery_note
}),
source_mode,
sync_status,
now() if public_data["raw_pages"] else existing.get("last_public_sync_at"),
now() if creator_data["pages"] else existing.get("last_creator_sync_at"),
updated_at,
account_id
)
)
else:
legacy.db.execute(
"""
INSERT INTO douyin_accounts (
id, user_id, profile_url, canonical_profile_url, sec_uid, douyin_uid, douyin_id,
nickname, signature, avatar_url, tags_json, profile_stats_json, raw_profile_json,
source_mode, sync_status, last_public_sync_at, last_creator_sync_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
account_id,
owner["id"],
profile.get("profile_url", ""),
profile.get("canonical_profile_url", ""),
profile.get("sec_uid", ""),
profile.get("douyin_uid", ""),
profile.get("douyin_id", ""),
profile.get("nickname", ""),
profile.get("signature", ""),
profile.get("avatar_url", ""),
_safe_json_dumps(tags),
_safe_json_dumps(profile_stats),
_safe_json_dumps({
"profile": profile.get("raw", {}),
"discovery_note": sync_request.discovery_note
}),
source_mode,
sync_status,
now() if public_data["raw_pages"] else None,
now() if creator_data["pages"] else None,
created_at,
updated_at
)
)
account_row = _require_owned_account(account_id, owner["id"])
_persist_snapshots_and_videos(account_row, public_data, creator_data, sync_request)
return _require_owned_account(account_id, owner["id"])
def _persist_snapshot(
account_row: dict[str, Any],
snapshot_type: str,
source_url: str,
payload: Any,
summary: dict[str, Any],
index_fields: bool = True
) -> str:
snapshot_id = make_id("dysnap")
collected_at = now()
fields = _flatten_json(payload) if index_fields else []
legacy.db.execute(
"""
INSERT INTO douyin_account_snapshots (
id, account_id, snapshot_type, source_url, raw_payload_json, summary_json,
field_count, collected_at, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
snapshot_id,
account_row["id"],
snapshot_type,
source_url,
_safe_json_dumps(payload),
_safe_json_dumps(summary),
len(fields),
collected_at,
collected_at
)
)
for field_path, field_type, field_value in fields:
legacy.db.execute(
"""
INSERT OR REPLACE INTO douyin_snapshot_fields (
snapshot_id, field_path, field_type, field_value_text
) VALUES (?, ?, ?, ?)
""",
(snapshot_id, field_path, field_type, field_value)
)
return snapshot_id
def _persist_snapshots_and_videos(
account_row: dict[str, Any],
public_data: dict[str, Any],
creator_data: dict[str, Any],
sync_request: DouyinAccountSyncRequest
) -> None:
if public_data["raw_pages"]:
public_payload = {
"pages": public_data["raw_pages"],
"errors": public_data["errors"],
"source_url": public_data["source_url"]
}
_persist_snapshot(
account_row,
"public_profile",
public_data["source_url"],
public_payload,
{
"video_count": len(public_data["videos"]),
"nickname": public_data["profile"].get("nickname", ""),
"tags": public_data["profile"].get("tags", [])
},
index_fields=not sync_request.compact_response
)
for page in creator_data["pages"]:
payload = {
"title": page["title"],
"blobs": page["blobs"]
}
_persist_snapshot(
account_row,
"creator_center",
page["url"],
payload,
{
"blob_count": len(page["blobs"]),
"field_count": 0 if sync_request.compact_response else len(_flatten_json(payload))
},
index_fields=not sync_request.compact_response
)
for manual_video in sync_request.manual_work_payloads:
normalized = _normalize_video_candidate(manual_video)
public_data["videos"].append(normalized)
deduped: dict[str, dict[str, Any]] = {}
for video in public_data["videos"]:
key = video["aweme_id"] or video["share_url"] or video["title"]
if key and key not in deduped:
deduped[key] = video
for video in deduped.values():
existing = None
if video["aweme_id"]:
existing = legacy.db.fetch_one(
"SELECT id FROM douyin_videos WHERE account_id = ? AND aweme_id = ? LIMIT 1",
(account_row["id"], video["aweme_id"])
)
video_id = existing["id"] if existing else make_id("dyvideo")
created_at = now()
if existing:
legacy.db.execute(
"""
UPDATE douyin_videos
SET title = ?, description = ?, share_url = ?, cover_url = ?, duration_sec = ?,
published_at = ?, tags_json = ?, stats_json = ?, raw_json = ?, updated_at = ?
WHERE id = ?
""",
(
video["title"],
video["description"],
video["share_url"],
video["cover_url"],
video["duration_sec"],
video["published_at"],
_safe_json_dumps(video["tags"]),
_safe_json_dumps(video["stats"]),
_safe_json_dumps(video["raw"]),
now(),
video_id
)
)
else:
legacy.db.execute(
"""
INSERT INTO douyin_videos (
id, account_id, aweme_id, title, description, share_url, cover_url,
duration_sec, published_at, tags_json, stats_json, raw_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
video_id,
account_row["id"],
video["aweme_id"],
video["title"],
video["description"],
video["share_url"],
video["cover_url"],
video["duration_sec"],
video["published_at"],
_safe_json_dumps(video["tags"]),
_safe_json_dumps(video["stats"]),
_safe_json_dumps(video["raw"]),
created_at,
created_at
)
)
def _list_videos(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
"""
SELECT *
FROM douyin_videos
WHERE account_id = ?
ORDER BY COALESCE(published_at, updated_at) DESC, updated_at DESC
LIMIT ?
""",
(account_id, limit)
)
payloads: list[dict[str, Any]] = []
for row in rows:
raw_payload = _safe_json_loads(row["raw_json"], {})
normalized = _normalize_video_candidate(raw_payload) if isinstance(raw_payload, dict) and raw_payload else {}
payloads.append({
"id": row["id"],
"aweme_id": row["aweme_id"],
"title": row["title"],
"description": row["description"],
"share_url": row["share_url"],
"cover_url": row["cover_url"],
"duration_sec": row["duration_sec"],
"published_at": row["published_at"],
"tags": _safe_json_loads(row["tags_json"], []),
"stats": _safe_json_loads(row["stats_json"], {}),
"content_type": normalized.get("content_type", "video"),
"content_type_label": normalized.get("content_type_label", "视频"),
"image_count": int(normalized.get("image_count") or 0),
"raw": raw_payload
})
return payloads
def _latest_video_analysis_map(account_id: str) -> dict[str, dict[str, Any]]:
rows = legacy.db.fetch_all(
"""
SELECT analysis.*
FROM douyin_video_analyses analysis
INNER JOIN (
SELECT video_id, MAX(created_at) AS latest_created_at
FROM douyin_video_analyses
WHERE account_id = ?
GROUP BY video_id
) latest
ON latest.video_id = analysis.video_id
AND latest.latest_created_at = analysis.created_at
WHERE analysis.account_id = ?
""",
(account_id, account_id)
)
payloads: dict[str, dict[str, Any]] = {}
for row in rows:
parsed = _safe_json_loads(row["parsed_json"], {})
payloads[row["video_id"]] = {
"id": row["id"],
"video_id": row["video_id"],
"report_id": row["report_id"],
"model_profile_id": row["model_profile_id"],
"model_label": row["model_label"],
"source_type": row["source_type"],
"status": row["status"],
"performance_score": float(row["performance_score"] or 0),
"commercial_score": float(row["commercial_score"] or 0),
"hook_score": float(row["hook_score"] or 0),
"retention_score": float(row["retention_score"] or 0),
"conversion_score": float(row["conversion_score"] or 0),
"summary_text": row["summary_text"],
"suggestion_text": row["suggestion_text"],
"parsed_json": parsed,
"created_at": row["created_at"],
"updated_at": row["updated_at"]
}
return payloads
def _build_video_payload(video: dict[str, Any], latest_analysis: dict[str, Any] | None = None) -> dict[str, Any]:
score = _video_score_breakdown(video)
payload = {
"id": video["id"],
"aweme_id": video["aweme_id"],
"title": video["title"],
"description": video["description"],
"share_url": video["share_url"],
"cover_url": video["cover_url"],
"duration_sec": video["duration_sec"],
"published_at": video["published_at"],
"tags": video["tags"],
"content_type": video.get("content_type", "video"),
"content_type_label": video.get("content_type_label", "视频"),
"image_count": int(video.get("image_count") or 0),
"stats": video["stats"],
"score": score
}
if latest_analysis:
payload["latest_analysis"] = latest_analysis
return payload
def _video_sort_key(video: dict[str, Any], sort_by: str) -> tuple[Any, ...]:
if sort_by in {"popular", "popularity"}:
return (
float(video.get("score", {}).get("popularity_score") or 0),
float(video.get("score", {}).get("performance_score") or 0),
float(video.get("score", {}).get("commercial_score") or 0)
)
if sort_by == "latest":
return (
_parse_iso_datetime(video.get("published_at")) or datetime.fromtimestamp(0, tz=timezone.utc),
video.get("score", {}).get("performance_score", 0)
)
if sort_by == "commercial":
return (
float(video.get("score", {}).get("commercial_score") or 0),
float(video.get("score", {}).get("performance_score") or 0)
)
if sort_by == "play":
return (
float((video.get("stats") or {}).get("play") or 0),
float(video.get("score", {}).get("performance_score") or 0)
)
if sort_by == "like":
return (
float((video.get("stats") or {}).get("like") or 0),
float(video.get("score", {}).get("performance_score") or 0)
)
if sort_by == "share":
return (
float((video.get("stats") or {}).get("share") or 0),
float(video.get("score", {}).get("performance_score") or 0)
)
if sort_by == "comment":
return (
float((video.get("stats") or {}).get("comment") or 0),
float(video.get("score", {}).get("performance_score") or 0)
)
return (
float(video.get("score", {}).get("performance_score") or 0),
float(video.get("score", {}).get("commercial_score") or 0)
)
def _build_video_workspace_payload(
account_row: dict[str, Any],
limit: int = 60
) -> dict[str, Any]:
raw_videos = _list_videos(account_row["id"], limit=max(limit, 24))
latest_analysis_map = _latest_video_analysis_map(account_row["id"])
videos = [
_build_video_payload(video, latest_analysis_map.get(video["id"]))
for video in raw_videos
]
videos_by_score = sorted(videos, key=lambda item: _video_sort_key(item, "score"), reverse=True)
videos_by_latest = sorted(videos, key=lambda item: _video_sort_key(item, "latest"), reverse=True)
high_score_threshold = 60.0
high_score_videos = [video for video in videos_by_score if float(video["score"]["performance_score"]) >= high_score_threshold]
analyzed_count = sum(1 for video in videos if video.get("latest_analysis"))
video_only_count = sum(1 for video in videos if video.get("content_type") == "video")
image_text_count = sum(1 for video in videos if video.get("content_type") == "image_text")
return {
"items": videos,
"top_scored_video_ids": [video["id"] for video in videos_by_score[: min(12, len(videos_by_score))]],
"latest_video_ids": [video["id"] for video in videos_by_latest[: min(12, len(videos_by_latest))]],
"high_score_threshold": high_score_threshold,
"meta": {
"total_count": len(videos),
"analyzed_count": analyzed_count,
"high_score_count": len(high_score_videos),
"video_count": video_only_count,
"image_text_count": image_text_count
}
}
def _finalize_sync_workspace(
owner: dict[str, Any],
request: DouyinAccountSyncRequest,
public_data: dict[str, Any],
creator_data: dict[str, Any]
) -> dict[str, Any]:
creator_payloads = _extract_creator_payloads(creator_data)
if creator_payloads:
creator_profile = _pick_best_profile(
[candidate for payload in creator_payloads for candidate in _extract_profile_candidates(payload)]
)
creator_videos = _extract_videos(creator_payloads)
creator_identity_match = _profiles_appear_same(public_data["profile"], creator_profile)
should_merge_creator = creator_identity_match or request.allow_creator_center_profile_fallback
if should_merge_creator:
if creator_profile.get("nickname"):
public_data["profile"] = _merge_profile_payload(public_data["profile"], creator_profile)
if not public_data["source_url"]:
public_data["source_url"] = creator_profile.get("canonical_profile_url") or request.profile_url
if request.allow_creator_center_profile_fallback and not creator_identity_match:
public_data["errors"].append("creator_center_profile_fallback_used")
elif public_data["profile"].get("nickname") != creator_profile.get("nickname"):
public_data["errors"].append("creator_center_profile_merge_partial")
public_data["videos"].extend(creator_videos)
elif creator_profile.get("nickname") or creator_videos:
public_data["errors"].append("creator_center_identity_mismatch_skipped")
if not public_data["profile"].get("nickname") and not public_data["videos"]:
message = "No Douyin profile or creator-center data could be extracted"
if "creator_center_identity_mismatch_skipped" in public_data["errors"]:
message = "Creator-center capture belongs to a different logged-in Douyin account; automatic merge was skipped"
raise HTTPException(
status_code=400,
detail={
"message": message,
"profile_url": request.profile_url,
"resolved_profile_url": public_data["source_url"],
"public_blob_count": len(public_data["raw_pages"]),
"public_video_count": len(public_data["videos"]),
"public_errors": public_data["errors"],
"creator_page_count": len(creator_data["pages"]),
"creator_errors": creator_data["errors"]
}
)
account_row = _upsert_account(owner, public_data["profile"], request, public_data, creator_data)
sync_errors = public_data["errors"] + creator_data["errors"]
if request.compact_response:
return {
"account": {
"id": account_row["id"],
"nickname": account_row["nickname"],
"profile_url": account_row["profile_url"],
"douyin_id": account_row["douyin_id"],
"sec_uid": account_row["sec_uid"],
"sync_status": account_row["sync_status"]
},
"sync_errors": sync_errors,
"public_video_count": len(public_data["videos"]),
"creator_page_count": len(creator_data["pages"])
}
workspace = _build_workspace_payload(account_row)
workspace["sync_errors"] = sync_errors
return workspace
def _build_account_payload(account_row: dict[str, Any], include_recent_videos: int = 8) -> dict[str, Any]:
videos = _list_videos(account_row["id"], limit=max(include_recent_videos, 12))
tags = _safe_json_loads(account_row["tags_json"], [])
profile_stats = _safe_json_loads(account_row["profile_stats_json"], {})
video_summary = _summarize_videos(videos, limit=include_recent_videos)
keywords = _dedupe_strings(
tags
+ _extract_keywords(account_row["nickname"], account_row["signature"])
+ video_summary["top_tags"]
+ [video["title"] for video in video_summary["videos"]]
)
return {
"id": account_row["id"],
"nickname": account_row["nickname"],
"signature": account_row["signature"],
"profile_url": account_row["canonical_profile_url"] or account_row["profile_url"],
"avatar_url": account_row["avatar_url"],
"sec_uid": account_row["sec_uid"],
"douyin_id": account_row["douyin_id"],
"profile_stats": profile_stats,
"tags": tags,
"keywords": keywords[:18],
"sync_status": account_row["sync_status"],
"video_summary": video_summary
}
def _list_linked_accounts(account_row: dict[str, Any]) -> list[dict[str, Any]]:
relation_rows = legacy.db.fetch_all(
"""
SELECT rel.*, target.nickname AS target_nickname, target.signature AS target_signature,
target.canonical_profile_url AS target_canonical_profile_url, target.profile_stats_json AS target_profile_stats_json,
target.tags_json AS target_tags_json
FROM douyin_account_relations rel
LEFT JOIN douyin_accounts target ON target.id = rel.target_account_id
WHERE rel.source_account_id = ?
ORDER BY rel.created_at DESC
""",
(account_row["id"],)
)
payloads: list[dict[str, Any]] = []
for row in relation_rows:
payloads.append({
"relation_id": row["id"],
"relation_type": row["relation_type"],
"note": row["note"],
"search_id": row["search_id"],
"created_at": row["created_at"],
"target_account_id": row["target_account_id"],
"target_profile_url": row["target_profile_url"] or row.get("target_canonical_profile_url", ""),
"target_nickname": row.get("target_nickname", ""),
"target_signature": row.get("target_signature", ""),
"target_profile_stats": _safe_json_loads(row.get("target_profile_stats_json"), {}),
"target_tags": _safe_json_loads(row.get("target_tags_json"), [])
})
return payloads
def _load_owned_assistant(assistant_id: str, user_id: str) -> dict[str, Any] | None:
if not str(assistant_id or "").strip():
return None
row = legacy.db.fetch_one(
"SELECT * FROM assistants WHERE id = ? AND user_id = ?",
(assistant_id, user_id)
)
if not row:
raise HTTPException(status_code=404, detail="Assistant not found")
return row
def _get_tracking_cursor(user_id: str) -> dict[str, Any] | None:
return legacy.db.fetch_one(
"SELECT * FROM douyin_tracking_cursors WHERE user_id = ?",
(user_id,)
)
def _set_tracking_cursor(user_id: str, last_seen_at: str) -> dict[str, Any]:
existing = _get_tracking_cursor(user_id)
timestamp = _first_non_empty(last_seen_at, now())
updated_at = now()
if existing:
legacy.db.execute(
"UPDATE douyin_tracking_cursors SET last_seen_at = ?, updated_at = ? WHERE user_id = ?",
(timestamp, updated_at, user_id)
)
else:
legacy.db.execute(
"INSERT INTO douyin_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?)",
(user_id, timestamp, updated_at)
)
return legacy.db.fetch_one("SELECT * FROM douyin_tracking_cursors WHERE user_id = ?", (user_id,))
def _list_tracked_accounts(user_id: str) -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
"""
SELECT track.*,
assistant.name AS assistant_name
FROM douyin_tracked_accounts track
LEFT JOIN assistants assistant ON assistant.id = track.assistant_id
WHERE track.user_id = ?
ORDER BY track.updated_at DESC
""",
(user_id,)
)
payloads: list[dict[str, Any]] = []
for row in rows:
account_row = _require_owned_account(row["tracked_account_id"], user_id)
account_payload = _build_account_payload(account_row, include_recent_videos=6)
payloads.append({
"id": row["id"],
"tracked_account_id": row["tracked_account_id"],
"assistant_id": row.get("assistant_id", "") or "",
"assistant_name": row.get("assistant_name", "") or "",
"note": row.get("note", "") or "",
"created_at": row["created_at"],
"updated_at": row["updated_at"],
"account": account_payload
})
return payloads
def _extract_tracking_borrowing_points(video: dict[str, Any]) -> list[str]:
latest_analysis = (video.get("latest_analysis") or {}).get("parsed_json") or {}
candidates: list[str] = []
def _collect(value: Any) -> None:
if isinstance(value, list):
for item in value:
if isinstance(item, str) and item.strip():
candidates.append(item.strip())
elif isinstance(item, dict):
for inner in item.values():
if isinstance(inner, str) and inner.strip():
candidates.append(inner.strip())
elif isinstance(value, str) and value.strip():
candidates.append(value.strip())
for key in ("winning_patterns", "replicate_plan", "hook_patterns", "content_engine", "offer_directions", "next_actions"):
_collect(latest_analysis.get(key))
score = video.get("score", {}) or {}
stats = video.get("stats", {}) or {}
if float(score.get("hook_score") or 0) >= 70:
candidates.append("开头抓人,适合借前三秒强结论或反常识开场。")
if float(score.get("commercial_score") or 0) >= 65:
candidates.append("转化信号较强,可拆成交句式和行动指令。")
if float(score.get("performance_score") or 0) >= 70:
candidates.append("整体表现高,值得提炼成可复用栏目模板。")
if float(stats.get("comment") or 0) >= 100:
candidates.append("评论互动明显,适合提炼争议点或提问句。")
if str(video.get("content_type") or "") == "image_text":
candidates.append("图文结构清晰,可借分段标题和卡片式表达。")
deduped: list[str] = []
seen: set[str] = set()
for item in candidates:
normalized = _compact_text(item, 80)
if not normalized or normalized in seen:
continue
seen.add(normalized)
deduped.append(normalized)
return deduped[:4]
def _build_tracking_digest_item(tracked_item: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
latest_analysis = video.get("latest_analysis") or {}
summary = (
(latest_analysis.get("parsed_json") or {}).get("executive_summary")
or latest_analysis.get("summary_text")
or latest_analysis.get("suggestion_text")
or video.get("description")
or video.get("title")
or "暂无摘要"
)
borrowing_points = _extract_tracking_borrowing_points(video)
return {
"tracking_id": tracked_item["id"],
"tracked_account_id": tracked_item["tracked_account_id"],
"assistant_id": tracked_item["assistant_id"],
"assistant_name": tracked_item["assistant_name"],
"account": tracked_item["account"],
"video": video,
"summary": _compact_text(summary, 160),
"borrowing_points": borrowing_points,
"is_high_value": float((video.get("score") or {}).get("performance_score") or 0) >= 70 or bool(borrowing_points),
}
def _build_tracking_digest(user_id: str, since_value: str = "", limit: int = 24) -> dict[str, Any]:
tracked_accounts = _list_tracked_accounts(user_id)
cursor = _get_tracking_cursor(user_id)
since_dt = _parse_iso_datetime(since_value) if since_value else None
if since_dt is None and cursor:
since_dt = _parse_iso_datetime(cursor.get("last_seen_at"))
if since_dt is None:
since_dt = (datetime.now(timezone.utc) - timedelta(days=3)).replace(microsecond=0)
items: list[dict[str, Any]] = []
for tracked in tracked_accounts:
account_row = _require_owned_account(tracked["tracked_account_id"], user_id)
workspace = _build_video_workspace_payload(account_row, limit=36)
for video in workspace.get("items", []):
published_at = _parse_iso_datetime(video.get("published_at"))
if published_at is None or published_at <= since_dt:
continue
items.append(_build_tracking_digest_item(tracked, video))
items.sort(
key=lambda item: (
_parse_iso_datetime(item["video"].get("published_at")) or datetime.fromtimestamp(0, tz=timezone.utc),
float((item["video"].get("score") or {}).get("performance_score") or 0)
),
reverse=True
)
return {
"generated_at": now(),
"since": since_dt.isoformat(),
"cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
"tracked_accounts": tracked_accounts,
"items": items[: max(1, min(limit, 100))]
}
def _normalize_report_text(value: Any) -> str:
text = str(value or "").strip()
if not text:
return ""
return re.sub(r"\s+", " ", text)
def _build_report_payload(report: dict[str, Any]) -> dict[str, Any]:
suggestions = legacy.db.fetch_all(
"SELECT * FROM douyin_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
(report["id"],)
)
return {
"id": report["id"],
"focus_text": report["focus_text"],
"model_profile_ids": _safe_json_loads(report["model_profile_ids_json"], []),
"linked_account_ids": _safe_json_loads(report["linked_account_ids_json"], []),
"created_at": report["created_at"],
"duplicate_count": 1,
"duplicate_report_ids": [],
"suggestions": [
{
"id": suggestion["id"],
"model_profile_id": suggestion["model_profile_id"],
"model_label": suggestion["model_label"],
"status": suggestion["status"],
"suggestion_text": suggestion["suggestion_text"],
"parsed_json": _safe_json_loads(suggestion["parsed_json"], {})
}
for suggestion in suggestions
]
}
def _report_signature(report_payload: dict[str, Any]) -> str:
parts = [_normalize_report_text(report_payload.get("focus_text"))]
for suggestion in report_payload.get("suggestions", []):
parsed = suggestion.get("parsed_json") or {}
if isinstance(parsed, dict) and parsed:
normalized_content = json.dumps(parsed, ensure_ascii=False, sort_keys=True)
else:
normalized_content = _normalize_report_text(suggestion.get("suggestion_text"))
parts.append(
"|".join(
[
suggestion.get("model_profile_id", ""),
suggestion.get("status", ""),
normalized_content
]
)
)
return "\n".join(parts)
def _list_report_payloads(account_id: str, limit: int = 5, dedupe: bool = True) -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
"""
SELECT *
FROM douyin_analysis_reports
WHERE account_id = ?
ORDER BY created_at DESC
LIMIT ?
""",
(account_id, max(limit * 4, 20))
)
payloads = [_build_report_payload(report) for report in rows]
if not dedupe:
return payloads[:limit]
unique_payloads: list[dict[str, Any]] = []
seen: dict[str, dict[str, Any]] = {}
for payload in payloads:
signature = _report_signature(payload)
if signature in seen:
seen_payload = seen[signature]
seen_payload["duplicate_count"] = int(seen_payload.get("duplicate_count") or 1) + 1
seen_payload.setdefault("duplicate_report_ids", []).append(payload["id"])
continue
seen[signature] = payload
unique_payloads.append(payload)
focus_filtered: list[dict[str, Any]] = []
focus_seen: dict[str, dict[str, Any]] = {}
for payload in unique_payloads:
focus_key = _normalize_report_text(payload.get("focus_text") or "__default__")
if focus_key in focus_seen:
seen_payload = focus_seen[focus_key]
seen_payload["duplicate_count"] = int(seen_payload.get("duplicate_count") or 1) + 1
seen_payload.setdefault("duplicate_report_ids", []).append(payload["id"])
continue
focus_seen[focus_key] = payload
focus_filtered.append(payload)
return focus_filtered[:limit]
def _delete_report(report_id: str) -> None:
legacy.db.execute("DELETE FROM douyin_analysis_suggestions WHERE report_id = ?", (report_id,))
legacy.db.execute("DELETE FROM douyin_analysis_reports WHERE id = ?", (report_id,))
def _find_duplicate_report_payload(
account_id: str,
focus_text: str,
suggestion_payloads: list[dict[str, Any]],
exclude_report_id: str = ""
) -> dict[str, Any] | None:
candidate_rows = legacy.db.fetch_all(
"""
SELECT *
FROM douyin_analysis_reports
WHERE account_id = ? AND focus_text = ? AND id != ?
ORDER BY created_at DESC
LIMIT 10
""",
(account_id, focus_text, exclude_report_id)
)
probe_payload = {
"focus_text": focus_text,
"suggestions": suggestion_payloads
}
probe_signature = _report_signature(probe_payload)
for row in candidate_rows:
candidate_payload = _build_report_payload(row)
if _report_signature(candidate_payload) == probe_signature:
return candidate_payload
return None
def _build_workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
account_payload = _build_account_payload(account_row)
video_workspace = _build_video_workspace_payload(account_row)
latest_public_snapshot = legacy.db.fetch_one(
"""
SELECT *
FROM douyin_account_snapshots
WHERE account_id = ? AND snapshot_type = 'public_profile'
ORDER BY collected_at DESC
LIMIT 1
""",
(account_row["id"],)
)
latest_creator_snapshot = legacy.db.fetch_one(
"""
SELECT *
FROM douyin_account_snapshots
WHERE account_id = ? AND snapshot_type = 'creator_center'
ORDER BY collected_at DESC
LIMIT 1
""",
(account_row["id"],)
)
report_payloads = _list_report_payloads(account_row["id"], limit=5, dedupe=True)
recent_searches = legacy.db.fetch_all(
"""
SELECT *
FROM douyin_similarity_searches
WHERE source_account_id = ?
ORDER BY created_at DESC
LIMIT 5
""",
(account_row["id"],)
)
return {
"account": account_payload,
"latest_public_snapshot": {
"id": latest_public_snapshot["id"],
"source_url": latest_public_snapshot["source_url"],
"field_count": latest_public_snapshot["field_count"],
"collected_at": latest_public_snapshot["collected_at"],
"summary": _safe_json_loads(latest_public_snapshot["summary_json"], {})
} if latest_public_snapshot else None,
"latest_creator_snapshot": {
"id": latest_creator_snapshot["id"],
"source_url": latest_creator_snapshot["source_url"],
"field_count": latest_creator_snapshot["field_count"],
"collected_at": latest_creator_snapshot["collected_at"],
"summary": _safe_json_loads(latest_creator_snapshot["summary_json"], {})
} if latest_creator_snapshot else None,
"linked_accounts": _list_linked_accounts(account_row),
"recent_reports": report_payloads,
"video_workspace": {
"top_scored_video_ids": video_workspace["top_scored_video_ids"],
"latest_video_ids": video_workspace["latest_video_ids"],
"high_score_threshold": video_workspace["high_score_threshold"],
"meta": video_workspace["meta"]
},
"recent_similarity_searches": [
{
"id": row["id"],
"keywords": _safe_json_loads(row["keywords_json"], []),
"created_at": row["created_at"]
}
for row in recent_searches
],
"available_model_profiles": [
{
"id": row["id"],
"name": row["name"],
"model_name": row["model_name"],
"base_url": row["base_url"],
"is_default": bool(row["is_default"])
}
for row in _fetch_model_profiles(account_row["user_id"])
]
}
def _list_snapshots(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
"""
SELECT *
FROM douyin_account_snapshots
WHERE account_id = ?
ORDER BY collected_at DESC
LIMIT ?
""",
(account_id, limit)
)
return [
{
"id": row["id"],
"snapshot_type": row["snapshot_type"],
"source_url": row["source_url"],
"field_count": row["field_count"],
"collected_at": row["collected_at"],
"summary": _safe_json_loads(row["summary_json"], {})
}
for row in rows
]
def _get_snapshot_detail(snapshot_id: str, account_id: str) -> dict[str, Any]:
row = legacy.db.fetch_one(
"""
SELECT *
FROM douyin_account_snapshots
WHERE id = ? AND account_id = ?
LIMIT 1
""",
(snapshot_id, account_id)
)
if not row:
raise HTTPException(status_code=404, detail="Snapshot not found")
fields = legacy.db.fetch_all(
"""
SELECT field_path, field_type, field_value_text
FROM douyin_snapshot_fields
WHERE snapshot_id = ?
ORDER BY field_path ASC
""",
(snapshot_id,)
)
return {
"id": row["id"],
"snapshot_type": row["snapshot_type"],
"source_url": row["source_url"],
"field_count": row["field_count"],
"collected_at": row["collected_at"],
"summary": _safe_json_loads(row["summary_json"], {}),
"raw_payload": _safe_json_loads(row["raw_payload_json"], {}),
"fields": fields
}
def _build_video_context_items(
video_workspace: dict[str, Any],
max_top_items: int = 6,
max_latest_items: int = 6
) -> dict[str, list[dict[str, Any]]]:
items = video_workspace.get("items", [])
item_map = {item["id"]: item for item in items}
top_items = [
item_map[video_id]
for video_id in video_workspace.get("top_scored_video_ids", [])[:max_top_items]
if video_id in item_map
]
latest_items = [
item_map[video_id]
for video_id in video_workspace.get("latest_video_ids", [])[:max_latest_items]
if video_id in item_map
]
def _brief(video: dict[str, Any]) -> dict[str, Any]:
return {
"video_id": video["id"],
"aweme_id": video["aweme_id"],
"title": video["title"],
"description": video["description"],
"published_at": video["published_at"],
"tags": video["tags"][:6],
"stats": video["stats"],
"score": video["score"],
"latest_analysis": (video.get("latest_analysis") or {}).get("parsed_json") or {}
}
return {
"top_performing_videos": [_brief(item) for item in top_items],
"latest_videos": [_brief(item) for item in latest_items]
}
def _bounded_score(value: Any, fallback: float = 0.0) -> float:
parsed = _parse_count(value)
if parsed <= 0 and value not in (0, "0", 0.0):
parsed = fallback
return round(max(0.0, min(100.0, parsed or fallback)), 2)
def _merge_structured_payload(fallback: Any, parsed: Any) -> Any:
if not isinstance(fallback, dict) or not isinstance(parsed, dict):
return parsed or fallback
merged: dict[str, Any] = {}
for key, fallback_value in fallback.items():
if key not in parsed:
merged[key] = fallback_value
continue
parsed_value = parsed[key]
if isinstance(fallback_value, dict) and isinstance(parsed_value, dict):
merged[key] = _merge_structured_payload(fallback_value, parsed_value)
else:
merged[key] = parsed_value if parsed_value not in (None, "", [], {}) else fallback_value
for key, parsed_value in parsed.items():
if key not in merged:
merged[key] = parsed_value
return merged
def _infer_offer_directions(keywords: list[str]) -> list[str]:
normalized = {item.lower() for item in keywords}
offers: list[str] = []
if {"创业", "成交", "获客"} & normalized:
offers.append("创业获客咨询、成交训练营或老板 IP 陪跑")
if {"文案", "短视频文案", "口播", "二创"} & normalized:
offers.append("短视频文案模板包、脚本代写或内容陪跑服务")
if {"教育", "教育规划"} & normalized:
offers.append("教育规划咨询、升学产品或高客单咨询服务")
if {"后期", "剪辑", "产品"} & normalized:
offers.append("剪辑优化、内容包装或产品策划服务")
if not offers:
offers.append("内容咨询、账号诊断和主题训练营")
offers.append("以高分作品为样板的复刻栏目和线索承接页")
return _dedupe_strings(offers)[:4]
def _build_video_analysis_fallback(account_payload: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
score = video["score"]
tags = video.get("tags") or []
keywords = _extract_keywords(video.get("title", ""), video.get("description", ""))
hook_patterns: list[str] = []
title = video.get("title", "")
if any(token in title for token in ("怎么", "如何", "为什么")):
hook_patterns.append("问题解决型开场,先给结果再给方法")
if any(token in title for token in ("", "误区", "", "不要")):
hook_patterns.append("避坑警示型开场,容易拉停留和评论")
if re.search(r"\d", title):
hook_patterns.append("数字型表达能快速建立信息密度和预期")
if not hook_patterns:
hook_patterns.append("强结论或冲突判断先出,适合 3 秒内抢注意力")
structure_patterns = [
"开头给结论或反常识观点,中段拆 2-3 个要点,结尾给执行动作",
"用具体场景或常见错误承接,降低理解门槛",
"把方法论压缩成可收藏的清单,利于后续转化"
]
commercial_judgement = (
"这条内容适合做高意向线索承接,优先放在咨询、训练营或模板产品前链路。"
if score["commercial_score"] >= 70
else "这条内容更适合作为流量内容,用来放大覆盖,再通过评论区和私信承接。"
)
operator_actions = [
"把标题里的核心钩子沉淀成 3-5 个固定开场模板,持续复用",
"在评论区补一个可执行清单,测试评论区转化和私信承接",
"围绕同主题连续发 3 条变体,验证题材是否可规模化"
]
if score["collect_rate"] >= 0.008:
operator_actions.append("把这条内容延展成可下载资料或收藏型产品,提高转化效率")
if score["share_rate"] >= 0.01:
operator_actions.append("把传播点提炼成系列化选题,优先投放同类话题")
return {
"headline_summary": f"{_compact_text(title, 30)}》属于高可复制内容,核心价值在于{score['signals'][0]}",
"hook_breakdown": hook_patterns[:3],
"structure_breakdown": structure_patterns,
"commercial_angle": {
"score": score["commercial_score"],
"judgement": commercial_judgement,
"suitable_for": _infer_offer_directions(account_payload.get("keywords", []))[:3]
},
"replication_plan": [
f"围绕 {tags[0] if tags else '当前主题'} 再做 3 条不同人群切口",
"保持同类开头结构,但替换成更具体的场景和结果承诺",
"在结尾加入明确的下一步动作,承接评论、私信或表单"
],
"operator_actions": _dedupe_strings(operator_actions)[:5],
"risk_notes": [
"如果后续复刻只保留题材、不保留强钩子,数据会明显回落",
"如果评论区没有承接动作,商业化价值会停留在播放层"
],
"scores": {
"hook": min(100.0, round(score["performance_score"] * 0.92 + 4, 2)),
"retention": min(100.0, round(score["performance_score"] * 0.88 + 6, 2)),
"conversion": min(100.0, round(score["commercial_score"] * 0.93 + 3, 2)),
"commercial": score["commercial_score"]
},
"raw_keywords": keywords[:8]
}
def _build_account_analysis_fallback(
target_payload: dict[str, Any],
benchmark_payloads: list[dict[str, Any]],
analysis_context: dict[str, Any]
) -> dict[str, Any]:
video_workspace = analysis_context.get("video_workspace", {})
top_videos = video_workspace.get("top_performing_videos", [])
latest_videos = video_workspace.get("latest_videos", [])
keywords = _dedupe_strings(
list(target_payload.get("keywords", []))
+ list(target_payload.get("tags", []))
+ list(target_payload.get("video_summary", {}).get("top_tags", []))
)
avg_top_score = round(
sum(float(item.get("score", {}).get("performance_score") or 0) for item in top_videos) / max(len(top_videos), 1),
2
)
avg_latest_score = round(
sum(float(item.get("score", {}).get("performance_score") or 0) for item in latest_videos) / max(len(latest_videos), 1),
2
)
monetization_score = round(
min(
100.0,
avg_top_score * 0.55
+ float(target_payload.get("video_summary", {}).get("avg_share") or 0) / 120.0
+ float(target_payload.get("video_summary", {}).get("avg_comment") or 0) / 80.0
),
2
)
audience = "想提升短视频获客效率、内容转化和账号定位的创业者与内容运营者"
if {"教育", "教育规划"} & {item.lower() for item in keywords}:
audience = "关注教育规划、升学决策和信息差机会的人群"
core_promise = (
f"{_compact_text(target_payload.get('video_summary', {}).get('top_tags', ['内容方法'])[0], 10)} 相关主题,"
"快速给用户一个能立刻套用的内容方法或判断。"
)
hook_patterns = []
titles = [item.get("title", "") for item in top_videos[:5]]
if any(re.search(r"\d", title) for title in titles):
hook_patterns.append("数字型开头,直接降低理解成本")
if any(any(token in title for token in ("怎么", "如何", "为什么")) for title in titles):
hook_patterns.append("问题解决型开头,先抛问题再给答案")
if any(any(token in title for token in ("", "误区", "", "不要")) for title in titles):
hook_patterns.append("避坑警示型开头,容易拉停留和讨论")
hook_patterns = _dedupe_strings(hook_patterns + ["强结论先行,适合 3 秒内抢注意力"])[:4]
winning_patterns = []
for video in top_videos[:4]:
winning_patterns.append({
"video_title": video.get("title", ""),
"score": video.get("score", {}).get("performance_score", 0),
"why": "高分原因主要来自 " + "".join(video.get("score", {}).get("signals", [])[:2]),
"replication_angle": "保留原题材与开头结构,再改写为更具体的人群场景和结果承诺"
})
latest_signal = []
for video in latest_videos[:4]:
signal = "近期内容仍在有效窗口期"
if float(video.get("score", {}).get("performance_score") or 0) + 8 < avg_top_score:
signal = "最近作品热度弱于历史高分样本,需要回到已验证题材"
latest_signal.append({
"video_title": video.get("title", ""),
"signal": signal,
"action": "优先做同题材复刻、加强开头结论和结尾承接动作"
})
benchmark_insights = [
f"对标账号 {payload.get('nickname', '未命名账号')} 可借鉴其 {', '.join(payload.get('video_summary', {}).get('top_tags', [])[:3]) or '选题聚焦'},但不要直接照搬口吻。"
for payload in benchmark_payloads[:3]
] or [
"当前可用对标账号较少,建议优先围绕高分作品题材扩充对标池。"
]
operational_gaps = [
"高分内容和最近内容之间如果存在明显分差,说明选题复盘还没有形成固定机制",
"如果收藏和评论信号强,但页面承接动作弱,商业化效率会被浪费",
"账号标签较散时,用户对你卖什么、能解决什么问题的认知会不够集中"
]
if avg_latest_score >= avg_top_score - 5:
operational_gaps[0] = "最近内容与高分内容差距不大,可以开始标准化选题库和周更节奏"
return {
"executive_summary": (
f"这个账号当前最值得放大的内容方向是 {', '.join(target_payload.get('video_summary', {}).get('top_tags', [])[:3]) or '已验证高分题材'}"
f"高分作品平均得分 {avg_top_score},最近作品平均得分 {avg_latest_score}"
"已经具备做商业化内容矩阵和固定转化链路的基础。"
),
"commercial_positioning": {
"audience": audience,
"core_promise": core_promise,
"monetization_readiness_score": monetization_score,
"offer_directions": _infer_offer_directions(keywords)
},
"content_engine": {
"pillars": target_payload.get("video_summary", {}).get("top_tags", [])[:6],
"hook_patterns": hook_patterns,
"structure_patterns": [
"开头先给结论或冲突点,中段拆 2-3 个关键动作,结尾给明确下一步",
"围绕用户熟悉的问题场景切入,降低完播门槛",
"把方法论做成可收藏的清单,提高后续转化机会"
],
"cta_patterns": [
"高收藏内容结尾引导先收藏再执行",
"高评论内容结尾抛反问,引导评论区互动",
"高分享内容结尾补一句适合谁转发给谁,放大自然传播"
]
},
"winning_patterns": winning_patterns,
"latest_signal": latest_signal,
"benchmark_insights": benchmark_insights,
"monetization_plan": [
"把高分题材拆成免费内容、低门槛产品和咨询服务三级承接",
"优先围绕高收藏内容制作模板、清单或训练营资料",
"让评论区和私信都指向同一个明确转化动作,避免流量浪费"
],
"operational_gaps": operational_gaps,
"next_30_day_actions": [
"每周固定复刻 2 条高分题材,再测试 1 条新角度",
"给高分作品统一补评论区承接话术和私信关键词",
"每周复盘高分榜和最新榜,保留题材、更新场景和切口",
"把表现最稳的 3 条内容做成系列化栏目"
],
"risk_watchlist": [
"题材过多会稀释账号定位,影响成交效率",
"如果只追求播放而不设计承接动作,商业化会停留在表层流量",
"复制高分标题但不复制结构和场景,容易出现数据回落"
]
}
async def _run_top_video_analyses(
account_row: dict[str, Any],
owner: dict[str, Any],
profile: dict[str, Any],
*,
top_video_count: int = 6,
min_score: float = 45.0,
report_id: str = "",
source_type: str = "top_score_auto",
temperature: float = 0.25
) -> list[dict[str, Any]]:
video_workspace = _build_video_workspace_payload(account_row, limit=max(top_video_count * 3, 24))
item_map = {item["id"]: item for item in video_workspace["items"]}
ranked_videos = [
item_map[video_id]
for video_id in video_workspace["top_scored_video_ids"]
if video_id in item_map and float(item_map[video_id]["score"]["performance_score"]) >= float(min_score)
][: max(1, min(top_video_count, 12))]
if not ranked_videos:
return []
account_payload = _build_account_payload(account_row, include_recent_videos=8)
system_prompt = (
"你是商业化短视频拆解顾问。你要针对单条作品给出可用于商业化运营的复盘。"
"请返回严格 JSON 对象字段必须包含headline_summary、hook_breakdown、"
"structure_breakdown、commercial_angle、replication_plan、operator_actions、"
"risk_notes、scores。scores 里必须包含 hook、retention、conversion、commercial范围 0-100。"
)
async def _analyze_video(video: dict[str, Any]) -> dict[str, Any]:
prompt_context = {
"account": {
"id": account_payload["id"],
"nickname": account_payload["nickname"],
"signature": account_payload["signature"],
"tags": account_payload["tags"][:12]
},
"video": {
"id": video["id"],
"aweme_id": video["aweme_id"],
"title": video["title"],
"description": video["description"],
"published_at": video["published_at"],
"tags": video["tags"],
"stats": video["stats"],
"score": video["score"]
}
}
user_prompt = (
"请从商业化运营视角拆解这条作品。重点回答:为什么这条作品值得关注、"
"适合承接什么产品/服务、下一步怎么复刻、运营动作怎么排。"
f"\n\n输入上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
)
try:
output = await legacy.call_model(
profile,
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=temperature
)
parsed = _try_parse_agent_json(output)
status = "ok"
except Exception as exc:
output = str(exc)
parsed = {}
status = "error"
score = video["score"]
if not isinstance(parsed, dict):
parsed = {}
parsed = _merge_structured_payload(_build_video_analysis_fallback(account_payload, video), parsed)
parsed_scores = parsed.get("scores", {}) if isinstance(parsed, dict) else {}
analysis_id = make_id("dyva")
summary_text = _first_non_empty(
(parsed.get("headline_summary") if isinstance(parsed, dict) else ""),
(parsed.get("summary") if isinstance(parsed, dict) else ""),
(parsed.get("commercial_angle", {}) or {}).get("judgement") if isinstance(parsed, dict) else "",
output
)
hook_score = _bounded_score(parsed_scores.get("hook"), fallback=score["performance_score"])
retention_score = _bounded_score(parsed_scores.get("retention"), fallback=score["performance_score"])
conversion_score = _bounded_score(parsed_scores.get("conversion"), fallback=score["commercial_score"])
commercial_score = _bounded_score(parsed_scores.get("commercial"), fallback=score["commercial_score"])
created_at = now()
legacy.db.execute(
"""
INSERT INTO douyin_video_analyses (
id, account_id, user_id, video_id, report_id, model_profile_id, model_label,
source_type, status, performance_score, commercial_score, hook_score,
retention_score, conversion_score, summary_text, suggestion_text,
parsed_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
analysis_id,
account_row["id"],
owner["id"],
video["id"],
report_id,
profile["id"],
_build_model_label(profile),
source_type,
status,
score["performance_score"],
commercial_score,
hook_score,
retention_score,
conversion_score,
_compact_text(summary_text, 240),
output if output.strip() else _safe_json_dumps(parsed),
_safe_json_dumps(parsed),
created_at,
created_at
)
)
return {
"id": analysis_id,
"video_id": video["id"],
"video_title": video["title"],
"status": status,
"summary_text": _compact_text(summary_text, 240),
"parsed_json": parsed,
"performance_score": score["performance_score"],
"commercial_score": commercial_score,
"hook_score": hook_score,
"retention_score": retention_score,
"conversion_score": conversion_score,
"created_at": created_at
}
return await asyncio.gather(*[_analyze_video(video) for video in ranked_videos])
async def _run_account_analysis(
account_row: dict[str, Any],
owner: dict[str, Any],
request: DouyinAccountAnalysisRequest
) -> dict[str, Any]:
target_payload = _build_account_payload(account_row, include_recent_videos=max(4, min(request.max_videos, 12)))
video_workspace = _build_video_workspace_payload(account_row, limit=max(request.max_videos * 3, 24))
video_context = _build_video_context_items(
video_workspace,
max_top_items=min(max(request.max_videos, 4), 8),
max_latest_items=min(max(request.max_videos, 4), 8)
)
linked_rows = _list_linked_accounts(account_row)
linked_account_ids = list(request.linked_account_ids)
if request.include_linked_accounts:
linked_account_ids.extend(
row["target_account_id"] for row in linked_rows if row.get("target_account_id")
)
linked_account_ids = _dedupe_strings(linked_account_ids)
benchmark_payloads: list[dict[str, Any]] = []
for linked_account_id in linked_account_ids:
linked_row = _require_owned_account(linked_account_id, owner["id"])
benchmark_payloads.append(_build_account_payload(linked_row, include_recent_videos=6))
if request.include_recent_similar_candidates and not benchmark_payloads:
latest_search = legacy.db.fetch_one(
"""
SELECT *
FROM douyin_similarity_searches
WHERE source_account_id = ?
ORDER BY created_at DESC
LIMIT 1
""",
(account_row["id"],)
)
if latest_search:
candidate_rows = legacy.db.fetch_all(
"""
SELECT cand.*, acct.user_id AS account_user_id
FROM douyin_similarity_candidates cand
LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
WHERE cand.search_id = ?
ORDER BY cand.rank_index ASC
LIMIT 3
""",
(latest_search["id"],)
)
for candidate_row in candidate_rows:
candidate_account_id = candidate_row.get("candidate_account_id")
if not candidate_account_id:
continue
linked_candidate = _require_owned_account(candidate_account_id, owner["id"])
benchmark_payloads.append(_build_account_payload(linked_candidate, include_recent_videos=6))
profiles = _resolve_model_profiles(owner, request.model_profile_ids)
system_prompt = (
"你是资深抖音商业化增长顾问。你会基于账号画像、创作者中心字段、作品表现、"
"高分作品样本、最近更新信号和对标账号内容,给出可直接指导运营和商业化的结论。"
"请始终返回严格 JSON 对象包含这些字段executive_summary、commercial_positioning、"
"content_engine、winning_patterns、latest_signal、benchmark_insights、"
"monetization_plan、operational_gaps、next_30_day_actions、risk_watchlist。"
"commercial_positioning 必须是对象,至少包含 audience、core_promise、monetization_readiness_score、"
"offer_directions。content_engine 必须包含 pillars、hook_patterns、structure_patterns、cta_patterns。"
"winning_patterns、latest_signal、benchmark_insights、offer_directions、next_30_day_actions、"
"risk_watchlist 每个字段请给 3-6 条中文建议。"
)
analysis_context = {
"target_account": target_payload,
"benchmark_accounts": benchmark_payloads[:6],
"focus": request.extra_focus,
"video_workspace": {
"high_score_threshold": video_workspace["high_score_threshold"],
"meta": video_workspace["meta"],
**video_context
},
"creator_center_snapshot_summary": _safe_json_loads(
(legacy.db.fetch_one(
"""
SELECT summary_json
FROM douyin_account_snapshots
WHERE account_id = ? AND snapshot_type = 'creator_center'
ORDER BY collected_at DESC
LIMIT 1
""",
(account_row["id"],)
) or {}).get("summary_json"),
{}
)
}
user_prompt = (
"请从商业化运营视角分析以下抖音账号。除了账号定位和内容打法,"
"还要明确给出:什么内容最值得继续放大、什么内容已经过时、"
"适合承接什么类型的产品/服务、未来 30 天运营动作如何排优先级。"
"如果提供了对标账号,要重点指出可借鉴但不应直接照搬的部分。"
f"\n\n输入上下文:\n{json.dumps(analysis_context, ensure_ascii=False, indent=2)}"
)
report_id = make_id("dyreport")
created_at = now()
legacy.db.execute(
"""
INSERT INTO douyin_analysis_reports (
id, account_id, user_id, focus_text, model_profile_ids_json,
linked_account_ids_json, prompt_text, context_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
report_id,
account_row["id"],
owner["id"],
request.extra_focus,
_safe_json_dumps([profile["id"] for profile in profiles]),
_safe_json_dumps(linked_account_ids),
user_prompt,
_safe_json_dumps(analysis_context),
created_at
)
)
async def _analyze_with_model(profile: dict[str, Any]) -> dict[str, Any]:
try:
output = await legacy.call_model(
profile,
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=request.temperature
)
parsed = _try_parse_agent_json(output)
status = "ok"
except Exception as exc:
output = str(exc)
parsed = {}
status = "error"
if not isinstance(parsed, dict):
parsed = {}
parsed = _merge_structured_payload(
_build_account_analysis_fallback(target_payload, benchmark_payloads, analysis_context),
parsed
)
suggestion_id = make_id("dysady")
legacy.db.execute(
"""
INSERT INTO douyin_analysis_suggestions (
id, report_id, model_profile_id, model_label, status,
suggestion_text, parsed_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
suggestion_id,
report_id,
profile["id"],
_build_model_label(profile),
status,
output if output.strip() else _safe_json_dumps(parsed),
_safe_json_dumps(parsed),
now()
)
)
return {
"id": suggestion_id,
"model_profile_id": profile["id"],
"model_label": _build_model_label(profile),
"status": status,
"suggestion_text": output,
"parsed_json": parsed
}
suggestions = await asyncio.gather(*[_analyze_with_model(profile) for profile in profiles])
duplicate_report = _find_duplicate_report_payload(
account_row["id"],
request.extra_focus,
suggestions,
exclude_report_id=report_id
)
if duplicate_report:
_delete_report(report_id)
return {
"report_id": duplicate_report["id"],
"created_at": duplicate_report["created_at"],
"context": analysis_context,
"suggestions": duplicate_report["suggestions"],
"auto_video_analyses": [],
"duplicate_of_report_id": duplicate_report["id"],
"duplicate_count": duplicate_report.get("duplicate_count", 1)
}
auto_video_analyses: list[dict[str, Any]] = []
if request.auto_analyze_top_videos and profiles:
auto_video_analyses = await _run_top_video_analyses(
account_row,
owner,
profiles[0],
top_video_count=request.top_video_analysis_count,
min_score=45.0,
report_id=report_id,
source_type="account_analysis_auto",
temperature=min(request.temperature, 0.3)
)
legacy.db.execute(
"UPDATE douyin_accounts SET last_analysis_at = ?, updated_at = ? WHERE id = ?",
(now(), now(), account_row["id"])
)
return {
"report_id": report_id,
"created_at": created_at,
"context": analysis_context,
"suggestions": suggestions,
"auto_video_analyses": auto_video_analyses
}
async def _prepare_similarity_source(
owner: dict[str, Any],
request: DouyinSimilarSearchRequest
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
if request.source_account_id:
account_row = _require_owned_account(request.source_account_id, owner["id"])
return account_row, _build_account_payload(account_row)
if not (request.profile_url or "").strip():
raise HTTPException(status_code=400, detail="source_account_id or profile_url is required")
public_data = await _collect_public_profile(request.profile_url or "", None)
if not public_data["profile"].get("nickname") and not public_data["videos"]:
raise HTTPException(status_code=400, detail="Unable to parse the shared Douyin profile page")
payload = {
"id": "",
"nickname": public_data["profile"].get("nickname", ""),
"signature": public_data["profile"].get("signature", ""),
"profile_url": public_data["profile"].get("canonical_profile_url", "") or request.profile_url,
"avatar_url": public_data["profile"].get("avatar_url", ""),
"sec_uid": public_data["profile"].get("sec_uid", ""),
"douyin_id": public_data["profile"].get("douyin_id", ""),
"profile_stats": public_data["profile"].get("stats", {}),
"tags": public_data["profile"].get("tags", []),
"video_summary": _summarize_videos(public_data["videos"], limit=6)
}
payload["keywords"] = _dedupe_strings(
payload["tags"] + _extract_keywords(payload["nickname"], payload["signature"])
+ payload["video_summary"]["top_tags"]
+ [video["title"] for video in payload["video_summary"]["videos"]]
)
return None, payload
async def _fetch_or_create_candidate(owner: dict[str, Any], profile_url: str) -> dict[str, Any] | None:
existing = legacy.db.fetch_one(
"""
SELECT *
FROM douyin_accounts
WHERE user_id = ? AND (canonical_profile_url = ? OR profile_url = ?)
LIMIT 1
""",
(owner["id"], profile_url, profile_url)
)
if existing:
return existing
public_data = await _collect_public_profile(profile_url, None)
profile = public_data["profile"]
if not (profile.get("nickname") or public_data["videos"]):
return None
sync_request = DouyinAccountSyncRequest(
profile_url=profile_url,
manual_work_payloads=[video["raw"] for video in public_data["videos"]]
)
account_row = _upsert_account(owner, profile, sync_request, public_data, {"pages": [], "errors": []})
return account_row
async def _run_similarity_search(owner: dict[str, Any], request: DouyinSimilarSearchRequest) -> dict[str, Any]:
source_account_row, source_payload = await _prepare_similarity_source(owner, request)
profile = legacy.model_profile_for_account(owner["id"], request.model_profile_id)
existing_accounts = legacy.db.fetch_all(
"""
SELECT *
FROM douyin_accounts
WHERE user_id = ?
ORDER BY updated_at DESC
""",
(owner["id"],)
)
candidate_rows: list[dict[str, Any]] = []
seen_urls: set[str] = set()
source_id = source_account_row["id"] if source_account_row else ""
for row in existing_accounts:
if row["id"] == source_id:
continue
candidate_rows.append(row)
seen_urls.add(row["canonical_profile_url"] or row["profile_url"])
if request.seed_linked_accounts and source_account_row:
for linked in _list_linked_accounts(source_account_row):
candidate_url = linked.get("target_profile_url", "")
if not candidate_url or candidate_url in seen_urls:
continue
seen_urls.add(candidate_url)
if linked.get("target_account_id"):
candidate_rows.append(_require_owned_account(linked["target_account_id"], owner["id"]))
candidate_urls = _dedupe_strings(request.candidate_urls)
if request.search_public_pages:
discovered = await _discover_profile_urls_from_search(source_payload.get("keywords", []), limit=6)
candidate_urls.extend(discovered)
candidate_urls = _dedupe_strings(candidate_urls)
for candidate_url in candidate_urls:
if candidate_url in seen_urls or candidate_url == source_payload.get("profile_url"):
continue
candidate_row = await _fetch_or_create_candidate(owner, candidate_url)
if candidate_row:
candidate_rows.append(candidate_row)
seen_urls.add(candidate_url)
candidate_payloads: list[dict[str, Any]] = []
seen_account_ids: set[str] = set()
for row in candidate_rows:
if row["id"] in seen_account_ids:
continue
seen_account_ids.add(row["id"])
payload = _build_account_payload(row, include_recent_videos=6)
payload["heuristics"] = _heuristic_similarity(source_payload, payload)
candidate_payloads.append(payload)
candidate_payloads.sort(key=lambda item: item["heuristics"]["heuristic_score"], reverse=True)
candidate_payloads = candidate_payloads[: max(3, request.max_candidates)]
search_id = make_id("dysearch")
prompt_context = {
"source_account": source_payload,
"candidate_accounts": candidate_payloads,
"extra_requirements": request.extra_requirements
}
prompt = (
"请从候选账号中筛选与目标账号内容风格、题材、受众和互动逻辑最相似,且整体质量更高的账号。"
"请返回 JSON 数组,每项包含 candidate_account_id、candidate_profile_url、score、"
"rationale、similar_dimensions、optimization_value。score 范围 0-100。"
f"\n\n上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
)
legacy.db.execute(
"""
INSERT INTO douyin_similarity_searches (
id, user_id, source_account_id, source_profile_url, keywords_json,
prompt_text, context_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
search_id,
owner["id"],
source_account_row["id"] if source_account_row else None,
source_payload.get("profile_url", ""),
_safe_json_dumps(source_payload.get("keywords", [])),
prompt,
_safe_json_dumps(prompt_context),
now()
)
)
if not candidate_payloads:
return {
"search_id": search_id,
"source_account": source_payload,
"model_profile": {
"id": profile["id"],
"label": _build_model_label(profile)
},
"raw_model_output": "No candidate accounts available. Sync more Douyin accounts or provide candidate_urls.",
"candidates": []
}
system_prompt = (
"你是抖音相似账号发现专家。你要根据内容主题、标签、风格、更新频率、互动表现和商业化潜力,"
"挑选最值得对标的账号。返回严格 JSON 数组。"
)
try:
output = await legacy.call_model(profile, system_prompt=system_prompt, user_prompt=prompt, temperature=0.2)
parsed = _try_parse_agent_json(output)
except Exception as exc:
output = str(exc)
parsed = []
candidate_map = {
payload["id"]: payload for payload in candidate_payloads if payload["id"]
}
if isinstance(parsed, dict):
parsed = parsed.get("items") or parsed.get("candidates") or []
saved_candidates: list[dict[str, Any]] = []
if not isinstance(parsed, list) or not parsed:
parsed = [
{
"candidate_account_id": payload["id"],
"candidate_profile_url": payload["profile_url"],
"score": payload["heuristics"]["heuristic_score"],
"rationale": "Fallback to heuristic similarity because model output was unavailable or unparsable.",
"similar_dimensions": [
{
"topic_overlap": payload["heuristics"]["topic_overlap"],
"tag_overlap": payload["heuristics"]["tag_overlap"],
"quality_score": payload["heuristics"]["quality_score"]
}
],
"optimization_value": "可作为候选对标账号进一步人工确认。"
}
for payload in candidate_payloads
]
for index, item in enumerate(parsed, start=1):
candidate_account_id = _first_non_empty(item.get("candidate_account_id"))
candidate_profile_url = _first_non_empty(item.get("candidate_profile_url"))
payload = candidate_map.get(candidate_account_id)
if not payload:
payload = next(
(candidate for candidate in candidate_payloads if candidate["profile_url"] == candidate_profile_url),
None
)
candidate_id = make_id("dycand")
heuristic_score = payload["heuristics"]["heuristic_score"] if payload else 0
score = _parse_count(item.get("score"))
rationale = _first_non_empty(item.get("rationale"), item.get("reason"), item.get("summary"))
dimensions = item.get("similar_dimensions") or item.get("dimensions") or {}
raw_output = {
"model_output": item,
"candidate_payload": payload or {}
}
legacy.db.execute(
"""
INSERT INTO douyin_similarity_candidates (
id, search_id, candidate_account_id, candidate_profile_url, heuristic_score,
agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
candidate_id,
search_id,
payload["id"] if payload else candidate_account_id or None,
payload["profile_url"] if payload else candidate_profile_url,
heuristic_score,
score,
rationale,
_safe_json_dumps(dimensions),
_safe_json_dumps(raw_output),
index,
now()
)
)
saved_candidates.append({
"id": candidate_id,
"candidate_account_id": payload["id"] if payload else candidate_account_id,
"candidate_profile_url": payload["profile_url"] if payload else candidate_profile_url,
"candidate_nickname": payload["nickname"] if payload else "",
"heuristic_score": heuristic_score,
"agent_score": score,
"rationale_text": rationale,
"dimensions": dimensions,
"rank_index": index
})
return {
"search_id": search_id,
"source_account": source_payload,
"model_profile": {
"id": profile["id"],
"label": _build_model_label(profile)
},
"raw_model_output": output,
"candidates": saved_candidates
}
@app.get("/v2/douyin/accounts")
def list_douyin_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
"""
SELECT *
FROM douyin_accounts
WHERE user_id = ?
ORDER BY updated_at DESC
""",
(account["id"],)
)
return [_build_account_payload(row) for row in rows]
@app.post("/v2/douyin/accounts/sync")
async def sync_douyin_account(
request: DouyinAccountSyncRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
if (
not request.profile_url.strip()
and not request.manual_profile_payload
and not request.manual_creator_pages
):
raise HTTPException(
status_code=400,
detail="profile_url、manual_profile_payload 或 manual_creator_pages 至少需要传一个"
)
public_data = await _collect_public_profile(request.profile_url, request.manual_profile_payload)
creator_data = await _collect_creator_center_pages(
request.creator_center_urls,
request.session_cookie,
request.manual_creator_pages
)
return await run_in_threadpool(
_finalize_sync_workspace,
account,
request,
public_data,
creator_data
)
@app.get("/v2/douyin/accounts/{account_id}")
def get_douyin_account(
account_id: str,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
return _build_workspace_payload(account_row)
@app.get("/v2/douyin/accounts/{account_id}/snapshots")
def list_douyin_account_snapshots(
account_id: str,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
account_row = _require_owned_account(account_id, account["id"])
return _list_snapshots(account_row["id"])
@app.get("/v2/douyin/accounts/{account_id}/snapshots/{snapshot_id}")
def get_douyin_account_snapshot(
account_id: str,
snapshot_id: str,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
return _get_snapshot_detail(snapshot_id, account_row["id"])
@app.get("/v2/douyin/accounts/{account_id}/creator-fields")
def get_douyin_creator_fields(
account_id: str,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
latest_creator_snapshot = legacy.db.fetch_one(
"""
SELECT id
FROM douyin_account_snapshots
WHERE account_id = ? AND snapshot_type = 'creator_center'
ORDER BY collected_at DESC
LIMIT 1
""",
(account_row["id"],)
)
if not latest_creator_snapshot:
raise HTTPException(status_code=404, detail="No creator-center snapshot found")
return _get_snapshot_detail(latest_creator_snapshot["id"], account_row["id"])
@app.get("/v2/douyin/accounts/{account_id}/workspace")
def get_douyin_account_workspace(
account_id: str,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
return _build_workspace_payload(account_row)
@app.get("/v2/douyin/accounts/{account_id}/videos")
def list_douyin_account_videos(
account_id: str,
limit: int = 200,
sort_by: str = "score",
scope: str = "all",
content_type: str = "all",
q: str = "",
tag: str = "",
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
workspace = _build_video_workspace_payload(account_row, limit=max(limit, 24))
items = list(workspace["items"])
item_map = {item["id"]: item for item in items}
normalized_scope = (scope or "all").strip().lower()
if normalized_scope == "top":
items = [item_map[video_id] for video_id in workspace["top_scored_video_ids"] if video_id in item_map]
elif normalized_scope == "latest":
items = [item_map[video_id] for video_id in workspace["latest_video_ids"] if video_id in item_map]
normalized_content_type = (content_type or "all").strip().lower()
if normalized_content_type in {"video", "image_text"}:
items = [
item for item in items
if str(item.get("content_type") or "video").strip().lower() == normalized_content_type
]
query_text = (q or "").strip().lower()
if query_text:
items = [
item for item in items
if query_text in " ".join(
[
item.get("title", ""),
item.get("description", ""),
item.get("aweme_id", ""),
*[str(tag_item) for tag_item in item.get("tags", [])]
]
).lower()
]
tag_text = (tag or "").strip().lower()
if tag_text:
items = [
item for item in items
if any(tag_text in str(tag_item).lower() for tag_item in item.get("tags", []))
]
normalized_sort = (sort_by or "score").strip().lower()
items.sort(key=lambda item: _video_sort_key(item, normalized_sort), reverse=True)
return {
"account_id": account_row["id"],
"sort_by": normalized_sort,
"scope": normalized_scope,
"content_type": normalized_content_type,
"query": q,
"tag": tag,
"high_score_threshold": workspace["high_score_threshold"],
"meta": workspace["meta"],
"top_scored_video_ids": workspace["top_scored_video_ids"],
"latest_video_ids": workspace["latest_video_ids"],
"items": items[: max(1, min(limit, 1000))]
}
@app.get("/v2/douyin/accounts/{account_id}/analysis-reports")
def list_douyin_analysis_reports(
account_id: str,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
account_row = _require_owned_account(account_id, account["id"])
return _build_workspace_payload(account_row)["recent_reports"]
@app.post("/v2/douyin/accounts/{account_id}/analysis")
async def analyze_douyin_account(
account_id: str,
request: DouyinAccountAnalysisRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
return await _run_account_analysis(account_row, account, request)
@app.post("/v2/douyin/accounts/{account_id}/videos/analyze-top")
async def analyze_douyin_top_videos(
account_id: str,
request: DouyinTopVideoAnalysisRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
profile = legacy.model_profile_for_account(account["id"], request.model_profile_id)
results = await _run_top_video_analyses(
account_row,
account,
profile,
top_video_count=request.top_video_count,
min_score=request.min_score,
source_type="manual_top_video_refresh",
temperature=request.temperature
)
return {
"account_id": account_row["id"],
"model_profile_id": profile["id"],
"analyzed_count": len(results),
"items": results
}
@app.post("/v2/douyin/similar-searches")
async def create_douyin_similarity_search(
request: DouyinSimilarSearchRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
return await _run_similarity_search(account, request)
@app.get("/v2/douyin/similar-searches/{search_id}")
def get_douyin_similarity_search(
search_id: str,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
search_row = legacy.db.fetch_one(
"SELECT * FROM douyin_similarity_searches WHERE id = ? AND user_id = ?",
(search_id, account["id"])
)
if not search_row:
raise HTTPException(status_code=404, detail="Similarity search not found")
candidates = legacy.db.fetch_all(
"""
SELECT cand.*, acct.nickname AS candidate_nickname
FROM douyin_similarity_candidates cand
LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
WHERE cand.search_id = ?
ORDER BY cand.rank_index ASC
""",
(search_id,)
)
return {
"id": search_row["id"],
"source_account_id": search_row["source_account_id"],
"source_profile_url": search_row["source_profile_url"],
"keywords": _safe_json_loads(search_row["keywords_json"], []),
"context": _safe_json_loads(search_row["context_json"], {}),
"created_at": search_row["created_at"],
"candidates": [
{
"id": row["id"],
"candidate_account_id": row["candidate_account_id"],
"candidate_profile_url": row["candidate_profile_url"],
"candidate_nickname": row.get("candidate_nickname", ""),
"heuristic_score": row["heuristic_score"],
"agent_score": row["agent_score"],
"rationale_text": row["rationale_text"],
"dimensions": _safe_json_loads(row["dimensions_json"], {}),
"rank_index": row["rank_index"]
}
for row in candidates
]
}
@app.get("/v2/douyin/accounts/{account_id}/benchmark-links")
def list_douyin_benchmark_links(
account_id: str,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
account_row = _require_owned_account(account_id, account["id"])
return _list_linked_accounts(account_row)
@app.post("/v2/douyin/accounts/{account_id}/benchmark-links")
def create_douyin_benchmark_links(
account_id: str,
request: DouyinBenchmarkLinkRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
account_row = _require_owned_account(account_id, account["id"])
linked_ids: list[str] = []
for target_account_id in request.target_account_ids:
target_row = _require_owned_account(target_account_id, account["id"])
relation_id = make_id("dyrel")
legacy.db.execute(
"""
INSERT INTO douyin_account_relations (
id, user_id, source_account_id, target_account_id, target_profile_url,
relation_type, note, search_id, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
relation_id,
account["id"],
account_row["id"],
target_row["id"],
target_row["canonical_profile_url"] or target_row["profile_url"],
request.relation_type,
request.note,
request.search_id,
now()
)
)
linked_ids.append(relation_id)
for target_profile_url in _dedupe_strings(request.target_profile_urls):
relation_id = make_id("dyrel")
legacy.db.execute(
"""
INSERT INTO douyin_account_relations (
id, user_id, source_account_id, target_account_id, target_profile_url,
relation_type, note, search_id, created_at
) VALUES (?, ?, ?, NULL, ?, ?, ?, ?, ?)
""",
(
relation_id,
account["id"],
account_row["id"],
target_profile_url,
request.relation_type,
request.note,
request.search_id,
now()
)
)
linked_ids.append(relation_id)
return {
"saved": len(linked_ids),
"relation_ids": linked_ids,
"links": _list_linked_accounts(account_row)
}
@app.get("/v2/douyin/tracking/accounts")
def list_douyin_tracked_accounts(
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
cursor = _get_tracking_cursor(account["id"])
return {
"cursor_last_seen_at": (cursor or {}).get("last_seen_at", ""),
"items": _list_tracked_accounts(account["id"])
}
@app.post("/v2/douyin/tracking/accounts")
def create_douyin_tracked_account(
request: DouyinTrackedAccountRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
tracked_account = _require_owned_account(request.tracked_account_id, account["id"])
assistant = _load_owned_assistant(request.assistant_id, account["id"])
existing = legacy.db.fetch_one(
"SELECT * FROM douyin_tracked_accounts WHERE user_id = ? AND tracked_account_id = ?",
(account["id"], tracked_account["id"])
)
updated_at = now()
if existing:
legacy.db.execute(
"""
UPDATE douyin_tracked_accounts
SET assistant_id = ?, note = ?, updated_at = ?
WHERE id = ?
""",
((assistant or {}).get("id"), request.note.strip(), updated_at, existing["id"])
)
else:
legacy.db.execute(
"""
INSERT INTO douyin_tracked_accounts (
id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(make_id("dytrack"), account["id"], tracked_account["id"], (assistant or {}).get("id"), request.note.strip(), updated_at, updated_at)
)
return {
"tracked_account_id": tracked_account["id"],
"assistant_id": (assistant or {}).get("id", ""),
"items": _list_tracked_accounts(account["id"])
}
@app.post("/v2/douyin/tracking/cursor")
def update_douyin_tracking_cursor(
request: DouyinTrackingCursorRequest,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
cursor = _set_tracking_cursor(account["id"], request.last_seen_at)
return {
"user_id": account["id"],
"last_seen_at": cursor["last_seen_at"],
"updated_at": cursor["updated_at"]
}
@app.get("/v2/douyin/tracking/digest")
def get_douyin_tracking_digest(
since: str | None = None,
limit: int = 24,
account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
return _build_tracking_digest(account["id"], since_value=(since or "").strip(), limit=limit)