diff --git a/collector-service/app/oneliner_features.py b/collector-service/app/oneliner_features.py index 08fc699..a0c55fc 100644 --- a/collector-service/app/oneliner_features.py +++ b/collector-service/app/oneliner_features.py @@ -2,6 +2,7 @@ from __future__ import annotations import json import re +from datetime import datetime, timezone from typing import Any from fastapi import Depends, HTTPException, Query @@ -95,6 +96,44 @@ class OneLinerActionExecuteRequest(BaseModel): payload: dict[str, Any] = Field(default_factory=dict) +class OneLinerActionDefinitionRequest(BaseModel): + label: str = "" + description: str = "" + category: str = "custom" + status: str = "enabled" + admin_only: bool | None = None + requires_platform: bool | None = None + config: dict[str, Any] = Field(default_factory=dict) + + +class AdminFixPlanRequest(BaseModel): + incident_id: str = "" + scope: str = "plan" + notes: str = "" + + +class AdminFixRunReviewRequest(BaseModel): + review_status: str = "approved" + review_notes: str = "" + + +class PlatformSkillRollbackRequest(BaseModel): + project_id: str = "" + version_id: str = "" + + +class TenantQuotaRequest(BaseModel): + monthly_budget_cents: int = Field(default=0, ge=0) + storage_limit_bytes: int = Field(default=0, ge=0) + analysis_quota: int = Field(default=0, ge=0) + copy_quota: int = Field(default=0, ge=0) + ai_video_quota: int = Field(default=0, ge=0) + real_cut_quota: int = Field(default=0, ge=0) + recorder_quota: int = Field(default=0, ge=0) + enabled: bool = True + config: dict[str, Any] = Field(default_factory=dict) + + INTENT_ACTIONS: dict[str, list[dict[str, Any]]] = { "create_project": [{"key": "goto-intake", "label": "去我的项目", "kind": "navigate"}], "create_assistant": [{"key": "open-create-assistant", "label": "创建 Agent", "kind": "ui_action"}], @@ -128,6 +167,139 @@ INTENT_LABELS = { "custom": "自定义任务", } +ACTION_REGISTRY_DEFAULTS: dict[str, dict[str, Any]] = { + "platform-self-check": { + "label": "运行平台自检", + "description": "检查当前平台 Agent 的路由、执行 Agent、技能和记忆是否达到可运行状态。", + "category": "platform", + "handler_key": "platform-self-check", + "status": "enabled", + "admin_only": False, + "requires_platform": True, + "config": {}, + }, + "storage-status": { + "label": "查看当前存储状态", + "description": "查看当前租户项目的 jobs、downloads、NAS 目录占用和最近产物。", + "category": "storage", + "handler_key": "storage-status", + "status": "enabled", + "admin_only": False, + "requires_platform": False, + "config": {}, + }, + "live-recorder-status": { + "label": "查看录制状态", + "description": "查看当前租户名下的录制源、运行状态和最近录像文件。", + "category": "recorder", + "handler_key": "live-recorder-status", + "status": "enabled", + "admin_only": False, + "requires_platform": False, + "config": {}, + }, + "scan-admin-ops": { + "label": "重新扫描故障", + "description": "让管理员运维 Agent 重新汇总当前失败任务与集成异常。", + "category": "admin_ops", + "handler_key": "scan-admin-ops", + "status": "enabled", + "admin_only": True, + "requires_platform": False, + "config": {}, + }, + "generate-copy": { + "label": "直接生成一版文案", + "description": "基于当前项目、执行 Agent 和最近上下文直接生成一版文案。", + "category": "content", + "handler_key": "generate-copy", + "status": "enabled", + "admin_only": False, + "requires_platform": False, + "config": {}, + }, + "review-draft": { + "label": "生成复盘草稿", + "description": "基于最近完成任务自动生成或回收一版复盘草稿。", + "category": "review", + "handler_key": "review-draft", + "status": "enabled", + "admin_only": False, + "requires_platform": False, + "config": {}, + }, + "import-homepage": { + "label": "直接导入主页", + "description": "把主页导入当前项目并触发内容源同步。", + "category": "intake", + "handler_key": "import-homepage", + "status": "enabled", + "admin_only": False, + "requires_platform": True, + "config": {"auto_trigger_analysis": True}, + }, + "analyze-top-videos": { + "label": "直接分析高分作品", + "description": "拆解当前平台账号的高分作品,并把结论沉淀到平台记忆。", + "category": "analysis", + "handler_key": "analyze-top-videos", + "status": "enabled", + "admin_only": False, + "requires_platform": True, + "config": {"top_video_count": 4}, + }, + "create-ai-video": { + "label": "直接创建 AI 视频", + "description": "基于最近可用源任务直接创建 AI 视频链任务。", + "category": "production", + "handler_key": "create-ai-video", + "status": "enabled", + "admin_only": False, + "requires_platform": False, + "config": {}, + }, + "create-real-cut": { + "label": "直接创建实拍剪辑", + "description": "基于最近可用源任务直接创建实拍剪辑链任务。", + "category": "production", + "handler_key": "create-real-cut", + "status": "enabled", + "admin_only": False, + "requires_platform": False, + "config": {}, + }, + "save-live-recorder-source": { + "label": "直接保存录制源", + "description": "把直播源直接保存到当前租户的 NAS 录制配置。", + "category": "recorder", + "handler_key": "save-live-recorder-source", + "status": "enabled", + "admin_only": False, + "requires_platform": True, + "config": {"auto_start": True}, + }, +} + +USAGE_COST_DEFAULTS: dict[str, dict[str, Any]] = { + "analysis": {"cost_cents": 6, "quota_field": "analysis_quota"}, + "content_source_sync": {"cost_cents": 8, "quota_field": "analysis_quota"}, + "copy": {"cost_cents": 3, "quota_field": "copy_quota"}, + "review": {"cost_cents": 1, "quota_field": "analysis_quota"}, + "ai_video": {"cost_cents": 30, "quota_field": "ai_video_quota"}, + "real_cut": {"cost_cents": 20, "quota_field": "real_cut_quota"}, + "live_recorder": {"cost_cents": 2, "quota_field": "recorder_quota"}, +} + +ACTION_USAGE_KEYS: dict[str, str] = { + "generate-copy": "copy", + "review-draft": "review", + "import-homepage": "content_source_sync", + "analyze-top-videos": "analysis", + "create-ai-video": "ai_video", + "create-real-cut": "real_cut", + "save-live-recorder-source": "live_recorder", +} + def register_oneliner_routes(app: Any, legacy: Any) -> None: def now() -> str: @@ -148,6 +320,17 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: def _dump(value: Any) -> str: return json.dumps(value or {}, ensure_ascii=False) + def _bool_flag(value: Any) -> bool: + if isinstance(value, bool): + return value + if value in {1, "1", "true", "True", "yes", "on"}: + return True + return False + + def _current_cycle_start() -> str: + current = datetime.now(timezone.utc) + return current.replace(day=1, hour=0, minute=0, second=0, microsecond=0).isoformat().replace("+00:00", "Z") + def ensure_schema() -> None: schema = """ CREATE TABLE IF NOT EXISTS oneliner_profiles ( @@ -297,6 +480,100 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: FOREIGN KEY(actor_user_id) REFERENCES accounts(id) ON DELETE SET NULL, FOREIGN KEY(incident_id) REFERENCES admin_ops_incidents(id) ON DELETE CASCADE ); + + CREATE TABLE IF NOT EXISTS oneliner_action_definitions ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + action_key TEXT NOT NULL, + handler_key TEXT NOT NULL DEFAULT '', + label TEXT NOT NULL DEFAULT '', + description TEXT NOT NULL DEFAULT '', + category TEXT NOT NULL DEFAULT 'custom', + status TEXT NOT NULL DEFAULT 'enabled', + admin_only INTEGER NOT NULL DEFAULT 0, + requires_platform INTEGER NOT NULL DEFAULT 0, + config_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, project_id, action_key), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS agent_skill_versions ( + id TEXT PRIMARY KEY, + skill_id TEXT NOT NULL, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + agent_scope TEXT NOT NULL, + platform TEXT NOT NULL DEFAULT '', + version_no INTEGER NOT NULL DEFAULT 1, + snapshot_reason TEXT NOT NULL DEFAULT 'updated', + snapshot_json TEXT NOT NULL DEFAULT '{}', + actor_user_id TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + UNIQUE(skill_id, version_no), + FOREIGN KEY(skill_id) REFERENCES agent_skills(id) ON DELETE CASCADE, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL, + FOREIGN KEY(actor_user_id) REFERENCES accounts(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS admin_ops_fix_runs ( + id TEXT PRIMARY KEY, + incident_id TEXT NOT NULL, + actor_user_id TEXT NOT NULL DEFAULT '', + tenant_user_id TEXT NOT NULL DEFAULT '', + tenant_project_id TEXT NOT NULL DEFAULT '', + plan_scope TEXT NOT NULL DEFAULT 'plan', + status TEXT NOT NULL DEFAULT 'planned', + audit_status TEXT NOT NULL DEFAULT 'pending', + review_notes TEXT NOT NULL DEFAULT '', + plan_json TEXT NOT NULL DEFAULT '{}', + verification_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(incident_id) REFERENCES admin_ops_incidents(id) ON DELETE CASCADE, + FOREIGN KEY(actor_user_id) REFERENCES accounts(id) ON DELETE SET NULL, + FOREIGN KEY(tenant_user_id) REFERENCES accounts(id) ON DELETE SET NULL, + FOREIGN KEY(tenant_project_id) REFERENCES projects(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS tenant_quota_profiles ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + monthly_budget_cents INTEGER NOT NULL DEFAULT 0, + storage_limit_bytes INTEGER NOT NULL DEFAULT 0, + analysis_quota INTEGER NOT NULL DEFAULT 0, + copy_quota INTEGER NOT NULL DEFAULT 0, + ai_video_quota INTEGER NOT NULL DEFAULT 0, + real_cut_quota INTEGER NOT NULL DEFAULT 0, + recorder_quota INTEGER NOT NULL DEFAULT 0, + enabled INTEGER NOT NULL DEFAULT 1, + config_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, project_id), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS tenant_usage_ledger ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + category TEXT NOT NULL, + quantity INTEGER NOT NULL DEFAULT 1, + cost_cents INTEGER NOT NULL DEFAULT 0, + reference_type TEXT NOT NULL DEFAULT '', + reference_id TEXT NOT NULL DEFAULT '', + details_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL + ); """ with legacy.db.session() as conn: conn.executescript(schema) @@ -605,6 +882,271 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "created_at": row.get("created_at", ""), } + def _action_definition_payload(row: dict[str, Any] | None, *, fallback_key: str = "") -> dict[str, Any]: + fallback = ACTION_REGISTRY_DEFAULTS.get(fallback_key or "", {}) + data = row or {} + action_key = data.get("action_key") or fallback_key + return { + "id": data.get("id", ""), + "user_id": data.get("user_id", ""), + "project_id": data.get("project_id", ""), + "action_key": action_key, + "handler_key": data.get("handler_key") or fallback.get("handler_key") or action_key, + "label": data.get("label") or fallback.get("label") or action_key, + "description": data.get("description") or fallback.get("description") or "", + "category": data.get("category") or fallback.get("category") or "custom", + "status": data.get("status") or fallback.get("status") or "enabled", + "admin_only": _bool_flag(data.get("admin_only", fallback.get("admin_only", False))), + "requires_platform": _bool_flag(data.get("requires_platform", fallback.get("requires_platform", False))), + "config": _parse_json(data.get("config_json"), fallback.get("config") or {}), + "created_at": data.get("created_at", ""), + "updated_at": data.get("updated_at", ""), + "source": "override" if row else "default", + } + + def _list_action_registry(account: dict[str, Any], *, project_id: str) -> list[dict[str, Any]]: + rows = legacy.db.fetch_all( + """ + SELECT * FROM oneliner_action_definitions + WHERE user_id = ? AND project_id = ? + ORDER BY category ASC, action_key ASC + """, + (account["id"], project_id), + ) + row_map = {row["action_key"]: row for row in rows} + items = [] + for action_key in sorted(ACTION_REGISTRY_DEFAULTS.keys()): + items.append(_action_definition_payload(row_map.get(action_key), fallback_key=action_key)) + for action_key, row in row_map.items(): + if action_key in ACTION_REGISTRY_DEFAULTS: + continue + items.append(_action_definition_payload(row, fallback_key=action_key)) + return items + + def _get_action_definition(account: dict[str, Any], *, project_id: str, action_key: str) -> dict[str, Any] | None: + normalized_key = str(action_key or "").strip() + if not normalized_key: + return None + row = legacy.db.fetch_one( + """ + SELECT * FROM oneliner_action_definitions + WHERE user_id = ? AND project_id = ? AND action_key = ? + """, + (account["id"], project_id, normalized_key), + ) + if row: + return _action_definition_payload(row, fallback_key=normalized_key) + if normalized_key in ACTION_REGISTRY_DEFAULTS: + return _action_definition_payload(None, fallback_key=normalized_key) + return None + + def _decorate_oneliner_action(account: dict[str, Any], *, project_id: str, action: dict[str, Any]) -> dict[str, Any]: + cloned = dict(action or {}) + executor_key = str(cloned.get("executor_key") or "").strip() + if not executor_key: + return cloned + definition = _get_action_definition(account, project_id=project_id, action_key=executor_key) + if not definition: + cloned["disabled_reason"] = "当前租户还没有接入这条动作。" + return cloned + cloned["executor_label"] = definition.get("label") or cloned.get("label") or executor_key + if cloned.get("kind") == "api_action" and not cloned.get("label"): + cloned["label"] = definition.get("label") or executor_key + if definition.get("status") != "enabled": + cloned["disabled_reason"] = "当前租户已停用这条动作。" + elif definition.get("admin_only") and account.get("role") != "super_admin": + cloned["disabled_reason"] = "只有平台管理者才能执行这条动作。" + elif definition.get("requires_platform") and not cloned.get("platform"): + cloned["disabled_reason"] = "这条动作需要先选定平台。" + return cloned + + def _upsert_action_definition( + account: dict[str, Any], + *, + project_id: str, + action_key: str, + request: OneLinerActionDefinitionRequest, + ) -> dict[str, Any]: + normalized_key = str(action_key or "").strip() + if not normalized_key: + raise HTTPException(status_code=400, detail="Action key is required") + fallback = ACTION_REGISTRY_DEFAULTS.get(normalized_key) + existing = legacy.db.fetch_one( + """ + SELECT * FROM oneliner_action_definitions + WHERE user_id = ? AND project_id = ? AND action_key = ? + """, + (account["id"], project_id, normalized_key), + ) + if not existing and not fallback: + raise HTTPException(status_code=404, detail="Action definition not found") + timestamp = now() + handler_key = (existing or {}).get("handler_key") or (fallback or {}).get("handler_key") or normalized_key + next_admin_only = (fallback or {}).get("admin_only", False) + next_requires_platform = (fallback or {}).get("requires_platform", False) + if account.get("role") == "super_admin": + if request.admin_only is not None: + next_admin_only = bool(request.admin_only) + if request.requires_platform is not None: + next_requires_platform = bool(request.requires_platform) + if existing: + legacy.db.execute( + """ + UPDATE oneliner_action_definitions + SET label = ?, description = ?, category = ?, status = ?, admin_only = ?, requires_platform = ?, config_json = ?, updated_at = ? + WHERE id = ? + """, + ( + request.label.strip() or existing.get("label") or (fallback or {}).get("label") or normalized_key, + request.description.strip() or existing.get("description") or (fallback or {}).get("description") or "", + request.category.strip() or existing.get("category") or (fallback or {}).get("category") or "custom", + request.status.strip() or existing.get("status") or (fallback or {}).get("status") or "enabled", + 1 if next_admin_only else 0, + 1 if next_requires_platform else 0, + _dump(request.config or _parse_json(existing.get("config_json"), (fallback or {}).get("config") or {})), + timestamp, + existing["id"], + ), + ) + row = legacy.db.fetch_one("SELECT * FROM oneliner_action_definitions WHERE id = ?", (existing["id"],)) + else: + definition_id = make_id("oline_action") + legacy.db.execute( + """ + INSERT INTO oneliner_action_definitions ( + id, user_id, project_id, action_key, handler_key, label, description, category, status, + admin_only, requires_platform, config_json, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + definition_id, + account["id"], + project_id, + normalized_key, + handler_key, + request.label.strip() or (fallback or {}).get("label") or normalized_key, + request.description.strip() or (fallback or {}).get("description") or "", + request.category.strip() or (fallback or {}).get("category") or "custom", + request.status.strip() or (fallback or {}).get("status") or "enabled", + 1 if next_admin_only else 0, + 1 if next_requires_platform else 0, + _dump(request.config or (fallback or {}).get("config") or {}), + timestamp, + timestamp, + ), + ) + row = legacy.db.fetch_one("SELECT * FROM oneliner_action_definitions WHERE id = ?", (definition_id,)) + return _action_definition_payload(row, fallback_key=normalized_key) + + def _skill_version_payload(row: dict[str, Any]) -> dict[str, Any]: + snapshot = _parse_json(row.get("snapshot_json"), {}) + return { + "id": row["id"], + "skill_id": row.get("skill_id", ""), + "user_id": row.get("user_id", ""), + "project_id": row.get("project_id", ""), + "agent_scope": row.get("agent_scope", ""), + "platform": row.get("platform", ""), + "platform_label": legacy.platform_label(row.get("platform", "")) if row.get("platform") else "", + "version_no": int(row.get("version_no") or 0), + "snapshot_reason": row.get("snapshot_reason", "updated"), + "snapshot": snapshot, + "actor_user_id": row.get("actor_user_id", ""), + "created_at": row.get("created_at", ""), + } + + def _snapshot_skill_version( + skill_row: dict[str, Any], + *, + actor_user_id: str, + reason: str, + metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: + current = legacy.db.fetch_one( + "SELECT COALESCE(MAX(version_no), 0) AS max_version FROM agent_skill_versions WHERE skill_id = ?", + (skill_row["id"],), + ) + next_version = int((current or {}).get("max_version") or 0) + 1 + version_id = make_id("skill_ver") + snapshot = { + "skill": _skill_payload(skill_row), + "metadata": metadata or {}, + } + legacy.db.execute( + """ + INSERT INTO agent_skill_versions ( + id, skill_id, user_id, project_id, agent_scope, platform, version_no, snapshot_reason, + snapshot_json, actor_user_id, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + version_id, + skill_row["id"], + skill_row.get("user_id", ""), + skill_row.get("project_id", ""), + skill_row.get("agent_scope", ""), + skill_row.get("platform", ""), + next_version, + reason, + _dump(snapshot), + actor_user_id, + now(), + ), + ) + row = legacy.db.fetch_one("SELECT * FROM agent_skill_versions WHERE id = ?", (version_id,)) + return _skill_version_payload(row) + + def _fix_run_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "incident_id": row.get("incident_id", ""), + "actor_user_id": row.get("actor_user_id", ""), + "tenant_user_id": row.get("tenant_user_id", ""), + "tenant_project_id": row.get("tenant_project_id", ""), + "plan_scope": row.get("plan_scope", "plan"), + "status": row.get("status", "planned"), + "audit_status": row.get("audit_status", "pending"), + "review_notes": row.get("review_notes", ""), + "plan": _parse_json(row.get("plan_json"), {}), + "verification": _parse_json(row.get("verification_json"), {}), + "created_at": row.get("created_at", ""), + "updated_at": row.get("updated_at", ""), + } + + def _tenant_quota_payload(row: dict[str, Any] | None, *, usage: dict[str, Any] | None = None) -> dict[str, Any]: + data = row or {} + return { + "id": data.get("id", ""), + "user_id": data.get("user_id", ""), + "project_id": data.get("project_id", ""), + "monthly_budget_cents": int(data.get("monthly_budget_cents") or 0), + "storage_limit_bytes": int(data.get("storage_limit_bytes") or 0), + "analysis_quota": int(data.get("analysis_quota") or 0), + "copy_quota": int(data.get("copy_quota") or 0), + "ai_video_quota": int(data.get("ai_video_quota") or 0), + "real_cut_quota": int(data.get("real_cut_quota") or 0), + "recorder_quota": int(data.get("recorder_quota") or 0), + "enabled": True if row is None else _bool_flag(data.get("enabled", 1)), + "config": _parse_json(data.get("config_json"), {}), + "usage": usage or {}, + "created_at": data.get("created_at", ""), + "updated_at": data.get("updated_at", ""), + } + + def _tenant_usage_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "user_id": row.get("user_id", ""), + "project_id": row.get("project_id", ""), + "category": row.get("category", ""), + "quantity": int(row.get("quantity") or 0), + "cost_cents": int(row.get("cost_cents") or 0), + "reference_type": row.get("reference_type", ""), + "reference_id": row.get("reference_id", ""), + "details": _parse_json(row.get("details_json"), {}), + "created_at": row.get("created_at", ""), + } + def _log_admin_audit_event( *, actor_user_id: str, @@ -641,6 +1183,126 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: row = legacy.db.fetch_one("SELECT * FROM admin_ops_audit_logs WHERE id = ?", (audit_id,)) return _admin_audit_payload(row) + def _project_storage_bytes(account: dict[str, Any], *, project_id: str) -> int: + try: + payload = legacy.storage_status(project_id=project_id, account=account) + except Exception: + return 0 + tenant_usage = payload.get("tenant_usage", {}) if isinstance(payload, dict) else {} + jobs_bytes = int((((tenant_usage.get("project_jobs") or {}).get("bytes")) or 0)) + downloads_bytes = int((((tenant_usage.get("project_downloads") or {}).get("bytes")) or 0)) + return jobs_bytes + downloads_bytes + + def _tenant_usage_summary(account: dict[str, Any], *, project_id: str) -> dict[str, Any]: + cycle_start = _current_cycle_start() + rows = legacy.db.fetch_all( + """ + SELECT category, SUM(quantity) AS quantity, SUM(cost_cents) AS cost_cents + FROM tenant_usage_ledger + WHERE user_id = ? AND project_id = ? AND created_at >= ? + GROUP BY category + ORDER BY category ASC + """, + (account["id"], project_id, cycle_start), + ) + by_category: dict[str, dict[str, Any]] = {} + for row in rows: + category = row.get("category", "") + by_category[category] = { + "category": category, + "quantity": int(row.get("quantity") or 0), + "cost_cents": int(row.get("cost_cents") or 0), + } + recent_rows = legacy.db.fetch_all( + """ + SELECT * FROM tenant_usage_ledger + WHERE user_id = ? AND project_id = ? + ORDER BY created_at DESC + LIMIT 20 + """, + (account["id"], project_id), + ) + total_cost = sum(item["cost_cents"] for item in by_category.values()) + storage_bytes = _project_storage_bytes(account, project_id=project_id) + return { + "cycle_start": cycle_start, + "categories": by_category, + "total_cost_cents": total_cost, + "recent_items": [_tenant_usage_payload(row) for row in recent_rows], + "storage_bytes": storage_bytes, + } + + def _get_tenant_quota_row(account: dict[str, Any], *, project_id: str) -> dict[str, Any] | None: + return legacy.db.fetch_one( + "SELECT * FROM tenant_quota_profiles WHERE user_id = ? AND project_id = ?", + (account["id"], project_id), + ) + + def _get_tenant_quota(account: dict[str, Any], *, project_id: str) -> dict[str, Any]: + usage = _tenant_usage_summary(account, project_id=project_id) + row = _get_tenant_quota_row(account, project_id=project_id) + payload = _tenant_quota_payload(row, usage=usage) + storage_limit = int(payload.get("storage_limit_bytes") or 0) + payload["storage_over_limit"] = bool(storage_limit and usage["storage_bytes"] >= storage_limit) + return payload + + def _record_tenant_usage( + account: dict[str, Any], + *, + project_id: str, + category: str, + reference_type: str, + reference_id: str, + details: dict[str, Any] | None = None, + quantity: int = 1, + ) -> dict[str, Any]: + usage_meta = USAGE_COST_DEFAULTS.get(category, {}) + usage_id = make_id("usage") + legacy.db.execute( + """ + INSERT INTO tenant_usage_ledger ( + id, user_id, project_id, category, quantity, cost_cents, reference_type, reference_id, details_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + usage_id, + account["id"], + project_id, + category, + int(quantity or 1), + int(usage_meta.get("cost_cents") or 0) * int(quantity or 1), + reference_type, + reference_id, + _dump(details or {}), + now(), + ), + ) + row = legacy.db.fetch_one("SELECT * FROM tenant_usage_ledger WHERE id = ?", (usage_id,)) + return _tenant_usage_payload(row) + + def _enforce_tenant_quota(account: dict[str, Any], *, project_id: str, usage_category: str) -> None: + quota = _get_tenant_quota(account, project_id=project_id) + if not quota.get("enabled", True): + return + usage = quota.get("usage", {}) + category_meta = USAGE_COST_DEFAULTS.get(usage_category, {}) + quota_field = category_meta.get("quota_field") + if quota_field: + allowed = int(quota.get(quota_field) or 0) + consumed = int(((usage.get("categories") or {}).get(usage_category) or {}).get("quantity") or 0) + if allowed and consumed >= allowed: + raise HTTPException(status_code=403, detail=f"当前租户本周期的 {usage_category} 配额已用完") + budget = int(quota.get("monthly_budget_cents") or 0) + total_cost = int((usage.get("total_cost_cents") or 0)) + next_cost = int(category_meta.get("cost_cents") or 0) + if budget and total_cost + next_cost > budget: + raise HTTPException(status_code=403, detail="当前租户本周期预算不足,已阻止本次动作执行") + storage_limit = int(quota.get("storage_limit_bytes") or 0) + if storage_limit and usage_category in {"analysis", "content_source_sync", "ai_video", "real_cut"}: + storage_bytes = int(usage.get("storage_bytes") or 0) + if storage_bytes >= storage_limit: + raise HTTPException(status_code=403, detail="当前租户存储额度已满,已阻止继续产生大文件缓存") + def _platform_source_samples( account: dict[str, Any], *, @@ -1472,6 +2134,10 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "platform": "", } ) + secondary_actions = [ + _decorate_oneliner_action(account, project_id=project_id or "", action=item) + for item in secondary_actions + ] return { "summary_text": "\n".join([line for line in summary_lines if line.strip()]), "context": context, @@ -1685,6 +2351,12 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ), ) row = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (existing["id"],)) + _snapshot_skill_version( + row, + actor_user_id=account["id"], + reason="updated", + metadata={"via": "upsert", "accepted": row.get("status") == "validated"}, + ) else: new_id = make_id("skill") legacy.db.execute( @@ -1716,6 +2388,12 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ), ) row = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (new_id,)) + _snapshot_skill_version( + row, + actor_user_id=account["id"], + reason="created", + metadata={"via": "upsert"}, + ) return _skill_payload(row) def _create_or_update_incident( @@ -1734,47 +2412,55 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: (source_type, source_id, title), ) timestamp = now() + disable_fk = not str(tenant_user_id or "").strip() or not str(tenant_project_id or "").strip() if existing: - legacy.db.execute( - """ + sql = """ UPDATE admin_ops_incidents SET tenant_user_id = ?, tenant_project_id = ?, severity = ?, summary = ?, payload_json = ?, updated_at = ? WHERE id = ? - """, - ( - tenant_user_id, - tenant_project_id, - severity, - summary, - _dump(payload), - timestamp, - existing["id"], - ), + """ + params = ( + tenant_user_id, + tenant_project_id, + severity, + summary, + _dump(payload), + timestamp, + existing["id"], ) + if disable_fk: + with legacy.db.session() as conn: + conn.execute("PRAGMA foreign_keys=OFF") + conn.execute(sql, params) + conn.execute("PRAGMA foreign_keys=ON") + else: + legacy.db.execute(sql, params) row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (existing["id"],)) else: incident_id = make_id("incident") - legacy.db.execute( - """ + sql = """ INSERT INTO admin_ops_incidents ( id, tenant_user_id, tenant_project_id, source_type, source_id, severity, title, summary, payload_json, status, assigned_to, reviewed_by, review_notes, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'open', '', '', '', ?, ?) - """, - ( - incident_id, - tenant_user_id, - tenant_project_id, - source_type, - source_id, - severity, - title, - summary, - _dump(payload), - timestamp, - timestamp, - ), + """ + params = ( + incident_id, + tenant_user_id, + tenant_project_id, + source_type, + source_id, + severity, + title, + summary, + _dump(payload), + timestamp, + timestamp, ) + with legacy.db.session() as conn: + conn.execute("PRAGMA foreign_keys=OFF") + conn.execute(sql, params) + conn.execute("PRAGMA foreign_keys=ON") row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,)) return _incident_payload(row) @@ -1831,6 +2517,98 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ), } + def _build_incident_repair_plan( + incident: dict[str, Any], + *, + scope: str, + notes: str, + ) -> dict[str, Any]: + payload = incident.get("payload") or {} + source_type = incident.get("source_type") or "" + severity = incident.get("severity") or "warn" + if source_type == "integration": + target = incident.get("source_id") or "integration" + steps = [ + f"读取 {target} 当前健康状态和最近错误详情。", + f"对 {target} 执行最小可行 smoke,确认是网络、配置还是服务端版本问题。", + "若为外部依赖异常,则先生成修复建议而不是直接改核心代码。", + ] + verification = [ + "health 接口恢复可达", + "关键能力 smoke 返回 200", + "相关任务链不再新增 failed", + ] + elif source_type == "job": + target = payload.get("title") or incident.get("source_id") or "job" + steps = [ + f"读取失败任务 {target} 的 error/result/artifacts。", + "定位失败在哪个集成或哪段编排上。", + "生成补救建议或重试路径,并明确是否需要人工确认。", + ] + verification = [ + "同类任务能再次跑通", + "错误不再重复出现", + "租户数据和存储路径未被污染", + ] + else: + steps = [ + "读取当前事件上下文和最近变更。", + "先给出低风险修复建议,再决定是否进入人工处理。", + ] + verification = ["事件状态可被复核", "没有破坏多租户隔离"] + return { + "summary": f"针对 {incident.get('title') or incident.get('id')} 生成一版{scope}级修复计划。", + "severity": severity, + "scope": scope, + "source_type": source_type, + "steps": steps, + "verification": verification, + "safe_boundary": { + "core_code_locked": True, + "tenant_isolation_required": True, + "audit_required": True, + }, + "notes": notes.strip(), + } + + def _create_fix_run( + admin: dict[str, Any], + *, + incident: dict[str, Any], + scope: str, + notes: str, + ) -> dict[str, Any]: + plan = _build_incident_repair_plan(incident, scope=scope, notes=notes) + run_id = make_id("fix_run") + timestamp = now() + disable_fk = not str(incident.get("tenant_user_id") or "").strip() or not str(incident.get("tenant_project_id") or "").strip() + sql = """ + INSERT INTO admin_ops_fix_runs ( + id, incident_id, actor_user_id, tenant_user_id, tenant_project_id, plan_scope, status, + audit_status, review_notes, plan_json, verification_json, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, 'planned', 'pending', '', ?, ?, ?, ?) + """ + params = ( + run_id, + incident["id"], + admin["id"], + incident.get("tenant_user_id", ""), + incident.get("tenant_project_id", ""), + scope, + _dump(plan), + _dump({"checks": plan.get("verification", [])}), + timestamp, + timestamp, + ) + with legacy.db.session() as conn: + if disable_fk: + conn.execute("PRAGMA foreign_keys=OFF") + conn.execute(sql, params) + if disable_fk: + conn.execute("PRAGMA foreign_keys=ON") + row = legacy.db.fetch_one("SELECT * FROM admin_ops_fix_runs WHERE id = ?", (run_id,)) + return _fix_run_payload(row) + def _admin_ops_overview_payload(admin: dict[str, Any]) -> dict[str, Any]: incidents = [ _incident_payload(row) @@ -1860,6 +2638,12 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "SELECT * FROM admin_ops_audit_logs ORDER BY created_at DESC LIMIT 20" ) ] + recent_fix_runs = [ + _fix_run_payload(row) + for row in legacy.db.fetch_all( + "SELECT * FROM admin_ops_fix_runs ORDER BY updated_at DESC LIMIT 20" + ) + ] return { "incidents": incidents, "incident_count": len(incidents), @@ -1871,6 +2655,9 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "pending_account_count": len(pending_accounts), "recent_audits": recent_audits, "audit_count": len(recent_audits), + "recent_fix_runs": recent_fix_runs, + "fix_run_count": len(recent_fix_runs), + "pending_fix_run_count": len([item for item in recent_fix_runs if item.get("audit_status") == "pending"]), "integration_health": legacy.integrations_health(admin), } @@ -2004,6 +2791,12 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ), ) updated = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (skill_id,)) + version = _snapshot_skill_version( + updated, + actor_user_id=account["id"], + reason="validated" if accepted else "needs_revision", + metadata={"review_notes": request.review_notes.strip(), "score": request.score}, + ) feedback_summary = (request.summary or request.review_notes or "").strip() feedback_memory = None if feedback_summary: @@ -2025,10 +2818,95 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: confidence=0.9 if accepted else 0.66, ) payload = _skill_payload(updated) + payload["version"] = version if feedback_memory: payload["feedback_memory"] = feedback_memory return payload + def _list_skill_versions( + account: dict[str, Any], + *, + platform: str, + project_id: str, + skill_id: str, + ) -> list[dict[str, Any]]: + skill_row = legacy.db.fetch_one( + """ + SELECT * FROM agent_skills + WHERE id = ? AND user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ? + """, + (skill_id, account["id"], project_id, platform), + ) + if not skill_row: + raise HTTPException(status_code=404, detail="Platform skill not found") + rows = legacy.db.fetch_all( + "SELECT * FROM agent_skill_versions WHERE skill_id = ? ORDER BY version_no DESC, created_at DESC", + (skill_id,), + ) + return [_skill_version_payload(row) for row in rows] + + def _rollback_platform_skill( + account: dict[str, Any], + *, + platform: str, + skill_id: str, + request: PlatformSkillRollbackRequest, + ) -> dict[str, Any]: + project = _resolve_project(account, request.project_id or None) + current = legacy.db.fetch_one( + """ + SELECT * FROM agent_skills + WHERE id = ? AND user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ? + """, + (skill_id, account["id"], project["id"], platform), + ) + if not current: + raise HTTPException(status_code=404, detail="Platform skill not found") + target_version = legacy.db.fetch_one( + """ + SELECT * FROM agent_skill_versions + WHERE id = ? AND skill_id = ? + """, + (request.version_id.strip(), skill_id), + ) + if not target_version: + raise HTTPException(status_code=404, detail="Skill version not found") + snapshot = _parse_json(target_version.get("snapshot_json"), {}) + skill_snapshot = (snapshot.get("skill") or {}) + timestamp = now() + legacy.db.execute( + """ + UPDATE agent_skills + SET name = ?, status = ?, method_json = ?, test_spec_json = ?, last_result_json = ?, + success_count = ?, failure_count = ?, last_score = ?, last_validated_at = ?, updated_at = ? + WHERE id = ? + """, + ( + skill_snapshot.get("name") or current.get("name") or current.get("skill_key") or "平台技能", + skill_snapshot.get("status") or "draft", + _dump(skill_snapshot.get("method") or {}), + _dump(skill_snapshot.get("test_spec") or {}), + _dump(skill_snapshot.get("last_result") or {}), + int(skill_snapshot.get("success_count") or 0), + int(skill_snapshot.get("failure_count") or 0), + float(skill_snapshot.get("last_score") or 0), + skill_snapshot.get("last_validated_at") or timestamp, + timestamp, + skill_id, + ), + ) + updated = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (skill_id,)) + rollback_version = _snapshot_skill_version( + updated, + actor_user_id=account["id"], + reason="rollback", + metadata={"from_version_id": request.version_id.strip()}, + ) + payload = _skill_payload(updated) + payload["rollback_from_version"] = _skill_version_payload(target_version) + payload["version"] = rollback_version + return payload + async def _execute_oneliner_action( account: dict[str, Any], request: OneLinerActionExecuteRequest, @@ -2038,6 +2916,19 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: action_key = (request.action_key or "").strip() if not action_key: raise HTTPException(status_code=400, detail="Action key is required") + action_definition = _get_action_definition(account, project_id=project["id"], action_key=action_key) + if not action_definition: + raise HTTPException(status_code=404, detail=f"Action definition not found: {action_key}") + if action_definition.get("status") != "enabled": + raise HTTPException(status_code=403, detail=f"Action disabled for current tenant: {action_key}") + if action_definition.get("admin_only") and account.get("role") != "super_admin": + raise HTTPException(status_code=403, detail="Current action is only available to platform administrators") + if action_definition.get("requires_platform") and not normalized_platform: + raise HTTPException(status_code=400, detail="Platform is required for this action") + handler_key = action_definition.get("handler_key") or action_key + usage_category = ACTION_USAGE_KEYS.get(handler_key, "") + if usage_category: + _enforce_tenant_quota(account, project_id=project["id"], usage_category=usage_category) latest_user_message = _last_user_message_text(request.session_id, account["id"]) if request.session_id else "" requested_payload = request.payload or {} @@ -2431,10 +3322,33 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "create-real-cut": _run_create_real_cut, "save-live-recorder-source": _run_save_live_recorder_source, } - executor = executors.get(action_key) + executor = executors.get(handler_key) if not executor: - raise HTTPException(status_code=400, detail=f"Unsupported OneLiner action: {action_key}") + raise HTTPException(status_code=400, detail=f"Unsupported OneLiner action: {handler_key}") result = await executor() + usage_entry = None + if usage_category: + payload_map = result.get("payload") or {} + job_map = payload_map.get("job") or {} + saved_map = (payload_map.get("saved") or {}).get("item") or {} + usage_entry = _record_tenant_usage( + account, + project_id=project["id"], + category=usage_category, + reference_type=( + "job" + if job_map.get("id") + else "live_recorder_source" + if saved_map.get("binding_id") + else "action" + ), + reference_id=job_map.get("id") or saved_map.get("binding_id") or handler_key, + details={ + "action_key": action_key, + "handler_key": handler_key, + "platform": normalized_platform, + }, + ) if request.session_id: session = _load_owned_session(request.session_id, account) _insert_message( @@ -2455,9 +3369,12 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ) return { "action_key": action_key, + "handler_key": handler_key, "project_id": project["id"], "platform": normalized_platform, "executed_at": now(), + "action_definition": action_definition, + "usage_entry": usage_entry, **result, } @@ -2628,6 +3545,25 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ) -> dict[str, Any]: return await _execute_oneliner_action(account, request) + @app.get("/v2/oneliner/action-registry") + def list_oneliner_action_registry( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + items = _list_action_registry(account, project_id=project["id"]) + return {"items": items, "count": len(items)} + + @app.put("/v2/oneliner/action-registry/{action_key}") + def update_oneliner_action_registry( + action_key: str, + request: OneLinerActionDefinitionRequest, + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + return _upsert_action_definition(account, project_id=project["id"], action_key=action_key, request=request) + @app.get("/v2/platform-agents") def list_platform_agents( project_id: str | None = Query(default=None), @@ -2737,14 +3673,145 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: normalized_platform = _safe_platform(platform) return _review_platform_skill(account, platform=normalized_platform, skill_id=skill_id, request=request) + @app.get("/v2/platform-agents/{platform}/skills/{skill_id}/versions") + def list_platform_skill_versions( + platform: str, + skill_id: str, + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + normalized_platform = _safe_platform(platform) + items = _list_skill_versions(account, platform=normalized_platform, project_id=project["id"], skill_id=skill_id) + return {"items": items, "count": len(items)} + + @app.post("/v2/platform-agents/{platform}/skills/{skill_id}/rollback") + def rollback_platform_skill( + platform: str, + skill_id: str, + request: PlatformSkillRollbackRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + normalized_platform = _safe_platform(platform) + return _rollback_platform_skill(account, platform=normalized_platform, skill_id=skill_id, request=request) + + @app.get("/v2/tenant/quota") + def get_tenant_quota( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + return _get_tenant_quota(account, project_id=project["id"]) + + @app.put("/v2/tenant/quota") + def put_tenant_quota( + request: TenantQuotaRequest, + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + existing = _get_tenant_quota_row(account, project_id=project["id"]) + timestamp = now() + if existing: + legacy.db.execute( + """ + UPDATE tenant_quota_profiles + SET monthly_budget_cents = ?, storage_limit_bytes = ?, analysis_quota = ?, copy_quota = ?, + ai_video_quota = ?, real_cut_quota = ?, recorder_quota = ?, enabled = ?, config_json = ?, updated_at = ? + WHERE id = ? + """, + ( + request.monthly_budget_cents, + request.storage_limit_bytes, + request.analysis_quota, + request.copy_quota, + request.ai_video_quota, + request.real_cut_quota, + request.recorder_quota, + 1 if request.enabled else 0, + _dump(request.config), + timestamp, + existing["id"], + ), + ) + else: + quota_id = make_id("quota") + legacy.db.execute( + """ + INSERT INTO tenant_quota_profiles ( + id, user_id, project_id, monthly_budget_cents, storage_limit_bytes, analysis_quota, copy_quota, + ai_video_quota, real_cut_quota, recorder_quota, enabled, config_json, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + quota_id, + account["id"], + project["id"], + request.monthly_budget_cents, + request.storage_limit_bytes, + request.analysis_quota, + request.copy_quota, + request.ai_video_quota, + request.real_cut_quota, + request.recorder_quota, + 1 if request.enabled else 0, + _dump(request.config), + timestamp, + timestamp, + ), + ) + return _get_tenant_quota(account, project_id=project["id"]) + + @app.get("/v2/tenant/usage") + def get_tenant_usage( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + return _tenant_usage_summary(account, project_id=project["id"]) + @app.get("/v2/admin/ops/overview") def admin_ops_overview(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]: return _admin_ops_overview_payload(admin) + @app.get("/v2/admin/ops/fix-runs") + def admin_ops_fix_runs(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]: + rows = legacy.db.fetch_all( + "SELECT * FROM admin_ops_fix_runs ORDER BY updated_at DESC LIMIT 50" + ) + items = [_fix_run_payload(row) for row in rows] + return {"items": items, "count": len(items)} + @app.post("/v2/admin/ops/incidents/scan") def admin_ops_scan(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]: return _scan_admin_incidents(admin) + @app.post("/v2/admin/ops/incidents/{incident_id}/repair-plan") + def create_admin_repair_plan( + incident_id: str, + request: AdminFixPlanRequest, + admin: dict[str, Any] = Depends(legacy.require_super_admin), + ) -> dict[str, Any]: + incident_row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,)) + if not incident_row: + raise HTTPException(status_code=404, detail="Incident not found") + incident = _incident_payload(incident_row) + payload = _create_fix_run( + admin, + incident=incident, + scope=request.scope.strip() or "plan", + notes=request.notes, + ) + payload["audit"] = _log_admin_audit_event( + actor_user_id=admin["id"], + incident_id=incident_id, + action_key="repair-plan", + status="planned", + summary=f"已为事件「{incident.get('title') or incident_id}」生成修复计划。", + details={"fix_run_id": payload["id"], "scope": payload.get("plan_scope", "plan")}, + ) + return payload + @app.patch("/v2/admin/ops/incidents/{incident_id}") def review_admin_incident( incident_id: str, @@ -2784,3 +3851,58 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: }, ) return payload + + @app.post("/v2/admin/ops/fix-runs/{run_id}/audit") + def audit_admin_fix_run( + run_id: str, + request: AdminFixRunReviewRequest, + admin: dict[str, Any] = Depends(legacy.require_super_admin), + ) -> dict[str, Any]: + current = legacy.db.fetch_one("SELECT * FROM admin_ops_fix_runs WHERE id = ?", (run_id,)) + if not current: + raise HTTPException(status_code=404, detail="Fix run not found") + incident_row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (current["incident_id"],)) + review_status = request.review_status.strip() or "approved" + timestamp = now() + next_status = "audited" if review_status in {"approved", "rejected"} else "watching" + legacy.db.execute( + """ + UPDATE admin_ops_fix_runs + SET audit_status = ?, review_notes = ?, status = ?, updated_at = ? + WHERE id = ? + """, + ( + review_status, + request.review_notes.strip(), + next_status, + timestamp, + run_id, + ), + ) + if incident_row: + incident_status = "resolved" if review_status == "approved" else "watching" if review_status == "watching" else "rejected" + legacy.db.execute( + """ + UPDATE admin_ops_incidents + SET status = ?, reviewed_by = ?, review_notes = ?, updated_at = ? + WHERE id = ? + """, + ( + incident_status, + admin["id"], + request.review_notes.strip(), + timestamp, + current["incident_id"], + ), + ) + row = legacy.db.fetch_one("SELECT * FROM admin_ops_fix_runs WHERE id = ?", (run_id,)) + payload = _fix_run_payload(row) + payload["audit"] = _log_admin_audit_event( + actor_user_id=admin["id"], + incident_id=current["incident_id"], + action_key="fix-run-audit", + status=review_status, + summary=f"修复计划 {run_id} 已审计为 {review_status}。", + details={"review_notes": request.review_notes.strip()}, + ) + return payload diff --git a/web/storyforge-web-v4/assets/app.js b/web/storyforge-web-v4/assets/app.js index 5555389..64828b0 100644 --- a/web/storyforge-web-v4/assets/app.js +++ b/web/storyforge-web-v4/assets/app.js @@ -35,7 +35,10 @@ const appState = { onelinerSessions: [], selectedOnelinerSessionId: "", onelinerMessages: [], + onelinerActionRegistry: [], platformAgents: [], + tenantQuota: null, + tenantUsage: null, adminOpsOverview: null, busy: false, message: "", @@ -782,6 +785,8 @@ function renderOneLinerMessagesHtml() { return `data-${escapeHtml(attrKey)}="${escapeHtml(serialized)}"`; }) ].filter(Boolean).join(" ") + , + { disabledReason: item.disabled_reason || "" } )).join("")} ` : ""} @@ -1001,7 +1006,10 @@ async function logoutSession() { appState.onelinerSessions = []; appState.selectedOnelinerSessionId = ""; appState.onelinerMessages = []; + appState.onelinerActionRegistry = []; appState.platformAgents = []; + appState.tenantQuota = null; + appState.tenantUsage = null; appState.adminOpsOverview = null; appState.integrationHealth = null; appState.storageStatus = null; @@ -1040,19 +1048,31 @@ async function loadAgentControlSurfaces(projectId = "") { const normalizedProjectId = projectId || getOneLinerProjectId(); const supportsOneLinerProfile = backendSupports("/v2/oneliner/profile"); const supportsOneLinerSessions = backendSupports("/v2/oneliner/sessions"); + const supportsActionRegistry = backendSupports("/v2/oneliner/action-registry"); const supportsPlatformAgents = backendSupports("/v2/platform-agents"); const supportsAdminOps = backendSupports("/v2/admin/ops/overview"); + const supportsTenantQuota = backendSupports("/v2/tenant/quota"); + const supportsTenantUsage = backendSupports("/v2/tenant/usage"); - const [profile, sessionsPayload, platformAgentsPayload, adminOpsOverview] = await Promise.all([ + const [profile, sessionsPayload, actionRegistryPayload, platformAgentsPayload, tenantQuota, tenantUsage, adminOpsOverview] = await Promise.all([ supportsOneLinerProfile ? storyforgeFetch(`/v2/oneliner/profile?project_id=${encodeURIComponent(normalizedProjectId)}`).catch(() => null) : Promise.resolve(null), supportsOneLinerSessions ? storyforgeFetch(`/v2/oneliner/sessions?project_id=${encodeURIComponent(normalizedProjectId)}`).catch(() => ({ items: [] })) : Promise.resolve({ items: [] }), + supportsActionRegistry + ? storyforgeFetch(`/v2/oneliner/action-registry?project_id=${encodeURIComponent(normalizedProjectId)}`).catch(() => ({ items: [] })) + : Promise.resolve({ items: [] }), supportsPlatformAgents ? storyforgeFetch(`/v2/platform-agents?project_id=${encodeURIComponent(normalizedProjectId)}`).catch(() => ({ items: [] })) : Promise.resolve({ items: [] }), + supportsTenantQuota + ? storyforgeFetch(`/v2/tenant/quota?project_id=${encodeURIComponent(normalizedProjectId)}`).catch(() => null) + : Promise.resolve(null), + supportsTenantUsage + ? storyforgeFetch(`/v2/tenant/usage?project_id=${encodeURIComponent(normalizedProjectId)}`).catch(() => null) + : Promise.resolve(null), supportsAdminOps && isSuperAdmin() ? storyforgeFetch("/v2/admin/ops/overview").catch(() => null) : Promise.resolve(null) @@ -1060,10 +1080,13 @@ async function loadAgentControlSurfaces(projectId = "") { appState.onelinerProfile = profile; appState.onelinerSessions = safeArray(sessionsPayload?.items || sessionsPayload); + appState.onelinerActionRegistry = safeArray(actionRegistryPayload?.items || actionRegistryPayload); if (!appState.selectedOnelinerSessionId || !safeArray(appState.onelinerSessions).some((item) => item.id === appState.selectedOnelinerSessionId)) { appState.selectedOnelinerSessionId = safeArray(appState.onelinerSessions)[0]?.id || ""; } appState.platformAgents = safeArray(platformAgentsPayload?.items || platformAgentsPayload); + appState.tenantQuota = tenantQuota; + appState.tenantUsage = tenantUsage; appState.adminOpsOverview = adminOpsOverview; } @@ -2165,6 +2188,120 @@ function renderStorageStatusPanel() { `; } +function renderOneLinerActionRegistryPanel() { + const items = safeArray(appState.onelinerActionRegistry); + if (!items.length) { + return ` +
+
+
+

OneLiner 动作注册表

+
当前后端还没返回动作注册表,先沿用默认动作。
+
+
+

暂未接入

/v2/oneliner/action-registry 可用后,这里会显示动作开关、描述和租户级配置。

+
+ `; + } + const grouped = items.reduce((acc, item) => { + const category = item.category || "custom"; + acc[category] = acc[category] || []; + acc[category].push(item); + return acc; + }, {}); + return ` +
+
+
+

OneLiner 动作注册表

+
把 OneLiner 可执行动作做成租户级注册中心,便于商业化灰度和定制。
+
+ ${escapeHtml(formatNumber(items.length))} 条 +
+
+ ${Object.entries(grouped).map(([category, list]) => ` +
+

${escapeHtml(category)}

+

${escapeHtml(`当前分类下 ${list.length} 条动作。`)}

+
+ ${list.map((item) => ` + + ${escapeHtml(item.label || item.action_key || "action")} + + `).join("")} +
+
+ `).join("")} +
+
+ `; +} + +function renderTenantQuotaPanel() { + const quota = appState.tenantQuota; + const usage = appState.tenantUsage || quota?.usage || {}; + if (!quota && !usage) { + return ` +
+

租户额度与审计

当前后端还没接入 quota / usage。
+

暂未接入

等 live collector 同步 `/v2/tenant/quota` 和 `/v2/tenant/usage` 后,这里会展示本周期预算、动作配额和最近计量记录。

+
+ `; + } + const categories = usage?.categories || {}; + const recentItems = safeArray(usage?.recent_items); + const cards = [ + { label: "预算", value: `${formatNumber((quota?.monthly_budget_cents || 0) / 100)} 元`, sub: `已用 ${formatNumber((usage?.total_cost_cents || 0) / 100)} 元` }, + { label: "分析配额", value: formatNumber(quota?.analysis_quota || 0), sub: `已用 ${formatNumber(categories.analysis?.quantity || 0)}` }, + { label: "文案配额", value: formatNumber(quota?.copy_quota || 0), sub: `已用 ${formatNumber(categories.copy?.quantity || 0)}` }, + { label: "AI 视频配额", value: formatNumber(quota?.ai_video_quota || 0), sub: `已用 ${formatNumber(categories.ai_video?.quantity || 0)}` }, + { label: "实拍剪辑配额", value: formatNumber(quota?.real_cut_quota || 0), sub: `已用 ${formatNumber(categories.real_cut?.quantity || 0)}` }, + { label: "存储上限", value: formatBytes(quota?.storage_limit_bytes || 0), sub: `当前 ${formatBytes(usage?.storage_bytes || 0)}` } + ]; + return ` +
+
+
+

租户额度与审计

+
预算、动作配额和最近计量都按租户 + 项目隔离。
+
+
+ ${escapeHtml(quota?.enabled === false ? "已停用额度保护" : "额度保护开启")} + ${quota?.storage_over_limit ? `存储超限` : ""} + 编辑额度 +
+
+
+ ${cards.map((item) => ` +
+ ${escapeHtml(item.label)} + ${escapeHtml(item.value)} + ${escapeHtml(item.sub)} +
+ `).join("")} +
+
+
+

最近计量记录

+

动作执行后会写入租户级 ledger,便于后面做商业化配额、成本和审计。

+
+ ${recentItems.map((item) => ` +
+

${escapeHtml(item.category || "usage")}

+

${escapeHtml(formatDateTime(item.created_at))}

+
+ 次数 ${escapeHtml(formatNumber(item.quantity || 0))} + 成本 ${(item.cost_cents || 0) / 100} 元 + ${item.reference_type ? `${escapeHtml(item.reference_type)}` : ""} + ${item.reference_id ? `${escapeHtml(brief(item.reference_id, 14))}` : ""} +
+
+ `).join("") || `

还没有计量记录

等 OneLiner 或生产动作实际执行后,这里会累积本周期的 usage ledger。

`} +
+
+ `; +} + function renderPlatformAgentPanel() { const items = safeArray(appState.platformAgents); if (!items.length) { @@ -2242,6 +2379,7 @@ function renderAdminOpsPanel() { } const incidents = safeArray(overview.incidents).slice(0, 6); const audits = safeArray(overview.recent_audits).slice(0, 5); + const fixRuns = safeArray(overview.recent_fix_runs).slice(0, 5); return `
@@ -2254,6 +2392,7 @@ function renderAdminOpsPanel() { 待处理 ${escapeHtml(formatNumber(overview.open_incident_count || 0))} 错误 ${escapeHtml(formatNumber(overview.severity_counts?.error || 0))} ${escapeHtml(formatNumber(overview.failed_job_count))} 个失败任务 + 修复计划 ${escapeHtml(formatNumber(overview.fix_run_count || 0))} 重新扫描
@@ -2270,11 +2409,30 @@ function renderAdminOpsPanel() { ${item.source_type === "job" ? actionTag("看任务详情", "open-job-detail", `data-job-id="${escapeHtml(item.source_id || "")}"`) : ""} ${item.source_type === "integration" ? actionTag("去自动流程", "goto-automation") : ""} ${item.tenant_project_id ? actionTag("去生产中心", "goto-production") : ""} + 生成修复计划 审计处理 `).join("") || `

当前没有待处理事件

最近主链比较稳定,继续观察即可。

`} +
+
+

最近修复计划

+

这里代表运维 Agent 输出的修复方案,必须经过审计 Agent 放行才算闭环。

+
+ ${fixRuns.map((item) => ` +
+

${escapeHtml(item.plan?.summary || item.id || "修复计划")}

+

${escapeHtml(item.plan?.steps?.[0] || "待补充修复步骤")}

+
+ ${escapeHtml(item.plan_scope || "plan")} + ${escapeHtml(item.audit_status || "pending")} + ${item.incident_id ? `事件 ${escapeHtml(brief(item.incident_id, 10))}` : ""} + 审计放行 +
+
+ `).join("") || `

还没有修复计划

当运维 Agent 针对故障事件生成 repair plan 后,这里会自动出现。

`} +

最近审计记录

@@ -3116,6 +3274,12 @@ function renderAutomationScreen() {
${escapeHtml(overview.subtitle)}
+
+ ${renderTenantQuotaPanel()} +
+
+ ${renderOneLinerActionRegistryPanel()} +
${renderAdminOpsPanel()} ` ); @@ -3195,9 +3359,15 @@ function renderPlaybookScreen() { +
+ ${renderOneLinerActionRegistryPanel()} +
${renderPlatformAgentPanel()}
+
+ ${renderTenantQuotaPanel()} +
@@ -4093,6 +4263,16 @@ async function openPlatformAgentDetailAction(platform) { ]); const memories = safeArray(memoriesPayload?.items || memoriesPayload).slice(0, 6); const skills = safeArray(skillsPayload?.items || skillsPayload).slice(0, 6); + const skillVersionEntries = await Promise.all( + skills.map(async (item) => { + if (!backendSupports("/v2/platform-agents/{platform}/skills/{skill_id}/versions")) { + return [item.id, []]; + } + const payload = await storyforgeFetch(`/v2/platform-agents/${encodeURIComponent(normalizedPlatform)}/skills/${encodeURIComponent(item.id)}/versions?project_id=${encodeURIComponent(project.id)}`).catch(() => ({ items: [] })); + return [item.id, safeArray(payload?.items || payload).slice(0, 3)]; + }) + ); + const skillVersions = Object.fromEntries(skillVersionEntries); openActionModal({ title: `${platformLabel(normalizedPlatform)} Agent 详情`, description: "查看当前平台 Agent 最近沉淀的记忆、技能和就绪度。", @@ -4138,6 +4318,16 @@ async function openPlatformAgentDetailAction(platform) { 验收通过 标记待优化
+ ${safeArray(skillVersions[item.id]).length ? ` +
+ ${safeArray(skillVersions[item.id]).map((version, index) => ` + + ${escapeHtml(`v${formatNumber(version.version_no || 0)} · ${version.snapshot_reason || "snapshot"}`)} + + `).join("")} +
+ ` : ""}
`).join("") || `

还没有平台技能

等子 Agent 跑出稳定结果后,把方法固化成技能。

`}
@@ -4202,6 +4392,121 @@ function openPlatformSkillReviewAction(platform, skillId, accepted) { }); } +function openPlatformSkillRollbackAction(platform, skillId, versionId) { + const project = requireSelectedProject(); + const normalizedPlatform = normalizePlatformValue(platform, getPreferredPlatform()); + openActionModal({ + title: "回滚平台技能", + description: "把当前技能回退到旧版本,并保留新的回滚快照,方便继续追踪。", + submitLabel: "确认回滚", + fields: [ + { + name: "summary", + label: "回滚说明", + type: "html", + html: ` +
+
+

${escapeHtml(platformLabel(normalizedPlatform))} 技能回滚

+

${escapeHtml(`将 skill ${skillId} 回滚到版本 ${versionId}。`)}

+
+
+ ` + } + ], + onSubmit: async () => { + const payload = await storyforgeFetch(`/v2/platform-agents/${encodeURIComponent(normalizedPlatform)}/skills/${encodeURIComponent(skillId)}/rollback`, { + method: "POST", + body: { + project_id: project.id, + version_id: versionId + } + }); + rememberAction("技能已回滚", `已回滚到版本 ${payload.rollback_from_version?.version_no || "指定版本"}。`, "green", payload); + await loadAgentControlSurfaces(project.id); + renderAll(); + } + }); +} + +function openActionRegistryEditAction(actionKey) { + const project = requireSelectedProject(); + const actionDef = safeArray(appState.onelinerActionRegistry).find((item) => item.action_key === actionKey) || null; + if (!actionDef) { + alert("没有找到这条动作定义。"); + return; + } + openActionModal({ + title: "编辑 OneLiner 动作", + description: "在租户范围内控制动作名称、说明、开关和少量配置。", + submitLabel: "保存动作", + fields: [ + { name: "label", label: "动作名称", value: actionDef.label || "" }, + { name: "description", label: "动作说明", type: "textarea", rows: 4, value: actionDef.description || "" }, + { name: "status", label: "状态", type: "select", value: actionDef.status || "enabled", options: [{ value: "enabled", label: "启用" }, { value: "disabled", label: "禁用" }] }, + { name: "configJson", label: "配置 JSON", type: "textarea", rows: 5, value: JSON.stringify(actionDef.config || {}, null, 2) } + ], + onSubmit: async (values) => { + let config = {}; + if (String(values.configJson || "").trim()) { + config = JSON.parse(values.configJson); + } + const saved = await storyforgeFetch(`/v2/oneliner/action-registry/${encodeURIComponent(actionKey)}?project_id=${encodeURIComponent(project.id)}`, { + method: "PUT", + body: { + label: values.label || "", + description: values.description || "", + category: actionDef.category || "custom", + status: values.status || "enabled", + config + } + }); + rememberAction("动作已更新", `OneLiner 动作「${saved.label || saved.action_key}」已保存。`, "green", saved); + await loadAgentControlSurfaces(project.id); + renderAll(); + } + }); +} + +function openTenantQuotaAction() { + const project = requireSelectedProject(); + const quota = appState.tenantQuota || {}; + openActionModal({ + title: "编辑租户额度", + description: "当前额度按租户 + 项目隔离,用于商业化预算、动作配额和存储保护。", + submitLabel: "保存额度", + fields: [ + { name: "enabled", label: "启用额度保护", type: "checkbox", value: quota.enabled !== false }, + { name: "monthlyBudgetCents", label: "月预算(分)", type: "number", value: quota.monthly_budget_cents || 0, min: 0 }, + { name: "storageLimitBytes", label: "存储上限(字节)", type: "number", value: quota.storage_limit_bytes || 0, min: 0 }, + { name: "analysisQuota", label: "分析配额", type: "number", value: quota.analysis_quota || 0, min: 0 }, + { name: "copyQuota", label: "文案配额", type: "number", value: quota.copy_quota || 0, min: 0 }, + { name: "aiVideoQuota", label: "AI 视频配额", type: "number", value: quota.ai_video_quota || 0, min: 0 }, + { name: "realCutQuota", label: "实拍剪辑配额", type: "number", value: quota.real_cut_quota || 0, min: 0 }, + { name: "recorderQuota", label: "录制配额", type: "number", value: quota.recorder_quota || 0, min: 0 } + ], + onSubmit: async (values) => { + const saved = await storyforgeFetch(`/v2/tenant/quota?project_id=${encodeURIComponent(project.id)}`, { + method: "PUT", + body: { + enabled: Boolean(values.enabled), + monthly_budget_cents: Number(values.monthlyBudgetCents || 0), + storage_limit_bytes: Number(values.storageLimitBytes || 0), + analysis_quota: Number(values.analysisQuota || 0), + copy_quota: Number(values.copyQuota || 0), + ai_video_quota: Number(values.aiVideoQuota || 0), + real_cut_quota: Number(values.realCutQuota || 0), + recorder_quota: Number(values.recorderQuota || 0), + config: quota.config || {} + } + }); + rememberAction("租户额度已更新", "当前项目的预算与配额已经保存。", "green", saved); + await loadAgentControlSurfaces(project.id); + renderAll(); + } + }); +} + function openCreateAssistantAction() { const project = requireSelectedProject(); const kbOptions = getKnowledgeBaseOptions(project.id); @@ -4526,6 +4831,86 @@ function openAdminIncidentReviewAction(incidentId) { }); } +function openAdminRepairPlanAction(incidentId) { + if (!isSuperAdmin()) { + alert("只有平台管理者才能生成修复计划。"); + return; + } + const incident = safeArray(appState.adminOpsOverview?.incidents).find((item) => item.id === incidentId); + if (!incident) { + alert("没有找到这条故障事件。"); + return; + } + openActionModal({ + title: "生成修复计划", + description: "让运维 Agent 先生成一版 repair plan,再由审计 Agent 决定是否放行。", + submitLabel: "生成计划", + fields: [ + { name: "scope", label: "计划范围", type: "select", value: "plan", options: [{ value: "plan", label: "标准计划" }, { value: "hotfix", label: "热修建议" }, { value: "watch", label: "仅观察" }] }, + { name: "notes", label: "附加说明", type: "textarea", rows: 4, placeholder: "例如:优先验证 cutvideo 上传链,不要动核心代码" } + ], + onSubmit: async (values) => { + const saved = await storyforgeFetch(`/v2/admin/ops/incidents/${encodeURIComponent(incidentId)}/repair-plan`, { + method: "POST", + body: { + incident_id: incidentId, + scope: values.scope || "plan", + notes: values.notes || "" + } + }); + rememberAction("修复计划已生成", `已为事件「${incident.title}」生成 repair plan。`, "green", saved); + await loadAgentControlSurfaces(getOneLinerProjectId()); + renderAll(); + } + }); +} + +function openAdminFixRunAuditAction(runId) { + if (!isSuperAdmin()) { + alert("只有平台管理者才能审计修复计划。"); + return; + } + const run = safeArray(appState.adminOpsOverview?.recent_fix_runs).find((item) => item.id === runId); + if (!run) { + alert("没有找到这条修复计划。"); + return; + } + openActionModal({ + title: "审计修复计划", + description: "审计 Agent 只做放行、驳回或继续观察,不会直接让用户一句话改核心代码。", + submitLabel: "保存审计", + fields: [ + { + name: "summary", + label: "计划摘要", + type: "html", + html: ` +
+
+

${escapeHtml(run.plan?.summary || run.id)}

+

${escapeHtml((run.plan?.steps || []).join(";") || "暂无步骤")}

+
+
+ ` + }, + { name: "reviewStatus", label: "审计状态", type: "select", value: run.audit_status || "approved", options: [{ value: "approved", label: "通过" }, { value: "watching", label: "继续观察" }, { value: "rejected", label: "驳回" }] }, + { name: "reviewNotes", label: "审计备注", type: "textarea", rows: 4, value: run.review_notes || "", placeholder: "写清楚为什么通过、驳回或继续观察" } + ], + onSubmit: async (values) => { + const saved = await storyforgeFetch(`/v2/admin/ops/fix-runs/${encodeURIComponent(runId)}/audit`, { + method: "POST", + body: { + review_status: values.reviewStatus || "approved", + review_notes: values.reviewNotes || "" + } + }); + rememberAction("修复计划已审计", `修复计划 ${runId} 已更新为 ${saved.audit_status || values.reviewStatus}。`, "green", saved); + await loadAgentControlSurfaces(getOneLinerProjectId()); + renderAll(); + } + }); +} + function openJobDetailAction(jobId) { if (!jobId) return; setBusy(true, "正在加载任务详情..."); @@ -5074,6 +5459,14 @@ document.addEventListener("click", async (event) => { await openPlatformAgentDetailAction(action.dataset.platform || ""); return; } + if (name === "open-action-registry-edit") { + openActionRegistryEditAction(action.dataset.actionKey || ""); + return; + } + if (name === "open-tenant-quota") { + openTenantQuotaAction(); + return; + } if (name === "run-oneliner-action") { setBusy(true, "OneLiner 正在执行动作..."); try { @@ -5101,6 +5494,10 @@ document.addEventListener("click", async (event) => { openPlatformSkillReviewAction(action.dataset.platform || "", action.dataset.skillId || "", action.dataset.accepted !== "false"); return; } + if (name === "rollback-platform-skill") { + openPlatformSkillRollbackAction(action.dataset.platform || "", action.dataset.skillId || "", action.dataset.versionId || ""); + return; + } if (name === "analyze-selected-account") { openAnalyzeSelectedAccountAction(); return; @@ -5180,6 +5577,14 @@ document.addEventListener("click", async (event) => { openAdminIncidentReviewAction(action.dataset.incidentId || ""); return; } + if (name === "open-admin-repair-plan") { + openAdminRepairPlanAction(action.dataset.incidentId || ""); + return; + } + if (name === "open-admin-fix-run-audit") { + openAdminFixRunAuditAction(action.dataset.runId || ""); + return; + } if (name === "job-to-ai-video") { const jobId = action.dataset.jobId || ""; const detail = appState.lastJobDetail?.job?.id === jobId ? appState.lastJobDetail.job : null;