diff --git a/collector-service/app/main.py b/collector-service/app/main.py index f1272cf..537f6ec 100644 --- a/collector-service/app/main.py +++ b/collector-service/app/main.py @@ -24,6 +24,7 @@ from pydantic import BaseModel, Field from .database import Database, utc_now from .douyin_features import register_douyin_routes from .integrations import AsrHttpClient, CutVideoClient, HuobaoDramaClient, N8NClient +from .oneliner_features import register_oneliner_routes from .openai_compat import OpenAICompatClient BASE_DIR = Path(__file__).resolve().parents[2] @@ -60,6 +61,8 @@ CUTVIDEO_UPLOAD_TIMEOUT_SEC = int(os.getenv("CUTVIDEO_UPLOAD_TIMEOUT_SEC", "1800 HUOBAO_POLL_INTERVAL_SEC = int(os.getenv("HUOBAO_POLL_INTERVAL_SEC", "10")) HUOBAO_MAX_WAIT_SEC = int(os.getenv("HUOBAO_MAX_WAIT_SEC", "900")) +DOMESTIC_PLATFORMS = {"douyin", "xiaohongshu", "bilibili", "kuaishou", "wechat_video"} + for path in (DATA_DIR, DOWNLOADS_DIR, JOBS_DIR, MODELS_DIR): path.mkdir(parents=True, exist_ok=True) @@ -3319,3 +3322,4 @@ def publish_app_update(request: PublishAppUpdateRequest, admin: dict[str, Any] = register_douyin_routes(app, sys.modules[__name__]) +register_oneliner_routes(app, sys.modules[__name__]) diff --git a/collector-service/app/oneliner_features.py b/collector-service/app/oneliner_features.py new file mode 100644 index 0000000..759440c --- /dev/null +++ b/collector-service/app/oneliner_features.py @@ -0,0 +1,1406 @@ +from __future__ import annotations + +import json +from typing import Any + +from fastapi import Depends, HTTPException, Query +from pydantic import BaseModel, Field + + +class OneLinerProfileRequest(BaseModel): + project_id: str = "" + assistant_id: str = "" + display_name: str = "OneLiner" + long_term_goal: str = "" + notes: str = "" + default_platform: str = "" + config: dict[str, Any] = Field(default_factory=dict) + + +class OneLinerSessionCreateRequest(BaseModel): + project_id: str = "" + title: str = "" + preferred_platform: str = "" + initial_message: str = "" + + +class OneLinerMessageRequest(BaseModel): + content: str + project_id: str = "" + platform: str = "" + target_account_id: str = "" + remember_preference: bool = False + + +class PlatformAgentProfileRequest(BaseModel): + project_id: str = "" + assistant_id: str = "" + name: str = "" + mission: str = "" + notes: str = "" + status: str = "active" + config: dict[str, Any] = Field(default_factory=dict) + + +class AgentMemoryUpsertRequest(BaseModel): + project_id: str = "" + subject_type: str = "project" + subject_id: str = "" + memory_key: str + title: str = "" + summary: str + details: dict[str, Any] = Field(default_factory=dict) + confidence: float = Field(default=0.7, ge=0.0, le=1.0) + + +class AgentSkillUpsertRequest(BaseModel): + project_id: str = "" + skill_key: str + name: str + status: str = "draft" + method: dict[str, Any] = Field(default_factory=dict) + test_spec: dict[str, Any] = Field(default_factory=dict) + last_result: dict[str, Any] = Field(default_factory=dict) + success_count: int = Field(default=0, ge=0) + failure_count: int = Field(default=0, ge=0) + last_score: float = 0.0 + + +class AdminIncidentReviewRequest(BaseModel): + status: str = "reviewed" + review_notes: str = "" + + +INTENT_ACTIONS: dict[str, list[dict[str, Any]]] = { + "create_project": [{"key": "goto-intake", "label": "去我的项目", "kind": "navigate"}], + "create_assistant": [{"key": "open-create-assistant", "label": "创建 Agent", "kind": "ui_action"}], + "import_homepage": [{"key": "open-import-homepage", "label": "导入主页", "kind": "ui_action"}], + "track_account": [{"key": "open-track-selected-account", "label": "跟踪当前账号", "kind": "ui_action"}], + "analyze_account": [{"key": "analyze-selected-account", "label": "分析当前账号", "kind": "ui_action"}], + "analyze_top_videos": [{"key": "analyze-top-videos", "label": "分析高分作品", "kind": "ui_action"}], + "generate_copy": [{"key": "open-generate-copy", "label": "生成文案", "kind": "ui_action"}], + "ai_video": [{"key": "open-ai-video", "label": "做 AI 视频", "kind": "ui_action"}], + "real_cut": [{"key": "open-real-cut", "label": "做实拍剪辑", "kind": "ui_action"}], + "review": [{"key": "goto-review", "label": "去发布与复盘", "kind": "navigate"}], + "live_recorder": [{"key": "open-live-recorder", "label": "打开录制控制", "kind": "ui_action"}], + "storage_status": [{"key": "goto-production", "label": "查看生产与存储", "kind": "navigate"}], + "ops_admin": [{"key": "goto-automation", "label": "去自动流程", "kind": "navigate"}], +} + +INTENT_LABELS = { + "create_project": "创建项目", + "create_assistant": "创建 Agent", + "import_homepage": "导入主页", + "track_account": "跟踪账号", + "analyze_account": "分析账号", + "analyze_top_videos": "分析高分作品", + "generate_copy": "生成文案", + "ai_video": "生成 AI 视频", + "real_cut": "实拍剪辑", + "review": "发布复盘", + "live_recorder": "直播录制", + "storage_status": "查看存储", + "ops_admin": "运维巡检", + "custom": "自定义任务", +} + + +def register_oneliner_routes(app: Any, legacy: Any) -> None: + def now() -> str: + return legacy.utc_now() + + def make_id(prefix: str) -> str: + return legacy.make_id(prefix) + + def _parse_json(raw: str | None, fallback: Any) -> Any: + cleaned = str(raw or "").strip() + if not cleaned: + return fallback + try: + return json.loads(cleaned) + except json.JSONDecodeError: + return fallback + + def _dump(value: Any) -> str: + return json.dumps(value or {}, ensure_ascii=False) + + def ensure_schema() -> None: + schema = """ + CREATE TABLE IF NOT EXISTS oneliner_profiles ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + assistant_id TEXT NOT NULL DEFAULT '', + display_name TEXT NOT NULL DEFAULT 'OneLiner', + long_term_goal TEXT NOT NULL DEFAULT '', + notes TEXT NOT NULL DEFAULT '', + default_platform TEXT NOT NULL DEFAULT '', + config_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, project_id), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL, + FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS oneliner_sessions ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + profile_id TEXT, + title TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'active', + preferred_platform TEXT NOT NULL DEFAULT '', + last_platform TEXT NOT NULL DEFAULT '', + last_intent_key TEXT NOT NULL DEFAULT '', + last_message_at TEXT NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL, + FOREIGN KEY(profile_id) REFERENCES oneliner_profiles(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS oneliner_messages ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + user_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL DEFAULT '', + plan_json TEXT NOT NULL DEFAULT '{}', + result_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY(session_id) REFERENCES oneliner_sessions(id) ON DELETE CASCADE, + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS platform_agent_profiles ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + platform TEXT NOT NULL, + assistant_id TEXT NOT NULL DEFAULT '', + name TEXT NOT NULL DEFAULT '', + mission TEXT NOT NULL DEFAULT '', + notes TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'active', + config_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, project_id, platform), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL, + FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS agent_memories ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + agent_scope TEXT NOT NULL, + platform TEXT NOT NULL DEFAULT '', + subject_type TEXT NOT NULL DEFAULT 'project', + subject_id TEXT NOT NULL DEFAULT '', + memory_key TEXT NOT NULL, + title TEXT NOT NULL DEFAULT '', + summary TEXT NOT NULL DEFAULT '', + details_json TEXT NOT NULL DEFAULT '{}', + confidence REAL NOT NULL DEFAULT 0.7, + last_validated_at TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, project_id, agent_scope, platform, subject_type, subject_id, memory_key), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS agent_skills ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + agent_scope TEXT NOT NULL, + platform TEXT NOT NULL DEFAULT '', + parent_skill_id TEXT NOT NULL DEFAULT '', + skill_key TEXT NOT NULL, + name TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'draft', + method_json TEXT NOT NULL DEFAULT '{}', + test_spec_json TEXT NOT NULL DEFAULT '{}', + last_result_json TEXT NOT NULL DEFAULT '{}', + success_count INTEGER NOT NULL DEFAULT 0, + failure_count INTEGER NOT NULL DEFAULT 0, + last_score REAL NOT NULL DEFAULT 0, + last_validated_at TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(user_id, project_id, agent_scope, platform, skill_key), + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS admin_ops_incidents ( + id TEXT PRIMARY KEY, + tenant_user_id TEXT NOT NULL DEFAULT '', + tenant_project_id TEXT NOT NULL DEFAULT '', + source_type TEXT NOT NULL, + source_id TEXT NOT NULL DEFAULT '', + severity TEXT NOT NULL DEFAULT 'warn', + title TEXT NOT NULL, + summary TEXT NOT NULL DEFAULT '', + payload_json TEXT NOT NULL DEFAULT '{}', + status TEXT NOT NULL DEFAULT 'open', + assigned_to TEXT NOT NULL DEFAULT '', + reviewed_by TEXT NOT NULL DEFAULT '', + review_notes TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + UNIQUE(source_type, source_id, title), + FOREIGN KEY(tenant_user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(tenant_project_id) REFERENCES projects(id) ON DELETE SET NULL, + FOREIGN KEY(assigned_to) REFERENCES accounts(id) ON DELETE SET NULL, + FOREIGN KEY(reviewed_by) REFERENCES accounts(id) ON DELETE SET NULL + ); + """ + with legacy.db.session() as conn: + conn.executescript(schema) + + ensure_schema() + + @app.on_event("startup") + def _startup_oneliner_schema() -> None: + ensure_schema() + + def _resolve_project(account: dict[str, Any], project_id: str | None) -> dict[str, Any]: + return legacy.resolve_target_project(account["id"], project_id or None, username=account["username"]) + + def _resolve_assistant(account: dict[str, Any], assistant_id: str | None, project_id: str = "") -> dict[str, Any] | None: + return legacy.resolve_target_assistant(account["id"], assistant_id or None, project_id) + + def _safe_platform(platform_value: str | None, fallback: str = "douyin") -> str: + return legacy.ensure_domestic_platform(platform_value or fallback, allow_blank=not fallback) or fallback + + def _fetch_profile_row(account: dict[str, Any], project_id: str = "") -> dict[str, Any] | None: + return legacy.db.fetch_one( + "SELECT * FROM oneliner_profiles WHERE user_id = ? AND project_id = ?", + (account["id"], project_id), + ) + + def _profile_payload(row: dict[str, Any], *, account: dict[str, Any] | None = None) -> dict[str, Any]: + assistant = None + if row.get("assistant_id"): + assistant_row = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],)) + if assistant_row and (not account or assistant_row.get("user_id") == account["id"]): + assistant = legacy.assistant_payload(assistant_row) + return { + "id": row["id"], + "user_id": row["user_id"], + "project_id": row.get("project_id", ""), + "assistant_id": row.get("assistant_id", ""), + "display_name": row.get("display_name", "OneLiner"), + "long_term_goal": row.get("long_term_goal", ""), + "notes": row.get("notes", ""), + "default_platform": row.get("default_platform", ""), + "config": _parse_json(row.get("config_json"), {}), + "assistant": assistant, + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + def _ensure_oneliner_profile(account: dict[str, Any], project_id: str = "") -> dict[str, Any]: + row = _fetch_profile_row(account, project_id) + if row: + return row + assistant_id = "" + if project_id: + assistant_row = legacy.db.fetch_one( + "SELECT * FROM assistants WHERE user_id = ? AND (project_id = ? OR project_id = '') ORDER BY created_at ASC LIMIT 1", + (account["id"], project_id), + ) + else: + assistant_row = legacy.db.fetch_one( + "SELECT * FROM assistants WHERE user_id = ? ORDER BY created_at ASC LIMIT 1", + (account["id"],), + ) + if assistant_row: + assistant_id = assistant_row["id"] + profile_id = make_id("oneliner") + created_at = now() + default_platform = "douyin" + legacy.db.execute( + """ + INSERT INTO oneliner_profiles ( + id, user_id, project_id, assistant_id, display_name, long_term_goal, notes, + default_platform, config_json, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + profile_id, + account["id"], + project_id, + assistant_id, + "OneLiner", + "", + "", + default_platform, + _dump({"chat_only_for_unreleased_ui": True}), + created_at, + created_at, + ), + ) + return legacy.db.fetch_one("SELECT * FROM oneliner_profiles WHERE id = ?", (profile_id,)) + + def _list_platform_profiles(account: dict[str, Any], project_id: str = "") -> list[dict[str, Any]]: + rows = legacy.db.fetch_all( + "SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? ORDER BY platform ASC", + (account["id"], project_id), + ) + mapping = {row["platform"]: row for row in rows} + results: list[dict[str, Any]] = [] + for platform in sorted(legacy.DOMESTIC_PLATFORMS): + row = mapping.get(platform) + payload = _platform_agent_payload(account, row, platform=platform, project_id=project_id) + results.append(payload) + return results + + def _platform_agent_payload( + account: dict[str, Any], + row: dict[str, Any] | None, + *, + platform: str, + project_id: str = "", + ) -> dict[str, Any]: + assistant = None + if row and row.get("assistant_id"): + assistant_row = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],)) + if assistant_row and assistant_row.get("user_id") == account["id"]: + assistant = legacy.assistant_payload(assistant_row) + memory_count = legacy.db.fetch_one( + """ + SELECT COUNT(*) AS count FROM agent_memories + WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ? + """, + (account["id"], project_id, platform), + )["count"] + skill_count = legacy.db.fetch_one( + """ + SELECT COUNT(*) AS count FROM agent_skills + WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ? + """, + (account["id"], project_id, platform), + )["count"] + return { + "id": row["id"] if row else "", + "user_id": account["id"], + "project_id": project_id, + "platform": platform, + "platform_label": legacy.platform_label(platform), + "assistant_id": row.get("assistant_id", "") if row else "", + "name": row.get("name", f"{legacy.platform_label(platform)} Agent") if row else f"{legacy.platform_label(platform)} Agent", + "mission": row.get("mission", "") if row else "", + "notes": row.get("notes", "") if row else "", + "status": row.get("status", "draft") if row else "draft", + "config": _parse_json((row or {}).get("config_json"), {}), + "memory_count": memory_count, + "skill_count": skill_count, + "assistant": assistant, + "created_at": (row or {}).get("created_at", ""), + "updated_at": (row or {}).get("updated_at", ""), + } + + def _memory_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "user_id": row["user_id"], + "project_id": row.get("project_id", ""), + "agent_scope": row.get("agent_scope", ""), + "platform": row.get("platform", ""), + "platform_label": legacy.platform_label(row.get("platform", "")) if row.get("platform") else "", + "subject_type": row.get("subject_type", ""), + "subject_id": row.get("subject_id", ""), + "memory_key": row.get("memory_key", ""), + "title": row.get("title", ""), + "summary": row.get("summary", ""), + "details": _parse_json(row.get("details_json"), {}), + "confidence": float(row.get("confidence") or 0), + "last_validated_at": row.get("last_validated_at", ""), + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + def _skill_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "user_id": row["user_id"], + "project_id": row.get("project_id", ""), + "agent_scope": row.get("agent_scope", ""), + "platform": row.get("platform", ""), + "platform_label": legacy.platform_label(row.get("platform", "")) if row.get("platform") else "", + "parent_skill_id": row.get("parent_skill_id", ""), + "skill_key": row.get("skill_key", ""), + "name": row.get("name", ""), + "status": row.get("status", "draft"), + "method": _parse_json(row.get("method_json"), {}), + "test_spec": _parse_json(row.get("test_spec_json"), {}), + "last_result": _parse_json(row.get("last_result_json"), {}), + "success_count": int(row.get("success_count") or 0), + "failure_count": int(row.get("failure_count") or 0), + "last_score": float(row.get("last_score") or 0), + "last_validated_at": row.get("last_validated_at", ""), + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + def _session_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "user_id": row["user_id"], + "project_id": row.get("project_id", ""), + "profile_id": row.get("profile_id", ""), + "title": row.get("title", ""), + "status": row.get("status", "active"), + "preferred_platform": row.get("preferred_platform", ""), + "last_platform": row.get("last_platform", ""), + "last_intent_key": row.get("last_intent_key", ""), + "last_message_at": row.get("last_message_at", ""), + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + def _message_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "session_id": row["session_id"], + "user_id": row["user_id"], + "role": row.get("role", "user"), + "content": row.get("content", ""), + "plan": _parse_json(row.get("plan_json"), {}), + "result": _parse_json(row.get("result_json"), {}), + "created_at": row["created_at"], + } + + def _incident_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "tenant_user_id": row.get("tenant_user_id", ""), + "tenant_project_id": row.get("tenant_project_id", ""), + "source_type": row.get("source_type", ""), + "source_id": row.get("source_id", ""), + "severity": row.get("severity", "warn"), + "title": row.get("title", ""), + "summary": row.get("summary", ""), + "payload": _parse_json(row.get("payload_json"), {}), + "status": row.get("status", "open"), + "assigned_to": row.get("assigned_to", ""), + "reviewed_by": row.get("reviewed_by", ""), + "review_notes": row.get("review_notes", ""), + "created_at": row["created_at"], + "updated_at": row["updated_at"], + } + + def _load_owned_session(session_id: str, account: dict[str, Any]) -> dict[str, Any]: + row = legacy.db.fetch_one( + "SELECT * FROM oneliner_sessions WHERE id = ? AND user_id = ?", + (session_id, account["id"]), + ) + if not row: + raise HTTPException(status_code=404, detail="OneLiner session not found") + return row + + def _deterministic_intent(message: str, platform_hint: str, account: dict[str, Any]) -> dict[str, Any]: + text = message.strip() + lowered = text.lower() + platform = normalize_platform_from_text(text) or _safe_platform(platform_hint or "", fallback="") + intent_key = "custom" + confidence = 0.45 + summary = "先理解目标,再把任务路由到合适的平台 Agent 或固定能力。" + if any(keyword in text for keyword in ("新建项目", "创建项目", "建项目")): + intent_key = "create_project" + confidence = 0.96 + summary = "这是一个新项目启动诉求,优先进入项目创建流。" + elif any(keyword in lowered for keyword in ("create agent",)) or any(keyword in text for keyword in ("创建agent", "创建 Agent", "新建agent", "新建 Agent")): + intent_key = "create_assistant" + confidence = 0.96 + summary = "这是定义新 Agent 的需求,适合直接进入 Agent 创建流。" + elif any(keyword in text for keyword in ("导入主页", "主页链接", "账号主页", "主页账号")): + intent_key = "import_homepage" + confidence = 0.9 + summary = "这是账号主页导入诉求,适合进入内容源接入。" + elif any(keyword in text for keyword in ("跟踪", "日报", "更新提醒", "持续跟")): + intent_key = "track_account" + confidence = 0.9 + summary = "这是持续跟踪类任务,适合交给平台 Agent 和跟踪摘要链。" + elif any(keyword in text for keyword in ("高分", "爆款", "优质作品", "高表现")): + intent_key = "analyze_top_videos" + confidence = 0.88 + summary = "这是高表现内容拆解诉求,优先分析高分作品。" + elif any(keyword in text for keyword in ("对标", "分析账号", "调研", "拆账号")): + intent_key = "analyze_account" + confidence = 0.86 + summary = "这是账号层面的调研任务,优先交给对应平台 Agent。" + elif any(keyword in text for keyword in ("文案", "脚本", "口播", "改写")): + intent_key = "generate_copy" + confidence = 0.88 + summary = "这是文案/脚本生成任务,适合走 Agent 生成链。" + elif any(keyword in text for keyword in ("AI视频", "AI 视频", "生成视频")): + intent_key = "ai_video" + confidence = 0.9 + summary = "这是 AI 视频生产任务,适合走 AI 视频链。" + elif any(keyword in text for keyword in ("实拍", "剪辑", "混剪")): + intent_key = "real_cut" + confidence = 0.9 + summary = "这是实拍剪辑任务,适合走 cutvideo 链。" + elif any(keyword in text for keyword in ("直播", "录制", "开录")): + intent_key = "live_recorder" + confidence = 0.9 + summary = "这是直播录制任务,适合走 NAS 录制能力。" + elif any(keyword in text for keyword in ("复盘", "发布总结", "回看数据")): + intent_key = "review" + confidence = 0.84 + summary = "这是发布复盘任务,适合进入复盘工作台。" + elif any(keyword in text for keyword in ("空间", "缓存", "存储", "NAS")): + intent_key = "storage_status" + confidence = 0.82 + summary = "这是存储状态问题,适合查看租户存储面板。" + elif account.get("role") == "super_admin" and any(keyword in text for keyword in ("报错", "日志", "故障", "运维", "修复")): + intent_key = "ops_admin" + confidence = 0.84 + summary = "这是平台级运维诉求,只能交给管理员运维/审计能力。" + return { + "intent_key": intent_key, + "platform": platform or "", + "confidence": confidence, + "summary": summary, + "reasoning_mode": "deterministic-first", + } + + def normalize_platform_from_text(text: str) -> str: + for key, value in legacy.PLATFORM_ALIASES.items(): + if key and key in text.lower(): + if value in legacy.DOMESTIC_PLATFORMS: + return value + for key, value in legacy.PLATFORM_ALIASES.items(): + if key and key in text: + if value in legacy.DOMESTIC_PLATFORMS: + return value + return "" + + async def _model_refine_intent( + account: dict[str, Any], + *, + project_id: str, + message: str, + platform_hint: str, + ) -> dict[str, Any]: + profile = legacy.model_profile_for_account(account["id"], None) + if not profile: + raise HTTPException(status_code=503, detail="No model profile available") + system_prompt = ( + "你是 StoryForge 的 OneLiner 总控主Agent,只负责把用户目标分类成安全的系统动作。" + "必须输出 JSON,不要输出 Markdown。" + ) + user_prompt = ( + f"用户角色:{account.get('role','user')}\n" + f"项目:{project_id or '默认项目'}\n" + f"平台提示:{platform_hint or '未指定'}\n" + f"用户原话:{message}\n\n" + "请输出 JSON:" + "{" + '"intent_key":"","platform":"","confidence":0.0,' + '"summary":"","needs_oneliner_only":false,' + '"remember_preference":false' + "}\n" + "intent_key 只能取:create_project, create_assistant, import_homepage, track_account, analyze_account, analyze_top_videos, generate_copy, ai_video, real_cut, review, live_recorder, storage_status, ops_admin, custom。" + "如果前端 UI 还没有明确产品化,needs_oneliner_only 返回 true。" + ) + raw = await legacy.call_model(profile, system_prompt, user_prompt, temperature=0.1) + parsed = legacy.parse_json_object(raw) + if not parsed: + raise HTTPException(status_code=502, detail="OneLiner planner returned empty result") + return { + "intent_key": str(parsed.get("intent_key") or "custom").strip() or "custom", + "platform": normalize_platform_from_text(str(parsed.get("platform") or "")) or _safe_platform(platform_hint or "", fallback=""), + "confidence": float(parsed.get("confidence") or 0), + "summary": str(parsed.get("summary") or "").strip() or "已按模型判断用户目标。", + "needs_oneliner_only": bool(parsed.get("needs_oneliner_only")), + "remember_preference": bool(parsed.get("remember_preference")), + "reasoning_mode": "model-refine", + } + + async def _plan_oneliner_request( + account: dict[str, Any], + *, + project_id: str, + message: str, + platform_hint: str, + ) -> dict[str, Any]: + plan = _deterministic_intent(message, platform_hint, account) + if plan["confidence"] < 0.82: + try: + refined = await _model_refine_intent(account, project_id=project_id, message=message, platform_hint=platform_hint) + if refined.get("confidence", 0) >= plan.get("confidence", 0): + plan = {**plan, **refined} + except Exception: + pass + intent_key = plan.get("intent_key") or "custom" + actions = INTENT_ACTIONS.get(intent_key, []) + if intent_key == "ops_admin" and account.get("role") != "super_admin": + actions = [] + plan["summary"] = "这是平台级运维诉求,但当前账号没有管理员权限。" + plan["needs_oneliner_only"] = True + ui_supported = bool(actions) + if intent_key == "custom": + plan["needs_oneliner_only"] = True + plan["ui_supported"] = ui_supported + plan["delivery_mode"] = "ui" if ui_supported and not plan.get("needs_oneliner_only") else "oneliner" + plan["suggested_actions"] = actions + plan["intent_label"] = INTENT_LABELS.get(intent_key, intent_key) + plan["platform_label"] = legacy.platform_label(plan.get("platform")) if plan.get("platform") else "待判断" + plan["economicity"] = { + "policy": "deterministic-first", + "explanation": "先走固定流程,再走平台 Agent,最后才升级到 OneLiner 深度调度。", + } + return plan + + def _remember_message_preference( + account: dict[str, Any], + *, + project_id: str, + plan: dict[str, Any], + message: str, + ) -> dict[str, Any] | None: + cues = ("记住", "以后", "长期", "默认", "一直", "优先") + if not plan.get("remember_preference") and not any(cue in message for cue in cues): + return None + agent_scope = "oneliner" + platform = plan.get("platform") or "" + subject_type = "project" if project_id else "account" + subject_id = project_id or account["id"] + memory_key = f"preference::{plan.get('intent_key') or 'custom'}" + existing = legacy.db.fetch_one( + """ + SELECT * FROM agent_memories + WHERE user_id = ? AND project_id = ? AND agent_scope = ? AND platform = ? + AND subject_type = ? AND subject_id = ? AND memory_key = ? + """, + (account["id"], project_id, agent_scope, platform, subject_type, subject_id, memory_key), + ) + timestamp = now() + details = { + "captured_from": "oneliner_chat", + "intent_key": plan.get("intent_key", "custom"), + "source_message": message, + "platform": platform, + } + if existing: + legacy.db.execute( + """ + UPDATE agent_memories + SET title = ?, summary = ?, details_json = ?, confidence = ?, last_validated_at = ?, updated_at = ? + WHERE id = ? + """, + ( + f"{INTENT_LABELS.get(plan.get('intent_key') or 'custom', '偏好')}偏好", + message.strip()[:280], + _dump(details), + 0.82, + timestamp, + timestamp, + existing["id"], + ), + ) + stored = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (existing["id"],)) + else: + memory_id = make_id("mem") + legacy.db.execute( + """ + INSERT INTO agent_memories ( + id, user_id, project_id, agent_scope, platform, subject_type, subject_id, + memory_key, title, summary, details_json, confidence, last_validated_at, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + memory_id, + account["id"], + project_id, + agent_scope, + platform, + subject_type, + subject_id, + memory_key, + f"{INTENT_LABELS.get(plan.get('intent_key') or 'custom', '偏好')}偏好", + message.strip()[:280], + _dump(details), + 0.82, + timestamp, + timestamp, + timestamp, + ), + ) + stored = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (memory_id,)) + return _memory_payload(stored) if stored else None + + def _session_context_summary(account: dict[str, Any], project_id: str, platform: str) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + assistant = None + profile_row = _fetch_profile_row(account, project["id"]) + if profile_row and profile_row.get("assistant_id"): + assistant = _resolve_assistant(account, profile_row.get("assistant_id"), project["id"]) + platform_profile = legacy.db.fetch_one( + "SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?", + (account["id"], project["id"], platform), + ) if platform else None + return { + "project": legacy.project_payload(project), + "oneliner_profile": _profile_payload(profile_row, account=account) if profile_row else None, + "platform_agent": _platform_agent_payload(account, platform_profile, platform=platform, project_id=project["id"]) if platform else None, + "assistant": legacy.assistant_payload(assistant) if assistant else None, + } + + async def _generate_oneliner_reply( + account: dict[str, Any], + *, + project_id: str, + message: str, + plan: dict[str, Any], + ) -> dict[str, Any]: + context = _session_context_summary(account, project_id or "", plan.get("platform") or "") + summary_lines = [ + f"我理解你的目标是:{plan.get('intent_label', '自定义任务')}。", + f"建议优先处理的平台:{plan.get('platform_label', '待判断')}。", + plan.get("summary", ""), + ] + if plan.get("delivery_mode") == "oneliner": + summary_lines.append("这项能力当前更适合先由 OneLiner 对话承接,而不是要求你先理解前端功能树。") + if plan.get("intent_key") == "ops_admin" and account.get("role") != "super_admin": + summary_lines.append("当前账号不是平台最高权限用户,所以我不会放出运维 Agent 入口。") + if context.get("platform_agent"): + summary_lines.append(f"当前 {context['platform_agent']['platform_label']} Agent 已绑定:{context['platform_agent'].get('assistant', {}).get('name') or '未绑定执行 Agent'}。") + return { + "summary_text": "\n".join([line for line in summary_lines if line.strip()]), + "context": context, + "safe_boundary": { + "core_code_locked": True, + "tenant_isolation": True, + "ops_admin_only": True, + }, + } + + def _insert_message(session_id: str, account_id: str, role: str, content: str, plan: dict[str, Any], result: dict[str, Any]) -> dict[str, Any]: + message_id = make_id("oline_msg") + created_at = now() + legacy.db.execute( + """ + INSERT INTO oneliner_messages (id, session_id, user_id, role, content, plan_json, result_json, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + (message_id, session_id, account_id, role, content, _dump(plan), _dump(result), created_at), + ) + return legacy.db.fetch_one("SELECT * FROM oneliner_messages WHERE id = ?", (message_id,)) + + def _upsert_platform_profile(account: dict[str, Any], platform: str, request: PlatformAgentProfileRequest) -> dict[str, Any]: + project = _resolve_project(account, request.project_id or None) + assistant = _resolve_assistant(account, request.assistant_id or None, project["id"]) + existing = legacy.db.fetch_one( + "SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?", + (account["id"], project["id"], platform), + ) + timestamp = now() + if existing: + legacy.db.execute( + """ + UPDATE platform_agent_profiles + SET assistant_id = ?, name = ?, mission = ?, notes = ?, status = ?, config_json = ?, updated_at = ? + WHERE id = ? + """, + ( + (assistant or {}).get("id", ""), + request.name.strip() or existing.get("name") or f"{legacy.platform_label(platform)} Agent", + request.mission.strip(), + request.notes.strip(), + request.status.strip() or "active", + _dump(request.config), + timestamp, + existing["id"], + ), + ) + row = legacy.db.fetch_one("SELECT * FROM platform_agent_profiles WHERE id = ?", (existing["id"],)) + else: + profile_id = make_id("plat_agent") + legacy.db.execute( + """ + INSERT INTO platform_agent_profiles ( + id, user_id, project_id, platform, assistant_id, name, mission, notes, status, config_json, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + profile_id, + account["id"], + project["id"], + platform, + (assistant or {}).get("id", ""), + request.name.strip() or f"{legacy.platform_label(platform)} Agent", + request.mission.strip(), + request.notes.strip(), + request.status.strip() or "active", + _dump(request.config), + timestamp, + timestamp, + ), + ) + row = legacy.db.fetch_one("SELECT * FROM platform_agent_profiles WHERE id = ?", (profile_id,)) + return _platform_agent_payload(account, row, platform=platform, project_id=project["id"]) + + def _upsert_memory( + account: dict[str, Any], + *, + agent_scope: str, + platform: str, + request: AgentMemoryUpsertRequest, + ) -> dict[str, Any]: + project = _resolve_project(account, request.project_id or None) + subject_type = request.subject_type.strip() or "project" + subject_id = request.subject_id.strip() or (project["id"] if subject_type == "project" else account["id"]) + existing = legacy.db.fetch_one( + """ + SELECT * FROM agent_memories + WHERE user_id = ? AND project_id = ? AND agent_scope = ? AND platform = ? + AND subject_type = ? AND subject_id = ? AND memory_key = ? + """, + (account["id"], project["id"], agent_scope, platform, subject_type, subject_id, request.memory_key.strip()), + ) + timestamp = now() + if existing: + legacy.db.execute( + """ + UPDATE agent_memories + SET title = ?, summary = ?, details_json = ?, confidence = ?, last_validated_at = ?, updated_at = ? + WHERE id = ? + """, + ( + request.title.strip(), + request.summary.strip(), + _dump(request.details), + request.confidence, + timestamp, + timestamp, + existing["id"], + ), + ) + row = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (existing["id"],)) + else: + memory_id = make_id("mem") + legacy.db.execute( + """ + INSERT INTO agent_memories ( + id, user_id, project_id, agent_scope, platform, subject_type, subject_id, + memory_key, title, summary, details_json, confidence, last_validated_at, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + memory_id, + account["id"], + project["id"], + agent_scope, + platform, + subject_type, + subject_id, + request.memory_key.strip(), + request.title.strip(), + request.summary.strip(), + _dump(request.details), + request.confidence, + timestamp, + timestamp, + timestamp, + ), + ) + row = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (memory_id,)) + return _memory_payload(row) + + def _upsert_skill( + account: dict[str, Any], + *, + agent_scope: str, + platform: str, + request: AgentSkillUpsertRequest, + skill_id: str | None = None, + ) -> dict[str, Any]: + project = _resolve_project(account, request.project_id or None) + existing = None + if skill_id: + existing = legacy.db.fetch_one( + "SELECT * FROM agent_skills WHERE id = ? AND user_id = ?", + (skill_id, account["id"]), + ) + if not existing: + raise HTTPException(status_code=404, detail="Agent skill not found") + else: + existing = legacy.db.fetch_one( + """ + SELECT * FROM agent_skills + WHERE user_id = ? AND project_id = ? AND agent_scope = ? AND platform = ? AND skill_key = ? + """, + (account["id"], project["id"], agent_scope, platform, request.skill_key.strip()), + ) + timestamp = now() + if existing: + legacy.db.execute( + """ + UPDATE agent_skills + SET name = ?, status = ?, method_json = ?, test_spec_json = ?, last_result_json = ?, + success_count = ?, failure_count = ?, last_score = ?, last_validated_at = ?, updated_at = ? + WHERE id = ? + """, + ( + request.name.strip(), + request.status.strip() or "draft", + _dump(request.method), + _dump(request.test_spec), + _dump(request.last_result), + request.success_count, + request.failure_count, + request.last_score, + timestamp, + timestamp, + existing["id"], + ), + ) + row = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (existing["id"],)) + else: + new_id = make_id("skill") + legacy.db.execute( + """ + INSERT INTO agent_skills ( + id, user_id, project_id, agent_scope, platform, parent_skill_id, skill_key, name, status, + method_json, test_spec_json, last_result_json, success_count, failure_count, last_score, + last_validated_at, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, '', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + new_id, + account["id"], + project["id"], + agent_scope, + platform, + request.skill_key.strip(), + request.name.strip(), + request.status.strip() or "draft", + _dump(request.method), + _dump(request.test_spec), + _dump(request.last_result), + request.success_count, + request.failure_count, + request.last_score, + timestamp, + timestamp, + timestamp, + ), + ) + row = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (new_id,)) + return _skill_payload(row) + + def _create_or_update_incident( + *, + tenant_user_id: str, + tenant_project_id: str, + source_type: str, + source_id: str, + severity: str, + title: str, + summary: str, + payload: dict[str, Any], + ) -> dict[str, Any]: + existing = legacy.db.fetch_one( + "SELECT * FROM admin_ops_incidents WHERE source_type = ? AND source_id = ? AND title = ?", + (source_type, source_id, title), + ) + timestamp = now() + if existing: + legacy.db.execute( + """ + UPDATE admin_ops_incidents + SET tenant_user_id = ?, tenant_project_id = ?, severity = ?, summary = ?, payload_json = ?, updated_at = ? + WHERE id = ? + """, + ( + tenant_user_id, + tenant_project_id, + severity, + summary, + _dump(payload), + timestamp, + existing["id"], + ), + ) + row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (existing["id"],)) + else: + incident_id = make_id("incident") + legacy.db.execute( + """ + INSERT INTO admin_ops_incidents ( + id, tenant_user_id, tenant_project_id, source_type, source_id, severity, title, + summary, payload_json, status, assigned_to, reviewed_by, review_notes, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'open', '', '', '', ?, ?) + """, + ( + incident_id, + tenant_user_id, + tenant_project_id, + source_type, + source_id, + severity, + title, + summary, + _dump(payload), + timestamp, + timestamp, + ), + ) + row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,)) + return _incident_payload(row) + + def _scan_admin_incidents(admin: dict[str, Any]) -> dict[str, Any]: + _ = admin + created: list[dict[str, Any]] = [] + failed_jobs = legacy.db.fetch_all( + "SELECT * FROM jobs WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 20" + ) + for job in failed_jobs: + created.append( + _create_or_update_incident( + tenant_user_id=job.get("user_id", "") or "", + tenant_project_id=job.get("project_id", "") or "", + source_type="job", + source_id=job["id"], + severity="error", + title=f"失败任务:{job.get('title') or job['id']}", + summary=job.get("error", "")[:280] or "任务失败,需要检查执行链。", + payload=legacy.job_payload(job), + ) + ) + try: + integration_health = legacy.integrations_health(admin) + except Exception as exc: + integration_health = {"collector": {"reachable": False, "error": str(exc)}} + for key, payload in integration_health.items(): + reachable = bool(payload.get("reachable", False)) + if reachable and key != "cutvideo": + continue + if key == "cutvideo" and payload.get("supports_uploads", True): + continue + created.append( + _create_or_update_incident( + tenant_user_id="", + tenant_project_id="", + source_type="integration", + source_id=key, + severity="warn" if key in {"cutvideo", "live_recorder"} else "error", + title=f"集成异常:{key}", + summary=str(payload.get("error") or payload.get("upload_error") or "集成健康检查未通过")[:280], + payload=payload, + ) + ) + return { + "created_or_updated": created, + "count": len(created), + } + + @app.get("/v2/oneliner/profile") + def get_oneliner_profile( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + row = _ensure_oneliner_profile(account, project["id"]) + return _profile_payload(row, account=account) + + @app.put("/v2/oneliner/profile") + def put_oneliner_profile( + request: OneLinerProfileRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, request.project_id or None) + assistant = _resolve_assistant(account, request.assistant_id or None, project["id"]) + existing = _ensure_oneliner_profile(account, project["id"]) + timestamp = now() + legacy.db.execute( + """ + UPDATE oneliner_profiles + SET assistant_id = ?, display_name = ?, long_term_goal = ?, notes = ?, default_platform = ?, config_json = ?, updated_at = ? + WHERE id = ? + """, + ( + (assistant or {}).get("id", ""), + request.display_name.strip() or "OneLiner", + request.long_term_goal.strip(), + request.notes.strip(), + _safe_platform(request.default_platform or existing.get("default_platform") or "douyin"), + _dump(request.config), + timestamp, + existing["id"], + ), + ) + row = legacy.db.fetch_one("SELECT * FROM oneliner_profiles WHERE id = ?", (existing["id"],)) + return _profile_payload(row, account=account) + + @app.get("/v2/oneliner/sessions") + def list_oneliner_sessions( + project_id: str | None = Query(default=None), + limit: int = Query(default=20, ge=1, le=100), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + rows = legacy.db.fetch_all( + """ + SELECT * FROM oneliner_sessions + WHERE user_id = ? AND project_id = ? + ORDER BY updated_at DESC + LIMIT ? + """, + (account["id"], project["id"], limit), + ) + items = [_session_payload(row) for row in rows] + return {"items": items, "count": len(items)} + + @app.post("/v2/oneliner/sessions") + async def create_oneliner_session( + request: OneLinerSessionCreateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, request.project_id or None) + profile = _ensure_oneliner_profile(account, project["id"]) + session_id = make_id("oline") + timestamp = now() + preferred_platform = _safe_platform(request.preferred_platform or profile.get("default_platform") or "douyin") + title = request.title.strip() or "新的 OneLiner 会话" + legacy.db.execute( + """ + INSERT INTO oneliner_sessions ( + id, user_id, project_id, profile_id, title, status, preferred_platform, + last_platform, last_intent_key, last_message_at, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, 'active', ?, '', '', ?, ?, ?) + """, + ( + session_id, + account["id"], + project["id"], + profile["id"], + title, + preferred_platform, + timestamp, + timestamp, + timestamp, + ), + ) + session_row = legacy.db.fetch_one("SELECT * FROM oneliner_sessions WHERE id = ?", (session_id,)) + if request.initial_message.strip(): + await post_oneliner_message( + session_id, + OneLinerMessageRequest( + content=request.initial_message, + project_id=project["id"], + platform=preferred_platform, + ), + account, + ) + session_row = legacy.db.fetch_one("SELECT * FROM oneliner_sessions WHERE id = ?", (session_id,)) + return _session_payload(session_row) + + @app.get("/v2/oneliner/sessions/{session_id}/messages") + def list_oneliner_messages( + session_id: str, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + session = _load_owned_session(session_id, account) + rows = legacy.db.fetch_all( + "SELECT * FROM oneliner_messages WHERE session_id = ? ORDER BY created_at ASC", + (session["id"],), + ) + items = [_message_payload(row) for row in rows] + return {"session": _session_payload(session), "items": items, "count": len(items)} + + @app.post("/v2/oneliner/sessions/{session_id}/messages") + async def post_oneliner_message( + session_id: str, + request: OneLinerMessageRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + session = _load_owned_session(session_id, account) + project = _resolve_project(account, request.project_id or session.get("project_id") or None) + plan = await _plan_oneliner_request( + account, + project_id=project["id"], + message=request.content, + platform_hint=request.platform or session.get("preferred_platform") or "", + ) + user_message = _insert_message(session["id"], account["id"], "user", request.content.strip(), {}, {}) + remembered = _remember_message_preference(account, project_id=project["id"], plan=plan, message=request.content) + result = await _generate_oneliner_reply(account, project_id=project["id"], message=request.content, plan=plan) + if remembered: + result["remembered_memory"] = remembered + assistant_message = _insert_message(session["id"], account["id"], "assistant", result["summary_text"], plan, result) + session_title = session.get("title") or request.content.strip()[:28] or "新的 OneLiner 会话" + timestamp = now() + legacy.db.execute( + """ + UPDATE oneliner_sessions + SET title = ?, last_platform = ?, last_intent_key = ?, last_message_at = ?, updated_at = ? + WHERE id = ? + """, + ( + session_title, + plan.get("platform", ""), + plan.get("intent_key", ""), + timestamp, + timestamp, + session["id"], + ), + ) + updated_session = legacy.db.fetch_one("SELECT * FROM oneliner_sessions WHERE id = ?", (session["id"],)) + return { + "session": _session_payload(updated_session), + "user_message": _message_payload(user_message), + "assistant_message": _message_payload(assistant_message), + "plan": plan, + "result": result, + } + + @app.get("/v2/platform-agents") + def list_platform_agents( + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + items = _list_platform_profiles(account, project["id"]) + return {"items": items, "count": len(items)} + + @app.put("/v2/platform-agents/{platform}/profile") + def update_platform_agent( + platform: str, + request: PlatformAgentProfileRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + normalized_platform = _safe_platform(platform) + return _upsert_platform_profile(account, normalized_platform, request) + + @app.get("/v2/platform-agents/{platform}/memories") + def list_platform_memories( + platform: str, + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + normalized_platform = _safe_platform(platform) + rows = legacy.db.fetch_all( + """ + SELECT * FROM agent_memories + WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ? + ORDER BY updated_at DESC + """, + (account["id"], project["id"], normalized_platform), + ) + items = [_memory_payload(row) for row in rows] + return {"items": items, "count": len(items)} + + @app.post("/v2/platform-agents/{platform}/memories") + def upsert_platform_memory( + platform: str, + request: AgentMemoryUpsertRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + normalized_platform = _safe_platform(platform) + return _upsert_memory(account, agent_scope="platform", platform=normalized_platform, request=request) + + @app.get("/v2/platform-agents/{platform}/skills") + def list_platform_skills( + platform: str, + project_id: str | None = Query(default=None), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, project_id or None) + normalized_platform = _safe_platform(platform) + rows = legacy.db.fetch_all( + """ + SELECT * FROM agent_skills + WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ? + ORDER BY updated_at DESC + """, + (account["id"], project["id"], normalized_platform), + ) + items = [_skill_payload(row) for row in rows] + return {"items": items, "count": len(items)} + + @app.post("/v2/platform-agents/{platform}/skills") + def create_platform_skill( + platform: str, + request: AgentSkillUpsertRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + normalized_platform = _safe_platform(platform) + return _upsert_skill(account, agent_scope="platform", platform=normalized_platform, request=request) + + @app.patch("/v2/platform-agents/{platform}/skills/{skill_id}") + def update_platform_skill( + platform: str, + skill_id: str, + request: AgentSkillUpsertRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + normalized_platform = _safe_platform(platform) + return _upsert_skill(account, agent_scope="platform", platform=normalized_platform, request=request, skill_id=skill_id) + + @app.get("/v2/admin/ops/overview") + def admin_ops_overview(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]: + incidents = [ + _incident_payload(row) + for row in legacy.db.fetch_all( + "SELECT * FROM admin_ops_incidents ORDER BY updated_at DESC LIMIT 50" + ) + ] + failed_jobs = [ + legacy.job_payload(row) + for row in legacy.db.fetch_all( + "SELECT * FROM jobs WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 12" + ) + ] + pending_accounts = [legacy.normalize_account(row) for row in legacy.db.fetch_all("SELECT * FROM accounts WHERE approval_status = 'pending' ORDER BY created_at ASC LIMIT 20")] + return { + "incidents": incidents, + "incident_count": len(incidents), + "failed_jobs": failed_jobs, + "failed_job_count": len(failed_jobs), + "pending_accounts": pending_accounts, + "pending_account_count": len(pending_accounts), + "integration_health": legacy.integrations_health(admin), + } + + @app.post("/v2/admin/ops/incidents/scan") + def admin_ops_scan(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]: + return _scan_admin_incidents(admin) + + @app.patch("/v2/admin/ops/incidents/{incident_id}") + def review_admin_incident( + incident_id: str, + request: AdminIncidentReviewRequest, + admin: dict[str, Any] = Depends(legacy.require_super_admin), + ) -> dict[str, Any]: + current = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,)) + if not current: + raise HTTPException(status_code=404, detail="Incident not found") + timestamp = now() + legacy.db.execute( + """ + UPDATE admin_ops_incidents + SET status = ?, reviewed_by = ?, review_notes = ?, updated_at = ? + WHERE id = ? + """, + ( + request.status.strip() or "reviewed", + admin["id"], + request.review_notes.strip(), + timestamp, + incident_id, + ), + ) + row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,)) + return _incident_payload(row) diff --git a/web/storyforge-web-v4/assets/app.js b/web/storyforge-web-v4/assets/app.js index 6df526a..9807335 100644 --- a/web/storyforge-web-v4/assets/app.js +++ b/web/storyforge-web-v4/assets/app.js @@ -31,6 +31,12 @@ const appState = { integrationHealth: null, localModelCatalog: null, backendCapabilities: null, + onelinerProfile: null, + onelinerSessions: [], + selectedOnelinerSessionId: "", + onelinerMessages: [], + platformAgents: [], + adminOpsOverview: null, busy: false, message: "", lastAction: null, @@ -145,6 +151,23 @@ const PIPELINE_GUARDS = { } }; +const ONELINER_INTENT_LABELS = { + create_project: "创建项目", + create_assistant: "创建 Agent", + import_homepage: "导入主页", + track_account: "跟踪账号", + analyze_account: "分析账号", + analyze_top_videos: "分析高分作品", + generate_copy: "生成文案", + ai_video: "AI 视频", + real_cut: "实拍剪辑", + review: "发布复盘", + live_recorder: "直播录制", + storage_status: "存储状态", + ops_admin: "运维巡检", + custom: "自定义任务" +}; + function safeArray(value) { return Array.isArray(value) ? value : []; } @@ -619,6 +642,149 @@ function closeActionModal() { document.querySelector(".action-modal-backdrop")?.classList.add("hidden"); } +function ensureOneLinerUi() { + if (!document.querySelector(".oneliner-fab")) { + const fab = document.createElement("button"); + fab.className = "oneliner-fab"; + fab.type = "button"; + fab.dataset.action = "open-oneliner"; + fab.innerHTML = ` + 1 + OneLiner + `; + document.body.appendChild(fab); + } + if (!document.querySelector(".oneliner-backdrop")) { + const panel = document.createElement("div"); + panel.className = "oneliner-backdrop hidden"; + panel.innerHTML = ` +
前端没上的需求,先由总控主 Agent 承接。
+你可以直接说目标,不用先理解平台有什么按钮。OneLiner 会先拆目标,再决定交给哪个平台 Agent。
+等 live collector 同步 `/v2/platform-agents` 后,这里会切成真实视图。
刷新后会自动读取失败任务、集成健康和待审事件。
${escapeHtml(item.summary || "待补详情")}
+ +最近主链比较稳定,继续观察即可。
${escapeHtml(appState.onelinerProfile?.notes || "你可以把用户长期目标、账号目标、默认平台都绑给 OneLiner,再让它去调度平台 Agent。")}
+ +${escapeHtml(incident.summary || "暂无摘要")}