from __future__ import annotations import json import os import sys import tempfile import unittest from pathlib import Path from types import SimpleNamespace from fastapi import FastAPI from fastapi.testclient import TestClient ROOT = Path(__file__).resolve().parents[1] APP_ROOT = ROOT / "collector-service" if str(APP_ROOT) not in sys.path: sys.path.insert(0, str(APP_ROOT)) from app.database import Database, utc_now from app.domestic_platform_features import register_domestic_platform_routes from app.douyin_features import register_douyin_routes BOOTSTRAP_USERNAME = "storyforge-admin" def _json(value: object) -> str: return json.dumps(value, ensure_ascii=False) def _make_legacy(db: Database, account_row: dict[str, object]) -> SimpleNamespace: counter = {"value": 0} def make_id(prefix: str) -> str: counter["value"] += 1 return f"{prefix}_{counter['value']}" def require_approved() -> dict[str, object]: return account_row def content_source_payload(row: dict[str, object]) -> dict[str, object]: metadata_raw = row.get("metadata_json") or "{}" try: metadata = json.loads(str(metadata_raw)) except json.JSONDecodeError: metadata = {} return { "id": row["id"], "user_id": row["user_id"], "project_id": row.get("project_id", ""), "source_kind": row["source_kind"], "platform": row.get("platform", ""), "handle": row.get("handle", ""), "source_url": row.get("source_url", ""), "title": row.get("title", ""), "local_path": row.get("local_path", ""), "metadata": metadata, "created_at": row["created_at"], "updated_at": row["updated_at"], } def assistant_payload(row: dict[str, object]) -> dict[str, object]: return { "id": row["id"], "name": row.get("name", ""), } def model_profile_for_account(account_id: str, requested_id: str | None) -> dict[str, object]: if requested_id: row = db.fetch_one("SELECT * FROM model_profiles WHERE id = ?", (requested_id,)) if row: return row row = db.fetch_one("SELECT * FROM model_profiles WHERE is_default = 1 LIMIT 1") if row: return row raise RuntimeError(f"No model profile configured for {account_id}") def parse_json_object(value: object) -> dict[str, object]: if isinstance(value, dict): return value if isinstance(value, str) and value.strip(): parsed = json.loads(value) return parsed if isinstance(parsed, dict) else {} return {} def resolve_target_kb(*_args: object, **_kwargs: object) -> dict[str, object]: return {"id": "kb_contract"} def resolve_target_assistant(*_args: object, **_kwargs: object) -> None: return None async def call_model(*_args: object, **_kwargs: object) -> str: return "{}" def job_payload(row: dict[str, object]) -> dict[str, object]: return row async def trigger_orchestrated_job(job_row: dict[str, object]) -> dict[str, object]: return job_row return SimpleNamespace( db=db, utc_now=utc_now, make_id=make_id, require_approved=require_approved, content_source_payload=content_source_payload, assistant_payload=assistant_payload, model_profile_for_account=model_profile_for_account, parse_json_object=parse_json_object, resolve_target_kb=resolve_target_kb, resolve_target_assistant=resolve_target_assistant, call_model=call_model, job_payload=job_payload, trigger_orchestrated_job=trigger_orchestrated_job, ) def _seed_base_account(db: Database) -> tuple[dict[str, object], dict[str, object], dict[str, object]]: now = utc_now() account_id = "acct_contract_owner" project_id = "proj_contract_owner" model_id = "model_contract_default" db.execute( """ INSERT INTO accounts ( id, username, password_hash, password_salt, display_name, role, approval_status, approved_by, approved_at, preferred_analysis_model_id, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( account_id, BOOTSTRAP_USERNAME, "hash", "salt", "StoryForge Contract Owner", "super_admin", "approved", account_id, now, model_id, now, now, ), ) db.execute( """ INSERT INTO projects (id, user_id, name, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?) """, (project_id, account_id, "StoryForge Contracts", "", now, now), ) db.execute( """ INSERT INTO model_profiles ( id, owner_account_id, name, provider, base_url, api_key, model_name, is_system, is_default, created_at, updated_at ) VALUES (?, NULL, ?, ?, ?, ?, ?, 1, 1, ?, ?) """, (model_id, "Default Model", "openai_compat", "http://127.0.0.1:8317/v1", "", "GLM-5", now, now), ) account_row = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,)) project_row = db.fetch_one("SELECT * FROM projects WHERE id = ?", (project_id,)) model_row = db.fetch_one("SELECT * FROM model_profiles WHERE id = ?", (model_id,)) assert account_row is not None assert project_row is not None assert model_row is not None return account_row, project_row, model_row def _seed_douyin(db: Database, owner: dict[str, object], model_row: dict[str, object]) -> str: now = utc_now() account_id = "dyacct_contract_1" db.execute( """ INSERT INTO douyin_accounts ( id, user_id, profile_url, canonical_profile_url, sec_uid, douyin_uid, douyin_id, nickname, signature, avatar_url, tags_json, profile_stats_json, raw_profile_json, source_mode, sync_status, last_public_sync_at, last_creator_sync_at, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( account_id, owner["id"], "https://www.douyin.com/user/contract-test", "https://www.douyin.com/user/contract-test", "sec_contract_1", "douyin_uid_contract_1", "douyin_id_contract_1", "Contract Douyin", "Contract test signature", "https://example.com/avatar.png", _json(["增长", "内容"]), _json({"fans_count": 1200, "likes_count": 8800}), _json({"profile": {"nickname": "Contract Douyin"}}), "creator_center", "ready", now, now, now, now, ), ) for index in range(2): db.execute( """ INSERT INTO douyin_videos ( id, account_id, aweme_id, title, description, share_url, cover_url, duration_sec, published_at, tags_json, stats_json, raw_json, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( f"dyvid_contract_{index + 1}", account_id, f"aweme_contract_{index + 1}", f"Contract Video {index + 1}", "Contract summary", "https://example.com/video", "https://example.com/cover.png", 28, f"2026-03-26T10:0{index}:00+00:00", _json(["增长", "内容"]), _json({"play": 8200 + index * 300, "like": 410 + index * 10, "comment": 18, "share": 9}), _json({"title": f"Contract Video {index + 1}"}), now, now, ), ) public_snapshot_id = "dysnap_public_contract" creator_snapshot_id = "dysnap_creator_contract" db.execute( """ INSERT INTO douyin_account_snapshots ( id, account_id, snapshot_type, source_url, raw_payload_json, summary_json, field_count, collected_at, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( public_snapshot_id, account_id, "public_profile", "https://www.douyin.com/user/contract-test", _json({"nickname": "Contract Douyin"}), _json({"nickname": "Contract Douyin"}), 1, now, now, ), ) db.execute( """ INSERT INTO douyin_account_snapshots ( id, account_id, snapshot_type, source_url, raw_payload_json, summary_json, field_count, collected_at, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( creator_snapshot_id, account_id, "creator_center", "https://creator.douyin.com/creator-micro/home", _json({"field": "value"}), _json({"creator": "summary"}), 1, now, now, ), ) db.execute( """ INSERT INTO douyin_snapshot_fields (snapshot_id, field_path, field_type, field_value_text) VALUES (?, ?, ?, ?) """, (creator_snapshot_id, "profile.nickname", "string", "Contract Douyin"), ) db.execute( """ INSERT INTO douyin_analysis_reports ( id, account_id, user_id, focus_text, model_profile_ids_json, linked_account_ids_json, prompt_text, context_json, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( "dyreport_contract_1", account_id, owner["id"], "增长诊断", _json([model_row["id"]]), _json([]), "contract prompt", _json({"account": "douyin"}), now, ), ) db.execute( """ INSERT INTO douyin_analysis_suggestions ( id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( "dysuggestion_contract_1", "dyreport_contract_1", model_row["id"], "Test Model", "ok", "Contract analysis output", _json({"summary": "ok"}), now, ), ) db.execute( """ INSERT INTO douyin_tracked_accounts ( id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?) """, ("dytrack_contract_1", owner["id"], account_id, None, "note", now, now), ) db.execute( """ INSERT INTO douyin_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?) """, (owner["id"], "2026-03-26T09:00:00+00:00", now), ) return account_id def _seed_domestic(db: Database, owner: dict[str, object], project_row: dict[str, object], platform: str) -> str: now = utc_now() account_id = f"{platform}_acct_contract_1" db.execute( """ INSERT INTO content_sources ( id, user_id, project_id, source_kind, platform, handle, source_url, title, local_path, metadata_json, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( account_id, owner["id"], project_row["id"], "creator_account", platform, f"{platform}_handle", f"https://example.com/{platform}/profile", f"{platform.upper()} Contract Account", "", _json( { "bio": f"{platform} bio", "description": f"{platform} description", "avatar_url": "https://example.com/avatar.png", "tags": ["增长", platform], "keywords": ["增长", "内容"], "max_items": 5, } ), now, now, ), ) for index in range(2): db.execute( """ INSERT INTO content_sources ( id, user_id, project_id, source_kind, platform, handle, source_url, title, local_path, metadata_json, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( f"{platform}_video_contract_{index + 1}", owner["id"], project_row["id"], "video_link", platform, "", f"https://example.com/{platform}/video/{index + 1}", f"{platform.upper()} Contract Video {index + 1}", "", _json( { "summary": "contract summary", "description": "contract description", "cover_url": "https://example.com/cover.png", "published_at": f"2026-03-26T10:0{index}:00+00:00", "tags": ["增长", platform], "content_type": "video", "duration_sec": 30, "external_id": f"{platform}_ext_{index + 1}", "origin_content_source_id": account_id, "source_account_url": f"https://example.com/{platform}/profile", } ), now, now, ), ) db.execute( f""" INSERT INTO {platform}_analysis_reports ( id, user_id, account_source_id, focus_text, prompt_text, context_json, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?) """, ( f"{platform}_report_contract_1", owner["id"], account_id, "增长诊断", "contract prompt", _json({"account": platform}), now, ), ) db.execute( f""" INSERT INTO {platform}_analysis_suggestions ( id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( f"{platform}_suggestion_contract_1", f"{platform}_report_contract_1", "model_contract_default", "Test Model", "ok", "Contract analysis output", _json({"summary": "ok"}), now, ), ) db.execute( f""" INSERT INTO {platform}_tracked_accounts ( id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?) """, (f"{platform}_track_contract_1", owner["id"], account_id, None, "note", now, now), ) db.execute( f""" INSERT INTO {platform}_tracking_cursors (user_id, last_seen_at, updated_at) VALUES (?, ?, ?) """, (owner["id"], "2026-03-26T09:00:00+00:00", now), ) db.execute( f""" INSERT INTO {platform}_similarity_searches ( id, user_id, source_account_id, prompt_text, context_json, created_at ) VALUES (?, ?, ?, ?, ?, ?) """, ( f"{platform}_search_contract_1", owner["id"], account_id, "contract prompt", _json({"source_account": platform}), now, ), ) return account_id def _build_app(platforms: list[str]) -> tuple[FastAPI, SimpleNamespace, dict[str, object]]: tmpdir = tempfile.TemporaryDirectory() db = Database(str(Path(tmpdir.name) / "storyforge.db")) db.init_schema() owner_row, project_row, model_row = _seed_base_account(db) legacy = _make_legacy(db, owner_row) app = FastAPI() register_douyin_routes(app, legacy) for platform in platforms: register_domestic_platform_routes(app, legacy, platform=platform, label=platform) app.state._tmpdir = tmpdir app.state._legacy = legacy app.state._project_row = project_row app.state._model_row = model_row app.state._owner_row = owner_row return app, legacy, { "owner": owner_row, "project": project_row, "model": model_row, } class PlatformContractTests(unittest.TestCase): def test_douyin_tracking_digest_and_workspace_shape(self) -> None: app, legacy, seed = _build_app(["xiaohongshu", "bilibili", "kuaishou", "wechat_video"]) douyin_account_id = _seed_douyin(legacy.db, seed["owner"], seed["model"]) with TestClient(app) as client: accounts = client.get("/v2/douyin/accounts", headers={"Authorization": "Bearer dummy"}) self.assertEqual(accounts.status_code, 200) self.assertTrue(accounts.json()) workspace = client.get(f"/v2/douyin/accounts/{douyin_account_id}", headers={"Authorization": "Bearer dummy"}) self.assertEqual(workspace.status_code, 200) workspace_payload = workspace.json() self.assertIn("account", workspace_payload) self.assertIn("recent_reports", workspace_payload) self.assertIn("latest_public_snapshot", workspace_payload) self.assertIn("latest_creator_snapshot", workspace_payload) self.assertIn("recent_similarity_searches", workspace_payload) self.assertIn("available_model_profiles", workspace_payload) reports = client.get(f"/v2/douyin/accounts/{douyin_account_id}/analysis-reports", headers={"Authorization": "Bearer dummy"}) self.assertEqual(reports.status_code, 200) self.assertEqual(len(reports.json()), len(workspace_payload["recent_reports"])) snapshots = client.get(f"/v2/douyin/accounts/{douyin_account_id}/snapshots", headers={"Authorization": "Bearer dummy"}) self.assertEqual(snapshots.status_code, 200) self.assertGreaterEqual(len(snapshots.json()), 2) creator_snapshot = next(item for item in snapshots.json() if item["snapshot_type"] == "creator_center") creator_fields = client.get(f"/v2/douyin/accounts/{douyin_account_id}/creator-fields", headers={"Authorization": "Bearer dummy"}) self.assertEqual(creator_fields.status_code, 200) self.assertEqual(creator_fields.json()["snapshot_type"], "creator_center") self.assertEqual(creator_fields.json()["id"], creator_snapshot["id"]) digest = client.get("/v2/douyin/tracking/digest", headers={"Authorization": "Bearer dummy"}) self.assertEqual(digest.status_code, 200) digest_payload = digest.json() self.assertIn("generated_at", digest_payload) self.assertIn("since", digest_payload) self.assertIn("tracked_accounts", digest_payload) self.assertIn("cursor_last_seen_at", digest_payload) self.assertTrue(digest_payload["items"]) digest_item = digest_payload["items"][0] self.assertEqual(digest_item["platform"], "douyin") self.assertIn("summary_text", digest_item) self.assertIn("tracked_account_name", digest_item) self.assertIn("account", digest_item) self.assertIn("video", digest_item) def test_domestic_workspace_and_tracking_shape(self) -> None: app, legacy, seed = _build_app(["xiaohongshu"]) xhs_account_id = _seed_domestic(legacy.db, seed["owner"], seed["project"], "xiaohongshu") with TestClient(app) as client: workspace = client.get( f"/v2/xiaohongshu/accounts/{xhs_account_id}/workspace", headers={"Authorization": "Bearer dummy"}, ) self.assertEqual(workspace.status_code, 200) workspace_payload = workspace.json() self.assertIn("latest_public_snapshot", workspace_payload) self.assertIn("latest_creator_snapshot", workspace_payload) self.assertIn("recent_reports", workspace_payload) self.assertIn("recent_similarity_searches", workspace_payload) self.assertIn("available_model_profiles", workspace_payload) reports = client.get( f"/v2/xiaohongshu/accounts/{xhs_account_id}/analysis-reports", headers={"Authorization": "Bearer dummy"}, ) self.assertEqual(reports.status_code, 200) self.assertEqual(reports.json(), workspace_payload["recent_reports"]) digest = client.get("/v2/xiaohongshu/tracking/digest", headers={"Authorization": "Bearer dummy"}) self.assertEqual(digest.status_code, 200) digest_payload = digest.json() self.assertIn("generated_at", digest_payload) self.assertIn("since", digest_payload) self.assertIn("tracked_accounts", digest_payload) self.assertIn("cursor_last_seen_at", digest_payload) self.assertTrue(digest_payload["items"]) digest_item = digest_payload["items"][0] self.assertEqual(digest_item["platform"], "xiaohongshu") self.assertIn("summary_text", digest_item) self.assertIn("account", digest_item) self.assertIn("video", digest_item) if __name__ == "__main__": unittest.main()