3136 lines
132 KiB
Python
3136 lines
132 KiB
Python
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import math
|
||
import re
|
||
from collections import Counter
|
||
from datetime import datetime, timezone
|
||
from html import unescape
|
||
from typing import Any, Iterable
|
||
from urllib.parse import quote, unquote
|
||
|
||
import httpx
|
||
from fastapi import Depends, HTTPException
|
||
from pydantic import BaseModel, Field
|
||
from starlette.concurrency import run_in_threadpool
|
||
|
||
# Creator-center pages used as the default ``creator_center_urls`` of a sync request.
DEFAULT_CREATOR_CENTER_URLS: list[str] = [
    "https://creator.douyin.com/creator-micro/home",
    "https://creator.douyin.com/creator-micro/data",
    "https://creator.douyin.com/creator-micro/content/manage",
]

# Timeout (seconds) handed to ``httpx.AsyncClient`` by ``_fetch_html``.
DEFAULT_TIMEOUT: float = 20.0

# Upper bound on how much of each text variant ``_extract_json_objects_from_text`` scans.
# NOTE: despite the name, it is applied as a *character* count to a ``str`` slice.
MAX_HTML_SEARCH_BYTES: int = 2_000_000

# Desktop-Chrome User-Agent sent with every ``_fetch_html`` request.
DEFAULT_USER_AGENT: str = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
)
|
||
|
||
|
||
class ManualPageCapture(BaseModel):
    """A single page capture supplied with a sync request.

    Used as the element type of ``DouyinAccountSyncRequest.manual_creator_pages``;
    presumably filled in manually by the client when pages cannot be fetched
    server-side (TODO confirm against the route handler).
    """

    # URL the page was captured from; may be empty.
    url: str = ""
    # Page title as captured; may be empty.
    title: str = ""
    # Captured JSON payload; its schema is not validated by this model.
    payload: dict[str, Any] = Field(default_factory=dict)
|
||
|
||
|
||
class DouyinAccountSyncRequest(BaseModel):
    """Options for syncing one Douyin account.

    Presumably the request body of the account-sync route registered in
    ``register_douyin_routes`` (TODO confirm). Besides a live ``profile_url``,
    callers may pass manually captured data via the ``manual_*`` fields.
    """

    # Profile link or pasted share text; free-form, not validated here.
    profile_url: str = ""
    # Raw Cookie header value; presumably needed for the creator-center pages.
    session_cookie: str = ""
    # Copied per instance (list(...)) so requests never share or mutate the module-level default list.
    creator_center_urls: list[str] = Field(default_factory=lambda: list(DEFAULT_CREATOR_CENTER_URLS))
    # NOTE(review): semantics are defined by the route handler, which is not visible here.
    allow_creator_center_profile_fallback: bool = False
    # NOTE(review): presumably trims the response payload; confirm in the handler.
    compact_response: bool = False
    # Manually captured profile JSON (None = not provided).
    manual_profile_payload: dict[str, Any] | None = None
    # Manually captured creator-center pages.
    manual_creator_pages: list[ManualPageCapture] = Field(default_factory=list)
    # Manually captured work/video JSON payloads.
    manual_work_payloads: list[dict[str, Any]] = Field(default_factory=list)
    # Free-text note attached to the sync.
    discovery_note: str = ""
|
||
|
||
|
||
class DouyinAccountAnalysisRequest(BaseModel):
    """Options for analysing a synced Douyin account.

    Presumably the request body of the account-analysis route (TODO confirm in
    ``register_douyin_routes``).
    """

    # Presumably IDs of model profiles to run the analysis with; empty = handler's choice (confirm).
    model_profile_ids: list[str] = Field(default_factory=list)
    # Explicit linked-account IDs to include in the analysis context.
    linked_account_ids: list[str] = Field(default_factory=list)
    # Whether linked accounts are included.
    include_linked_accounts: bool = True
    # Whether recent similar-account search candidates are included.
    include_recent_similar_candidates: bool = True
    # Maximum number of videos considered (default 12).
    max_videos: int = 12
    # Extra free-text focus for the analysis.
    extra_focus: str = ""
    # Presumably the sampling temperature passed to the model (confirm).
    temperature: float = 0.35
    # Whether top videos are analysed automatically as part of this request.
    auto_analyze_top_videos: bool = True
    # How many top videos to analyse when auto_analyze_top_videos is set (default 6).
    top_video_analysis_count: int = 6
|
||
|
||
|
||
class DouyinTopVideoAnalysisRequest(BaseModel):
    """Options for analysing an account's top-performing videos.

    Presumably the request body of the top-video analysis route (TODO confirm).
    """

    # Model profile to use; None = handler's default (confirm).
    model_profile_id: str | None = None
    # Number of top videos to analyse (default 6).
    top_video_count: int = 6
    # NOTE(review): presumably a threshold on a 0-100 score such as
    # _video_score_breakdown's performance_score; confirm in the handler.
    min_score: float = 45.0
    # Presumably the model sampling temperature (confirm).
    temperature: float = 0.25
|
||
|
||
|
||
class DouyinSimilarSearchRequest(BaseModel):
    """Options for searching Douyin accounts similar to a source account.

    Presumably the request body of the similar-account search route (TODO confirm).
    """

    # Stored source account ID; presumably either this or profile_url is supplied.
    source_account_id: str | None = None
    # Source profile link when no stored account is used.
    profile_url: str | None = None
    # Explicit candidate profile URLs to evaluate.
    candidate_urls: list[str] = Field(default_factory=list)
    # Whether already-linked accounts seed the candidate list.
    seed_linked_accounts: bool = True
    # Whether public search pages are scraped for candidates (cf. _discover_profile_urls_from_search).
    search_public_pages: bool = True
    # Model profile used for ranking/analysis; None = handler's default (confirm).
    model_profile_id: str | None = None
    # Maximum number of candidates returned (default 10).
    max_candidates: int = 10
    # Extra free-text requirements for the search.
    extra_requirements: str = ""
|
||
|
||
|
||
class DouyinBenchmarkLinkRequest(BaseModel):
    """Options for linking target accounts to a source account.

    Presumably the request body of the account-linking route (TODO confirm).
    """

    # Stored target account IDs to link.
    target_account_ids: list[str] = Field(default_factory=list)
    # Target profile URLs to link (presumably resolved/created by the handler).
    target_profile_urls: list[str] = Field(default_factory=list)
    # Relation label stored with the link; defaults to "benchmark".
    relation_type: str = "benchmark"
    # Free-text note stored with the link.
    note: str = ""
    # Presumably the similar-search run the targets came from; empty = none (confirm).
    search_id: str = ""
|
||
|
||
|
||
def _safe_json_dumps(value: Any) -> str:
    """Serialize *value* as compact JSON, keeping non-ASCII text (e.g. Chinese) unescaped."""
    compact_separators = (",", ":")
    return json.dumps(value, separators=compact_separators, ensure_ascii=False)
|
||
|
||
|
||
def _safe_json_loads(value: str | None, fallback: Any) -> Any:
|
||
if not value:
|
||
return fallback
|
||
try:
|
||
return json.loads(value)
|
||
except Exception:
|
||
return fallback
|
||
|
||
|
||
def _first_non_empty(*values: Any) -> str:
    """Return the first meaningful value as a string, or ``""`` if none.

    ``None`` and blank strings are skipped (strings are returned stripped).
    Non-string values are skipped only when equal to an empty container or
    ``""``; anything else (including ``0`` and ``False``) is returned via ``str()``.
    """
    for candidate in values:
        if candidate is None:
            continue
        if not isinstance(candidate, str):
            if candidate in ("", [], {}, ()):
                continue
            return str(candidate)
        cleaned = candidate.strip()
        if cleaned:
            return cleaned
    return ""
|
||
|
||
|
||
def _dedupe_strings(values: Iterable[str]) -> list[str]:
    """Strip, drop blanks, and de-duplicate strings case-insensitively.

    The first occurrence (with its original casing) wins and input order is kept.
    """
    first_by_key: dict[str, str] = {}
    for raw in values:
        cleaned = raw.strip()
        if cleaned:
            # setdefault keeps the first spelling seen for each lower-cased key.
            first_by_key.setdefault(cleaned.lower(), cleaned)
    return list(first_by_key.values())
|
||
|
||
|
||
def _compact_text(value: Any, limit: int = 500) -> str:
    """Stringify and strip *value*; truncate to *limit* chars ending in ``…`` if longer.

    Falsy values (``None``, ``0``, ``""``) become ``""``.
    """
    text = str(value or "").strip()
    return text if len(text) <= limit else text[: limit - 1] + "…"
|
||
|
||
|
||
def _parse_count(value: Any) -> float:
    """Parse a Douyin-style count into a float.

    Numbers are returned as floats. Strings may contain thousands separators
    (``"1,234"``), a unit suffix (``w``/``万`` = 1e4, ``亿`` = 1e8, ``k``/``千`` = 1e3)
    and the capped-display ``+`` marker (``"10万+"``). The first number found in
    the string is used.

    Returns:
        The parsed count, or ``0.0`` for ``None``, empty, or unparseable input.
    """
    if value is None:
        return 0.0
    if isinstance(value, (int, float)):
        return float(value)

    text = str(value).strip().lower().replace(",", "")
    # Remove "+" markers *before* checking the unit suffix: Douyin renders capped
    # counts as "10万+" / "1.2w+", and checking the suffix first missed the unit.
    text = text.replace("+", "").strip()
    if not text:
        return 0.0

    multiplier = 1.0
    for suffixes, factor in (
        (("w", "万"), 10_000.0),
        (("亿",), 100_000_000.0),
        (("k", "千"), 1_000.0),
    ):
        if text.endswith(suffixes):
            multiplier = factor
            text = text[:-1]  # every suffix above is a single character
            break

    match = re.search(r"-?\d+(?:\.\d+)?", text)
    return float(match.group()) * multiplier if match else 0.0
|
||
|
||
|
||
def _normalize_timestamp(value: Any) -> str | None:
|
||
if value in (None, "", 0, "0"):
|
||
return None
|
||
if isinstance(value, str):
|
||
stripped = value.strip()
|
||
if not stripped:
|
||
return None
|
||
if re.match(r"^\d{4}-\d{2}-\d{2}T", stripped):
|
||
return stripped
|
||
if stripped.isdigit():
|
||
value = int(stripped)
|
||
else:
|
||
return stripped
|
||
if isinstance(value, (int, float)):
|
||
ts = float(value)
|
||
if ts > 10_000_000_000:
|
||
ts /= 1000.0
|
||
try:
|
||
return datetime.fromtimestamp(ts, tz=timezone.utc).replace(microsecond=0).isoformat()
|
||
except Exception:
|
||
return None
|
||
return None
|
||
|
||
|
||
def _parse_iso_datetime(value: Any) -> datetime | None:
|
||
text = str(value or "").strip()
|
||
if not text:
|
||
return None
|
||
normalized = text.replace("Z", "+00:00")
|
||
try:
|
||
parsed = datetime.fromisoformat(normalized)
|
||
except ValueError:
|
||
return None
|
||
if parsed.tzinfo is None:
|
||
parsed = parsed.replace(tzinfo=timezone.utc)
|
||
return parsed.astimezone(timezone.utc)
|
||
|
||
|
||
def _extract_hashtags(*texts: str) -> list[str]:
    """Return unique ``#hashtag`` words (without ``#``) found across *texts*.

    Falsy texts are skipped; duplicates are removed case-insensitively by
    ``_dedupe_strings``, keeping first-seen order.
    """
    found = [
        tag
        for text in texts
        if text
        for tag in re.findall(r"#([\w\u4e00-\u9fff]+)", text)
    ]
    return _dedupe_strings(found)
|
||
|
||
|
||
def _extract_keywords(*texts: str) -> list[str]:
    """Extract candidate topic keywords from *texts*.

    For each non-empty text, collects hashtags, runs of 2-8 CJK characters and
    ASCII words of 3-21 characters. Generic platform words (stop words) are
    dropped, and the result is de-duplicated case-insensitively in first-seen order.
    """
    ignored = {
        "视频",
        "作品",
        "抖音",
        "账号",
        "内容",
        "发布",
        "更多",
        "关注",
        "用户",
        "douyin",
        "profile",
    }
    pool: list[str] = []
    for chunk in filter(None, texts):
        pool += _extract_hashtags(chunk)
        pool += re.findall(r"[\u4e00-\u9fff]{2,8}", chunk)
        pool += re.findall(r"[A-Za-z][A-Za-z0-9_]{2,20}", chunk)
    return _dedupe_strings(word for word in pool if word.lower() not in ignored)
|
||
|
||
|
||
def _video_score_breakdown(video: dict[str, Any]) -> dict[str, Any]:
    """Score one normalized video on performance, popularity and commercial potential.

    Args:
        video: A dict shaped like ``_normalize_video_candidate`` output. Only
            ``stats`` (``play``/``like``/``comment``/``share``/``collect``) and
            ``published_at`` (ISO-8601 string, or empty) are read.

    Returns:
        A dict with:

        * ``performance_score``, ``popularity_score`` and ``commercial_score``:
          each capped at 100 and rounded to 2 dp.
        * ``engagement_rate``, ``share_rate``, ``collect_rate``, ``comment_rate``
          and ``like_rate``: rounded to 4 dp.
        * ``age_days``: 1 dp, or ``None`` when the publish time is unknown.
        * ``components``: the parts that make up ``performance_score``.
        * ``signals``: up to four Chinese notes on notable strengths.
    """
    raw_stats = video.get("stats", {}) or {}
    stats = raw_stats if isinstance(raw_stats, dict) else {}

    def _count(name: str) -> float:
        # Clamp to >= 0 and treat unparseable values as 0. A negative count
        # (e.g. from _parse_count("-5")) would otherwise make math.log10(x + 1) raise.
        try:
            return max(0.0, float(stats.get(name) or 0))
        except (TypeError, ValueError):
            return 0.0

    play = _count("play")
    like = _count("like")
    comment = _count("comment")
    share = _count("share")
    collect = _count("collect")

    published_dt = _parse_iso_datetime(video.get("published_at"))
    if published_dt:
        age_days = max(0.0, (datetime.now(timezone.utc) - published_dt).total_seconds() / 86400.0)
    else:
        # Unknown publish time: a large sentinel, so no freshness credit and no
        # "recent" signal. It is never reported (age_days is None below).
        age_days = 999.0

    if play > 0:
        rate_denominator = play
    else:
        # No play count: derive a stand-in denominator from the interaction
        # counts (fixed heuristic multipliers, floor of 1000) so rates stay bounded.
        rate_denominator = max(
            like * 18.0,
            comment * 70.0,
            share * 95.0,
            collect * 55.0,
            1000.0
        )

    engagement_rate = (like + comment * 2.2 + share * 4.2 + collect * 3.0) / max(rate_denominator, 1.0)
    share_rate = share / max(rate_denominator, 1.0)
    collect_rate = collect / max(rate_denominator, 1.0)
    comment_rate = comment / max(rate_denominator, 1.0)
    like_rate = like / max(rate_denominator, 1.0)

    volume_component = min(36.0, math.log10(play + 1.0) * 9.0)
    interaction_component = min(28.0, engagement_rate * 100.0)
    spread_component = min(18.0, (share_rate * 1200.0) + (collect_rate * 700.0))
    # Loses 0.5 point per day; reaches 0 at 36 days.
    freshness_component = max(0.0, 18.0 - min(age_days, 36.0) * 0.5)
    baseline_component = 6.0 if play > 0 or like > 0 else 0.0

    performance_score = round(
        min(100.0, volume_component + interaction_component + spread_component + freshness_component + baseline_component),
        2
    )
    popularity_score = round(
        min(
            100.0,
            math.log10(play + 1.0) * 24.0
            + math.log10(like + 1.0) * 22.0
            + math.log10(comment + 1.0) * 20.0
            + math.log10(share + 1.0) * 18.0
            + math.log10(collect + 1.0) * 16.0
        ),
        2
    )
    commercial_score = round(
        min(
            100.0,
            performance_score * 0.58
            + min(24.0, share_rate * 2200.0)
            + min(18.0, collect_rate * 2000.0)
            + min(12.0, comment_rate * 900.0)
        ),
        2
    )

    signals: list[str] = []
    if share_rate >= 0.01:
        signals.append("分享率高,具备扩散和二次传播潜力")
    if collect_rate >= 0.008:
        signals.append("收藏率高,适合沉淀模板、知识产品或私域承接")
    if like_rate >= 0.05:
        signals.append("点赞率突出,说明钩子与情绪价值有效")
    if comment_rate >= 0.01:
        signals.append("评论率较高,适合做互动运营和评论区转化")
    if age_days <= 14 and play >= 10_000:
        signals.append("近期作品仍有较高播放,说明题材仍在窗口期")
    if not signals:
        signals.append("当前数据中性,需要结合转化目标继续验证")

    return {
        "performance_score": performance_score,
        "popularity_score": popularity_score,
        "commercial_score": commercial_score,
        "engagement_rate": round(engagement_rate, 4),
        "share_rate": round(share_rate, 4),
        "collect_rate": round(collect_rate, 4),
        "comment_rate": round(comment_rate, 4),
        "like_rate": round(like_rate, 4),
        # Report the real age. Previously ages >= 999 days were reported as 0.0
        # because they collided with the "unknown" sentinel.
        "age_days": round(age_days, 1) if published_dt else None,
        "components": {
            "volume": round(volume_component, 2),
            "interaction": round(interaction_component, 2),
            "spread": round(spread_component, 2),
            "freshness": round(freshness_component, 2),
            "baseline": round(baseline_component, 2)
        },
        "signals": signals[:4]
    }
|
||
|
||
|
||
def _flatten_json(value: Any, prefix: str = "") -> list[tuple[str, str, str]]:
    """Flatten nested JSON into ``(path, type_name, compact_value)`` rows.

    Paths use ``a.b`` for dict keys and ``a[0]`` for list indices, starting at
    *prefix*. A scalar at the root is reported as ``"$"``. Values are stringified
    and truncated to 2000 chars via ``_compact_text``. Empty dicts/lists produce
    no rows.
    """
    rows: list[tuple[str, str, str]] = []

    def _emit(node: Any, path: str) -> None:
        if isinstance(node, dict):
            for key, child in node.items():
                _emit(child, f"{path}.{key}" if path else str(key))
        elif isinstance(node, list):
            for index, child in enumerate(node):
                _emit(child, f"{path}[{index}]")
        else:
            rows.append((path or "$", type(node).__name__, _compact_text(node, 2000)))

    _emit(value, prefix)
    return rows
|
||
|
||
|
||
def _walk_json(value: Any) -> Iterable[dict[str, Any]]:
    """Lazily yield every dict nested anywhere in *value*, depth-first pre-order.

    A dict is yielded before its descendants; list items are visited in order.
    Scalars yield nothing.
    """
    if isinstance(value, list):
        for child in value:
            yield from _walk_json(child)
        return
    if not isinstance(value, dict):
        return
    yield value
    for child in value.values():
        yield from _walk_json(child)
|
||
|
||
|
||
def _extract_json_objects_from_text(text: str) -> list[Any]:
    """Find JSON objects/arrays embedded anywhere in *text*.

    The text is tried as-is, URL-decoded, HTML-unescaped, and both. In each
    variant, every ``{``/``[`` within the first ``MAX_HTML_SEARCH_BYTES``
    characters is tried as the start of a JSON document. Nested objects are
    therefore returned in addition to their parents.

    Returns:
        Distinct decoded values in discovery order, de-duplicated by their compact
        serialization and capped at 50. Returns ``[]`` for empty text.
    """
    decoder = json.JSONDecoder()
    objects: list[Any] = []
    seen: set[str] = set()
    if not text:
        return objects

    # Identical variants (common when text contains no escapes) could only
    # rediscover already-seen objects, so scan each distinct variant once.
    variants = dict.fromkeys((text, unquote(text), unescape(text), unescape(unquote(text))))
    for variant in variants:
        snippet = variant[:MAX_HTML_SEARCH_BYTES]
        for match in re.finditer(r"[\{\[]", snippet):
            try:
                # Decode in place from the brace position. Slicing
                # snippet[match.start():] copied up to ~2M chars per brace (O(n^2)).
                obj, _ = decoder.raw_decode(snippet, match.start())
            except (ValueError, RecursionError):
                # Not valid JSON at this position (JSONDecodeError is a ValueError).
                continue
            marker = _safe_json_dumps(obj)
            if marker in seen:
                continue
            seen.add(marker)
            objects.append(obj)
            if len(objects) >= 50:
                return objects
    return objects
|
||
|
||
|
||
def _extract_json_blobs_from_html(html: str) -> list[dict[str, Any]]:
    """Collect distinct JSON payloads embedded in ``<script>`` elements of *html*.

    Returns a list of ``{"script_id": <id attribute or "">, "payload": <decoded JSON>}``.
    A payload that already appeared in an earlier script (same compact
    serialization) is skipped.
    """
    collected: list[dict[str, Any]] = []
    markers: set[str] = set()
    for script in re.finditer(r"<script([^>]*)>(.*?)</script>", html, re.IGNORECASE | re.DOTALL):
        attrs, body = script.group(1), script.group(2)
        id_match = re.search(r'id=["\']([^"\']+)["\']', attrs, re.IGNORECASE)
        script_id = id_match.group(1) if id_match else ""
        for payload in _extract_json_objects_from_text(body.strip()):
            marker = _safe_json_dumps(payload)
            if marker not in markers:
                markers.add(marker)
                collected.append({
                    "script_id": script_id,
                    "payload": payload
                })
    return collected
|
||
|
||
|
||
def _profile_candidate_score(value: dict[str, Any]) -> int:
    """Heuristically rate how much *value* looks like a Douyin user/profile dict.

    Scores one point per known profile key present, plus 2 when an ``author``
    dict is nested. Callers treat a score of 3 or more as a profile candidate.
    """
    tracked = frozenset({
        "nickname",
        "signature",
        "sec_uid",
        "secUid",
        "uid",
        "unique_id",
        "short_id",
        "aweme_count",
        "following_count",
        "follower_count",
        "total_favorited",
    })
    score = sum(key in value for key in tracked)
    author = value["author"] if "author" in value else None
    if isinstance(author, dict):
        score += 2
    return score
|
||
|
||
|
||
def _video_candidate_score(value: dict[str, Any]) -> int:
    """Heuristically rate how much *value* looks like a Douyin video (aweme) dict.

    Weights: a ``statistics`` dict = 3, an ``aweme_id``/``item_id`` key = 2, a
    ``desc``/``title`` key = 1. Callers treat 3 or more as a video candidate.
    """
    has_stats = "statistics" in value and isinstance(value["statistics"], dict)
    has_id = "aweme_id" in value or "item_id" in value
    has_text = "desc" in value or "title" in value
    return 3 * int(has_stats) + 2 * int(has_id) + int(has_text)
|
||
|
||
|
||
def _extract_profile_candidates(payload: Any) -> list[dict[str, Any]]:
    """Return every nested dict (and nested ``author`` dict) that scores as a profile.

    A dict qualifies when ``_profile_candidate_score`` is 3 or more. An ``author``
    sub-dict that qualifies is appended right after its parent is examined.
    """
    found: list[dict[str, Any]] = []
    for node in _walk_json(payload):
        if _profile_candidate_score(node) >= 3:
            found.append(node)
        author = node.get("author")
        if isinstance(author, dict) and _profile_candidate_score(author) >= 3:
            found.append(author)
    return found
|
||
|
||
|
||
def _extract_video_candidates(payload: Any) -> list[dict[str, Any]]:
    """Return every nested dict in *payload* with a ``_video_candidate_score`` of 3 or more."""
    return [node for node in _walk_json(payload) if _video_candidate_score(node) >= 3]
|
||
|
||
|
||
def _normalize_profile_candidate(candidate: dict[str, Any], fallback_url: str = "") -> dict[str, Any]:
    """Normalize a raw Douyin user/author dict into this module's profile shape.

    Args:
        candidate: A raw profile-like dict (may be empty).
        fallback_url: Used as the profile URL when the candidate has neither
            ``share_url`` nor ``profile_url``.

    Returns:
        A dict with:

        * ``nickname``, ``signature``, ``profile_url`` / ``canonical_profile_url``
          (the same value), ``sec_uid``, ``douyin_uid``, ``douyin_id`` and
          ``avatar_url``.
        * ``stats``: numeric ``followers``, ``following``, ``likes`` and ``videos``.
        * ``tags``: hashtags from the signature/nickname plus the candidate's own
          ``tags``.
        * ``raw``: the original dict.
    """

    def _first_url(node: dict[str, Any]) -> str:
        # Guard against an empty url_list, which previously raised IndexError.
        url_list = node.get("url_list")
        return _first_non_empty(
            url_list[0] if isinstance(url_list, list) and url_list else "",
            node.get("url")
        )

    stats_source = candidate.get("statistics") if isinstance(candidate.get("statistics"), dict) else {}
    avatar = candidate.get("avatar_medium") or candidate.get("avatar_thumb") or candidate.get("avatar_url")
    if isinstance(avatar, dict):
        avatar = _first_url(avatar)

    signature = _first_non_empty(
        candidate.get("signature"),
        candidate.get("desc"),
        candidate.get("bio"),
        candidate.get("description")
    )
    nickname = _first_non_empty(candidate.get("nickname"), candidate.get("name"), candidate.get("author_name"))
    canonical_url = _first_non_empty(
        candidate.get("share_url"),
        candidate.get("profile_url"),
        fallback_url
    )

    # "tags" may be missing/None (previously TypeError) or a single scalar
    # (previously split into characters); accept lists/tuples and scalars.
    raw_tags = candidate.get("tags")
    if isinstance(raw_tags, (str, int, float)):
        raw_tags = [raw_tags]
    elif not isinstance(raw_tags, (list, tuple)):
        raw_tags = []

    return {
        "nickname": nickname,
        "signature": signature,
        "profile_url": canonical_url,
        "canonical_profile_url": canonical_url,
        "sec_uid": _first_non_empty(candidate.get("sec_uid"), candidate.get("secUid")),
        "douyin_uid": _first_non_empty(candidate.get("uid")),
        "douyin_id": _first_non_empty(candidate.get("unique_id"), candidate.get("short_id"), candidate.get("douyin_id")),
        "avatar_url": _first_non_empty(avatar),
        "stats": {
            "followers": _parse_count(candidate.get("follower_count") or stats_source.get("follower_count")),
            "following": _parse_count(candidate.get("following_count") or stats_source.get("following_count")),
            "likes": _parse_count(candidate.get("total_favorited") or stats_source.get("total_favorited")),
            "videos": _parse_count(candidate.get("aweme_count") or stats_source.get("aweme_count"))
        },
        "tags": _dedupe_strings(
            _extract_hashtags(signature, nickname)
            + [str(tag) for tag in raw_tags if isinstance(tag, (str, int, float))]
        ),
        "raw": candidate
    }
|
||
|
||
|
||
def _pick_best_profile(candidates: list[dict[str, Any]], fallback_url: str = "") -> dict[str, Any]:
    """Normalize all *candidates* and return the most complete profile.

    Completeness scoring: nickname = 4, sec_uid = 3, signature = 2, non-zero
    follower count = 1. Ties go to the earliest candidate. With no candidates,
    an empty profile carrying *fallback_url* is returned.
    """

    def _completeness(profile: dict[str, Any]) -> int:
        return (
            (4 if profile["nickname"] else 0)
            + (3 if profile["sec_uid"] else 0)
            + (2 if profile["signature"] else 0)
            + (1 if profile["stats"]["followers"] else 0)
        )

    normalized = [_normalize_profile_candidate(item, fallback_url=fallback_url) for item in candidates]
    if not normalized:
        return _normalize_profile_candidate({}, fallback_url=fallback_url)
    # max() returns the first maximal element, matching "earliest wins" on ties.
    return max(normalized, key=_completeness)
|
||
|
||
|
||
def _normalize_video_candidate(candidate: dict[str, Any]) -> dict[str, Any]:
    """Normalize a raw Douyin aweme/item dict into this module's video shape.

    Returns a dict with:

    * Identity and text: ``aweme_id``, ``title``, ``description``, ``share_url``.
    * Media: ``cover_url``, ``duration_sec``, ``image_count``.
    * ``published_at``: via ``_normalize_timestamp``.
    * ``tags``: hashtags from the title/description.
    * ``content_type``: ``"image_text"`` or ``"video"``, plus ``content_type_label``
      with its Chinese label.
    * ``stats``: numeric ``play``/``like``/``comment``/``share``/``collect``.
    * ``raw``: the original dict.
    """

    def _collect_image_urls(node: Any) -> list[str]:
        """Collect distinct URLs starting with "http" from a nested image structure."""
        urls: list[str] = []

        def _visit(value: Any) -> None:
            if isinstance(value, str):
                text = value.strip()
                if text.startswith("http"):
                    urls.append(text)
                return
            if isinstance(value, list):
                # Bounded fan-out: only the first 20 list entries are inspected.
                for item in value[:20]:
                    _visit(item)
                return
            if not isinstance(value, dict):
                return

            for key in ("url", "download_url", "origin_url", "display_url", "cover_url"):
                target = value.get(key)
                if isinstance(target, str) and target.strip().startswith("http"):
                    urls.append(target.strip())

            url_list = value.get("url_list")
            if isinstance(url_list, list):
                for item in url_list[:5]:
                    _visit(item)

            for key in ("image", "images", "cover", "display_image", "origin_image"):
                child = value.get(key)
                if child not in (None, "", [], {}):
                    _visit(child)

        _visit(node)
        return _dedupe_strings(urls)

    def _first_url(node: dict[str, Any]) -> str:
        # Guard against an empty url_list, which previously raised IndexError.
        url_list = node.get("url_list")
        return _first_non_empty(
            url_list[0] if isinstance(url_list, list) and url_list else "",
            node.get("url")
        )

    def _to_float(raw: Any) -> float:
        # Non-numeric durations (e.g. "15s") previously raised ValueError.
        try:
            return float(raw or 0)
        except (TypeError, ValueError):
            return 0.0

    stats_source = candidate.get("statistics") if isinstance(candidate.get("statistics"), dict) else {}
    video_source = candidate.get("video") if isinstance(candidate.get("video"), dict) else {}
    title = _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("share_title"))
    description = _first_non_empty(candidate.get("desc"), candidate.get("title"), candidate.get("text"))
    cover = candidate.get("cover") or video_source.get("cover")
    image_urls = _collect_image_urls(
        [
            candidate.get("images"),
            candidate.get("image_infos"),
            candidate.get("image_list"),
            candidate.get("slides"),
            candidate.get("photos"),
            candidate.get("photo"),
            candidate.get("image_post_info"),
        ]
    )
    if isinstance(cover, dict):
        cover = _first_url(cover)

    duration_raw = _to_float(candidate.get("duration") or video_source.get("duration"))
    # Values above 1000 are treated as milliseconds.
    duration_sec = duration_raw / 1000.0 if duration_raw > 1000 else duration_raw
    has_video_media = bool(video_source) or duration_sec > 0.3
    aweme_type = str(candidate.get("aweme_type") or "")
    # Image-text when images exist and either no video media is present or the
    # aweme_type is in this fixed set of codes.
    looks_like_image_text = bool(image_urls) and (not has_video_media or aweme_type in {"51", "55", "61", "68", "122", "150"})
    content_type = "image_text" if looks_like_image_text else "video"

    return {
        "aweme_id": _first_non_empty(candidate.get("aweme_id"), candidate.get("item_id"), candidate.get("group_id")),
        "title": title,
        "description": description,
        "share_url": _first_non_empty(candidate.get("share_url")),
        "cover_url": _first_non_empty(cover, image_urls[0] if image_urls else ""),
        "duration_sec": duration_sec,
        "published_at": _normalize_timestamp(candidate.get("create_time") or candidate.get("publish_time")),
        "tags": _extract_hashtags(title, description),
        "content_type": content_type,
        "content_type_label": "图文" if content_type == "image_text" else "视频",
        "image_count": len(image_urls),
        "stats": {
            "play": _parse_count(stats_source.get("play_count") or candidate.get("play_count")),
            "like": _parse_count(stats_source.get("digg_count") or candidate.get("digg_count")),
            "comment": _parse_count(stats_source.get("comment_count") or candidate.get("comment_count")),
            "share": _parse_count(stats_source.get("share_count") or candidate.get("share_count")),
            "collect": _parse_count(stats_source.get("collect_count") or candidate.get("collect_count"))
        },
        "raw": candidate
    }
|
||
|
||
|
||
def _extract_videos(payloads: Iterable[Any]) -> list[dict[str, Any]]:
    """Extract, normalize and de-duplicate videos from raw JSON *payloads*.

    Videos are keyed by ``aweme_id``, falling back to ``share_url`` and then
    ``title``. Videos with no key, or whose key was already seen, are dropped.
    The result is sorted by ``play + like + 4*comment + 6*share``, highest first
    (stable for ties).
    """

    def _rank(video: dict[str, Any]) -> float:
        counts = video["stats"]
        return counts["play"] + counts["like"] + counts["comment"] * 4 + counts["share"] * 6

    unique: list[dict[str, Any]] = []
    seen_keys: set[str] = set()
    for payload in payloads:
        for raw in _extract_video_candidates(payload):
            video = _normalize_video_candidate(raw)
            key = video["aweme_id"] or video["share_url"] or video["title"]
            if key and key not in seen_keys:
                seen_keys.add(key)
                unique.append(video)
    return sorted(unique, key=_rank, reverse=True)
|
||
|
||
|
||
def _merge_profile_payload(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]:
    """Merge two normalized profiles, preferring *base* and filling gaps from *overlay*.

    * Empty *overlay*: *base* is returned unchanged.
    * *base* empty or without a nickname: *overlay* is returned as-is.
    * Otherwise identity fields and stats take the base value when truthy, else
      the overlay's. Tags are unioned (case-insensitive). ``raw`` falls back to
      the overlay's when base has none.
    """
    if not overlay:
        return base
    if not base or not base.get("nickname"):
        return overlay

    merged = dict(base)
    for field_name in (
        "nickname",
        "signature",
        "profile_url",
        "canonical_profile_url",
        "sec_uid",
        "douyin_uid",
        "douyin_id",
        "avatar_url",
    ):
        merged[field_name] = base.get(field_name) or overlay.get(field_name, "")
    merged["tags"] = _dedupe_strings(base.get("tags", []) + overlay.get("tags", []))

    base_stats = base.get("stats", {})
    overlay_stats = overlay.get("stats", {})
    merged["stats"] = {
        metric: float(base_stats.get(metric) or overlay_stats.get(metric) or 0)
        for metric in ("followers", "following", "likes", "videos")
    }
    if not merged.get("raw"):
        merged["raw"] = overlay.get("raw", {})
    return merged
|
||
|
||
|
||
def _extract_creator_payloads(creator_data: dict[str, Any]) -> list[Any]:
    """Flatten ``creator_data["pages"][*]["blobs"][*]["payload"]`` into a list.

    Payloads that are ``None``, ``""``, ``[]`` or ``{}`` are skipped; page/blob
    order is preserved.
    """
    payloads: list[Any] = []
    for page in creator_data.get("pages", []):
        payloads.extend(
            payload
            for payload in (blob.get("payload") for blob in page.get("blobs", []))
            if payload not in (None, "", [], {})
        )
    return payloads
|
||
|
||
|
||
def _profile_identity_value(profile: dict[str, Any], field_name: str) -> str:
    """Return *profile*[*field_name*] as a stripped string suitable for identity comparison.

    URL fields (``profile_url``/``canonical_profile_url``) are passed through
    ``_normalize_profile_url_input``. A missing or empty value yields ``""``.
    """
    raw = str(profile.get(field_name, "") or "").strip()
    if raw and field_name in {"profile_url", "canonical_profile_url"}:
        return _normalize_profile_url_input(raw)
    return raw
|
||
|
||
|
||
def _profiles_appear_same(left: dict[str, Any], right: dict[str, Any]) -> bool:
    """Return True if two profiles share any non-empty identity value.

    Checked in order: ``sec_uid``, ``douyin_uid``, ``douyin_id``,
    ``canonical_profile_url``, ``profile_url``. Empty profiles never match.
    """
    if not (left and right):
        return False

    def _same(field_name: str) -> bool:
        mine = _profile_identity_value(left, field_name)
        return bool(mine) and mine == _profile_identity_value(right, field_name)

    return any(
        _same(field_name)
        for field_name in ("sec_uid", "douyin_uid", "douyin_id", "canonical_profile_url", "profile_url")
    )
|
||
|
||
|
||
def _normalize_profile_url_input(value: str) -> str:
    """Extract and normalize a Douyin profile link from user input.

    * Share text: the first ``http(s)://`` URL it contains is used.
    * Trailing ASCII or full-width punctuation is stripped.
    * Scheme-less Douyin hosts (``douyin.com/…``, ``www.douyin.com/…``,
      ``v.douyin.com/…``, ``*.iesdouyin.com/…``) get ``https://`` prefixed.
    * Any other input (e.g. a bare ID) is returned stripped.

    Returns:
        The normalized string, or ``""`` for empty input.
    """
    text = str(value or "").strip()
    if not text:
        return ""

    match = re.search(r"https?://[^\s]+", text)
    if match:
        text = match.group(0)

    # Pasted share text often ends the link with ASCII or full-width punctuation.
    text = text.strip().strip(",。;；、，)）").strip()
    # Previously only "www.douyin.com/" and "douyin.com/" got a scheme, so short
    # links such as "v.douyin.com/xxx/" were left unfetchable.
    if re.match(r"(?:[\w-]+\.)*(?:ies)?douyin\.com/", text, re.IGNORECASE):
        text = f"https://{text}"
    return text
|
||
|
||
|
||
def _looks_like_douyin_anti_bot_page(html: str) -> bool:
    """Return True if *html* contains any known marker of Douyin's anti-crawler interstitial."""
    anti_bot_tokens = (
        "window.byted_acrawler.init",
        "__ac_signature",
        "__ac_nonce",
        "window.location.reload()",
    )
    for token in anti_bot_tokens:
        if token in html:
            return True
    return False
|
||
|
||
|
||
async def _fetch_html(url: str, cookie: str = "") -> tuple[str, str]:
    """GET *url* and return ``(final_url, body_text)``.

    Redirects are followed, so ``final_url`` may differ from *url*. A browser-like
    User-Agent and a Chinese-first Accept-Language header are always sent.
    *cookie*, when not blank, is sent (stripped) as the Cookie header. Error
    statuses raise via ``raise_for_status()``; httpx exceptions propagate to the
    caller.
    """
    trimmed_cookie = cookie.strip()
    headers = {
        "User-Agent": DEFAULT_USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    if trimmed_cookie:
        headers["Cookie"] = trimmed_cookie

    async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, follow_redirects=True) as client:
        response = await client.get(url, headers=headers)
        response.raise_for_status()
        return str(response.url), response.text
|
||
|
||
|
||
async def _discover_profile_urls_from_search(keywords: list[str], limit: int = 8) -> list[str]:
    """Best-effort scrape of Douyin user-search pages for profile links.

    Only the first three *keywords* are searched, one after another. Links
    containing ``/user/`` are made absolute and stripped of query and fragment.
    They are then de-duplicated in discovery order.

    Args:
        keywords: Search terms; each is fully percent-encoded into the URL path.
        limit: Maximum number of URLs returned; ``<= 0`` returns ``[]``.

    Returns:
        Up to *limit* absolute profile URLs. A keyword whose search fetch fails
        is silently skipped.
    """
    if limit <= 0:
        # Previously one URL was still appended before the length check fired.
        return []

    urls: list[str] = []
    seen: set[str] = set()
    for keyword in keywords[:3]:
        # safe="" so a "/" inside the keyword cannot alter the URL path.
        search_url = f"https://www.douyin.com/search/{quote(keyword, safe='')}?type=user"
        try:
            _, html = await _fetch_html(search_url)
        except Exception:
            # Best effort: a blocked or failed search for one keyword must not abort discovery.
            continue
        for href in re.findall(r'href=["\']([^"\']+/user/[^"\']+)["\']', html):
            if href.startswith("//"):
                # Protocol-relative link; previously mangled into "https://www.douyin.com//host/...".
                href = f"https:{href}"
            elif href.startswith("/"):
                href = f"https://www.douyin.com{href}"
            cleaned = href.split("?")[0].split("#")[0]
            if cleaned in seen:
                continue
            seen.add(cleaned)
            urls.append(cleaned)
            if len(urls) >= limit:
                return urls
    return urls
|
||
|
||
|
||
def _summarize_videos(videos: list[dict[str, Any]], limit: int = 8) -> dict[str, Any]:
    """Summarize the first *limit* normalized videos.

    Args:
        videos: Normalized videos (see ``_normalize_video_candidate``), usually
            pre-sorted by ``_extract_videos``.
        limit: How many leading videos to average and list; negative values are
            treated as 0.

    Returns:
        A dict with:

        * ``count``: total number of *videos* passed in, not just the selected ones.
        * ``top_tags``: the 8 most common tags among the selected videos.
        * ``avg_play``, ``avg_like``, ``avg_comment``, ``avg_share``: averages over
          the selection, rounded to 2 dp.
        * ``videos``: compact per-video entries with title/description truncated
          to 120/180 chars.
    """
    # A negative limit would otherwise slice from the end (videos[:-n]).
    selected = videos[: max(limit, 0)]
    if not selected:
        return {
            # Consistent with the non-empty branch: report the total, which was
            # previously 0 whenever limit <= 0 even though videos were present.
            "count": len(videos),
            "top_tags": [],
            "avg_play": 0.0,
            "avg_like": 0.0,
            "avg_comment": 0.0,
            "avg_share": 0.0,
            "videos": []
        }

    count = len(selected)

    def _average(metric: str) -> float:
        return sum(item["stats"][metric] for item in selected) / count

    tag_counter = Counter(tag for item in selected for tag in item.get("tags", []))
    return {
        "count": len(videos),
        "top_tags": [tag for tag, _ in tag_counter.most_common(8)],
        "avg_play": round(_average("play"), 2),
        "avg_like": round(_average("like"), 2),
        "avg_comment": round(_average("comment"), 2),
        "avg_share": round(_average("share"), 2),
        "videos": [
            {
                "aweme_id": item["aweme_id"],
                "title": _compact_text(item["title"], 120),
                "description": _compact_text(item["description"], 180),
                "tags": item["tags"][:6],
                "published_at": item["published_at"],
                "stats": item["stats"]
            }
            for item in selected
        ]
    }
|
||
|
||
|
||
def _jaccard(left: Iterable[str], right: Iterable[str]) -> float:
    """Return the Jaccard similarity of two string collections.

    Items are stripped and lower-cased; blank items are ignored. Returns
    ``0.0`` when both sides are empty.
    """

    def _normalize(items: Iterable[str]) -> set[str]:
        return {entry.strip().lower() for entry in items if entry.strip()}

    left_terms = _normalize(left)
    right_terms = _normalize(right)
    union = left_terms | right_terms
    if not union:
        return 0.0
    return len(left_terms & right_terms) / len(union)
|
||
|
||
|
||
def _quality_score(account_payload: dict[str, Any]) -> float:
    """Return a rough account-quality number (rounded to 3 dp).

    The score combines followers per 10k with average engagement scaled as:
    likes/1000 + comments/300 + shares/200 + plays/5000. Inputs are read from
    ``profile_stats`` and ``video_summary``; missing values count as 0.
    """
    profile_stats = account_payload.get("profile_stats", {})
    summary = account_payload.get("video_summary", {})

    def _metric(source: dict[str, Any], key: str) -> float:
        return float(source.get(key) or 0)

    reach = _metric(profile_stats, "followers") / 10_000.0
    engagement = (
        _metric(summary, "avg_like") / 1000.0
        + _metric(summary, "avg_comment") / 300.0
        + _metric(summary, "avg_share") / 200.0
        + _metric(summary, "avg_play") / 5000.0
    )
    return round(reach + engagement, 3)
|
||
|
||
|
||
def _heuristic_similarity(source_payload: dict[str, Any], candidate_payload: dict[str, Any]) -> dict[str, Any]:
    """Score how similar a candidate account is to the source account.

    The score is 55 x keyword Jaccard + 20 x top-tag Jaccard + 10 x signature-keyword
    Jaccard + the candidate's ``_quality_score`` capped at 15. The returned dict
    holds each overlap (3 dp), ``quality_score`` and ``heuristic_score`` (2 dp).
    """
    source_summary = source_payload.get("video_summary", {})
    candidate_summary = candidate_payload.get("video_summary", {})

    topic = _jaccard(source_payload.get("keywords", []), candidate_payload.get("keywords", []))
    tags = _jaccard(source_summary.get("top_tags", []), candidate_summary.get("top_tags", []))
    signature = _jaccard(
        _extract_keywords(source_payload.get("signature", "")),
        _extract_keywords(candidate_payload.get("signature", ""))
    )
    quality = _quality_score(candidate_payload)
    blended = topic * 55 + tags * 20 + signature * 10 + min(quality, 15)

    return {
        "topic_overlap": round(topic, 3),
        "tag_overlap": round(tags, 3),
        "signature_overlap": round(signature, 3),
        "quality_score": quality,
        "heuristic_score": round(blended, 2)
    }
|
||
|
||
|
||
def _build_model_label(profile: dict[str, Any]) -> str:
    """Return a display label for a profile dict: ``name``, else ``model_name``, else ``base_url``.

    Presumably *profile* is an LLM model profile (TODO confirm against callers).
    """
    label_sources = (profile.get(key) for key in ("name", "model_name", "base_url"))
    return _first_non_empty(*label_sources)
|
||
|
||
|
||
def _try_parse_agent_json(text: str) -> Any:
    """Parse JSON out of free-form text (presumably model/agent output).

    The whole stripped text is tried first. If that fails, the first JSON value
    found anywhere in it by ``_extract_json_objects_from_text`` is returned.
    Empty or unparseable text yields ``{}``.
    """
    candidate = text.strip()
    if not candidate:
        return {}
    try:
        return json.loads(candidate)
    except Exception:
        embedded = _extract_json_objects_from_text(candidate)
        return embedded[0] if embedded else {}
|
||
|
||
|
||
def register_douyin_routes(app: Any, legacy: Any) -> None:
|
||
def now() -> str:
|
||
return legacy.utc_now()
|
||
|
||
def make_id(prefix: str) -> str:
|
||
return legacy.make_id(prefix)
|
||
|
||
def ensure_schema() -> None:
    """Create the Douyin tables and indexes if they do not already exist.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is
    harmless. The whole script runs through a single ``executescript`` call on
    the connection yielded by ``legacy.db.session()`` (presumably a sqlite3
    connection -- TODO confirm).
    """
    # Tables created: accounts, raw snapshots (+ flattened snapshot fields),
    # videos, per-video analyses, analysis reports (+ per-model suggestions),
    # similarity searches (+ ranked candidates) and source->target relations.
    schema = """
    CREATE TABLE IF NOT EXISTS douyin_accounts (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        profile_url TEXT NOT NULL DEFAULT '',
        canonical_profile_url TEXT NOT NULL DEFAULT '',
        sec_uid TEXT NOT NULL DEFAULT '',
        douyin_uid TEXT NOT NULL DEFAULT '',
        douyin_id TEXT NOT NULL DEFAULT '',
        nickname TEXT NOT NULL DEFAULT '',
        signature TEXT NOT NULL DEFAULT '',
        avatar_url TEXT NOT NULL DEFAULT '',
        tags_json TEXT NOT NULL DEFAULT '[]',
        profile_stats_json TEXT NOT NULL DEFAULT '{}',
        raw_profile_json TEXT NOT NULL DEFAULT '{}',
        source_mode TEXT NOT NULL DEFAULT 'public',
        sync_status TEXT NOT NULL DEFAULT 'pending',
        last_public_sync_at TEXT,
        last_creator_sync_at TEXT,
        last_analysis_at TEXT,
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
    );

    CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_updated
        ON douyin_accounts(user_id, updated_at DESC);

    CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_sec_uid
        ON douyin_accounts(user_id, sec_uid);

    CREATE TABLE IF NOT EXISTS douyin_account_snapshots (
        id TEXT PRIMARY KEY,
        account_id TEXT NOT NULL,
        snapshot_type TEXT NOT NULL,
        source_url TEXT NOT NULL DEFAULT '',
        raw_payload_json TEXT NOT NULL DEFAULT '{}',
        summary_json TEXT NOT NULL DEFAULT '{}',
        field_count INTEGER NOT NULL DEFAULT 0,
        collected_at TEXT NOT NULL,
        created_at TEXT NOT NULL,
        FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
    );

    CREATE INDEX IF NOT EXISTS idx_douyin_snapshots_account_collected
        ON douyin_account_snapshots(account_id, collected_at DESC);

    CREATE TABLE IF NOT EXISTS douyin_snapshot_fields (
        snapshot_id TEXT NOT NULL,
        field_path TEXT NOT NULL,
        field_type TEXT NOT NULL DEFAULT 'string',
        field_value_text TEXT NOT NULL DEFAULT '',
        PRIMARY KEY(snapshot_id, field_path),
        FOREIGN KEY(snapshot_id) REFERENCES douyin_account_snapshots(id) ON DELETE CASCADE
    );

    CREATE TABLE IF NOT EXISTS douyin_videos (
        id TEXT PRIMARY KEY,
        account_id TEXT NOT NULL,
        aweme_id TEXT NOT NULL DEFAULT '',
        title TEXT NOT NULL DEFAULT '',
        description TEXT NOT NULL DEFAULT '',
        share_url TEXT NOT NULL DEFAULT '',
        cover_url TEXT NOT NULL DEFAULT '',
        duration_sec REAL NOT NULL DEFAULT 0,
        published_at TEXT,
        tags_json TEXT NOT NULL DEFAULT '[]',
        stats_json TEXT NOT NULL DEFAULT '{}',
        raw_json TEXT NOT NULL DEFAULT '{}',
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
    );

    CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_updated
        ON douyin_videos(account_id, updated_at DESC);

    CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_aweme
        ON douyin_videos(account_id, aweme_id);

    CREATE TABLE IF NOT EXISTS douyin_video_analyses (
        id TEXT PRIMARY KEY,
        account_id TEXT NOT NULL,
        user_id TEXT NOT NULL,
        video_id TEXT NOT NULL,
        report_id TEXT NOT NULL DEFAULT '',
        model_profile_id TEXT NOT NULL DEFAULT '',
        model_label TEXT NOT NULL DEFAULT '',
        source_type TEXT NOT NULL DEFAULT 'top_score_auto',
        status TEXT NOT NULL DEFAULT 'ok',
        performance_score REAL NOT NULL DEFAULT 0,
        commercial_score REAL NOT NULL DEFAULT 0,
        hook_score REAL NOT NULL DEFAULT 0,
        retention_score REAL NOT NULL DEFAULT 0,
        conversion_score REAL NOT NULL DEFAULT 0,
        summary_text TEXT NOT NULL DEFAULT '',
        suggestion_text TEXT NOT NULL DEFAULT '',
        parsed_json TEXT NOT NULL DEFAULT '{}',
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL,
        FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(video_id) REFERENCES douyin_videos(id) ON DELETE CASCADE
    );

    CREATE INDEX IF NOT EXISTS idx_douyin_video_analyses_video_created
        ON douyin_video_analyses(video_id, created_at DESC);

    CREATE INDEX IF NOT EXISTS idx_douyin_video_analyses_account_created
        ON douyin_video_analyses(account_id, created_at DESC);

    CREATE TABLE IF NOT EXISTS douyin_analysis_reports (
        id TEXT PRIMARY KEY,
        account_id TEXT NOT NULL,
        user_id TEXT NOT NULL,
        focus_text TEXT NOT NULL DEFAULT '',
        model_profile_ids_json TEXT NOT NULL DEFAULT '[]',
        linked_account_ids_json TEXT NOT NULL DEFAULT '[]',
        prompt_text TEXT NOT NULL DEFAULT '',
        context_json TEXT NOT NULL DEFAULT '{}',
        created_at TEXT NOT NULL,
        FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
    );

    CREATE INDEX IF NOT EXISTS idx_douyin_analysis_reports_account_created
        ON douyin_analysis_reports(account_id, created_at DESC);

    CREATE TABLE IF NOT EXISTS douyin_analysis_suggestions (
        id TEXT PRIMARY KEY,
        report_id TEXT NOT NULL,
        model_profile_id TEXT NOT NULL DEFAULT '',
        model_label TEXT NOT NULL DEFAULT '',
        status TEXT NOT NULL DEFAULT 'ok',
        suggestion_text TEXT NOT NULL DEFAULT '',
        parsed_json TEXT NOT NULL DEFAULT '{}',
        created_at TEXT NOT NULL,
        FOREIGN KEY(report_id) REFERENCES douyin_analysis_reports(id) ON DELETE CASCADE
    );

    CREATE INDEX IF NOT EXISTS idx_douyin_analysis_suggestions_report
        ON douyin_analysis_suggestions(report_id, created_at ASC);

    CREATE TABLE IF NOT EXISTS douyin_similarity_searches (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        source_account_id TEXT,
        source_profile_url TEXT NOT NULL DEFAULT '',
        keywords_json TEXT NOT NULL DEFAULT '[]',
        prompt_text TEXT NOT NULL DEFAULT '',
        context_json TEXT NOT NULL DEFAULT '{}',
        created_at TEXT NOT NULL,
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
    );

    CREATE INDEX IF NOT EXISTS idx_douyin_similarity_searches_user_created
        ON douyin_similarity_searches(user_id, created_at DESC);

    CREATE TABLE IF NOT EXISTS douyin_similarity_candidates (
        id TEXT PRIMARY KEY,
        search_id TEXT NOT NULL,
        candidate_account_id TEXT,
        candidate_profile_url TEXT NOT NULL DEFAULT '',
        heuristic_score REAL NOT NULL DEFAULT 0,
        agent_score REAL NOT NULL DEFAULT 0,
        rationale_text TEXT NOT NULL DEFAULT '',
        dimensions_json TEXT NOT NULL DEFAULT '{}',
        raw_output_json TEXT NOT NULL DEFAULT '{}',
        rank_index INTEGER NOT NULL DEFAULT 0,
        created_at TEXT NOT NULL,
        FOREIGN KEY(search_id) REFERENCES douyin_similarity_searches(id) ON DELETE CASCADE,
        FOREIGN KEY(candidate_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
    );

    CREATE INDEX IF NOT EXISTS idx_douyin_similarity_candidates_search_rank
        ON douyin_similarity_candidates(search_id, rank_index ASC);

    CREATE TABLE IF NOT EXISTS douyin_account_relations (
        id TEXT PRIMARY KEY,
        user_id TEXT NOT NULL,
        source_account_id TEXT NOT NULL,
        target_account_id TEXT,
        target_profile_url TEXT NOT NULL DEFAULT '',
        relation_type TEXT NOT NULL DEFAULT 'benchmark',
        note TEXT NOT NULL DEFAULT '',
        search_id TEXT NOT NULL DEFAULT '',
        created_at TEXT NOT NULL,
        FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
        FOREIGN KEY(target_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
    );

    CREATE INDEX IF NOT EXISTS idx_douyin_account_relations_source
        ON douyin_account_relations(source_account_id, created_at DESC);
    """
    with legacy.db.session() as conn:
        conn.executescript(schema)
|
||
|
||
ensure_schema()

# The schema is applied eagerly at route registration and again on application
# startup. Both calls are safe because every DDL statement uses IF NOT EXISTS.
# NOTE(review): presumably the startup call covers a database that is
# (re)initialised after registration -- confirm this is still needed.
@app.on_event("startup")
def _startup_douyin_schema() -> None:
    """Startup hook: (re)apply the Douyin schema via ``ensure_schema``."""
    ensure_schema()
|
||
|
||
def _require_owned_account(account_id: str, user_id: str) -> dict[str, Any]:
    """Load the ``douyin_accounts`` row *account_id* owned by *user_id*.

    Raises:
        HTTPException(404): no row matches both the id and the owner.
    """
    owned = legacy.db.fetch_one(
        "SELECT * FROM douyin_accounts WHERE id = ? AND user_id = ?",
        (account_id, user_id),
    )
    if owned:
        return owned
    raise HTTPException(status_code=404, detail="Douyin account not found")
|
||
|
||
def _fetch_model_profiles(account_id: str) -> list[dict[str, Any]]:
    """List the model profiles visible to *account_id*.

    Profiles with no owner (``owner_account_id IS NULL``) and those owned by
    *account_id* are returned: ``is_default`` rows first, then oldest first.
    """
    query = """
        SELECT *
        FROM model_profiles
        WHERE owner_account_id IS NULL OR owner_account_id = ?
        ORDER BY is_default DESC, created_at ASC
    """
    return legacy.db.fetch_all(query, (account_id,))
|
||
|
||
def _resolve_model_profiles(account: dict[str, Any], requested_ids: list[str]) -> list[dict[str, Any]]:
    """Resolve which model profiles to run for *account*.

    With no *requested_ids*, every profile visible to the account is returned
    in ``_fetch_model_profiles`` order. Otherwise the requested profiles are
    returned in request order. Repeated ids are collapsed so the same model is
    not run twice for one request.

    Raises:
        HTTPException(400): the account has no visible model profiles.
        HTTPException(404): one or more requested ids are not visible.
    """
    profiles = _fetch_model_profiles(account["id"])
    if not profiles:
        raise HTTPException(status_code=400, detail="No available model profiles")
    if not requested_ids:
        return profiles
    # dict.fromkeys keeps first-occurrence order while dropping repeats.
    unique_ids = list(dict.fromkeys(requested_ids))
    profile_map = {row["id"]: row for row in profiles}
    missing = [profile_id for profile_id in unique_ids if profile_id not in profile_map]
    if missing:
        raise HTTPException(status_code=404, detail=f"Unknown model profiles: {', '.join(missing)}")
    return [profile_map[profile_id] for profile_id in unique_ids]
|
||
|
||
async def _collect_public_profile(profile_url: str, manual_payload: dict[str, Any] | None) -> dict[str, Any]:
    """Gather public profile data from a manual payload and/or the live page.

    A non-empty *manual_payload* is used as the first JSON blob. When the
    normalized *profile_url* is non-empty the page is fetched and its JSON blobs
    are appended, unless the HTML is empty or looks like an anti-bot challenge.
    Fetch/parse failures are recorded as error codes rather than raised.

    Returns a dict with ``profile``, ``videos``, ``raw_pages`` (the blobs),
    ``errors`` (diagnostic codes) and ``source_url`` (post-redirect URL when
    the fetch succeeded).
    """
    source_url = _normalize_profile_url_input(profile_url)
    errors: list[str] = []
    blobs: list[dict[str, Any]] = (
        [{"script_id": "manual_profile_payload", "payload": manual_payload}] if manual_payload else []
    )

    if source_url:
        try:
            source_url, html = await _fetch_html(source_url)
            if not html.strip():
                errors.append("public_profile_empty_html")
            elif _looks_like_douyin_anti_bot_page(html):
                errors.append("public_profile_anti_bot_challenge")
            else:
                # Page blobs go after any manual blob; the "no blobs" code can
                # only fire when there was no manual payload either.
                blobs.extend(_extract_json_blobs_from_html(html))
                if not blobs:
                    errors.append("public_profile_no_json_blobs")
        except Exception as exc:
            errors.append(f"public_profile_fetch_failed: {exc}")

    payloads = [blob["payload"] for blob in blobs]
    candidates = [candidate for payload in payloads for candidate in _extract_profile_candidates(payload)]
    profile = _pick_best_profile(candidates, fallback_url=source_url)
    videos = _extract_videos(payloads)

    if source_url and not profile.get("nickname") and not videos and not errors:
        if not blobs:
            errors.append("public_profile_no_json_blobs")
        errors.append("public_profile_no_candidates")

    return {
        "profile": profile,
        "videos": videos,
        "raw_pages": blobs,
        "errors": errors,
        "source_url": source_url,
    }
|
||
|
||
async def _collect_creator_center_pages(
    urls: list[str],
    cookie: str,
    manual_pages: list[ManualPageCapture],
) -> dict[str, Any]:
    """Collect creator-center pages from manual captures and live fetches.

    Each manual capture becomes one page holding a single
    ``manual_creator_payload`` blob. When *cookie* is non-blank, each URL in
    *urls* is fetched in turn with that cookie and its JSON blobs extracted. A
    failing URL is recorded in ``errors`` and skipped.

    Returns ``{"pages": [...], "errors": [...]}``.
    """
    pages: list[dict[str, Any]] = [
        {
            "url": capture.url,
            "title": capture.title,
            "blobs": [{"script_id": "manual_creator_payload", "payload": capture.payload}],
        }
        for capture in manual_pages
    ]
    errors: list[str] = []
    if not cookie.strip():
        return {"pages": pages, "errors": errors}

    for url in urls:
        try:
            final_url, html = await _fetch_html(url, cookie=cookie)
            pages.append({"url": final_url, "title": "", "blobs": _extract_json_blobs_from_html(html)})
        except Exception as exc:
            errors.append(f"creator_center_fetch_failed[{url}]: {exc}")
    return {"pages": pages, "errors": errors}
|
||
|
||
def _upsert_account(
    owner: dict[str, Any],
    profile: dict[str, Any],
    sync_request: DouyinAccountSyncRequest,
    public_data: dict[str, Any],
    creator_data: dict[str, Any],
) -> dict[str, Any]:
    """Insert or update the owner's ``douyin_accounts`` row for *profile*.

    An existing row (scoped to ``owner["id"]``) is matched by ``sec_uid``,
    then ``douyin_id``, then ``canonical_profile_url``; the first non-empty
    identifier that finds a row wins. After the write, snapshots and videos are
    persisted via ``_persist_snapshots_and_videos`` and the fresh row is
    returned.

    A single timestamp is used for every time column written by one call, so
    ``created_at``/``updated_at`` and the ``last_*_sync_at`` markers agree.

    Raises:
        HTTPException(404): propagated from ``_require_owned_account`` if the
        row cannot be re-read after the write.
    """
    existing: dict[str, Any] | None = None
    for field_name in ("sec_uid", "douyin_id", "canonical_profile_url"):
        field_value = profile.get(field_name, "")
        if not field_value:
            continue
        # field_name comes from the fixed tuple above (never user input), so
        # interpolating it is safe; the value itself is parameterized.
        existing = legacy.db.fetch_one(
            f"SELECT * FROM douyin_accounts WHERE user_id = ? AND {field_name} = ? LIMIT 1",
            (owner["id"], field_value),
        )
        if existing:
            break

    account_id = existing["id"] if existing else make_id("dyacct")
    timestamp = now()

    tags = _dedupe_strings(
        profile.get("tags", []) + _extract_keywords(profile.get("nickname", ""), profile.get("signature", ""))
    )
    source_mode = "creator_center" if creator_data["pages"] else "public"
    sync_status = "partial" if public_data["errors"] or creator_data["errors"] else "ready"

    # Only advance a sync marker when this run captured that source; otherwise
    # keep the previous value (NULL for a brand-new row).
    if public_data["raw_pages"]:
        last_public_sync_at = timestamp
    else:
        last_public_sync_at = existing.get("last_public_sync_at") if existing else None
    if creator_data["pages"]:
        last_creator_sync_at = timestamp
    else:
        last_creator_sync_at = existing.get("last_creator_sync_at") if existing else None

    # Values for profile_url .. sync_status, shared by UPDATE and INSERT below.
    profile_columns = (
        profile.get("profile_url", ""),
        profile.get("canonical_profile_url", ""),
        profile.get("sec_uid", ""),
        profile.get("douyin_uid", ""),
        profile.get("douyin_id", ""),
        profile.get("nickname", ""),
        profile.get("signature", ""),
        profile.get("avatar_url", ""),
        _safe_json_dumps(tags),
        _safe_json_dumps(profile.get("stats", {})),
        _safe_json_dumps({
            "profile": profile.get("raw", {}),
            "discovery_note": sync_request.discovery_note,
        }),
        source_mode,
        sync_status,
    )

    if existing:
        legacy.db.execute(
            """
            UPDATE douyin_accounts
            SET profile_url = ?, canonical_profile_url = ?, sec_uid = ?, douyin_uid = ?, douyin_id = ?,
                nickname = ?, signature = ?, avatar_url = ?, tags_json = ?, profile_stats_json = ?,
                raw_profile_json = ?, source_mode = ?, sync_status = ?, last_public_sync_at = ?,
                last_creator_sync_at = ?, updated_at = ?
            WHERE id = ?
            """,
            (*profile_columns, last_public_sync_at, last_creator_sync_at, timestamp, account_id),
        )
    else:
        legacy.db.execute(
            """
            INSERT INTO douyin_accounts (
                id, user_id, profile_url, canonical_profile_url, sec_uid, douyin_uid, douyin_id,
                nickname, signature, avatar_url, tags_json, profile_stats_json, raw_profile_json,
                source_mode, sync_status, last_public_sync_at, last_creator_sync_at, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                account_id,
                owner["id"],
                *profile_columns,
                last_public_sync_at,
                last_creator_sync_at,
                timestamp,
                timestamp,
            ),
        )

    account_row = _require_owned_account(account_id, owner["id"])
    _persist_snapshots_and_videos(account_row, public_data, creator_data, sync_request)
    return _require_owned_account(account_id, owner["id"])
|
||
|
||
def _persist_snapshot(
    account_row: dict[str, Any],
    snapshot_type: str,
    source_url: str,
    payload: Any,
    summary: dict[str, Any],
    index_fields: bool = True,
) -> str:
    """Store one raw capture in ``douyin_account_snapshots``.

    When *index_fields* is true, the payload is flattened with ``_flatten_json``
    and each ``(path, type, value)`` triple is written to
    ``douyin_snapshot_fields`` (``INSERT OR REPLACE``, so a repeated path keeps
    the last value). Otherwise no field rows are written and ``field_count`` is
    0. Returns the new snapshot id.
    """
    snapshot_id = make_id("dysnap")
    stamp = now()
    flattened = _flatten_json(payload) if index_fields else []

    snapshot_sql = """
        INSERT INTO douyin_account_snapshots (
            id, account_id, snapshot_type, source_url, raw_payload_json, summary_json,
            field_count, collected_at, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    legacy.db.execute(
        snapshot_sql,
        (
            snapshot_id,
            account_row["id"],
            snapshot_type,
            source_url,
            _safe_json_dumps(payload),
            _safe_json_dumps(summary),
            len(flattened),
            stamp,
            stamp,
        ),
    )

    field_sql = """
        INSERT OR REPLACE INTO douyin_snapshot_fields (
            snapshot_id, field_path, field_type, field_value_text
        ) VALUES (?, ?, ?, ?)
    """
    for path, kind, value in flattened:
        legacy.db.execute(field_sql, (snapshot_id, path, kind, value))
    return snapshot_id
|
||
|
||
def _persist_snapshots_and_videos(
    account_row: dict[str, Any],
    public_data: dict[str, Any],
    creator_data: dict[str, Any],
    sync_request: DouyinAccountSyncRequest,
) -> None:
    """Persist raw captures and upsert the account's videos.

    1. One ``public_profile`` snapshot when public blobs were captured.
    2. One ``creator_center`` snapshot per creator-center page.
    3. Manual work payloads are normalized and appended to
       ``public_data["videos"]``. This mutates the caller's dict on purpose:
       callers read the resulting video count.
    4. Videos are de-duplicated within the batch by ``aweme_id``, then
       ``share_url``, then ``title``, and upserted into ``douyin_videos``.

    Existing video rows are matched by ``aweme_id``. Videos without one are
    matched by ``share_url`` among rows that also lack an ``aweme_id``, so
    re-syncing them updates the earlier row instead of inserting a duplicate.
    """
    compact = sync_request.compact_response

    if public_data["raw_pages"]:
        public_payload = {
            "pages": public_data["raw_pages"],
            "errors": public_data["errors"],
            "source_url": public_data["source_url"],
        }
        _persist_snapshot(
            account_row,
            "public_profile",
            public_data["source_url"],
            public_payload,
            {
                "video_count": len(public_data["videos"]),
                "nickname": public_data["profile"].get("nickname", ""),
                "tags": public_data["profile"].get("tags", []),
            },
            index_fields=not compact,
        )

    for page in creator_data["pages"]:
        page_payload = {"title": page["title"], "blobs": page["blobs"]}
        _persist_snapshot(
            account_row,
            "creator_center",
            page["url"],
            page_payload,
            {
                "blob_count": len(page["blobs"]),
                "field_count": 0 if compact else len(_flatten_json(page_payload)),
            },
            index_fields=not compact,
        )

    for manual_video in sync_request.manual_work_payloads:
        public_data["videos"].append(_normalize_video_candidate(manual_video))

    deduped: dict[str, dict[str, Any]] = {}
    for video in public_data["videos"]:
        key = video["aweme_id"] or video["share_url"] or video["title"]
        if key and key not in deduped:
            deduped[key] = video

    for video in deduped.values():
        existing = None
        if video["aweme_id"]:
            existing = legacy.db.fetch_one(
                "SELECT id FROM douyin_videos WHERE account_id = ? AND aweme_id = ? LIMIT 1",
                (account_row["id"], video["aweme_id"]),
            )
        elif video["share_url"]:
            # No aweme_id: fall back to share_url (also the in-batch dedupe key
            # above). Only rows lacking an aweme_id are eligible, so a properly
            # keyed row is never overwritten by an unkeyed capture.
            existing = legacy.db.fetch_one(
                "SELECT id FROM douyin_videos WHERE account_id = ? AND aweme_id = '' AND share_url = ? LIMIT 1",
                (account_row["id"], video["share_url"]),
            )
        timestamp = now()
        if existing:
            legacy.db.execute(
                """
                UPDATE douyin_videos
                SET title = ?, description = ?, share_url = ?, cover_url = ?, duration_sec = ?,
                    published_at = ?, tags_json = ?, stats_json = ?, raw_json = ?, updated_at = ?
                WHERE id = ?
                """,
                (
                    video["title"],
                    video["description"],
                    video["share_url"],
                    video["cover_url"],
                    video["duration_sec"],
                    video["published_at"],
                    _safe_json_dumps(video["tags"]),
                    _safe_json_dumps(video["stats"]),
                    _safe_json_dumps(video["raw"]),
                    timestamp,
                    existing["id"],
                ),
            )
        else:
            legacy.db.execute(
                """
                INSERT INTO douyin_videos (
                    id, account_id, aweme_id, title, description, share_url, cover_url,
                    duration_sec, published_at, tags_json, stats_json, raw_json, created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    make_id("dyvideo"),
                    account_row["id"],
                    video["aweme_id"],
                    video["title"],
                    video["description"],
                    video["share_url"],
                    video["cover_url"],
                    video["duration_sec"],
                    video["published_at"],
                    _safe_json_dumps(video["tags"]),
                    _safe_json_dumps(video["stats"]),
                    _safe_json_dumps(video["raw"]),
                    timestamp,
                    timestamp,
                ),
            )
|
||
|
||
def _list_videos(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
    """Return up to *limit* stored videos of *account_id*, newest first.

    Rows are ordered by ``published_at`` (falling back to ``updated_at``), then
    ``updated_at``. The stored ``raw_json`` is re-normalized with
    ``_normalize_video_candidate`` only to recover ``content_type``,
    ``content_type_label`` and ``image_count``. When it is empty or not a dict,
    the defaults ``"video"`` / ``"视频"`` / ``0`` are used.
    """
    video_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_videos
        WHERE account_id = ?
        ORDER BY COALESCE(published_at, updated_at) DESC, updated_at DESC
        LIMIT ?
        """,
        (account_id, limit),
    )

    def row_to_video(row: dict[str, Any]) -> dict[str, Any]:
        raw_payload = _safe_json_loads(row["raw_json"], {})
        has_raw = isinstance(raw_payload, dict) and bool(raw_payload)
        extras = _normalize_video_candidate(raw_payload) if has_raw else {}
        return {
            "id": row["id"],
            "aweme_id": row["aweme_id"],
            "title": row["title"],
            "description": row["description"],
            "share_url": row["share_url"],
            "cover_url": row["cover_url"],
            "duration_sec": row["duration_sec"],
            "published_at": row["published_at"],
            "tags": _safe_json_loads(row["tags_json"], []),
            "stats": _safe_json_loads(row["stats_json"], {}),
            "content_type": extras.get("content_type", "video"),
            "content_type_label": extras.get("content_type_label", "视频"),
            "image_count": int(extras.get("image_count") or 0),
            "raw": raw_payload,
        }

    return [row_to_video(row) for row in video_rows]
|
||
|
||
def _latest_video_analysis_map(account_id: str) -> dict[str, dict[str, Any]]:
    """Map each analysed video of *account_id* to its most recent analysis.

    The newest row per ``video_id`` is selected by ``MAX(created_at)``. If two
    analyses share that timestamp, both rows match and the one returned last
    overwrites the other in the map. Score columns are coerced to ``float``
    (``NULL`` -> ``0.0``) and ``parsed_json`` is decoded.
    """
    rows = legacy.db.fetch_all(
        """
        SELECT analysis.*
        FROM douyin_video_analyses analysis
        INNER JOIN (
            SELECT video_id, MAX(created_at) AS latest_created_at
            FROM douyin_video_analyses
            WHERE account_id = ?
            GROUP BY video_id
        ) latest
            ON latest.video_id = analysis.video_id
            AND latest.latest_created_at = analysis.created_at
        WHERE analysis.account_id = ?
        """,
        (account_id, account_id),
    )
    text_columns = ("id", "video_id", "report_id", "model_profile_id", "model_label", "source_type", "status")
    score_columns = ("performance_score", "commercial_score", "hook_score", "retention_score", "conversion_score")

    latest_by_video: dict[str, dict[str, Any]] = {}
    for row in rows:
        parsed = _safe_json_loads(row["parsed_json"], {})
        entry: dict[str, Any] = {column: row[column] for column in text_columns}
        entry.update({column: float(row[column] or 0) for column in score_columns})
        entry.update(
            summary_text=row["summary_text"],
            suggestion_text=row["suggestion_text"],
            parsed_json=parsed,
            created_at=row["created_at"],
            updated_at=row["updated_at"],
        )
        latest_by_video[row["video_id"]] = entry
    return latest_by_video
|
||
|
||
def _build_video_payload(video: dict[str, Any], latest_analysis: dict[str, Any] | None = None) -> dict[str, Any]:
    """Shape a stored video (as returned by ``_list_videos``) for the API.

    Adds the ``_video_score_breakdown`` result under ``score``. Content-type
    fields default to ``"video"`` / ``"视频"`` / ``0``. *latest_analysis* is
    attached only when truthy.
    """
    score = _video_score_breakdown(video)
    copied_keys = (
        "id", "aweme_id", "title", "description", "share_url",
        "cover_url", "duration_sec", "published_at", "tags",
    )
    payload: dict[str, Any] = {key: video[key] for key in copied_keys}
    payload["content_type"] = video.get("content_type", "video")
    payload["content_type_label"] = video.get("content_type_label", "视频")
    payload["image_count"] = int(video.get("image_count") or 0)
    payload["stats"] = video["stats"]
    payload["score"] = score
    if latest_analysis:
        payload["latest_analysis"] = latest_analysis
    return payload
|
||
|
||
def _video_sort_key(video: dict[str, Any], sort_by: str) -> tuple[Any, ...]:
|
||
if sort_by in {"popular", "popularity"}:
|
||
return (
|
||
float(video.get("score", {}).get("popularity_score") or 0),
|
||
float(video.get("score", {}).get("performance_score") or 0),
|
||
float(video.get("score", {}).get("commercial_score") or 0)
|
||
)
|
||
if sort_by == "latest":
|
||
return (
|
||
_parse_iso_datetime(video.get("published_at")) or datetime.fromtimestamp(0, tz=timezone.utc),
|
||
video.get("score", {}).get("performance_score", 0)
|
||
)
|
||
if sort_by == "commercial":
|
||
return (
|
||
float(video.get("score", {}).get("commercial_score") or 0),
|
||
float(video.get("score", {}).get("performance_score") or 0)
|
||
)
|
||
if sort_by == "play":
|
||
return (
|
||
float((video.get("stats") or {}).get("play") or 0),
|
||
float(video.get("score", {}).get("performance_score") or 0)
|
||
)
|
||
if sort_by == "like":
|
||
return (
|
||
float((video.get("stats") or {}).get("like") or 0),
|
||
float(video.get("score", {}).get("performance_score") or 0)
|
||
)
|
||
if sort_by == "share":
|
||
return (
|
||
float((video.get("stats") or {}).get("share") or 0),
|
||
float(video.get("score", {}).get("performance_score") or 0)
|
||
)
|
||
if sort_by == "comment":
|
||
return (
|
||
float((video.get("stats") or {}).get("comment") or 0),
|
||
float(video.get("score", {}).get("performance_score") or 0)
|
||
)
|
||
return (
|
||
float(video.get("score", {}).get("performance_score") or 0),
|
||
float(video.get("score", {}).get("commercial_score") or 0)
|
||
)
|
||
|
||
def _build_video_workspace_payload(
    account_row: dict[str, Any],
    limit: int = 60,
) -> dict[str, Any]:
    """Assemble the video workspace view for one account.

    Loads at least 24 (or *limit*, if larger) stored videos, attaches each
    video's latest analysis, and returns the items with:

    * ``top_scored_video_ids`` / ``latest_video_ids``: up to 12 ids each, by
      score and by publish date respectively;
    * ``high_score_threshold`` (60.0) and ``meta`` counters: total, analysed,
      at/above threshold, plain videos and image-text posts.
    """
    account_id = account_row["id"]
    stored = _list_videos(account_id, limit=max(limit, 24))
    analyses = _latest_video_analysis_map(account_id)
    videos = [_build_video_payload(item, analyses.get(item["id"])) for item in stored]

    ranked = sorted(videos, key=lambda entry: _video_sort_key(entry, "score"), reverse=True)
    newest = sorted(videos, key=lambda entry: _video_sort_key(entry, "latest"), reverse=True)

    threshold = 60.0
    high_scorers = [entry for entry in ranked if float(entry["score"]["performance_score"]) >= threshold]

    return {
        "items": videos,
        "top_scored_video_ids": [entry["id"] for entry in ranked[:12]],
        "latest_video_ids": [entry["id"] for entry in newest[:12]],
        "high_score_threshold": threshold,
        "meta": {
            "total_count": len(videos),
            "analyzed_count": sum(1 for entry in videos if entry.get("latest_analysis")),
            "high_score_count": len(high_scorers),
            "video_count": sum(1 for entry in videos if entry.get("content_type") == "video"),
            "image_text_count": sum(1 for entry in videos if entry.get("content_type") == "image_text"),
        },
    }
|
||
|
||
def _finalize_sync_workspace(
    owner: dict[str, Any],
    request: DouyinAccountSyncRequest,
    public_data: dict[str, Any],
    creator_data: dict[str, Any]
) -> dict[str, Any]:
    """Merge creator-center captures into the public data, persist, and respond.

    Creator-center payloads are merged into ``public_data`` (profile fields,
    source URL and videos) only when ``_profiles_appear_same`` reports the same
    identity, or when ``request.allow_creator_center_profile_fallback`` is set.
    Diagnostic codes are appended to ``public_data["errors"]`` along the way;
    ``public_data`` is mutated in place.

    Returns a compact account summary when ``request.compact_response`` is set.
    Otherwise returns the full workspace from ``_build_workspace_payload``
    with ``sync_errors`` attached.

    Raises:
        HTTPException(400): neither a nickname nor any video could be extracted.
    """
    creator_payloads = _extract_creator_payloads(creator_data)
    if creator_payloads:
        creator_profile = _pick_best_profile(
            [candidate for payload in creator_payloads for candidate in _extract_profile_candidates(payload)]
        )
        creator_videos = _extract_videos(creator_payloads)
        creator_identity_match = _profiles_appear_same(public_data["profile"], creator_profile)
        # The fallback flag allows merging without a confirmed identity match;
        # that choice is surfaced through the error codes below.
        should_merge_creator = creator_identity_match or request.allow_creator_center_profile_fallback
        if should_merge_creator:
            if creator_profile.get("nickname"):
                public_data["profile"] = _merge_profile_payload(public_data["profile"], creator_profile)
                if not public_data["source_url"]:
                    public_data["source_url"] = creator_profile.get("canonical_profile_url") or request.profile_url
                # NOTE(review): confirm that the source_url backfill above and
                # these fallback/partial codes are meant to apply only when the
                # creator profile has a nickname (as nested here), rather than
                # for every merge.
                if request.allow_creator_center_profile_fallback and not creator_identity_match:
                    public_data["errors"].append("creator_center_profile_fallback_used")
                elif public_data["profile"].get("nickname") != creator_profile.get("nickname"):
                    public_data["errors"].append("creator_center_profile_merge_partial")
            public_data["videos"].extend(creator_videos)
        elif creator_profile.get("nickname") or creator_videos:
            # Identity mismatch with no fallback: drop the capture, record why.
            public_data["errors"].append("creator_center_identity_mismatch_skipped")
    if not public_data["profile"].get("nickname") and not public_data["videos"]:
        message = "No Douyin profile or creator-center data could be extracted"
        if "creator_center_identity_mismatch_skipped" in public_data["errors"]:
            message = "Creator-center capture belongs to a different logged-in Douyin account; automatic merge was skipped"
        raise HTTPException(
            status_code=400,
            detail={
                "message": message,
                "profile_url": request.profile_url,
                "resolved_profile_url": public_data["source_url"],
                "public_blob_count": len(public_data["raw_pages"]),
                "public_video_count": len(public_data["videos"]),
                "public_errors": public_data["errors"],
                "creator_page_count": len(creator_data["pages"]),
                "creator_errors": creator_data["errors"]
            }
        )
    account_row = _upsert_account(owner, public_data["profile"], request, public_data, creator_data)
    sync_errors = public_data["errors"] + creator_data["errors"]
    if request.compact_response:
        # Compact mode skips building the full workspace payload.
        return {
            "account": {
                "id": account_row["id"],
                "nickname": account_row["nickname"],
                "profile_url": account_row["profile_url"],
                "douyin_id": account_row["douyin_id"],
                "sec_uid": account_row["sec_uid"],
                "sync_status": account_row["sync_status"]
            },
            "sync_errors": sync_errors,
            "public_video_count": len(public_data["videos"]),
            "creator_page_count": len(creator_data["pages"])
        }
    workspace = _build_workspace_payload(account_row)
    workspace["sync_errors"] = sync_errors
    return workspace
|
||
|
||
def _build_account_payload(account_row: dict[str, Any], include_recent_videos: int = 8) -> dict[str, Any]:
    """Summarize a stored account for analysis prompts and API responses.

    Loads at least 12 recent videos and summarizes *include_recent_videos* of
    them. ``keywords`` (first 18) is the de-duplicated union of stored tags,
    keywords from nickname/signature, the summary's top tags and the summarized
    video titles. ``profile_url`` prefers the canonical URL.
    """
    recent_videos = _list_videos(account_row["id"], limit=max(include_recent_videos, 12))
    stored_tags = _safe_json_loads(account_row["tags_json"], [])
    stats = _safe_json_loads(account_row["profile_stats_json"], {})
    summary = _summarize_videos(recent_videos, limit=include_recent_videos)

    identity_keywords = _extract_keywords(account_row["nickname"], account_row["signature"])
    title_keywords = [item["title"] for item in summary["videos"]]
    keywords = _dedupe_strings(stored_tags + identity_keywords + summary["top_tags"] + title_keywords)

    return {
        "id": account_row["id"],
        "nickname": account_row["nickname"],
        "signature": account_row["signature"],
        "profile_url": account_row["canonical_profile_url"] or account_row["profile_url"],
        "avatar_url": account_row["avatar_url"],
        "sec_uid": account_row["sec_uid"],
        "douyin_id": account_row["douyin_id"],
        "profile_stats": stats,
        "tags": stored_tags,
        "keywords": keywords[:18],
        "sync_status": account_row["sync_status"],
        "video_summary": summary,
    }
|
||
|
||
def _list_linked_accounts(account_row: dict[str, Any]) -> list[dict[str, Any]]:
    """List relations whose source is *account_row*, newest first.

    Each relation is LEFT JOINed to its target account. ``target_account_id``
    may be NULL (it is ``ON DELETE SET NULL``), in which case every joined
    ``target_*`` column is NULL. These are normalized to ``""`` rather than
    leaking ``None``. ``dict.get(key, "")`` alone would not do this, because
    the key is present with a ``None`` value.
    """
    relation_rows = legacy.db.fetch_all(
        """
        SELECT rel.*, target.nickname AS target_nickname, target.signature AS target_signature,
               target.canonical_profile_url AS target_canonical_profile_url, target.profile_stats_json AS target_profile_stats_json,
               target.tags_json AS target_tags_json
        FROM douyin_account_relations rel
        LEFT JOIN douyin_accounts target ON target.id = rel.target_account_id
        WHERE rel.source_account_id = ?
        ORDER BY rel.created_at DESC
        """,
        (account_row["id"],),
    )
    return [
        {
            "relation_id": row["id"],
            "relation_type": row["relation_type"],
            "note": row["note"],
            "search_id": row["search_id"],
            "created_at": row["created_at"],
            "target_account_id": row["target_account_id"],
            "target_profile_url": row["target_profile_url"] or row.get("target_canonical_profile_url") or "",
            "target_nickname": row.get("target_nickname") or "",
            "target_signature": row.get("target_signature") or "",
            "target_profile_stats": _safe_json_loads(row.get("target_profile_stats_json"), {}),
            "target_tags": _safe_json_loads(row.get("target_tags_json"), []),
        }
        for row in relation_rows
    ]
|
||
|
||
def _normalize_report_text(value: Any) -> str:
|
||
text = str(value or "").strip()
|
||
if not text:
|
||
return ""
|
||
return re.sub(r"\s+", " ", text)
|
||
|
||
def _build_report_payload(report: dict[str, Any]) -> dict[str, Any]:
    """Expand a ``douyin_analysis_reports`` row into the API payload, attaching its suggestions.

    Suggestions are loaded in ``created_at`` order. ``duplicate_count`` / ``duplicate_report_ids``
    start at ``1`` / ``[]``; callers that collapse duplicate reports update them afterwards.
    """
    suggestion_rows = legacy.db.fetch_all(
        "SELECT * FROM douyin_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
        (report["id"],)
    )

    def _as_suggestion(row: dict[str, Any]) -> dict[str, Any]:
        # Project one suggestion row, decoding its stored JSON analysis.
        return {
            "id": row["id"],
            "model_profile_id": row["model_profile_id"],
            "model_label": row["model_label"],
            "status": row["status"],
            "suggestion_text": row["suggestion_text"],
            "parsed_json": _safe_json_loads(row["parsed_json"], {})
        }

    return {
        "id": report["id"],
        "focus_text": report["focus_text"],
        "model_profile_ids": _safe_json_loads(report["model_profile_ids_json"], []),
        "linked_account_ids": _safe_json_loads(report["linked_account_ids_json"], []),
        "created_at": report["created_at"],
        "duplicate_count": 1,
        "duplicate_report_ids": [],
        "suggestions": list(map(_as_suggestion, suggestion_rows))
    }
|
||
|
||
def _report_signature(report_payload: dict[str, Any]) -> str:
    """Build a content fingerprint used to detect duplicate analysis reports.

    The fingerprint is the normalised focus text followed by one ``model|status|content`` line per
    suggestion, where *content* is the key-sorted JSON of ``parsed_json`` when it is a non-empty dict,
    otherwise the whitespace-normalised ``suggestion_text``.

    Args:
        report_payload: A report payload (as produced by ``_build_report_payload``) or a probe dict
            with ``focus_text`` and ``suggestions``.

    Returns:
        A newline-joined signature string; equal strings mean equal report content.
    """
    parts = [_normalize_report_text(report_payload.get("focus_text"))]
    for suggestion in report_payload.get("suggestions") or []:
        parsed = suggestion.get("parsed_json") or {}
        if isinstance(parsed, dict) and parsed:
            normalized_content = json.dumps(parsed, ensure_ascii=False, sort_keys=True)
        else:
            normalized_content = _normalize_report_text(suggestion.get("suggestion_text"))
        # Coerce to str: a NULL model_profile_id/status from the DB would make "|".join raise TypeError.
        parts.append(
            "|".join(
                [
                    str(suggestion.get("model_profile_id") or ""),
                    str(suggestion.get("status") or ""),
                    normalized_content
                ]
            )
        )
    return "\n".join(parts)
|
||
|
||
def _list_report_payloads(account_id: str, limit: int = 5, dedupe: bool = True) -> list[dict[str, Any]]:
    """Return the most recent analysis reports of an account, optionally collapsing duplicates.

    With ``dedupe`` the reports (newest first) are collapsed twice: first by full content
    signature (``_report_signature``), then by normalised focus text. The newest report of each
    group is kept; its ``duplicate_count`` / ``duplicate_report_ids`` accumulate every report folded
    into it, including reports that were themselves folded in during the first pass.

    Args:
        account_id: Account whose reports are listed.
        limit: Maximum number of payloads returned (negative values are treated as 0).
        dedupe: Whether to collapse duplicate reports.

    Returns:
        At most ``limit`` report payloads, newest first.
    """
    limit = max(int(limit), 0)
    rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_analysis_reports
        WHERE account_id = ?
        ORDER BY created_at DESC
        LIMIT ?
        """,
        (account_id, max(limit * 4, 20))
    )
    if not dedupe:
        # Only ``limit`` payloads are returned; skip the per-report suggestion query for the rest.
        return [_build_report_payload(report) for report in rows[:limit]]

    def _absorb(keeper: dict[str, Any], duplicate: dict[str, Any]) -> None:
        # Fold ``duplicate`` and everything it already absorbed into ``keeper``.
        keeper["duplicate_count"] = (
            int(keeper.get("duplicate_count") or 1) + int(duplicate.get("duplicate_count") or 1)
        )
        keeper.setdefault("duplicate_report_ids", []).extend(
            [duplicate["id"], *(duplicate.get("duplicate_report_ids") or [])]
        )

    def _collapse(payloads: list[dict[str, Any]], key_of: Any) -> list[dict[str, Any]]:
        # Keep the first payload per key (input is newest first) and absorb the later ones.
        keepers: dict[str, dict[str, Any]] = {}
        kept: list[dict[str, Any]] = []
        for payload in payloads:
            key = key_of(payload)
            if key in keepers:
                _absorb(keepers[key], payload)
                continue
            keepers[key] = payload
            kept.append(payload)
        return kept

    payloads = [_build_report_payload(report) for report in rows]
    unique_payloads = _collapse(payloads, _report_signature)
    focus_filtered = _collapse(
        unique_payloads,
        lambda payload: _normalize_report_text(payload.get("focus_text") or "__default__")
    )
    return focus_filtered[:limit]
|
||
|
||
def _delete_report(report_id: str) -> None:
    """Delete an analysis report and all of its per-model suggestions.

    Suggestion rows are removed before the report row, so a failure between the two statements
    leaves a report without suggestions rather than suggestions pointing at a missing report.
    """
    for statement in (
        "DELETE FROM douyin_analysis_suggestions WHERE report_id = ?",
        "DELETE FROM douyin_analysis_reports WHERE id = ?"
    ):
        legacy.db.execute(statement, (report_id,))
|
||
|
||
def _find_duplicate_report_payload(
    account_id: str,
    focus_text: str,
    suggestion_payloads: list[dict[str, Any]],
    exclude_report_id: str = ""
) -> dict[str, Any] | None:
    """Find an earlier report with exactly the same content as a freshly generated one.

    Only the ten newest reports of ``account_id`` with an identical ``focus_text`` (excluding
    ``exclude_report_id``) are compared, by ``_report_signature``.

    Returns:
        The first (newest) matching report payload, or ``None`` when there is no match.
    """
    candidate_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_analysis_reports
        WHERE account_id = ? AND focus_text = ? AND id != ?
        ORDER BY created_at DESC
        LIMIT 10
        """,
        (account_id, focus_text, exclude_report_id)
    )
    wanted = _report_signature({"focus_text": focus_text, "suggestions": suggestion_payloads})
    # Lazily build payloads so suggestion queries stop at the first match.
    built = (_build_report_payload(row) for row in candidate_rows)
    return next((payload for payload in built if _report_signature(payload) == wanted), None)
|
||
|
||
def _build_workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
    """Collect everything the account workspace view needs in one payload.

    Contains the account summary, the newest public-profile and creator-center snapshots
    (``None`` when absent), linked accounts, up to five de-duplicated recent reports, a compact
    projection of the video workspace, the five latest similarity searches and the owner's
    model profiles.
    """
    account_id = account_row["id"]
    account_payload = _build_account_payload(account_row)
    video_workspace = _build_video_workspace_payload(account_row)
    public_snapshot = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'public_profile'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        (account_id,)
    )
    creator_snapshot = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'creator_center'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        (account_id,)
    )
    report_payloads = _list_report_payloads(account_id, limit=5, dedupe=True)
    search_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_similarity_searches
        WHERE source_account_id = ?
        ORDER BY created_at DESC
        LIMIT 5
        """,
        (account_id,)
    )

    def _snapshot_brief(snapshot: dict[str, Any] | None) -> dict[str, Any] | None:
        # Summarise a snapshot row without its raw payload; missing snapshot -> None.
        if not snapshot:
            return None
        return {
            "id": snapshot["id"],
            "source_url": snapshot["source_url"],
            "field_count": snapshot["field_count"],
            "collected_at": snapshot["collected_at"],
            "summary": _safe_json_loads(snapshot["summary_json"], {})
        }

    similarity_searches = [
        {
            "id": search["id"],
            "keywords": _safe_json_loads(search["keywords_json"], []),
            "created_at": search["created_at"]
        }
        for search in search_rows
    ]

    return {
        "account": account_payload,
        "latest_public_snapshot": _snapshot_brief(public_snapshot),
        "latest_creator_snapshot": _snapshot_brief(creator_snapshot),
        "linked_accounts": _list_linked_accounts(account_row),
        "recent_reports": report_payloads,
        "video_workspace": {
            key: video_workspace[key]
            for key in ("top_scored_video_ids", "latest_video_ids", "high_score_threshold", "meta")
        },
        "recent_similarity_searches": similarity_searches,
        "available_model_profiles": [
            {
                "id": model["id"],
                "name": model["name"],
                "model_name": model["model_name"],
                "base_url": model["base_url"],
                "is_default": bool(model["is_default"])
            }
            for model in _fetch_model_profiles(account_row["user_id"])
        ]
    }
|
||
|
||
def _list_snapshots(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
    """List up to ``limit`` snapshots of an account, newest first, without raw payloads/fields."""
    snapshot_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE account_id = ?
        ORDER BY collected_at DESC
        LIMIT ?
        """,
        (account_id, limit)
    )
    listing: list[dict[str, Any]] = []
    for snapshot in snapshot_rows:
        listing.append({
            "id": snapshot["id"],
            "snapshot_type": snapshot["snapshot_type"],
            "source_url": snapshot["source_url"],
            "field_count": snapshot["field_count"],
            "collected_at": snapshot["collected_at"],
            "summary": _safe_json_loads(snapshot["summary_json"], {})
        })
    return listing
|
||
|
||
def _get_snapshot_detail(snapshot_id: str, account_id: str) -> dict[str, Any]:
    """Return one snapshot of ``account_id`` with its raw payload and flattened field rows.

    Field rows are ordered by ``field_path``.

    Raises:
        HTTPException: 404 when no snapshot ``snapshot_id`` belongs to ``account_id``.
    """
    snapshot = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE id = ? AND account_id = ?
        LIMIT 1
        """,
        (snapshot_id, account_id)
    )
    if not snapshot:
        raise HTTPException(status_code=404, detail="Snapshot not found")
    field_rows = legacy.db.fetch_all(
        """
        SELECT field_path, field_type, field_value_text
        FROM douyin_snapshot_fields
        WHERE snapshot_id = ?
        ORDER BY field_path ASC
        """,
        (snapshot_id,)
    )
    detail = {
        column: snapshot[column]
        for column in ("id", "snapshot_type", "source_url", "field_count", "collected_at")
    }
    detail["summary"] = _safe_json_loads(snapshot["summary_json"], {})
    detail["raw_payload"] = _safe_json_loads(snapshot["raw_payload_json"], {})
    detail["fields"] = field_rows
    return detail
|
||
|
||
def _build_video_context_items(
|
||
video_workspace: dict[str, Any],
|
||
max_top_items: int = 6,
|
||
max_latest_items: int = 6
|
||
) -> dict[str, list[dict[str, Any]]]:
|
||
items = video_workspace.get("items", [])
|
||
item_map = {item["id"]: item for item in items}
|
||
top_items = [
|
||
item_map[video_id]
|
||
for video_id in video_workspace.get("top_scored_video_ids", [])[:max_top_items]
|
||
if video_id in item_map
|
||
]
|
||
latest_items = [
|
||
item_map[video_id]
|
||
for video_id in video_workspace.get("latest_video_ids", [])[:max_latest_items]
|
||
if video_id in item_map
|
||
]
|
||
|
||
def _brief(video: dict[str, Any]) -> dict[str, Any]:
|
||
return {
|
||
"video_id": video["id"],
|
||
"aweme_id": video["aweme_id"],
|
||
"title": video["title"],
|
||
"description": video["description"],
|
||
"published_at": video["published_at"],
|
||
"tags": video["tags"][:6],
|
||
"stats": video["stats"],
|
||
"score": video["score"],
|
||
"latest_analysis": (video.get("latest_analysis") or {}).get("parsed_json") or {}
|
||
}
|
||
|
||
return {
|
||
"top_performing_videos": [_brief(item) for item in top_items],
|
||
"latest_videos": [_brief(item) for item in latest_items]
|
||
}
|
||
|
||
def _bounded_score(value: Any, fallback: float = 0.0) -> float:
    """Parse a model-provided score into the range ``[0, 100]``, rounded to two decimals.

    An explicit zero (``0``, ``0.0`` or ``"0"``) is kept as a real score. Any other value that
    ``_parse_count`` maps to zero or below (e.g. missing values) is replaced by ``fallback``.

    Args:
        value: Raw score as returned by the model.
        fallback: Score used when ``value`` is not usable.

    Returns:
        The clamped score as a float.
    """
    parsed = _parse_count(value)
    if parsed <= 0 and value not in (0, "0", 0.0):
        parsed = fallback
    # Do not use ``parsed or fallback`` here: it would turn an explicit zero back into the fallback.
    return round(max(0.0, min(100.0, float(parsed))), 2)
|
||
|
||
def _merge_structured_payload(fallback: Any, parsed: Any) -> Any:
|
||
if not isinstance(fallback, dict) or not isinstance(parsed, dict):
|
||
return parsed or fallback
|
||
merged: dict[str, Any] = {}
|
||
for key, fallback_value in fallback.items():
|
||
if key not in parsed:
|
||
merged[key] = fallback_value
|
||
continue
|
||
parsed_value = parsed[key]
|
||
if isinstance(fallback_value, dict) and isinstance(parsed_value, dict):
|
||
merged[key] = _merge_structured_payload(fallback_value, parsed_value)
|
||
else:
|
||
merged[key] = parsed_value if parsed_value not in (None, "", [], {}) else fallback_value
|
||
for key, parsed_value in parsed.items():
|
||
if key not in merged:
|
||
merged[key] = parsed_value
|
||
return merged
|
||
|
||
def _infer_offer_directions(keywords: list[str]) -> list[str]:
    """Suggest up to four monetisation offers from an account's keywords.

    Each rule fires when any of its trigger keywords (compared lower-cased) is present. When no
    rule fires a generic consulting offer is used; a replication/lead-capture offer is always
    appended. The list is de-duplicated and truncated to four entries.
    """
    lowered = {keyword.lower() for keyword in keywords}
    rules = (
        ({"创业", "成交", "获客"}, "创业获客咨询、成交训练营或老板 IP 陪跑"),
        ({"文案", "短视频文案", "口播", "二创"}, "短视频文案模板包、脚本代写或内容陪跑服务"),
        ({"教育", "教育规划"}, "教育规划咨询、升学产品或高客单咨询服务"),
        ({"后期", "剪辑", "产品"}, "剪辑优化、内容包装或产品策划服务"),
    )
    offers = [offer for triggers, offer in rules if not triggers.isdisjoint(lowered)]
    if not offers:
        offers = ["内容咨询、账号诊断和主题训练营"]
    offers.append("以高分作品为样板的复刻栏目和线索承接页")
    return _dedupe_strings(offers)[:4]
|
||
|
||
def _build_video_analysis_fallback(account_payload: dict[str, Any], video: dict[str, Any]) -> dict[str, Any]:
    """Build a rule-based single-video analysis used as the default structure for model output.

    ``_run_top_video_analyses`` merges the model response over this payload, so every field the
    video system prompt asks for is present even when the model omits it or fails.

    Args:
        account_payload: Payload of the owning account; only ``keywords`` is read.
        video: Video item with ``title``, ``description``, ``tags`` and a ``score`` dict
            (``commercial_score``, ``performance_score``, ``collect_rate``, ``share_rate``, ``signals``).

    Returns:
        A dict with headline, hook/structure breakdowns, commercial angle, replication plan,
        operator actions, risk notes, derived sub-scores and raw keywords.
    """
    score = video["score"]
    tags = video.get("tags") or []
    # A NULL title would break the ``token in title`` checks below.
    title = video.get("title") or ""
    keywords = _extract_keywords(title, video.get("description", ""))
    # ``signals`` may be empty for videos without notable metrics; avoid IndexError.
    signals = score.get("signals") or []
    core_value = signals[0] if signals else "已验证的内容表现"

    hook_patterns: list[str] = []
    if any(token in title for token in ("怎么", "如何", "为什么")):
        hook_patterns.append("问题解决型开场,先给结果再给方法")
    if any(token in title for token in ("坑", "误区", "别", "不要")):
        hook_patterns.append("避坑警示型开场,容易拉停留和评论")
    if re.search(r"\d", title):
        hook_patterns.append("数字型表达能快速建立信息密度和预期")
    if not hook_patterns:
        hook_patterns.append("强结论或冲突判断先出,适合 3 秒内抢注意力")

    structure_patterns = [
        "开头给结论或反常识观点,中段拆 2-3 个要点,结尾给执行动作",
        "用具体场景或常见错误承接,降低理解门槛",
        "把方法论压缩成可收藏的清单,利于后续转化"
    ]
    commercial_judgement = (
        "这条内容适合做高意向线索承接,优先放在咨询、训练营或模板产品前链路。"
        if score["commercial_score"] >= 70
        else "这条内容更适合作为流量内容,用来放大覆盖,再通过评论区和私信承接。"
    )
    operator_actions = [
        "把标题里的核心钩子沉淀成 3-5 个固定开场模板,持续复用",
        "在评论区补一个可执行清单,测试评论区转化和私信承接",
        "围绕同主题连续发 3 条变体,验证题材是否可规模化"
    ]
    if score["collect_rate"] >= 0.008:
        operator_actions.append("把这条内容延展成可下载资料或收藏型产品,提高转化效率")
    if score["share_rate"] >= 0.01:
        operator_actions.append("把传播点提炼成系列化选题,优先投放同类话题")

    return {
        "headline_summary": f"《{_compact_text(title, 30)}》属于高可复制内容,核心价值在于{core_value}。",
        "hook_breakdown": hook_patterns[:3],
        "structure_breakdown": structure_patterns,
        "commercial_angle": {
            "score": score["commercial_score"],
            "judgement": commercial_judgement,
            "suitable_for": _infer_offer_directions(account_payload.get("keywords", []))[:3]
        },
        "replication_plan": [
            f"围绕 {tags[0] if tags else '当前主题'} 再做 3 条不同人群切口",
            "保持同类开头结构,但替换成更具体的场景和结果承诺",
            "在结尾加入明确的下一步动作,承接评论、私信或表单"
        ],
        "operator_actions": _dedupe_strings(operator_actions)[:5],
        "risk_notes": [
            "如果后续复刻只保留题材、不保留强钩子,数据会明显回落",
            "如果评论区没有承接动作,商业化价值会停留在播放层"
        ],
        "scores": {
            "hook": min(100.0, round(score["performance_score"] * 0.92 + 4, 2)),
            "retention": min(100.0, round(score["performance_score"] * 0.88 + 6, 2)),
            "conversion": min(100.0, round(score["commercial_score"] * 0.93 + 3, 2)),
            "commercial": score["commercial_score"]
        },
        "raw_keywords": keywords[:8]
    }
|
||
|
||
def _build_account_analysis_fallback(
    target_payload: dict[str, Any],
    benchmark_payloads: list[dict[str, Any]],
    analysis_context: dict[str, Any]
) -> dict[str, Any]:
    """Build a rule-based account analysis used as the default structure for model output.

    ``_run_account_analysis`` merges every model response over this payload, so all fields the
    account system prompt requests are present even when a model omits them or fails.

    Args:
        target_payload: Payload of the analysed account; ``keywords``, ``tags`` and
            ``video_summary`` (``top_tags``, ``avg_share``, ``avg_comment``) are read.
        benchmark_payloads: Payloads of benchmark accounts; at most three are summarised.
        analysis_context: Analysis context; only ``video_workspace.top_performing_videos`` and
            ``video_workspace.latest_videos`` are read.

    Returns:
        A dict with executive summary, commercial positioning, content engine, winning patterns,
        latest signals, benchmark insights, monetisation plan, gaps, 30-day actions and risks.
    """
    workspace = analysis_context.get("video_workspace", {})
    top_videos = workspace.get("top_performing_videos", [])
    latest_videos = workspace.get("latest_videos", [])
    # ``video_summary`` / ``top_tags`` can be missing or empty (e.g. no analysed videos yet);
    # indexing ``top_tags[0]`` directly used to raise IndexError in that case.
    summary = target_payload.get("video_summary") or {}
    top_tags = list(summary.get("top_tags") or [])

    keywords = _dedupe_strings(
        list(target_payload.get("keywords", []))
        + list(target_payload.get("tags", []))
        + top_tags
    )

    def _average_score(videos: list[dict[str, Any]]) -> float:
        # Mean performance score; an empty list yields 0.0 instead of dividing by zero.
        total = sum(float(item.get("score", {}).get("performance_score") or 0) for item in videos)
        return round(total / max(len(videos), 1), 2)

    avg_top_score = _average_score(top_videos)
    avg_latest_score = _average_score(latest_videos)
    monetization_score = round(
        min(
            100.0,
            avg_top_score * 0.55
            + float(summary.get("avg_share") or 0) / 120.0
            + float(summary.get("avg_comment") or 0) / 80.0
        ),
        2
    )

    audience = "想提升短视频获客效率、内容转化和账号定位的创业者与内容运营者"
    if {"教育", "教育规划"} & {item.lower() for item in keywords}:
        audience = "关注教育规划、升学决策和信息差机会的人群"
    core_promise = (
        f"用 {_compact_text((top_tags or ['内容方法'])[0], 10)} 相关主题,"
        "快速给用户一个能立刻套用的内容方法或判断。"
    )

    hook_patterns = []
    titles = [item.get("title") or "" for item in top_videos[:5]]
    if any(re.search(r"\d", title) for title in titles):
        hook_patterns.append("数字型开头,直接降低理解成本")
    if any(any(token in title for token in ("怎么", "如何", "为什么")) for title in titles):
        hook_patterns.append("问题解决型开头,先抛问题再给答案")
    if any(any(token in title for token in ("坑", "误区", "别", "不要")) for title in titles):
        hook_patterns.append("避坑警示型开头,容易拉停留和讨论")
    hook_patterns = _dedupe_strings(hook_patterns + ["强结论先行,适合 3 秒内抢注意力"])[:4]

    winning_patterns = []
    for video in top_videos[:4]:
        winning_patterns.append({
            "video_title": video.get("title", ""),
            "score": video.get("score", {}).get("performance_score", 0),
            "why": "高分原因主要来自 " + "、".join(video.get("score", {}).get("signals", [])[:2]),
            "replication_angle": "保留原题材与开头结构,再改写为更具体的人群场景和结果承诺"
        })

    latest_signal = []
    for video in latest_videos[:4]:
        signal = "近期内容仍在有效窗口期"
        if float(video.get("score", {}).get("performance_score") or 0) + 8 < avg_top_score:
            signal = "最近作品热度弱于历史高分样本,需要回到已验证题材"
        latest_signal.append({
            "video_title": video.get("title", ""),
            "signal": signal,
            "action": "优先做同题材复刻、加强开头结论和结尾承接动作"
        })

    benchmark_insights = []
    for payload in benchmark_payloads[:3]:
        peer_tags = (payload.get("video_summary") or {}).get("top_tags") or []
        benchmark_insights.append(
            f"对标账号 {payload.get('nickname', '未命名账号')} 可借鉴其 {', '.join(peer_tags[:3]) or '选题聚焦'},但不要直接照搬口吻。"
        )
    if not benchmark_insights:
        benchmark_insights = ["当前可用对标账号较少,建议优先围绕高分作品题材扩充对标池。"]

    operational_gaps = [
        "高分内容和最近内容之间如果存在明显分差,说明选题复盘还没有形成固定机制",
        "如果收藏和评论信号强,但页面承接动作弱,商业化效率会被浪费",
        "账号标签较散时,用户对你卖什么、能解决什么问题的认知会不够集中"
    ]
    if avg_latest_score >= avg_top_score - 5:
        operational_gaps[0] = "最近内容与高分内容差距不大,可以开始标准化选题库和周更节奏"

    return {
        "executive_summary": (
            f"这个账号当前最值得放大的内容方向是 {', '.join(top_tags[:3]) or '已验证高分题材'}。"
            f"高分作品平均得分 {avg_top_score},最近作品平均得分 {avg_latest_score},"
            "已经具备做商业化内容矩阵和固定转化链路的基础。"
        ),
        "commercial_positioning": {
            "audience": audience,
            "core_promise": core_promise,
            "monetization_readiness_score": monetization_score,
            "offer_directions": _infer_offer_directions(keywords)
        },
        "content_engine": {
            "pillars": top_tags[:6],
            "hook_patterns": hook_patterns,
            "structure_patterns": [
                "开头先给结论或冲突点,中段拆 2-3 个关键动作,结尾给明确下一步",
                "围绕用户熟悉的问题场景切入,降低完播门槛",
                "把方法论做成可收藏的清单,提高后续转化机会"
            ],
            "cta_patterns": [
                "高收藏内容结尾引导先收藏再执行",
                "高评论内容结尾抛反问,引导评论区互动",
                "高分享内容结尾补一句适合谁转发给谁,放大自然传播"
            ]
        },
        "winning_patterns": winning_patterns,
        "latest_signal": latest_signal,
        "benchmark_insights": benchmark_insights,
        "monetization_plan": [
            "把高分题材拆成免费内容、低门槛产品和咨询服务三级承接",
            "优先围绕高收藏内容制作模板、清单或训练营资料",
            "让评论区和私信都指向同一个明确转化动作,避免流量浪费"
        ],
        "operational_gaps": operational_gaps,
        "next_30_day_actions": [
            "每周固定复刻 2 条高分题材,再测试 1 条新角度",
            "给高分作品统一补评论区承接话术和私信关键词",
            "每周复盘高分榜和最新榜,保留题材、更新场景和切口",
            "把表现最稳的 3 条内容做成系列化栏目"
        ],
        "risk_watchlist": [
            "题材过多会稀释账号定位,影响成交效率",
            "如果只追求播放而不设计承接动作,商业化会停留在表层流量",
            "复制高分标题但不复制结构和场景,容易出现数据回落"
        ]
    }
|
||
|
||
async def _run_top_video_analyses(
    account_row: dict[str, Any],
    owner: dict[str, Any],
    profile: dict[str, Any],
    *,
    top_video_count: int = 6,
    min_score: float = 45.0,
    report_id: str = "",
    source_type: str = "top_score_auto",
    temperature: float = 0.25
) -> list[dict[str, Any]]:
    """Have ``profile``'s model review the account's top-scoring videos and store each review.

    Videos are taken in ``top_scored_video_ids`` order, kept only when ``performance_score`` is at
    least ``min_score``, and capped at ``top_video_count`` (clamped to 1..12). Model calls run
    concurrently via ``asyncio.gather``. A failing model call is stored with status ``"error"``
    and the rule-based fallback analysis instead of raising.

    Returns:
        One summary dict per analysed video in ranking order; ``[]`` when no video qualifies.
    """
    video_workspace = _build_video_workspace_payload(account_row, limit=max(top_video_count * 3, 24))
    item_map = {item["id"]: item for item in video_workspace["items"]}
    ranked_videos = [
        item_map[video_id]
        for video_id in video_workspace["top_scored_video_ids"]
        if video_id in item_map and float(item_map[video_id]["score"]["performance_score"]) >= float(min_score)
    ][: max(1, min(top_video_count, 12))]
    if not ranked_videos:
        return []

    account_payload = _build_account_payload(account_row, include_recent_videos=8)
    system_prompt = (
        "你是商业化短视频拆解顾问。你要针对单条作品给出可用于商业化运营的复盘。"
        "请返回严格 JSON 对象,字段必须包含:headline_summary、hook_breakdown、"
        "structure_breakdown、commercial_angle、replication_plan、operator_actions、"
        "risk_notes、scores。scores 里必须包含 hook、retention、conversion、commercial,范围 0-100。"
    )

    async def _analyze_video(video: dict[str, Any]) -> dict[str, Any]:
        prompt_context = {
            "account": {
                "id": account_payload["id"],
                "nickname": account_payload["nickname"],
                "signature": account_payload["signature"],
                "tags": account_payload["tags"][:12]
            },
            "video": {
                "id": video["id"],
                "aweme_id": video["aweme_id"],
                "title": video["title"],
                "description": video["description"],
                "published_at": video["published_at"],
                "tags": video["tags"],
                "stats": video["stats"],
                "score": video["score"]
            }
        }
        user_prompt = (
            "请从商业化运营视角拆解这条作品。重点回答:为什么这条作品值得关注、"
            "适合承接什么产品/服务、下一步怎么复刻、运营动作怎么排。"
            f"\n\n输入上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
        )
        try:
            output = await legacy.call_model(
                profile,
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                temperature=temperature
            )
            parsed = _try_parse_agent_json(output)
            status = "ok"
        except Exception as exc:
            output = str(exc)
            parsed = {}
            status = "error"

        score = video["score"]
        if not isinstance(parsed, dict):
            parsed = {}
        # Both arguments are dicts here, so the merged result is a dict as well.
        parsed = _merge_structured_payload(_build_video_analysis_fallback(account_payload, video), parsed)
        # The merge keeps a model's non-dict value (e.g. a plain-text "scores" or "commercial_angle"),
        # so only read sub-keys from actual dicts; otherwise ``.get`` raises and aborts every analysis.
        raw_scores = parsed.get("scores")
        parsed_scores = raw_scores if isinstance(raw_scores, dict) else {}
        raw_angle = parsed.get("commercial_angle")
        commercial_angle = raw_angle if isinstance(raw_angle, dict) else {}
        analysis_id = make_id("dyva")
        summary_text = _first_non_empty(
            parsed.get("headline_summary"),
            parsed.get("summary"),
            commercial_angle.get("judgement"),
            output
        )
        hook_score = _bounded_score(parsed_scores.get("hook"), fallback=score["performance_score"])
        retention_score = _bounded_score(parsed_scores.get("retention"), fallback=score["performance_score"])
        conversion_score = _bounded_score(parsed_scores.get("conversion"), fallback=score["commercial_score"])
        commercial_score = _bounded_score(parsed_scores.get("commercial"), fallback=score["commercial_score"])
        created_at = now()
        legacy.db.execute(
            """
            INSERT INTO douyin_video_analyses (
                id, account_id, user_id, video_id, report_id, model_profile_id, model_label,
                source_type, status, performance_score, commercial_score, hook_score,
                retention_score, conversion_score, summary_text, suggestion_text,
                parsed_json, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                analysis_id,
                account_row["id"],
                owner["id"],
                video["id"],
                report_id,
                profile["id"],
                _build_model_label(profile),
                source_type,
                status,
                score["performance_score"],
                commercial_score,
                hook_score,
                retention_score,
                conversion_score,
                _compact_text(summary_text, 240),
                output if output.strip() else _safe_json_dumps(parsed),
                _safe_json_dumps(parsed),
                created_at,
                created_at
            )
        )
        return {
            "id": analysis_id,
            "video_id": video["id"],
            "video_title": video["title"],
            "status": status,
            "summary_text": _compact_text(summary_text, 240),
            "parsed_json": parsed,
            "performance_score": score["performance_score"],
            "commercial_score": commercial_score,
            "hook_score": hook_score,
            "retention_score": retention_score,
            "conversion_score": conversion_score,
            "created_at": created_at
        }

    return await asyncio.gather(*[_analyze_video(video) for video in ranked_videos])
|
||
|
||
async def _run_account_analysis(
    account_row: dict[str, Any],
    owner: dict[str, Any],
    request: DouyinAccountAnalysisRequest
) -> dict[str, Any]:
    """Run a multi-model commercial analysis of one account and persist it as a report.

    Flow:
      1. Build the target payload and top/latest video context.
      2. Collect benchmark accounts: explicit ids plus (optionally) linked relations; when none
         are found and ``include_recent_similar_candidates`` is set, up to three candidates of the
         latest similarity search that exist and belong to ``owner``.
      3. Insert a report row, query every resolved model profile concurrently and store one
         suggestion per model (each merged over a rule-based fallback).
      4. If an earlier report for the same account/focus has identical content, delete the new
         report and return the earlier one instead.
      5. Optionally let the first model analyse the top-scoring videos, then stamp the account.

    Returns:
        ``report_id``, ``created_at``, ``context``, ``suggestions`` and ``auto_video_analyses``;
        duplicate results additionally carry ``duplicate_of_report_id`` and ``duplicate_count``.

    Raises:
        Whatever ``_require_owned_account`` raises for explicitly requested / linked account ids,
        and any exception raised while storing suggestions (the new report is removed first).
    """
    target_payload = _build_account_payload(account_row, include_recent_videos=max(4, min(request.max_videos, 12)))
    video_workspace = _build_video_workspace_payload(account_row, limit=max(request.max_videos * 3, 24))
    context_cap = min(max(request.max_videos, 4), 8)
    video_context = _build_video_context_items(
        video_workspace,
        max_top_items=context_cap,
        max_latest_items=context_cap
    )

    linked_rows = _list_linked_accounts(account_row)
    linked_account_ids = list(request.linked_account_ids)
    if request.include_linked_accounts:
        linked_account_ids.extend(
            row["target_account_id"] for row in linked_rows if row.get("target_account_id")
        )
    linked_account_ids = _dedupe_strings(linked_account_ids)

    benchmark_payloads: list[dict[str, Any]] = []
    for linked_account_id in linked_account_ids:
        linked_row = _require_owned_account(linked_account_id, owner["id"])
        benchmark_payloads.append(_build_account_payload(linked_row, include_recent_videos=6))

    if request.include_recent_similar_candidates and not benchmark_payloads:
        latest_search = legacy.db.fetch_one(
            """
            SELECT *
            FROM douyin_similarity_searches
            WHERE source_account_id = ?
            ORDER BY created_at DESC
            LIMIT 1
            """,
            (account_row["id"],)
        )
        if latest_search:
            candidate_rows = legacy.db.fetch_all(
                """
                SELECT cand.*, acct.user_id AS account_user_id
                FROM douyin_similarity_candidates cand
                LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
                WHERE cand.search_id = ?
                ORDER BY cand.rank_index ASC
                LIMIT 3
                """,
                (latest_search["id"],)
            )
            for candidate_row in candidate_rows:
                candidate_account_id = candidate_row.get("candidate_account_id")
                # Candidates are optional hints: skip ones whose account row is gone (NULL from the
                # LEFT JOIN) or owned by someone else instead of failing the whole analysis.
                if not candidate_account_id or candidate_row.get("account_user_id") != owner["id"]:
                    continue
                linked_candidate = _require_owned_account(candidate_account_id, owner["id"])
                benchmark_payloads.append(_build_account_payload(linked_candidate, include_recent_videos=6))

    profiles = _resolve_model_profiles(owner, request.model_profile_ids)
    system_prompt = (
        "你是资深抖音商业化增长顾问。你会基于账号画像、创作者中心字段、作品表现、"
        "高分作品样本、最近更新信号和对标账号内容,给出可直接指导运营和商业化的结论。"
        "请始终返回严格 JSON 对象,包含这些字段:executive_summary、commercial_positioning、"
        "content_engine、winning_patterns、latest_signal、benchmark_insights、"
        "monetization_plan、operational_gaps、next_30_day_actions、risk_watchlist。"
        "commercial_positioning 必须是对象,至少包含 audience、core_promise、monetization_readiness_score、"
        "offer_directions。content_engine 必须包含 pillars、hook_patterns、structure_patterns、cta_patterns。"
        "winning_patterns、latest_signal、benchmark_insights、offer_directions、next_30_day_actions、"
        "risk_watchlist 每个字段请给 3-6 条中文建议。"
    )
    creator_snapshot = legacy.db.fetch_one(
        """
        SELECT summary_json
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'creator_center'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        (account_row["id"],)
    )
    analysis_context = {
        "target_account": target_payload,
        "benchmark_accounts": benchmark_payloads[:6],
        "focus": request.extra_focus,
        "video_workspace": {
            "high_score_threshold": video_workspace["high_score_threshold"],
            "meta": video_workspace["meta"],
            **video_context
        },
        "creator_center_snapshot_summary": _safe_json_loads((creator_snapshot or {}).get("summary_json"), {})
    }
    user_prompt = (
        "请从商业化运营视角分析以下抖音账号。除了账号定位和内容打法,"
        "还要明确给出:什么内容最值得继续放大、什么内容已经过时、"
        "适合承接什么类型的产品/服务、未来 30 天运营动作如何排优先级。"
        "如果提供了对标账号,要重点指出可借鉴但不应直接照搬的部分。"
        f"\n\n输入上下文:\n{json.dumps(analysis_context, ensure_ascii=False, indent=2)}"
    )

    report_id = make_id("dyreport")
    created_at = now()
    legacy.db.execute(
        """
        INSERT INTO douyin_analysis_reports (
            id, account_id, user_id, focus_text, model_profile_ids_json,
            linked_account_ids_json, prompt_text, context_json, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            report_id,
            account_row["id"],
            owner["id"],
            request.extra_focus,
            _safe_json_dumps([profile["id"] for profile in profiles]),
            _safe_json_dumps(linked_account_ids),
            user_prompt,
            _safe_json_dumps(analysis_context),
            created_at
        )
    )

    async def _analyze_with_model(profile: dict[str, Any]) -> dict[str, Any]:
        # Model failures are recorded as status "error" with the fallback analysis, not raised.
        try:
            output = await legacy.call_model(
                profile,
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                temperature=request.temperature
            )
            parsed = _try_parse_agent_json(output)
            status = "ok"
        except Exception as exc:
            output = str(exc)
            parsed = {}
            status = "error"
        if not isinstance(parsed, dict):
            parsed = {}
        parsed = _merge_structured_payload(
            _build_account_analysis_fallback(target_payload, benchmark_payloads, analysis_context),
            parsed
        )
        suggestion_id = make_id("dysady")
        legacy.db.execute(
            """
            INSERT INTO douyin_analysis_suggestions (
                id, report_id, model_profile_id, model_label, status,
                suggestion_text, parsed_json, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                suggestion_id,
                report_id,
                profile["id"],
                _build_model_label(profile),
                status,
                output if output.strip() else _safe_json_dumps(parsed),
                _safe_json_dumps(parsed),
                now()
            )
        )
        return {
            "id": suggestion_id,
            "model_profile_id": profile["id"],
            "model_label": _build_model_label(profile),
            "status": status,
            "suggestion_text": output,
            "parsed_json": parsed
        }

    # return_exceptions=True lets every task finish before cleanup, so no task can still insert a
    # suggestion for this report after it has been deleted.
    results = await asyncio.gather(
        *[_analyze_with_model(profile) for profile in profiles],
        return_exceptions=True
    )
    failures = [result for result in results if isinstance(result, BaseException)]
    if failures:
        # Do not leave a half-populated report behind in the workspace listing.
        _delete_report(report_id)
        raise failures[0]
    suggestions = list(results)

    duplicate_report = _find_duplicate_report_payload(
        account_row["id"],
        request.extra_focus,
        suggestions,
        exclude_report_id=report_id
    )
    if duplicate_report:
        _delete_report(report_id)
        return {
            "report_id": duplicate_report["id"],
            "created_at": duplicate_report["created_at"],
            "context": analysis_context,
            "suggestions": duplicate_report["suggestions"],
            "auto_video_analyses": [],
            "duplicate_of_report_id": duplicate_report["id"],
            "duplicate_count": duplicate_report.get("duplicate_count", 1)
        }

    auto_video_analyses: list[dict[str, Any]] = []
    if request.auto_analyze_top_videos and profiles:
        auto_video_analyses = await _run_top_video_analyses(
            account_row,
            owner,
            profiles[0],
            top_video_count=request.top_video_analysis_count,
            min_score=45.0,
            report_id=report_id,
            source_type="account_analysis_auto",
            temperature=min(request.temperature, 0.3)
        )

    # Use one timestamp so last_analysis_at and updated_at agree exactly.
    analyzed_at = now()
    legacy.db.execute(
        "UPDATE douyin_accounts SET last_analysis_at = ?, updated_at = ? WHERE id = ?",
        (analyzed_at, analyzed_at, account_row["id"])
    )
    return {
        "report_id": report_id,
        "created_at": created_at,
        "context": analysis_context,
        "suggestions": suggestions,
        "auto_video_analyses": auto_video_analyses
    }
|
||
|
||
async def _prepare_similarity_source(
    owner: dict[str, Any],
    request: DouyinSimilarSearchRequest
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Resolve the seed account for a similarity search.

    A stored account (``source_account_id``) is loaded via ownership check and returned with its
    payload. Otherwise the public profile at ``profile_url`` is scraped and turned into an
    unsaved payload (``id == ""``), returned together with ``None`` as the row.

    Raises:
        HTTPException: 400 when neither input is given, or when the shared page yields neither a
            nickname nor any videos.
    """
    if request.source_account_id:
        stored_row = _require_owned_account(request.source_account_id, owner["id"])
        return stored_row, _build_account_payload(stored_row)

    shared_url = request.profile_url or ""
    if not shared_url.strip():
        raise HTTPException(status_code=400, detail="source_account_id or profile_url is required")

    public_data = await _collect_public_profile(shared_url, None)
    scraped = public_data["profile"]
    scraped_videos = public_data["videos"]
    if not scraped.get("nickname") and not scraped_videos:
        raise HTTPException(status_code=400, detail="Unable to parse the shared Douyin profile page")

    payload = {
        "id": "",
        "nickname": scraped.get("nickname", ""),
        "signature": scraped.get("signature", ""),
        "profile_url": scraped.get("canonical_profile_url", "") or request.profile_url,
        "avatar_url": scraped.get("avatar_url", ""),
        "sec_uid": scraped.get("sec_uid", ""),
        "douyin_id": scraped.get("douyin_id", ""),
        "profile_stats": scraped.get("stats", {}),
        "tags": scraped.get("tags", []),
        "video_summary": _summarize_videos(scraped_videos, limit=6)
    }
    summary = payload["video_summary"]
    # Same keyword recipe as stored accounts: tags, profile text keywords, top tags, video titles.
    keyword_pool = (
        payload["tags"]
        + _extract_keywords(payload["nickname"], payload["signature"])
        + summary["top_tags"]
        + [clip["title"] for clip in summary["videos"]]
    )
    payload["keywords"] = _dedupe_strings(keyword_pool)
    return None, payload
|
||
|
||
async def _fetch_or_create_candidate(owner: dict[str, Any], profile_url: str) -> dict[str, Any] | None:
    """Return the owner's stored account for ``profile_url``, syncing it if absent.

    A stored row matches when either ``canonical_profile_url`` or
    ``profile_url`` equals the given URL. Otherwise the public profile is
    scraped and passed to ``_upsert_account``, with the scraped videos' raw
    payloads as ``manual_work_payloads``.

    Returns:
        The stored or newly upserted account row. Returns ``None`` when the
        scraped page has neither a nickname nor any videos.
    """
    found = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_accounts
        WHERE user_id = ? AND (canonical_profile_url = ? OR profile_url = ?)
        LIMIT 1
        """,
        (owner["id"], profile_url, profile_url)
    )
    if found:
        return found

    scraped = await _collect_public_profile(profile_url, None)
    scraped_profile = scraped["profile"]
    # Nothing usable was parsed: no nickname and no videos.
    if not scraped_profile.get("nickname") and not scraped["videos"]:
        return None

    return _upsert_account(
        owner,
        scraped_profile,
        DouyinAccountSyncRequest(
            profile_url=profile_url,
            manual_work_payloads=[entry["raw"] for entry in scraped["videos"]]
        ),
        scraped,
        {"pages": [], "errors": []}
    )
|
||
|
||
async def _gather_similarity_candidate_rows(
    owner: dict[str, Any],
    request: DouyinSimilarSearchRequest,
    source_account_row: dict[str, Any] | None,
    source_payload: dict[str, Any]
) -> list[dict[str, Any]]:
    """Collect candidate account rows for a similarity search.

    Candidates are gathered in this order:

    1. The owner's stored accounts, newest ``updated_at`` first.
    2. The source's linked benchmark accounts, when
       ``request.seed_linked_accounts`` is set and the source is a stored
       account.
    3. Profile URLs from ``request.candidate_urls``, plus public-search
       discovery when ``request.search_public_pages`` is set.

    The returned list may contain one account more than once; the caller
    de-duplicates by account id.
    """
    source_id = source_account_row["id"] if source_account_row else ""
    source_url = source_payload.get("profile_url") or ""

    existing_accounts = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_accounts
        WHERE user_id = ?
        ORDER BY updated_at DESC
        """,
        (owner["id"],)
    )

    candidate_rows: list[dict[str, Any]] = []
    seen_urls: set[str] = set()
    for row in existing_accounts:
        if row["id"] == source_id:
            continue
        # A URL-based source has id "", so the id check cannot exclude an
        # already-synced copy of the same profile. Match on URL too, so the
        # source is never ranked as its own best candidate.
        if source_url and source_url in (row["canonical_profile_url"], row["profile_url"]):
            continue
        candidate_rows.append(row)
        seen_urls.add(row["canonical_profile_url"] or row["profile_url"])

    # Links carrying only a URL (no stored target account) are queued for
    # fetching below. They must not go into ``seen_urls`` here: doing so would
    # drop them and also block the same URL if passed in candidate_urls.
    linked_urls: list[str] = []
    if request.seed_linked_accounts and source_account_row:
        for linked in _list_linked_accounts(source_account_row):
            candidate_url = linked.get("target_profile_url", "")
            if not candidate_url or candidate_url in seen_urls:
                continue
            if linked.get("target_account_id"):
                seen_urls.add(candidate_url)
                candidate_rows.append(_require_owned_account(linked["target_account_id"], owner["id"]))
            else:
                linked_urls.append(candidate_url)

    candidate_urls = linked_urls + list(request.candidate_urls or [])
    if request.search_public_pages:
        discovered = await _discover_profile_urls_from_search(source_payload.get("keywords", []), limit=6)
        candidate_urls.extend(discovered)

    for candidate_url in _dedupe_strings(candidate_urls):
        if candidate_url in seen_urls or candidate_url == source_url:
            continue
        candidate_row = await _fetch_or_create_candidate(owner, candidate_url)
        if candidate_row:
            candidate_rows.append(candidate_row)
            seen_urls.add(candidate_url)
    return candidate_rows


def _heuristic_similarity_fallback(candidate_payloads: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Build model-shaped ranking items from heuristic scores (used when the model gives nothing usable)."""
    return [
        {
            "candidate_account_id": payload["id"],
            "candidate_profile_url": payload["profile_url"],
            "score": payload["heuristics"]["heuristic_score"],
            "rationale": "Fallback to heuristic similarity because model output was unavailable or unparsable.",
            "similar_dimensions": [
                {
                    "topic_overlap": payload["heuristics"]["topic_overlap"],
                    "tag_overlap": payload["heuristics"]["tag_overlap"],
                    "quality_score": payload["heuristics"]["quality_score"]
                }
            ],
            "optimization_value": "可作为候选对标账号进一步人工确认。"
        }
        for payload in candidate_payloads
    ]


def _save_similarity_candidates(
    search_id: str,
    items: list[dict[str, Any]],
    candidate_payloads: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Insert ranked items into ``douyin_similarity_candidates`` and return their summaries.

    Each item is matched back to a candidate payload: first by
    ``candidate_account_id``, then by ``candidate_profile_url``.
    ``rank_index`` follows the order of ``items``, starting at 1. Items that
    match no candidate are still stored, with a heuristic score of 0.
    """
    candidate_map = {payload["id"]: payload for payload in candidate_payloads if payload["id"]}
    saved_candidates: list[dict[str, Any]] = []
    for index, item in enumerate(items, start=1):
        candidate_account_id = _first_non_empty(item.get("candidate_account_id"))
        candidate_profile_url = _first_non_empty(item.get("candidate_profile_url"))
        payload = candidate_map.get(candidate_account_id)
        if not payload:
            payload = next(
                (candidate for candidate in candidate_payloads if candidate["profile_url"] == candidate_profile_url),
                None
            )
        candidate_id = make_id("dycand")
        heuristic_score = payload["heuristics"]["heuristic_score"] if payload else 0
        score = _parse_count(item.get("score"))
        rationale = _first_non_empty(item.get("rationale"), item.get("reason"), item.get("summary"))
        dimensions = item.get("similar_dimensions") or item.get("dimensions") or {}
        raw_output = {
            "model_output": item,
            "candidate_payload": payload or {}
        }
        legacy.db.execute(
            """
            INSERT INTO douyin_similarity_candidates (
                id, search_id, candidate_account_id, candidate_profile_url, heuristic_score,
                agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                candidate_id,
                search_id,
                payload["id"] if payload else candidate_account_id or None,
                payload["profile_url"] if payload else candidate_profile_url,
                heuristic_score,
                score,
                rationale,
                _safe_json_dumps(dimensions),
                _safe_json_dumps(raw_output),
                index,
                now()
            )
        )
        saved_candidates.append({
            "id": candidate_id,
            "candidate_account_id": payload["id"] if payload else candidate_account_id,
            "candidate_profile_url": payload["profile_url"] if payload else candidate_profile_url,
            "candidate_nickname": payload["nickname"] if payload else "",
            "heuristic_score": heuristic_score,
            "agent_score": score,
            "rationale_text": rationale,
            "dimensions": dimensions,
            "rank_index": index
        })
    return saved_candidates


async def _run_similarity_search(owner: dict[str, Any], request: DouyinSimilarSearchRequest) -> dict[str, Any]:
    """Run a similar-account search for ``owner`` and persist the results.

    Steps:

    1. Resolve the source account.
    2. Gather candidates and score them with ``_heuristic_similarity``.
    3. Keep the best ``max(3, request.max_candidates)``.
    4. Store the search row (prompt and context).
    5. Ask the model to rank the candidates. When the model fails or returns
       nothing usable, fall back to heuristic ranking.
    6. Store each ranked candidate.

    Returns:
        A dict with ``search_id``, ``source_account``, ``model_profile``
        (id and label), ``raw_model_output`` and ``candidates``.
    """
    source_account_row, source_payload = await _prepare_similarity_source(owner, request)
    profile = legacy.model_profile_for_account(owner["id"], request.model_profile_id)
    candidate_rows = await _gather_similarity_candidate_rows(owner, request, source_account_row, source_payload)

    candidate_payloads: list[dict[str, Any]] = []
    seen_account_ids: set[str] = set()
    for row in candidate_rows:
        if row["id"] in seen_account_ids:
            continue
        seen_account_ids.add(row["id"])
        payload = _build_account_payload(row, include_recent_videos=6)
        payload["heuristics"] = _heuristic_similarity(source_payload, payload)
        candidate_payloads.append(payload)

    candidate_payloads.sort(key=lambda item: item["heuristics"]["heuristic_score"], reverse=True)
    # NOTE: the floor of 3 keeps at least three candidates (when available) in
    # front of the model, even when max_candidates is smaller.
    candidate_payloads = candidate_payloads[: max(3, request.max_candidates)]

    search_id = make_id("dysearch")
    prompt_context = {
        "source_account": source_payload,
        "candidate_accounts": candidate_payloads,
        "extra_requirements": request.extra_requirements
    }
    prompt = (
        "请从候选账号中筛选与目标账号内容风格、题材、受众和互动逻辑最相似,且整体质量更高的账号。"
        "请返回 JSON 数组,每项包含 candidate_account_id、candidate_profile_url、score、"
        "rationale、similar_dimensions、optimization_value。score 范围 0-100。"
        f"\n\n上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
    )
    legacy.db.execute(
        """
        INSERT INTO douyin_similarity_searches (
            id, user_id, source_account_id, source_profile_url, keywords_json,
            prompt_text, context_json, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            search_id,
            owner["id"],
            source_account_row["id"] if source_account_row else None,
            source_payload.get("profile_url", ""),
            _safe_json_dumps(source_payload.get("keywords", [])),
            prompt,
            _safe_json_dumps(prompt_context),
            now()
        )
    )

    if not candidate_payloads:
        return {
            "search_id": search_id,
            "source_account": source_payload,
            "model_profile": {
                "id": profile["id"],
                "label": _build_model_label(profile)
            },
            "raw_model_output": "No candidate accounts available. Sync more Douyin accounts or provide candidate_urls.",
            "candidates": []
        }

    system_prompt = (
        "你是抖音相似账号发现专家。你要根据内容主题、标签、风格、更新频率、互动表现和商业化潜力,"
        "挑选最值得对标的账号。返回严格 JSON 数组。"
    )
    try:
        output = await legacy.call_model(profile, system_prompt=system_prompt, user_prompt=prompt, temperature=0.2)
        parsed = _try_parse_agent_json(output)
    except Exception as exc:
        # Deliberate best effort: any model/parse failure falls back to the
        # heuristic ranking below, and the error text is returned as output.
        output = str(exc)
        parsed = []

    if isinstance(parsed, dict):
        parsed = parsed.get("items") or parsed.get("candidates") or []
    # Model output is untrusted. Keep only object entries, so a stray string
    # or number in the array cannot crash the ``item.get(...)`` calls.
    model_items = [item for item in parsed if isinstance(item, dict)] if isinstance(parsed, list) else []
    if not model_items:
        model_items = _heuristic_similarity_fallback(candidate_payloads)

    saved_candidates = _save_similarity_candidates(search_id, model_items, candidate_payloads)

    return {
        "search_id": search_id,
        "source_account": source_payload,
        "model_profile": {
            "id": profile["id"],
            "label": _build_model_label(profile)
        },
        "raw_model_output": output,
        "candidates": saved_candidates
    }
|
||
|
||
@app.get("/v2/douyin/accounts")
def list_douyin_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
    """List every Douyin account owned by the caller, most recently updated first."""
    owned_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_accounts
        WHERE user_id = ?
        ORDER BY updated_at DESC
        """,
        (account["id"],)
    )
    payloads: list[dict[str, Any]] = []
    for owned_row in owned_rows:
        payloads.append(_build_account_payload(owned_row))
    return payloads
|
||
|
||
@app.post("/v2/douyin/accounts/sync")
async def sync_douyin_account(
    request: DouyinAccountSyncRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Sync a Douyin account from public-profile and creator-center data.

    At least one of a non-blank ``profile_url``, ``manual_profile_payload``
    or ``manual_creator_pages`` is required; otherwise a 400 is raised.

    The public profile and the creator-center pages are collected first. Both
    results are then passed to ``_finalize_sync_workspace``, which runs in a
    worker thread via ``run_in_threadpool``.
    """
    has_input = (
        request.profile_url.strip()
        or request.manual_profile_payload
        or request.manual_creator_pages
    )
    if not has_input:
        raise HTTPException(
            status_code=400,
            detail="profile_url、manual_profile_payload 或 manual_creator_pages 至少需要传一个"
        )

    public_snapshot = await _collect_public_profile(request.profile_url, request.manual_profile_payload)
    creator_snapshot = await _collect_creator_center_pages(
        request.creator_center_urls,
        request.session_cookie,
        request.manual_creator_pages
    )
    return await run_in_threadpool(_finalize_sync_workspace, account, request, public_snapshot, creator_snapshot)
|
||
|
||
@app.get("/v2/douyin/accounts/{account_id}")
def get_douyin_account(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return the workspace payload for one account owned by the caller.

    Ownership is checked by ``_require_owned_account`` before the payload is built.
    """
    return _build_workspace_payload(_require_owned_account(account_id, account["id"]))
|
||
|
||
@app.get("/v2/douyin/accounts/{account_id}/snapshots")
def list_douyin_account_snapshots(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
    """List stored snapshots for an account owned by the caller."""
    owned_row = _require_owned_account(account_id, account["id"])
    # Use the id from the verified row rather than the raw path parameter.
    verified_id = owned_row["id"]
    return _list_snapshots(verified_id)
|
||
|
||
@app.get("/v2/douyin/accounts/{account_id}/snapshots/{snapshot_id}")
def get_douyin_account_snapshot(
    account_id: str,
    snapshot_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return one snapshot of an owned account.

    The snapshot lookup is scoped to the verified account id.
    """
    verified_id = _require_owned_account(account_id, account["id"])["id"]
    return _get_snapshot_detail(snapshot_id, verified_id)
|
||
|
||
@app.get("/v2/douyin/accounts/{account_id}/creator-fields")
def get_douyin_creator_fields(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return the most recent ``creator_center`` snapshot of an owned account.

    Snapshots are ordered by ``collected_at``, newest first.

    Raises:
        HTTPException: 404 when the account has no creator-center snapshot.
    """
    owned_row = _require_owned_account(account_id, account["id"])
    newest_snapshot = legacy.db.fetch_one(
        """
        SELECT id
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'creator_center'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        (owned_row["id"],)
    )
    if newest_snapshot:
        return _get_snapshot_detail(newest_snapshot["id"], owned_row["id"])
    raise HTTPException(status_code=404, detail="No creator-center snapshot found")
|
||
|
||
@app.get("/v2/douyin/accounts/{account_id}/workspace")
def get_douyin_account_workspace(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return the workspace payload for an owned account.

    Builds the same payload as ``GET /v2/douyin/accounts/{account_id}``
    through ``_build_workspace_payload``.
    """
    owned_row = _require_owned_account(account_id, account["id"])
    workspace_payload = _build_workspace_payload(owned_row)
    return workspace_payload
|
||
|
||
@app.get("/v2/douyin/accounts/{account_id}/videos")
def list_douyin_account_videos(
    account_id: str,
    limit: int = 200,
    sort_by: str = "score",
    scope: str = "all",
    content_type: str = "all",
    q: str = "",
    tag: str = "",
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """List an owned account's videos, with optional scope, type, text and tag filters.

    Query parameters:
        limit: Maximum number of items returned, clamped to ``1..1000``. The
            workspace is built with ``max(clamped_limit, 24)`` items, and all
            filters below apply only to those fetched items.
        sort_by: Sort key passed (lower-cased) to ``_video_sort_key``. Results
            are sorted in descending order.
        scope: ``"top"`` keeps ids listed in ``top_scored_video_ids``.
            ``"latest"`` keeps ids listed in ``latest_video_ids``. Any other
            value keeps everything.
        content_type: ``"video"`` or ``"image_text"`` filters on each item's
            ``content_type`` (a missing value counts as ``"video"``). Any other
            value keeps everything.
        q: Case-insensitive substring matched against title, description,
            aweme_id and tags.
        tag: Case-insensitive substring matched against each individual tag.

    Returns:
        A dict that echoes the normalized filters, plus the workspace threshold,
        meta and id lists, and the filtered, sorted ``items``.
    """
    account_row = _require_owned_account(account_id, account["id"])
    # Clamp before building the workspace: the response never exceeds 1000
    # items, so an oversized ``limit`` must not make the builder load more.
    max_page_size = 1000
    page_size = max(1, min(limit, max_page_size))
    workspace = _build_video_workspace_payload(account_row, limit=max(page_size, 24))
    items = list(workspace["items"])
    item_map = {item["id"]: item for item in items}

    normalized_scope = (scope or "all").strip().lower()
    if normalized_scope == "top":
        items = [item_map[video_id] for video_id in workspace["top_scored_video_ids"] if video_id in item_map]
    elif normalized_scope == "latest":
        items = [item_map[video_id] for video_id in workspace["latest_video_ids"] if video_id in item_map]

    normalized_content_type = (content_type or "all").strip().lower()
    if normalized_content_type in {"video", "image_text"}:
        items = [
            item for item in items
            if str(item.get("content_type") or "video").strip().lower() == normalized_content_type
        ]

    def _searchable_text(item: dict[str, Any]) -> str:
        # str.join raises TypeError on non-str parts. Normalise None fields and
        # any non-string values (e.g. a numeric aweme_id) before joining.
        fields = [
            item.get("title"),
            item.get("description"),
            item.get("aweme_id"),
            *(item.get("tags") or [])
        ]
        return " ".join("" if value is None else str(value) for value in fields).lower()

    query_text = (q or "").strip().lower()
    if query_text:
        items = [item for item in items if query_text in _searchable_text(item)]

    tag_text = (tag or "").strip().lower()
    if tag_text:
        items = [
            item for item in items
            if any(tag_text in str(tag_item).lower() for tag_item in (item.get("tags") or []))
        ]

    normalized_sort = (sort_by or "score").strip().lower()
    items.sort(key=lambda item: _video_sort_key(item, normalized_sort), reverse=True)
    return {
        "account_id": account_row["id"],
        "sort_by": normalized_sort,
        "scope": normalized_scope,
        "content_type": normalized_content_type,
        "query": q,
        "tag": tag,
        "high_score_threshold": workspace["high_score_threshold"],
        "meta": workspace["meta"],
        "top_scored_video_ids": workspace["top_scored_video_ids"],
        "latest_video_ids": workspace["latest_video_ids"],
        "items": items[:page_size]
    }
|
||
|
||
@app.get("/v2/douyin/accounts/{account_id}/analysis-reports")
def list_douyin_analysis_reports(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
    """Return the ``recent_reports`` section of an owned account's workspace payload."""
    owned_row = _require_owned_account(account_id, account["id"])
    workspace_payload = _build_workspace_payload(owned_row)
    return workspace_payload["recent_reports"]
|
||
|
||
@app.post("/v2/douyin/accounts/{account_id}/analysis")
async def analyze_douyin_account(
    account_id: str,
    request: DouyinAccountAnalysisRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Run an account analysis for an owned account via ``_run_account_analysis``."""
    owned_row = _require_owned_account(account_id, account["id"])
    analysis_result = await _run_account_analysis(owned_row, account, request)
    return analysis_result
|
||
|
||
@app.post("/v2/douyin/accounts/{account_id}/videos/analyze-top")
async def analyze_douyin_top_videos(
    account_id: str,
    request: DouyinTopVideoAnalysisRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Analyze an owned account's top-scoring videos with the selected model profile.

    Delegates to ``_run_top_video_analyses`` with
    ``source_type="manual_top_video_refresh"``. Returns the account id, the
    model profile id, the number of analyses and the analyses themselves.
    """
    owned_row = _require_owned_account(account_id, account["id"])
    model_profile = legacy.model_profile_for_account(account["id"], request.model_profile_id)
    analyses = await _run_top_video_analyses(
        owned_row,
        account,
        model_profile,
        top_video_count=request.top_video_count,
        min_score=request.min_score,
        source_type="manual_top_video_refresh",
        temperature=request.temperature
    )
    summary = {
        "account_id": owned_row["id"],
        "model_profile_id": model_profile["id"],
        "analyzed_count": len(analyses),
        "items": analyses
    }
    return summary
|
||
|
||
@app.post("/v2/douyin/similar-searches")
async def create_douyin_similarity_search(
    request: DouyinSimilarSearchRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Start a similar-account search for the caller via ``_run_similarity_search``."""
    search_result = await _run_similarity_search(account, request)
    return search_result
|
||
|
||
@app.get("/v2/douyin/similar-searches/{search_id}")
def get_douyin_similarity_search(
    search_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Return a stored similarity search owned by the caller, with its ranked candidates.

    Candidates are ordered by ``rank_index`` ascending. Their nickname comes
    from a LEFT JOIN on ``douyin_accounts``, so it is absent for URL-only
    candidates.

    Raises:
        HTTPException: 404 when the search does not exist or belongs to another user.
    """
    search_row = legacy.db.fetch_one(
        "SELECT * FROM douyin_similarity_searches WHERE id = ? AND user_id = ?",
        (search_id, account["id"])
    )
    if not search_row:
        raise HTTPException(status_code=404, detail="Similarity search not found")
    candidates = legacy.db.fetch_all(
        """
        SELECT cand.*, acct.nickname AS candidate_nickname
        FROM douyin_similarity_candidates cand
        LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
        WHERE cand.search_id = ?
        ORDER BY cand.rank_index ASC
        """,
        (search_id,)
    )
    return {
        "id": search_row["id"],
        "source_account_id": search_row["source_account_id"],
        "source_profile_url": search_row["source_profile_url"],
        "keywords": _safe_json_loads(search_row["keywords_json"], []),
        "context": _safe_json_loads(search_row["context_json"], {}),
        "created_at": search_row["created_at"],
        "candidates": [
            {
                "id": row["id"],
                "candidate_account_id": row["candidate_account_id"],
                "candidate_profile_url": row["candidate_profile_url"],
                # The LEFT JOIN yields NULL (not a missing key) when no stored
                # account matches, so ``.get(..., "")`` would still return None.
                # Normalise to "" to match the POST response.
                "candidate_nickname": row.get("candidate_nickname") or "",
                "heuristic_score": row["heuristic_score"],
                "agent_score": row["agent_score"],
                "rationale_text": row["rationale_text"],
                "dimensions": _safe_json_loads(row["dimensions_json"], {}),
                "rank_index": row["rank_index"]
            }
            for row in candidates
        ]
    }
|
||
|
||
@app.get("/v2/douyin/accounts/{account_id}/benchmark-links")
def list_douyin_benchmark_links(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
    """List the benchmark links recorded for an owned source account."""
    owned_row = _require_owned_account(account_id, account["id"])
    linked_accounts = _list_linked_accounts(owned_row)
    return linked_accounts
|
||
|
||
@app.post("/v2/douyin/accounts/{account_id}/benchmark-links")
def create_douyin_benchmark_links(
    account_id: str,
    request: DouyinBenchmarkLinkRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Record benchmark relations from an owned account to targets.

    Targets come from two sources:

    * ``request.target_account_ids``: owned accounts, each checked by
      ``_require_owned_account``.
    * ``request.target_profile_urls``: bare profile URLs, stored with a
      NULL ``target_account_id``.

    Every relation stores ``relation_type``, ``note`` and ``search_id`` from
    the request.

    Returns:
        The number of relations saved, their ids, and the refreshed link list.
    """
    account_row = _require_owned_account(account_id, account["id"])

    # Resolve and ownership-check every target before the first INSERT, so an
    # invalid id later in the list cannot leave a partially saved batch.
    # dict.fromkeys drops duplicate ids while keeping their order, matching the
    # de-duplication already applied to target_profile_urls.
    target_rows = [
        _require_owned_account(target_account_id, account["id"])
        for target_account_id in dict.fromkeys(request.target_account_ids)
    ]

    def _insert_relation(target_account_id: str | None, target_profile_url: str) -> str:
        # A None target_account_id is bound as SQL NULL (URL-only relation).
        relation_id = make_id("dyrel")
        legacy.db.execute(
            """
            INSERT INTO douyin_account_relations (
                id, user_id, source_account_id, target_account_id, target_profile_url,
                relation_type, note, search_id, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                relation_id,
                account["id"],
                account_row["id"],
                target_account_id,
                target_profile_url,
                request.relation_type,
                request.note,
                request.search_id,
                now()
            )
        )
        return relation_id

    linked_ids: list[str] = []
    for target_row in target_rows:
        target_url = target_row["canonical_profile_url"] or target_row["profile_url"]
        linked_ids.append(_insert_relation(target_row["id"], target_url))
    for target_profile_url in _dedupe_strings(request.target_profile_urls):
        linked_ids.append(_insert_relation(None, target_profile_url))

    return {
        "saved": len(linked_ids),
        "relation_ids": linked_ids,
        "links": _list_linked_accounts(account_row)
    }
|