Files
storyforge/tests/test_platform_contracts.py
kris 7897ce6c3d
Some checks failed
StoryForge CI / Baseline checks (push) Has been cancelled
StoryForge CI / Backend tests (push) Has been cancelled
StoryForge CI / Web tests (push) Has been cancelled
test: cover live-first platform routes
2026-03-31 01:11:57 +08:00

675 lines
26 KiB
Python

from __future__ import annotations
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from types import SimpleNamespace
from fastapi import FastAPI
from fastapi.testclient import TestClient
# ROOT is the parent of the directory that contains this test file.
ROOT = Path(__file__).resolve().parents[1]
# Directory put on sys.path below; presumably it holds the ``app`` package
# imported right after this block -- verify against the repository layout.
APP_ROOT = ROOT / "collector-service"
# Prepend only once so repeated imports of this module do not grow sys.path.
if str(APP_ROOT) not in sys.path:
    sys.path.insert(0, str(APP_ROOT))
from app.database import Database, utc_now
from app.domestic_platform_features import register_domestic_platform_routes
from app.douyin_features import register_douyin_routes
# Username stored on the owner account row inserted by _seed_base_account.
BOOTSTRAP_USERNAME = "storyforge-admin"
def _json(value: object) -> str:
return json.dumps(value, ensure_ascii=False)
def _make_legacy(db: Database, account_row: dict[str, object]) -> SimpleNamespace:
    """Assemble the ``legacy`` namespace passed to the route registration calls.

    The namespace exposes *db*, ``utc_now`` and a set of small in-memory
    callables. Each callable is deterministic: ids come from a per-namespace
    counter, the approval check always yields *account_row*, and the model call
    always answers with an empty JSON object.

    Args:
        db: Database used by ``model_profile_for_account`` lookups and exposed
            as the ``db`` attribute.
        account_row: Row returned unchanged by ``require_approved``.

    Returns:
        A ``SimpleNamespace`` carrying the attributes listed at the bottom of
        this function.
    """
    issued_ids = 0

    def make_id(prefix: str) -> str:
        """Return ``<prefix>_<n>``, where *n* counts up from 1 for this namespace."""
        nonlocal issued_ids
        issued_ids += 1
        return f"{prefix}_{issued_ids}"

    def require_approved() -> dict[str, object]:
        """Return the fixed account row; no approval check is performed."""
        return account_row

    def content_source_payload(row: dict[str, object]) -> dict[str, object]:
        """Map a content-source row to its payload dict.

        ``metadata_json`` is decoded into ``metadata``; a missing/empty value or
        text that is not valid JSON yields ``{}``.
        """
        raw_metadata = row.get("metadata_json") or "{}"
        try:
            metadata = json.loads(str(raw_metadata))
        except json.JSONDecodeError:
            metadata = {}

        def text(column: str) -> object:
            # Optional columns fall back to an empty string.
            return row.get(column, "")

        return {
            "id": row["id"],
            "user_id": row["user_id"],
            "project_id": text("project_id"),
            "source_kind": row["source_kind"],
            "platform": text("platform"),
            "handle": text("handle"),
            "source_url": text("source_url"),
            "title": text("title"),
            "local_path": text("local_path"),
            "metadata": metadata,
            "created_at": row["created_at"],
            "updated_at": row["updated_at"],
        }

    def assistant_payload(row: dict[str, object]) -> dict[str, object]:
        """Return the ``id`` and (optional) ``name`` of an assistant row."""
        return dict(id=row["id"], name=row.get("name", ""))

    def model_profile_for_account(account_id: str, requested_id: str | None) -> dict[str, object]:
        """Return the requested model profile, else the default one.

        Raises:
            RuntimeError: If neither lookup finds a row.
        """
        profile = None
        if requested_id:
            profile = db.fetch_one("SELECT * FROM model_profiles WHERE id = ?", (requested_id,))
        if not profile:
            profile = db.fetch_one("SELECT * FROM model_profiles WHERE is_default = 1 LIMIT 1")
        if not profile:
            raise RuntimeError(f"No model profile configured for {account_id}")
        return profile

    def parse_json_object(value: object) -> dict[str, object]:
        """Return *value* as a dict.

        Dicts pass through unchanged; non-blank strings are JSON-decoded and
        kept only when the result is a dict; everything else becomes ``{}``.
        """
        if isinstance(value, dict):
            return value
        if not (isinstance(value, str) and value.strip()):
            return {}
        decoded = json.loads(value)
        if isinstance(decoded, dict):
            return decoded
        return {}

    def resolve_target_kb(*_args: object, **_kwargs: object) -> dict[str, object]:
        """Ignore all arguments and return a fixed knowledge-base dict."""
        return {"id": "kb_contract"}

    def resolve_target_assistant(*_args: object, **_kwargs: object) -> None:
        """Ignore all arguments and return ``None``."""
        return None

    async def call_model(*_args: object, **_kwargs: object) -> str:
        """Ignore all arguments and return the JSON text of an empty object."""
        return "{}"

    def job_payload(row: dict[str, object]) -> dict[str, object]:
        """Return the job row unchanged."""
        return row

    def create_job_record(**kwargs: object) -> dict[str, object]:
        """Build a queued job dict from keyword arguments without touching the database."""
        timestamp = utc_now()
        record: dict[str, object] = {"id": make_id("job")}
        blank_default_fields = (
            "account_id",
            "project_id",
            "knowledge_base_id",
            "content_source_id",
            "assistant_id",
            "source_type",
            "line_type",
            "workflow_key",
            "title",
        )
        for field in blank_default_fields:
            record[field] = kwargs.get(field, "")
        record["language"] = kwargs.get("language", "auto")
        record["source_url"] = kwargs.get("source_url", "")
        record["artifacts"] = kwargs.get("artifacts", {})
        record["analysis_model_profile_id"] = kwargs.get("analysis_model_profile_id", "")
        record.update(status="queued", created_at=timestamp, updated_at=timestamp)
        return record

    async def trigger_orchestrated_job(job_row: dict[str, object]) -> dict[str, object]:
        """Return the job row unchanged; nothing is dispatched."""
        return job_row

    return SimpleNamespace(
        db=db,
        utc_now=utc_now,
        make_id=make_id,
        require_approved=require_approved,
        content_source_payload=content_source_payload,
        assistant_payload=assistant_payload,
        model_profile_for_account=model_profile_for_account,
        parse_json_object=parse_json_object,
        resolve_target_kb=resolve_target_kb,
        resolve_target_assistant=resolve_target_assistant,
        call_model=call_model,
        create_job_record=create_job_record,
        job_payload=job_payload,
        trigger_orchestrated_job=trigger_orchestrated_job,
    )
def _seed_base_account(db: Database) -> tuple[dict[str, object], dict[str, object], dict[str, object]]:
    """Insert the owner account, its project and a default model profile.

    All three rows use fixed ids and share one timestamp. After inserting, the
    rows are read back from the database.

    Returns:
        ``(account_row, project_row, model_row)`` as fetched from *db*.
    """
    account_id = "acct_contract_owner"
    project_id = "proj_contract_owner"
    model_id = "model_contract_default"
    now = utc_now()

    # SQL text is kept flush-left so the statement strings stay unchanged.
    account_values = (
        account_id,
        BOOTSTRAP_USERNAME,
        "hash",
        "salt",
        "StoryForge Contract Owner",
        "super_admin",
        "approved",
        account_id,  # approved_by: the account approves itself
        now,
        model_id,
        now,
        now,
    )
    db.execute(
        """
INSERT INTO accounts (
id, username, password_hash, password_salt, display_name, role, approval_status,
approved_by, approved_at, preferred_analysis_model_id, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
        account_values,
    )

    project_values = (project_id, account_id, "StoryForge Contracts", "", now, now)
    db.execute(
        """
INSERT INTO projects (id, user_id, name, description, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
        project_values,
    )

    model_values = (
        model_id,
        "Default Model",
        "openai_compat",
        "http://127.0.0.1:8317/v1",
        "",
        "GLM-5",
        now,
        now,
    )
    db.execute(
        """
INSERT INTO model_profiles (
id, owner_account_id, name, provider, base_url, api_key, model_name,
is_system, is_default, created_at, updated_at
) VALUES (?, NULL, ?, ?, ?, ?, ?, 1, 1, ?, ?)
""",
        model_values,
    )

    # Read the rows back so callers get them in the shape fetch_one returns.
    account_row = db.fetch_one("SELECT * FROM accounts WHERE id = ?", (account_id,))
    project_row = db.fetch_one("SELECT * FROM projects WHERE id = ?", (project_id,))
    model_row = db.fetch_one("SELECT * FROM model_profiles WHERE id = ?", (model_id,))
    assert account_row is not None
    assert project_row is not None
    assert model_row is not None
    return account_row, project_row, model_row
def _seed_douyin(db: Database, owner: dict[str, object], model_row: dict[str, object]) -> str:
    """Insert one Douyin account with its related rows and return the account id.

    Rows written, all with fixed ids and one shared timestamp: a
    ``douyin_accounts`` row, two ``douyin_videos`` rows, a ``public_profile``
    and a ``creator_center`` snapshot, one snapshot field on the creator
    snapshot, one analysis report with one suggestion, one tracked-account
    entry and one tracking cursor.

    Args:
        db: Database to insert into.
        owner: Account row; its ``id`` is used as the owning user id.
        model_row: Model profile row; its ``id`` is referenced by the report
            and the suggestion.

    Returns:
        The id of the inserted ``douyin_accounts`` row.
    """
    now = utc_now()
    owner_id = owner["id"]
    account_id = "dyacct_contract_1"
    profile_url = "https://www.douyin.com/user/contract-test"
    nickname = "Contract Douyin"
    tag_list = ["增长", "内容"]

    # SQL text is kept flush-left so the statement strings stay unchanged.
    account_values = (
        account_id,
        owner_id,
        profile_url,  # profile_url
        profile_url,  # canonical_profile_url
        "sec_contract_1",
        "douyin_uid_contract_1",
        "douyin_id_contract_1",
        nickname,
        "Contract test signature",
        "https://example.com/avatar.png",
        _json(tag_list),
        _json({"fans_count": 1200, "likes_count": 8800}),
        _json({"profile": {"nickname": nickname}}),
        "creator_center",
        "ready",
        now,
        now,
        now,
        now,
    )
    db.execute(
        """
INSERT INTO douyin_accounts (
id, user_id, profile_url, canonical_profile_url, sec_uid, douyin_uid, douyin_id,
nickname, signature, avatar_url, tags_json, profile_stats_json, raw_profile_json,
source_mode, sync_status, last_public_sync_at, last_creator_sync_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
        account_values,
    )

    video_sql = """
INSERT INTO douyin_videos (
id, account_id, aweme_id, title, description, share_url, cover_url,
duration_sec, published_at, tags_json, stats_json, raw_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
    for offset in range(2):
        ordinal = offset + 1
        video_title = f"Contract Video {ordinal}"
        video_stats = {"play": 8200 + offset * 300, "like": 410 + offset * 10, "comment": 18, "share": 9}
        db.execute(
            video_sql,
            (
                f"dyvid_contract_{ordinal}",
                account_id,
                f"aweme_contract_{ordinal}",
                video_title,
                "Contract summary",
                "https://example.com/video",
                "https://example.com/cover.png",
                28,
                f"2026-03-26T10:0{offset}:00+00:00",
                _json(tag_list),
                _json(video_stats),
                _json({"title": video_title}),
                now,
                now,
            ),
        )

    public_snapshot_id = "dysnap_public_contract"
    creator_snapshot_id = "dysnap_creator_contract"
    snapshot_sql = """
INSERT INTO douyin_account_snapshots (
id, account_id, snapshot_type, source_url, raw_payload_json, summary_json,
field_count, collected_at, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
    # (id, snapshot_type, source_url, raw payload, summary); public first, then creator.
    snapshot_specs = (
        (
            public_snapshot_id,
            "public_profile",
            profile_url,
            {"nickname": nickname},
            {"nickname": nickname},
        ),
        (
            creator_snapshot_id,
            "creator_center",
            "https://creator.douyin.com/creator-micro/home",
            {"field": "value"},
            {"creator": "summary"},
        ),
    )
    for snapshot_id, snapshot_type, source_url, raw_payload, summary in snapshot_specs:
        db.execute(
            snapshot_sql,
            (
                snapshot_id,
                account_id,
                snapshot_type,
                source_url,
                _json(raw_payload),
                _json(summary),
                1,
                now,
                now,
            ),
        )

    db.execute(
        """
INSERT INTO douyin_snapshot_fields (snapshot_id, field_path, field_type, field_value_text)
VALUES (?, ?, ?, ?)
""",
        (creator_snapshot_id, "profile.nickname", "string", nickname),
    )

    report_id = "dyreport_contract_1"
    model_profile_id = model_row["id"]
    db.execute(
        """
INSERT INTO douyin_analysis_reports (
id, account_id, user_id, focus_text, model_profile_ids_json, linked_account_ids_json,
prompt_text, context_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
        (
            report_id,
            account_id,
            owner_id,
            "增长诊断",
            _json([model_profile_id]),
            _json([]),
            "contract prompt",
            _json({"account": "douyin"}),
            now,
        ),
    )
    db.execute(
        """
INSERT INTO douyin_analysis_suggestions (
id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
        (
            "dysuggestion_contract_1",
            report_id,
            model_profile_id,
            "Test Model",
            "ok",
            "Contract analysis output",
            _json({"summary": "ok"}),
            now,
        ),
    )

    db.execute(
        """
INSERT INTO douyin_tracked_accounts (
id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
        ("dytrack_contract_1", owner_id, account_id, None, "note", now, now),
    )
    db.execute(
        """
INSERT INTO douyin_tracking_cursors (user_id, last_seen_at, updated_at)
VALUES (?, ?, ?)
""",
        (owner_id, "2026-03-26T09:00:00+00:00", now),
    )
    return account_id
def _seed_domestic(db: Database, owner: dict[str, object], project_row: dict[str, object], platform: str) -> str:
    """Insert one creator account for *platform* with related rows; return its id.

    Rows written, all sharing one timestamp: a ``creator_account`` content
    source, two ``video_link`` content sources whose metadata points back at
    the account, and one row each in the ``{platform}_analysis_reports``,
    ``{platform}_analysis_suggestions``, ``{platform}_tracked_accounts``,
    ``{platform}_tracking_cursors`` and ``{platform}_similarity_searches``
    tables.

    Args:
        db: Database to insert into.
        owner: Account row; its ``id`` is used as the owning user id.
        project_row: Project row; its ``id`` is stored on every content source.
        platform: Platform key. It is interpolated into table names and ids,
            so it must be a trusted identifier.

    Returns:
        The id of the inserted creator-account content source.
    """
    now = utc_now()
    account_id = f"{platform}_acct_contract_1"
    owner_id = owner["id"]
    project_id = project_row["id"]
    profile_url = f"https://example.com/{platform}/profile"
    platform_tags = ["增长", platform]

    # SQL text is kept flush-left so the statement strings stay unchanged.
    # The same statement serves both the account row and the video rows.
    content_source_sql = """
INSERT INTO content_sources (
id, user_id, project_id, source_kind, platform, handle, source_url, title, local_path,
metadata_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""

    account_metadata = {
        "bio": f"{platform} bio",
        "description": f"{platform} description",
        "avatar_url": "https://example.com/avatar.png",
        "tags": platform_tags,
        "keywords": ["增长", "内容"],
        "max_items": 5,
    }
    db.execute(
        content_source_sql,
        (
            account_id,
            owner_id,
            project_id,
            "creator_account",
            platform,
            f"{platform}_handle",
            profile_url,
            f"{platform.upper()} Contract Account",
            "",
            _json(account_metadata),
            now,
            now,
        ),
    )

    for offset in range(2):
        ordinal = offset + 1
        video_metadata = {
            "summary": "contract summary",
            "description": "contract description",
            "cover_url": "https://example.com/cover.png",
            "published_at": f"2026-03-26T10:0{offset}:00+00:00",
            "tags": platform_tags,
            "content_type": "video",
            "duration_sec": 30,
            "external_id": f"{platform}_ext_{ordinal}",
            "origin_content_source_id": account_id,
            "source_account_url": profile_url,
        }
        db.execute(
            content_source_sql,
            (
                f"{platform}_video_contract_{ordinal}",
                owner_id,
                project_id,
                "video_link",
                platform,
                "",
                f"https://example.com/{platform}/video/{ordinal}",
                f"{platform.upper()} Contract Video {ordinal}",
                "",
                _json(video_metadata),
                now,
                now,
            ),
        )

    report_id = f"{platform}_report_contract_1"
    db.execute(
        f"""
INSERT INTO {platform}_analysis_reports (
id, user_id, account_source_id, focus_text, prompt_text, context_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
        (
            report_id,
            owner_id,
            account_id,
            "增长诊断",
            "contract prompt",
            _json({"account": platform}),
            now,
        ),
    )
    db.execute(
        f"""
INSERT INTO {platform}_analysis_suggestions (
id, report_id, model_profile_id, model_label, status, suggestion_text, parsed_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
        (
            f"{platform}_suggestion_contract_1",
            report_id,
            "model_contract_default",
            "Test Model",
            "ok",
            "Contract analysis output",
            _json({"summary": "ok"}),
            now,
        ),
    )

    db.execute(
        f"""
INSERT INTO {platform}_tracked_accounts (
id, user_id, tracked_account_id, assistant_id, note, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
        (f"{platform}_track_contract_1", owner_id, account_id, None, "note", now, now),
    )
    db.execute(
        f"""
INSERT INTO {platform}_tracking_cursors (user_id, last_seen_at, updated_at)
VALUES (?, ?, ?)
""",
        (owner_id, "2026-03-26T09:00:00+00:00", now),
    )

    db.execute(
        f"""
INSERT INTO {platform}_similarity_searches (
id, user_id, source_account_id, prompt_text, context_json, created_at
) VALUES (?, ?, ?, ?, ?, ?)
""",
        (
            f"{platform}_search_contract_1",
            owner_id,
            account_id,
            "contract prompt",
            _json({"source_account": platform}),
            now,
        ),
    )
    return account_id
def _build_app(platforms: list[str]) -> tuple[FastAPI, SimpleNamespace, dict[str, object]]:
    """Create a FastAPI app with the Douyin routes and one route set per platform.

    A fresh database file is created inside a new temporary directory, its
    schema is initialised and the base account/project/model rows are seeded.

    Args:
        platforms: Platform keys passed to ``register_domestic_platform_routes``;
            each key is also used as the route label.

    Returns:
        ``(app, legacy, seed)`` where *seed* maps ``"owner"``, ``"project"`` and
        ``"model"`` to the seeded rows. The temporary directory object and the
        same rows are also stored on ``app.state`` (``_tmpdir``, ``_legacy``,
        ``_project_row``, ``_model_row``, ``_owner_row``).
    """
    tmpdir = tempfile.TemporaryDirectory()
    db_file = Path(tmpdir.name) / "storyforge.db"
    db = Database(str(db_file))
    db.init_schema()

    owner_row, project_row, model_row = _seed_base_account(db)
    legacy = _make_legacy(db, owner_row)

    app = FastAPI()
    register_douyin_routes(app, legacy)
    for platform in platforms:
        register_domestic_platform_routes(app, legacy, platform=platform, label=platform)

    # Stash the fixtures on the app; presumably this also keeps the temporary
    # directory object referenced for as long as the app is.
    state = app.state
    state._tmpdir = tmpdir
    state._legacy = legacy
    state._project_row = project_row
    state._model_row = model_row
    state._owner_row = owner_row

    seed: dict[str, object] = {
        "owner": owner_row,
        "project": project_row,
        "model": model_row,
    }
    return app, legacy, seed
class PlatformContractTests(unittest.TestCase):
    """Contract tests for the Douyin and domestic-platform ``/v2`` routes.

    Each test builds its own app and database through ``_build_app``, seeds
    platform rows, then checks status codes and the presence of response keys.
    """

    # Every request sends the same placeholder bearer token; the approval hook
    # supplied by _make_legacy returns the seeded owner without inspecting it.
    AUTH_HEADERS = {"Authorization": "Bearer dummy"}

    def _build_app_with_cleanup(self, platforms):
        """Call ``_build_app`` and schedule removal of its temporary directory.

        ``_build_app`` stores the ``TemporaryDirectory`` on ``app.state._tmpdir``
        but nothing removes it, so without this hook each test leaves a
        directory behind until garbage collection (which also emits a
        ``ResourceWarning``).

        Returns:
            The ``(app, legacy, seed)`` tuple from ``_build_app`` unchanged.
        """
        app, legacy, seed = _build_app(platforms)
        self.addCleanup(self._discard_tmpdir, app.state._tmpdir)
        return app, legacy, seed

    @staticmethod
    def _discard_tmpdir(tmpdir) -> None:
        """Remove *tmpdir*, tolerating filesystem errors.

        Deliberately best-effort: the database opened by ``_build_app`` is not
        closed anywhere in this file, so on platforms that refuse to delete
        open files removal can fail. That must not turn a passing test into an
        error (the previous implicit cleanup ignored such failures as well).
        """
        try:
            tmpdir.cleanup()
        except OSError:
            pass

    def test_douyin_tracking_digest_and_workspace_shape(self) -> None:
        """Douyin read routes return 200 and expose the expected keys."""
        app, legacy, seed = self._build_app_with_cleanup(["xiaohongshu", "bilibili", "kuaishou", "wechat_video"])
        douyin_account_id = _seed_douyin(legacy.db, seed["owner"], seed["model"])
        with TestClient(app) as client:
            accounts = client.get("/v2/douyin/accounts", headers=self.AUTH_HEADERS)
            self.assertEqual(accounts.status_code, 200)
            self.assertTrue(accounts.json())

            workspace = client.get(f"/v2/douyin/accounts/{douyin_account_id}", headers=self.AUTH_HEADERS)
            self.assertEqual(workspace.status_code, 200)
            workspace_payload = workspace.json()
            self.assertIn("account", workspace_payload)
            self.assertIn("recent_reports", workspace_payload)
            self.assertIn("latest_public_snapshot", workspace_payload)
            self.assertIn("latest_creator_snapshot", workspace_payload)
            self.assertIn("recent_similarity_searches", workspace_payload)
            self.assertIn("available_model_profiles", workspace_payload)

            reports = client.get(
                f"/v2/douyin/accounts/{douyin_account_id}/analysis-reports",
                headers=self.AUTH_HEADERS,
            )
            self.assertEqual(reports.status_code, 200)
            self.assertEqual(len(reports.json()), len(workspace_payload["recent_reports"]))

            snapshots = client.get(
                f"/v2/douyin/accounts/{douyin_account_id}/snapshots",
                headers=self.AUTH_HEADERS,
            )
            self.assertEqual(snapshots.status_code, 200)
            self.assertGreaterEqual(len(snapshots.json()), 2)
            creator_snapshot = next(item for item in snapshots.json() if item["snapshot_type"] == "creator_center")

            # The creator-fields route must describe the same creator snapshot.
            creator_fields = client.get(
                f"/v2/douyin/accounts/{douyin_account_id}/creator-fields",
                headers=self.AUTH_HEADERS,
            )
            self.assertEqual(creator_fields.status_code, 200)
            self.assertEqual(creator_fields.json()["snapshot_type"], "creator_center")
            self.assertEqual(creator_fields.json()["id"], creator_snapshot["id"])

            digest = client.get("/v2/douyin/tracking/digest", headers=self.AUTH_HEADERS)
            self.assertEqual(digest.status_code, 200)
            digest_payload = digest.json()
            self.assertIn("generated_at", digest_payload)
            self.assertIn("since", digest_payload)
            self.assertIn("tracked_accounts", digest_payload)
            self.assertIn("cursor_last_seen_at", digest_payload)
            self.assertTrue(digest_payload["items"])
            digest_item = digest_payload["items"][0]
            self.assertEqual(digest_item["platform"], "douyin")
            self.assertIn("summary_text", digest_item)
            self.assertIn("tracked_account_name", digest_item)
            self.assertIn("account", digest_item)
            self.assertIn("video", digest_item)

    def test_domestic_workspace_and_tracking_shape(self) -> None:
        """Xiaohongshu workspace, reports and digest routes expose the expected keys."""
        app, legacy, seed = self._build_app_with_cleanup(["xiaohongshu"])
        xhs_account_id = _seed_domestic(legacy.db, seed["owner"], seed["project"], "xiaohongshu")
        with TestClient(app) as client:
            workspace = client.get(
                f"/v2/xiaohongshu/accounts/{xhs_account_id}/workspace",
                headers=self.AUTH_HEADERS,
            )
            self.assertEqual(workspace.status_code, 200)
            workspace_payload = workspace.json()
            self.assertIn("latest_public_snapshot", workspace_payload)
            self.assertIn("latest_creator_snapshot", workspace_payload)
            self.assertIn("recent_reports", workspace_payload)
            self.assertIn("recent_similarity_searches", workspace_payload)
            self.assertIn("available_model_profiles", workspace_payload)

            reports = client.get(
                f"/v2/xiaohongshu/accounts/{xhs_account_id}/analysis-reports",
                headers=self.AUTH_HEADERS,
            )
            self.assertEqual(reports.status_code, 200)
            self.assertEqual(reports.json(), workspace_payload["recent_reports"])

            digest = client.get("/v2/xiaohongshu/tracking/digest", headers=self.AUTH_HEADERS)
            self.assertEqual(digest.status_code, 200)
            digest_payload = digest.json()
            self.assertIn("generated_at", digest_payload)
            self.assertIn("since", digest_payload)
            self.assertIn("tracked_accounts", digest_payload)
            self.assertIn("cursor_last_seen_at", digest_payload)
            self.assertTrue(digest_payload["items"])
            digest_item = digest_payload["items"][0]
            self.assertEqual(digest_item["platform"], "xiaohongshu")
            self.assertIn("summary_text", digest_item)
            self.assertIn("account", digest_item)
            self.assertIn("video", digest_item)

    def test_douyin_live_first_mutation_routes_are_available(self) -> None:
        """Douyin analyze-top, refresh and cursor POST routes answer with 200."""
        app, legacy, seed = self._build_app_with_cleanup(["xiaohongshu"])
        douyin_account_id = _seed_douyin(legacy.db, seed["owner"], seed["model"])
        with TestClient(app) as client:
            analyze = client.post(
                f"/v2/douyin/accounts/{douyin_account_id}/videos/analyze-top",
                headers=self.AUTH_HEADERS,
                json={"top_video_count": 2, "min_score": 0, "temperature": 0.25, "model_profile_id": seed["model"]["id"]},
            )
            self.assertEqual(analyze.status_code, 200, analyze.text)
            analyze_payload = analyze.json()
            self.assertEqual(analyze_payload["account_id"], douyin_account_id)
            self.assertIn("analyzed_count", analyze_payload)

            refresh_all = client.post("/v2/douyin/tracking/refresh", headers=self.AUTH_HEADERS)
            self.assertEqual(refresh_all.status_code, 200, refresh_all.text)
            refresh_all_payload = refresh_all.json()
            self.assertIn("refreshed", refresh_all_payload)
            self.assertIn("items", refresh_all_payload)

            refresh_one = client.post(
                f"/v2/douyin/tracking/accounts/{douyin_account_id}/refresh",
                headers=self.AUTH_HEADERS,
            )
            self.assertEqual(refresh_one.status_code, 200, refresh_one.text)
            refresh_one_payload = refresh_one.json()
            self.assertTrue(refresh_one_payload["success"])
            self.assertEqual(refresh_one_payload["tracked_account_id"], douyin_account_id)
            self.assertIn("account", refresh_one_payload)
            self.assertIn("sync_errors", refresh_one_payload)

            cursor = client.post(
                "/v2/douyin/tracking/cursor",
                headers=self.AUTH_HEADERS,
                json={"last_seen_at": "2026-03-30T10:00:00+00:00"},
            )
            self.assertEqual(cursor.status_code, 200, cursor.text)
            self.assertEqual(cursor.json()["last_seen_at"], "2026-03-30T10:00:00+00:00")

    def test_domestic_live_first_mutation_routes_are_available(self) -> None:
        """Xiaohongshu analyze-top, refresh and cursor POST routes answer with 200."""
        app, legacy, seed = self._build_app_with_cleanup(["xiaohongshu"])
        xhs_account_id = _seed_domestic(legacy.db, seed["owner"], seed["project"], "xiaohongshu")
        with TestClient(app) as client:
            analyze = client.post(
                f"/v2/xiaohongshu/accounts/{xhs_account_id}/videos/analyze-top",
                headers=self.AUTH_HEADERS,
                json={"top_video_count": 2, "min_score": 0, "temperature": 0.25, "model_profile_id": seed["model"]["id"]},
            )
            self.assertEqual(analyze.status_code, 200, analyze.text)
            analyze_payload = analyze.json()
            self.assertEqual(analyze_payload["account_id"], xhs_account_id)
            self.assertIn("analyzed_count", analyze_payload)

            refresh_all = client.post("/v2/xiaohongshu/tracking/refresh", headers=self.AUTH_HEADERS)
            self.assertEqual(refresh_all.status_code, 200, refresh_all.text)
            refresh_all_payload = refresh_all.json()
            self.assertIn("refreshed", refresh_all_payload)
            self.assertIn("items", refresh_all_payload)

            refresh_one = client.post(
                f"/v2/xiaohongshu/tracking/accounts/{xhs_account_id}/refresh",
                headers=self.AUTH_HEADERS,
            )
            self.assertEqual(refresh_one.status_code, 200, refresh_one.text)
            refresh_one_payload = refresh_one.json()
            self.assertEqual(refresh_one_payload["tracked_account_id"], xhs_account_id)
            self.assertIn("sync_job_id", refresh_one_payload)

            cursor = client.post(
                "/v2/xiaohongshu/tracking/cursor",
                headers=self.AUTH_HEADERS,
                json={"last_seen_at": "2026-03-30T10:00:00+00:00"},
            )
            self.assertEqual(cursor.status_code, 200, cursor.text)
            self.assertEqual(cursor.json()["last_seen_at"], "2026-03-30T10:00:00+00:00")
# Run the test cases in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()