"""Production-baseline regression tests for the StoryForge collector service.

The suite boots ``app.main`` against a throwaway SQLite database inside a
temporary directory, stubs the outbound orchestration/model calls, and checks
quota enforcement, retry endpoints, health payloads, and the presence/content
of the repository's deploy, smoke, and backup scripts.
"""

from __future__ import annotations

import importlib
import json
import os
import sqlite3
import subprocess
import sys
import tempfile
import unittest
from contextlib import closing
from pathlib import Path
from typing import Any
from unittest import mock

from fastapi.testclient import TestClient

ROOT = Path(__file__).resolve().parents[1]
APP_ROOT = ROOT / "collector-service"
# Make the ``app`` package importable when the tests are run from the repo root.
if str(APP_ROOT) not in sys.path:
    sys.path.insert(0, str(APP_ROOT))


def _restore_environ(saved: dict[str, str | None]) -> None:
    """Restore the environment variables recorded in *saved*.

    Keys recorded as ``None`` were absent before the suite started and are
    removed again; every other key is reset to its recorded value.
    """
    for key, value in saved.items():
        if value is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = value


class ProductionBaselineTests(unittest.TestCase):
    """End-to-end checks driven through FastAPI's ``TestClient`` on an isolated database."""

    @classmethod
    def setUpClass(cls) -> None:
        """Point the app at a temp directory, reload its modules, and stub external calls.

        The environment is configured *before* the ``app`` modules are reloaded
        (NOTE(review): presumably they read these variables at import time).
        Every environment change and module patch is registered as a class
        cleanup, so it is undone even if this method fails part-way through.
        """
        cls.tempdir = tempfile.TemporaryDirectory()
        cls.addClassCleanup(cls.tempdir.cleanup)
        temp_root = Path(cls.tempdir.name)

        # Values forced for the lifetime of the suite.
        env_overrides = {
            "DATA_DIR": str(temp_root / "data"),
            "DATABASE_PATH": str(temp_root / "data" / "storyforge.db"),
            "DOWNLOADS_DIR": str(temp_root / "downloads"),
            "JOBS_DIR": str(temp_root / "jobs"),
            "MODELS_DIR": str(temp_root / "models"),
            "ORCHESTRATOR_SHARED_SECRET": "test-secret",
            "WEB_AUTOLOGIN_ENABLED": "1",
            "WEB_AUTOLOGIN_ACCOUNT_USERNAME": "",
            "WEB_AUTOLOGIN_USERNAME": "",
            "WEB_AUTOLOGIN_PASSWORD": "",
        }
        # Values applied only when the caller has not already set them.
        env_defaults = {
            "BOOTSTRAP_SUPERADMIN_USERNAME": "",
            "BOOTSTRAP_SUPERADMIN_PASSWORD": "",
        }
        saved_env = {key: os.environ.get(key) for key in (*env_overrides, *env_defaults)}
        cls.addClassCleanup(_restore_environ, saved_env)
        os.environ.update(env_overrides)
        for key, value in env_defaults.items():
            os.environ.setdefault(key, value)

        cls.db_module = importlib.reload(importlib.import_module("app.database"))
        cls.core = importlib.reload(importlib.import_module("app.core_main"))
        cls.app_main = importlib.reload(importlib.import_module("app.main"))

        async def fake_trigger(job_row: dict[str, Any]) -> dict[str, Any]:
            # Record the trigger event and mark the job queued instead of dispatching it.
            cls.core.append_job_event(
                job_row["id"],
                "workflow.trigger.requested",
                {"workflow_key": job_row.get("workflow_key", "")},
            )
            cls.core.update_job_state(
                job_row["id"],
                status="queued",
                provider_name="n8n",
                provider_task_id="",
                result={"n8n_trigger": {"requested": True, "mocked": True}},
            )
            return cls.core.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],))

        async def fake_call_model(*_args: object, **_kwargs: object) -> str:
            return "mock content"

        # importlib.reload reuses the module object held in sys.modules, so a plain
        # assignment would leak these stubs to every later test module; patch via
        # ``mock`` and undo the patches when the class is finished.
        stubs = {
            "trigger_orchestrated_job": fake_trigger,
            "call_model": fake_call_model,
            "sync_live_recorder_remote_config": lambda: {"ok": True},
        }
        for attribute, replacement in stubs.items():
            patcher = mock.patch.object(cls.core, attribute, replacement, create=True)
            patcher.start()
            cls.addClassCleanup(patcher.stop)

        cls.core.db.init_schema()
        cls.client = TestClient(cls.app_main.app)

    @classmethod
    def tearDownClass(cls) -> None:
        """Close the HTTP test client.

        The temporary directory, environment variables, and module patches are
        released afterwards by the class cleanups registered in ``setUpClass``.
        """
        cls.client.close()

    def setUp(self) -> None:
        self._clear_tables()

    def _clear_tables(self) -> None:
        """Delete every row from the tables the tests write to, in the listed order."""
        tables = (
            "job_events",
            "tenant_usage_ledger",
            "tenant_quota_profiles",
            "auth_tokens",
            "publish_reviews",
            "live_recorder_bindings",
            "live_recorder_sources",
            "jobs",
            "content_sources",
            "assistant_knowledge_bases",
            "assistants",
            "knowledge_documents",
            "knowledge_bases",
            "projects",
            "accounts",
            "model_profiles",
        )
        for table in tables:
            # Table names come from the fixed tuple above, never from user input.
            self.core.db.execute(f"DELETE FROM {table}")

    def _seed_context(self, tag: str, *, exhausted: bool = False) -> dict[str, Any]:
        """Insert an approved super-admin account with its project, model, KB, assistant, and token.

        All identifiers are derived from *tag*, so several contexts can coexist.
        When *exhausted* is true, a quota profile with per-feature quotas of 1 is
        added, plus one seeded usage row per category. This is intended to leave
        the project over quota.

        Returns a dict with the generated ids, the bearer token, and the login
        username/password.
        """
        now = self.db_module.utc_now()
        account_id = f"acct_{tag}"
        project_id = f"proj_{tag}"
        model_id = f"model_{tag}"
        kb_id = f"kb_{tag}"
        assistant_id = f"assistant_{tag}"
        token = f"token_{tag}"
        username = f"user_{tag}"
        login_password = f"pass_{tag}"
        password_hash, password_salt = self.core.create_password_hash(login_password)

        self.core.db.execute(
            """
            INSERT INTO accounts (
                id, username, password_hash, password_salt, display_name, role,
                approval_status, approved_by, approved_at, preferred_analysis_model_id,
                created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                account_id,
                username,
                password_hash,
                password_salt,
                f"User {tag}",
                "super_admin",
                "approved",
                account_id,
                now,
                model_id,
                now,
                now,
            ),
        )
        self.core.db.execute(
            """
            INSERT INTO projects (id, user_id, name, description, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (project_id, account_id, f"Project {tag}", "", now, now),
        )
        self.core.db.execute(
            """
            INSERT INTO model_profiles (
                id, owner_account_id, name, provider, base_url, api_key, model_name,
                is_system, is_default, created_at, updated_at
            ) VALUES (?, NULL, ?, ?, ?, ?, ?, 1, 1, ?, ?)
            """,
            (model_id, "Default Model", "openai_compat", "http://127.0.0.1:8317/v1", "", "GLM-5", now, now),
        )
        self.core.db.execute(
            """
            INSERT INTO knowledge_bases (id, user_id, project_id, name, description, sync_status, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, 'ready', ?, ?)
            """,
            (kb_id, account_id, project_id, f"KB {tag}", "", now, now),
        )
        self.core.db.execute(
            """
            INSERT INTO assistants (
                id, user_id, project_id, name, description, system_prompt, generation_goal,
                config_json, model_profile_id, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, '{}', ?, ?, ?)
            """,
            (
                assistant_id,
                account_id,
                project_id,
                f"Assistant {tag}",
                "",
                "你是文案助手。",
                "生成短视频文案。",
                model_id,
                now,
                now,
            ),
        )
        self.core.db.execute(
            "INSERT INTO assistant_knowledge_bases (assistant_id, knowledge_base_id) VALUES (?, ?)",
            (assistant_id, kb_id),
        )
        self.core.db.execute(
            "INSERT INTO auth_tokens (token, account_id, created_at) VALUES (?, ?, ?)",
            (token, account_id, now),
        )

        if exhausted:
            quota_id = f"quota_{tag}"
            self.core.db.execute(
                """
                INSERT INTO tenant_quota_profiles (
                    id, user_id, project_id, monthly_budget_cents, storage_limit_bytes,
                    analysis_quota, copy_quota, ai_video_quota, real_cut_quota, recorder_quota,
                    enabled, config_json, created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, '{}', ?, ?)
                """,
                (quota_id, account_id, project_id, 9999, 0, 1, 1, 1, 1, 1, now, now),
            )
            # One usage row (quantity 1) per category; values are cost_cents.
            seeded_costs = {
                "analysis": 6,
                "content_source_sync": 8,
                "review": 1,
                "copy": 3,
                "ai_video": 30,
                "real_cut": 20,
                "live_recorder": 2,
            }
            for category, cost_cents in seeded_costs.items():
                usage_id = f"usage_{tag}_{category}"
                self.core.db.execute(
                    """
                    INSERT INTO tenant_usage_ledger (
                        id, user_id, project_id, category, quantity, cost_cents,
                        reference_type, reference_id, details_json, created_at
                    ) VALUES (?, ?, ?, ?, 1, ?, 'seed', ?, '{}', ?)
                    """,
                    (usage_id, account_id, project_id, category, cost_cents, usage_id, now),
                )

        return {
            "account_id": account_id,
            "project_id": project_id,
            "model_id": model_id,
            "kb_id": kb_id,
            "assistant_id": assistant_id,
            "token": token,
            "username": username,
            "password": login_password,
        }

    def test_auto_session_issues_token_without_manual_credentials(self) -> None:
        ctx = self._seed_context("auto", exhausted=False)
        # Scope the auto-login settings to this request so later tests see the originals.
        with mock.patch.multiple(
            self.core,
            create=True,
            WEB_AUTOLOGIN_ENABLED="1",
            WEB_AUTOLOGIN_ACCOUNT_USERNAME=ctx["username"],
            WEB_AUTOLOGIN_USERNAME="",
            WEB_AUTOLOGIN_PASSWORD="",
        ):
            response = self.client.post("/v2/auth/auto-session")
        self.assertEqual(response.status_code, 200, response.text)
        payload = response.json()
        self.assertEqual(payload["account"]["username"], ctx["username"])
        self.assertEqual(payload["mode"], "auto")

    def test_repo_contains_fnos_lan_stack_deploy_entrypoint(self) -> None:
        script_path = ROOT / "scripts" / "deploy_fnos_storyforge_lan_stack.sh"
        self.assertTrue(script_path.exists(), str(script_path))
        content = script_path.read_text(encoding="utf-8")
        self.assertIn("deploy_fnos_cutvideo_tunnel.sh", content)
        self.assertIn("deploy_fnos_storyforge_collector.sh", content)
        self.assertIn("deploy_fnos_storyforge_web.sh", content)

    def test_repo_contains_fnos_lan_smoke_script(self) -> None:
        script_path = ROOT / "scripts" / "smoke_fnos_storyforge_lan.sh"
        self.assertTrue(script_path.exists(), str(script_path))
        content = script_path.read_text(encoding="utf-8")
        for expected in [
            "/healthz",
            "/v2/auth/auto-session",
            "/v2/integrations/health",
            "/transcribe",
            "/api/bootstrap",
            "19181",
            "deployment_label",
            "not_configured",
        ]:
            self.assertIn(expected, content)

    def test_repo_contains_public_smoke_script(self) -> None:
        script_path = ROOT / "scripts" / "smoke_public_storyforge.sh"
        self.assertTrue(script_path.exists(), str(script_path))
        content = script_path.read_text(encoding="utf-8")
        for expected in [
            "/healthz",
            "/v2/auth/auto-session",
            "/v2/integrations/health",
            "deployment_label",
            "not_configured",
            "active_device",
        ]:
            self.assertIn(expected, content)

    def test_web_deploy_script_defaults_to_lan_collector(self) -> None:
        script_path = ROOT / "scripts" / "deploy_fnos_storyforge_web.sh"
        content = script_path.read_text(encoding="utf-8")
        self.assertIn('COLLECTOR_PORT="${STORYFORGE_COLLECTOR_DEV_PORT:-19193}"', content)
        self.assertIn(
            'BACKEND_URL="${STORYFORGE_FNOS_BACKEND_URL:-${STORYFORGE_FNOS_COLLECTOR_URL:-http://$FNOS_HOST:$COLLECTOR_PORT}}"',
            content,
        )
        self.assertNotIn('https://storyforge.hyzq.net', content)

    def test_baseline_script_covers_homepage_dashboard_node_test(self) -> None:
        script = (ROOT / "scripts" / "check_repo_baseline.sh").read_text(encoding="utf-8")
        self.assertIn("dashboard-home.test.mjs", script)

    def test_healthz_exposes_lan_routing_summary(self) -> None:
        response = self.client.get("/healthz")
        self.assertEqual(response.status_code, 200, response.text)
        payload = response.json()
        self.assertIn("lanRouting", payload)
        self.assertIn("cutvideoRouteMode", payload["lanRouting"])
        self.assertIn("cutvideoBaseUrl", payload["lanRouting"])

    def test_integrations_health_exposes_asr_runtime_summary(self) -> None:
        ctx = self._seed_context("asr_runtime", exhausted=False)
        headers = {"Authorization": f"Bearer {ctx['token']}"}

        def fake_probe_http_json(url: str, path: str = "", timeout: float = 3.0) -> dict[str, Any]:
            # Only the configured ASR health endpoint answers; everything else is "not configured".
            if url == "http://asr.example:8088" and path == "/health":
                return {
                    "configured": True,
                    "reachable": True,
                    "status_code": 200,
                    "error": "",
                    "url": "http://asr.example:8088/health",
                    "json": {
                        "service": "storyforge-windows-asr",
                        "model_name": "base",
                        "language": "auto",
                        "device": "auto",
                        "compute_type": "auto",
                        "active_device": "cuda",
                        "active_compute_type": "int8_float16",
                    },
                }
            return {
                "configured": False,
                "reachable": False,
                "status_code": 0,
                "error": "not_configured",
                "url": "",
                "json": {},
            }

        # ``create=True`` lets the probe be installed even if the module lacks it,
        # and removes it again afterwards.
        with mock.patch.object(self.core, "ASR_HTTP_BASE_URL", "http://asr.example:8088"):
            with mock.patch.object(self.core, "probe_http_json", fake_probe_http_json, create=True):
                response = self.client.get("/v2/integrations/health", headers=headers)

        self.assertEqual(response.status_code, 200, response.text)
        payload = response.json()
        self.assertIn("asr", payload)
        self.assertEqual(payload["asr"]["active_device"], "cuda")
        self.assertEqual(payload["asr"]["active_compute_type"], "int8_float16")
        self.assertEqual(payload["asr"]["runtime_device_mode"], "auto")
        self.assertEqual(payload["asr"]["runtime_compute_type_mode"], "auto")
        self.assertEqual(payload["asr"]["language_mode"], "auto")
        self.assertEqual(payload["asr"]["model_name"], "base")

    def test_integrations_health_exposes_deployment_labels(self) -> None:
        ctx = self._seed_context("deployment_labels", exhausted=False)
        headers = {"Authorization": f"Bearer {ctx['token']}"}

        def fake_probe_http(url: str, path: str = "", timeout: float = 3.0) -> dict[str, Any]:
            # Every probed service reports as reachable.
            return {
                "configured": True,
                "reachable": True,
                "status_code": 200,
                "error": "",
                "url": f"{url.rstrip('/')}/{path.lstrip('/')}" if path else url,
            }

        def fake_probe_http_json(url: str, path: str = "", timeout: float = 3.0) -> dict[str, Any]:
            detail = fake_probe_http(url, path=path, timeout=timeout)
            detail["json"] = {
                "service": "storyforge-windows-asr",
                "model_name": "base",
                "language": "auto",
                "device": "auto",
                "compute_type": "auto",
                "active_device": "cuda",
                "active_compute_type": "int8_float16",
            }
            return detail

        def fake_probe_huobao_video_config(url: str, timeout: float = 5.0) -> dict[str, Any]:
            return {
                "video_config_route": "/settings/ai-config -> 视频 -> 火山引擎",
                "video_config_count": 1,
                "video_config_ready": True,
            }

        # The base URLs and probe_http must already exist on the module; the two
        # optional probes are created for the duration of the request if missing.
        with mock.patch.multiple(
            self.core,
            N8N_BASE_URL="http://127.0.0.1:25670",
            HUOBAO_BASE_URL="http://127.0.0.1:25678",
            ASR_HTTP_BASE_URL="http://192.168.31.18:8088",
            LIVE_RECORDER_BASE_URL="http://192.168.31.188:19106",
            CUTVIDEO_BASE_URL="http://192.168.31.188:19186",
            probe_http=fake_probe_http,
        ):
            with mock.patch.object(self.core, "probe_http_json", fake_probe_http_json, create=True):
                with mock.patch.object(
                    self.core, "probe_huobao_video_config", fake_probe_huobao_video_config, create=True
                ):
                    response = self.client.get("/v2/integrations/health", headers=headers)

        self.assertEqual(response.status_code, 200, response.text)
        payload = response.json()
        self.assertEqual(payload["n8n"]["deployment_label"], "服务器")
        self.assertEqual(payload["huobao"]["deployment_label"], "服务器")
        self.assertEqual(payload["huobao"]["video_config_route"], "/settings/ai-config -> 视频 -> 火山引擎")
        self.assertTrue(payload["huobao"]["video_config_ready"])
        self.assertEqual(payload["asr"]["deployment_label"], "Windows")
        self.assertEqual(payload["live_recorder"]["deployment_label"], "NAS")
        self.assertEqual(payload["cutvideo"]["deployment_label"], "NAS 隧道")

    def test_collector_deploy_script_exposes_health_retry_controls(self) -> None:
        script_path = ROOT / "scripts" / "deploy_fnos_storyforge_collector.sh"
        content = script_path.read_text(encoding="utf-8")
        self.assertIn("COLLECTOR_HEALTH_RETRY_ATTEMPTS", content)
        self.assertIn("COLLECTOR_HEALTH_RETRY_SLEEP_SEC", content)
        self.assertNotIn("$(_attempt)", content)

    def test_database_uses_wal_and_busy_timeout(self) -> None:
        conn = self.core.db.connect()
        try:
            journal_mode_row = conn.execute("PRAGMA journal_mode").fetchone()
            busy_timeout_row = conn.execute("PRAGMA busy_timeout").fetchone()
            # Rows may be dicts or index-able rows, presumably depending on the db
            # module's row factory; accept both shapes.
            if isinstance(journal_mode_row, dict):
                journal_mode = journal_mode_row["journal_mode"]
            else:
                journal_mode = journal_mode_row[0]
            if isinstance(busy_timeout_row, dict):
                busy_timeout = int(
                    busy_timeout_row.get("timeout")
                    or busy_timeout_row.get("busy_timeout")
                    or next(iter(busy_timeout_row.values()))
                )
            else:
                busy_timeout = int(busy_timeout_row[0])
        finally:
            conn.close()
        self.assertEqual(str(journal_mode).lower(), "wal")
        self.assertGreaterEqual(busy_timeout, 1000)

    def test_quota_blocks_production_endpoints(self) -> None:
        ctx = self._seed_context("quota", exhausted=True)
        headers = {"Authorization": f"Bearer {ctx['token']}"}
        # (method, path, json body, files) for every endpoint that must be refused.
        blocked_requests = [
            (
                "POST",
                "/v2/explore/text",
                {
                    "title": "T",
                    "content": "C",
                    "project_id": ctx["project_id"],
                    "knowledge_base_id": ctx["kb_id"],
                    "assistant_id": ctx["assistant_id"],
                    "analysis_model_profile_id": ctx["model_id"],
                },
                None,
            ),
            (
                "POST",
                "/v2/explore/video-link",
                {
                    "video_url": "https://example.com/video",
                    "title": "V",
                    "project_id": ctx["project_id"],
                    "knowledge_base_id": ctx["kb_id"],
                    "assistant_id": ctx["assistant_id"],
                    "analysis_model_profile_id": ctx["model_id"],
                },
                None,
            ),
            ("POST", "/v2/pipelines/content-source-sync", {"project_id": ctx["project_id"]}, None),
            (
                "POST",
                "/v2/reviews",
                {"project_id": ctx["project_id"], "assistant_id": ctx["assistant_id"], "title": "Review"},
                None,
            ),
            ("POST", "/v2/pipelines/real-cut", {"project_id": ctx["project_id"], "title": "Cut"}, None),
            (
                "POST",
                "/v2/pipelines/ai-video",
                {"project_id": ctx["project_id"], "title": "Video", "brief": "Brief"},
                None,
            ),
            (
                "POST",
                f"/v2/assistants/{ctx['assistant_id']}/generate",
                {"brief": "Copy", "project_id": ctx["project_id"], "knowledge_base_ids": [ctx["kb_id"]]},
                None,
            ),
            (
                "POST",
                "/v2/live-recorder/sources",
                {"project_id": ctx["project_id"], "source_url": "https://example.com/live", "title": "Live"},
                None,
            ),
        ]
        for method, path, json_body, files in blocked_requests:
            with self.subTest(path=path):
                response = self.client.request(method, path, headers=headers, json=json_body, files=files)
                self.assertEqual(response.status_code, 403, response.text)

        # The upload endpoint takes multipart form data, so it is exercised separately.
        upload_response = self.client.post(
            "/v2/explore/upload-video",
            headers=headers,
            data={
                "title": "Upload",
                "project_id": ctx["project_id"],
                "knowledge_base_id": ctx["kb_id"],
                "assistant_id": ctx["assistant_id"],
                "analysis_model_profile_id": ctx["model_id"],
            },
            files={"file": ("clip.mp4", b"clip-bytes", "video/mp4")},
        )
        self.assertEqual(upload_response.status_code, 403, upload_response.text)

    def test_successful_analysis_records_usage_and_retry_endpoints_work(self) -> None:
        ctx = self._seed_context("happy", exhausted=False)
        headers = {"Authorization": f"Bearer {ctx['token']}"}

        # 1) A text analysis succeeds and leaves an "analysis" usage ledger row.
        text_response = self.client.post(
            "/v2/explore/text",
            headers=headers,
            json={
                "title": "Hello",
                "content": "Hello StoryForge",
                "project_id": ctx["project_id"],
                "knowledge_base_id": ctx["kb_id"],
                "assistant_id": ctx["assistant_id"],
                "analysis_model_profile_id": ctx["model_id"],
            },
        )
        self.assertEqual(text_response.status_code, 200, text_response.text)
        text_job = text_response.json()
        usage_row = self.core.db.fetch_one(
            "SELECT * FROM tenant_usage_ledger WHERE user_id = ? AND project_id = ? AND category = ? ORDER BY created_at DESC LIMIT 1",
            (ctx["account_id"], ctx["project_id"], "analysis"),
        )
        self.assertIsNotNone(usage_row)
        self.assertEqual(text_job["status"], "queued")

        # 2) An AI-video job built on a completed source job resolves the Seedance provider.
        now = self.db_module.utc_now()
        source_job_id = f"job_seedance_source_{ctx['project_id']}"
        self.core.db.execute(
            """
            INSERT INTO jobs (
                id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id,
                source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id,
                source_url, title, language, status, transcript_text, style_summary, upload_status,
                error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at
            ) VALUES (
                ?, ?, ?, '', ?, ?, NULL, 'text', 'analysis', 'analysis_pipeline', 'n8n', 'collector', '', '',
                ?, 'auto', 'completed', '', '', 'completed', '', '{}', '{"summary":"done"}', ?, ?, ?
            )
            """,
            (
                source_job_id,
                ctx["account_id"],
                ctx["project_id"],
                ctx["assistant_id"],
                ctx["kb_id"],
                "Seedance Source",
                ctx["model_id"],
                now,
                now,
            ),
        )
        ai_video_response = self.client.post(
            "/v2/pipelines/ai-video",
            headers=headers,
            json={
                "project_id": ctx["project_id"],
                "assistant_id": ctx["assistant_id"],
                "knowledge_base_id": ctx["kb_id"],
                "source_job_id": source_job_id,
                "title": "Seedance 2.0 视频",
                "brief": "做一条镜头推进感更强的 AI 视频。",
                "video_provider": "seedance2",
                "video_model": "",
            },
        )
        self.assertEqual(ai_video_response.status_code, 200, ai_video_response.text)
        ai_video_payload = ai_video_response.json()
        self.assertEqual(ai_video_payload["artifacts"]["video_provider"], "seedance2")
        self.assertEqual(ai_video_payload["artifacts"]["video_model"], "seedance-2.0-pro")
        self.assertEqual(ai_video_payload["artifacts"]["video_dispatch_provider"], "doubao")
        self.assertEqual(ai_video_payload["artifacts"]["video_dispatch_model"], "seedance-2.0-pro")

        # 3) Two failed jobs: the first is retried individually, the second by the bulk endpoint.
        now = self.db_module.utc_now()
        failed_jobs = []
        for index in range(2):
            job_id = f"job_{index}_{ctx['project_id']}"
            self.core.db.execute(
                """
                INSERT INTO jobs (
                    id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id,
                    source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id,
                    source_url, title, language, status, transcript_text, style_summary, upload_status,
                    error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at
                ) VALUES (
                    ?, ?, ?, '', ?, ?, '', ?, ?, ?, 'n8n', 'collector', '', '',
                    ?, 'auto', 'failed', '', '', 'pending', ?, '{}', '{}', ?, ?, ?
                )
                """,
                (
                    job_id,
                    ctx["account_id"],
                    ctx["project_id"],
                    ctx["assistant_id"],
                    ctx["kb_id"],
                    "text",
                    "analysis",
                    "analysis_pipeline",
                    f"Failed {index}",
                    "boom",
                    ctx["model_id"],
                    now,
                    now,
                ),
            )
            failed_jobs.append(job_id)

        retry_response = self.client.post(f"/v2/explore/jobs/{failed_jobs[0]}/retry", headers=headers)
        self.assertEqual(retry_response.status_code, 200, retry_response.text)
        retry_payload = retry_response.json()
        self.assertEqual(retry_payload["status"], "queued")

        bulk_response = self.client.post(
            "/v2/admin/jobs/retry-failed",
            headers=headers,
            json={"project_id": ctx["project_id"], "limit": 10},
        )
        self.assertEqual(bulk_response.status_code, 200, bulk_response.text)
        bulk_payload = bulk_response.json()
        # Only the second job is still failed at this point.
        self.assertEqual(bulk_payload["count"], 1)
        self.assertEqual(len(bulk_payload["retried"]), 1)

        event_rows = self.core.db.fetch_all(
            "SELECT event_type FROM job_events WHERE job_id = ? ORDER BY created_at ASC",
            (failed_jobs[0],),
        )
        event_types = [row["event_type"] for row in event_rows]
        self.assertIn("job.retry.requested", event_types)
        self.assertIn("job.retry.queued", event_types)
        bulk_job = self.core.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (failed_jobs[1],))
        self.assertEqual(bulk_job["status"], "queued")

    def test_backup_script_creates_consistent_snapshot(self) -> None:
        # Seed at least one account so the snapshot has a row to count.
        self._seed_context("backup", exhausted=False)
        backup_dir = Path(self.tempdir.name) / "backups"
        script = ROOT / "scripts" / "backup_storyforge_sqlite.sh"
        result = subprocess.run(
            ["bash", str(script)],
            check=True,
            text=True,
            capture_output=True,
            env={
                **os.environ,
                "DATABASE_PATH": str(self.core.db.path),
                "BACKUP_DIR": str(backup_dir),
            },
        )
        # The script is expected to print the snapshot path as its last stdout line.
        output_lines = result.stdout.strip().splitlines()
        self.assertTrue(output_lines, f"backup script printed nothing; stderr: {result.stderr}")
        backup_path = Path(output_lines[-1])
        self.assertTrue(backup_path.exists(), result.stdout)
        # sqlite3's own context manager only ends the transaction; ``closing``
        # actually releases the connection and its file handle.
        with closing(sqlite3.connect(backup_path)) as conn:
            account_count = conn.execute("SELECT COUNT(*) FROM accounts").fetchone()[0]
        self.assertGreaterEqual(int(account_count), 1)