Files
storyforge/tests/test_platform_contracts.py
kris 65db3cd336
Some checks failed
StoryForge CI / Baseline checks (push) Has been cancelled
StoryForge CI / Backend tests (push) Has been cancelled
StoryForge CI / Web tests (push) Has been cancelled
chore: sync storyforge handoff state
2026-05-02 17:50:21 +08:00

1044 lines
43 KiB
Python

from __future__ import annotations
import json
import os
import shutil
import sys
import tempfile
import unittest
import weakref
from pathlib import Path
from types import SimpleNamespace
from fastapi import FastAPI
from fastapi.testclient import TestClient
ROOT = Path(__file__).resolve().parents[1]
APP_ROOT = ROOT / "collector-service"
if str(APP_ROOT) not in sys.path:
sys.path.insert(0, str(APP_ROOT))
from app.database import Database, utc_now
from app.domestic_platform_features import register_domestic_platform_routes
from app.douyin_features import register_douyin_routes
BOOTSTRAP_USERNAME = "storyforge-admin"
def _json(value: object) -> str:
return json.dumps(value, ensure_ascii=False)
def _make_legacy(db: Database, account_row: dict[str, object]) -> SimpleNamespace:
counter = {"value": 0}
def make_id(prefix: str) -> str:
counter["value"] += 1
return f"{prefix}_{counter['value']}"
def require_approved() -> dict[str, object]:
return account_row
def content_source_payload(row: dict[str, object]) -> dict[str, object]:
metadata_raw = row.get("metadata_json") or "{}"
try:
metadata = json.loads(str(metadata_raw))
except json.JSONDecodeError:
metadata = {}
return {
"id": row["id"],
"user_id": row["user_id"],
"project_id": row.get("project_id", ""),
"source_kind": row["source_kind"],
"platform": row.get("platform", ""),
"handle": row.get("handle", ""),
"source_url": row.get("source_url", ""),
"title": row.get("title", ""),
"local_path": row.get("local_path", ""),
"metadata": metadata,
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def assistant_payload(row: dict[str, object]) -> dict[str, object]:
return {
"id": row["id"],
"name": row.get("name", ""),
}
def model_profile_for_account(account_id: str, requested_id: str | None) -> dict[str, object]:
if requested_id:
row = db.fetch_one("SELECT * FROM model_profiles WHERE id = ?", (requested_id,))
if row:
return row
row = db.fetch_one("SELECT * FROM model_profiles WHERE is_default = 1 LIMIT 1")
if row:
return row
raise RuntimeError(f"No model profile configured for {account_id}")
def parse_json_object(value: object) -> dict[str, object]:
if isinstance(value, dict):
return value
if isinstance(value, str) and value.strip():
parsed = json.loads(value)
return parsed if isinstance(parsed, dict) else {}
return {}
def resolve_target_kb(*_args: object, **_kwargs: object) -> dict[str, object]:
return {"id": "kb_contract"}
def resolve_target_assistant(*_args: object, **_kwargs: object) -> None:
return None
async def call_model(*_args: object, **_kwargs: object) -> str:
return "{}"
def job_payload(row: dict[str, object]) -> dict[str, object]:
return row
def create_job_record(**kwargs: object) -> dict[str, object]:
created_at = utc_now()
return {
"id": make_id("job"),
"account_id": kwargs.get("account_id", ""),
"project_id": kwargs.get("project_id", ""),
"knowledge_base_id": kwargs.get("knowledge_base_id", ""),
"content_source_id": kwargs.get("content_source_id", ""),
"assistant_id": kwargs.get("assistant_id", ""),
"source_type": kwargs.get("source_type", ""),
"line_type": kwargs.get("line_type", ""),
"workflow_key": kwargs.get("workflow_key", ""),
"title": kwargs.get("title", ""),
"language": kwargs.get("language", "auto"),
"source_url": kwargs.get("source_url", ""),
"artifacts": kwargs.get("artifacts", {}),
"analysis_model_profile_id": kwargs.get("analysis_model_profile_id", ""),
"status": "queued",
"created_at": created_at,
"updated_at": created_at,
}
async def trigger_orchestrated_job(job_row: dict[str, object]) -> dict[str, object]:
return job_row
return SimpleNamespace(
db=db,
utc_now=utc_now,
make_id=make_id,
require_approved=require_approved,
content_source_payload=content_source_payload,
assistant_payload=assistant_payload,
model_profile_for_account=model_profile_for_account,
parse_json_object=parse_json_object,
resolve_target_kb=resolve_target_kb,
resolve_target_assistant=resolve_target_assistant,
call_model=call_model,
create_job_record=create_job_record,
job_payload=job_payload,
trigger_orchestrated_job=trigger_orchestrated_job,
)
def _seed_base_account(db: Database) -> tuple[dict[str, object], dict[str, object], dict[str, object]]:
now = utc_now()
account_id = "acct_contract_owner"
project_id = "proj_contract_owner"
model_id = "model_contract_default"
db.execute(
"""
INSERT INTO accounts (
id, username, password_hash, password_salt, display_name, role, approval_status,
approved_by, approved_at, preferred_analysis_model_id, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
account_id,
BOOTSTRAP_USERNAME,
"hash",
"salt",
"StoryForge Contract Owner",
"super_admin",
"approved",
account_id,
now,
model_id,
now,
now,
),
)
db.execute(
"""
INSERT INTO projects (id, user_id, name, description, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(project_id, account_id, "StoryForge Contracts", "", now, now),
)
db.execute(
"""
INSERT INTO model_profiles (
id, owner_account_id, name, provider, base_url, api_key, model_name,
is_system, is_default, created_at, updated_at
) VALUES (?, NULL, ?, ?, ?, ?, ?, 1, 1, ?, ?)
""",
(model_id, "Default Model", "openai_compat", "http://127.0.0.1:8317/v1", "", "GLM-5", now, now),
)
account_row = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,))
project_row = db.fetch_one("SELECT * FROM projects WHERE id = ?", (project_id,))
model_row = db.fetch_one("SELECT * FROM model_profiles WHERE id = ?", (model_id,))
assert account_row is not None
assert project_row is not None
assert model_row is not None
return account_row, project_row, model_row
def _seed_douyin(db: Database, owner: dict[str, object], model_row: dict[str, object]) -> str:
now = utc_now()
account_id = "dyacct_contract_1"
db.execute(
"""
INSERT INTO douyin_accounts (
id, user_id, profile_url, canonical_profile_url, sec_uid, douyin_uid, douyin_id,
nickname, signature, avatar_url, tags_json, profile_stats_json, raw_profile_json,
source_mode, sync_status, last_public_sync_at, last_creator_sync_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
account_id,
owner["id"],
"https://www.douyin.com/user/contract-test",
"https://www.douyin.com/user/contract-test",
"sec_contract_1",
"douyin_uid_contract_1",
"douyin_id_contract_1",
"Contract Douyin",
"Contract test signature",
"https://example.com/avatar.png",
_json(["增长", "内容"]),
_json({"fans_count": 1200, "likes_count": 8800}),
_json({"profile": {"nickname": "Contract Douyin"}}),
"creator_center",
"ready",
now,
now,
now,
now,
),
)
for index in range(2):
db.execute(
"""
INSERT INTO douyin_videos (
id, account_id, aweme_id, title, description, share_url, cover_url,
duration_sec, published_at, tags_json, stats_json, raw_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
f"dyvid_contract_{index + 1}",
account_id,
f"aweme_contract_{index + 1}",
f"Contract Video {index + 1}",
"Contract summary",
"https://example.com/video",
"https://example.com/cover.png",
28,
f"2026-03-26T10:0{index}:00+00:00",
_json(["增长", "内容"]),
_json({"play": 8200 + index * 300, "like": 410 + index * 10, "comment": 18, "share": 9}),
_json({"title": f"Contract Video {index + 1}"}),
now,
now,
),
)
public_snapshot_id = "dysnap_public_contract"
creator_snapshot_id = "dysnap_creator_contract"
db.execute(
"""
INSERT INTO douyin_account_snapshots (
id, account_id, snapshot_type, source_url, raw_payload_json, summary_json,
field_count, collected_at, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
public_snapshot_id,
account_id,
"public_profile",
"https://www.douyin.com/user/contract-test",
_json({"nickname": "Contract Douyin"}),
_json({"nickname": "Contract Douyin"}),
1,
now,
now,
),
)
db.execute(
"""
INSERT INTO douyin_account_snapshots (
id, account_id, snapshot_type, source_url, raw_payload_json, summary_json,
field_count, collected_at, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
creator_snapshot_id,
account_id,
"creator_center",
"https://creator.douyin.com/creator-micro/home",
_json({"field": "value"}),
_json({"creator": "summary"}),
1,
now,
now,
),
)
db.execute(
"""
INSERT INTO douyin_snapshot_fields (snapshot_id, field_path, field_type, field_value_text)
VALUES (?, ?, ?, ?)
""",
(creator_snapshot_id, "profile.nickname", "string", "Contract Douyin"),
)
db.execute(
"""
INSERT INTO douyin_analysis_reports (
id, account_id, user_id, focus_text, model_profile_ids_json, linked_account_ids_json,
prompt_text, context_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"dyreport_contract_1",
account_id,
owner["id"],
"增长诊断",
_json([model_row["id"]]),
_json([]),
"contract prompt",
_json({"account": "douyin"}),
now,
),
)
db.execute(
"""
INSERT INTO douyin_analysis_suggestions (
id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"dysuggestion_contract_1",
"dyreport_contract_1",
model_row["id"],
"Test Model",
"ok",
"Contract analysis output",
_json({"summary": "ok"}),
now,
),
)
db.execute(
"""
INSERT INTO douyin_tracked_accounts (
id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
("dytrack_contract_1", owner["id"], account_id, None, "note", now, now),
)
db.execute(
"""
INSERT INTO douyin_tracking_cursors (user_id, last_seen_at, updated_at)
VALUES (?, ?, ?)
""",
(owner["id"], "2026-03-26T09:00:00+00:00", now),
)
return account_id
def _seed_domestic(db: Database, owner: dict[str, object], project_row: dict[str, object], platform: str) -> str:
now = utc_now()
account_id = f"{platform}_acct_contract_1"
db.execute(
"""
INSERT INTO content_sources (
id, user_id, project_id, source_kind, platform, handle, source_url, title, local_path,
metadata_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
account_id,
owner["id"],
project_row["id"],
"creator_account",
platform,
f"{platform}_handle",
f"https://example.com/{platform}/profile",
f"{platform.upper()} Contract Account",
"",
_json(
{
"bio": f"{platform} bio",
"description": f"{platform} description",
"avatar_url": "https://example.com/avatar.png",
"tags": ["增长", platform],
"keywords": ["增长", "内容"],
"max_items": 5,
}
),
now,
now,
),
)
for index in range(2):
db.execute(
"""
INSERT INTO content_sources (
id, user_id, project_id, source_kind, platform, handle, source_url, title, local_path,
metadata_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
f"{platform}_video_contract_{index + 1}",
owner["id"],
project_row["id"],
"video_link",
platform,
"",
f"https://example.com/{platform}/video/{index + 1}",
f"{platform.upper()} Contract Video {index + 1}",
"",
_json(
{
"summary": "contract summary",
"description": "contract description",
"cover_url": "https://example.com/cover.png",
"published_at": f"2026-03-26T10:0{index}:00+00:00",
"tags": ["增长", platform],
"content_type": "video",
"duration_sec": 30,
"external_id": f"{platform}_ext_{index + 1}",
"origin_content_source_id": account_id,
"source_account_url": f"https://example.com/{platform}/profile",
}
),
now,
now,
),
)
db.execute(
f"""
INSERT INTO {platform}_analysis_reports (
id, user_id, account_source_id, focus_text, prompt_text, context_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
f"{platform}_report_contract_1",
owner["id"],
account_id,
"增长诊断",
"contract prompt",
_json({"account": platform}),
now,
),
)
db.execute(
f"""
INSERT INTO {platform}_analysis_suggestions (
id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
f"{platform}_suggestion_contract_1",
f"{platform}_report_contract_1",
"model_contract_default",
"Test Model",
"ok",
"Contract analysis output",
_json({"summary": "ok"}),
now,
),
)
db.execute(
f"""
INSERT INTO {platform}_tracked_accounts (
id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(f"{platform}_track_contract_1", owner["id"], account_id, None, "note", now, now),
)
db.execute(
f"""
INSERT INTO {platform}_tracking_cursors (user_id, last_seen_at, updated_at)
VALUES (?, ?, ?)
""",
(owner["id"], "2026-03-26T09:00:00+00:00", now),
)
db.execute(
f"""
INSERT INTO {platform}_similarity_searches (
id, user_id, source_account_id, prompt_text, context_json, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
f"{platform}_search_contract_1",
owner["id"],
account_id,
"contract prompt",
_json({"source_account": platform}),
now,
),
)
return account_id
def _insert_domestic_creator_account(
db: Database,
owner: dict[str, object],
project_row: dict[str, object],
platform: str,
*,
suffix: str,
title: str,
handle: str,
bio: str,
tags: list[str],
keywords: list[str],
) -> str:
now = utc_now()
account_id = f"{platform}_acct_contract_{suffix}"
db.execute(
"""
INSERT INTO content_sources (
id, user_id, project_id, source_kind, platform, handle, source_url, title, local_path,
metadata_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
account_id,
owner["id"],
project_row["id"],
"creator_account",
platform,
handle,
f"https://example.com/{platform}/profile-{suffix}",
title,
"",
_json(
{
"bio": bio,
"description": bio,
"avatar_url": "https://example.com/avatar.png",
"tags": tags,
"keywords": keywords,
"max_items": 5,
}
),
now,
now,
),
)
return account_id
def _build_app(platforms: list[str]) -> tuple[FastAPI, SimpleNamespace, dict[str, object]]:
tmpdir = Path(tempfile.mkdtemp(prefix="storyforge-platform-contracts-"))
db = Database(str(tmpdir / "storyforge.db"))
db.init_schema()
owner_row, project_row, model_row = _seed_base_account(db)
legacy = _make_legacy(db, owner_row)
app = FastAPI()
register_douyin_routes(app, legacy)
for platform in platforms:
register_domestic_platform_routes(app, legacy, platform=platform, label=platform)
weakref.finalize(app, shutil.rmtree, tmpdir, True)
app.state._tmpdir = tmpdir
app.state._legacy = legacy
app.state._project_row = project_row
app.state._model_row = model_row
app.state._owner_row = owner_row
return app, legacy, {
"owner": owner_row,
"project": project_row,
"model": model_row,
}
class PlatformContractTests(unittest.TestCase):
def test_douyin_tracking_digest_and_workspace_shape(self) -> None:
app, legacy, seed = _build_app(["xiaohongshu", "bilibili", "kuaishou", "wechat_video"])
douyin_account_id = _seed_douyin(legacy.db, seed["owner"], seed["model"])
with TestClient(app) as client:
accounts = client.get("/v2/douyin/accounts", headers={"Authorization": "Bearer dummy"})
self.assertEqual(accounts.status_code, 200)
self.assertTrue(accounts.json())
workspace = client.get(f"/v2/douyin/accounts/{douyin_account_id}", headers={"Authorization": "Bearer dummy"})
self.assertEqual(workspace.status_code, 200)
workspace_payload = workspace.json()
self.assertIn("account", workspace_payload)
self.assertIn("recent_reports", workspace_payload)
self.assertIn("latest_public_snapshot", workspace_payload)
self.assertIn("latest_creator_snapshot", workspace_payload)
self.assertIn("recent_similarity_searches", workspace_payload)
self.assertIn("available_model_profiles", workspace_payload)
reports = client.get(f"/v2/douyin/accounts/{douyin_account_id}/analysis-reports", headers={"Authorization": "Bearer dummy"})
self.assertEqual(reports.status_code, 200)
self.assertEqual(len(reports.json()), len(workspace_payload["recent_reports"]))
snapshots = client.get(f"/v2/douyin/accounts/{douyin_account_id}/snapshots", headers={"Authorization": "Bearer dummy"})
self.assertEqual(snapshots.status_code, 200)
self.assertGreaterEqual(len(snapshots.json()), 2)
creator_snapshot = next(item for item in snapshots.json() if item["snapshot_type"] == "creator_center")
creator_fields = client.get(f"/v2/douyin/accounts/{douyin_account_id}/creator-fields", headers={"Authorization": "Bearer dummy"})
self.assertEqual(creator_fields.status_code, 200)
self.assertEqual(creator_fields.json()["snapshot_type"], "creator_center")
self.assertEqual(creator_fields.json()["id"], creator_snapshot["id"])
digest = client.get("/v2/douyin/tracking/digest", headers={"Authorization": "Bearer dummy"})
self.assertEqual(digest.status_code, 200)
digest_payload = digest.json()
self.assertIn("generated_at", digest_payload)
self.assertIn("since", digest_payload)
self.assertIn("tracked_accounts", digest_payload)
self.assertIn("cursor_last_seen_at", digest_payload)
self.assertTrue(digest_payload["items"])
digest_item = digest_payload["items"][0]
self.assertEqual(digest_item["platform"], "douyin")
self.assertIn("summary_text", digest_item)
self.assertIn("tracked_account_name", digest_item)
self.assertIn("account", digest_item)
self.assertIn("video", digest_item)
def test_domestic_workspace_and_tracking_shape(self) -> None:
app, legacy, seed = _build_app(["xiaohongshu"])
xhs_account_id = _seed_domestic(legacy.db, seed["owner"], seed["project"], "xiaohongshu")
with TestClient(app) as client:
workspace = client.get(
f"/v2/xiaohongshu/accounts/{xhs_account_id}/workspace",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(workspace.status_code, 200)
workspace_payload = workspace.json()
self.assertIn("latest_public_snapshot", workspace_payload)
self.assertIn("latest_creator_snapshot", workspace_payload)
self.assertIn("recent_reports", workspace_payload)
self.assertIn("recent_similarity_searches", workspace_payload)
self.assertIn("available_model_profiles", workspace_payload)
reports = client.get(
f"/v2/xiaohongshu/accounts/{xhs_account_id}/analysis-reports",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(reports.status_code, 200)
self.assertEqual(reports.json(), workspace_payload["recent_reports"])
digest = client.get("/v2/xiaohongshu/tracking/digest", headers={"Authorization": "Bearer dummy"})
self.assertEqual(digest.status_code, 200)
digest_payload = digest.json()
self.assertIn("generated_at", digest_payload)
self.assertIn("since", digest_payload)
self.assertIn("tracked_accounts", digest_payload)
self.assertIn("cursor_last_seen_at", digest_payload)
self.assertTrue(digest_payload["items"])
digest_item = digest_payload["items"][0]
self.assertEqual(digest_item["platform"], "xiaohongshu")
self.assertIn("summary_text", digest_item)
self.assertIn("account", digest_item)
self.assertIn("video", digest_item)
def test_kuaishou_creator_center_sync_persists_snapshots_and_analysis_context(self) -> None:
app, legacy, seed = _build_app(["kuaishou"])
with TestClient(app) as client:
sync = client.post(
"/v2/kuaishou/accounts/sync",
headers={"Authorization": "Bearer dummy"},
json={
"project_id": seed["project"]["id"],
"profile_url": "https://www.kuaishou.com/profile/contract-creator",
"title": "快手合同账号",
"handle": "contract_creator",
"manual_profile_payload": {
"nickname": "快手合同账号",
"bio": "擅长创业内容和成交转化",
"avatar_url": "https://example.com/kuaishou/avatar.png",
"follower_count": 32100,
},
"manual_creator_pages": [
{
"url": "https://creator.kuaishou.com/creator/home",
"title": "快手创作者中心",
"payload": {
"creator": {
"nickname": "快手合同账号",
"fans_count": 32100,
"play_count": 987654,
"content_tags": ["创业", "转化"],
},
"works": {
"published_count": 48,
"avg_finish_rate": 0.43,
"items": [
{
"video_id": "ks_work_001",
"title": "创业成交拆解 1",
"description": "拆 1 个高转化案例",
"share_url": "https://www.kuaishou.com/short-video/ks_work_001",
"cover_url": "https://example.com/kuaishou/work-1.png",
"duration_sec": 38,
"published_at": "2026-03-20T10:00:00+00:00",
"play_count": 82000,
"like_count": 4300,
"comment_count": 280,
"share_count": 140,
"tags": ["创业", "成交"]
},
{
"video_id": "ks_work_002",
"title": "口播脚本结构拆解",
"description": "复盘 3 段爆款口播结构",
"share_url": "https://www.kuaishou.com/short-video/ks_work_002",
"cover_url": "https://example.com/kuaishou/work-2.png",
"duration_sec": 46,
"published_at": "2026-03-18T10:00:00+00:00",
"play_count": 65000,
"like_count": 3100,
"comment_count": 190,
"share_count": 96,
"tags": ["口播", "结构"]
},
],
},
},
}
],
},
)
self.assertEqual(sync.status_code, 200, sync.text)
workspace_payload = sync.json()
self.assertEqual(workspace_payload["account"]["platform"], "kuaishou")
self.assertIsNotNone(workspace_payload["latest_public_snapshot"])
self.assertIsNotNone(workspace_payload["latest_creator_snapshot"])
self.assertEqual(workspace_payload["latest_creator_snapshot"]["snapshot_type"], "creator_center")
self.assertEqual(workspace_payload["account"]["nickname"], "快手合同账号")
self.assertGreaterEqual(workspace_payload["account"]["video_summary"]["count"], 2)
account_id = workspace_payload["account"]["id"]
videos = client.get(
f"/v2/kuaishou/accounts/{account_id}/videos",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(videos.status_code, 200, videos.text)
videos_payload = videos.json()
self.assertGreaterEqual(videos_payload["count"], 2)
self.assertTrue(videos_payload["items"])
self.assertEqual(videos_payload["items"][0]["title"], "创业成交拆解 1")
self.assertEqual(videos_payload["items"][0]["stats"]["play"], 82000)
self.assertTrue(videos_payload["top_scored_video_ids"])
self.assertEqual(videos_payload["top_scored_video_ids"][0], videos_payload["items"][0]["id"])
snapshots = client.get(
f"/v2/kuaishou/accounts/{account_id}/snapshots",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(snapshots.status_code, 200, snapshots.text)
snapshots_payload = snapshots.json()
self.assertGreaterEqual(len(snapshots_payload), 2)
creator_snapshot = next(item for item in snapshots_payload if item["snapshot_type"] == "creator_center")
creator_fields = client.get(
f"/v2/kuaishou/accounts/{account_id}/creator-fields",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(creator_fields.status_code, 200, creator_fields.text)
creator_fields_payload = creator_fields.json()
self.assertEqual(creator_fields_payload["id"], creator_snapshot["id"])
self.assertEqual(creator_fields_payload["snapshot_type"], "creator_center")
self.assertTrue(creator_fields_payload["fields"])
analyze = client.post(
f"/v2/kuaishou/accounts/{account_id}/analysis",
headers={"Authorization": "Bearer dummy"},
json={
"model_profile_ids": [],
"linked_account_ids": [],
"include_linked_accounts": True,
"include_recent_similar_candidates": True,
"max_videos": 6,
"extra_focus": "更关注创作者中心里的成交与转化指标",
"temperature": 0.35,
"auto_analyze_top_videos": False,
"top_video_analysis_count": 4,
},
)
self.assertEqual(analyze.status_code, 200, analyze.text)
analyze_payload = analyze.json()
self.assertIn("creator_center", analyze_payload["context"])
self.assertEqual(
analyze_payload["context"]["creator_center"]["latest_creator_snapshot"]["snapshot_type"],
"creator_center",
)
self.assertEqual(analyze_payload["context"]["requested_model_profile_ids"], [])
self.assertEqual(analyze_payload["context"]["selected_model_profile_ids"], [seed["model"]["id"]])
self.assertTrue(analyze_payload["context"]["request_options"]["include_linked_accounts"])
self.assertEqual(analyze_payload["context"]["linked_accounts"], [])
self.assertEqual(analyze_payload["top_video_analyses"], [])
def test_domestic_analysis_uses_requested_context_and_auto_top_video_followup(self) -> None:
app, legacy, seed = _build_app(["xiaohongshu"])
source_account_id = _seed_domestic(legacy.db, seed["owner"], seed["project"], "xiaohongshu")
linked_account_id = _insert_domestic_creator_account(
legacy.db,
seed["owner"],
seed["project"],
"xiaohongshu",
suffix="linked",
title="Linked Benchmark",
handle="xhs_linked",
bio="主打创业转化与爆款拆解",
tags=["创业", "转化"],
keywords=["创业", "转化"],
)
candidate_account_id = _insert_domestic_creator_account(
legacy.db,
seed["owner"],
seed["project"],
"xiaohongshu",
suffix="candidate",
title="Candidate Creator",
handle="xhs_candidate",
bio="专注创业口播、成交文案与转化漏斗",
tags=["创业", "口播", "转化"],
keywords=["口播", "成交"],
)
now = utc_now()
legacy.db.execute(
"""
INSERT INTO xiaohongshu_account_relations (
id, user_id, source_account_id, target_account_id, target_profile_url,
relation_type, note, search_id, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"xhs_relation_contract_linked",
seed["owner"]["id"],
source_account_id,
linked_account_id,
"",
"benchmark",
"linked note",
"",
now,
),
)
legacy.db.execute(
"""
INSERT INTO xiaohongshu_similarity_searches (
id, user_id, source_account_id, prompt_text, context_json, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""",
(
"xhs_search_contract_recent",
seed["owner"]["id"],
source_account_id,
"recent search",
_json({"source_account": "xiaohongshu"}),
now,
),
)
legacy.db.execute(
"""
INSERT INTO xiaohongshu_similarity_candidates (
id, search_id, candidate_account_id, candidate_profile_url, heuristic_score,
agent_score, rationale_text, dimensions_json, raw_output_json, rank_index, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"xhs_candidate_contract_recent",
"xhs_search_contract_recent",
candidate_account_id,
"https://example.com/xiaohongshu/profile-candidate",
72,
72,
"近期相似候选",
_json({"tag_overlap": 2}),
_json({"candidate_account_id": candidate_account_id, "candidate_profile_url": "https://example.com/xiaohongshu/profile-candidate"}),
0,
now,
),
)
with TestClient(app) as client:
analyze = client.post(
f"/v2/xiaohongshu/accounts/{source_account_id}/analysis",
headers={"Authorization": "Bearer dummy"},
json={
"model_profile_ids": [seed["model"]["id"]],
"linked_account_ids": [linked_account_id],
"include_linked_accounts": True,
"include_recent_similar_candidates": True,
"max_videos": 5,
"extra_focus": "关注转化路径和选题结构",
"temperature": 0.32,
"auto_analyze_top_videos": True,
"top_video_analysis_count": 2,
},
)
self.assertEqual(analyze.status_code, 200, analyze.text)
payload = analyze.json()
self.assertEqual(payload["context"]["requested_model_profile_ids"], [seed["model"]["id"]])
self.assertEqual(payload["context"]["selected_model_profile_ids"], [seed["model"]["id"]])
self.assertEqual(payload["context"]["request_options"]["top_video_analysis_count"], 2)
self.assertEqual(len(payload["context"]["linked_accounts"]), 1)
self.assertEqual(payload["context"]["linked_accounts"][0]["target_account_id"], linked_account_id)
self.assertEqual(len(payload["context"]["recent_similar_candidates"]), 1)
self.assertEqual(payload["context"]["recent_similar_candidates"][0]["candidate_account_id"], candidate_account_id)
self.assertEqual(payload["top_video_analyses"][0]["video_id"], "xiaohongshu_video_contract_2")
self.assertEqual(len(payload["top_video_analyses"]), 2)
def test_domestic_similarity_search_merges_manual_urls_and_linked_candidates(self) -> None:
app, legacy, seed = _build_app(["xiaohongshu"])
source_account_id = _seed_domestic(legacy.db, seed["owner"], seed["project"], "xiaohongshu")
linked_account_id = _insert_domestic_creator_account(
legacy.db,
seed["owner"],
seed["project"],
"xiaohongshu",
suffix="similarlinked",
title="Linked Similar",
handle="xhs_similar_linked",
bio="创业成交、私域转化、直播承接",
tags=["创业", "成交", "转化"],
keywords=["直播", "承接"],
)
now = utc_now()
legacy.db.execute(
"""
INSERT INTO xiaohongshu_account_relations (
id, user_id, source_account_id, target_account_id, target_profile_url,
relation_type, note, search_id, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"xhs_relation_contract_similarity",
seed["owner"]["id"],
source_account_id,
linked_account_id,
"https://example.com/xiaohongshu/profile-similarlinked",
"benchmark",
"linked similar note",
"",
now,
),
)
with TestClient(app) as client:
created = client.post(
"/v2/xiaohongshu/similar-searches",
headers={"Authorization": "Bearer dummy"},
json={
"source_account_id": source_account_id,
"candidate_urls": [
"https://example.com/xiaohongshu/external-similar"
],
"seed_linked_accounts": True,
"search_public_pages": False,
"model_profile_id": seed["model"]["id"],
"max_candidates": 6,
"extra_requirements": "优先找创业成交和口播拆解账号",
},
)
self.assertEqual(created.status_code, 200, created.text)
detail = client.get(
f"/v2/xiaohongshu/similar-searches/{created.json()['search_id']}",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(detail.status_code, 200, detail.text)
payload = detail.json()
self.assertGreaterEqual(len(payload["candidates"]), 2)
self.assertEqual(payload["candidates"][0]["candidate_account_id"], linked_account_id)
manual_candidate = next(
item for item in payload["candidates"]
if item.get("candidate_profile_url") == "https://example.com/xiaohongshu/external-similar"
)
self.assertEqual(manual_candidate["candidate_account_id"], "")
self.assertIn("external", manual_candidate["candidate_nickname"].lower())
def test_douyin_live_first_mutation_routes_are_available(self) -> None:
app, legacy, seed = _build_app(["xiaohongshu"])
douyin_account_id = _seed_douyin(legacy.db, seed["owner"], seed["model"])
with TestClient(app) as client:
analyze = client.post(
f"/v2/douyin/accounts/{douyin_account_id}/videos/analyze-top",
headers={"Authorization": "Bearer dummy"},
json={"top_video_count": 2, "min_score": 0, "temperature": 0.25, "model_profile_id": seed["model"]["id"]},
)
self.assertEqual(analyze.status_code, 200, analyze.text)
analyze_payload = analyze.json()
self.assertEqual(analyze_payload["account_id"], douyin_account_id)
self.assertIn("analyzed_count", analyze_payload)
refresh_all = client.post("/v2/douyin/tracking/refresh", headers={"Authorization": "Bearer dummy"})
self.assertEqual(refresh_all.status_code, 200, refresh_all.text)
refresh_all_payload = refresh_all.json()
self.assertIn("refreshed", refresh_all_payload)
self.assertIn("items", refresh_all_payload)
refresh_one = client.post(
f"/v2/douyin/tracking/accounts/{douyin_account_id}/refresh",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(refresh_one.status_code, 200, refresh_one.text)
refresh_one_payload = refresh_one.json()
self.assertTrue(refresh_one_payload["success"])
self.assertEqual(refresh_one_payload["tracked_account_id"], douyin_account_id)
self.assertIn("account", refresh_one_payload)
self.assertIn("sync_errors", refresh_one_payload)
cursor = client.post(
"/v2/douyin/tracking/cursor",
headers={"Authorization": "Bearer dummy"},
json={"last_seen_at": "2026-03-30T10:00:00+00:00"},
)
self.assertEqual(cursor.status_code, 200, cursor.text)
self.assertEqual(cursor.json()["last_seen_at"], "2026-03-30T10:00:00+00:00")
def test_domestic_live_first_mutation_routes_are_available(self) -> None:
app, legacy, seed = _build_app(["xiaohongshu"])
xhs_account_id = _seed_domestic(legacy.db, seed["owner"], seed["project"], "xiaohongshu")
with TestClient(app) as client:
analyze = client.post(
f"/v2/xiaohongshu/accounts/{xhs_account_id}/videos/analyze-top",
headers={"Authorization": "Bearer dummy"},
json={"top_video_count": 2, "min_score": 0, "temperature": 0.25, "model_profile_id": seed["model"]["id"]},
)
self.assertEqual(analyze.status_code, 200, analyze.text)
analyze_payload = analyze.json()
self.assertEqual(analyze_payload["account_id"], xhs_account_id)
self.assertIn("analyzed_count", analyze_payload)
refresh_all = client.post("/v2/xiaohongshu/tracking/refresh", headers={"Authorization": "Bearer dummy"})
self.assertEqual(refresh_all.status_code, 200, refresh_all.text)
refresh_all_payload = refresh_all.json()
self.assertIn("refreshed", refresh_all_payload)
self.assertIn("items", refresh_all_payload)
refresh_one = client.post(
f"/v2/xiaohongshu/tracking/accounts/{xhs_account_id}/refresh",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(refresh_one.status_code, 200, refresh_one.text)
refresh_one_payload = refresh_one.json()
self.assertEqual(refresh_one_payload["tracked_account_id"], xhs_account_id)
self.assertIn("sync_job_id", refresh_one_payload)
cursor = client.post(
"/v2/xiaohongshu/tracking/cursor",
headers={"Authorization": "Bearer dummy"},
json={"last_seen_at": "2026-03-30T10:00:00+00:00"},
)
self.assertEqual(cursor.status_code, 200, cursor.text)
self.assertEqual(cursor.json()["last_seen_at"], "2026-03-30T10:00:00+00:00")
if __name__ == "__main__":
unittest.main()