diff --git a/.env.example b/.env.example
index b5c1100..e362ca4 100644
--- a/.env.example
+++ b/.env.example
@@ -2,7 +2,10 @@ DEFAULT_EXTERNAL_BASE_URL=http://test.hyzq.net:8081
LOCAL_OPENAI_BASE_URL=http://127.0.0.1:8317/v1
LOCAL_OPENAI_MODEL=GLM-5
LOCAL_OPENAI_API_KEY=
+# Host-side collector runs can keep using N8N_BASE_URL.
N8N_BASE_URL=http://127.0.0.1:5670
+# Dockerized collector should use the internal n8n service address.
+COLLECTOR_N8N_BASE_URL=http://n8n:5678
N8N_ANALYSIS_WEBHOOK_PATH=/webhook/storyforge-analysis
N8N_REAL_CUT_WEBHOOK_PATH=/webhook/storyforge-real-cut
N8N_AI_VIDEO_WEBHOOK_PATH=/webhook/storyforge-ai-video
diff --git a/README.md b/README.md
index 5346ef0..f8976a0 100644
--- a/README.md
+++ b/README.md
@@ -37,6 +37,18 @@ cp .env.example .env
docker compose up -d --build
```
+如果 `collector` 运行在 Docker 容器中，建议保留以下配置：
+
+```bash
+COLLECTOR_N8N_BASE_URL=http://n8n:5678
+```
+
+如果你在宿主机上单独启动 `collector-service`，它读取的仍然是：
+
+```bash
+N8N_BASE_URL=http://127.0.0.1:5670
+```
+
默认会启动:
- `collector-service`:`http://127.0.0.1:8081`
diff --git a/collector-service/app/douyin_features.py b/collector-service/app/douyin_features.py
new file mode 100644
index 0000000..ccc248e
--- /dev/null
+++ b/collector-service/app/douyin_features.py
@@ -0,0 +1,1980 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import re
+from collections import Counter
+from datetime import datetime, timezone
+from html import unescape
+from typing import Any, Iterable
+from urllib.parse import quote, unquote
+
+import httpx
+from fastapi import Depends, HTTPException
+from pydantic import BaseModel, Field
+
+# Creator-center pages used as the default value of
+# DouyinAccountSyncRequest.creator_center_urls.
+DEFAULT_CREATOR_CENTER_URLS = [
+ "https://creator.douyin.com/creator-micro/home",
+ "https://creator.douyin.com/creator-micro/data",
+ "https://creator.douyin.com/creator-micro/content/manage"
+]
+# Timeout value handed to httpx.AsyncClient in _fetch_html.
+DEFAULT_TIMEOUT = 20.0
+# Upper bound on how much of each text variant _extract_json_objects_from_text scans.
+# NOTE(review): it is applied as a str slice, so it counts characters, not bytes.
+MAX_HTML_SEARCH_BYTES = 2_000_000
+# User-Agent header sent with every request made by _fetch_html.
+DEFAULT_USER_AGENT = (
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
+ "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
+)
+
+
+# One manually captured creator-center page. _collect_creator_center_pages copies
+# `url` and `title` and wraps `payload` in a single "manual_creator_payload" blob.
+class ManualPageCapture(BaseModel):
+ url: str = ""
+ title: str = ""
+ payload: dict[str, Any] = Field(default_factory=dict)
+
+
+# Request body describing one account sync.
+# NOTE(review): the route handler that consumes this model is outside this view;
+# the per-field notes below cover only what the visible helpers show.
+class DouyinAccountSyncRequest(BaseModel):
+ profile_url: str = ""
+ session_cookie: str = ""
+ # Each request gets its own copy of DEFAULT_CREATOR_CENTER_URLS.
+ creator_center_urls: list[str] = Field(default_factory=lambda: list(DEFAULT_CREATOR_CENTER_URLS))
+ manual_profile_payload: dict[str, Any] | None = None
+ manual_creator_pages: list[ManualPageCapture] = Field(default_factory=list)
+ manual_work_payloads: list[dict[str, Any]] = Field(default_factory=list)
+ # Stored by _upsert_account inside raw_profile_json under "discovery_note".
+ discovery_note: str = ""
+
+
+# Request body for an account analysis run.
+# NOTE(review): the handler that reads these fields is outside this view, so their
+# exact semantics (e.g. how max_videos and temperature are applied) need confirming there.
+class DouyinAccountAnalysisRequest(BaseModel):
+ model_profile_ids: list[str] = Field(default_factory=list)
+ linked_account_ids: list[str] = Field(default_factory=list)
+ include_linked_accounts: bool = True
+ include_recent_similar_candidates: bool = True
+ max_videos: int = 12
+ extra_focus: str = ""
+ temperature: float = 0.35
+
+
+# Request body for a similar-account search.
+# NOTE(review): the handler that reads these fields is outside this view; confirm
+# there how source_account_id and profile_url interact when both are supplied.
+class DouyinSimilarSearchRequest(BaseModel):
+ source_account_id: str | None = None
+ profile_url: str | None = None
+ candidate_urls: list[str] = Field(default_factory=list)
+ seed_linked_accounts: bool = True
+ search_public_pages: bool = True
+ model_profile_id: str | None = None
+ max_candidates: int = 10
+ extra_requirements: str = ""
+
+
+# Request body for linking target accounts to a source account.
+# The defaults for relation_type, note and search_id match the column defaults of
+# douyin_account_relations in ensure_schema.
+# NOTE(review): the consuming handler is outside this view.
+class DouyinBenchmarkLinkRequest(BaseModel):
+ target_account_ids: list[str] = Field(default_factory=list)
+ target_profile_urls: list[str] = Field(default_factory=list)
+ relation_type: str = "benchmark"
+ note: str = ""
+ search_id: str = ""
+
+
+def _safe_json_dumps(value: Any) -> str:
+    """Serialize *value* to compact JSON text, leaving non-ASCII characters unescaped."""
+    compact_separators = (",", ":")
+    return json.dumps(value, separators=compact_separators, ensure_ascii=False)
+
+
+def _safe_json_loads(value: str | None, fallback: Any) -> Any:
+    """Parse *value* as JSON, returning *fallback* for empty input or any parse failure."""
+    if value:
+        try:
+            return json.loads(value)
+        except Exception:
+            # Best-effort by design: malformed stored JSON degrades to the fallback.
+            return fallback
+    return fallback
+
+
+def _first_non_empty(*values: Any) -> str:
+    """Return the first usable value as a string, or ``""`` if none qualifies.
+
+    ``None`` is skipped. Strings are stripped and skipped when blank. Any other
+    value is skipped when it equals an empty container, otherwise it is
+    returned via ``str()`` (so ``0`` yields ``"0"``).
+    """
+    empty_markers = ("", [], {}, ())
+    for candidate in values:
+        if candidate is None:
+            continue
+        if not isinstance(candidate, str):
+            if candidate in empty_markers:
+                continue
+            return str(candidate)
+        text = candidate.strip()
+        if text:
+            return text
+    return ""
+
+
+def _dedupe_strings(values: Iterable[str]) -> list[str]:
+    """Strip each string, drop blanks, and remove case-insensitive duplicates.
+
+    The first spelling of each value wins and input order is preserved.
+    """
+    first_seen: dict[str, str] = {}
+    for raw in values:
+        cleaned = raw.strip()
+        if cleaned:
+            # setdefault keeps the earliest spelling for a given lower-cased key.
+            first_seen.setdefault(cleaned.lower(), cleaned)
+    return list(first_seen.values())
+
+
+def _compact_text(value: Any, limit: int = 500) -> str:
+    """Return the stripped text of *value*, truncated to *limit* characters.
+
+    Falsy values become ``""``. Over-long text is cut to ``limit - 1`` characters
+    and an ellipsis character is appended.
+    """
+    cleaned = str(value or "").strip()
+    if len(cleaned) > limit:
+        return f"{cleaned[: limit - 1]}…"
+    return cleaned
+
+
+def _parse_count(value: Any) -> float:
+    """Convert a display count such as ``"1.2w"``, ``"3万+"`` or ``"1,234"`` to a float.
+
+    ``None``, blank text and text without a number yield ``0.0``; ints and
+    floats are returned as floats. A trailing ``w``/``万`` multiplies by 10,000
+    and a trailing ``亿`` by 100,000,000.
+    """
+    if value is None:
+        return 0.0
+    if isinstance(value, (int, float)):
+        return float(value)
+    # Remove thousands separators and "+" markers *before* inspecting the unit
+    # suffix; otherwise "1.2w+" ends with "+" and the multiplier is never applied.
+    text = str(value).strip().lower().replace(",", "").replace("+", "").strip()
+    if not text:
+        return 0.0
+
+    multiplier = 1.0
+    if text.endswith(("w", "万")):
+        multiplier = 10_000.0
+        text = text[:-1]
+    elif text.endswith("亿"):
+        multiplier = 100_000_000.0
+        text = text[:-1]
+
+    match = re.search(r"-?\d+(?:\.\d+)?", text)
+    if not match:
+        return 0.0
+    # The pattern only matches text that float() accepts, so no guard is needed.
+    return float(match.group()) * multiplier
+
+
+def _normalize_timestamp(value: Any) -> str | None:
+    """Normalize a timestamp-like value to text, or ``None`` when it is unusable.
+
+    * ``None``, ``""``, ``0`` and ``"0"`` yield ``None``.
+    * Strings starting with ``YYYY-MM-DDT`` are returned stripped and unchanged.
+    * Strings of decimal digits are treated as epoch numbers.
+    * Any other non-blank string is returned stripped and unchanged.
+    * Numbers are read as epoch seconds (values above 10,000,000,000 as
+      milliseconds) and rendered as a UTC ISO-8601 string without microseconds.
+    """
+    if value in (None, "", 0, "0"):
+        return None
+    if isinstance(value, str):
+        stripped = value.strip()
+        if not stripped:
+            return None
+        if re.match(r"^\d{4}-\d{2}-\d{2}T", stripped):
+            return stripped
+        # isdecimal() rather than isdigit(): isdigit() also accepts characters
+        # such as "²" that int() rejects with ValueError.
+        if not stripped.isdecimal():
+            return stripped
+        value = int(stripped)
+    if isinstance(value, (int, float)):
+        ts = float(value)
+        if ts > 10_000_000_000:
+            ts /= 1000.0
+        try:
+            return datetime.fromtimestamp(ts, tz=timezone.utc).replace(microsecond=0).isoformat()
+        except (OverflowError, OSError, ValueError):
+            # Out-of-range or otherwise unrepresentable epoch value.
+            return None
+    return None
+
+
+def _extract_hashtags(*texts: str) -> list[str]:
+    """Return the de-duplicated ``#tag`` names (without ``#``) found in *texts*."""
+    found: list[str] = []
+    for chunk in texts:
+        if chunk:
+            # With a single capture group, findall yields the group text directly.
+            found.extend(re.findall(r"#([\w\u4e00-\u9fff]+)", chunk))
+    return _dedupe_strings(found)
+
+
+def _extract_keywords(*texts: str) -> list[str]:
+    """Collect keyword candidates from *texts*.
+
+    For each non-empty text the candidates are, in order: its hashtags, runs of
+    2-8 CJK characters, and Latin identifiers of 3-21 characters. Stop words
+    are removed (compared in lower case) and the rest is de-duplicated.
+    """
+    ignored = {
+        "视频",
+        "作品",
+        "抖音",
+        "账号",
+        "内容",
+        "发布",
+        "更多",
+        "关注",
+        "用户",
+        "douyin",
+        "profile",
+    }
+    pool: list[str] = []
+    for chunk in texts:
+        if not chunk:
+            continue
+        pool += _extract_hashtags(chunk)
+        pool += re.findall(r"[\u4e00-\u9fff]{2,8}", chunk)
+        pool += re.findall(r"[A-Za-z][A-Za-z0-9_]{2,20}", chunk)
+    return _dedupe_strings(word for word in pool if word.lower() not in ignored)
+
+
+def _flatten_json(value: Any, prefix: str = "") -> list[tuple[str, str, str]]:
+    """Flatten nested JSON into ``(path, type_name, text)`` rows, one per leaf.
+
+    Dict keys are joined with ``.`` and list positions appended as ``[i]``; a
+    bare scalar at the root gets the path ``"$"``. Leaf text is produced by
+    ``_compact_text`` with a 2000-character limit.
+    """
+    if isinstance(value, dict):
+        rows: list[tuple[str, str, str]] = []
+        for name, nested in value.items():
+            path = str(name) if not prefix else f"{prefix}.{name}"
+            rows += _flatten_json(nested, path)
+        return rows
+    if isinstance(value, list):
+        rows = []
+        for position, nested in enumerate(value):
+            rows += _flatten_json(nested, f"{prefix}[{position}]")
+        return rows
+    return [(prefix or "$", type(value).__name__, _compact_text(value, 2000))]
+
+
+def _walk_json(value: Any) -> Iterable[dict[str, Any]]:
+    """Yield every dict nested anywhere inside *value*, parents before children."""
+    pending: list[Any] = [value]
+    while pending:
+        node = pending.pop()
+        if isinstance(node, dict):
+            yield node
+            # Push children reversed so they pop in their original order.
+            pending.extend(reversed(list(node.values())))
+        elif isinstance(node, list):
+            pending.extend(reversed(node))
+
+
+def _extract_json_objects_from_text(text: str) -> list[Any]:
+    """Find JSON objects and arrays embedded in arbitrary text.
+
+    The text is scanned as given and in its URL-unquoted and HTML-unescaped
+    forms. Decoding is attempted at each ``{`` or ``[``; positions inside a value
+    that has already been decoded are skipped, so only outermost values are
+    returned. Results are de-duplicated by their compact JSON form, each
+    variant is limited to ``MAX_HTML_SEARCH_BYTES`` characters, and at most 50
+    values are returned.
+    """
+    objects: list[Any] = []
+    if not text:
+        return objects
+
+    decoder = json.JSONDecoder()
+    seen: set[str] = set()
+    # The decoded variants are often identical to the input; scan each distinct
+    # string once, keeping the original order.
+    variants = dict.fromkeys((text, unquote(text), unescape(text), unescape(unquote(text))))
+    for variant in variants:
+        snippet = variant[:MAX_HTML_SEARCH_BYTES]
+        consumed_until = 0
+        for match in re.finditer(r"[\{\[]", snippet):
+            start = match.start()
+            if start < consumed_until:
+                # Nested inside a value we already decoded: re-decoding it would
+                # only add fragments and use up the 50-object budget.
+                continue
+            try:
+                # Decode in place via the index argument instead of slicing, which
+                # copied the remainder of the snippet for every bracket.
+                obj, end = decoder.raw_decode(snippet, start)
+            except (ValueError, RecursionError):
+                continue
+            consumed_until = end
+            marker = _safe_json_dumps(obj)
+            if marker in seen:
+                continue
+            seen.add(marker)
+            objects.append(obj)
+            if len(objects) >= 50:
+                return objects
+    return objects
+
+
+def _extract_json_blobs_from_html(html: str) -> list[dict[str, Any]]:
+    """Collect JSON payloads embedded in the ``<script>`` elements of *html*.
+
+    Each result is ``{"script_id": <id attribute or "">, "payload": <decoded JSON>}``.
+    Payloads are de-duplicated across the whole document by their compact JSON
+    form.
+    """
+    blobs: list[dict[str, Any]] = []
+    seen: set[str] = set()
+    # Group 1 is the tag's attribute text, group 2 the script body. The pattern
+    # had been reduced to an empty string, which made the two-name unpacking
+    # below raise ValueError for every input.
+    script_pattern = r"<script([^>]*)>(.*?)</script>"
+    for attrs, content in re.findall(script_pattern, html, re.IGNORECASE | re.DOTALL):
+        script_id_match = re.search(r'id=["\']([^"\']+)["\']', attrs, re.IGNORECASE)
+        script_id = script_id_match.group(1) if script_id_match else ""
+        for obj in _extract_json_objects_from_text(content.strip()):
+            marker = _safe_json_dumps(obj)
+            if marker in seen:
+                continue
+            seen.add(marker)
+            blobs.append({
+                "script_id": script_id,
+                "payload": obj,
+            })
+    return blobs
+
+
+def _profile_candidate_score(value: dict[str, Any]) -> int:
+    """Score how much *value* looks like a profile record.
+
+    One point per known profile key that is present, plus two points when
+    ``author`` holds a dict.
+    """
+    profile_keys = (
+        "nickname",
+        "signature",
+        "sec_uid",
+        "secUid",
+        "uid",
+        "unique_id",
+        "short_id",
+        "aweme_count",
+        "following_count",
+        "follower_count",
+        "total_favorited",
+    )
+    key_hits = len([name for name in profile_keys if name in value])
+    author_bonus = 2 if isinstance(value.get("author"), dict) else 0
+    return key_hits + author_bonus
+
+
+def _video_candidate_score(value: dict[str, Any]) -> int:
+    """Score how much *value* looks like a video record.
+
+    Three points for a dict under ``statistics``, two for an ``aweme_id`` or
+    ``item_id`` key, one for a ``desc`` or ``title`` key.
+    """
+    has_stats = isinstance(value.get("statistics"), dict)
+    has_identifier = any(name in value for name in ("aweme_id", "item_id"))
+    has_text = any(name in value for name in ("desc", "title"))
+    return (3 if has_stats else 0) + (2 if has_identifier else 0) + (1 if has_text else 0)
+
+
+def _extract_profile_candidates(payload: Any) -> list[dict[str, Any]]:
+    """Return every dict in *payload* whose profile score is at least 3.
+
+    A dict's ``author`` sub-dict is also appended when it scores at least 3 on
+    its own, in addition to being visited by the walk itself.
+    """
+    found: list[dict[str, Any]] = []
+    for node in _walk_json(payload):
+        if _profile_candidate_score(node) >= 3:
+            found.append(node)
+        author = node.get("author")
+        if isinstance(author, dict) and _profile_candidate_score(author) >= 3:
+            found.append(author)
+    return found
+
+
+def _extract_video_candidates(payload: Any) -> list[dict[str, Any]]:
+    """Return every dict in *payload* whose video score is at least 3."""
+    return [node for node in _walk_json(payload) if _video_candidate_score(node) >= 3]
+
+
+def _normalize_profile_candidate(candidate: dict[str, Any], fallback_url: str = "") -> dict[str, Any]:
+    """Map a raw profile dict onto the normalized profile shape used in this module.
+
+    Args:
+        candidate: Raw profile-like dict.
+        fallback_url: Used as the profile URL when the candidate carries neither
+            ``share_url`` nor ``profile_url``.
+
+    Returns:
+        A dict with identity fields, ``avatar_url``, parsed ``stats``
+        (followers / following / likes / videos), ``tags`` and the untouched
+        candidate under ``raw``.
+    """
+    stats_source = candidate.get("statistics") if isinstance(candidate.get("statistics"), dict) else {}
+    avatar = candidate.get("avatar_medium") or candidate.get("avatar_thumb") or candidate.get("avatar_url")
+    if isinstance(avatar, dict):
+        url_list = avatar.get("url_list")
+        # An empty url_list used to raise IndexError on the [0] lookup.
+        first_url = url_list[0] if isinstance(url_list, list) and url_list else ""
+        avatar = _first_non_empty(first_url, avatar.get("url"))
+
+    signature = _first_non_empty(
+        candidate.get("signature"),
+        candidate.get("desc"),
+        candidate.get("bio"),
+        candidate.get("description"),
+    )
+    nickname = _first_non_empty(candidate.get("nickname"), candidate.get("name"), candidate.get("author_name"))
+    canonical_url = _first_non_empty(
+        candidate.get("share_url"),
+        candidate.get("profile_url"),
+        fallback_url,
+    )
+    # "tags" may be present but null (or some non-list shape); iterating that
+    # used to raise TypeError or yield garbage, so only lists/tuples are read.
+    raw_tags = candidate.get("tags")
+    if not isinstance(raw_tags, (list, tuple)):
+        raw_tags = []
+    return {
+        "nickname": nickname,
+        "signature": signature,
+        "profile_url": canonical_url,
+        "canonical_profile_url": canonical_url,
+        "sec_uid": _first_non_empty(candidate.get("sec_uid"), candidate.get("secUid")),
+        "douyin_uid": _first_non_empty(candidate.get("uid")),
+        "douyin_id": _first_non_empty(candidate.get("unique_id"), candidate.get("short_id"), candidate.get("douyin_id")),
+        "avatar_url": _first_non_empty(avatar),
+        "stats": {
+            "followers": _parse_count(candidate.get("follower_count") or stats_source.get("follower_count")),
+            "following": _parse_count(candidate.get("following_count") or stats_source.get("following_count")),
+            "likes": _parse_count(candidate.get("total_favorited") or stats_source.get("total_favorited")),
+            "videos": _parse_count(candidate.get("aweme_count") or stats_source.get("aweme_count")),
+        },
+        "tags": _dedupe_strings(
+            _extract_hashtags(signature, nickname)
+            + [str(tag) for tag in raw_tags if isinstance(tag, (str, int, float))]
+        ),
+        "raw": candidate,
+    }
+
+
+def _pick_best_profile(candidates: list[dict[str, Any]], fallback_url: str = "") -> dict[str, Any]:
+    """Normalize all *candidates* and return the most complete one.
+
+    Completeness is weighted nickname 4, sec_uid 3, signature 2, non-zero
+    followers 1; the earliest candidate wins a tie. With no candidates, an
+    empty profile built from *fallback_url* is returned.
+    """
+    def _completeness(profile: dict[str, Any]) -> int:
+        weights = (
+            (4, profile["nickname"]),
+            (3, profile["sec_uid"]),
+            (2, profile["signature"]),
+            (1, profile["stats"]["followers"]),
+        )
+        return sum(weight for weight, present in weights if present)
+
+    normalized = [_normalize_profile_candidate(item, fallback_url=fallback_url) for item in candidates]
+    if not normalized:
+        return _normalize_profile_candidate({}, fallback_url=fallback_url)
+    # max() returns the first maximal element, matching a strict ">" scan.
+    return max(normalized, key=_completeness)
+
+
+def _normalize_video_candidate(candidate: dict[str, Any]) -> dict[str, Any]:
+    """Map a raw video dict onto the normalized video shape used in this module.
+
+    Returns a dict with ``aweme_id``, ``title``, ``description``, ``share_url``,
+    ``cover_url``, ``duration_sec``, ``published_at``, ``tags``, parsed
+    ``stats`` (play / like / comment / share / collect) and the untouched
+    candidate under ``raw``.
+    """
+    stats_source = candidate.get("statistics") if isinstance(candidate.get("statistics"), dict) else {}
+    video_source = candidate.get("video") if isinstance(candidate.get("video"), dict) else {}
+    title = _first_non_empty(candidate.get("title"), candidate.get("desc"), candidate.get("share_title"))
+    description = _first_non_empty(candidate.get("desc"), candidate.get("title"), candidate.get("text"))
+    cover = candidate.get("cover") or video_source.get("cover")
+    if isinstance(cover, dict):
+        url_list = cover.get("url_list")
+        # An empty url_list used to raise IndexError on the [0] lookup.
+        first_url = url_list[0] if isinstance(url_list, list) and url_list else ""
+        cover = _first_non_empty(first_url, cover.get("url"))
+
+    # Parse the duration once; a non-numeric value degrades to 0 instead of
+    # aborting the whole extraction.
+    try:
+        duration = float(candidate.get("duration") or video_source.get("duration") or 0)
+    except (TypeError, ValueError):
+        duration = 0.0
+    # Values above 1000 are treated as milliseconds (same threshold as before).
+    if duration > 1000:
+        duration /= 1000.0
+
+    return {
+        "aweme_id": _first_non_empty(candidate.get("aweme_id"), candidate.get("item_id"), candidate.get("group_id")),
+        "title": title,
+        "description": description,
+        "share_url": _first_non_empty(candidate.get("share_url")),
+        "cover_url": _first_non_empty(cover),
+        "duration_sec": duration,
+        "published_at": _normalize_timestamp(candidate.get("create_time") or candidate.get("publish_time")),
+        "tags": _extract_hashtags(title, description),
+        "stats": {
+            "play": _parse_count(stats_source.get("play_count") or candidate.get("play_count")),
+            "like": _parse_count(stats_source.get("digg_count") or candidate.get("digg_count")),
+            "comment": _parse_count(stats_source.get("comment_count") or candidate.get("comment_count")),
+            "share": _parse_count(stats_source.get("share_count") or candidate.get("share_count")),
+            "collect": _parse_count(stats_source.get("collect_count") or candidate.get("collect_count")),
+        },
+        "raw": candidate,
+    }
+
+
+def _extract_videos(payloads: Iterable[Any]) -> list[dict[str, Any]]:
+    """Extract, de-duplicate and rank normalized videos from *payloads*.
+
+    Videos are keyed by aweme id, then share URL, then title; entries with no
+    key are dropped. The result is ordered by descending
+    ``play + like + 4 * comment + 6 * share``.
+    """
+    def _engagement(video: dict[str, Any]) -> float:
+        stats = video["stats"]
+        return stats["play"] + stats["like"] + stats["comment"] * 4 + stats["share"] * 6
+
+    collected: list[dict[str, Any]] = []
+    known_keys: set[str] = set()
+    for payload in payloads:
+        for raw_video in _extract_video_candidates(payload):
+            video = _normalize_video_candidate(raw_video)
+            key = video["aweme_id"] or video["share_url"] or video["title"]
+            if key and key not in known_keys:
+                known_keys.add(key)
+                collected.append(video)
+    return sorted(collected, key=_engagement, reverse=True)
+
+
+async def _fetch_html(url: str, cookie: str = "") -> tuple[str, str]:
+    """GET *url* and return ``(final_url, body_text)``.
+
+    Redirects are followed and a non-blank *cookie* is sent as the ``Cookie``
+    header. ``raise_for_status()`` is called on the response, so error statuses
+    raise.
+    """
+    request_headers = {
+        "User-Agent": DEFAULT_USER_AGENT,
+        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
+    }
+    trimmed_cookie = cookie.strip()
+    if trimmed_cookie:
+        request_headers["Cookie"] = trimmed_cookie
+    async with httpx.AsyncClient(follow_redirects=True, timeout=DEFAULT_TIMEOUT) as client:
+        response = await client.get(url, headers=request_headers)
+        response.raise_for_status()
+        return str(response.url), response.text
+
+
+async def _discover_profile_urls_from_search(keywords: list[str], limit: int = 8) -> list[str]:
+    """Collect profile URLs from the search pages of the first three *keywords*.
+
+    Links containing ``/user/`` are made absolute, stripped of their query
+    string and de-duplicated. A failed search request is skipped. Collection
+    stops as soon as *limit* URLs have been gathered.
+    """
+    discovered: list[str] = []
+    known: set[str] = set()
+    for keyword in keywords[:3]:
+        search_url = f"https://www.douyin.com/search/{quote(keyword)}?type=user"
+        try:
+            _, html = await _fetch_html(search_url)
+        except Exception:
+            # Best-effort discovery: one failing keyword must not abort the rest.
+            continue
+        for href in re.findall(r'href=["\']([^"\']+/user/[^"\']+)["\']', html):
+            absolute = f"https://www.douyin.com{href}" if href.startswith("/") else href
+            profile_url = absolute.partition("?")[0]
+            if profile_url in known:
+                continue
+            known.add(profile_url)
+            discovered.append(profile_url)
+            if len(discovered) >= limit:
+                return discovered
+    return discovered
+
+
+def _summarize_videos(videos: list[dict[str, Any]], limit: int = 8) -> dict[str, Any]:
+    """Summarize the first *limit* videos.
+
+    Averages and ``top_tags`` are computed over the selected videos only, while
+    ``count`` reports the length of the full *videos* list.
+    """
+    top = videos[:limit]
+    if not top:
+        return {
+            "count": 0,
+            "top_tags": [],
+            "avg_play": 0.0,
+            "avg_like": 0.0,
+            "avg_comment": 0.0,
+            "avg_share": 0.0,
+            "videos": [],
+        }
+
+    sample_size = len(top)
+
+    def _mean(metric: str) -> float:
+        return round(sum(video["stats"][metric] for video in top) / sample_size, 2)
+
+    tag_counts: Counter = Counter()
+    for video in top:
+        tag_counts.update(video.get("tags", []))
+
+    compact_videos = []
+    for video in top:
+        compact_videos.append({
+            "aweme_id": video["aweme_id"],
+            "title": _compact_text(video["title"], 120),
+            "description": _compact_text(video["description"], 180),
+            "tags": video["tags"][:6],
+            "published_at": video["published_at"],
+            "stats": video["stats"],
+        })
+
+    return {
+        "count": len(videos),
+        "top_tags": [tag for tag, _ in tag_counts.most_common(8)],
+        "avg_play": _mean("play"),
+        "avg_like": _mean("like"),
+        "avg_comment": _mean("comment"),
+        "avg_share": _mean("share"),
+        "videos": compact_videos,
+    }
+
+
+def _jaccard(left: Iterable[str], right: Iterable[str]) -> float:
+    """Return the Jaccard similarity of two string collections.
+
+    Items are stripped and lower-cased and blanks are ignored. Two empty
+    collections score ``0.0``.
+    """
+    def _normalized(items: Iterable[str]) -> set[str]:
+        return {item.strip().lower() for item in items if item.strip()}
+
+    first = _normalized(left)
+    second = _normalized(right)
+    combined = first | second
+    if not combined:
+        return 0.0
+    return len(first & second) / len(combined)
+
+
+def _quality_score(account_payload: dict[str, Any]) -> float:
+    """Return a size-plus-engagement score for an account, rounded to 3 decimals.
+
+    The score is ``followers / 10000`` plus the average like, comment, share and
+    play counts scaled by 1000, 300, 200 and 5000 respectively. Missing values
+    count as zero.
+    """
+    profile_stats = account_payload.get("profile_stats", {})
+    summary = account_payload.get("video_summary", {})
+
+    def _metric(source: dict[str, Any], name: str) -> float:
+        return float(source.get(name) or 0)
+
+    base = _metric(profile_stats, "followers") / 10_000.0
+    engagement = (
+        _metric(summary, "avg_like") / 1000.0
+        + _metric(summary, "avg_comment") / 300.0
+        + _metric(summary, "avg_share") / 200.0
+        + _metric(summary, "avg_play") / 5000.0
+    )
+    return round(base + engagement, 3)
+
+
+def _heuristic_similarity(source_payload: dict[str, Any], candidate_payload: dict[str, Any]) -> dict[str, Any]:
+    """Score how similar a candidate account is to the source account.
+
+    The heuristic score is ``55 * keyword overlap + 20 * top-tag overlap +
+    10 * signature-keyword overlap`` plus the candidate's quality score capped
+    at 15, rounded to 2 decimals. The individual overlaps are returned too.
+    """
+    def _top_tags(payload: dict[str, Any]) -> list[str]:
+        return payload.get("video_summary", {}).get("top_tags", [])
+
+    topic_overlap = _jaccard(source_payload.get("keywords", []), candidate_payload.get("keywords", []))
+    tag_overlap = _jaccard(_top_tags(source_payload), _top_tags(candidate_payload))
+    signature_overlap = _jaccard(
+        _extract_keywords(source_payload.get("signature", "")),
+        _extract_keywords(candidate_payload.get("signature", "")),
+    )
+    quality = _quality_score(candidate_payload)
+    combined = topic_overlap * 55 + tag_overlap * 20 + signature_overlap * 10 + min(quality, 15)
+    return {
+        "topic_overlap": round(topic_overlap, 3),
+        "tag_overlap": round(tag_overlap, 3),
+        "signature_overlap": round(signature_overlap, 3),
+        "quality_score": quality,
+        "heuristic_score": round(combined, 2),
+    }
+
+
+def _build_model_label(profile: dict[str, Any]) -> str:
+    """Return the profile's ``name``, else ``model_name``, else ``base_url``, else ``""``."""
+    label_sources = ("name", "model_name", "base_url")
+    return _first_non_empty(*(profile.get(key) for key in label_sources))
+
+
+def _try_parse_agent_json(text: str) -> Any:
+    """Parse *text* as JSON, falling back to the first JSON value embedded in it.
+
+    Blank text, or text with no decodable JSON, yields ``{}``.
+    """
+    cleaned = text.strip()
+    if not cleaned:
+        return {}
+    try:
+        parsed = json.loads(cleaned)
+    except Exception:
+        # Not pure JSON: look for JSON embedded in surrounding text.
+        recovered = _extract_json_objects_from_text(cleaned)
+        return recovered[0] if recovered else {}
+    return parsed
+
+
+def register_douyin_routes(app: Any, legacy: Any) -> None:
+ def now() -> str:
+     """Return the timestamp string produced by ``legacy.utc_now()``."""
+     timestamp = legacy.utc_now()
+     return timestamp
+
+ def make_id(prefix: str) -> str:
+     """Return a new identifier for *prefix* by delegating to ``legacy.make_id``."""
+     identifier = legacy.make_id(prefix)
+     return identifier
+
+ def ensure_schema() -> None:
+ """Create the Douyin tables and their indexes if they do not exist yet.
+
+ Every statement uses IF NOT EXISTS, so calling this repeatedly is harmless.
+ The script covers accounts, account snapshots, flattened snapshot fields,
+ videos, analysis reports and suggestions, similarity searches and
+ candidates, and account relations.
+ """
+ schema = """
+ CREATE TABLE IF NOT EXISTS douyin_accounts (
+ id TEXT PRIMARY KEY,
+ user_id TEXT NOT NULL,
+ profile_url TEXT NOT NULL DEFAULT '',
+ canonical_profile_url TEXT NOT NULL DEFAULT '',
+ sec_uid TEXT NOT NULL DEFAULT '',
+ douyin_uid TEXT NOT NULL DEFAULT '',
+ douyin_id TEXT NOT NULL DEFAULT '',
+ nickname TEXT NOT NULL DEFAULT '',
+ signature TEXT NOT NULL DEFAULT '',
+ avatar_url TEXT NOT NULL DEFAULT '',
+ tags_json TEXT NOT NULL DEFAULT '[]',
+ profile_stats_json TEXT NOT NULL DEFAULT '{}',
+ raw_profile_json TEXT NOT NULL DEFAULT '{}',
+ source_mode TEXT NOT NULL DEFAULT 'public',
+ sync_status TEXT NOT NULL DEFAULT 'pending',
+ last_public_sync_at TEXT,
+ last_creator_sync_at TEXT,
+ last_analysis_at TEXT,
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL,
+ FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_updated
+ ON douyin_accounts(user_id, updated_at DESC);
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_accounts_user_sec_uid
+ ON douyin_accounts(user_id, sec_uid);
+
+ CREATE TABLE IF NOT EXISTS douyin_account_snapshots (
+ id TEXT PRIMARY KEY,
+ account_id TEXT NOT NULL,
+ snapshot_type TEXT NOT NULL,
+ source_url TEXT NOT NULL DEFAULT '',
+ raw_payload_json TEXT NOT NULL DEFAULT '{}',
+ summary_json TEXT NOT NULL DEFAULT '{}',
+ field_count INTEGER NOT NULL DEFAULT 0,
+ collected_at TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_snapshots_account_collected
+ ON douyin_account_snapshots(account_id, collected_at DESC);
+
+ CREATE TABLE IF NOT EXISTS douyin_snapshot_fields (
+ snapshot_id TEXT NOT NULL,
+ field_path TEXT NOT NULL,
+ field_type TEXT NOT NULL DEFAULT 'string',
+ field_value_text TEXT NOT NULL DEFAULT '',
+ PRIMARY KEY(snapshot_id, field_path),
+ FOREIGN KEY(snapshot_id) REFERENCES douyin_account_snapshots(id) ON DELETE CASCADE
+ );
+
+ CREATE TABLE IF NOT EXISTS douyin_videos (
+ id TEXT PRIMARY KEY,
+ account_id TEXT NOT NULL,
+ aweme_id TEXT NOT NULL DEFAULT '',
+ title TEXT NOT NULL DEFAULT '',
+ description TEXT NOT NULL DEFAULT '',
+ share_url TEXT NOT NULL DEFAULT '',
+ cover_url TEXT NOT NULL DEFAULT '',
+ duration_sec REAL NOT NULL DEFAULT 0,
+ published_at TEXT,
+ tags_json TEXT NOT NULL DEFAULT '[]',
+ stats_json TEXT NOT NULL DEFAULT '{}',
+ raw_json TEXT NOT NULL DEFAULT '{}',
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL,
+ FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_updated
+ ON douyin_videos(account_id, updated_at DESC);
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_videos_account_aweme
+ ON douyin_videos(account_id, aweme_id);
+
+ CREATE TABLE IF NOT EXISTS douyin_analysis_reports (
+ id TEXT PRIMARY KEY,
+ account_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ focus_text TEXT NOT NULL DEFAULT '',
+ model_profile_ids_json TEXT NOT NULL DEFAULT '[]',
+ linked_account_ids_json TEXT NOT NULL DEFAULT '[]',
+ prompt_text TEXT NOT NULL DEFAULT '',
+ context_json TEXT NOT NULL DEFAULT '{}',
+ created_at TEXT NOT NULL,
+ FOREIGN KEY(account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
+ FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_analysis_reports_account_created
+ ON douyin_analysis_reports(account_id, created_at DESC);
+
+ CREATE TABLE IF NOT EXISTS douyin_analysis_suggestions (
+ id TEXT PRIMARY KEY,
+ report_id TEXT NOT NULL,
+ model_profile_id TEXT NOT NULL DEFAULT '',
+ model_label TEXT NOT NULL DEFAULT '',
+ status TEXT NOT NULL DEFAULT 'ok',
+ suggestion_text TEXT NOT NULL DEFAULT '',
+ parsed_json TEXT NOT NULL DEFAULT '{}',
+ created_at TEXT NOT NULL,
+ FOREIGN KEY(report_id) REFERENCES douyin_analysis_reports(id) ON DELETE CASCADE
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_analysis_suggestions_report
+ ON douyin_analysis_suggestions(report_id, created_at ASC);
+
+ CREATE TABLE IF NOT EXISTS douyin_similarity_searches (
+ id TEXT PRIMARY KEY,
+ user_id TEXT NOT NULL,
+ source_account_id TEXT,
+ source_profile_url TEXT NOT NULL DEFAULT '',
+ keywords_json TEXT NOT NULL DEFAULT '[]',
+ prompt_text TEXT NOT NULL DEFAULT '',
+ context_json TEXT NOT NULL DEFAULT '{}',
+ created_at TEXT NOT NULL,
+ FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
+ FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_similarity_searches_user_created
+ ON douyin_similarity_searches(user_id, created_at DESC);
+
+ CREATE TABLE IF NOT EXISTS douyin_similarity_candidates (
+ id TEXT PRIMARY KEY,
+ search_id TEXT NOT NULL,
+ candidate_account_id TEXT,
+ candidate_profile_url TEXT NOT NULL DEFAULT '',
+ heuristic_score REAL NOT NULL DEFAULT 0,
+ agent_score REAL NOT NULL DEFAULT 0,
+ rationale_text TEXT NOT NULL DEFAULT '',
+ dimensions_json TEXT NOT NULL DEFAULT '{}',
+ raw_output_json TEXT NOT NULL DEFAULT '{}',
+ rank_index INTEGER NOT NULL DEFAULT 0,
+ created_at TEXT NOT NULL,
+ FOREIGN KEY(search_id) REFERENCES douyin_similarity_searches(id) ON DELETE CASCADE,
+ FOREIGN KEY(candidate_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_similarity_candidates_search_rank
+ ON douyin_similarity_candidates(search_id, rank_index ASC);
+
+ CREATE TABLE IF NOT EXISTS douyin_account_relations (
+ id TEXT PRIMARY KEY,
+ user_id TEXT NOT NULL,
+ source_account_id TEXT NOT NULL,
+ target_account_id TEXT,
+ target_profile_url TEXT NOT NULL DEFAULT '',
+ relation_type TEXT NOT NULL DEFAULT 'benchmark',
+ note TEXT NOT NULL DEFAULT '',
+ search_id TEXT NOT NULL DEFAULT '',
+ created_at TEXT NOT NULL,
+ FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
+ FOREIGN KEY(source_account_id) REFERENCES douyin_accounts(id) ON DELETE CASCADE,
+ FOREIGN KEY(target_account_id) REFERENCES douyin_accounts(id) ON DELETE SET NULL
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_douyin_account_relations_source
+ ON douyin_account_relations(source_account_id, created_at DESC);
+ """
+ # Run the whole script in one call on a connection from legacy.db.session().
+ with legacy.db.session() as conn:
+ conn.executescript(schema)
+
+ # Create the schema eagerly, as soon as the routes are registered.
+ ensure_schema()
+
+ @app.on_event("startup")
+ def _startup_douyin_schema() -> None:
+ """Run ensure_schema() again when the app emits its "startup" event."""
+ # Safe to repeat: every statement in the schema uses IF NOT EXISTS.
+ ensure_schema()
+
+ def _require_owned_account(account_id: str, user_id: str) -> dict[str, Any]:
+     """Return the douyin_accounts row with this id owned by *user_id*.
+
+     Raises:
+         HTTPException: 404 when no such row exists for that user.
+     """
+     query = "SELECT * FROM douyin_accounts WHERE id = ? AND user_id = ?"
+     account = legacy.db.fetch_one(query, (account_id, user_id))
+     if account:
+         return account
+     raise HTTPException(status_code=404, detail="Douyin account not found")
+
+ def _fetch_model_profiles(account_id: str) -> list[dict[str, Any]]:
+ """Return the model profiles usable by *account_id*.
+
+ Selects rows whose owner_account_id is NULL or equals *account_id*,
+ ordered by is_default descending and then created_at ascending.
+ """
+ return legacy.db.fetch_all(
+ """
+ SELECT *
+ FROM model_profiles
+ WHERE owner_account_id IS NULL OR owner_account_id = ?
+ ORDER BY is_default DESC, created_at ASC
+ """,
+ (account_id,)
+ )
+
+ def _resolve_model_profiles(account: dict[str, Any], requested_ids: list[str]) -> list[dict[str, Any]]:
+     """Resolve *requested_ids* to model profile rows available to *account*.
+
+     With no requested ids, every available profile is returned. Otherwise the
+     result follows the order of *requested_ids*.
+
+     Raises:
+         HTTPException: 400 when no profile is available at all; 404 when any
+             requested id is not among the available profiles.
+     """
+     available = _fetch_model_profiles(account["id"])
+     if not available:
+         raise HTTPException(status_code=400, detail="No available model profiles")
+     if not requested_ids:
+         return available
+
+     by_id = {row["id"]: row for row in available}
+     selected: list[dict[str, Any]] = []
+     unknown: list[str] = []
+     for profile_id in requested_ids:
+         if profile_id in by_id:
+             selected.append(by_id[profile_id])
+         else:
+             unknown.append(profile_id)
+     if unknown:
+         raise HTTPException(status_code=404, detail=f"Unknown model profiles: {', '.join(unknown)}")
+     return selected
+
+ async def _collect_public_profile(profile_url: str, manual_payload: dict[str, Any] | None) -> dict[str, Any]:
+     """Gather profile and video data from a manual payload and/or the public page.
+
+     A fetch or extraction failure is recorded in ``errors`` rather than raised.
+     Returns a dict with ``profile``, ``videos``, ``raw_pages`` (the collected
+     blobs), ``errors`` and ``source_url`` (the post-redirect URL when the fetch
+     succeeded).
+     """
+     resolved_url = profile_url.strip()
+     failures: list[str] = []
+     collected: list[dict[str, Any]] = []
+     if manual_payload:
+         collected.append({"script_id": "manual_profile_payload", "payload": manual_payload})
+
+     if resolved_url:
+         try:
+             resolved_url, html = await _fetch_html(resolved_url)
+             collected.extend(_extract_json_blobs_from_html(html))
+         except Exception as exc:
+             failures.append(f"public_profile_fetch_failed: {exc}")
+
+     payloads = [blob["payload"] for blob in collected]
+     profile_candidates: list[dict[str, Any]] = []
+     for payload in payloads:
+         profile_candidates.extend(_extract_profile_candidates(payload))
+
+     best_profile = _pick_best_profile(profile_candidates, fallback_url=resolved_url)
+     ranked_videos = _extract_videos(payloads)
+     return {
+         "profile": best_profile,
+         "videos": ranked_videos,
+         "raw_pages": collected,
+         "errors": failures,
+         "source_url": resolved_url,
+     }
+
+ async def _collect_creator_center_pages(
+     urls: list[str],
+     cookie: str,
+     manual_pages: list[ManualPageCapture]
+ ) -> dict[str, Any]:
+     """Collect creator-center pages from manual captures and, with a cookie, by fetching.
+
+     Manual captures always come first. *urls* are fetched only when *cookie*
+     is non-blank; a failing URL is recorded in ``errors`` and the rest continue.
+     Returns ``{"pages": [...], "errors": [...]}``.
+     """
+     failures: list[str] = []
+     collected: list[dict[str, Any]] = [
+         {
+             "url": capture.url,
+             "title": capture.title,
+             "blobs": [{"script_id": "manual_creator_payload", "payload": capture.payload}],
+         }
+         for capture in manual_pages
+     ]
+
+     if not cookie.strip():
+         return {"pages": collected, "errors": failures}
+
+     for target in urls:
+         try:
+             final_url, html = await _fetch_html(target, cookie=cookie)
+             collected.append({
+                 "url": final_url,
+                 "title": "",
+                 "blobs": _extract_json_blobs_from_html(html),
+             })
+         except Exception as exc:
+             failures.append(f"creator_center_fetch_failed[{target}]: {exc}")
+
+     return {"pages": collected, "errors": failures}
+
+ def _upsert_account(
+ owner: dict[str, Any],
+ profile: dict[str, Any],
+ sync_request: DouyinAccountSyncRequest,
+ public_data: dict[str, Any],
+ creator_data: dict[str, Any]
+ ) -> dict[str, Any]:
+ """Insert or update the owner's douyin_accounts row for *profile*, then persist its data.
+
+ An existing row is looked up for this owner by sec_uid, then douyin_id, then
+ canonical_profile_url, using the first non-empty value that matches. After
+ the write, _persist_snapshots_and_videos is called and the row is re-read
+ and returned.
+ """
+ lookup_candidates = [
+ ("sec_uid", profile.get("sec_uid", "")),
+ ("douyin_id", profile.get("douyin_id", "")),
+ ("canonical_profile_url", profile.get("canonical_profile_url", ""))
+ ]
+ existing: dict[str, Any] | None = None
+ for field_name, field_value in lookup_candidates:
+ if not field_value:
+ continue
+ # field_name comes only from the fixed list above; the value is bound as a parameter.
+ existing = legacy.db.fetch_one(
+ f"SELECT * FROM douyin_accounts WHERE user_id = ? AND {field_name} = ? LIMIT 1",
+ (owner["id"], field_value)
+ )
+ if existing:
+ break
+
+ account_id = existing["id"] if existing else make_id("dyacct")
+ created_at = existing["created_at"] if existing else now()
+ updated_at = now()
+
+ tags = _dedupe_strings(profile.get("tags", []) + _extract_keywords(profile.get("nickname", ""), profile.get("signature", "")))
+ profile_stats = profile.get("stats", {})
+ # Any creator-center page marks the account as creator-sourced; any collection error marks it partial.
+ source_mode = "creator_center" if creator_data["pages"] else "public"
+ sync_status = "partial" if public_data["errors"] or creator_data["errors"] else "ready"
+
+ if existing:
+ legacy.db.execute(
+ """
+ UPDATE douyin_accounts
+ SET profile_url = ?, canonical_profile_url = ?, sec_uid = ?, douyin_uid = ?, douyin_id = ?,
+ nickname = ?, signature = ?, avatar_url = ?, tags_json = ?, profile_stats_json = ?,
+ raw_profile_json = ?, source_mode = ?, sync_status = ?, last_public_sync_at = ?,
+ last_creator_sync_at = ?, updated_at = ?
+ WHERE id = ?
+ """,
+ (
+ profile.get("profile_url", ""),
+ profile.get("canonical_profile_url", ""),
+ profile.get("sec_uid", ""),
+ profile.get("douyin_uid", ""),
+ profile.get("douyin_id", ""),
+ profile.get("nickname", ""),
+ profile.get("signature", ""),
+ profile.get("avatar_url", ""),
+ _safe_json_dumps(tags),
+ _safe_json_dumps(profile_stats),
+ _safe_json_dumps({
+ "profile": profile.get("raw", {}),
+ "discovery_note": sync_request.discovery_note
+ }),
+ source_mode,
+ sync_status,
+ # Keep the previous sync timestamps when this run collected nothing of that kind.
+ now() if public_data["raw_pages"] else existing.get("last_public_sync_at"),
+ now() if creator_data["pages"] else existing.get("last_creator_sync_at"),
+ updated_at,
+ account_id
+ )
+ )
+ else:
+ legacy.db.execute(
+ """
+ INSERT INTO douyin_accounts (
+ id, user_id, profile_url, canonical_profile_url, sec_uid, douyin_uid, douyin_id,
+ nickname, signature, avatar_url, tags_json, profile_stats_json, raw_profile_json,
+ source_mode, sync_status, last_public_sync_at, last_creator_sync_at, created_at, updated_at
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ account_id,
+ owner["id"],
+ profile.get("profile_url", ""),
+ profile.get("canonical_profile_url", ""),
+ profile.get("sec_uid", ""),
+ profile.get("douyin_uid", ""),
+ profile.get("douyin_id", ""),
+ profile.get("nickname", ""),
+ profile.get("signature", ""),
+ profile.get("avatar_url", ""),
+ _safe_json_dumps(tags),
+ _safe_json_dumps(profile_stats),
+ _safe_json_dumps({
+ "profile": profile.get("raw", {}),
+ "discovery_note": sync_request.discovery_note
+ }),
+ source_mode,
+ sync_status,
+ now() if public_data["raw_pages"] else None,
+ now() if creator_data["pages"] else None,
+ created_at,
+ updated_at
+ )
+ )
+
+ # Re-read the row before persisting dependants, and again afterwards for the return value.
+ account_row = _require_owned_account(account_id, owner["id"])
+ _persist_snapshots_and_videos(account_row, public_data, creator_data, sync_request)
+ return _require_owned_account(account_id, owner["id"])
+
+    def _persist_snapshot(
+        account_row: dict[str, Any],
+        snapshot_type: str,
+        source_url: str,
+        payload: Any,
+        summary: dict[str, Any]
+    ) -> str:
+        """Write one snapshot row plus its flattened field rows.
+
+        Args:
+            account_row: Account record; only its ``id`` is read.
+            snapshot_type: Value stored in the ``snapshot_type`` column.
+            source_url: URL the payload was collected from.
+            payload: Raw data, stored as JSON and flattened via ``_flatten_json``.
+            summary: Summary stored as JSON next to the raw payload.
+
+        Returns:
+            The id generated for the new snapshot.
+        """
+        new_snapshot_id = make_id("dysnap")
+        stamp = now()
+        flat_fields = _flatten_json(payload)
+        # collected_at and created_at deliberately share the same timestamp.
+        snapshot_params = (
+            new_snapshot_id,
+            account_row["id"],
+            snapshot_type,
+            source_url,
+            _safe_json_dumps(payload),
+            _safe_json_dumps(summary),
+            len(flat_fields),
+            stamp,
+            stamp
+        )
+        legacy.db.execute(
+            """
+            INSERT INTO douyin_account_snapshots (
+                id, account_id, snapshot_type, source_url, raw_payload_json, summary_json,
+                field_count, collected_at, created_at
+            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+            """,
+            snapshot_params
+        )
+        field_insert_sql = """
+            INSERT OR REPLACE INTO douyin_snapshot_fields (
+                snapshot_id, field_path, field_type, field_value_text
+            ) VALUES (?, ?, ?, ?)
+            """
+        for path, kind, value_text in flat_fields:
+            legacy.db.execute(field_insert_sql, (new_snapshot_id, path, kind, value_text))
+        return new_snapshot_id
+
+    def _persist_snapshots_and_videos(
+        account_row: dict[str, Any],
+        public_data: dict[str, Any],
+        creator_data: dict[str, Any],
+        sync_request: DouyinAccountSyncRequest
+    ) -> None:
+        """Persist the snapshots of one sync run and upsert the account's videos.
+
+        A ``public_profile`` snapshot is written when ``public_data["raw_pages"]``
+        is non-empty, and one ``creator_center`` snapshot per entry of
+        ``creator_data["pages"]``. Videos from ``public_data["videos"]`` plus the
+        normalized ``sync_request.manual_work_payloads`` are de-duplicated and then
+        inserted or updated in ``douyin_videos``.
+
+        Args:
+            account_row: Account record; only its ``id`` is read here.
+            public_data: Dict with ``raw_pages``, ``errors``, ``source_url``,
+                ``videos`` and ``profile`` keys. It is not modified.
+            creator_data: Dict whose ``pages`` entries carry ``title``, ``blobs``
+                and ``url``.
+            sync_request: Sync request supplying ``manual_work_payloads``.
+        """
+        if public_data["raw_pages"]:
+            public_payload = {
+                "pages": public_data["raw_pages"],
+                "errors": public_data["errors"],
+                "source_url": public_data["source_url"]
+            }
+            _persist_snapshot(
+                account_row,
+                "public_profile",
+                public_data["source_url"],
+                public_payload,
+                {
+                    "video_count": len(public_data["videos"]),
+                    "nickname": public_data["profile"].get("nickname", ""),
+                    "tags": public_data["profile"].get("tags", [])
+                }
+            )
+
+        for page in creator_data["pages"]:
+            payload = {
+                "title": page["title"],
+                "blobs": page["blobs"]
+            }
+            _persist_snapshot(
+                account_row,
+                "creator_center",
+                page["url"],
+                payload,
+                {
+                    "blob_count": len(page["blobs"]),
+                    "field_count": len(_flatten_json(payload))
+                }
+            )
+
+        # Work on a copy: appending to public_data["videos"] would silently change
+        # the caller's data after this function returns.
+        videos = list(public_data["videos"])
+        videos.extend(
+            _normalize_video_candidate(manual_video)
+            for manual_video in sync_request.manual_work_payloads
+        )
+
+        # First occurrence wins; videos with no aweme_id, share_url or title are dropped.
+        deduped: dict[str, dict[str, Any]] = {}
+        for video in videos:
+            key = video["aweme_id"] or video["share_url"] or video["title"]
+            if key and key not in deduped:
+                deduped[key] = video
+
+        for video in deduped.values():
+            # Look the stored row up with the same identity used for de-duplication
+            # above. Matching on aweme_id only would insert a fresh row on every
+            # sync for videos that have no aweme_id.
+            existing = None
+            if video["aweme_id"]:
+                existing = legacy.db.fetch_one(
+                    "SELECT id FROM douyin_videos WHERE account_id = ? AND aweme_id = ? LIMIT 1",
+                    (account_row["id"], video["aweme_id"])
+                )
+            elif video["share_url"]:
+                existing = legacy.db.fetch_one(
+                    "SELECT id FROM douyin_videos WHERE account_id = ? AND share_url = ? LIMIT 1",
+                    (account_row["id"], video["share_url"])
+                )
+            elif video["title"]:
+                # Title is the weakest identity, so only match rows that have no
+                # stronger identifier of their own.
+                existing = legacy.db.fetch_one(
+                    """
+                    SELECT id FROM douyin_videos
+                    WHERE account_id = ? AND title = ?
+                      AND COALESCE(aweme_id, '') = '' AND COALESCE(share_url, '') = ''
+                    LIMIT 1
+                    """,
+                    (account_row["id"], video["title"])
+                )
+            timestamp = now()
+            if existing:
+                legacy.db.execute(
+                    """
+                    UPDATE douyin_videos
+                    SET title = ?, description = ?, share_url = ?, cover_url = ?, duration_sec = ?,
+                        published_at = ?, tags_json = ?, stats_json = ?, raw_json = ?, updated_at = ?
+                    WHERE id = ?
+                    """,
+                    (
+                        video["title"],
+                        video["description"],
+                        video["share_url"],
+                        video["cover_url"],
+                        video["duration_sec"],
+                        video["published_at"],
+                        _safe_json_dumps(video["tags"]),
+                        _safe_json_dumps(video["stats"]),
+                        _safe_json_dumps(video["raw"]),
+                        timestamp,
+                        existing["id"]
+                    )
+                )
+            else:
+                legacy.db.execute(
+                    """
+                    INSERT INTO douyin_videos (
+                        id, account_id, aweme_id, title, description, share_url, cover_url,
+                        duration_sec, published_at, tags_json, stats_json, raw_json, created_at, updated_at
+                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                    """,
+                    (
+                        make_id("dyvideo"),
+                        account_row["id"],
+                        video["aweme_id"],
+                        video["title"],
+                        video["description"],
+                        video["share_url"],
+                        video["cover_url"],
+                        video["duration_sec"],
+                        video["published_at"],
+                        _safe_json_dumps(video["tags"]),
+                        _safe_json_dumps(video["stats"]),
+                        _safe_json_dumps(video["raw"]),
+                        timestamp,
+                        timestamp
+                    )
+                )
+
+    def _list_videos(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
+        """Return up to ``limit`` stored videos of an account, newest first.
+
+        Rows are ordered by ``COALESCE(published_at, updated_at)`` descending and
+        the JSON columns are decoded into ``tags``, ``stats`` and ``raw``.
+        """
+        video_rows = legacy.db.fetch_all(
+            """
+            SELECT *
+            FROM douyin_videos
+            WHERE account_id = ?
+            ORDER BY COALESCE(published_at, updated_at) DESC, updated_at DESC
+            LIMIT ?
+            """,
+            (account_id, limit)
+        )
+        return [
+            {
+                "id": record["id"],
+                "aweme_id": record["aweme_id"],
+                "title": record["title"],
+                "description": record["description"],
+                "share_url": record["share_url"],
+                "cover_url": record["cover_url"],
+                "duration_sec": record["duration_sec"],
+                "published_at": record["published_at"],
+                "tags": _safe_json_loads(record["tags_json"], []),
+                "stats": _safe_json_loads(record["stats_json"], {}),
+                "raw": _safe_json_loads(record["raw_json"], {})
+            }
+            for record in video_rows
+        ]
+
+    def _build_account_payload(account_row: dict[str, Any], include_recent_videos: int = 8) -> dict[str, Any]:
+        """Assemble the API/prompt payload describing one stored account.
+
+        At least 12 videos are loaded, while the video summary is limited to
+        ``include_recent_videos``. ``keywords`` merges stored tags, keywords
+        extracted from nickname and signature, the summary's top tags and the
+        summarized video titles, capped at 18 entries.
+        """
+        fetch_limit = max(include_recent_videos, 12)
+        recent_videos = _list_videos(account_row["id"], limit=fetch_limit)
+        stored_tags = _safe_json_loads(account_row["tags_json"], [])
+        stats = _safe_json_loads(account_row["profile_stats_json"], {})
+        summary = _summarize_videos(recent_videos, limit=include_recent_videos)
+
+        keyword_sources = stored_tags + _extract_keywords(account_row["nickname"], account_row["signature"])
+        keyword_sources = keyword_sources + summary["top_tags"]
+        keyword_sources = keyword_sources + [entry["title"] for entry in summary["videos"]]
+        all_keywords = _dedupe_strings(keyword_sources)
+
+        # The canonical URL is preferred; the originally submitted URL is the fallback.
+        display_url = account_row["canonical_profile_url"] or account_row["profile_url"]
+        return {
+            "id": account_row["id"],
+            "nickname": account_row["nickname"],
+            "signature": account_row["signature"],
+            "profile_url": display_url,
+            "avatar_url": account_row["avatar_url"],
+            "sec_uid": account_row["sec_uid"],
+            "douyin_id": account_row["douyin_id"],
+            "profile_stats": stats,
+            "tags": stored_tags,
+            "keywords": all_keywords[:18],
+            "sync_status": account_row["sync_status"],
+            "video_summary": summary
+        }
+
+    def _list_linked_accounts(account_row: dict[str, Any]) -> list[dict[str, Any]]:
+        """List the relations whose source is ``account_row``, newest first.
+
+        Each relation is LEFT JOINed to its target account, so relations that only
+        store a profile URL (no ``target_account_id``) are returned as well.
+
+        Returns:
+            One dict per relation. The ``target_*`` text fields are always strings:
+            an empty string is used when the relation has no matching account row.
+        """
+        relation_rows = legacy.db.fetch_all(
+            """
+            SELECT rel.*, target.nickname AS target_nickname, target.signature AS target_signature,
+                   target.canonical_profile_url AS target_canonical_profile_url, target.profile_stats_json AS target_profile_stats_json,
+                   target.tags_json AS target_tags_json
+            FROM douyin_account_relations rel
+            LEFT JOIN douyin_accounts target ON target.id = rel.target_account_id
+            WHERE rel.source_account_id = ?
+            ORDER BY rel.created_at DESC
+            """,
+            (account_row["id"],)
+        )
+        payloads: list[dict[str, Any]] = []
+        for row in relation_rows:
+            # The joined columns are present in every row but hold None when the
+            # LEFT JOIN finds no account, so ``row.get(key, "")`` would return None.
+            # ``or ""`` gives the empty-string fallback that was intended.
+            payloads.append({
+                "relation_id": row["id"],
+                "relation_type": row["relation_type"],
+                "note": row["note"],
+                "search_id": row["search_id"],
+                "created_at": row["created_at"],
+                "target_account_id": row["target_account_id"],
+                "target_profile_url": row["target_profile_url"] or row.get("target_canonical_profile_url") or "",
+                "target_nickname": row.get("target_nickname") or "",
+                "target_signature": row.get("target_signature") or "",
+                "target_profile_stats": _safe_json_loads(row.get("target_profile_stats_json"), {}),
+                "target_tags": _safe_json_loads(row.get("target_tags_json"), [])
+            })
+        return payloads
+
+    def _build_workspace_payload(account_row: dict[str, Any]) -> dict[str, Any]:
+        """Build the full workspace view for one account.
+
+        The result combines the account payload, the latest public and
+        creator-center snapshots (or ``None``), linked accounts, the five most
+        recent analysis reports with their suggestions, the five most recent
+        similarity searches and the model profiles of the account's user.
+        """
+        account_id = account_row["id"]
+
+        def _snapshot_brief(snapshot: Any) -> Any:
+            # Compact snapshot view; None when no snapshot of that type exists.
+            if not snapshot:
+                return None
+            return {
+                "id": snapshot["id"],
+                "source_url": snapshot["source_url"],
+                "field_count": snapshot["field_count"],
+                "collected_at": snapshot["collected_at"],
+                "summary": _safe_json_loads(snapshot["summary_json"], {})
+            }
+
+        def _suggestion_brief(entry: Any) -> dict[str, Any]:
+            # Public fields of one stored model suggestion.
+            return {
+                "id": entry["id"],
+                "model_profile_id": entry["model_profile_id"],
+                "model_label": entry["model_label"],
+                "status": entry["status"],
+                "suggestion_text": entry["suggestion_text"],
+                "parsed_json": _safe_json_loads(entry["parsed_json"], {})
+            }
+
+        account_payload = _build_account_payload(account_row)
+        public_snapshot = legacy.db.fetch_one(
+            """
+            SELECT *
+            FROM douyin_account_snapshots
+            WHERE account_id = ? AND snapshot_type = 'public_profile'
+            ORDER BY collected_at DESC
+            LIMIT 1
+            """,
+            (account_id,)
+        )
+        creator_snapshot = legacy.db.fetch_one(
+            """
+            SELECT *
+            FROM douyin_account_snapshots
+            WHERE account_id = ? AND snapshot_type = 'creator_center'
+            ORDER BY collected_at DESC
+            LIMIT 1
+            """,
+            (account_id,)
+        )
+        report_rows = legacy.db.fetch_all(
+            """
+            SELECT *
+            FROM douyin_analysis_reports
+            WHERE account_id = ?
+            ORDER BY created_at DESC
+            LIMIT 5
+            """,
+            (account_id,)
+        )
+        report_payloads = []
+        for report_row in report_rows:
+            suggestion_rows = legacy.db.fetch_all(
+                "SELECT * FROM douyin_analysis_suggestions WHERE report_id = ? ORDER BY created_at ASC",
+                (report_row["id"],)
+            )
+            report_payloads.append({
+                "id": report_row["id"],
+                "focus_text": report_row["focus_text"],
+                "model_profile_ids": _safe_json_loads(report_row["model_profile_ids_json"], []),
+                "linked_account_ids": _safe_json_loads(report_row["linked_account_ids_json"], []),
+                "created_at": report_row["created_at"],
+                "suggestions": [_suggestion_brief(entry) for entry in suggestion_rows]
+            })
+        search_rows = legacy.db.fetch_all(
+            """
+            SELECT *
+            FROM douyin_similarity_searches
+            WHERE source_account_id = ?
+            ORDER BY created_at DESC
+            LIMIT 5
+            """,
+            (account_id,)
+        )
+
+        workspace: dict[str, Any] = {
+            "account": account_payload,
+            "latest_public_snapshot": _snapshot_brief(public_snapshot),
+            "latest_creator_snapshot": _snapshot_brief(creator_snapshot),
+            "linked_accounts": _list_linked_accounts(account_row),
+            "recent_reports": report_payloads
+        }
+        workspace["recent_similarity_searches"] = [
+            {
+                "id": search_row["id"],
+                "keywords": _safe_json_loads(search_row["keywords_json"], []),
+                "created_at": search_row["created_at"]
+            }
+            for search_row in search_rows
+        ]
+        workspace["available_model_profiles"] = [
+            {
+                "id": profile_row["id"],
+                "name": profile_row["name"],
+                "model_name": profile_row["model_name"],
+                "base_url": profile_row["base_url"],
+                "is_default": bool(profile_row["is_default"])
+            }
+            for profile_row in _fetch_model_profiles(account_row["user_id"])
+        ]
+        return workspace
+
+    def _list_snapshots(account_id: str, limit: int = 20) -> list[dict[str, Any]]:
+        """Return up to ``limit`` snapshot summaries of an account, newest first.
+
+        Raw payloads are left out; only the decoded ``summary_json`` is included.
+        """
+        snapshot_rows = legacy.db.fetch_all(
+            """
+            SELECT *
+            FROM douyin_account_snapshots
+            WHERE account_id = ?
+            ORDER BY collected_at DESC
+            LIMIT ?
+            """,
+            (account_id, limit)
+        )
+        summaries: list[dict[str, Any]] = []
+        for record in snapshot_rows:
+            summaries.append({
+                "id": record["id"],
+                "snapshot_type": record["snapshot_type"],
+                "source_url": record["source_url"],
+                "field_count": record["field_count"],
+                "collected_at": record["collected_at"],
+                "summary": _safe_json_loads(record["summary_json"], {})
+            })
+        return summaries
+
+    def _get_snapshot_detail(snapshot_id: str, account_id: str) -> dict[str, Any]:
+        """Return one snapshot with its raw payload and flattened fields.
+
+        The snapshot must belong to ``account_id``.
+
+        Raises:
+            HTTPException: 404 when no snapshot matches both ids.
+        """
+        snapshot = legacy.db.fetch_one(
+            """
+            SELECT *
+            FROM douyin_account_snapshots
+            WHERE id = ? AND account_id = ?
+            LIMIT 1
+            """,
+            (snapshot_id, account_id)
+        )
+        if not snapshot:
+            raise HTTPException(status_code=404, detail="Snapshot not found")
+
+        field_rows = legacy.db.fetch_all(
+            """
+            SELECT field_path, field_type, field_value_text
+            FROM douyin_snapshot_fields
+            WHERE snapshot_id = ?
+            ORDER BY field_path ASC
+            """,
+            (snapshot_id,)
+        )
+        detail: dict[str, Any] = {
+            column: snapshot[column]
+            for column in ("id", "snapshot_type", "source_url", "field_count", "collected_at")
+        }
+        detail["summary"] = _safe_json_loads(snapshot["summary_json"], {})
+        detail["raw_payload"] = _safe_json_loads(snapshot["raw_payload_json"], {})
+        detail["fields"] = field_rows
+        return detail
+
+    async def _run_account_analysis(
+        account_row: dict[str, Any],
+        owner: dict[str, Any],
+        request: DouyinAccountAnalysisRequest
+    ) -> dict[str, Any]:
+        """Run a multi-model analysis of one account and persist the report.
+
+        Benchmark accounts come from ``request.linked_account_ids`` plus, when
+        ``request.include_linked_accounts`` is set, the account's stored relations.
+        If that yields no benchmark and ``request.include_recent_similar_candidates``
+        is set, the top three candidates of the latest similarity search are used.
+        One report row is written, then every resolved model profile is queried
+        concurrently and each result is stored as a suggestion row.
+
+        Args:
+            account_row: The account being analysed.
+            owner: The authenticated user; ``owner["id"]`` scopes all lookups.
+            request: Analysis options (focus text, model profiles, temperature, ...).
+
+        Returns:
+            Dict with ``report_id``, ``created_at``, the prompt ``context`` and the
+            list of per-model ``suggestions``.
+        """
+        target_payload = _build_account_payload(account_row, include_recent_videos=max(4, min(request.max_videos, 12)))
+        linked_rows = _list_linked_accounts(account_row)
+        linked_account_ids = list(request.linked_account_ids)
+        if request.include_linked_accounts:
+            linked_account_ids.extend(
+                row["target_account_id"] for row in linked_rows if row.get("target_account_id")
+            )
+        linked_account_ids = _dedupe_strings(linked_account_ids)
+        benchmark_payloads: list[dict[str, Any]] = []
+        for linked_account_id in linked_account_ids:
+            linked_row = _require_owned_account(linked_account_id, owner["id"])
+            benchmark_payloads.append(_build_account_payload(linked_row, include_recent_videos=6))
+
+        if request.include_recent_similar_candidates and not benchmark_payloads:
+            latest_search = legacy.db.fetch_one(
+                """
+                SELECT *
+                FROM douyin_similarity_searches
+                WHERE source_account_id = ?
+                ORDER BY created_at DESC
+                LIMIT 1
+                """,
+                (account_row["id"],)
+            )
+            if latest_search:
+                candidate_rows = legacy.db.fetch_all(
+                    """
+                    SELECT cand.*, acct.user_id AS account_user_id
+                    FROM douyin_similarity_candidates cand
+                    LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
+                    WHERE cand.search_id = ?
+                    ORDER BY cand.rank_index ASC
+                    LIMIT 3
+                    """,
+                    (latest_search["id"],)
+                )
+                for candidate_row in candidate_rows:
+                    candidate_account_id = candidate_row.get("candidate_account_id")
+                    if not candidate_account_id:
+                        continue
+                    # Stored candidate ids may come straight from model output, so
+                    # they can point at no account or at another user's account.
+                    # The joined user_id lets us skip those optional benchmarks
+                    # without handing them to the ownership check below.
+                    if candidate_row.get("account_user_id") != owner["id"]:
+                        continue
+                    linked_candidate = _require_owned_account(candidate_account_id, owner["id"])
+                    benchmark_payloads.append(_build_account_payload(linked_candidate, include_recent_videos=6))
+
+        profiles = _resolve_model_profiles(owner, request.model_profile_ids)
+        system_prompt = (
+            "你是资深抖音增长顾问。你会基于账号画像、创作者中心字段、作品表现和对标账号内容,"
+            "给出可执行的优化建议。请始终返回 JSON 对象,包含这些字段:"
+            "summary、strengths、weaknesses、benchmark_insights、content_plan、"
+            "growth_actions、deep_search_hypotheses。每个数组字段请给出 3-6 条中文建议。"
+        )
+        latest_creator_summary = legacy.db.fetch_one(
+            """
+            SELECT summary_json
+            FROM douyin_account_snapshots
+            WHERE account_id = ? AND snapshot_type = 'creator_center'
+            ORDER BY collected_at DESC
+            LIMIT 1
+            """,
+            (account_row["id"],)
+        ) or {}
+        analysis_context = {
+            "target_account": target_payload,
+            # At most six benchmark accounts are put into the prompt.
+            "benchmark_accounts": benchmark_payloads[:6],
+            "focus": request.extra_focus,
+            "creator_center_snapshot_summary": _safe_json_loads(
+                latest_creator_summary.get("summary_json"),
+                {}
+            )
+        }
+        user_prompt = (
+            "请分析以下抖音账号,并分别给出内容方向、选题结构、互动增长、账号定位和对标拆解建议。"
+            "如果提供了对标账号,要重点指出可借鉴但不应直接照搬的部分。"
+            f"\n\n输入上下文:\n{json.dumps(analysis_context, ensure_ascii=False, indent=2)}"
+        )
+
+        report_id = make_id("dyreport")
+        created_at = now()
+        legacy.db.execute(
+            """
+            INSERT INTO douyin_analysis_reports (
+                id, account_id, user_id, focus_text, model_profile_ids_json,
+                linked_account_ids_json, prompt_text, context_json, created_at
+            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+            """,
+            (
+                report_id,
+                account_row["id"],
+                owner["id"],
+                request.extra_focus,
+                _safe_json_dumps([profile["id"] for profile in profiles]),
+                _safe_json_dumps(linked_account_ids),
+                user_prompt,
+                _safe_json_dumps(analysis_context),
+                created_at
+            )
+        )
+
+        async def _analyze_with_model(profile: dict[str, Any]) -> dict[str, Any]:
+            # Query one model and store its answer as a suggestion row.
+            try:
+                output = await legacy.call_model(
+                    profile,
+                    system_prompt=system_prompt,
+                    user_prompt=user_prompt,
+                    temperature=request.temperature
+                )
+                parsed = _try_parse_agent_json(output)
+                status = "ok"
+            except Exception as exc:
+                # Deliberate best effort: one failing model must not abort the
+                # others, so the failure is stored as an "error" suggestion.
+                output = str(exc)
+                parsed = {}
+                status = "error"
+            suggestion_id = make_id("dysady")
+            model_label = _build_model_label(profile)
+            legacy.db.execute(
+                """
+                INSERT INTO douyin_analysis_suggestions (
+                    id, report_id, model_profile_id, model_label, status,
+                    suggestion_text, parsed_json, created_at
+                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+                """,
+                (
+                    suggestion_id,
+                    report_id,
+                    profile["id"],
+                    model_label,
+                    status,
+                    output,
+                    _safe_json_dumps(parsed),
+                    now()
+                )
+            )
+            return {
+                "id": suggestion_id,
+                "model_profile_id": profile["id"],
+                "model_label": model_label,
+                "status": status,
+                "suggestion_text": output,
+                "parsed_json": parsed
+            }
+
+        suggestions = await asyncio.gather(*[_analyze_with_model(profile) for profile in profiles])
+        # One timestamp for both columns so they cannot differ.
+        finished_at = now()
+        legacy.db.execute(
+            "UPDATE douyin_accounts SET last_analysis_at = ?, updated_at = ? WHERE id = ?",
+            (finished_at, finished_at, account_row["id"])
+        )
+        return {
+            "report_id": report_id,
+            "created_at": created_at,
+            "context": analysis_context,
+            "suggestions": suggestions
+        }
+
+    async def _prepare_similarity_source(
+        owner: dict[str, Any],
+        request: DouyinSimilarSearchRequest
+    ) -> tuple[dict[str, Any] | None, dict[str, Any]]:
+        """Resolve the source account of a similarity search.
+
+        With ``request.source_account_id`` the stored account is used. Otherwise
+        ``request.profile_url`` is collected on the fly and an ad-hoc payload with
+        an empty ``id`` is built from it.
+
+        Returns:
+            ``(account_row, payload)``; ``account_row`` is ``None`` for an ad-hoc
+            source built from ``profile_url``.
+
+        Raises:
+            HTTPException: 400 when neither input is given, or when the profile
+                page yields neither a nickname nor any video.
+        """
+        if request.source_account_id:
+            stored_row = _require_owned_account(request.source_account_id, owner["id"])
+            return stored_row, _build_account_payload(stored_row)
+
+        raw_url = request.profile_url or ""
+        if not raw_url.strip():
+            raise HTTPException(status_code=400, detail="source_account_id or profile_url is required")
+
+        public_data = await _collect_public_profile(raw_url, None)
+        parsed_profile = public_data["profile"]
+        if not parsed_profile.get("nickname") and not public_data["videos"]:
+            raise HTTPException(status_code=400, detail="Unable to parse the shared Douyin profile page")
+
+        payload = {
+            "id": "",
+            "nickname": parsed_profile.get("nickname", ""),
+            "signature": parsed_profile.get("signature", ""),
+            "profile_url": parsed_profile.get("canonical_profile_url", "") or request.profile_url,
+            "avatar_url": parsed_profile.get("avatar_url", ""),
+            "sec_uid": parsed_profile.get("sec_uid", ""),
+            "douyin_id": parsed_profile.get("douyin_id", ""),
+            "profile_stats": parsed_profile.get("stats", {}),
+            "tags": parsed_profile.get("tags", []),
+            "video_summary": _summarize_videos(public_data["videos"], limit=6)
+        }
+        summary = payload["video_summary"]
+        keyword_pool = payload["tags"] + _extract_keywords(payload["nickname"], payload["signature"])
+        keyword_pool = keyword_pool + summary["top_tags"]
+        keyword_pool = keyword_pool + [entry["title"] for entry in summary["videos"]]
+        payload["keywords"] = _dedupe_strings(keyword_pool)
+        return None, payload
+
+    async def _fetch_or_create_candidate(owner: dict[str, Any], profile_url: str) -> dict[str, Any] | None:
+        """Return the owner's account row for ``profile_url``, creating it if needed.
+
+        A stored account is matched on ``canonical_profile_url`` or ``profile_url``.
+        If none exists the public profile is collected and upserted.
+
+        Returns:
+            The account row, or ``None`` when the collected page yields neither a
+            nickname nor any video.
+        """
+        stored = legacy.db.fetch_one(
+            """
+            SELECT *
+            FROM douyin_accounts
+            WHERE user_id = ? AND (canonical_profile_url = ? OR profile_url = ?)
+            LIMIT 1
+            """,
+            (owner["id"], profile_url, profile_url)
+        )
+        if stored:
+            return stored
+
+        collected = await _collect_public_profile(profile_url, None)
+        parsed_profile = collected["profile"]
+        has_content = parsed_profile.get("nickname") or collected["videos"]
+        if not has_content:
+            return None
+
+        raw_videos = [entry["raw"] for entry in collected["videos"]]
+        sync_request = DouyinAccountSyncRequest(
+            profile_url=profile_url,
+            manual_work_payloads=raw_videos
+        )
+        # No creator-center data exists for a candidate found this way.
+        empty_creator_data = {"pages": [], "errors": []}
+        return _upsert_account(owner, parsed_profile, sync_request, collected, empty_creator_data)
+
+    async def _run_similarity_search(owner: dict[str, Any], request: DouyinSimilarSearchRequest) -> dict[str, Any]:
+        """Rank candidate accounts by similarity to a source account and persist the result.
+
+        Candidates are the owner's other stored accounts, optionally the source's
+        linked accounts, ``request.candidate_urls`` and, when
+        ``request.search_public_pages`` is set, URLs discovered from the source's
+        keywords. They are pre-ranked by ``_heuristic_similarity`` and then scored
+        by the selected model. If the model output is missing or unusable, the
+        heuristic ranking is stored instead.
+
+        Args:
+            owner: The authenticated user; ``owner["id"]`` scopes all lookups.
+            request: Search options (source, candidate URLs, model profile, ...).
+
+        Returns:
+            Dict with ``search_id``, ``source_account``, ``model_profile``,
+            ``raw_model_output`` and the saved ``candidates``.
+        """
+        source_account_row, source_payload = await _prepare_similarity_source(owner, request)
+        profile = legacy.model_profile_for_account(owner["id"], request.model_profile_id)
+        existing_accounts = legacy.db.fetch_all(
+            """
+            SELECT *
+            FROM douyin_accounts
+            WHERE user_id = ?
+            ORDER BY updated_at DESC
+            """,
+            (owner["id"],)
+        )
+
+        candidate_rows: list[dict[str, Any]] = []
+        seen_urls: set[str] = set()
+        source_id = source_account_row["id"] if source_account_row else ""
+        for row in existing_accounts:
+            if row["id"] == source_id:
+                continue
+            candidate_rows.append(row)
+            seen_urls.add(row["canonical_profile_url"] or row["profile_url"])
+
+        if request.seed_linked_accounts and source_account_row:
+            for linked in _list_linked_accounts(source_account_row):
+                candidate_url = linked.get("target_profile_url", "")
+                if not candidate_url or candidate_url in seen_urls:
+                    continue
+                seen_urls.add(candidate_url)
+                if linked.get("target_account_id"):
+                    candidate_rows.append(_require_owned_account(linked["target_account_id"], owner["id"]))
+
+        candidate_urls = _dedupe_strings(request.candidate_urls)
+        if request.search_public_pages:
+            discovered = await _discover_profile_urls_from_search(source_payload.get("keywords", []), limit=6)
+            candidate_urls.extend(discovered)
+            candidate_urls = _dedupe_strings(candidate_urls)
+
+        for candidate_url in candidate_urls:
+            if candidate_url in seen_urls or candidate_url == source_payload.get("profile_url"):
+                continue
+            candidate_row = await _fetch_or_create_candidate(owner, candidate_url)
+            if candidate_row:
+                candidate_rows.append(candidate_row)
+                seen_urls.add(candidate_url)
+
+        candidate_payloads: list[dict[str, Any]] = []
+        seen_account_ids: set[str] = set()
+        for row in candidate_rows:
+            if row["id"] in seen_account_ids:
+                continue
+            seen_account_ids.add(row["id"])
+            payload = _build_account_payload(row, include_recent_videos=6)
+            payload["heuristics"] = _heuristic_similarity(source_payload, payload)
+            candidate_payloads.append(payload)
+
+        candidate_payloads.sort(key=lambda item: item["heuristics"]["heuristic_score"], reverse=True)
+        # At least three candidates are kept, even if max_candidates is smaller.
+        candidate_payloads = candidate_payloads[: max(3, request.max_candidates)]
+
+        search_id = make_id("dysearch")
+        prompt_context = {
+            "source_account": source_payload,
+            "candidate_accounts": candidate_payloads,
+            "extra_requirements": request.extra_requirements
+        }
+        prompt = (
+            "请从候选账号中筛选与目标账号内容风格、题材、受众和互动逻辑最相似,且整体质量更高的账号。"
+            "请返回 JSON 数组,每项包含 candidate_account_id、candidate_profile_url、score、"
+            "rationale、similar_dimensions、optimization_value。score 范围 0-100。"
+            f"\n\n上下文:\n{json.dumps(prompt_context, ensure_ascii=False, indent=2)}"
+        )
+        legacy.db.execute(
+            """
+            INSERT INTO douyin_similarity_searches (
+                id, user_id, source_account_id, source_profile_url, keywords_json,
+                prompt_text, context_json, created_at
+            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+            """,
+            (
+                search_id,
+                owner["id"],
+                source_account_row["id"] if source_account_row else None,
+                source_payload.get("profile_url", ""),
+                _safe_json_dumps(source_payload.get("keywords", [])),
+                prompt,
+                _safe_json_dumps(prompt_context),
+                now()
+            )
+        )
+
+        if not candidate_payloads:
+            # The search row is still stored so the attempt shows up in history.
+            return {
+                "search_id": search_id,
+                "source_account": source_payload,
+                "model_profile": {
+                    "id": profile["id"],
+                    "label": _build_model_label(profile)
+                },
+                "raw_model_output": "No candidate accounts available. Sync more Douyin accounts or provide candidate_urls.",
+                "candidates": []
+            }
+
+        system_prompt = (
+            "你是抖音相似账号发现专家。你要根据内容主题、标签、风格、更新频率、互动表现和商业化潜力,"
+            "挑选最值得对标的账号。返回严格 JSON 数组。"
+        )
+        try:
+            output = await legacy.call_model(profile, system_prompt=system_prompt, user_prompt=prompt, temperature=0.2)
+            parsed = _try_parse_agent_json(output)
+        except Exception as exc:
+            # Deliberate best effort: a failing model falls back to the heuristic ranking.
+            output = str(exc)
+            parsed = []
+
+        candidate_map = {
+            payload["id"]: payload for payload in candidate_payloads if payload["id"]
+        }
+        if isinstance(parsed, dict):
+            parsed = parsed.get("items") or parsed.get("candidates") or []
+
+        # Model output is untrusted: the array may contain strings or numbers.
+        # Only dict entries can be read with .get() below, so anything else is
+        # dropped here, before it can crash a search that is already stored.
+        ranked_items = [item for item in parsed if isinstance(item, dict)] if isinstance(parsed, list) else []
+
+        saved_candidates: list[dict[str, Any]] = []
+        if not ranked_items:
+            ranked_items = [
+                {
+                    "candidate_account_id": payload["id"],
+                    "candidate_profile_url": payload["profile_url"],
+                    "score": payload["heuristics"]["heuristic_score"],
+                    "rationale": "Fallback to heuristic similarity because model output was unavailable or unparsable.",
+                    "similar_dimensions": [
+                        {
+                            "topic_overlap": payload["heuristics"]["topic_overlap"],
+                            "tag_overlap": payload["heuristics"]["tag_overlap"],
+                            "quality_score": payload["heuristics"]["quality_score"]
+                        }
+                    ],
+                    "optimization_value": "可作为候选对标账号进一步人工确认。"
+                }
+                for payload in candidate_payloads
+            ]
+
+        for index, item in enumerate(ranked_items, start=1):
+            candidate_account_id = _first_non_empty(item.get("candidate_account_id"))
+            candidate_profile_url = _first_non_empty(item.get("candidate_profile_url"))
+            # Match the model's entry to a known candidate: by id first, then by URL.
+            payload = candidate_map.get(candidate_account_id)
+            if not payload:
+                payload = next(
+                    (candidate for candidate in candidate_payloads if candidate["profile_url"] == candidate_profile_url),
+                    None
+                )
+            candidate_id = make_id("dycand")
+            heuristic_score = payload["heuristics"]["heuristic_score"] if payload else 0
+            score = _parse_count(item.get("score"))
+            rationale = _first_non_empty(item.get("rationale"), item.get("reason"), item.get("summary"))
+            dimensions = item.get("similar_dimensions") or item.get("dimensions") or {}
+            raw_output = {
+                "model_output": item,
+                "candidate_payload": payload or {}
+            }
+            legacy.db.execute(
+                """
+                INSERT INTO douyin_similarity_candidates (
+                    id, search_id, candidate_account_id, candidate_profile_url, heuristic_score,
+                    agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at
+                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """,
+                (
+                    candidate_id,
+                    search_id,
+                    payload["id"] if payload else candidate_account_id or None,
+                    payload["profile_url"] if payload else candidate_profile_url,
+                    heuristic_score,
+                    score,
+                    rationale,
+                    _safe_json_dumps(dimensions),
+                    _safe_json_dumps(raw_output),
+                    index,
+                    now()
+                )
+            )
+            saved_candidates.append({
+                "id": candidate_id,
+                "candidate_account_id": payload["id"] if payload else candidate_account_id,
+                "candidate_profile_url": payload["profile_url"] if payload else candidate_profile_url,
+                "candidate_nickname": payload["nickname"] if payload else "",
+                "heuristic_score": heuristic_score,
+                "agent_score": score,
+                "rationale_text": rationale,
+                "dimensions": dimensions,
+                "rank_index": index
+            })
+
+        return {
+            "search_id": search_id,
+            "source_account": source_payload,
+            "model_profile": {
+                "id": profile["id"],
+                "label": _build_model_label(profile)
+            },
+            "raw_model_output": output,
+            "candidates": saved_candidates
+        }
+
+    @app.get("/v2/douyin/accounts")
+    def list_douyin_accounts(account: dict[str, Any] = Depends(legacy.require_approved)) -> list[dict[str, Any]]:
+        """List the caller's Douyin accounts, most recently updated first."""
+        owned_rows = legacy.db.fetch_all(
+            """
+            SELECT *
+            FROM douyin_accounts
+            WHERE user_id = ?
+            ORDER BY updated_at DESC
+            """,
+            (account["id"],)
+        )
+        payloads: list[dict[str, Any]] = []
+        for owned_row in owned_rows:
+            payloads.append(_build_account_payload(owned_row))
+        return payloads
+
+    @app.post("/v2/douyin/accounts/sync")
+    async def sync_douyin_account(
+        request: DouyinAccountSyncRequest,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        """Collect public and creator-center data, upsert the account and return its workspace.
+
+        The response is the workspace payload plus ``sync_errors``, the combined
+        error lists of both collectors.
+
+        Raises:
+            HTTPException: 400 when the request carries no usable input, or when
+                nothing could be extracted from it.
+        """
+        has_input = (
+            request.profile_url.strip()
+            or request.manual_profile_payload
+            or request.manual_creator_pages
+        )
+        if not has_input:
+            raise HTTPException(
+                status_code=400,
+                detail="profile_url、manual_profile_payload 或 manual_creator_pages 至少需要传一个"
+            )
+
+        public_data = await _collect_public_profile(request.profile_url, request.manual_profile_payload)
+        creator_data = await _collect_creator_center_pages(
+            request.creator_center_urls,
+            request.session_cookie,
+            request.manual_creator_pages
+        )
+
+        extracted_anything = (
+            public_data["profile"].get("nickname")
+            or public_data["videos"]
+            or creator_data["pages"]
+        )
+        if not extracted_anything:
+            raise HTTPException(status_code=400, detail="No Douyin profile or creator-center data could be extracted")
+
+        saved_row = _upsert_account(account, public_data["profile"], request, public_data, creator_data)
+        response = _build_workspace_payload(saved_row)
+        response["sync_errors"] = public_data["errors"] + creator_data["errors"]
+        return response
+
+    @app.get("/v2/douyin/accounts/{account_id}")
+    def get_douyin_account(
+        account_id: str,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        """Return the workspace payload of one account after the ownership check."""
+        return _build_workspace_payload(_require_owned_account(account_id, account["id"]))
+
+    @app.get("/v2/douyin/accounts/{account_id}/snapshots")
+    def list_douyin_account_snapshots(
+        account_id: str,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> list[dict[str, Any]]:
+        """List snapshot summaries of one account after the ownership check."""
+        owned_row = _require_owned_account(account_id, account["id"])
+        owned_id = owned_row["id"]
+        return _list_snapshots(owned_id)
+
+    @app.get("/v2/douyin/accounts/{account_id}/snapshots/{snapshot_id}")
+    def get_douyin_account_snapshot(
+        account_id: str,
+        snapshot_id: str,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        """Return one snapshot in full, scoped to an account after the ownership check."""
+        owned_row = _require_owned_account(account_id, account["id"])
+        owned_id = owned_row["id"]
+        return _get_snapshot_detail(snapshot_id, owned_id)
+
+    @app.get("/v2/douyin/accounts/{account_id}/creator-fields")
+    def get_douyin_creator_fields(
+        account_id: str,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        """Return the full detail of the account's most recent creator-center snapshot.
+
+        Raises:
+            HTTPException: 404 when the account has no creator-center snapshot.
+        """
+        owned_row = _require_owned_account(account_id, account["id"])
+        owned_id = owned_row["id"]
+        newest = legacy.db.fetch_one(
+            """
+            SELECT id
+            FROM douyin_account_snapshots
+            WHERE account_id = ? AND snapshot_type = 'creator_center'
+            ORDER BY collected_at DESC
+            LIMIT 1
+            """,
+            (owned_id,)
+        )
+        if newest:
+            return _get_snapshot_detail(newest["id"], owned_id)
+        raise HTTPException(status_code=404, detail="No creator-center snapshot found")
+
+    @app.get("/v2/douyin/accounts/{account_id}/workspace")
+    def get_douyin_account_workspace(
+        account_id: str,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        """Return the workspace payload of one account after the ownership check."""
+        return _build_workspace_payload(_require_owned_account(account_id, account["id"]))
+
+    @app.get("/v2/douyin/accounts/{account_id}/analysis-reports")
+    def list_douyin_analysis_reports(
+        account_id: str,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> list[dict[str, Any]]:
+        """Return the ``recent_reports`` section of the account's workspace payload."""
+        owned_row = _require_owned_account(account_id, account["id"])
+        workspace = _build_workspace_payload(owned_row)
+        return workspace["recent_reports"]
+
+    @app.post("/v2/douyin/accounts/{account_id}/analysis")
+    async def analyze_douyin_account(
+        account_id: str,
+        request: DouyinAccountAnalysisRequest,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        """Run the account analysis for one account after the ownership check."""
+        owned_row = _require_owned_account(account_id, account["id"])
+        analysis_result = await _run_account_analysis(owned_row, account, request)
+        return analysis_result
+
+    @app.post("/v2/douyin/similar-searches")
+    async def create_douyin_similarity_search(
+        request: DouyinSimilarSearchRequest,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        """Start a similarity search for the caller and return its result."""
+        search_result = await _run_similarity_search(account, request)
+        return search_result
+
+    @app.get("/v2/douyin/similar-searches/{search_id}")
+    def get_douyin_similarity_search(
+        search_id: str,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        """Return a stored similarity search of the caller with its ranked candidates.
+
+        Raises:
+            HTTPException: 404 when no search with this id belongs to the caller.
+        """
+        search_row = legacy.db.fetch_one(
+            "SELECT * FROM douyin_similarity_searches WHERE id = ? AND user_id = ?",
+            (search_id, account["id"])
+        )
+        if not search_row:
+            raise HTTPException(status_code=404, detail="Similarity search not found")
+        candidates = legacy.db.fetch_all(
+            """
+            SELECT cand.*, acct.nickname AS candidate_nickname
+            FROM douyin_similarity_candidates cand
+            LEFT JOIN douyin_accounts acct ON acct.id = cand.candidate_account_id
+            WHERE cand.search_id = ?
+            ORDER BY cand.rank_index ASC
+            """,
+            (search_id,)
+        )
+        return {
+            "id": search_row["id"],
+            "source_account_id": search_row["source_account_id"],
+            "source_profile_url": search_row["source_profile_url"],
+            "keywords": _safe_json_loads(search_row["keywords_json"], []),
+            "context": _safe_json_loads(search_row["context_json"], {}),
+            "created_at": search_row["created_at"],
+            "candidates": [
+                {
+                    "id": row["id"],
+                    "candidate_account_id": row["candidate_account_id"],
+                    "candidate_profile_url": row["candidate_profile_url"],
+                    # The LEFT JOIN leaves this column None for candidates with
+                    # no account row, so a .get() default would never apply.
+                    # ``or ""`` matches the empty string returned at creation.
+                    "candidate_nickname": row.get("candidate_nickname") or "",
+                    "heuristic_score": row["heuristic_score"],
+                    "agent_score": row["agent_score"],
+                    "rationale_text": row["rationale_text"],
+                    "dimensions": _safe_json_loads(row["dimensions_json"], {}),
+                    "rank_index": row["rank_index"]
+                }
+                for row in candidates
+            ]
+        }
+
+    @app.get("/v2/douyin/accounts/{account_id}/benchmark-links")
+    def list_douyin_benchmark_links(
+        account_id: str,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> list[dict[str, Any]]:
+        """List the benchmark relations of one account after the ownership check."""
+        return _list_linked_accounts(_require_owned_account(account_id, account["id"]))
+
+    @app.post("/v2/douyin/accounts/{account_id}/benchmark-links")
+    def create_douyin_benchmark_links(
+        account_id: str,
+        request: DouyinBenchmarkLinkRequest,
+        account: dict[str, Any] = Depends(legacy.require_approved)
+    ) -> dict[str, Any]:
+        """Create benchmark relations from one account to other accounts or bare profile URLs.
+
+        One relation is stored per distinct entry of ``request.target_account_ids``
+        (each passed through the ownership check) and per distinct entry of
+        ``request.target_profile_urls`` (stored without a target account id).
+
+        Returns:
+            Dict with the number of ``saved`` relations, their ``relation_ids`` and
+            the refreshed ``links`` of the source account.
+        """
+        account_row = _require_owned_account(account_id, account["id"])
+
+        # Resolve every target before the first INSERT, so a target rejected by
+        # the ownership check cannot leave only part of the request saved.
+        # Ids are de-duplicated the same way the URLs are.
+        target_rows = [
+            _require_owned_account(target_account_id, account["id"])
+            for target_account_id in _dedupe_strings(request.target_account_ids)
+        ]
+        targets: list[tuple[Any, str]] = [
+            (target_row["id"], target_row["canonical_profile_url"] or target_row["profile_url"])
+            for target_row in target_rows
+        ]
+        # URL-only relations carry no target account id (stored as NULL).
+        targets.extend(
+            (None, target_profile_url)
+            for target_profile_url in _dedupe_strings(request.target_profile_urls)
+        )
+
+        linked_ids: list[str] = []
+        for target_account_id, target_profile_url in targets:
+            relation_id = make_id("dyrel")
+            legacy.db.execute(
+                """
+                INSERT INTO douyin_account_relations (
+                    id, user_id, source_account_id, target_account_id, target_profile_url,
+                    relation_type, note, search_id, created_at
+                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+                """,
+                (
+                    relation_id,
+                    account["id"],
+                    account_row["id"],
+                    target_account_id,
+                    target_profile_url,
+                    request.relation_type,
+                    request.note,
+                    request.search_id,
+                    now()
+                )
+            )
+            linked_ids.append(relation_id)
+
+        return {
+            "saved": len(linked_ids),
+            "relation_ids": linked_ids,
+            "links": _list_linked_accounts(account_row)
+        }
diff --git a/collector-service/app/main.py b/collector-service/app/main.py
index 12a779a..5557060 100644
--- a/collector-service/app/main.py
+++ b/collector-service/app/main.py
@@ -7,6 +7,7 @@ import re
import secrets
import shutil
import subprocess
+import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
@@ -18,6 +19,7 @@ from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from .database import Database, utc_now
+from .douyin_features import register_douyin_routes
from .integrations import AsrHttpClient, CutVideoClient, HuobaoDramaClient, N8NClient
from .openai_compat import OpenAICompatClient
@@ -2821,3 +2823,6 @@ def publish_app_update(request: PublishAppUpdateRequest, admin: dict[str, Any] =
(request.platform, request.channel, request.versionCode),
)
return {"saved": True, "action": "published", "updateId": row["id"] if row else 0}
+
+
+# Attach the Douyin routes from douyin_features to this app. The module object
+# itself is handed over as the second argument; the visible Douyin handlers
+# read `legacy.db` and `legacy.require_approved` from it.
+# NOTE(review): presumably this runs last so that every attribute the routes
+# need already exists on the module, and so that douyin_features does not have
+# to import main.py back - confirm before moving this call.
+register_douyin_routes(app, sys.modules[__name__])
diff --git a/docker-compose.yml b/docker-compose.yml
index 6218333..72cf8b3 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -32,7 +32,7 @@ services:
LOCAL_OPENAI_BASE_URL: ${LOCAL_OPENAI_BASE_URL:-http://host.docker.internal:8317/v1}
LOCAL_OPENAI_MODEL: ${LOCAL_OPENAI_MODEL:-GLM-5}
LOCAL_OPENAI_API_KEY: ${LOCAL_OPENAI_API_KEY:-}
- N8N_BASE_URL: ${N8N_BASE_URL:-http://n8n:5678}
+ N8N_BASE_URL: ${COLLECTOR_N8N_BASE_URL:-http://n8n:5678}
N8N_ANALYSIS_WEBHOOK_PATH: ${N8N_ANALYSIS_WEBHOOK_PATH:-/webhook/storyforge-analysis}
N8N_REAL_CUT_WEBHOOK_PATH: ${N8N_REAL_CUT_WEBHOOK_PATH:-/webhook/storyforge-real-cut}
N8N_AI_VIDEO_WEBHOOK_PATH: ${N8N_AI_VIDEO_WEBHOOK_PATH:-/webhook/storyforge-ai-video}
diff --git a/docs/AUDIT_2026-03-18.md b/docs/AUDIT_2026-03-18.md
index e4c0376..5258a83 100644
--- a/docs/AUDIT_2026-03-18.md
+++ b/docs/AUDIT_2026-03-18.md
@@ -1,6 +1,7 @@
# StoryForge 现状审计
日期:2026-03-18
+更新:2026-03-20
## 结论
@@ -145,10 +146,11 @@
- 内部编排接口
- `docker-compose.yml` 已改为 `collector + n8n + cli-proxy-api`
- `n8n` 工作流导出文件已纳入仓库
+- `collector-service` 的 live 运行态已回归到 `StoryForge-gitea` 自身源码构建,不再依赖 `/Users/kris/code/Fastgpt/collector-service/app` 的临时 bind mount
+- `collector-service` 现已在 live `8081` 提供 `/v2/douyin/*` 接口,并保留原有 `real-cut / ai-video / content-source-sync` 路由
## 当前主要风险
-1. `cutvideo` 的素材传输还未完整闭环
-2. Windows 机器还未部署支持 `POST /api/uploads` 的 `cutvideo` 新版本
-3. 抖音 / 小红书账号级内容源还未做真实平台验证
-4. `huobao-drama` 已在本机旧改版实例上跑通,但兼容补丁尚未迁到 upstream 仓库并形成正式提交
+1. 抖音 / 小红书账号级内容源还未做真实平台验证
+2. `huobao-drama` 已在本机旧改版实例上跑通,但兼容补丁尚未迁到 upstream 仓库并形成正式提交
+3. `douyin` 新接口已上线 live,但还需要补一轮真实账号级回归,确认页面抓取、手工 payload 和相似账号分析都稳定
diff --git a/docs/LAN_E2E_GUIDE_2026-03-18.md b/docs/LAN_E2E_GUIDE_2026-03-18.md
index 4bd962a..7e0567b 100644
--- a/docs/LAN_E2E_GUIDE_2026-03-18.md
+++ b/docs/LAN_E2E_GUIDE_2026-03-18.md
@@ -13,7 +13,8 @@ cp .env.example .env
至少确认这些变量:
-- `N8N_BASE_URL=http://127.0.0.1:5670`
+- `N8N_BASE_URL=http://127.0.0.1:5670`,用于你在宿主机单独运行 `collector-service`
+- `COLLECTOR_N8N_BASE_URL=http://n8n:5678`,用于 Docker 里的 `collector`
- `ORCHESTRATOR_SHARED_SECRET=storyforge-local-secret`
- `CUTVIDEO_BASE_URL=http://<windows-ip>:7860`
- `CUTVIDEO_API_KEY=` 如果 Windows 服务启用了鉴权
@@ -27,6 +28,7 @@ cp .env.example .env
说明:
- 如果你单独重建 `collector`,要确保运行时仍带上 `CUTVIDEO_BASE_URL`,否则容器会退回空值
+- `collector` 容器不要直接复用宿主机的 `N8N_BASE_URL=http://127.0.0.1:5670`:容器内的 `127.0.0.1` 指向 `collector` 容器自身而不是 `n8n`,会导致 webhook 调度失败
- 当前已验证可用的 Windows `cutvideo` 地址是 `http://192.168.31.18:7860`
- 当前已验证可用的本机 HTTP ASR 入口是 `http://host.docker.internal:8088/transcribe`
- 如果你用的是本机 `mac-whisper-service`,建议同时以 `WHISPER_TIMEOUT_MS=120000` 启动,否则长视频会直接 504
diff --git a/docs/MVP_STATUS_2026-03-18.md b/docs/MVP_STATUS_2026-03-18.md
index 77bb7ac..fca2d68 100644
--- a/docs/MVP_STATUS_2026-03-18.md
+++ b/docs/MVP_STATUS_2026-03-18.md
@@ -1,6 +1,7 @@
# StoryForge MVP 状态
日期:2026-03-18
+更新:2026-03-20
## 已跑通或已完成代码接通
@@ -16,6 +17,8 @@
- 本地大模型内容分析、二创文案、分镜生成
- Windows `cutvideo` API 调度与结果回写接口
- `upload_video -> source_job_id -> cutvideo` 自动 staging 闭环
+- `collector` live 运行态已从临时源码挂载切回 `StoryForge-gitea` 正式镜像
+- live `collector` 已挂出 `/v2/douyin/*` 能力并通过认证接口验证
- 本机 `huobao-drama` API 调度、首尾帧生成、视频生成与结果回写接口
- FastGPT 运行时依赖删除
@@ -29,14 +32,15 @@
- 实拍剪辑链路:`job_5ebd829c3f2144bca5c941183e75bdcd`
- 实拍剪辑自动 staging 联调:`job_01a6f283cbda42e4ae692b268b811a50`
- AI 视频链路:`job_01828c40377747cf914b51be360cc333`
+- Windows `cutvideo` 部署后联调:`job_5838515ed5c34679acd55a52cfcd424b`
## 尚未完全跑通
- 抖音 / 小红书账号级内容源还未做真实平台验证;`bilibili` 账号级 URL 已跑通
-- Windows 机器上的 `cutvideo` 需要同步部署带 `POST /api/uploads` 的版本,当前自动 staging 已在本机联调通过
+- `douyin` 账号分析接口已上线到 live `collector`,但还没有跑过真实生产账号样例
## 下一步优先级
-1. 把 `cutvideo` 新上传接口部署到 Windows 机器
-2. 补抖音 / 小红书账号级真实验证与必要的 URL 归一化
+1. 补抖音 / 小红书账号级真实验证与必要的 URL 归一化
+2. 把 `collector` live 切换结果和部署回滚说明固化到仓库
3. 把改动整理成提交并推送