from __future__ import annotations

import os
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator

# Both timeouts can be overridden through the environment.
SQLITE_BUSY_TIMEOUT_MS = int(os.getenv("SQLITE_BUSY_TIMEOUT_MS", "5000"))
SQLITE_CONNECT_TIMEOUT_SEC = float(os.getenv("SQLITE_CONNECT_TIMEOUT_SEC", "30"))


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def dict_factory(cursor: sqlite3.Cursor, row: tuple[Any, ...]) -> dict[str, Any]:
    """Row factory that maps each result row to a ``{column_name: value}`` dict."""
    return {col[0]: value for col, value in zip(cursor.description, row)}


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier (embedded quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


class Database:
    """Thin helper around a SQLite file: connections, queries and schema setup."""

    def __init__(self, path: str) -> None:
        """Remember the database path and create its parent directory if needed."""
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def connect(self) -> sqlite3.Connection:
        """Open a new connection with the project's PRAGMA settings applied.

        Rows are returned as dicts (see ``dict_factory``). The caller owns the
        returned connection and must close it.

        Raises:
            sqlite3.Error: if opening or configuring the connection fails; the
                partially configured connection is closed before re-raising.
        """
        conn = sqlite3.connect(self.path, timeout=SQLITE_CONNECT_TIMEOUT_SEC)
        try:
            conn.row_factory = dict_factory
            try:
                conn.execute("PRAGMA journal_mode = WAL")
            except sqlite3.OperationalError:
                # Some temporary or restricted filesystems used by tests cannot
                # enable WAL mode reliably. Fall back to the default journal mode
                # so the database remains usable instead of failing to open.
                conn.execute("PRAGMA journal_mode = DELETE")
            conn.execute("PRAGMA synchronous = NORMAL")
            conn.execute(f"PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}")
            conn.execute("PRAGMA foreign_keys = ON")
            conn.execute("PRAGMA temp_store = MEMORY")
            conn.execute("PRAGMA wal_autocheckpoint = 1000")
        except BaseException:
            # Do not leak the handle when configuration fails part-way.
            conn.close()
            raise
        return conn

    @contextmanager
    def session(self) -> Iterator[sqlite3.Connection]:
        """Yield a fresh connection, committing on success and always closing it.

        If the body raises, ``commit()`` is skipped and the connection is
        closed without committing.
        """
        conn = self.connect()
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    def fetch_one(self, sql: str, params: tuple[Any, ...] = ()) -> dict[str, Any] | None:
        """Run *sql* and return the first row as a dict, or ``None`` if no row."""
        with self.session() as conn:
            return conn.execute(sql, params).fetchone()

    def fetch_all(self, sql: str, params: tuple[Any, ...] = ()) -> list[dict[str, Any]]:
        """Run *sql* and return every row as a list of dicts."""
        with self.session() as conn:
            return conn.execute(sql, params).fetchall()

    def execute(self, sql: str, params: tuple[Any, ...] = ()) -> None:
        """Run a single statement in its own session and commit it."""
        with self.session() as conn:
            conn.execute(sql, params)

    def table_exists(self, name: str) -> bool:
        """Return whether a table called *name* is listed in ``sqlite_master``."""
        row = self.fetch_one(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
            (name,),
        )
        return bool(row)

    @staticmethod
    def _column_names(conn: sqlite3.Connection, table: str) -> set[str]:
        """Return the column names ``PRAGMA table_info`` reports for *table*."""
        # PRAGMA arguments cannot be bound as parameters, so the identifier is
        # quoted instead of being interpolated raw.
        rows = conn.execute(f"PRAGMA table_info({_quote_identifier(table)})").fetchall()
        return {row["name"] for row in rows}

    def column_exists(self, table: str, column: str) -> bool:
        """Return whether *table* has a column named *column*.

        Returns ``False`` when the table itself does not exist.
        """
        with self.session() as conn:
            return column in self._column_names(conn, table)

    def init_schema(self) -> None:
        """Create every table that does not exist yet, then run migrations."""
        # NOTE(review): tenant_quota_profiles.project_id and
        # tenant_usage_ledger.project_id are declared NOT NULL DEFAULT '' but
        # also carry FOREIGN KEY ... ON DELETE SET NULL. With foreign_keys = ON
        # (set in connect) the '' default looks subject to the FK check, and
        # SET NULL conflicts with NOT NULL. Left unchanged because existing
        # databases would need a table rebuild - confirm the intended behaviour.
        schema = """
        CREATE TABLE IF NOT EXISTS accounts (
            id TEXT PRIMARY KEY,
            username TEXT NOT NULL UNIQUE,
            password_hash TEXT NOT NULL,
            password_salt TEXT NOT NULL,
            display_name TEXT NOT NULL,
            role TEXT NOT NULL,
            approval_status TEXT NOT NULL,
            approved_by TEXT,
            approved_at TEXT,
            preferred_analysis_model_id TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS auth_tokens (
            token TEXT PRIMARY KEY,
            account_id TEXT NOT NULL,
            created_at TEXT NOT NULL,
            FOREIGN KEY(account_id) REFERENCES accounts(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS model_profiles (
            id TEXT PRIMARY KEY,
            owner_account_id TEXT,
            name TEXT NOT NULL,
            provider TEXT NOT NULL,
            base_url TEXT NOT NULL,
            api_key TEXT NOT NULL DEFAULT '',
            model_name TEXT NOT NULL,
            is_system INTEGER NOT NULL DEFAULT 0,
            is_default INTEGER NOT NULL DEFAULT 0,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(owner_account_id) REFERENCES accounts(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS knowledge_bases (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            project_id TEXT,
            name TEXT NOT NULL,
            description TEXT NOT NULL DEFAULT '',
            sync_status TEXT NOT NULL DEFAULT 'ready',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS knowledge_documents (
            id TEXT PRIMARY KEY,
            knowledge_base_id TEXT NOT NULL,
            title TEXT NOT NULL,
            source_type TEXT NOT NULL,
            source_url TEXT NOT NULL DEFAULT '',
            transcript_text TEXT NOT NULL DEFAULT '',
            style_summary TEXT NOT NULL DEFAULT '',
            combined_text TEXT NOT NULL DEFAULT '',
            analysis_json TEXT NOT NULL DEFAULT '{}',
            storyboard_json TEXT NOT NULL DEFAULT '[]',
            source_artifact_json TEXT NOT NULL DEFAULT '{}',
            analysis_model_profile_id TEXT NOT NULL DEFAULT '',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS assistants (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            project_id TEXT,
            name TEXT NOT NULL,
            description TEXT NOT NULL DEFAULT '',
            system_prompt TEXT NOT NULL DEFAULT '',
            generation_goal TEXT NOT NULL DEFAULT '',
            config_json TEXT NOT NULL DEFAULT '{}',
            model_profile_id TEXT NOT NULL DEFAULT '',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS assistant_knowledge_bases (
            assistant_id TEXT NOT NULL,
            knowledge_base_id TEXT NOT NULL,
            PRIMARY KEY (assistant_id, knowledge_base_id),
            FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE CASCADE,
            FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS jobs (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            project_id TEXT,
            parent_job_id TEXT,
            assistant_id TEXT,
            knowledge_base_id TEXT NOT NULL,
            content_source_id TEXT,
            source_type TEXT NOT NULL,
            line_type TEXT NOT NULL DEFAULT 'analysis',
            workflow_key TEXT NOT NULL DEFAULT '',
            orchestrator TEXT NOT NULL DEFAULT 'n8n',
            provider_name TEXT NOT NULL DEFAULT '',
            provider_task_id TEXT NOT NULL DEFAULT '',
            source_url TEXT,
            title TEXT NOT NULL,
            language TEXT NOT NULL DEFAULT 'auto',
            status TEXT NOT NULL,
            transcript_text TEXT NOT NULL DEFAULT '',
            style_summary TEXT NOT NULL DEFAULT '',
            upload_status TEXT NOT NULL DEFAULT 'pending',
            error TEXT NOT NULL DEFAULT '',
            artifacts_json TEXT NOT NULL DEFAULT '{}',
            result_json TEXT NOT NULL DEFAULT '{}',
            analysis_model_profile_id TEXT NOT NULL DEFAULT '',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL,
            FOREIGN KEY(knowledge_base_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS projects (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            name TEXT NOT NULL,
            description TEXT NOT NULL DEFAULT '',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS content_sources (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            project_id TEXT,
            source_kind TEXT NOT NULL,
            platform TEXT NOT NULL DEFAULT '',
            handle TEXT NOT NULL DEFAULT '',
            source_url TEXT NOT NULL DEFAULT '',
            title TEXT NOT NULL DEFAULT '',
            local_path TEXT NOT NULL DEFAULT '',
            metadata_json TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL
        );

        CREATE TABLE IF NOT EXISTS publish_reviews (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            project_id TEXT,
            source_job_id TEXT,
            assistant_id TEXT,
            title TEXT NOT NULL,
            platform TEXT NOT NULL DEFAULT 'douyin',
            content_type TEXT NOT NULL DEFAULT 'video',
            publish_url TEXT NOT NULL DEFAULT '',
            published_at TEXT NOT NULL DEFAULT '',
            metrics_json TEXT NOT NULL DEFAULT '{}',
            verdict TEXT NOT NULL DEFAULT '',
            highlights TEXT NOT NULL DEFAULT '',
            next_actions TEXT NOT NULL DEFAULT '',
            notes TEXT NOT NULL DEFAULT '',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
            FOREIGN KEY(source_job_id) REFERENCES jobs(id) ON DELETE SET NULL,
            FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
        );

        CREATE TABLE IF NOT EXISTS live_recorder_sources (
            id TEXT PRIMARY KEY,
            platform TEXT NOT NULL DEFAULT '',
            source_url TEXT NOT NULL,
            remote_name TEXT NOT NULL UNIQUE,
            title TEXT NOT NULL DEFAULT '',
            metadata_json TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            UNIQUE(platform, source_url)
        );

        CREATE TABLE IF NOT EXISTS live_recorder_bindings (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            project_id TEXT,
            assistant_id TEXT,
            source_id TEXT NOT NULL,
            title TEXT NOT NULL DEFAULT '',
            quality TEXT NOT NULL DEFAULT '原画',
            enabled INTEGER NOT NULL DEFAULT 1,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            UNIQUE(user_id, source_id),
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
            FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL,
            FOREIGN KEY(source_id) REFERENCES live_recorder_sources(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS tenant_quota_profiles (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            project_id TEXT NOT NULL DEFAULT '',
            monthly_budget_cents INTEGER NOT NULL DEFAULT 0,
            storage_limit_bytes INTEGER NOT NULL DEFAULT 0,
            analysis_quota INTEGER NOT NULL DEFAULT 0,
            copy_quota INTEGER NOT NULL DEFAULT 0,
            ai_video_quota INTEGER NOT NULL DEFAULT 0,
            real_cut_quota INTEGER NOT NULL DEFAULT 0,
            recorder_quota INTEGER NOT NULL DEFAULT 0,
            enabled INTEGER NOT NULL DEFAULT 1,
            config_json TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            UNIQUE(user_id, project_id),
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL
        );

        CREATE TABLE IF NOT EXISTS tenant_usage_ledger (
            id TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            project_id TEXT NOT NULL DEFAULT '',
            category TEXT NOT NULL,
            quantity INTEGER NOT NULL DEFAULT 1,
            cost_cents INTEGER NOT NULL DEFAULT 0,
            reference_type TEXT NOT NULL DEFAULT '',
            reference_id TEXT NOT NULL DEFAULT '',
            details_json TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
            FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL
        );

        CREATE TABLE IF NOT EXISTS job_events (
            id TEXT PRIMARY KEY,
            job_id TEXT NOT NULL,
            event_type TEXT NOT NULL,
            payload_json TEXT NOT NULL DEFAULT '{}',
            created_at TEXT NOT NULL,
            FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS app_updates (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            platform TEXT NOT NULL,
            channel TEXT NOT NULL,
            version_code INTEGER NOT NULL,
            version_name TEXT NOT NULL,
            min_supported_code INTEGER NOT NULL,
            apk_url TEXT NOT NULL,
            apk_sha256 TEXT NOT NULL DEFAULT '',
            notes TEXT NOT NULL DEFAULT '',
            force_update INTEGER NOT NULL DEFAULT 0,
            is_active INTEGER NOT NULL DEFAULT 1,
            published_at INTEGER NOT NULL,
            created_by TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS system_runtime_settings (
            key TEXT PRIMARY KEY,
            value_json TEXT NOT NULL DEFAULT '{}',
            updated_by TEXT NOT NULL DEFAULT '',
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        """
        with self.session() as conn:
            conn.executescript(schema)
        self.migrate_schema()

    def migrate_schema(self) -> None:
        """Add columns missing from already-existing tables, then backfill projects.

        Tables listed here that do not exist in the database are skipped.
        """
        table_columns: dict[str, dict[str, str]] = {
            "knowledge_bases": {
                "project_id": "TEXT",
            },
            "knowledge_documents": {
                "analysis_json": "TEXT NOT NULL DEFAULT '{}'",
                "storyboard_json": "TEXT NOT NULL DEFAULT '[]'",
                "source_artifact_json": "TEXT NOT NULL DEFAULT '{}'",
            },
            "assistants": {
                "project_id": "TEXT",
                "config_json": "TEXT NOT NULL DEFAULT '{}'",
            },
            "jobs": {
                "project_id": "TEXT",
                "parent_job_id": "TEXT",
                "content_source_id": "TEXT",
                "line_type": "TEXT NOT NULL DEFAULT 'analysis'",
                "workflow_key": "TEXT NOT NULL DEFAULT ''",
                "orchestrator": "TEXT NOT NULL DEFAULT 'n8n'",
                "provider_name": "TEXT NOT NULL DEFAULT ''",
                "provider_task_id": "TEXT NOT NULL DEFAULT ''",
                "result_json": "TEXT NOT NULL DEFAULT '{}'",
            },
            "platform_agent_profiles": {
                "last_run_id": "TEXT NOT NULL DEFAULT ''",
                "last_run_status": "TEXT NOT NULL DEFAULT ''",
                "last_used_at": "TEXT NOT NULL DEFAULT ''",
                "last_intent_key": "TEXT NOT NULL DEFAULT ''",
                "last_oneliner_profile_version_no": "INTEGER NOT NULL DEFAULT 0",
                "last_platform_profile_version_no": "INTEGER NOT NULL DEFAULT 0",
                "last_execution_summary": "TEXT NOT NULL DEFAULT ''",
                "last_source_screen": "TEXT NOT NULL DEFAULT ''",
            },
        }
        # One connection for the whole pass instead of one per table/column check.
        with self.session() as conn:
            for table, columns in table_columns.items():
                found = conn.execute(
                    "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
                    (table,),
                ).fetchone()
                if not found:
                    continue
                existing = self._column_names(conn, table)
                for column, definition in columns.items():
                    if column in existing:
                        continue
                    conn.execute(
                        f"ALTER TABLE {_quote_identifier(table)} "
                        f"ADD COLUMN {_quote_identifier(column)} {definition}"
                    )
        self.ensure_default_projects()

    def ensure_default_projects(self) -> None:
        """Give every account a project and attach its unassigned rows to it.

        For each account without a project, a default one with id
        ``proj_<account id>`` is inserted. Rows in ``knowledge_bases``,
        ``assistants`` and ``jobs`` owned by the account whose ``project_id``
        is NULL or empty are then pointed at the account's oldest project.
        Does nothing when the ``projects`` table is absent.
        """
        if not self.table_exists("projects"):
            return
        accounts = self.fetch_all("SELECT id, username FROM accounts ORDER BY created_at ASC")
        # Whether a table has a project_id column does not depend on the
        # account, so check once rather than on every iteration.
        backfill_tables = [
            table
            for table in ("knowledge_bases", "assistants", "jobs")
            if self.column_exists(table, "project_id")
        ]
        for account in accounts:
            account_id = account["id"]
            # One session per account: the insert and its backfill updates are
            # committed together.
            with self.session() as conn:
                project = conn.execute(
                    "SELECT id FROM projects WHERE user_id = ? ORDER BY created_at ASC LIMIT 1",
                    (account_id,),
                ).fetchone()
                if project:
                    project_id = project["id"]
                else:
                    project_id = f"proj_{account_id}"
                    now = utc_now()
                    conn.execute(
                        """
                        INSERT INTO projects (id, user_id, name, description, created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?)
                        """,
                        (
                            project_id,
                            account_id,
                            f"{account['username']} 默认项目",
                            "系统自动创建的默认项目",
                            now,
                            now,
                        ),
                    )
                for table in backfill_tables:
                    conn.execute(
                        f"""
                        UPDATE {table}
                        SET project_id = ?
                        WHERE user_id = ? AND (project_id IS NULL OR project_id = '')
                        """,
                        (project_id, account_id),
                    )