1981 lines
79 KiB
Python
1981 lines
79 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import re
|
|
from collections import Counter
|
|
from datetime import datetime, timezone
|
|
from html import unescape
|
|
from typing import Any, Iterable
|
|
from urllib.parse import quote, unquote
|
|
|
|
import httpx
|
|
from fastapi import Depends, HTTPException
|
|
from pydantic import BaseModel, Field
|
|
|
|
# Creator-center pages used when a sync request does not list its own URLs.
DEFAULT_CREATOR_CENTER_URLS = [
    "https://creator.douyin.com/creator-micro/home",
    "https://creator.douyin.com/creator-micro/data",
    "https://creator.douyin.com/creator-micro/content/manage",
]

# Timeout handed to the httpx client for every outgoing request.
DEFAULT_TIMEOUT = 20.0

# Upper bound on how much of a text is scanned for embedded JSON. Despite the
# name it is applied as a slice on ``str``, i.e. it counts characters.
MAX_HTML_SEARCH_BYTES = 2_000_000

# Desktop Chrome identity sent with every request.
DEFAULT_USER_AGENT = " ".join(
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
    )
)
|
|
|
|
|
|
class ManualPageCapture(BaseModel):
    """A creator-center page captured by hand and submitted with a sync request."""

    # Address the capture was taken from; may be left blank.
    url: str = ""
    # Optional page title.
    title: str = ""
    # Captured JSON document; every instance gets its own empty dict by default.
    payload: dict[str, Any] = Field(default_factory=dict)
|
|
|
|
|
|
class DouyinAccountSyncRequest(BaseModel):
    """Input for an account sync: where to fetch from plus any manual captures."""

    # Public profile page of the account.
    profile_url: str = ""
    # Raw Cookie header value for authenticated creator-center requests.
    session_cookie: str = ""
    # Creator-center pages to visit; defaults to a fresh copy of the module list.
    creator_center_urls: list[str] = Field(
        default_factory=lambda: [*DEFAULT_CREATOR_CENTER_URLS]
    )
    # Hand-captured profile JSON, if any.
    manual_profile_payload: dict[str, Any] | None = None
    # Hand-captured creator-center pages.
    manual_creator_pages: list[ManualPageCapture] = Field(default_factory=list)
    # Hand-captured per-video JSON documents.
    manual_work_payloads: list[dict[str, Any]] = Field(default_factory=list)
    # Free-form note stored next to the raw profile.
    discovery_note: str = ""
|
|
|
|
|
|
class DouyinAccountAnalysisRequest(BaseModel):
    """Options accepted when requesting an analysis of a stored account."""

    # Model profiles to run; an empty list is a valid value.
    model_profile_ids: list[str] = Field(default_factory=list)
    # Explicitly chosen linked accounts.
    linked_account_ids: list[str] = Field(default_factory=list)
    # Toggles for the extra context pulled into the analysis.
    include_linked_accounts: bool = True
    include_recent_similar_candidates: bool = True
    # Cap on the number of videos considered.
    max_videos: int = 12
    # Additional free-text focus supplied by the user.
    extra_focus: str = ""
    # Sampling temperature forwarded to the model.
    temperature: float = 0.35
|
|
|
|
|
|
class DouyinSimilarSearchRequest(BaseModel):
    """Options accepted when searching for accounts similar to a source account."""

    # The source is identified by a stored account id and/or a profile URL.
    source_account_id: str | None = None
    profile_url: str | None = None
    # Candidate profile URLs supplied directly by the caller.
    candidate_urls: list[str] = Field(default_factory=list)
    # Toggles for where additional candidates come from.
    seed_linked_accounts: bool = True
    search_public_pages: bool = True
    # Optional model profile used for scoring.
    model_profile_id: str | None = None
    # Cap on the number of candidates.
    max_candidates: int = 10
    # Additional free-text requirements supplied by the user.
    extra_requirements: str = ""
|
|
|
|
|
|
class DouyinBenchmarkLinkRequest(BaseModel):
    """Options accepted when linking target accounts to a source account."""

    # Targets may be given as stored account ids and/or as profile URLs.
    target_account_ids: list[str] = Field(default_factory=list)
    target_profile_urls: list[str] = Field(default_factory=list)
    # Kind of relation to record; "benchmark" unless stated otherwise.
    relation_type: str = "benchmark"
    # Free-form note stored with the relation.
    note: str = ""
    # Similarity search the link originated from, if any.
    search_id: str = ""
|
|
|
|
|
|
def _safe_json_dumps(value: Any) -> str:
|
|
return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
|
|
|
|
|
|
def _safe_json_loads(value: str | None, fallback: Any) -> Any:
|
|
if not value:
|
|
return fallback
|
|
try:
|
|
return json.loads(value)
|
|
except Exception:
|
|
return fallback
|
|
|
|
|
|
def _first_non_empty(*values: Any) -> str:
|
|
for value in values:
|
|
if value is None:
|
|
continue
|
|
if isinstance(value, str):
|
|
stripped = value.strip()
|
|
if stripped:
|
|
return stripped
|
|
elif value not in ("", [], {}, ()):
|
|
return str(value)
|
|
return ""
|
|
|
|
|
|
def _dedupe_strings(values: Iterable[str]) -> list[str]:
|
|
result: list[str] = []
|
|
seen: set[str] = set()
|
|
for value in values:
|
|
item = value.strip()
|
|
if not item:
|
|
continue
|
|
key = item.lower()
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
result.append(item)
|
|
return result
|
|
|
|
|
|
def _compact_text(value: Any, limit: int = 500) -> str:
|
|
text = str(value or "").strip()
|
|
if len(text) <= limit:
|
|
return text
|
|
return f"{text[: limit - 1]}…"
|
|
|
|
|
|
def _parse_count(value: Any) -> float:
|
|
if value is None:
|
|
return 0.0
|
|
if isinstance(value, (int, float)):
|
|
return float(value)
|
|
text = str(value).strip().lower().replace(",", "")
|
|
if not text:
|
|
return 0.0
|
|
|
|
multiplier = 1.0
|
|
if text.endswith("w") or text.endswith("万"):
|
|
multiplier = 10_000.0
|
|
text = text[:-1]
|
|
elif text.endswith("亿"):
|
|
multiplier = 100_000_000.0
|
|
text = text[:-1]
|
|
|
|
text = text.replace("+", "")
|
|
match = re.search(r"-?\d+(?:\.\d+)?", text)
|
|
if not match:
|
|
return 0.0
|
|
try:
|
|
return float(match.group()) * multiplier
|
|
except ValueError:
|
|
return 0.0
|
|
|
|
|
|
def _normalize_timestamp(value: Any) -> str | None:
|
|
if value in (None, "", 0, "0"):
|
|
return None
|
|
if isinstance(value, str):
|
|
stripped = value.strip()
|
|
if not stripped:
|
|
return None
|
|
if re.match(r"^\d{4}-\d{2}-\d{2}T", stripped):
|
|
return stripped
|
|
if stripped.isdigit():
|
|
value = int(stripped)
|
|
else:
|
|
return stripped
|
|
if isinstance(value, (int, float)):
|
|
ts = float(value)
|
|
if ts > 10_000_000_000:
|
|
ts /= 1000.0
|
|
try:
|
|
return datetime.fromtimestamp(ts, tz=timezone.utc).replace(microsecond=0).isoformat()
|
|
except Exception:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _extract_hashtags(*texts: str) -> list[str]:
    """Collect ``#tag`` names (without the ``#``) from all texts, de-duplicated."""
    hashtag = re.compile(r"#([\w\u4e00-\u9fff]+)")
    found: list[str] = []
    for text in texts:
        if text:
            # findall returns the captured group, i.e. the tag without "#".
            found += hashtag.findall(text)
    return _dedupe_strings(found)
|
|
|
|
|
|
def _extract_keywords(*texts: str) -> list[str]:
    """Derive keyword candidates from free text.

    For each non-empty text, in order: hashtags, runs of 2-8 CJK characters,
    then Latin identifiers of 3-21 characters. Generic platform words are
    removed and the rest is de-duplicated case-insensitively.
    """
    stop_words = frozenset(
        (
            "视频",
            "作品",
            "抖音",
            "账号",
            "内容",
            "发布",
            "更多",
            "关注",
            "用户",
            "douyin",
            "profile",
        )
    )
    token_patterns = (
        r"[\u4e00-\u9fff]{2,8}",
        r"[A-Za-z][A-Za-z0-9_]{2,20}",
    )

    collected: list[str] = []
    for text in filter(None, texts):
        collected += _extract_hashtags(text)
        for pattern in token_patterns:
            collected += re.findall(pattern, text)

    return _dedupe_strings(
        token for token in collected if token.lower() not in stop_words
    )
|
|
|
|
|
|
def _flatten_json(value: Any, prefix: str = "") -> list[tuple[str, str, str]]:
    """Flatten nested JSON into ``(path, type_name, text)`` rows, one per leaf.

    Dict keys are joined with ``.`` and list positions appear as ``[i]``. A
    bare scalar at the root gets the path ``$``. Leaf text is capped at 2000
    characters.
    """
    if isinstance(value, dict):
        children = (
            (f"{prefix}.{key}" if prefix else str(key), child)
            for key, child in value.items()
        )
    elif isinstance(value, list):
        children = ((f"{prefix}[{index}]", child) for index, child in enumerate(value))
    else:
        return [(prefix or "$", type(value).__name__, _compact_text(value, 2000))]

    flattened: list[tuple[str, str, str]] = []
    for path, child in children:
        flattened.extend(_flatten_json(child, path))
    return flattened
|
|
|
|
|
|
def _walk_json(value: Any) -> Iterable[dict[str, Any]]:
|
|
if isinstance(value, dict):
|
|
yield value
|
|
for child in value.values():
|
|
yield from _walk_json(child)
|
|
elif isinstance(value, list):
|
|
for child in value:
|
|
yield from _walk_json(child)
|
|
|
|
|
|
def _extract_json_objects_from_text(text: str) -> list[Any]:
    """Find JSON objects and arrays embedded anywhere in *text*.

    The text is scanned as-is, URL-decoded, HTML-unescaped, and with both
    decodings applied. A decode is attempted at every ``{`` or ``[``; each
    distinct value is returned once, in order of discovery, up to 50 values.
    Only the first ``MAX_HTML_SEARCH_BYTES`` characters of each variant are
    examined.
    """
    objects: list[Any] = []
    if not text:
        return objects

    decoder = json.JSONDecoder()
    seen: set[str] = set()
    # When decoding changes nothing the variants are identical; dict.fromkeys
    # drops the repeats in order so the same text is not scanned again.
    variants = dict.fromkeys(
        (text, unquote(text), unescape(text), unescape(unquote(text)))
    )
    opener = re.compile(r"[\{\[]")

    for variant in variants:
        snippet = variant[:MAX_HTML_SEARCH_BYTES]
        for match in opener.finditer(snippet):
            try:
                # Decode in place from the bracket; slicing here would copy the
                # remainder of the snippet for every bracket found.
                obj, _ = decoder.raw_decode(snippet, match.start())
            except (ValueError, RecursionError):
                continue
            marker = _safe_json_dumps(obj)
            if marker in seen:
                continue
            seen.add(marker)
            objects.append(obj)
            if len(objects) >= 50:
                return objects
    return objects
|
|
|
|
|
|
def _extract_json_blobs_from_html(html: str) -> list[dict[str, Any]]:
    """Extract the distinct JSON values embedded in the ``<script>`` tags of *html*.

    Each result is ``{"script_id": <id attribute or "">, "payload": <value>}``.
    A value found in several scripts is reported for the first one only.
    """
    script_tag = re.compile(r"<script([^>]*)>(.*?)</script>", re.IGNORECASE | re.DOTALL)
    id_attribute = re.compile(r'id=["\']([^"\']+)["\']', re.IGNORECASE)

    blobs: list[dict[str, Any]] = []
    known_markers: set[str] = set()
    for script in script_tag.finditer(html):
        attrs, content = script.groups()
        id_match = id_attribute.search(attrs)
        script_id = "" if id_match is None else id_match.group(1)
        for obj in _extract_json_objects_from_text(content.strip()):
            marker = _safe_json_dumps(obj)
            if marker not in known_markers:
                known_markers.add(marker)
                blobs.append({"script_id": script_id, "payload": obj})
    return blobs
|
|
|
|
|
|
def _profile_candidate_score(value: dict[str, Any]) -> int:
|
|
score = 0
|
|
interesting_keys = {
|
|
"nickname",
|
|
"signature",
|
|
"sec_uid",
|
|
"secUid",
|
|
"uid",
|
|
"unique_id",
|
|
"short_id",
|
|
"aweme_count",
|
|
"following_count",
|
|
"follower_count",
|
|
"total_favorited"
|
|
}
|
|
score += sum(1 for key in interesting_keys if key in value)
|
|
if "author" in value and isinstance(value["author"], dict):
|
|
score += 2
|
|
return score
|
|
|
|
|
|
def _video_candidate_score(value: dict[str, Any]) -> int:
|
|
score = 0
|
|
if "statistics" in value and isinstance(value["statistics"], dict):
|
|
score += 3
|
|
if "aweme_id" in value or "item_id" in value:
|
|
score += 2
|
|
if "desc" in value or "title" in value:
|
|
score += 1
|
|
return score
|
|
|
|
|
|
def _extract_profile_candidates(payload: Any) -> list[dict[str, Any]]:
    """Collect every dict in *payload* that scores at least 3 as a profile.

    A qualifying ``author`` dict is recorded right after the dict that
    contains it. The walk reaches that same ``author`` dict again later, so
    candidates are tracked by identity and each one is returned only once.
    """
    candidates: list[dict[str, Any]] = []
    recorded: set[int] = set()

    def _record(node: dict[str, Any]) -> None:
        # id() is a safe key here: every node stays alive inside *payload*.
        if id(node) not in recorded:
            recorded.add(id(node))
            candidates.append(node)

    for item in _walk_json(payload):
        if _profile_candidate_score(item) >= 3:
            _record(item)
        author = item.get("author")
        if isinstance(author, dict) and _profile_candidate_score(author) >= 3:
            _record(author)
    return candidates
|
|
|
|
|
|
def _extract_video_candidates(payload: Any) -> list[dict[str, Any]]:
    """Return every dict in *payload* whose video score is at least 3, in walk order."""
    minimum_score = 3
    return [
        node
        for node in _walk_json(payload)
        if _video_candidate_score(node) >= minimum_score
    ]
|
|
|
|
|
|
def _normalize_profile_candidate(candidate: dict[str, Any], fallback_url: str = "") -> dict[str, Any]:
    """Map a raw profile dict onto the module's flat profile shape.

    The result always has the keys ``nickname``, ``signature``,
    ``profile_url``, ``canonical_profile_url``, ``sec_uid``, ``douyin_uid``,
    ``douyin_id``, ``avatar_url``, ``stats``, ``tags`` and ``raw``. Missing
    text is ``""`` and missing counters are ``0.0``. *fallback_url* is used
    when the candidate carries no ``share_url`` or ``profile_url``.
    """
    statistics = candidate.get("statistics")
    stats_source = statistics if isinstance(statistics, dict) else {}

    avatar = candidate.get("avatar_medium") or candidate.get("avatar_thumb") or candidate.get("avatar_url")
    if isinstance(avatar, dict):
        url_list = avatar.get("url_list")
        # An empty url_list is possible; indexing it blindly would raise.
        first_url = url_list[0] if isinstance(url_list, list) and url_list else ""
        avatar = _first_non_empty(first_url, avatar.get("url"))

    signature = _first_non_empty(
        candidate.get("signature"),
        candidate.get("desc"),
        candidate.get("bio"),
        candidate.get("description"),
    )
    nickname = _first_non_empty(candidate.get("nickname"), candidate.get("name"), candidate.get("author_name"))
    canonical_url = _first_non_empty(
        candidate.get("share_url"),
        candidate.get("profile_url"),
        fallback_url,
    )

    # "tags" may be absent, null, or a single string. Only a real sequence is
    # iterated, so a bare string stays one tag and is not split into characters.
    raw_tags = candidate.get("tags")
    if isinstance(raw_tags, str):
        raw_tags = [raw_tags]
    elif not isinstance(raw_tags, (list, tuple)):
        raw_tags = []
    explicit_tags = [str(tag) for tag in raw_tags if isinstance(tag, (str, int, float))]

    def _counter(key: str) -> float:
        # Prefer the top-level counter and fall back to the nested statistics.
        return _parse_count(candidate.get(key) or stats_source.get(key))

    return {
        "nickname": nickname,
        "signature": signature,
        "profile_url": canonical_url,
        "canonical_profile_url": canonical_url,
        "sec_uid": _first_non_empty(candidate.get("sec_uid"), candidate.get("secUid")),
        "douyin_uid": _first_non_empty(candidate.get("uid")),
        "douyin_id": _first_non_empty(
            candidate.get("unique_id"), candidate.get("short_id"), candidate.get("douyin_id")
        ),
        "avatar_url": _first_non_empty(avatar),
        "stats": {
            "followers": _counter("follower_count"),
            "following": _counter("following_count"),
            "likes": _counter("total_favorited"),
            "videos": _counter("aweme_count"),
        },
        "tags": _dedupe_strings(_extract_hashtags(signature, nickname) + explicit_tags),
        "raw": candidate,
    }
|
|
|
|
|
|
def _pick_best_profile(candidates: list[dict[str, Any]], fallback_url: str = "") -> dict[str, Any]:
    """Normalize all candidates and return the most complete one.

    Completeness weights: nickname 4, sec_uid 3, signature 2, a non-zero
    follower count 1. The earliest candidate wins ties. With no candidates
    an empty normalized profile carrying *fallback_url* is returned.
    """

    def _completeness(profile: dict[str, Any]) -> int:
        weighted_presence = (
            (4, profile["nickname"]),
            (3, profile["sec_uid"]),
            (2, profile["signature"]),
            (1, profile["stats"]["followers"]),
        )
        return sum(weight for weight, present in weighted_presence if present)

    normalized = [
        _normalize_profile_candidate(candidate, fallback_url=fallback_url)
        for candidate in candidates
    ]
    if not normalized:
        return _normalize_profile_candidate({}, fallback_url=fallback_url)
    # max() returns the first of several maximal items.
    return max(normalized, key=_completeness)
|
|
|
|
|
|
def _normalize_video_candidate(candidate: dict[str, Any]) -> dict[str, Any]:
    """Map a raw video dict onto the module's flat video shape.

    The result always has the keys ``aweme_id``, ``title``, ``description``,
    ``share_url``, ``cover_url``, ``duration_sec``, ``published_at``,
    ``tags``, ``stats`` and ``raw``. Durations above 1000 are treated as
    milliseconds. An unreadable duration becomes ``0.0``.
    """
    statistics = candidate.get("statistics")
    stats_source = statistics if isinstance(statistics, dict) else {}
    video = candidate.get("video")
    video_source = video if isinstance(video, dict) else {}

    title = _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("share_title"))
    description = _first_non_empty(candidate.get("desc"), candidate.get("title"), candidate.get("text"))

    cover = candidate.get("cover") or video_source.get("cover")
    if isinstance(cover, dict):
        url_list = cover.get("url_list")
        # An empty url_list is possible; indexing it blindly would raise.
        first_url = url_list[0] if isinstance(url_list, list) and url_list else ""
        cover = _first_non_empty(first_url, cover.get("url"))

    # Read the duration once and tolerate values float() cannot convert.
    raw_duration = candidate.get("duration") or video_source.get("duration") or 0
    try:
        duration = float(raw_duration)
    except (TypeError, ValueError):
        duration = 0.0
    if duration > 1000:
        duration /= 1000.0

    def _counter(key: str) -> float:
        # Prefer the nested statistics and fall back to a top-level counter.
        return _parse_count(stats_source.get(key) or candidate.get(key))

    return {
        "aweme_id": _first_non_empty(
            candidate.get("aweme_id"), candidate.get("item_id"), candidate.get("group_id")
        ),
        "title": title,
        "description": description,
        "share_url": _first_non_empty(candidate.get("share_url")),
        "cover_url": _first_non_empty(cover),
        "duration_sec": duration,
        "published_at": _normalize_timestamp(candidate.get("create_time") or candidate.get("publish_time")),
        "tags": _extract_hashtags(title, description),
        "stats": {
            "play": _counter("play_count"),
            "like": _counter("digg_count"),
            "comment": _counter("comment_count"),
            "share": _counter("share_count"),
            "collect": _counter("collect_count"),
        },
        "raw": candidate,
    }
|
|
|
|
|
|
def _extract_videos(payloads: Iterable[Any]) -> list[dict[str, Any]]:
    """Extract, de-duplicate and rank the videos found in *payloads*.

    A video is identified by its aweme id, else its share URL, else its
    title. Videos with none of these are dropped and the first occurrence
    wins. Ranking is by ``play + like + 4*comment + 6*share``, highest first.
    """
    by_identity: dict[str, dict[str, Any]] = {}
    for payload in payloads:
        for raw in _extract_video_candidates(payload):
            video = _normalize_video_candidate(raw)
            identity = video["aweme_id"] or video["share_url"] or video["title"]
            if identity and identity not in by_identity:
                by_identity[identity] = video

    def _engagement(video: dict[str, Any]) -> float:
        stats = video["stats"]
        return stats["play"] + stats["like"] + stats["comment"] * 4 + stats["share"] * 6

    # sorted() is stable, so equally ranked videos keep their discovery order.
    return sorted(by_identity.values(), key=_engagement, reverse=True)
|
|
|
|
|
|
async def _fetch_html(url: str, cookie: str = "") -> tuple[str, str]:
    """GET *url* and return ``(final_url, body_text)``.

    Redirects are followed, so the returned URL may differ from *url*. A
    non-blank *cookie* is sent verbatim as the ``Cookie`` header. Error
    responses raise through ``raise_for_status()``.
    """
    request_headers = {
        "User-Agent": DEFAULT_USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    }
    session_cookie = cookie.strip()
    if session_cookie:
        request_headers["Cookie"] = session_cookie

    async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT, follow_redirects=True) as client:
        response = await client.get(url, headers=request_headers)
        response.raise_for_status()
        final_url = str(response.url)
        return final_url, response.text
|
|
|
|
|
|
async def _discover_profile_urls_from_search(keywords: list[str], limit: int = 8) -> list[str]:
    """Scrape user-profile links from the public search page of up to three keywords.

    Returns at most *limit* distinct absolute URLs, with query string and
    fragment removed, in order of discovery. A search page that fails to
    load is skipped.
    """
    urls: list[str] = []
    if limit <= 0:
        return urls

    seen: set[str] = set()
    # "*" (not "+") before /user/ so that root-relative links such as
    # href="/user/abc" are matched too.
    profile_href = re.compile(r'href=["\']([^"\']*/user/[^"\']+)["\']')

    for keyword in keywords[:3]:
        term = keyword.strip()
        if not term:
            continue
        # safe="" also escapes "/", which would otherwise add path segments.
        search_url = f"https://www.douyin.com/search/{quote(term, safe='')}?type=user"
        try:
            _, html = await _fetch_html(search_url)
        except Exception:
            # Discovery is best effort: one failing search must not abort the rest.
            continue

        for href in profile_href.findall(html):
            if href.startswith("//"):
                # Protocol-relative link: only the scheme is missing.
                href = f"https:{href}"
            elif href.startswith("/"):
                href = f"https://www.douyin.com{href}"
            cleaned = href.split("?", 1)[0].split("#", 1)[0]
            if cleaned in seen:
                continue
            seen.add(cleaned)
            urls.append(cleaned)
            if len(urls) >= limit:
                return urls
    return urls
|
|
|
|
|
|
def _summarize_videos(videos: list[dict[str, Any]], limit: int = 8) -> dict[str, Any]:
    """Summarize the first *limit* videos.

    Averages and the tag ranking cover only that leading slice. ``count``
    reports the size of the full list. When the slice is empty every figure
    is zero, including ``count``.
    """
    sample = videos[:limit]
    sample_size = len(sample)

    def _average(metric: str) -> float:
        if not sample_size:
            return 0.0
        total = sum(video["stats"][metric] for video in sample)
        return round(total / sample_size, 2)

    tag_counts = Counter(tag for video in sample for tag in video.get("tags", []))

    return {
        "count": len(videos) if sample else 0,
        "top_tags": [tag for tag, _ in tag_counts.most_common(8)],
        "avg_play": _average("play"),
        "avg_like": _average("like"),
        "avg_comment": _average("comment"),
        "avg_share": _average("share"),
        "videos": [
            {
                "aweme_id": video["aweme_id"],
                "title": _compact_text(video["title"], 120),
                "description": _compact_text(video["description"], 180),
                "tags": video["tags"][:6],
                "published_at": video["published_at"],
                "stats": video["stats"],
            }
            for video in sample
        ],
    }
|
|
|
|
|
|
def _jaccard(left: Iterable[str], right: Iterable[str]) -> float:
|
|
left_set = {item.strip().lower() for item in left if item.strip()}
|
|
right_set = {item.strip().lower() for item in right if item.strip()}
|
|
if not left_set and not right_set:
|
|
return 0.0
|
|
intersection = len(left_set & right_set)
|
|
union = len(left_set | right_set)
|
|
return intersection / union if union else 0.0
|
|
|
|
|
|
def _quality_score(account_payload: dict[str, Any]) -> float:
|
|
stats = account_payload.get("profile_stats", {})
|
|
followers = float(stats.get("followers") or 0)
|
|
video_summary = account_payload.get("video_summary", {})
|
|
avg_play = float(video_summary.get("avg_play") or 0)
|
|
avg_like = float(video_summary.get("avg_like") or 0)
|
|
avg_comment = float(video_summary.get("avg_comment") or 0)
|
|
avg_share = float(video_summary.get("avg_share") or 0)
|
|
base = followers / 10_000.0
|
|
engagement = avg_like / 1000.0 + avg_comment / 300.0 + avg_share / 200.0 + avg_play / 5000.0
|
|
return round(base + engagement, 3)
|
|
|
|
|
|
def _heuristic_similarity(source_payload: dict[str, Any], candidate_payload: dict[str, Any]) -> dict[str, Any]:
    """Score how similar a candidate account is to the source account.

    Combines keyword overlap (weight 55), top-tag overlap (20),
    signature-keyword overlap (10) and the candidate's quality score capped
    at 15. The individual components are returned next to the total.
    """

    def _top_tags(payload: dict[str, Any]) -> list[str]:
        return payload.get("video_summary", {}).get("top_tags", [])

    def _signature_keywords(payload: dict[str, Any]) -> list[str]:
        return _extract_keywords(payload.get("signature", ""))

    topic = _jaccard(source_payload.get("keywords", []), candidate_payload.get("keywords", []))
    tags = _jaccard(_top_tags(source_payload), _top_tags(candidate_payload))
    signature = _jaccard(_signature_keywords(source_payload), _signature_keywords(candidate_payload))
    quality = _quality_score(candidate_payload)

    total = topic * 55 + tags * 20 + signature * 10 + min(quality, 15)
    return {
        "topic_overlap": round(topic, 3),
        "tag_overlap": round(tags, 3),
        "signature_overlap": round(signature, 3),
        "quality_score": quality,
        "heuristic_score": round(total, 2),
    }
|
|
|
|
|
|
def _build_model_label(profile: dict[str, Any]) -> str:
    """Pick a display label for a model profile: name, else model name, else base URL."""
    label_sources = ("name", "model_name", "base_url")
    return _first_non_empty(*(profile.get(key) for key in label_sources))
|
|
|
|
|
|
def _try_parse_agent_json(text: str) -> Any:
    """Parse model output as JSON, tolerating surrounding prose.

    Tries the whole stripped text first, then falls back to the first JSON
    value embedded in it. Returns ``{}`` when nothing can be parsed.
    """
    payload = text.strip()
    if not payload:
        return {}
    try:
        return json.loads(payload)
    except Exception:
        # Not pure JSON; look for a value embedded in surrounding text.
        embedded = _extract_json_objects_from_text(payload)
    return embedded[0] if embedded else {}
|
|
|
|
|
|
def register_douyin_routes(app: Any, legacy: Any) -> None:
|
|
def now() -> str:
    """Return the current timestamp as produced by ``legacy.utc_now()``."""
    current = legacy.utc_now()
    return current
|
|
|
|
def make_id(prefix: str) -> str:
    """Return a new identifier for *prefix* as produced by ``legacy.make_id()``."""
    identifier = legacy.make_id(prefix)
    return identifier
|
|
|
|
def ensure_schema() -> None:
|
|
schema = """
|
|
CREATE TABLE IF NOT EXISTS douyin_accounts (
|
|
id TEXT PRIMARY KEY,
|
|
user_id TEXT NOT NULL,
|
|
profile_url TEXT NOT NULL DEFAULT '',
|
|
canonical_profile_url TEXT NOT NULL DEFAULT '',
|
|
sec_uid TEXT NOT NULL DEFAULT '',
|
|
douyin_uid TEXT NOT NULL DEFAULT '',
|
|
douyin_id TEXT NOT NULL DEFAULT '',
|
|
nickname TEXT NOT NULL DEFAULT '',
|
|
signature TEXT NOT NULL DEFAULT '',
|
|
avatar_url TEXT NOT NULL DEFAULT '',
|
|
tags_json TEXT NOT NULL DEFAULT '[]',
|
|
profile_stats_json TEXT NOT NULL DEFAULT '{}',
|
|
raw_profile_json TEXT NOT NULL DEFAULT '{}',
|
|
source_mode TEXT NOT NULL DEFAULT 'public',
|
|
sync_status TEXT NOT NULL DEFAULT 'pending',
|
|
last_public_sync_at TEXT,
|
|
last_creator_sync_at TEXT,
|
|
last_analysis_at TEXT,
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL,
|
|
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_updated
|
|
ON douyin_accounts(user_id, updated_at DESC);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_sec_uid
|
|
ON douyin_accounts(user_id, sec_uid);
|
|
|
|
CREATE TABLE IF NOT EXISTS douyin_account_snapshots (
|
|
id TEXT PRIMARY KEY,
|
|
account_id TEXT NOT NULL,
|
|
snapshot_type TEXT NOT NULL,
|
|
source_url TEXT NOT NULL DEFAULT '',
|
|
raw_payload_json TEXT NOT NULL DEFAULT '{}',
|
|
summary_json TEXT NOT NULL DEFAULT '{}',
|
|
field_count INTEGER NOT NULL DEFAULT 0,
|
|
collected_at TEXT NOT NULL,
|
|
created_at TEXT NOT NULL,
|
|
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_snapshots_account_collected
|
|
ON douyin_account_snapshots(account_id, collected_at DESC);
|
|
|
|
CREATE TABLE IF NOT EXISTS douyin_snapshot_fields (
|
|
snapshot_id TEXT NOT NULL,
|
|
field_path TEXT NOT NULL,
|
|
field_type TEXT NOT NULL DEFAULT 'string',
|
|
field_value_text TEXT NOT NULL DEFAULT '',
|
|
PRIMARY KEY(snapshot_id, field_path),
|
|
FOREIGN KEY(snapshot_id) REFERENCES douyin_account_snapshots(id) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS douyin_videos (
|
|
id TEXT PRIMARY KEY,
|
|
account_id TEXT NOT NULL,
|
|
aweme_id TEXT NOT NULL DEFAULT '',
|
|
title TEXT NOT NULL DEFAULT '',
|
|
description TEXT NOT NULL DEFAULT '',
|
|
share_url TEXT NOT NULL DEFAULT '',
|
|
cover_url TEXT NOT NULL DEFAULT '',
|
|
duration_sec REAL NOT NULL DEFAULT 0,
|
|
published_at TEXT,
|
|
tags_json TEXT NOT NULL DEFAULT '[]',
|
|
stats_json TEXT NOT NULL DEFAULT '{}',
|
|
raw_json TEXT NOT NULL DEFAULT '{}',
|
|
created_at TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL,
|
|
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_updated
|
|
ON douyin_videos(account_id, updated_at DESC);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_aweme
|
|
ON douyin_videos(account_id, aweme_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS douyin_analysis_reports (
|
|
id TEXT PRIMARY KEY,
|
|
account_id TEXT NOT NULL,
|
|
user_id TEXT NOT NULL,
|
|
focus_text TEXT NOT NULL DEFAULT '',
|
|
model_profile_ids_json TEXT NOT NULL DEFAULT '[]',
|
|
linked_account_ids_json TEXT NOT NULL DEFAULT '[]',
|
|
prompt_text TEXT NOT NULL DEFAULT '',
|
|
context_json TEXT NOT NULL DEFAULT '{}',
|
|
created_at TEXT NOT NULL,
|
|
FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
|
|
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_analysis_reports_account_created
|
|
ON douyin_analysis_reports(account_id, created_at DESC);
|
|
|
|
CREATE TABLE IF NOT EXISTS douyin_analysis_suggestions (
|
|
id TEXT PRIMARY KEY,
|
|
report_id TEXT NOT NULL,
|
|
model_profile_id TEXT NOT NULL DEFAULT '',
|
|
model_label TEXT NOT NULL DEFAULT '',
|
|
status TEXT NOT NULL DEFAULT 'ok',
|
|
suggestion_text TEXT NOT NULL DEFAULT '',
|
|
parsed_json TEXT NOT NULL DEFAULT '{}',
|
|
created_at TEXT NOT NULL,
|
|
FOREIGN KEY(report_id) REFERENCES douyin_analysis_reports(id) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_analysis_suggestions_report
|
|
ON douyin_analysis_suggestions(report_id, created_at ASC);
|
|
|
|
CREATE TABLE IF NOT EXISTS douyin_similarity_searches (
|
|
id TEXT PRIMARY KEY,
|
|
user_id TEXT NOT NULL,
|
|
source_account_id TEXT,
|
|
source_profile_url TEXT NOT NULL DEFAULT '',
|
|
keywords_json TEXT NOT NULL DEFAULT '[]',
|
|
prompt_text TEXT NOT NULL DEFAULT '',
|
|
context_json TEXT NOT NULL DEFAULT '{}',
|
|
created_at TEXT NOT NULL,
|
|
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
|
FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_similarity_searches_user_created
|
|
ON douyin_similarity_searches(user_id, created_at DESC);
|
|
|
|
CREATE TABLE IF NOT EXISTS douyin_similarity_candidates (
|
|
id TEXT PRIMARY KEY,
|
|
search_id TEXT NOT NULL,
|
|
candidate_account_id TEXT,
|
|
candidate_profile_url TEXT NOT NULL DEFAULT '',
|
|
heuristic_score REAL NOT NULL DEFAULT 0,
|
|
agent_score REAL NOT NULL DEFAULT 0,
|
|
rationale_text TEXT NOT NULL DEFAULT '',
|
|
dimensions_json TEXT NOT NULL DEFAULT '{}',
|
|
raw_output_json TEXT NOT NULL DEFAULT '{}',
|
|
rank_index INTEGER NOT NULL DEFAULT 0,
|
|
created_at TEXT NOT NULL,
|
|
FOREIGN KEY(search_id) REFERENCES douyin_similarity_searches(id) ON DELETE CASCADE,
|
|
FOREIGN KEY(candidate_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_similarity_candidates_search_rank
|
|
ON douyin_similarity_candidates(search_id, rank_index ASC);
|
|
|
|
CREATE TABLE IF NOT EXISTS douyin_account_relations (
|
|
id TEXT PRIMARY KEY,
|
|
user_id TEXT NOT NULL,
|
|
source_account_id TEXT NOT NULL,
|
|
target_account_id TEXT,
|
|
target_profile_url TEXT NOT NULL DEFAULT '',
|
|
relation_type TEXT NOT NULL DEFAULT 'benchmark',
|
|
note TEXT NOT NULL DEFAULT '',
|
|
search_id TEXT NOT NULL DEFAULT '',
|
|
created_at TEXT NOT NULL,
|
|
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
|
|
FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
|
|
FOREIGN KEY(target_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_douyin_account_relations_source
|
|
ON douyin_account_relations(source_account_id, created_at DESC);
|
|
"""
|
|
with legacy.db.session() as conn:
|
|
conn.executescript(schema)
|
|
|
|
ensure_schema()
|
|
|
|
@app.on_event("startup")
def _startup_douyin_schema() -> None:
    """Run the schema bootstrap again when the application starts."""
    # Every statement in the schema uses IF NOT EXISTS, so a repeat run is harmless.
    ensure_schema()
|
|
|
|
def _require_owned_account(account_id: str, user_id: str) -> dict[str, Any]:
    """Load a Douyin account that belongs to *user_id*.

    Raises:
        HTTPException: 404 when no such account exists for this user.
    """
    account = legacy.db.fetch_one(
        "SELECT * FROM douyin_accounts WHERE id = ? AND user_id = ?",
        (account_id, user_id),
    )
    if account:
        return account
    raise HTTPException(status_code=404, detail="Douyin account not found")
|
|
|
|
def _fetch_model_profiles(account_id: str) -> list[dict[str, Any]]:
    """List the model profiles that have no owner or are owned by *account_id*.

    Default profiles come first, then the rest ordered by creation time.
    """
    query = """
        SELECT *
        FROM model_profiles
        WHERE owner_account_id IS NULL OR owner_account_id = ?
        ORDER BY is_default DESC, created_at ASC
        """
    params = (account_id,)
    return legacy.db.fetch_all(query, params)
|
|
|
|
def _resolve_model_profiles(account: dict[str, Any], requested_ids: list[str]) -> list[dict[str, Any]]:
    """Resolve the model profiles to use for *account*.

    With no requested ids every available profile is returned. Otherwise
    the requested profiles are returned in the order they were requested.

    Raises:
        HTTPException: 400 when no profile is available at all, 404 when a
            requested id is not among the available profiles.
    """
    available = _fetch_model_profiles(account["id"])
    if not available:
        raise HTTPException(status_code=400, detail="No available model profiles")
    if not requested_ids:
        return available

    by_id = {profile["id"]: profile for profile in available}
    unknown = [wanted for wanted in requested_ids if wanted not in by_id]
    if unknown:
        raise HTTPException(status_code=404, detail=f"Unknown model profiles: {', '.join(unknown)}")
    return [by_id[wanted] for wanted in requested_ids]
|
|
|
|
async def _collect_public_profile(profile_url: str, manual_payload: dict[str, Any] | None) -> dict[str, Any]:
    """Gather profile and video data from a manual payload and/or the public page.

    A failed page fetch is recorded in ``errors`` instead of being raised.
    ``source_url`` is the post-redirect URL when the fetch succeeded.
    """
    resolved_url = profile_url.strip()
    errors: list[str] = []
    blobs: list[dict[str, Any]] = (
        [{"script_id": "manual_profile_payload", "payload": manual_payload}]
        if manual_payload
        else []
    )

    if resolved_url:
        try:
            resolved_url, html = await _fetch_html(resolved_url)
            blobs.extend(_extract_json_blobs_from_html(html))
        except Exception as exc:
            errors.append(f"public_profile_fetch_failed: {exc}")

    payloads = [blob["payload"] for blob in blobs]
    profile_candidates = [
        candidate
        for payload in payloads
        for candidate in _extract_profile_candidates(payload)
    ]
    return {
        "profile": _pick_best_profile(profile_candidates, fallback_url=resolved_url),
        "videos": _extract_videos(payloads),
        "raw_pages": blobs,
        "errors": errors,
        "source_url": resolved_url,
    }
|
|
|
|
async def _collect_creator_center_pages(
    urls: list[str],
    cookie: str,
    manual_pages: list[ManualPageCapture],
) -> dict[str, Any]:
    """Gather creator-center pages from manual captures and authenticated fetches.

    Manual captures come first. *urls* are fetched only when *cookie* is
    non-blank. A failed fetch is recorded in ``errors`` instead of being
    raised.
    """
    pages: list[dict[str, Any]] = [
        {
            "url": capture.url,
            "title": capture.title,
            "blobs": [{"script_id": "manual_creator_payload", "payload": capture.payload}],
        }
        for capture in manual_pages
    ]
    errors: list[str] = []

    targets = urls if cookie.strip() else []
    for target in targets:
        try:
            landed_url, html = await _fetch_html(target, cookie=cookie)
            pages.append(
                {
                    "url": landed_url,
                    "title": "",
                    "blobs": _extract_json_blobs_from_html(html),
                }
            )
        except Exception as exc:
            errors.append(f"creator_center_fetch_failed[{target}]: {exc}")

    return {"pages": pages, "errors": errors}
|
|
|
|
def _upsert_account(
    owner: dict[str, Any],
    profile: dict[str, Any],
    sync_request: DouyinAccountSyncRequest,
    public_data: dict[str, Any],
    creator_data: dict[str, Any],
) -> dict[str, Any]:
    """Insert or update the owner's Douyin account row, then persist its snapshots.

    An existing row is looked up by ``sec_uid``, then ``douyin_id``, then
    ``canonical_profile_url``, always scoped to the owner. One timestamp is
    taken per call, so ``created_at``, ``updated_at`` and the sync
    timestamps written by this call agree. Returns the stored row re-read
    after the snapshots were persisted.
    """
    existing: dict[str, Any] | None = None
    # The column names are fixed literals, so interpolating them is safe;
    # the values themselves are always bound as parameters.
    for column in ("sec_uid", "douyin_id", "canonical_profile_url"):
        lookup_value = profile.get(column, "")
        if not lookup_value:
            continue
        existing = legacy.db.fetch_one(
            f"SELECT * FROM douyin_accounts WHERE user_id = ? AND {column} = ? LIMIT 1",
            (owner["id"], lookup_value),
        )
        if existing:
            break

    account_id = existing["id"] if existing else make_id("dyacct")
    timestamp = now()

    tags = _dedupe_strings(
        profile.get("tags", [])
        + _extract_keywords(profile.get("nickname", ""), profile.get("signature", ""))
    )
    source_mode = "creator_center" if creator_data["pages"] else "public"
    sync_status = "partial" if public_data["errors"] or creator_data["errors"] else "ready"
    public_synced = bool(public_data["raw_pages"])
    creator_synced = bool(creator_data["pages"])

    # Column values shared by INSERT and UPDATE, in table-column order.
    shared_values = (
        profile.get("profile_url", ""),
        profile.get("canonical_profile_url", ""),
        profile.get("sec_uid", ""),
        profile.get("douyin_uid", ""),
        profile.get("douyin_id", ""),
        profile.get("nickname", ""),
        profile.get("signature", ""),
        profile.get("avatar_url", ""),
        _safe_json_dumps(tags),
        _safe_json_dumps(profile.get("stats", {})),
        _safe_json_dumps(
            {
                "profile": profile.get("raw", {}),
                "discovery_note": sync_request.discovery_note,
            }
        ),
        source_mode,
        sync_status,
    )

    if existing:
        legacy.db.execute(
            """
            UPDATE douyin_accounts
            SET profile_url = ?, canonical_profile_url = ?, sec_uid = ?, douyin_uid = ?, douyin_id = ?,
                nickname = ?, signature = ?, avatar_url = ?, tags_json = ?, profile_stats_json = ?,
                raw_profile_json = ?, source_mode = ?, sync_status = ?, last_public_sync_at = ?,
                last_creator_sync_at = ?, updated_at = ?
            WHERE id = ?
            """,
            (
                *shared_values,
                # Keep the previous sync time when this run collected nothing.
                timestamp if public_synced else existing.get("last_public_sync_at"),
                timestamp if creator_synced else existing.get("last_creator_sync_at"),
                timestamp,
                account_id,
            ),
        )
    else:
        legacy.db.execute(
            """
            INSERT INTO douyin_accounts (
                id, user_id, profile_url, canonical_profile_url, sec_uid, douyin_uid, douyin_id,
                nickname, signature, avatar_url, tags_json, profile_stats_json, raw_profile_json,
                source_mode, sync_status, last_public_sync_at, last_creator_sync_at, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                account_id,
                owner["id"],
                *shared_values,
                timestamp if public_synced else None,
                timestamp if creator_synced else None,
                timestamp,
                timestamp,
            ),
        )

    account_row = _require_owned_account(account_id, owner["id"])
    _persist_snapshots_and_videos(account_row, public_data, creator_data, sync_request)
    # Read again so the caller sees whatever the persistence step changed.
    return _require_owned_account(account_id, owner["id"])
|
|
|
|
def _persist_snapshot(
    account_row: dict[str, Any],
    snapshot_type: str,
    source_url: str,
    payload: Any,
    summary: dict[str, Any]
) -> str:
    """Write one snapshot row plus its flattened field rows; return the new id.

    Args:
        account_row: Account record; only its ``id`` is read.
        snapshot_type: Value stored in the ``snapshot_type`` column.
        source_url: URL stored next to the payload.
        payload: Captured data. It is stored as JSON and also flattened via
            ``_flatten_json`` into one ``douyin_snapshot_fields`` row per entry.
        summary: Digest stored as JSON in ``summary_json``.

    Returns:
        The id generated for the new snapshot.
    """
    new_snapshot_id = make_id("dysnap")
    stamp = now()
    flat_fields = _flatten_json(payload)

    snapshot_values = (
        new_snapshot_id,
        account_row["id"],
        snapshot_type,
        source_url,
        _safe_json_dumps(payload),
        _safe_json_dumps(summary),
        len(flat_fields),
        stamp,  # collected_at
        stamp,  # created_at
    )
    legacy.db.execute(
        """
        INSERT INTO douyin_account_snapshots (
            id, account_id, snapshot_type, source_url, raw_payload_json, summary_json,
            field_count, collected_at, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        snapshot_values
    )

    insert_field_sql = """
        INSERT OR REPLACE INTO douyin_snapshot_fields (
            snapshot_id, field_path, field_type, field_value_text
        ) VALUES (?, ?, ?, ?)
        """
    for path, kind, value_text in flat_fields:
        legacy.db.execute(insert_field_sql, (new_snapshot_id, path, kind, value_text))

    return new_snapshot_id
|
|
|
|
def _persist_snapshots_and_videos(
    account_row: dict[str, Any],
    public_data: dict[str, Any],
    creator_data: dict[str, Any],
    sync_request: DouyinAccountSyncRequest
) -> None:
    """Persist the snapshots and videos gathered during one account sync.

    Writes one ``public_profile`` snapshot when public pages were captured,
    one ``creator_center`` snapshot per creator page, then upserts every
    distinct video (collected plus manually supplied) into ``douyin_videos``.

    Args:
        account_row: Account record; only its ``id`` is read.
        public_data: Public collection result. Reads ``raw_pages``, ``errors``,
            ``source_url``, ``videos`` and ``profile``. It is not modified.
        creator_data: Creator-center collection result; reads ``pages``, each
            with ``title``, ``blobs`` and ``url``.
        sync_request: The sync request; ``manual_work_payloads`` are normalized
            and merged with the collected videos.
    """
    account_id = account_row["id"]

    if public_data["raw_pages"]:
        public_profile = public_data["profile"]
        _persist_snapshot(
            account_row,
            "public_profile",
            public_data["source_url"],
            {
                "pages": public_data["raw_pages"],
                "errors": public_data["errors"],
                "source_url": public_data["source_url"]
            },
            {
                "video_count": len(public_data["videos"]),
                "nickname": public_profile.get("nickname", ""),
                "tags": public_profile.get("tags", [])
            }
        )

    for page in creator_data["pages"]:
        payload = {
            "title": page["title"],
            "blobs": page["blobs"]
        }
        _persist_snapshot(
            account_row,
            "creator_center",
            page["url"],
            payload,
            {
                "blob_count": len(page["blobs"]),
                "field_count": len(_flatten_json(payload))
            }
        )

    # Merge into a fresh list: appending to public_data["videos"] would leak
    # the manual entries back into the caller's dict.
    all_videos = list(public_data["videos"])
    all_videos.extend(
        _normalize_video_candidate(manual_video)
        for manual_video in sync_request.manual_work_payloads
    )

    # First occurrence wins; identity is aweme_id, else share_url, else title.
    deduped: dict[str, dict[str, Any]] = {}
    for video in all_videos:
        key = video["aweme_id"] or video["share_url"] or video["title"]
        if key and key not in deduped:
            deduped[key] = video

    for video in deduped.values():
        existing = None
        if video["aweme_id"]:
            existing = legacy.db.fetch_one(
                "SELECT id FROM douyin_videos WHERE account_id = ? AND aweme_id = ? LIMIT 1",
                (account_id, video["aweme_id"])
            )
        elif video["share_url"]:
            # Without an aweme_id the share URL is the identity used by the
            # de-dup above; match on it so re-syncs update instead of inserting
            # a duplicate row every time.
            existing = legacy.db.fetch_one(
                "SELECT id FROM douyin_videos WHERE account_id = ? AND share_url = ? LIMIT 1",
                (account_id, video["share_url"])
            )

        timestamp = now()
        content_values = (
            video["title"],
            video["description"],
            video["share_url"],
            video["cover_url"],
            video["duration_sec"],
            video["published_at"],
            _safe_json_dumps(video["tags"]),
            _safe_json_dumps(video["stats"]),
            _safe_json_dumps(video["raw"]),
        )
        if existing:
            legacy.db.execute(
                """
                UPDATE douyin_videos
                SET title = ?, description = ?, share_url = ?, cover_url = ?, duration_sec = ?,
                    published_at = ?, tags_json = ?, stats_json = ?, raw_json = ?, updated_at = ?
                WHERE id = ?
                """,
                (*content_values, timestamp, existing["id"])
            )
        else:
            legacy.db.execute(
                """
                INSERT INTO douyin_videos (
                    id, account_id, aweme_id, title, description, share_url, cover_url,
                    duration_sec, published_at, tags_json, stats_json, raw_json, created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    make_id("dyvideo"),
                    account_id,
                    video["aweme_id"],
                    *content_values,
                    timestamp,  # created_at
                    timestamp,  # updated_at
                )
            )
|
|
|
|
def _list_videos(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
    """Return up to ``limit`` stored videos of an account, newest first.

    Rows are ordered by ``COALESCE(published_at, updated_at)`` descending, then
    by ``updated_at`` descending. The JSON columns are decoded, falling back to
    an empty list (tags) or empty dict (stats, raw).
    """

    def _to_payload(record: dict[str, Any]) -> dict[str, Any]:
        return {
            "id": record["id"],
            "aweme_id": record["aweme_id"],
            "title": record["title"],
            "description": record["description"],
            "share_url": record["share_url"],
            "cover_url": record["cover_url"],
            "duration_sec": record["duration_sec"],
            "published_at": record["published_at"],
            "tags": _safe_json_loads(record["tags_json"], []),
            "stats": _safe_json_loads(record["stats_json"], {}),
            "raw": _safe_json_loads(record["raw_json"], {})
        }

    video_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_videos
        WHERE account_id = ?
        ORDER BY COALESCE(published_at, updated_at) DESC, updated_at DESC
        LIMIT ?
        """,
        (account_id, limit)
    )
    return [_to_payload(record) for record in video_rows]
|
|
|
|
def _build_account_payload(account_row: dict[str, Any], include_recent_videos: int = 8) -> dict[str, Any]:
    """Assemble the API/prompt representation of a stored account.

    At least 12 videos are loaded, but only ``include_recent_videos`` of them
    are passed on to ``_summarize_videos``. Keywords are the de-duplicated
    union of the account tags, keywords extracted from nickname and signature,
    the summary's top tags and the summarized video titles, cut to 18 entries.
    """
    fetch_limit = max(include_recent_videos, 12)
    recent_videos = _list_videos(account_row["id"], limit=fetch_limit)
    tags = _safe_json_loads(account_row["tags_json"], [])
    profile_stats = _safe_json_loads(account_row["profile_stats_json"], {})
    video_summary = _summarize_videos(recent_videos, limit=include_recent_videos)

    profile_keywords = _extract_keywords(account_row["nickname"], account_row["signature"])
    title_keywords = [entry["title"] for entry in video_summary["videos"]]
    keywords = _dedupe_strings(
        tags + profile_keywords + video_summary["top_tags"] + title_keywords
    )

    # The canonical URL wins over the URL the account was originally added with.
    display_url = account_row["canonical_profile_url"] or account_row["profile_url"]
    return {
        "id": account_row["id"],
        "nickname": account_row["nickname"],
        "signature": account_row["signature"],
        "profile_url": display_url,
        "avatar_url": account_row["avatar_url"],
        "sec_uid": account_row["sec_uid"],
        "douyin_id": account_row["douyin_id"],
        "profile_stats": profile_stats,
        "tags": tags,
        "keywords": keywords[:18],
        "sync_status": account_row["sync_status"],
        "video_summary": video_summary
    }
|
|
|
|
def _list_linked_accounts(account_row: dict[str, Any]) -> list[dict[str, Any]]:
    """List the relations whose source is this account, newest first.

    Each relation is LEFT JOINed with the target account, so the ``target_*``
    profile columns are NULL when no stored account matches
    ``target_account_id``. Those NULLs are normalized to empty strings here
    (and to ``{}`` / ``[]`` defaults for the JSON columns).

    Args:
        account_row: Source account record; only its ``id`` is read.

    Returns:
        One dict per relation with the relation columns plus the target's
        nickname, signature, profile URL, stats and tags.
    """
    relation_rows = legacy.db.fetch_all(
        """
        SELECT rel.*, target.nickname AS target_nickname, target.signature AS target_signature,
               target.canonical_profile_url AS target_canonical_profile_url, target.profile_stats_json AS target_profile_stats_json,
               target.tags_json AS target_tags_json
        FROM douyin_account_relations rel
        LEFT JOIN douyin_accounts target ON target.id = rel.target_account_id
        WHERE rel.source_account_id = ?
        ORDER BY rel.created_at DESC
        """,
        (account_row["id"],)
    )
    payloads: list[dict[str, Any]] = []
    for row in relation_rows:
        # NOTE: dict.get(key, "") only applies its default when the key is
        # missing. The joined columns are always present (as None when the join
        # found nothing), so the fallback has to be spelled with `or`.
        payloads.append({
            "relation_id": row["id"],
            "relation_type": row["relation_type"],
            "note": row["note"],
            "search_id": row["search_id"],
            "created_at": row["created_at"],
            "target_account_id": row["target_account_id"],
            "target_profile_url": row["target_profile_url"] or row.get("target_canonical_profile_url") or "",
            "target_nickname": row.get("target_nickname") or "",
            "target_signature": row.get("target_signature") or "",
            "target_profile_stats": _safe_json_loads(row.get("target_profile_stats_json"), {}),
            "target_tags": _safe_json_loads(row.get("target_tags_json"), [])
        })
    return payloads
|
|
|
|
def _build_workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
    """Collect everything the workspace view shows for one account.

    The result bundles the account payload, the latest public-profile and
    creator-center snapshots (or ``None`` when absent), linked accounts, the
    five newest analysis reports with their suggestions, the five newest
    similarity searches and the model profiles available to the owner.
    """

    def _snapshot_brief(snapshot: dict[str, Any] | None) -> dict[str, Any] | None:
        # Condensed view of a snapshot row; None when there is no snapshot.
        if not snapshot:
            return None
        return {
            "id": snapshot["id"],
            "source_url": snapshot["source_url"],
            "field_count": snapshot["field_count"],
            "collected_at": snapshot["collected_at"],
            "summary": _safe_json_loads(snapshot["summary_json"], {})
        }

    account_key = (account_row["id"],)
    account_payload = _build_account_payload(account_row)

    public_snapshot_row = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'public_profile'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        account_key
    )
    creator_snapshot_row = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'creator_center'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        account_key
    )

    report_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_analysis_reports
        WHERE account_id = ?
        ORDER BY created_at DESC
        LIMIT 5
        """,
        account_key
    )
    report_payloads = []
    for report in report_rows:
        suggestion_rows = legacy.db.fetch_all(
            "SELECT * FROM douyin_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
            (report["id"],)
        )
        suggestion_payloads = []
        for suggestion in suggestion_rows:
            suggestion_payloads.append({
                "id": suggestion["id"],
                "model_profile_id": suggestion["model_profile_id"],
                "model_label": suggestion["model_label"],
                "status": suggestion["status"],
                "suggestion_text": suggestion["suggestion_text"],
                "parsed_json": _safe_json_loads(suggestion["parsed_json"], {})
            })
        report_payloads.append({
            "id": report["id"],
            "focus_text": report["focus_text"],
            "model_profile_ids": _safe_json_loads(report["model_profile_ids_json"], []),
            "linked_account_ids": _safe_json_loads(report["linked_account_ids_json"], []),
            "created_at": report["created_at"],
            "suggestions": suggestion_payloads
        })

    search_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_similarity_searches
        WHERE source_account_id = ?
        ORDER BY created_at DESC
        LIMIT 5
        """,
        account_key
    )

    linked_accounts = _list_linked_accounts(account_row)
    search_payloads = [
        {
            "id": search["id"],
            "keywords": _safe_json_loads(search["keywords_json"], []),
            "created_at": search["created_at"]
        }
        for search in search_rows
    ]
    model_profile_payloads = [
        {
            "id": model_row["id"],
            "name": model_row["name"],
            "model_name": model_row["model_name"],
            "base_url": model_row["base_url"],
            "is_default": bool(model_row["is_default"])
        }
        for model_row in _fetch_model_profiles(account_row["user_id"])
    ]

    return {
        "account": account_payload,
        "latest_public_snapshot": _snapshot_brief(public_snapshot_row),
        "latest_creator_snapshot": _snapshot_brief(creator_snapshot_row),
        "linked_accounts": linked_accounts,
        "recent_reports": report_payloads,
        "recent_similarity_searches": search_payloads,
        "available_model_profiles": model_profile_payloads
    }
|
|
|
|
def _list_snapshots(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
    """Return condensed snapshot records of an account, newest first.

    Only the id, type, source URL, field count, collection time and decoded
    summary are included; the raw payload is left out.
    """
    snapshot_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE account_id = ?
        ORDER BY collected_at DESC
        LIMIT ?
        """,
        (account_id, limit)
    )
    listing: list[dict[str, Any]] = []
    for snapshot in snapshot_rows:
        listing.append({
            "id": snapshot["id"],
            "snapshot_type": snapshot["snapshot_type"],
            "source_url": snapshot["source_url"],
            "field_count": snapshot["field_count"],
            "collected_at": snapshot["collected_at"],
            "summary": _safe_json_loads(snapshot["summary_json"], {})
        })
    return listing
|
|
|
|
def _get_snapshot_detail(snapshot_id: str, account_id: str) -> dict[str, Any]:
    """Load one snapshot of an account together with its flattened fields.

    Raises:
        HTTPException: 404 when no snapshot with this id belongs to the
            given account.
    """
    snapshot = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_account_snapshots
        WHERE id = ? AND account_id = ?
        LIMIT 1
        """,
        (snapshot_id, account_id)
    )
    if not snapshot:
        raise HTTPException(status_code=404, detail="Snapshot not found")

    field_rows = legacy.db.fetch_all(
        """
        SELECT field_path, field_type, field_value_text
        FROM douyin_snapshot_fields
        WHERE snapshot_id = ?
        ORDER BY field_path ASC
        """,
        (snapshot_id,)
    )

    detail: dict[str, Any] = {
        column: snapshot[column]
        for column in ("id", "snapshot_type", "source_url", "field_count", "collected_at")
    }
    detail["summary"] = _safe_json_loads(snapshot["summary_json"], {})
    detail["raw_payload"] = _safe_json_loads(snapshot["raw_payload_json"], {})
    detail["fields"] = field_rows
    return detail
|
|
|
|
async def _run_account_analysis(
    account_row: dict[str, Any],
    owner: dict[str, Any],
    request: DouyinAccountAnalysisRequest
) -> dict[str, Any]:
    """Ask each selected model for optimization advice and persist the report.

    The prompt context holds the target account payload, up to six benchmark
    account payloads, the free-form focus text and the latest creator-center
    snapshot summary. Benchmarks come from the requested and (optionally)
    linked accounts; when none result and
    ``request.include_recent_similar_candidates`` is set, the top three
    candidates of the newest similarity search are used instead.

    One report row is written first, then one suggestion row per model. A
    failing model call is recorded as a suggestion with status ``"error"``
    rather than aborting the other models.

    Args:
        account_row: The account being analyzed.
        owner: The requesting user; its ``id`` scopes every account lookup.
        request: Analysis options (models, linked accounts, focus, temperature).

    Returns:
        Dict with ``report_id``, ``created_at``, ``context`` and ``suggestions``.

    Raises:
        Whatever ``_require_owned_account`` raises when a requested or linked
        account id cannot be loaded for ``owner``.
    """
    # The number of summarized videos is clamped to the range 4..12.
    target_payload = _build_account_payload(account_row, include_recent_videos=max(4, min(request.max_videos, 12)))
    linked_rows = _list_linked_accounts(account_row)
    linked_account_ids = list(request.linked_account_ids)
    if request.include_linked_accounts:
        linked_account_ids.extend(
            row["target_account_id"] for row in linked_rows if row.get("target_account_id")
        )
    linked_account_ids = _dedupe_strings(linked_account_ids)

    benchmark_payloads: list[dict[str, Any]] = []
    for linked_account_id in linked_account_ids:
        linked_row = _require_owned_account(linked_account_id, owner["id"])
        benchmark_payloads.append(_build_account_payload(linked_row, include_recent_videos=6))

    if request.include_recent_similar_candidates and not benchmark_payloads:
        latest_search = legacy.db.fetch_one(
            """
            SELECT *
            FROM douyin_similarity_searches
            WHERE source_account_id = ?
            ORDER BY created_at DESC
            LIMIT 1
            """,
            (account_row["id"],)
        )
        if latest_search:
            candidate_rows = legacy.db.fetch_all(
                """
                SELECT cand.*, acct.user_id AS account_user_id
                FROM douyin_similarity_candidates cand
                LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
                WHERE cand.search_id = ?
                ORDER BY cand.rank_index ASC
                LIMIT 3
                """,
                (latest_search["id"],)
            )
            for candidate_row in candidate_rows:
                candidate_account_id = candidate_row.get("candidate_account_id")
                if not candidate_account_id:
                    continue
                # account_user_id comes from the LEFT JOIN: it is NULL when the
                # candidate account no longer exists. These are opportunistic
                # benchmarks, so a stale or foreign candidate is skipped
                # instead of failing the whole analysis.
                candidate_owner_id = candidate_row.get("account_user_id")
                if candidate_owner_id is None or str(candidate_owner_id) != str(owner["id"]):
                    continue
                linked_candidate = _require_owned_account(candidate_account_id, owner["id"])
                benchmark_payloads.append(_build_account_payload(linked_candidate, include_recent_videos=6))

    profiles = _resolve_model_profiles(owner, request.model_profile_ids)
    system_prompt = (
        "你是资深抖音增长顾问。你会基于账号画像、创作者中心字段、作品表现和对标账号内容,"
        "给出可执行的优化建议。请始终返回 JSON 对象,包含这些字段:"
        "summary、strengths、weaknesses、benchmark_insights、content_plan、"
        "growth_actions、deep_search_hypotheses。每个数组字段请给出 3-6 条中文建议。"
    )
    latest_creator_summary_row = legacy.db.fetch_one(
        """
        SELECT summary_json
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'creator_center'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        (account_row["id"],)
    ) or {}
    analysis_context = {
        "target_account": target_payload,
        "benchmark_accounts": benchmark_payloads[:6],
        "focus": request.extra_focus,
        "creator_center_snapshot_summary": _safe_json_loads(
            latest_creator_summary_row.get("summary_json"),
            {}
        )
    }
    user_prompt = (
        "请分析以下抖音账号,并分别给出内容方向、选题结构、互动增长、账号定位和对标拆解建议。"
        "如果提供了对标账号,要重点指出可借鉴但不应直接照搬的部分。"
        f"\n\n输入上下文:\n{json.dumps(analysis_context, ensure_ascii=False, indent=2)}"
    )

    report_id = make_id("dyreport")
    created_at = now()
    legacy.db.execute(
        """
        INSERT INTO douyin_analysis_reports (
            id, account_id, user_id, focus_text, model_profile_ids_json,
            linked_account_ids_json, prompt_text, context_json, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            report_id,
            account_row["id"],
            owner["id"],
            request.extra_focus,
            _safe_json_dumps([profile["id"] for profile in profiles]),
            _safe_json_dumps(linked_account_ids),
            user_prompt,
            _safe_json_dumps(analysis_context),
            created_at
        )
    )

    async def _analyze_with_model(profile: dict[str, Any]) -> dict[str, Any]:
        # Query one model, store its suggestion row and return the payload.
        try:
            output = await legacy.call_model(
                profile,
                system_prompt=system_prompt,
                user_prompt=user_prompt,
                temperature=request.temperature
            )
            parsed = _try_parse_agent_json(output)
            status = "ok"
        except Exception as exc:
            # Deliberate best effort: one failing model must not discard the
            # answers of the others, so the error text is stored as the output.
            output = str(exc)
            parsed = {}
            status = "error"
        suggestion_id = make_id("dysady")
        model_label = _build_model_label(profile)
        legacy.db.execute(
            """
            INSERT INTO douyin_analysis_suggestions (
                id, report_id, model_profile_id, model_label, status,
                suggestion_text, parsed_json, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                suggestion_id,
                report_id,
                profile["id"],
                model_label,
                status,
                output,
                _safe_json_dumps(parsed),
                now()
            )
        )
        return {
            "id": suggestion_id,
            "model_profile_id": profile["id"],
            "model_label": model_label,
            "status": status,
            "suggestion_text": output,
            "parsed_json": parsed
        }

    suggestions = await asyncio.gather(*[_analyze_with_model(profile) for profile in profiles])

    # One timestamp for both columns so they cannot drift apart.
    finished_at = now()
    legacy.db.execute(
        "UPDATE douyin_accounts SET last_analysis_at = ?, updated_at = ? WHERE id = ?",
        (finished_at, finished_at, account_row["id"])
    )
    return {
        "report_id": report_id,
        "created_at": created_at,
        "context": analysis_context,
        "suggestions": suggestions
    }
|
|
|
|
async def _prepare_similarity_source(
    owner: dict[str, Any],
    request: DouyinSimilarSearchRequest
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """Resolve the account a similarity search starts from.

    Returns:
        ``(account_row, payload)`` when ``request.source_account_id`` names a
        stored account, otherwise ``(None, payload)`` with a payload built from
        the public profile behind ``request.profile_url``.

    Raises:
        HTTPException: 400 when neither a source account id nor a profile URL
            is given, or when the profile page yields neither a nickname nor
            any video.
    """
    if request.source_account_id:
        stored_row = _require_owned_account(request.source_account_id, owner["id"])
        return stored_row, _build_account_payload(stored_row)

    shared_url = request.profile_url or ""
    if not shared_url.strip():
        raise HTTPException(status_code=400, detail="source_account_id or profile_url is required")

    public_data = await _collect_public_profile(shared_url, None)
    scraped_profile = public_data["profile"]
    if not (scraped_profile.get("nickname") or public_data["videos"]):
        raise HTTPException(status_code=400, detail="Unable to parse the shared Douyin profile page")

    # An empty id marks this payload as not backed by a stored account.
    payload: dict[str, Any] = {
        "id": "",
        "nickname": scraped_profile.get("nickname", ""),
        "signature": scraped_profile.get("signature", ""),
        "profile_url": scraped_profile.get("canonical_profile_url", "") or request.profile_url,
        "avatar_url": scraped_profile.get("avatar_url", ""),
        "sec_uid": scraped_profile.get("sec_uid", ""),
        "douyin_id": scraped_profile.get("douyin_id", ""),
        "profile_stats": scraped_profile.get("stats", {}),
        "tags": scraped_profile.get("tags", []),
        "video_summary": _summarize_videos(public_data["videos"], limit=6)
    }
    summary = payload["video_summary"]
    keyword_pool = (
        payload["tags"]
        + _extract_keywords(payload["nickname"], payload["signature"])
        + summary["top_tags"]
        + [entry["title"] for entry in summary["videos"]]
    )
    payload["keywords"] = _dedupe_strings(keyword_pool)
    return None, payload
|
|
|
|
async def _fetch_or_create_candidate(owner: dict[str, Any], profile_url: str) -> dict[str, Any] | None:
    """Return the owner's account for ``profile_url``, creating it if needed.

    A stored account whose canonical or original profile URL equals
    ``profile_url`` is returned as is. Otherwise the public profile is
    collected and upserted as a new account. ``None`` is returned when the
    page yields neither a nickname nor any video.
    """
    known_account = legacy.db.fetch_one(
        """
        SELECT *
        FROM douyin_accounts
        WHERE user_id = ? AND (canonical_profile_url = ? OR profile_url = ?)
        LIMIT 1
        """,
        (owner["id"], profile_url, profile_url)
    )
    if known_account:
        return known_account

    public_data = await _collect_public_profile(profile_url, None)
    profile = public_data["profile"]
    has_usable_data = profile.get("nickname") or public_data["videos"]
    if not has_usable_data:
        return None

    raw_videos = [video["raw"] for video in public_data["videos"]]
    sync_request = DouyinAccountSyncRequest(
        profile_url=profile_url,
        manual_work_payloads=raw_videos
    )
    # Candidates are created from public data only, so creator data is empty.
    no_creator_data = {"pages": [], "errors": []}
    return _upsert_account(owner, profile, sync_request, public_data, no_creator_data)
|
|
|
|
async def _run_similarity_search(owner: dict[str, Any], request: DouyinSimilarSearchRequest) -> dict[str, Any]:
    """Rank candidate accounts against a source account and persist the result.

    Steps:
      1. Resolve the source (stored account or freshly collected profile URL).
      2. Gather candidates: the owner's other stored accounts, optionally the
         source's linked accounts, the URLs in ``request.candidate_urls`` and,
         when ``request.search_public_pages`` is set, URLs discovered from the
         source keywords.
      3. Score every candidate heuristically and keep the best
         ``max(3, request.max_candidates)``.
      4. Ask the model to rank them. When the call fails or returns nothing
         usable, fall back to the heuristic ranking.
      5. Store the search row and one row per ranked candidate.

    Args:
        owner: The requesting user; its ``id`` scopes every lookup.
        request: Search options.

    Returns:
        Dict with ``search_id``, ``source_account``, ``model_profile``,
        ``raw_model_output`` and the saved ``candidates``.
    """
    source_account_row, source_payload = await _prepare_similarity_source(owner, request)
    profile = legacy.model_profile_for_account(owner["id"], request.model_profile_id)
    existing_accounts = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_accounts
        WHERE user_id = ?
        ORDER BY updated_at DESC
        """,
        (owner["id"],)
    )

    candidate_rows: list[dict[str, Any]] = []
    seen_urls: set[str] = set()
    source_id = source_account_row["id"] if source_account_row else ""
    for row in existing_accounts:
        if row["id"] == source_id:
            continue
        candidate_rows.append(row)
        seen_urls.add(row["canonical_profile_url"] or row["profile_url"])

    if request.seed_linked_accounts and source_account_row:
        for linked in _list_linked_accounts(source_account_row):
            candidate_url = linked.get("target_profile_url", "")
            if not candidate_url or candidate_url in seen_urls:
                continue
            seen_urls.add(candidate_url)
            if linked.get("target_account_id"):
                candidate_rows.append(_require_owned_account(linked["target_account_id"], owner["id"]))

    candidate_urls = _dedupe_strings(request.candidate_urls)
    if request.search_public_pages:
        discovered = await _discover_profile_urls_from_search(source_payload.get("keywords", []), limit=6)
        candidate_urls.extend(discovered)
        candidate_urls = _dedupe_strings(candidate_urls)

    for candidate_url in candidate_urls:
        if candidate_url in seen_urls or candidate_url == source_payload.get("profile_url"):
            continue
        candidate_row = await _fetch_or_create_candidate(owner, candidate_url)
        if candidate_row:
            candidate_rows.append(candidate_row)
            seen_urls.add(candidate_url)

    # The same account can be reached through several paths; keep it once.
    candidate_payloads: list[dict[str, Any]] = []
    seen_account_ids: set[str] = set()
    for row in candidate_rows:
        if row["id"] in seen_account_ids:
            continue
        seen_account_ids.add(row["id"])
        payload = _build_account_payload(row, include_recent_videos=6)
        payload["heuristics"] = _heuristic_similarity(source_payload, payload)
        candidate_payloads.append(payload)

    candidate_payloads.sort(key=lambda item: item["heuristics"]["heuristic_score"], reverse=True)
    candidate_payloads = candidate_payloads[: max(3, request.max_candidates)]

    search_id = make_id("dysearch")
    prompt_context = {
        "source_account": source_payload,
        "candidate_accounts": candidate_payloads,
        "extra_requirements": request.extra_requirements
    }
    prompt = (
        "请从候选账号中筛选与目标账号内容风格、题材、受众和互动逻辑最相似,且整体质量更高的账号。"
        "请返回 JSON 数组,每项包含 candidate_account_id、candidate_profile_url、score、"
        "rationale、similar_dimensions、optimization_value。score 范围 0-100。"
        f"\n\n上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
    )
    legacy.db.execute(
        """
        INSERT INTO douyin_similarity_searches (
            id, user_id, source_account_id, source_profile_url, keywords_json,
            prompt_text, context_json, created_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            search_id,
            owner["id"],
            source_account_row["id"] if source_account_row else None,
            source_payload.get("profile_url", ""),
            _safe_json_dumps(source_payload.get("keywords", [])),
            prompt,
            _safe_json_dumps(prompt_context),
            now()
        )
    )

    model_profile_payload = {
        "id": profile["id"],
        "label": _build_model_label(profile)
    }

    if not candidate_payloads:
        # Nothing to rank, so the model is not called at all.
        return {
            "search_id": search_id,
            "source_account": source_payload,
            "model_profile": model_profile_payload,
            "raw_model_output": "No candidate accounts available. Sync more Douyin accounts or provide candidate_urls.",
            "candidates": []
        }

    system_prompt = (
        "你是抖音相似账号发现专家。你要根据内容主题、标签、风格、更新频率、互动表现和商业化潜力,"
        "挑选最值得对标的账号。返回严格 JSON 数组。"
    )
    try:
        output = await legacy.call_model(profile, system_prompt=system_prompt, user_prompt=prompt, temperature=0.2)
        parsed = _try_parse_agent_json(output)
    except Exception as exc:
        # Deliberate best effort: a failed model call degrades to the
        # heuristic ranking below instead of failing the search.
        output = str(exc)
        parsed = []

    candidate_map = {
        payload["id"]: payload for payload in candidate_payloads if payload["id"]
    }
    if isinstance(parsed, dict):
        # Some models wrap the array in an object.
        parsed = parsed.get("items") or parsed.get("candidates") or []

    # Model output is untrusted: entries are read with .get() below, so
    # anything that is not a JSON object (bare strings, numbers, nested
    # arrays) is dropped here rather than raising AttributeError later.
    ranked_items = [item for item in parsed if isinstance(item, dict)] if isinstance(parsed, list) else []

    if not ranked_items:
        ranked_items = [
            {
                "candidate_account_id": payload["id"],
                "candidate_profile_url": payload["profile_url"],
                "score": payload["heuristics"]["heuristic_score"],
                "rationale": "Fallback to heuristic similarity because model output was unavailable or unparsable.",
                "similar_dimensions": [
                    {
                        "topic_overlap": payload["heuristics"]["topic_overlap"],
                        "tag_overlap": payload["heuristics"]["tag_overlap"],
                        "quality_score": payload["heuristics"]["quality_score"]
                    }
                ],
                "optimization_value": "可作为候选对标账号进一步人工确认。"
            }
            for payload in candidate_payloads
        ]

    saved_candidates: list[dict[str, Any]] = []
    for index, item in enumerate(ranked_items, start=1):
        candidate_account_id = _first_non_empty(item.get("candidate_account_id"))
        candidate_profile_url = _first_non_empty(item.get("candidate_profile_url"))
        # Match the model's answer to a known candidate: by id first, then by
        # profile URL.
        payload = candidate_map.get(candidate_account_id)
        if not payload:
            payload = next(
                (candidate for candidate in candidate_payloads if candidate["profile_url"] == candidate_profile_url),
                None
            )
        candidate_id = make_id("dycand")
        heuristic_score = payload["heuristics"]["heuristic_score"] if payload else 0
        score = _parse_count(item.get("score"))
        rationale = _first_non_empty(item.get("rationale"), item.get("reason"), item.get("summary"))
        dimensions = item.get("similar_dimensions") or item.get("dimensions") or {}
        raw_output = {
            "model_output": item,
            "candidate_payload": payload or {}
        }
        resolved_profile_url = payload["profile_url"] if payload else candidate_profile_url
        legacy.db.execute(
            """
            INSERT INTO douyin_similarity_candidates (
                id, search_id, candidate_account_id, candidate_profile_url, heuristic_score,
                agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                candidate_id,
                search_id,
                payload["id"] if payload else candidate_account_id or None,
                resolved_profile_url,
                heuristic_score,
                score,
                rationale,
                _safe_json_dumps(dimensions),
                _safe_json_dumps(raw_output),
                index,
                now()
            )
        )
        saved_candidates.append({
            "id": candidate_id,
            "candidate_account_id": payload["id"] if payload else candidate_account_id,
            "candidate_profile_url": resolved_profile_url,
            "candidate_nickname": payload["nickname"] if payload else "",
            "heuristic_score": heuristic_score,
            "agent_score": score,
            "rationale_text": rationale,
            "dimensions": dimensions,
            "rank_index": index
        })

    return {
        "search_id": search_id,
        "source_account": source_payload,
        "model_profile": model_profile_payload,
        "raw_model_output": output,
        "candidates": saved_candidates
    }
|
|
|
|
@app.get("/v2/douyin/accounts")
def list_douyin_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
    # Lists every Douyin account owned by the caller, most recently updated
    # first, each rendered with _build_account_payload.
    owned_rows = legacy.db.fetch_all(
        """
        SELECT *
        FROM douyin_accounts
        WHERE user_id = ?
        ORDER BY updated_at DESC
        """,
        (account["id"],)
    )
    payloads: list[dict[str, Any]] = []
    for owned_row in owned_rows:
        payloads.append(_build_account_payload(owned_row))
    return payloads
|
|
|
|
@app.post("/v2/douyin/accounts/sync")
async def sync_douyin_account(
    request: DouyinAccountSyncRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    # Collects public-profile and creator-center data, upserts the account and
    # returns its workspace payload plus the collection errors.
    # Responds 400 when the request carries no data source at all, or when
    # nothing usable could be extracted from the sources.
    has_any_source = (
        request.profile_url.strip()
        or request.manual_profile_payload
        or request.manual_creator_pages
    )
    if not has_any_source:
        raise HTTPException(
            status_code=400,
            detail="profile_url、manual_profile_payload 或 manual_creator_pages 至少需要传一个"
        )

    public_data = await _collect_public_profile(request.profile_url, request.manual_profile_payload)
    creator_data = await _collect_creator_center_pages(
        request.creator_center_urls,
        request.session_cookie,
        request.manual_creator_pages
    )

    extracted_something = (
        public_data["profile"].get("nickname")
        or public_data["videos"]
        or creator_data["pages"]
    )
    if not extracted_something:
        raise HTTPException(status_code=400, detail="No Douyin profile or creator-center data could be extracted")

    stored_row = _upsert_account(account, public_data["profile"], request, public_data, creator_data)
    workspace = _build_workspace_payload(stored_row)
    workspace["sync_errors"] = public_data["errors"] + creator_data["errors"]
    return workspace
|
|
|
|
@app.get("/v2/douyin/accounts/{account_id}")
def get_douyin_account(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    # Returns the workspace payload of one account; the ownership check is
    # delegated to _require_owned_account.
    return _build_workspace_payload(_require_owned_account(account_id, account["id"]))
|
|
|
|
@app.get("/v2/douyin/accounts/{account_id}/snapshots")
def list_douyin_account_snapshots(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
    # Lists the condensed snapshots of an account owned by the caller, using
    # the default limit of _list_snapshots.
    owned_row = _require_owned_account(account_id, account["id"])
    snapshot_listing = _list_snapshots(owned_row["id"])
    return snapshot_listing
|
|
|
|
@app.get("/v2/douyin/accounts/{account_id}/snapshots/{snapshot_id}")
def get_douyin_account_snapshot(
    account_id: str,
    snapshot_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    # Returns one snapshot with its raw payload and flattened fields.
    # _get_snapshot_detail answers 404 when the snapshot is not part of the
    # given account.
    owned_row = _require_owned_account(account_id, account["id"])
    detail = _get_snapshot_detail(snapshot_id, owned_row["id"])
    return detail
|
|
|
|
@app.get("/v2/douyin/accounts/{account_id}/creator-fields")
def get_douyin_creator_fields(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    # Returns the detail of the newest creator-center snapshot of the account.
    # Responds 404 when the account has no creator-center snapshot yet.
    owned_row = _require_owned_account(account_id, account["id"])
    owned_id = owned_row["id"]
    newest = legacy.db.fetch_one(
        """
        SELECT id
        FROM douyin_account_snapshots
        WHERE account_id = ? AND snapshot_type = 'creator_center'
        ORDER BY collected_at DESC
        LIMIT 1
        """,
        (owned_id,)
    )
    if newest:
        return _get_snapshot_detail(newest["id"], owned_id)
    raise HTTPException(status_code=404, detail="No creator-center snapshot found")
|
|
|
|
@app.get("/v2/douyin/accounts/{account_id}/workspace")
def get_douyin_account_workspace(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    # Returns the full workspace payload of an account owned by the caller.
    return _build_workspace_payload(_require_owned_account(account_id, account["id"]))
|
|
|
|
@app.get("/v2/douyin/accounts/{account_id}/analysis-reports")
def list_douyin_analysis_reports(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
    # Returns the "recent_reports" part of the workspace payload. The whole
    # workspace is built and only this key is handed back.
    owned_row = _require_owned_account(account_id, account["id"])
    workspace = _build_workspace_payload(owned_row)
    return workspace["recent_reports"]
|
|
|
|
@app.post("/v2/douyin/accounts/{account_id}/analysis")
async def analyze_douyin_account(
    account_id: str,
    request: DouyinAccountAnalysisRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    # Runs the model analysis for an account owned by the caller and returns
    # the result of _run_account_analysis.
    owned_row = _require_owned_account(account_id, account["id"])
    analysis_result = await _run_account_analysis(owned_row, account, request)
    return analysis_result
|
|
|
|
@app.post("/v2/douyin/similar-searches")
async def create_douyin_similarity_search(
    request: DouyinSimilarSearchRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    # Starts a similarity search on behalf of the caller; all of the work
    # happens in _run_similarity_search.
    search_result = await _run_similarity_search(account, request)
    return search_result
|
|
|
|
@app.get("/v2/douyin/similar-searches/{search_id}")
def get_douyin_similarity_search(
    search_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    # Returns a stored similarity search of the caller with its candidates in
    # rank order. Responds 404 when the search does not exist or belongs to
    # another user.
    search_row = legacy.db.fetch_one(
        "SELECT * FROM douyin_similarity_searches WHERE id = ? AND user_id = ?",
        (search_id, account["id"])
    )
    if not search_row:
        raise HTTPException(status_code=404, detail="Similarity search not found")
    candidates = legacy.db.fetch_all(
        """
        SELECT cand.*, acct.nickname AS candidate_nickname
        FROM douyin_similarity_candidates cand
        LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
        WHERE cand.search_id = ?
        ORDER BY cand.rank_index ASC
        """,
        (search_id,)
    )
    return {
        "id": search_row["id"],
        "source_account_id": search_row["source_account_id"],
        "source_profile_url": search_row["source_profile_url"],
        "keywords": _safe_json_loads(search_row["keywords_json"], []),
        "context": _safe_json_loads(search_row["context_json"], {}),
        "created_at": search_row["created_at"],
        "candidates": [
            {
                "id": row["id"],
                "candidate_account_id": row["candidate_account_id"],
                "candidate_profile_url": row["candidate_profile_url"],
                # The LEFT JOIN leaves this column NULL when the candidate has
                # no stored account. dict.get's default only covers missing
                # keys, so `or ""` is needed to return the same empty string
                # the search-creation response uses for that case.
                "candidate_nickname": row.get("candidate_nickname") or "",
                "heuristic_score": row["heuristic_score"],
                "agent_score": row["agent_score"],
                "rationale_text": row["rationale_text"],
                "dimensions": _safe_json_loads(row["dimensions_json"], {}),
                "rank_index": row["rank_index"]
            }
            for row in candidates
        ]
    }
|
|
|
|
@app.get("/v2/douyin/accounts/{account_id}/benchmark-links")
def list_douyin_benchmark_links(
    account_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> list[dict[str, Any]]:
    """List the benchmark links recorded for one Douyin account.

    Args:
        account_id: Path parameter identifying the source Douyin account.
        account: The calling user, injected by ``legacy.require_approved``.

    Returns:
        The result of ``_list_linked_accounts`` for the resolved account row.
    """
    owned_account = _require_owned_account(account_id, account["id"])
    links = _list_linked_accounts(owned_account)
    return links
|
|
|
|
@app.post("/v2/douyin/accounts/{account_id}/benchmark-links")
def create_douyin_benchmark_links(
    account_id: str,
    request: DouyinBenchmarkLinkRequest,
    account: dict[str, Any] = Depends(legacy.require_approved)
) -> dict[str, Any]:
    """Record benchmark relations from one account to other accounts or URLs.

    One row is inserted into ``douyin_account_relations`` per distinct entry
    in ``request.target_account_ids`` (linked by account id and that account's
    profile URL) and per entry returned by ``_dedupe_strings`` for
    ``request.target_profile_urls`` (linked by URL only, with a NULL
    ``target_account_id``).

    Args:
        account_id: Path parameter identifying the source Douyin account.
        request: Targets plus the relation type, note and optional search id
            that are stored on every created relation.
        account: The calling user, injected by ``legacy.require_approved``.

    Returns:
        A dict with ``saved`` (number of relations created), ``relation_ids``
        (their ids, account targets first, then URL targets) and ``links``
        (the refreshed output of ``_list_linked_accounts``).

    Raises:
        Whatever ``_require_owned_account`` raises for the source account or
        any target account. All targets are checked before the first insert,
        so such a failure leaves no partially written batch.
    """
    account_row = _require_owned_account(account_id, account["id"])

    # Resolve every target up front. Validating inside the insert loop would
    # leave earlier relations persisted when a later id is rejected.
    targets: list[tuple[str | None, str]] = []
    # dict.fromkeys drops repeated ids while keeping first-seen order, so a
    # repeated id no longer creates duplicate relations (URLs are already
    # de-duplicated below).
    for target_account_id in dict.fromkeys(request.target_account_ids):
        target_row = _require_owned_account(target_account_id, account["id"])
        targets.append(
            (
                target_row["id"],
                target_row["canonical_profile_url"] or target_row["profile_url"]
            )
        )
    # URL-only targets have no account row; None is bound as SQL NULL.
    targets.extend(
        (None, target_profile_url)
        for target_profile_url in _dedupe_strings(request.target_profile_urls)
    )

    linked_ids: list[str] = []
    for target_account_id, target_profile_url in targets:
        relation_id = make_id("dyrel")
        legacy.db.execute(
            """
            INSERT INTO douyin_account_relations (
                id, user_id, source_account_id, target_account_id, target_profile_url,
                relation_type, note, search_id, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                relation_id,
                account["id"],
                account_row["id"],
                target_account_id,
                target_profile_url,
                request.relation_type,
                request.note,
                request.search_id,
                now()
            )
        )
        linked_ids.append(relation_id)

    return {
        "saved": len(linked_ids),
        "relation_ids": linked_ids,
        "links": _list_linked_accounts(account_row)
    }
|