Files
storyforge/tests/test_production_baseline.py
kris 27801107b5
Some checks failed
StoryForge CI / Baseline checks (push) Has been cancelled
StoryForge CI / Backend tests (push) Has been cancelled
StoryForge CI / Web tests (push) Has been cancelled
feat: surface integration deployment locations
2026-04-06 13:32:10 +08:00

590 lines
27 KiB
Python

from __future__ import annotations
import importlib
import json
import os
import sqlite3
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from typing import Any
from fastapi.testclient import TestClient
# ROOT is the parent of the directory that contains this test module.
ROOT = Path(__file__).resolve().parents[1]
# The collector service sources live in a sibling directory; it is put at the
# front of sys.path so the ``app.*`` modules can be imported by name. The
# membership check keeps repeated imports of this module from stacking
# duplicate entries.
APP_ROOT = ROOT / "collector-service"
if str(APP_ROOT) not in sys.path:
    sys.path.insert(0, str(APP_ROOT))
class ProductionBaselineTests(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.tempdir = tempfile.TemporaryDirectory()
temp_root = Path(cls.tempdir.name)
os.environ["DATA_DIR"] = str(temp_root / "data")
os.environ["DATABASE_PATH"] = str(temp_root / "data" / "storyforge.db")
os.environ["DOWNLOADS_DIR"] = str(temp_root / "downloads")
os.environ["JOBS_DIR"] = str(temp_root / "jobs")
os.environ["MODELS_DIR"] = str(temp_root / "models")
os.environ["ORCHESTRATOR_SHARED_SECRET"] = "test-secret"
os.environ["WEB_AUTOLOGIN_ENABLED"] = "1"
os.environ["WEB_AUTOLOGIN_ACCOUNT_USERNAME"] = ""
os.environ["WEB_AUTOLOGIN_USERNAME"] = ""
os.environ["WEB_AUTOLOGIN_PASSWORD"] = ""
os.environ.setdefault("BOOTSTRAP_SUPERADMIN_USERNAME", "")
os.environ.setdefault("BOOTSTRAP_SUPERADMIN_PASSWORD", "")
cls.db_module = importlib.reload(importlib.import_module("app.database"))
cls.core = importlib.reload(importlib.import_module("app.core_main"))
cls.app_main = importlib.reload(importlib.import_module("app.main"))
async def fake_trigger(job_row: dict[str, Any]) -> dict[str, Any]:
cls.core.append_job_event(job_row["id"], "workflow.trigger.requested", {"workflow_key": job_row.get("workflow_key", "")})
cls.core.update_job_state(
job_row["id"],
status="queued",
provider_name="n8n",
provider_task_id="",
result={"n8n_trigger": {"requested": True, "mocked": True}},
)
return cls.core.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],))
async def fake_call_model(*_args: object, **_kwargs: object) -> str:
return "mock content"
cls.core.trigger_orchestrated_job = fake_trigger
cls.core.call_model = fake_call_model
cls.core.sync_live_recorder_remote_config = lambda: {"ok": True}
cls.core.db.init_schema()
cls.client = TestClient(cls.app_main.app)
@classmethod
def tearDownClass(cls) -> None:
cls.client.close()
cls.tempdir.cleanup()
def setUp(self) -> None:
self._clear_tables()
def _clear_tables(self) -> None:
tables = [
"job_events",
"tenant_usage_ledger",
"tenant_quota_profiles",
"auth_tokens",
"publish_reviews",
"live_recorder_bindings",
"live_recorder_sources",
"jobs",
"content_sources",
"assistant_knowledge_bases",
"assistants",
"knowledge_documents",
"knowledge_bases",
"projects",
"accounts",
"model_profiles",
]
for table in tables:
self.core.db.execute(f"DELETE FROM {table}")
def _seed_context(self, tag: str, *, exhausted: bool = False) -> dict[str, Any]:
now = self.db_module.utc_now()
account_id = f"acct_{tag}"
project_id = f"proj_{tag}"
model_id = f"model_{tag}"
kb_id = f"kb_{tag}"
assistant_id = f"assistant_{tag}"
token = f"token_{tag}"
username = f"user_{tag}"
login_password = f"pass_{tag}"
password_hash, password_salt = self.core.create_password_hash(login_password)
self.core.db.execute(
"""
INSERT INTO accounts (
id, username, password_hash, password_salt, display_name, role, approval_status,
approved_by, approved_at, preferred_analysis_model_id, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
account_id,
username,
password_hash,
password_salt,
f"User {tag}",
"super_admin",
"approved",
account_id,
now,
model_id,
now,
now,
),
)
self.core.db.execute(
"""
INSERT INTO projects (id, user_id, name, description, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(project_id, account_id, f"Project {tag}", "", now, now),
)
self.core.db.execute(
"""
INSERT INTO model_profiles (
id, owner_account_id, name, provider, base_url, api_key, model_name,
is_system, is_default, created_at, updated_at
) VALUES (?, NULL, ?, ?, ?, ?, ?, 1, 1, ?, ?)
""",
(model_id, "Default Model", "openai_compat", "http://127.0.0.1:8317/v1", "", "GLM-5", now, now),
)
self.core.db.execute(
"""
INSERT INTO knowledge_bases (id, user_id, project_id, name, description, sync_status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, 'ready', ?, ?)
""",
(kb_id, account_id, project_id, f"KB {tag}", "", now, now),
)
self.core.db.execute(
"""
INSERT INTO assistants (id, user_id, project_id, name, description, system_prompt, generation_goal, config_json, model_profile_id, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, '{}', ?, ?, ?)
""",
(
assistant_id,
account_id,
project_id,
f"Assistant {tag}",
"",
"你是文案助手。",
"生成短视频文案。",
model_id,
now,
now,
),
)
self.core.db.execute("INSERT INTO assistant_knowledge_bases (assistant_id, knowledge_base_id) VALUES (?, ?)", (assistant_id, kb_id))
self.core.db.execute(
"INSERT INTO auth_tokens (token, account_id, created_at) VALUES (?, ?, ?)",
(token, account_id, now),
)
if exhausted:
quota_id = f"quota_{tag}"
self.core.db.execute(
"""
INSERT INTO tenant_quota_profiles (
id, user_id, project_id, monthly_budget_cents, storage_limit_bytes, analysis_quota,
copy_quota, ai_video_quota, real_cut_quota, recorder_quota, enabled, config_json,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, '{}', ?, ?)
""",
(quota_id, account_id, project_id, 9999, 0, 1, 1, 1, 1, 1, now, now),
)
for category in ["analysis", "content_source_sync", "review", "copy", "ai_video", "real_cut", "live_recorder"]:
usage_id = f"usage_{tag}_{category}"
cost_map = {
"analysis": 6,
"content_source_sync": 8,
"review": 1,
"copy": 3,
"ai_video": 30,
"real_cut": 20,
"live_recorder": 2,
}
self.core.db.execute(
"""
INSERT INTO tenant_usage_ledger (
id, user_id, project_id, category, quantity, cost_cents, reference_type, reference_id, details_json, created_at
) VALUES (?, ?, ?, ?, 1, ?, 'seed', ?, '{}', ?)
""",
(usage_id, account_id, project_id, category, cost_map[category], usage_id, now),
)
return {
"account_id": account_id,
"project_id": project_id,
"model_id": model_id,
"kb_id": kb_id,
"assistant_id": assistant_id,
"token": token,
"username": username,
"password": login_password,
}
def test_auto_session_issues_token_without_manual_credentials(self) -> None:
ctx = self._seed_context("auto", exhausted=False)
self.core.WEB_AUTOLOGIN_ENABLED = "1"
self.core.WEB_AUTOLOGIN_ACCOUNT_USERNAME = ctx["username"]
self.core.WEB_AUTOLOGIN_USERNAME = ""
self.core.WEB_AUTOLOGIN_PASSWORD = ""
response = self.client.post("/v2/auth/auto-session")
self.assertEqual(response.status_code, 200, response.text)
payload = response.json()
self.assertEqual(payload["account"]["username"], ctx["username"])
self.assertEqual(payload["mode"], "auto")
def test_repo_contains_fnos_lan_stack_deploy_entrypoint(self) -> None:
script_path = ROOT / "scripts" / "deploy_fnos_storyforge_lan_stack.sh"
self.assertTrue(script_path.exists(), str(script_path))
content = script_path.read_text(encoding="utf-8")
self.assertIn("deploy_fnos_cutvideo_tunnel.sh", content)
self.assertIn("deploy_fnos_storyforge_collector.sh", content)
self.assertIn("deploy_fnos_storyforge_web.sh", content)
def test_repo_contains_fnos_lan_smoke_script(self) -> None:
script_path = ROOT / "scripts" / "smoke_fnos_storyforge_lan.sh"
self.assertTrue(script_path.exists(), str(script_path))
content = script_path.read_text(encoding="utf-8")
for expected in [
"/healthz",
"/v2/auth/auto-session",
"/v2/integrations/health",
"/transcribe",
"/api/bootstrap",
"19181",
]:
self.assertIn(expected, content)
def test_web_deploy_script_defaults_to_lan_collector(self) -> None:
script_path = ROOT / "scripts" / "deploy_fnos_storyforge_web.sh"
content = script_path.read_text(encoding="utf-8")
self.assertIn('COLLECTOR_PORT="${STORYFORGE_COLLECTOR_DEV_PORT:-19193}"', content)
self.assertIn('BACKEND_URL="${STORYFORGE_FNOS_BACKEND_URL:-${STORYFORGE_FNOS_COLLECTOR_URL:-http://$FNOS_HOST:$COLLECTOR_PORT}}"', content)
self.assertNotIn('https://storyforge.hyzq.net', content)
def test_baseline_script_covers_homepage_dashboard_node_test(self) -> None:
script = (ROOT / "scripts" / "check_repo_baseline.sh").read_text(encoding="utf-8")
self.assertIn("dashboard-home.test.mjs", script)
def test_healthz_exposes_lan_routing_summary(self) -> None:
response = self.client.get("/healthz")
self.assertEqual(response.status_code, 200, response.text)
payload = response.json()
self.assertIn("lanRouting", payload)
self.assertIn("cutvideoRouteMode", payload["lanRouting"])
self.assertIn("cutvideoBaseUrl", payload["lanRouting"])
def test_integrations_health_exposes_asr_runtime_summary(self) -> None:
ctx = self._seed_context("asr_runtime", exhausted=False)
headers = {"Authorization": f"Bearer {ctx['token']}"}
original_base_url = self.core.ASR_HTTP_BASE_URL
original_probe_http_json = getattr(self.core, "probe_http_json", None)
try:
self.core.ASR_HTTP_BASE_URL = "http://asr.example:8088"
def fake_probe_http_json(url: str, path: str = "", timeout: float = 3.0) -> dict[str, Any]:
if url == "http://asr.example:8088" and path == "/health":
return {
"configured": True,
"reachable": True,
"status_code": 200,
"error": "",
"url": "http://asr.example:8088/health",
"json": {
"service": "storyforge-windows-asr",
"model_name": "base",
"language": "auto",
"device": "auto",
"compute_type": "auto",
"active_device": "cuda",
"active_compute_type": "int8_float16",
},
}
return {
"configured": False,
"reachable": False,
"status_code": 0,
"error": "not_configured",
"url": "",
"json": {},
}
self.core.probe_http_json = fake_probe_http_json
response = self.client.get("/v2/integrations/health", headers=headers)
finally:
self.core.ASR_HTTP_BASE_URL = original_base_url
if original_probe_http_json is None:
try:
delattr(self.core, "probe_http_json")
except AttributeError:
pass
else:
self.core.probe_http_json = original_probe_http_json
self.assertEqual(response.status_code, 200, response.text)
payload = response.json()
self.assertIn("asr", payload)
self.assertEqual(payload["asr"]["active_device"], "cuda")
self.assertEqual(payload["asr"]["active_compute_type"], "int8_float16")
self.assertEqual(payload["asr"]["runtime_device_mode"], "auto")
self.assertEqual(payload["asr"]["runtime_compute_type_mode"], "auto")
self.assertEqual(payload["asr"]["language_mode"], "auto")
self.assertEqual(payload["asr"]["model_name"], "base")
def test_integrations_health_exposes_deployment_labels(self) -> None:
ctx = self._seed_context("deployment_labels", exhausted=False)
headers = {"Authorization": f"Bearer {ctx['token']}"}
original_n8n = self.core.N8N_BASE_URL
original_huobao = self.core.HUOBAO_BASE_URL
original_asr = self.core.ASR_HTTP_BASE_URL
original_live_recorder = self.core.LIVE_RECORDER_BASE_URL
original_cutvideo = self.core.CUTVIDEO_BASE_URL
original_probe_http = self.core.probe_http
original_probe_http_json = getattr(self.core, "probe_http_json", None)
try:
self.core.N8N_BASE_URL = "http://127.0.0.1:25670"
self.core.HUOBAO_BASE_URL = "http://127.0.0.1:25678"
self.core.ASR_HTTP_BASE_URL = "http://192.168.31.18:8088"
self.core.LIVE_RECORDER_BASE_URL = "http://192.168.31.188:19106"
self.core.CUTVIDEO_BASE_URL = "http://192.168.31.188:19186"
def fake_probe_http(url: str, path: str = "", timeout: float = 3.0) -> dict[str, Any]:
return {
"configured": True,
"reachable": True,
"status_code": 200,
"error": "",
"url": f"{url.rstrip('/')}/{path.lstrip('/')}" if path else url,
}
def fake_probe_http_json(url: str, path: str = "", timeout: float = 3.0) -> dict[str, Any]:
detail = fake_probe_http(url, path=path, timeout=timeout)
detail["json"] = {
"service": "storyforge-windows-asr",
"model_name": "base",
"language": "auto",
"device": "auto",
"compute_type": "auto",
"active_device": "cuda",
"active_compute_type": "int8_float16",
}
return detail
self.core.probe_http = fake_probe_http
self.core.probe_http_json = fake_probe_http_json
response = self.client.get("/v2/integrations/health", headers=headers)
finally:
self.core.N8N_BASE_URL = original_n8n
self.core.HUOBAO_BASE_URL = original_huobao
self.core.ASR_HTTP_BASE_URL = original_asr
self.core.LIVE_RECORDER_BASE_URL = original_live_recorder
self.core.CUTVIDEO_BASE_URL = original_cutvideo
self.core.probe_http = original_probe_http
if original_probe_http_json is None:
try:
delattr(self.core, "probe_http_json")
except AttributeError:
pass
else:
self.core.probe_http_json = original_probe_http_json
self.assertEqual(response.status_code, 200, response.text)
payload = response.json()
self.assertEqual(payload["n8n"]["deployment_label"], "服务器")
self.assertEqual(payload["huobao"]["deployment_label"], "服务器")
self.assertEqual(payload["asr"]["deployment_label"], "Windows")
self.assertEqual(payload["live_recorder"]["deployment_label"], "NAS")
self.assertEqual(payload["cutvideo"]["deployment_label"], "NAS 隧道")
def test_collector_deploy_script_exposes_health_retry_controls(self) -> None:
script_path = ROOT / "scripts" / "deploy_fnos_storyforge_collector.sh"
content = script_path.read_text(encoding="utf-8")
self.assertIn("COLLECTOR_HEALTH_RETRY_ATTEMPTS", content)
self.assertIn("COLLECTOR_HEALTH_RETRY_SLEEP_SEC", content)
self.assertNotIn("$(_attempt)", content)
def test_database_uses_wal_and_busy_timeout(self) -> None:
conn = self.core.db.connect()
try:
journal_mode_row = conn.execute("PRAGMA journal_mode").fetchone()
busy_timeout_row = conn.execute("PRAGMA busy_timeout").fetchone()
journal_mode = journal_mode_row["journal_mode"] if isinstance(journal_mode_row, dict) else journal_mode_row[0]
if isinstance(busy_timeout_row, dict):
busy_timeout = int(busy_timeout_row.get("timeout") or busy_timeout_row.get("busy_timeout") or next(iter(busy_timeout_row.values())))
else:
busy_timeout = int(busy_timeout_row[0])
finally:
conn.close()
self.assertEqual(str(journal_mode).lower(), "wal")
self.assertGreaterEqual(busy_timeout, 1000)
def test_quota_blocks_production_endpoints(self) -> None:
ctx = self._seed_context("quota", exhausted=True)
headers = {"Authorization": f"Bearer {ctx['token']}"}
blocked_requests = [
("POST", "/v2/explore/text", {"title": "T", "content": "C", "project_id": ctx["project_id"], "knowledge_base_id": ctx["kb_id"], "assistant_id": ctx["assistant_id"], "analysis_model_profile_id": ctx["model_id"]}, None),
("POST", "/v2/explore/video-link", {"video_url": "https://example.com/video", "title": "V", "project_id": ctx["project_id"], "knowledge_base_id": ctx["kb_id"], "assistant_id": ctx["assistant_id"], "analysis_model_profile_id": ctx["model_id"]}, None),
("POST", "/v2/pipelines/content-source-sync", {"project_id": ctx["project_id"]}, None),
("POST", "/v2/reviews", {"project_id": ctx["project_id"], "assistant_id": ctx["assistant_id"], "title": "Review"}, None),
("POST", "/v2/pipelines/real-cut", {"project_id": ctx["project_id"], "title": "Cut"}, None),
("POST", "/v2/pipelines/ai-video", {"project_id": ctx["project_id"], "title": "Video", "brief": "Brief"}, None),
("POST", f"/v2/assistants/{ctx['assistant_id']}/generate", {"brief": "Copy", "project_id": ctx["project_id"], "knowledge_base_ids": [ctx["kb_id"]]}, None),
("POST", "/v2/live-recorder/sources", {"project_id": ctx["project_id"], "source_url": "https://example.com/live", "title": "Live"}, None),
]
for method, path, json_body, files in blocked_requests:
with self.subTest(path=path):
response = self.client.request(method, path, headers=headers, json=json_body, files=files)
self.assertEqual(response.status_code, 403, response.text)
upload_response = self.client.post(
"/v2/explore/upload-video",
headers=headers,
data={
"title": "Upload",
"project_id": ctx["project_id"],
"knowledge_base_id": ctx["kb_id"],
"assistant_id": ctx["assistant_id"],
"analysis_model_profile_id": ctx["model_id"],
},
files={"file": ("clip.mp4", b"clip-bytes", "video/mp4")},
)
self.assertEqual(upload_response.status_code, 403, upload_response.text)
def test_successful_analysis_records_usage_and_retry_endpoints_work(self) -> None:
ctx = self._seed_context("happy", exhausted=False)
headers = {"Authorization": f"Bearer {ctx['token']}"}
text_response = self.client.post(
"/v2/explore/text",
headers=headers,
json={
"title": "Hello",
"content": "Hello StoryForge",
"project_id": ctx["project_id"],
"knowledge_base_id": ctx["kb_id"],
"assistant_id": ctx["assistant_id"],
"analysis_model_profile_id": ctx["model_id"],
},
)
self.assertEqual(text_response.status_code, 200, text_response.text)
text_job = text_response.json()
usage_row = self.core.db.fetch_one(
"SELECT * FROM tenant_usage_ledger WHERE user_id = ? AND project_id = ? AND category = ? ORDER BY created_at DESC LIMIT 1",
(ctx["account_id"], ctx["project_id"], "analysis"),
)
self.assertIsNotNone(usage_row)
self.assertEqual(text_job["status"], "queued")
now = self.db_module.utc_now()
source_job_id = f"job_seedance_source_{ctx['project_id']}"
self.core.db.execute(
"""
INSERT INTO jobs (
id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id,
source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id,
source_url, title, language, status, transcript_text, style_summary, upload_status,
error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at
) VALUES (?, ?, ?, '', ?, ?, NULL, 'text', 'analysis', 'analysis_pipeline', 'n8n', 'collector', '', '', ?, 'auto', 'completed', '', '', 'completed', '', '{}', '{\"summary\":\"done\"}', ?, ?, ?)
""",
(
source_job_id,
ctx["account_id"],
ctx["project_id"],
ctx["assistant_id"],
ctx["kb_id"],
"Seedance Source",
ctx["model_id"],
now,
now,
),
)
ai_video_response = self.client.post(
"/v2/pipelines/ai-video",
headers=headers,
json={
"project_id": ctx["project_id"],
"assistant_id": ctx["assistant_id"],
"knowledge_base_id": ctx["kb_id"],
"source_job_id": source_job_id,
"title": "Seedance 2.0 视频",
"brief": "做一条镜头推进感更强的 AI 视频。",
"video_provider": "seedance2",
"video_model": "",
},
)
self.assertEqual(ai_video_response.status_code, 200, ai_video_response.text)
ai_video_payload = ai_video_response.json()
self.assertEqual(ai_video_payload["artifacts"]["video_provider"], "seedance2")
self.assertEqual(ai_video_payload["artifacts"]["video_model"], "seedance-2.0-pro")
self.assertEqual(ai_video_payload["artifacts"]["video_dispatch_provider"], "doubao")
self.assertEqual(ai_video_payload["artifacts"]["video_dispatch_model"], "seedance-2.0-pro")
now = self.db_module.utc_now()
failed_jobs = []
for index in range(2):
job_id = f"job_{index}_{ctx['project_id']}"
self.core.db.execute(
"""
INSERT INTO jobs (
id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id,
source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id,
source_url, title, language, status, transcript_text, style_summary, upload_status,
error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at
) VALUES (?, ?, ?, '', ?, ?, '', ?, ?, ?, 'n8n', 'collector', '', '', ?, 'auto', 'failed', '', '', 'pending', ?, '{}', '{}', ?, ?, ?)
""",
(
job_id,
ctx["account_id"],
ctx["project_id"],
ctx["assistant_id"],
ctx["kb_id"],
"text",
"analysis",
"analysis_pipeline",
f"Failed {index}",
"boom",
ctx["model_id"],
now,
now,
),
)
failed_jobs.append(job_id)
retry_response = self.client.post(f"/v2/explore/jobs/{failed_jobs[0]}/retry", headers=headers)
self.assertEqual(retry_response.status_code, 200, retry_response.text)
retry_payload = retry_response.json()
self.assertEqual(retry_payload["status"], "queued")
bulk_response = self.client.post(
"/v2/admin/jobs/retry-failed",
headers=headers,
json={"project_id": ctx["project_id"], "limit": 10},
)
self.assertEqual(bulk_response.status_code, 200, bulk_response.text)
bulk_payload = bulk_response.json()
self.assertEqual(bulk_payload["count"], 1)
self.assertEqual(len(bulk_payload["retried"]), 1)
event_rows = self.core.db.fetch_all("SELECT event_type FROM job_events WHERE job_id = ? ORDER BY created_at ASC", (failed_jobs[0],))
event_types = [row["event_type"] for row in event_rows]
self.assertIn("job.retry.requested", event_types)
self.assertIn("job.retry.queued", event_types)
bulk_job = self.core.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (failed_jobs[1],))
self.assertEqual(bulk_job["status"], "queued")
def test_backup_script_creates_consistent_snapshot(self) -> None:
ctx = self._seed_context("backup", exhausted=False)
backup_dir = Path(self.tempdir.name) / "backups"
script = ROOT / "scripts" / "backup_storyforge_sqlite.sh"
result = subprocess.run(
["bash", str(script)],
check=True,
text=True,
capture_output=True,
env={
**os.environ,
"DATABASE_PATH": str(self.core.db.path),
"BACKUP_DIR": str(backup_dir),
},
)
backup_path = Path(result.stdout.strip().splitlines()[-1])
self.assertTrue(backup_path.exists(), result.stdout)
with sqlite3.connect(backup_path) as conn:
account_count = conn.execute("SELECT COUNT(*) FROM accounts").fetchone()[0]
self.assertGreaterEqual(int(account_count), 1)