diff --git a/collector-service/app/core_main.py b/collector-service/app/core_main.py index f41cd45..db01de9 100644 --- a/collector-service/app/core_main.py +++ b/collector-service/app/core_main.py @@ -199,6 +199,15 @@ class PublishAppUpdateRequest(BaseModel): isActive: bool = True +class RetryFailedJobsRequest(BaseModel): + user_id: str = "" + project_id: str = "" + workflow_key: str = "" + line_type: str = "" + source_type: str = "" + limit: int = Field(default=20, ge=1, le=100) + + class ProjectCreateRequest(BaseModel): name: str description: str = "" @@ -1772,6 +1781,182 @@ def job_payload(row: dict[str, Any]) -> dict[str, Any]: } +USAGE_COST_DEFAULTS: dict[str, dict[str, Any]] = { + "analysis": {"cost_cents": 6, "quota_field": "analysis_quota"}, + "content_source_sync": {"cost_cents": 8, "quota_field": "analysis_quota"}, + "copy": {"cost_cents": 3, "quota_field": "copy_quota"}, + "review": {"cost_cents": 1, "quota_field": "analysis_quota"}, + "ai_video": {"cost_cents": 30, "quota_field": "ai_video_quota"}, + "real_cut": {"cost_cents": 20, "quota_field": "real_cut_quota"}, + "live_recorder": {"cost_cents": 2, "quota_field": "recorder_quota"}, +} + + +def _current_cycle_start() -> str: + current = datetime.now(timezone.utc) + return current.replace(day=1, hour=0, minute=0, second=0, microsecond=0).isoformat().replace("+00:00", "Z") + + +def _tenant_usage_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "user_id": row.get("user_id", ""), + "project_id": row.get("project_id", ""), + "category": row.get("category", ""), + "quantity": int(row.get("quantity") or 0), + "cost_cents": int(row.get("cost_cents") or 0), + "reference_type": row.get("reference_type", ""), + "reference_id": row.get("reference_id", ""), + "details": parse_json_object(row.get("details_json") or "{}"), + "created_at": row.get("created_at", ""), + } + + +def _tenant_quota_payload(row: dict[str, Any] | None, *, usage: dict[str, Any] | None = None) -> dict[str, Any]: + data = row or {} + return { + "id": data.get("id", ""), + "user_id": data.get("user_id", ""), + "project_id": data.get("project_id", ""), + "monthly_budget_cents": int(data.get("monthly_budget_cents") or 0), + "storage_limit_bytes": int(data.get("storage_limit_bytes") or 0), + "analysis_quota": int(data.get("analysis_quota") or 0), + "copy_quota": int(data.get("copy_quota") or 0), + "ai_video_quota": int(data.get("ai_video_quota") or 0), + "real_cut_quota": int(data.get("real_cut_quota") or 0), + "recorder_quota": int(data.get("recorder_quota") or 0), + "enabled": True if row is None else bool(int(data.get("enabled", 1) or 0)), + "config": parse_json_object(data.get("config_json") or "{}"), + "usage": usage or {}, + "created_at": data.get("created_at", ""), + "updated_at": data.get("updated_at", ""), + } + + +def _project_storage_bytes(account: dict[str, Any], *, project_id: str) -> int: + normalized_project_id = (project_id or "").strip() or None + jobs_root = job_project_root(account["id"], normalized_project_id) if normalized_project_id else job_account_root(account["id"]) + downloads_root = tenant_download_root(account["id"], normalized_project_id) if normalized_project_id else download_account_root(account["id"]) + jobs_usage = directory_usage_payload(jobs_root) + downloads_usage = directory_usage_payload(downloads_root) + return int(jobs_usage.get("bytes") or 0) + int(downloads_usage.get("bytes") or 0) + + +def _tenant_usage_summary(account: dict[str, Any], *, project_id: str) -> dict[str, Any]: + cycle_start = _current_cycle_start() + rows = db.fetch_all( + """ + SELECT category, SUM(quantity) AS quantity, SUM(cost_cents) AS cost_cents + FROM tenant_usage_ledger + WHERE user_id = ? AND project_id = ? AND created_at >= ? + GROUP BY category + ORDER BY category ASC + """, + (account["id"], project_id, cycle_start), + ) + by_category: dict[str, dict[str, Any]] = {} + for row in rows: + category = row.get("category", "") + by_category[category] = { + "category": category, + "quantity": int(row.get("quantity") or 0), + "cost_cents": int(row.get("cost_cents") or 0), + } + recent_rows = db.fetch_all( + """ + SELECT * FROM tenant_usage_ledger + WHERE user_id = ? AND project_id = ? + ORDER BY created_at DESC + LIMIT 20 + """, + (account["id"], project_id), + ) + total_cost = sum(item["cost_cents"] for item in by_category.values()) + storage_bytes = _project_storage_bytes(account, project_id=project_id) + return { + "cycle_start": cycle_start, + "categories": by_category, + "total_cost_cents": total_cost, + "recent_items": [_tenant_usage_payload(row) for row in recent_rows], + "storage_bytes": storage_bytes, + } + + +def _get_tenant_quota_row(account: dict[str, Any], *, project_id: str) -> dict[str, Any] | None: + return db.fetch_one( + "SELECT * FROM tenant_quota_profiles WHERE user_id = ? AND project_id = ?", + (account["id"], project_id), + ) + + +def _get_tenant_quota(account: dict[str, Any], *, project_id: str) -> dict[str, Any]: + usage = _tenant_usage_summary(account, project_id=project_id) + row = _get_tenant_quota_row(account, project_id=project_id) + payload = _tenant_quota_payload(row, usage=usage) + storage_limit = int(payload.get("storage_limit_bytes") or 0) + payload["storage_over_limit"] = bool(storage_limit and usage["storage_bytes"] >= storage_limit) + return payload + + +def _record_tenant_usage( + account: dict[str, Any], + *, + project_id: str, + category: str, + reference_type: str, + reference_id: str, + details: dict[str, Any] | None = None, + quantity: int = 1, +) -> dict[str, Any]: + usage_meta = USAGE_COST_DEFAULTS.get(category, {}) + usage_id = make_id("usage") + db.execute( + """ + INSERT INTO tenant_usage_ledger ( + id, user_id, project_id, category, quantity, cost_cents, reference_type, reference_id, details_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + usage_id, + account["id"], + project_id, + category, + int(quantity or 1), + int(usage_meta.get("cost_cents") or 0) * int(quantity or 1), + reference_type, + reference_id, + json.dumps(details or {}, ensure_ascii=False), + utc_now(), + ), + ) + row = db.fetch_one("SELECT * FROM tenant_usage_ledger WHERE id = ?", (usage_id,)) + return _tenant_usage_payload(row) + + +def _enforce_tenant_quota(account: dict[str, Any], *, project_id: str, usage_category: str) -> None: + quota = _get_tenant_quota(account, project_id=project_id) + if not quota.get("enabled", True): + return + usage = quota.get("usage", {}) + category_meta = USAGE_COST_DEFAULTS.get(usage_category, {}) + quota_field = category_meta.get("quota_field") + if quota_field: + allowed = int(quota.get(quota_field) or 0) + consumed = int(((usage.get("categories") or {}).get(usage_category) or {}).get("quantity") or 0) + if allowed and consumed >= allowed: + raise HTTPException(status_code=403, detail=f"当前租户本周期的 {usage_category} 配额已用完") + budget = int(quota.get("monthly_budget_cents") or 0) + total_cost = int(usage.get("total_cost_cents") or 0) + next_cost = int(category_meta.get("cost_cents") or 0) + if budget and total_cost + next_cost > budget: + raise HTTPException(status_code=403, detail="当前租户本周期预算不足,已阻止本次动作执行") + storage_limit = int(quota.get("storage_limit_bytes") or 0) + if storage_limit and usage_category in {"analysis", "content_source_sync", "ai_video", "real_cut"}: + storage_bytes = int(usage.get("storage_bytes") or 0) + if storage_bytes >= storage_limit: + raise HTTPException(status_code=403, detail="当前租户存储额度已满,已阻止继续产生大文件缓存") + + def require_auth(authorization: str | None = Header(default=None)) -> dict[str, Any]: if not authorization or not authorization.startswith("Bearer "): raise HTTPException(status_code=401, detail="Missing bearer token") @@ -3130,6 +3315,7 @@ def create_live_recorder_source( ) -> dict[str, Any]: project = resolve_target_project(account["id"], request.project_id or None, username=account["username"]) assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + _enforce_tenant_quota(account, project_id=project["id"], usage_category="live_recorder") binding = upsert_live_recorder_binding( user_id=account["id"], project_id=project["id"], @@ -3140,6 +3326,14 @@ def create_live_recorder_source( quality=request.quality, enabled=request.enabled, ) + _record_tenant_usage( + account, + project_id=project["id"], + category="live_recorder", + reference_type="live_recorder_binding", + reference_id=binding["id"], + details={"source_url": request.source_url, "platform": request.platform or infer_platform_from_url(request.source_url)}, + ) sync_result = sync_live_recorder_remote_config() return {"item": live_recorder_binding_payload(binding), "sync": sync_result} @@ -3561,6 +3755,7 @@ async def create_content_source_sync_job( kb = resolve_target_kb(account["id"], request.knowledge_base_id or None, project["id"], username=account["username"]) assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) profile = model_profile_for_account(account["id"], request.analysis_model_profile_id or None) + _enforce_tenant_quota(account, project_id=project["id"], usage_category="content_source_sync") source_url = (request.source_url or (source_row or {}).get("source_url") or "").strip() if not source_url: @@ -3619,6 +3814,14 @@ async def create_content_source_sync_job( }, analysis_model_profile_id=profile["id"], ) + _record_tenant_usage( + account, + project_id=project["id"], + category="content_source_sync", + reference_type="job", + reference_id=job_row["id"], + details={"workflow_key": "content_source_sync_pipeline", "source_url": source_url, "platform": platform}, + ) update_content_source_metadata( source_row["id"], { @@ -3732,6 +3935,7 @@ def create_review(request: ReviewCreateRequest, account: dict[str, Any] = Depend requested_project_id = request.project_id.strip() or (source_job.get("project_id", "") if source_job else "") project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + _enforce_tenant_quota(account, project_id=project["id"], usage_category="review") review_id = make_id("review") title = request.title.strip() or (source_job.get("title", "") if source_job else "") if not title: @@ -3767,6 +3971,14 @@ def create_review(request: ReviewCreateRequest, account: dict[str, Any] = Depend ), ) row = db.fetch_one("SELECT * FROM publish_reviews WHERE id = ?", (review_id,)) + _record_tenant_usage( + account, + project_id=project["id"], + category="review", + reference_type="review", + reference_id=review_id, + details={"source_job_id": source_job["id"] if source_job else "", "platform": normalized_platform}, + ) return review_payload(row) @@ -3852,6 +4064,106 @@ def get_job_events(job_id: str, account: dict[str, Any] = Depends(require_approv ] +async def _requeue_job(row: dict[str, Any], *, actor_id: str, action: str, force: bool = False) -> dict[str, Any]: + current_status = str(row.get("status") or "") + if action == "retry" and current_status != "failed": + raise HTTPException(status_code=409, detail="Only failed jobs can be retried") + if action == "requeue" and current_status == "completed" and not force: + raise HTTPException(status_code=409, detail="Completed jobs cannot be requeued") + if current_status == "processing" and not force: + raise HTTPException(status_code=409, detail="Processing jobs cannot be requeued without force") + + job_id = row["id"] + append_job_event( + job_id, + f"job.{action}.requested", + { + "actor_id": actor_id, + "status_before": current_status, + "workflow_key": row.get("workflow_key", ""), + "source_type": row.get("source_type", ""), + }, + ) + updated = update_job_state( + job_id, + status="queued", + error="", + provider_name="", + provider_task_id="", + result={ + "retry": { + "action": action, + "actor_id": actor_id, + "status_before": current_status, + "requeued_at": utc_now(), + } + }, + ) + append_job_event( + job_id, + f"job.{action}.queued", + { + "actor_id": actor_id, + "status_before": current_status, + "status_after": "queued", + }, + ) + return await trigger_orchestrated_job(updated) + + +@app.post("/v2/explore/jobs/{job_id}/retry") +async def retry_job(job_id: str, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + row = load_owned_job(job_id, account["id"]) + return job_payload(await _requeue_job(row, actor_id=account["id"], action="retry")) + + +@app.post("/v2/explore/jobs/{job_id}/requeue") +async def requeue_job(job_id: str, account: dict[str, Any] = Depends(require_approved)) -> dict[str, Any]: + row = load_owned_job(job_id, account["id"]) + return job_payload(await _requeue_job(row, actor_id=account["id"], action="requeue", force=True)) + + +@app.post("/v2/admin/jobs/retry-failed") +async def admin_retry_failed_jobs( + request: RetryFailedJobsRequest, + admin: dict[str, Any] = Depends(require_super_admin), +) -> dict[str, Any]: + clauses = ["status = 'failed'"] + params: list[Any] = [] + if request.user_id.strip(): + clauses.append("user_id = ?") + params.append(request.user_id.strip()) + if request.project_id.strip(): + clauses.append("project_id = ?") + params.append(request.project_id.strip()) + if request.workflow_key.strip(): + clauses.append("workflow_key = ?") + params.append(request.workflow_key.strip()) + if request.line_type.strip(): + clauses.append("line_type = ?") + params.append(request.line_type.strip()) + if request.source_type.strip(): + clauses.append("source_type = ?") + params.append(request.source_type.strip()) + rows = db.fetch_all( + f""" + SELECT * FROM jobs + WHERE {' AND '.join(clauses)} + ORDER BY updated_at ASC + LIMIT ? + """, + tuple(params + [request.limit]), + ) + retried: list[dict[str, Any]] = [] + skipped: list[dict[str, Any]] = [] + for row in rows: + try: + retried.append(job_payload(await _requeue_job(row, actor_id=admin["id"], action="retry", force=True))) + except HTTPException as exc: + skipped.append({"job_id": row["id"], "detail": str(exc.detail)}) + return {"retried": retried, "skipped": skipped, "count": len(retried), "matched": len(rows)} + + def resolve_target_kb(account_id: str, requested_kb_id: str | None, project_id: str = "", username: str = "默认用户") -> dict[str, Any]: if requested_kb_id: kb = db.fetch_one("SELECT * FROM knowledge_bases WHERE id = ? AND user_id = ?", (requested_kb_id, account_id)) @@ -3869,6 +4181,7 @@ async def create_text_job(request: ExploreTextRequest, account: dict[str, Any] = kb = resolve_target_kb(account["id"], request.knowledge_base_id, project["id"], username=account["username"]) assistant = resolve_target_assistant(account["id"], request.assistant_id, project["id"]) profile = model_profile_for_account(account["id"], request.analysis_model_profile_id) + _enforce_tenant_quota(account, project_id=project["id"], usage_category="analysis") source = create_content_source( account_id=account["id"], project_id=project["id"], @@ -3890,6 +4203,14 @@ async def create_text_job(request: ExploreTextRequest, account: dict[str, Any] = artifacts={"input_text": request.content}, analysis_model_profile_id=profile["id"], ) + _record_tenant_usage( + account, + project_id=project["id"], + category="analysis", + reference_type="job", + reference_id=job_row["id"], + details={"workflow_key": "analysis_pipeline", "source_type": "text"}, + ) return job_payload(await trigger_orchestrated_job(job_row)) @@ -3899,6 +4220,7 @@ async def create_video_link_job(request: ExploreVideoLinkRequest, account: dict[ kb = resolve_target_kb(account["id"], request.knowledge_base_id, project["id"], username=account["username"]) assistant = resolve_target_assistant(account["id"], request.assistant_id, project["id"]) profile = model_profile_for_account(account["id"], request.analysis_model_profile_id) + _enforce_tenant_quota(account, project_id=project["id"], usage_category="analysis") source = create_content_source( account_id=account["id"], project_id=project["id"], @@ -3922,6 +4244,14 @@ async def create_video_link_job(request: ExploreVideoLinkRequest, account: dict[ artifacts={}, analysis_model_profile_id=profile["id"], ) + _record_tenant_usage( + account, + project_id=project["id"], + category="analysis", + reference_type="job", + reference_id=job_row["id"], + details={"workflow_key": "analysis_pipeline", "source_type": "video_link"}, + ) return job_payload(await trigger_orchestrated_job(job_row)) @@ -3939,6 +4269,7 @@ async def upload_video( kb = resolve_target_kb(account["id"], knowledge_base_id or None, project["id"], username=account["username"]) assistant = resolve_target_assistant(account["id"], assistant_id or None, project["id"]) profile = model_profile_for_account(account["id"], analysis_model_profile_id or None) + _enforce_tenant_quota(account, project_id=project["id"], usage_category="analysis") job_id = make_id("job_upload") job_dir = job_storage_dir(account_id=account["id"], project_id=project["id"], job_id=job_id) job_dir.mkdir(parents=True, exist_ok=True) @@ -3969,6 +4300,14 @@ async def upload_video( artifacts={"uploaded_path": str(target_path)}, analysis_model_profile_id=profile["id"], ) + _record_tenant_usage( + account, + project_id=project["id"], + category="analysis", + reference_type="job", + reference_id=job_row["id"], + details={"workflow_key": "analysis_pipeline", "source_type": "upload_video"}, + ) return job_payload(await trigger_orchestrated_job(job_row)) @@ -3985,6 +4324,7 @@ async def create_real_cut_job(request: RealCutJobRequest, account: dict[str, Any raise HTTPException(status_code=400, detail="Source job does not belong to target project") kb = ensure_user_kb(account["id"], project["id"], username=account["username"]) + _enforce_tenant_quota(account, project_id=project["id"], usage_category="real_cut") resolved_input_dir = request.input_dir.strip() staged_payload: dict[str, Any] = {} if not resolved_input_dir: @@ -4042,6 +4382,14 @@ async def create_real_cut_job(request: RealCutJobRequest, account: dict[str, Any } }, ) + _record_tenant_usage( + account, + project_id=project["id"], + category="real_cut", + reference_type="job", + reference_id=job_row["id"], + details={"workflow_key": "real_cut_pipeline", "source_job_id": source_job["id"] if source_job else ""}, + ) return job_payload(await trigger_orchestrated_job(job_row)) @@ -4063,6 +4411,7 @@ async def create_ai_video_job(request: AiVideoJobRequest, account: dict[str, Any project = resolve_target_project(account["id"], requested_project_id or None, username=account["username"]) kb = resolve_target_kb(account["id"], request.knowledge_base_id or source_kb_id or None, project["id"], username=account["username"]) assistant = resolve_target_assistant(account["id"], request.assistant_id or None, project["id"]) + _enforce_tenant_quota(account, project_id=project["id"], usage_category="ai_video") source = create_content_source( account_id=account["id"], project_id=project["id"], @@ -4093,6 +4442,14 @@ async def create_ai_video_job(request: AiVideoJobRequest, account: dict[str, Any "source_job_id": request.source_job_id.strip(), }, ) + _record_tenant_usage( + account, + project_id=project["id"], + category="ai_video", + reference_type="job", + reference_id=job_row["id"], + details={"workflow_key": "ai_video_pipeline", "source_job_id": request.source_job_id.strip()}, + ) return job_payload(await trigger_orchestrated_job(job_row)) @@ -4196,6 +4553,9 @@ async def generate_copy(assistant_id: str, request: GenerateCopyRequest, account assistant = db.fetch_one("SELECT * FROM assistants WHERE id = ? AND user_id = ?", (assistant_id, account["id"])) if not assistant: raise HTTPException(status_code=404, detail="Assistant not found") + project_id = assistant.get("project_id", "") or "" + if project_id: + _enforce_tenant_quota(account, project_id=project_id, usage_category="copy") kb_ids = request.knowledge_base_ids or [row["knowledge_base_id"] for row in db.fetch_all("SELECT knowledge_base_id FROM assistant_knowledge_bases WHERE assistant_id = ?", (assistant_id,))] used_documents: list[dict[str, Any]] = [] excerpts: list[str] = [] @@ -4221,6 +4581,15 @@ async def generate_copy(assistant_id: str, request: GenerateCopyRequest, account ) profile = model_profile_for_account(account["id"], assistant.get("model_profile_id") or None) content = await call_model(profile, system_prompt, user_prompt, temperature=0.7) + if project_id: + _record_tenant_usage( + account, + project_id=project_id, + category="copy", + reference_type="assistant", + reference_id=assistant_id, + details={"knowledge_base_ids": kb_ids, "used_documents": len(used_documents)}, + ) return { "assistant_id": assistant_id, "knowledge_base_ids": kb_ids, diff --git a/collector-service/app/database.py b/collector-service/app/database.py index 755cdf7..51cfbfb 100644 --- a/collector-service/app/database.py +++ b/collector-service/app/database.py @@ -1,11 +1,16 @@ from __future__ import annotations +import os import sqlite3 from contextlib import contextmanager from pathlib import Path from typing import Any, Iterator +SQLITE_BUSY_TIMEOUT_MS = int(os.getenv("SQLITE_BUSY_TIMEOUT_MS", "5000")) +SQLITE_CONNECT_TIMEOUT_SEC = float(os.getenv("SQLITE_CONNECT_TIMEOUT_SEC", "30")) + + def utc_now() -> str: from datetime import datetime, timezone @@ -22,9 +27,14 @@ class Database: self.path.parent.mkdir(parents=True, exist_ok=True) def connect(self) -> sqlite3.Connection: - conn = sqlite3.connect(self.path) + conn = sqlite3.connect(self.path, timeout=SQLITE_CONNECT_TIMEOUT_SEC) conn.row_factory = dict_factory + conn.execute("PRAGMA journal_mode = WAL") + conn.execute("PRAGMA synchronous = NORMAL") + conn.execute(f"PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}") conn.execute("PRAGMA foreign_keys = ON") + conn.execute("PRAGMA temp_store = MEMORY") + conn.execute("PRAGMA wal_autocheckpoint = 1000") return conn @contextmanager @@ -265,6 +275,41 @@ class Database: FOREIGN KEY(source_id) REFERENCES live_recorder_sources(id) ON DELETE CASCADE ); + CREATE TABLE IF NOT EXISTS tenant_quota_profiles ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + monthly_budget_cents INTEGER NOT NULL DEFAULT 0, + storage_limit_bytes INTEGER NOT NULL DEFAULT 0, + analysis_quota INTEGER NOT NULL DEFAULT 0, + copy_quota INTEGER NOT NULL DEFAULT 0, + ai_video_quota INTEGER NOT NULL DEFAULT 0, + real_cut_quota INTEGER NOT NULL DEFAULT 0, + recorder_quota INTEGER NOT NULL DEFAULT 0, + enabled INTEGER NOT NULL DEFAULT 1, + config_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, project_id), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS tenant_usage_ledger ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + category TEXT NOT NULL, + quantity INTEGER NOT NULL DEFAULT 1, + cost_cents INTEGER NOT NULL DEFAULT 0, + reference_type TEXT NOT NULL DEFAULT '', + reference_id TEXT NOT NULL DEFAULT '', + details_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL + ); + CREATE TABLE IF NOT EXISTS job_events ( id TEXT PRIMARY KEY, job_id TEXT NOT NULL, diff --git a/docs/CURRENT_PROJECT_STATE_2026-03-26.md b/docs/CURRENT_PROJECT_STATE_2026-03-26.md index 26370a9..c43af86 100644 --- a/docs/CURRENT_PROJECT_STATE_2026-03-26.md +++ b/docs/CURRENT_PROJECT_STATE_2026-03-26.md @@ -40,6 +40,13 @@ - 复盘 - 额度与运维面板 +## 当前量产基线 + +- SQLite 已默认启用 `WAL`、`busy_timeout`、`synchronous=NORMAL`、`foreign_keys=ON` 等连接参数,减少并发写入时的锁冲突。 +- `tenant_quota_profiles` 与 `tenant_usage_ledger` 已接入核心生产链,`explore/*`、`content-source-sync`、`reviews`、`real-cut`、`ai-video`、`assistants/{id}/generate`、`live-recorder create` 都会先做额度硬拦截,再记账。 +- `jobs` 已补 `retry / requeue` 单任务入口,以及管理员批量重试失败任务入口,便于失败链路恢复。 +- 仓库内已新增 SQLite 备份脚本,可在发布或故障前快速生成一致性快照。 + ## 当前支持的平台 - `douyin` @@ -73,4 +80,5 @@ 1. 继续按当前仓库边界维护,不再把 `AI Glasses` 代码重新叠进来。 2. Web 功能优先围绕多平台工作台、生产中心和租户控制面继续深化。 3. 需要真实平台验证的事项,单独作为联调任务推进,不再和仓库边界治理混在一起。 -4. 公网环境出现异常时,先检查云服务器上的 `nginx / storyforge-web-v4.service / collector-service`,再检查本机桥接链。 +4. 生产基线任务优先按“任务恢复、额度硬控、数据库备份、观测补齐”继续深化。 +5. 公网环境出现异常时,先检查云服务器上的 `nginx / storyforge-web-v4.service / collector-service`,再检查本机桥接链。 diff --git a/docs/PRODUCTION_BASELINE_2026-03-26.md b/docs/PRODUCTION_BASELINE_2026-03-26.md new file mode 100644 index 0000000..114469f --- /dev/null +++ b/docs/PRODUCTION_BASELINE_2026-03-26.md @@ -0,0 +1,42 @@ +# StoryForge 生产基线 + +日期:2026-03-26 + +本文档描述当前仓库已经落地的量产底盘,便于后续继续开发和运维。 + +## 已落地能力 + +- SQLite 默认连接参数已收紧: + - `journal_mode=WAL` + - `synchronous=NORMAL` + - `busy_timeout` + - `foreign_keys=ON` + - `temp_store=MEMORY` +- 核心生产 API 已接入 tenant quota 硬控制与 usage ledger 记账: + - `POST /v2/explore/text` + - `POST /v2/explore/video-link` + - `POST /v2/explore/upload-video` + - `POST /v2/pipelines/content-source-sync` + - `POST /v2/reviews` + - `POST /v2/pipelines/real-cut` + - `POST /v2/pipelines/ai-video` + - `POST /v2/assistants/{assistant_id}/generate` + - `POST /v2/live-recorder/sources` +- 失败任务恢复入口已补齐: + - `POST /v2/explore/jobs/{job_id}/retry` + - `POST /v2/explore/jobs/{job_id}/requeue` + - `POST /v2/admin/jobs/retry-failed` +- 仓库内已新增 SQLite 备份脚本: + - `scripts/backup_storyforge_sqlite.sh` + +## 运行建议 + +- 发布前先执行一次数据库备份,再执行服务升级。 +- quota 配置建议按 project 维度维护,避免不同项目之间互相干扰。 +- 批量 retry 建议优先筛选 `workflow_key` 或 `source_type`,避免把不同流水线一起打回去。 + +## 当前外部阻塞 + +- 真正的额度策略仍取决于业务侧如何配置 `tenant_quota_profiles`。 +- `real-cut`、`ai-video`、`content-source-sync` 的完整链路仍依赖外部服务可用性。 +- 抖音等真实平台采集仍可能受到平台风控影响,需要真实联调确认。 diff --git a/scripts/backup_storyforge_sqlite.sh b/scripts/backup_storyforge_sqlite.sh new file mode 100755 index 0000000..d38cf67 --- /dev/null +++ b/scripts/backup_storyforge_sqlite.sh @@ -0,0 +1,31 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +DATABASE_PATH="${DATABASE_PATH:-$ROOT_DIR/data/collector/storyforge.db}" +BACKUP_DIR="${BACKUP_DIR:-$ROOT_DIR/data/backups}" +TIMESTAMP="$(date -u +%Y%m%dT%H%M%SZ)" +TARGET_PATH="$BACKUP_DIR/storyforge-${TIMESTAMP}.db" + +mkdir -p "$BACKUP_DIR" + +python3 - "$DATABASE_PATH" "$TARGET_PATH" <<'PY' +from __future__ import annotations + +import pathlib +import sqlite3 +import sys + +source_path = pathlib.Path(sys.argv[1]) +target_path = pathlib.Path(sys.argv[2]) + +if not source_path.exists(): + raise SystemExit(f"source database not found: {source_path}") + +target_path.parent.mkdir(parents=True, exist_ok=True) + +with sqlite3.connect(source_path) as source_conn, sqlite3.connect(target_path) as target_conn: + source_conn.backup(target_conn) + +print(target_path) +PY diff --git a/tests/test_production_baseline.py b/tests/test_production_baseline.py new file mode 100644 index 0000000..9bef8cc --- /dev/null +++ b/tests/test_production_baseline.py @@ -0,0 +1,355 @@ +from __future__ import annotations + +import importlib +import json +import os +import sqlite3 +import subprocess +import sys +import tempfile +import unittest +from pathlib import Path +from typing import Any + +from fastapi.testclient import TestClient + + +ROOT = Path(__file__).resolve().parents[1] +APP_ROOT = ROOT / "collector-service" +if str(APP_ROOT) not in sys.path: + sys.path.insert(0, str(APP_ROOT)) + + +class ProductionBaselineTests(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.tempdir = tempfile.TemporaryDirectory() + temp_root = Path(cls.tempdir.name) + os.environ["DATA_DIR"] = str(temp_root / "data") + os.environ["DATABASE_PATH"] = str(temp_root / "data" / "storyforge.db") + os.environ["DOWNLOADS_DIR"] = str(temp_root / "downloads") + os.environ["JOBS_DIR"] = str(temp_root / "jobs") + os.environ["MODELS_DIR"] = str(temp_root / "models") + os.environ["ORCHESTRATOR_SHARED_SECRET"] = "test-secret" + os.environ.setdefault("BOOTSTRAP_SUPERADMIN_USERNAME", "") + os.environ.setdefault("BOOTSTRAP_SUPERADMIN_PASSWORD", "") + + cls.db_module = importlib.reload(importlib.import_module("app.database")) + cls.core = importlib.reload(importlib.import_module("app.core_main")) + cls.app_main = importlib.reload(importlib.import_module("app.main")) + + async def fake_trigger(job_row: dict[str, Any]) -> dict[str, Any]: + cls.core.append_job_event(job_row["id"], "workflow.trigger.requested", {"workflow_key": job_row.get("workflow_key", "")}) + cls.core.update_job_state( + job_row["id"], + status="queued", + provider_name="n8n", + provider_task_id="", + result={"n8n_trigger": {"requested": True, "mocked": True}}, + ) + return cls.core.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (job_row["id"],)) + + async def fake_call_model(*_args: object, **_kwargs: object) -> str: + return "mock content" + + cls.core.trigger_orchestrated_job = fake_trigger + cls.core.call_model = fake_call_model + cls.core.sync_live_recorder_remote_config = lambda: {"ok": True} + cls.core.db.init_schema() + cls.client = TestClient(cls.app_main.app) + + @classmethod + def tearDownClass(cls) -> None: + cls.client.close() + cls.tempdir.cleanup() + + def setUp(self) -> None: + self._clear_tables() + + def _clear_tables(self) -> None: + tables = [ + "job_events", + "tenant_usage_ledger", + "tenant_quota_profiles", + "auth_tokens", + "publish_reviews", + "live_recorder_bindings", + "live_recorder_sources", + "jobs", + "content_sources", + "assistant_knowledge_bases", + "assistants", + "knowledge_documents", + "knowledge_bases", + "projects", + "accounts", + "model_profiles", + ] + for table in tables: + self.core.db.execute(f"DELETE FROM {table}") + + def _seed_context(self, tag: str, *, exhausted: bool = False) -> dict[str, Any]: + now = self.db_module.utc_now() + account_id = f"acct_{tag}" + project_id = f"proj_{tag}" + model_id = f"model_{tag}" + kb_id = f"kb_{tag}" + assistant_id = f"assistant_{tag}" + token = f"token_{tag}" + username = f"user_{tag}" + + self.core.db.execute( + """ + INSERT INTO accounts ( + id, username, password_hash, password_salt, display_name, role, approval_status, + approved_by, approved_at, preferred_analysis_model_id, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + account_id, + username, + "hash", + "salt", + f"User {tag}", + "super_admin", + "approved", + account_id, + now, + model_id, + now, + now, + ), + ) + self.core.db.execute( + """ + INSERT INTO projects (id, user_id, name, description, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + (project_id, account_id, f"Project {tag}", "", now, now), + ) + self.core.db.execute( + """ + INSERT INTO model_profiles ( + id, owner_account_id, name, provider, base_url, api_key, model_name, + is_system, is_default, created_at, updated_at + ) VALUES (?, NULL, ?, ?, ?, ?, ?, 1, 1, ?, ?) + """, + (model_id, "Default Model", "openai_compat", "http://127.0.0.1:8317/v1", "", "GLM-5", now, now), + ) + self.core.db.execute( + """ + INSERT INTO knowledge_bases (id, user_id, project_id, name, description, sync_status, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, 'ready', ?, ?) + """, + (kb_id, account_id, project_id, f"KB {tag}", "", now, now), + ) + self.core.db.execute( + """ + INSERT INTO assistants (id, user_id, project_id, name, description, system_prompt, generation_goal, config_json, model_profile_id, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, '{}', ?, ?, ?) + """, + ( + assistant_id, + account_id, + project_id, + f"Assistant {tag}", + "", + "你是文案助手。", + "生成短视频文案。", + model_id, + now, + now, + ), + ) + self.core.db.execute("INSERT INTO assistant_knowledge_bases (assistant_id, knowledge_base_id) VALUES (?, ?)", (assistant_id, kb_id)) + self.core.db.execute( + "INSERT INTO auth_tokens (token, account_id, created_at) VALUES (?, ?, ?)", + (token, account_id, now), + ) + + if exhausted: + quota_id = f"quota_{tag}" + self.core.db.execute( + """ + INSERT INTO tenant_quota_profiles ( + id, user_id, project_id, monthly_budget_cents, storage_limit_bytes, analysis_quota, + copy_quota, ai_video_quota, real_cut_quota, recorder_quota, enabled, config_json, + created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, '{}', ?, ?) + """, + (quota_id, account_id, project_id, 9999, 0, 1, 1, 1, 1, 1, now, now), + ) + for category in ["analysis", "content_source_sync", "review", "copy", "ai_video", "real_cut", "live_recorder"]: + usage_id = f"usage_{tag}_{category}" + cost_map = { + "analysis": 6, + "content_source_sync": 8, + "review": 1, + "copy": 3, + "ai_video": 30, + "real_cut": 20, + "live_recorder": 2, + } + self.core.db.execute( + """ + INSERT INTO tenant_usage_ledger ( + id, user_id, project_id, category, quantity, cost_cents, reference_type, reference_id, details_json, created_at + ) VALUES (?, ?, ?, ?, 1, ?, 'seed', ?, '{}', ?) + """, + (usage_id, account_id, project_id, category, cost_map[category], usage_id, now), + ) + + return { + "account_id": account_id, + "project_id": project_id, + "model_id": model_id, + "kb_id": kb_id, + "assistant_id": assistant_id, + "token": token, + } + + def test_database_uses_wal_and_busy_timeout(self) -> None: + conn = self.core.db.connect() + try: + journal_mode_row = conn.execute("PRAGMA journal_mode").fetchone() + busy_timeout_row = conn.execute("PRAGMA busy_timeout").fetchone() + journal_mode = journal_mode_row["journal_mode"] if isinstance(journal_mode_row, dict) else journal_mode_row[0] + if isinstance(busy_timeout_row, dict): + busy_timeout = int(busy_timeout_row.get("timeout") or busy_timeout_row.get("busy_timeout") or next(iter(busy_timeout_row.values()))) + else: + busy_timeout = int(busy_timeout_row[0]) + finally: + conn.close() + self.assertEqual(str(journal_mode).lower(), "wal") + self.assertGreaterEqual(busy_timeout, 1000) + + def test_quota_blocks_production_endpoints(self) -> None: + ctx = self._seed_context("quota", exhausted=True) + headers = {"Authorization": f"Bearer {ctx['token']}"} + blocked_requests = [ + ("POST", "/v2/explore/text", {"title": "T", "content": "C", "project_id": ctx["project_id"], "knowledge_base_id": ctx["kb_id"], "assistant_id": ctx["assistant_id"], "analysis_model_profile_id": ctx["model_id"]}, None), + ("POST", "/v2/explore/video-link", {"video_url": "https://example.com/video", "title": "V", "project_id": ctx["project_id"], "knowledge_base_id": ctx["kb_id"], "assistant_id": ctx["assistant_id"], "analysis_model_profile_id": ctx["model_id"]}, None), + ("POST", "/v2/pipelines/content-source-sync", {"project_id": ctx["project_id"]}, None), + ("POST", "/v2/reviews", {"project_id": ctx["project_id"], "assistant_id": ctx["assistant_id"], "title": "Review"}, None), + ("POST", "/v2/pipelines/real-cut", {"project_id": ctx["project_id"], "title": "Cut"}, None), + ("POST", "/v2/pipelines/ai-video", {"project_id": ctx["project_id"], "title": "Video", "brief": "Brief"}, None), + ("POST", f"/v2/assistants/{ctx['assistant_id']}/generate", {"brief": "Copy", "project_id": ctx["project_id"], "knowledge_base_ids": [ctx["kb_id"]]}, None), + ("POST", "/v2/live-recorder/sources", {"project_id": ctx["project_id"], "source_url": "https://example.com/live", "title": "Live"}, None), + ] + for method, path, json_body, files in blocked_requests: + with self.subTest(path=path): + response = self.client.request(method, path, headers=headers, json=json_body, files=files) + self.assertEqual(response.status_code, 403, response.text) + + upload_response = self.client.post( + "/v2/explore/upload-video", + headers=headers, + data={ + "title": "Upload", + "project_id": ctx["project_id"], + "knowledge_base_id": ctx["kb_id"], + "assistant_id": ctx["assistant_id"], + "analysis_model_profile_id": ctx["model_id"], + }, + files={"file": ("clip.mp4", b"clip-bytes", "video/mp4")}, + ) + self.assertEqual(upload_response.status_code, 403, upload_response.text) + + def test_successful_analysis_records_usage_and_retry_endpoints_work(self) -> None: + ctx = self._seed_context("happy", exhausted=False) + headers = {"Authorization": f"Bearer {ctx['token']}"} + text_response = self.client.post( + "/v2/explore/text", + headers=headers, + json={ + "title": "Hello", + "content": "Hello StoryForge", + "project_id": ctx["project_id"], + "knowledge_base_id": ctx["kb_id"], + "assistant_id": ctx["assistant_id"], + "analysis_model_profile_id": ctx["model_id"], + }, + ) + self.assertEqual(text_response.status_code, 200, text_response.text) + text_job = text_response.json() + usage_row = self.core.db.fetch_one( + "SELECT * FROM tenant_usage_ledger WHERE user_id = ? AND project_id = ? AND category = ? ORDER BY created_at DESC LIMIT 1", + (ctx["account_id"], ctx["project_id"], "analysis"), + ) + self.assertIsNotNone(usage_row) + self.assertEqual(text_job["status"], "queued") + + now = self.db_module.utc_now() + failed_jobs = [] + for index in range(2): + job_id = f"job_{index}_{ctx['project_id']}" + self.core.db.execute( + """ + INSERT INTO jobs ( + id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id, + source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id, + source_url, title, language, status, transcript_text, style_summary, upload_status, + error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at + ) VALUES (?, ?, ?, '', ?, ?, '', ?, ?, ?, 'n8n', 'collector', '', '', ?, 'auto', 'failed', '', '', 'pending', ?, '{}', '{}', ?, ?, ?) + """, + ( + job_id, + ctx["account_id"], + ctx["project_id"], + ctx["assistant_id"], + ctx["kb_id"], + "text", + "analysis", + "analysis_pipeline", + f"Failed {index}", + "boom", + ctx["model_id"], + now, + now, + ), + ) + failed_jobs.append(job_id) + + retry_response = self.client.post(f"/v2/explore/jobs/{failed_jobs[0]}/retry", headers=headers) + self.assertEqual(retry_response.status_code, 200, retry_response.text) + retry_payload = retry_response.json() + self.assertEqual(retry_payload["status"], "queued") + + bulk_response = self.client.post( + "/v2/admin/jobs/retry-failed", + headers=headers, + json={"project_id": ctx["project_id"], "limit": 10}, + ) + self.assertEqual(bulk_response.status_code, 200, bulk_response.text) + bulk_payload = bulk_response.json() + self.assertEqual(bulk_payload["count"], 1) + self.assertEqual(len(bulk_payload["retried"]), 1) + + event_rows = self.core.db.fetch_all("SELECT event_type FROM job_events WHERE job_id = ? ORDER BY created_at ASC", (failed_jobs[0],)) + event_types = [row["event_type"] for row in event_rows] + self.assertIn("job.retry.requested", event_types) + self.assertIn("job.retry.queued", event_types) + + bulk_job = self.core.db.fetch_one("SELECT * FROM jobs WHERE id = ?", (failed_jobs[1],)) + self.assertEqual(bulk_job["status"], "queued") + + def test_backup_script_creates_consistent_snapshot(self) -> None: + ctx = self._seed_context("backup", exhausted=False) + backup_dir = Path(self.tempdir.name) / "backups" + script = ROOT / "scripts" / "backup_storyforge_sqlite.sh" + result = subprocess.run( + ["bash", str(script)], + check=True, + text=True, + capture_output=True, + env={ + **os.environ, + "DATABASE_PATH": str(self.core.db.path), + "BACKUP_DIR": str(backup_dir), + }, + ) + backup_path = Path(result.stdout.strip().splitlines()[-1]) + self.assertTrue(backup_path.exists(), result.stdout) + with sqlite3.connect(backup_path) as conn: + account_count = conn.execute("SELECT COUNT(*) FROM accounts").fetchone()[0] + self.assertGreaterEqual(int(account_count), 1) diff --git a/web/storyforge-web-v4/assets/app.js b/web/storyforge-web-v4/assets/app.js index fcd09aa..d843013 100644 --- a/web/storyforge-web-v4/assets/app.js +++ b/web/storyforge-web-v4/assets/app.js @@ -1,6 +1,7 @@ const STORAGE_KEY = "storyforge-web-v4-session"; const SESSION_STORE = StoryForgeSessionStore.create(STORAGE_KEY); const DEFAULT_BACKEND_URL = StoryForgeApiClient.detectDefaultBackendUrl(); +const RECOVERY_HISTORY_KEY = STORAGE_KEY + ":recovery-history"; const navButtons = document.querySelectorAll("[data-screen-target]"); const screens = Array.from(document.querySelectorAll("[data-screen]")); @@ -50,6 +51,7 @@ const appState = { tenantUsage: null, adminOpsOverview: null, adminFixRuns: [], + recoveryRecords: [], busy: false, message: "", lastAction: null, @@ -342,6 +344,200 @@ function statusTone(status) { return "blue"; } +function readRecoveryRecords() { + try { + const raw = localStorage.getItem(RECOVERY_HISTORY_KEY); + const parsed = raw ? JSON.parse(raw) : []; + return safeArray(parsed); + } catch { + return []; + } +} + +function persistRecoveryRecords(records) { + try { + localStorage.setItem(RECOVERY_HISTORY_KEY, JSON.stringify(safeArray(records).slice(0, 40))); + } catch {} +} + +function normalizeRecoveryRecord(record) { + if (!record || typeof record !== "object") return null; + return { + id: String(record.id || `recovery_${Date.now()}`), + created_at: record.created_at || new Date().toISOString(), + account_id: String(record.account_id || ""), + project_id: String(record.project_id || ""), + job_id: String(record.job_id || ""), + job_title: String(record.job_title || ""), + job_line_type: String(record.job_line_type || ""), + job_source_type: String(record.job_source_type || ""), + job_status: String(record.job_status || "failed"), + action_key: String(record.action_key || "retry-job"), + mode: String(record.mode || "single"), + summary: String(record.summary || ""), + reason: String(record.reason || ""), + target_job_id: String(record.target_job_id || ""), + target_job_title: String(record.target_job_title || ""), + target_status: String(record.target_status || ""), + result_label: String(record.result_label || ""), + result_reason: String(record.result_reason || ""), + user_feedback: String(record.user_feedback || "") + }; +} + +function getRecoveryRecords() { + const accountId = String(appState.session?.account?.id || ""); + return readRecoveryRecords() + .map((item) => normalizeRecoveryRecord(item)) + .filter(Boolean) + .filter((item) => !accountId || item.account_id === accountId) + .sort((left, right) => compareDateDesc(left.created_at, right.created_at)); +} + +function recordRecoveryEvent(record) { + const normalized = normalizeRecoveryRecord({ + ...record, + account_id: record.account_id || appState.session?.account?.id || "", + project_id: record.project_id || appState.selectedProjectId || "", + created_at: record.created_at || new Date().toISOString() + }); + if (!normalized) return null; + const nextRecords = [normalized, ...readRecoveryRecords()] + .filter((item, index, list) => item && list.findIndex((candidate) => candidate.id === item.id) === index) + .sort((left, right) => compareDateDesc(left.created_at, right.created_at)) + .slice(0, 40); + persistRecoveryRecords(nextRecords); + appState.recoveryRecords = getRecoveryRecords(); + return normalized; +} + +function clearRecoveryRecords() { + appState.recoveryRecords = []; +} + +function isQuotaRelatedMessage(message) { + const text = String(message || ""); + return /配额|额度|预算|storage_over_limit|quota/i.test(text); +} + +function describeActionError(error, fallbackTitle = "执行失败") { + const raw = String(error?.message || error || "").trim(); + const status = Number(error?.status || 0); + const result = { + title: fallbackTitle, + summary: raw || "请稍后重试", + tone: status === 403 ? "orange" : "red", + hint: "" + }; + if (!raw) return result; + + const quotaPatterns = [ + { + pattern: /analysis 配额已用完/i, + title: "分析额度已用完", + summary: "当前租户本周期的分析额度已经用完,暂时不能继续发起分析类任务。", + hint: "可以先补额度,或者等待下个周期再恢复。" + }, + { + pattern: /copy 配额已用完/i, + title: "文案额度已用完", + summary: "当前租户本周期的文案额度已经用完,暂时不能继续生成文案。", + hint: "可以先补额度,或者改用已有产物继续复盘。" + }, + { + pattern: /ai_video 配额已用完/i, + title: "AI 视频额度已用完", + summary: "当前租户本周期的 AI 视频额度已经用完,暂时不能继续发起 AI 视频任务。", + hint: "可以先补额度,再恢复这条任务。" + }, + { + pattern: /real_cut 配额已用完/i, + title: "实拍剪辑额度已用完", + summary: "当前租户本周期的实拍剪辑额度已经用完,暂时不能继续发起剪辑任务。", + hint: "可以先补额度,或者改为只做分析类恢复。" + }, + { + pattern: /recorder 配额已用完/i, + title: "录制额度已用完", + summary: "当前租户本周期的录制额度已经用完,暂时不能新增直播录制源。", + hint: "可以先补额度,再继续新增录制源。" + }, + { + pattern: /预算不足|monthly budget/i, + title: "本周期预算不足", + summary: "当前租户本周期预算已经不够继续执行这项动作。", + hint: "可以先扩容预算,再重新发起恢复。" + }, + { + pattern: /存储额度已满|storage_over_limit/i, + title: "存储额度已满", + summary: "当前租户存储额度已触顶,系统已经停止继续产生大文件缓存。", + hint: "建议先清理旧产物或提升存储上限。" + } + ]; + for (const item of quotaPatterns) { + if (item.pattern.test(raw)) { + return { + ...result, + title: item.title, + summary: item.summary, + hint: item.hint, + tone: "orange" + }; + } + } + + if (status === 403 && /permission|not allowed|required|admin/i.test(raw)) { + return { + ...result, + title: "权限不足", + summary: "当前账号没有执行这项动作的权限。", + hint: "请确认账号审批状态,或者切换到超级管理员账号。", + tone: "orange" + }; + } + if (status === 409) { + return { + ...result, + title: "任务状态冲突", + summary: raw, + hint: "请刷新后重新查看当前任务状态。", + tone: "orange" + }; + } + if (status === 404) { + return { + ...result, + title: "资源不存在", + summary: raw, + hint: "请刷新页面后再试一次。", + tone: "orange" + }; + } + if (isQuotaRelatedMessage(raw)) { + return { + ...result, + title: "额度拦截", + summary: raw, + hint: "系统已经阻止继续执行,先补额度或清理存储再恢复。", + tone: "orange" + }; + } + return result; +} + +function formatActionErrorMessage(error, fallbackTitle = "执行失败") { + const detail = describeActionError(error, fallbackTitle); + return [detail.title, detail.summary, detail.hint].filter(Boolean).join(" · "); +} + +function presentActionFailure(error, fallbackTitle = "执行失败") { + const detail = describeActionError(error, fallbackTitle); + rememberAction(detail.title, [detail.summary, detail.hint].filter(Boolean).join(" · "), detail.tone); + renderAll(); + return detail; +} + function loadStoredSession() { return SESSION_STORE.loadStoredSession(); } @@ -895,7 +1091,7 @@ async function submitActionModal() { closeActionModal(); } } catch (error) { - if (message) message.textContent = error.message || "执行失败"; + if (message) message.textContent = formatActionErrorMessage(error); if (submit) submit.disabled = false; return; } @@ -1005,6 +1201,7 @@ async function logoutSession() { appState.tenantUsage = null; appState.adminOpsOverview = null; appState.adminFixRuns = []; + clearRecoveryRecords(); appState.integrationHealth = null; appState.storageStatus = null; appState.liveRecorderHealth = null; @@ -1617,6 +1814,7 @@ async function bootstrap() { persistSession(null); } } finally { + appState.recoveryRecords = getRecoveryRecords(); setBusy(false, ""); renderAll(); } @@ -2460,6 +2658,7 @@ function renderOneLinerActionRegistryPanel() { function renderTenantQuotaPanel() { const quota = appState.tenantQuota; const usage = appState.tenantUsage || quota?.usage || {}; + const quotaNotice = renderQuotaBlockingNotice(); if (!quota && !usage) { return `
刷新后会自动读取失败任务、集成健康和待审事件。
等管理员做一次扫描或审计处理后,这里会自动出现。
${escapeHtml(brief(job.error || recovery.reason || "任务失败,请查看恢复说明。", 120))}
+ +最近任务都在正常推进,暂时不需要恢复。
${escapeHtml(item.summary || item.reason || "恢复动作已完成。")}
+ +你执行过的恢复动作会出现在这里。
${escapeHtml(storageBlocked + ? "当前租户存储已经达到上限,建议先清理旧产物或提升存储配额。" + : blocked.map((item) => item.guard.reason).join(";"))}
+ +${escapeHtml(brief(detail.job.style_summary || detail.job.transcript_text || detail.job.error || "暂无摘要", 120))}
${escapeHtml(recovery.reason)}
+ +${escapeHtml(recovery.reason)}
+ +${escapeHtml(brief(job.error || recovery.reason || "任务失败,可重新发起。", 120))}
+ +${escapeHtml(recovery.quotaGuard.reason)}
+${escapeHtml(`最近失败任务共 ${formatNumber(failedJobs.length)} 条,其中 ${formatNumber(failedJobs.length - selectable.length)} 条需要人工处理。`)}
+ +