diff --git a/collector-service/app/oneliner_features.py b/collector-service/app/oneliner_features.py index 3ea6551..9f61389 100644 --- a/collector-service/app/oneliner_features.py +++ b/collector-service/app/oneliner_features.py @@ -158,6 +158,30 @@ class AgentPolicyRollbackRequest(BaseModel): reason: str = "" +class AgentRunCreateRequest(BaseModel): + project_id: str = "" + session_id: str = "" + source_screen: str = "dashboard" + source_action_key: str = "" + title: str = "" + summary: str = "" + intent_key: str = "custom" + platform: str = "" + platform_scope: str = "single_platform" + delivery_mode: str = "hybrid" + scheduling_mode: str = "queued" + plan_request: dict[str, Any] = Field(default_factory=dict) + payload: dict[str, Any] = Field(default_factory=dict) + + +class AgentRunConfirmRequest(BaseModel): + reason: str = "" + + +class AgentRunCancelRequest(BaseModel): + reason: str = "" + + INTENT_ACTIONS: dict[str, list[dict[str, Any]]] = { "create_project": [{"key": "goto-intake", "label": "去我的项目", "kind": "navigate"}], "create_assistant": [{"key": "open-create-assistant", "label": "创建 Agent", "kind": "ui_action"}], @@ -541,6 +565,48 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: created_at TEXT NOT NULL ); + CREATE TABLE IF NOT EXISTS agent_runs ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + project_id TEXT NOT NULL DEFAULT '', + session_id TEXT NOT NULL DEFAULT '', + source_screen TEXT NOT NULL DEFAULT '', + source_action_key TEXT NOT NULL DEFAULT '', + title TEXT NOT NULL DEFAULT '', + summary TEXT NOT NULL DEFAULT '', + intent_key TEXT NOT NULL DEFAULT 'custom', + platform TEXT NOT NULL DEFAULT '', + platform_scope TEXT NOT NULL DEFAULT 'single_platform', + delivery_mode TEXT NOT NULL DEFAULT 'hybrid', + run_status TEXT NOT NULL DEFAULT 'needs_confirmation', + scheduling_mode TEXT NOT NULL DEFAULT 'queued', + active_executor_key TEXT NOT NULL DEFAULT 'main_agent', + plan_json TEXT NOT NULL DEFAULT '{}', + governance_json TEXT NOT NULL DEFAULT '{}', + result_json TEXT NOT NULL DEFAULT '{}', + status_summary TEXT NOT NULL DEFAULT '', + needs_user_input INTEGER NOT NULL DEFAULT 1, + blocked_reason TEXT NOT NULL DEFAULT '', + active_admin_override_notice_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + started_at TEXT NOT NULL DEFAULT '', + finished_at TEXT NOT NULL DEFAULT '', + FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL, + FOREIGN KEY(session_id) REFERENCES oneliner_sessions(id) ON DELETE SET NULL + ); + + CREATE TABLE IF NOT EXISTS agent_run_events ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + event_type TEXT NOT NULL, + summary TEXT NOT NULL DEFAULT '', + details_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY(run_id) REFERENCES agent_runs(id) ON DELETE CASCADE + ); + CREATE TABLE IF NOT EXISTS admin_ops_incidents ( id TEXT PRIMARY KEY, tenant_user_id TEXT NOT NULL DEFAULT '', @@ -955,6 +1021,61 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "created_at": row["created_at"], } + def _agent_run_event_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "run_id": row.get("run_id", ""), + "event_type": row.get("event_type", ""), + "summary": row.get("summary", ""), + "details": _parse_json(row.get("details_json"), {}), + "created_at": row.get("created_at", ""), + } + + def _list_agent_run_events(run_id: str) -> list[dict[str, Any]]: + rows = legacy.db.fetch_all( + """ + SELECT * FROM agent_run_events + WHERE run_id = ? + ORDER BY created_at ASC + """, + (run_id,), + ) + return [_agent_run_event_payload(row) for row in rows] + + def _agent_run_payload(row: dict[str, Any], *, include_events: bool = True) -> dict[str, Any]: + payload = { + "id": row["id"], + "user_id": row.get("user_id", ""), + "project_id": row.get("project_id", ""), + "session_id": row.get("session_id", ""), + "source_screen": row.get("source_screen", ""), + "source_action_key": row.get("source_action_key", ""), + "title": row.get("title", ""), + "summary": row.get("summary", ""), + "intent_key": row.get("intent_key", "custom"), + "platform": row.get("platform", ""), + "platform_label": legacy.platform_label(row.get("platform", "")) if row.get("platform") else "", + "platform_scope": row.get("platform_scope", "single_platform"), + "delivery_mode": row.get("delivery_mode", "hybrid"), + "run_status": row.get("run_status", "needs_confirmation"), + "scheduling_mode": row.get("scheduling_mode", "queued"), + "active_executor_key": row.get("active_executor_key", "main_agent"), + "plan": _parse_json(row.get("plan_json"), {}), + "governance": _parse_json(row.get("governance_json"), {}), + "result": _parse_json(row.get("result_json"), {}), + "status_summary": row.get("status_summary", ""), + "needs_user_input": bool(row.get("needs_user_input")), + "blocked_reason": row.get("blocked_reason", ""), + "active_admin_override_notice": _parse_json(row.get("active_admin_override_notice_json"), {}), + "created_at": row.get("created_at", ""), + "updated_at": row.get("updated_at", ""), + "started_at": row.get("started_at", ""), + "finished_at": row.get("finished_at", ""), + } + if include_events: + payload["events"] = _list_agent_run_events(row["id"]) + return payload + def _incident_payload(row: dict[str, Any]) -> dict[str, Any]: return { "id": row["id"], @@ -2410,6 +2531,157 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: raise HTTPException(status_code=404, detail="OneLiner session not found") return row + def _load_owned_agent_run(run_id: str, account: dict[str, Any]) -> dict[str, Any]: + row = legacy.db.fetch_one( + "SELECT * FROM agent_runs WHERE id = ? AND user_id = ?", + (run_id, account["id"]), + ) + if not row: + raise HTTPException(status_code=404, detail="OneLiner run not found") + return row + + def _normalize_platform_scope(value: str | None) -> str: + normalized = str(value or "").strip().lower() + if normalized == "all_platforms": + return "all_platforms" + return "single_platform" + + def _normalize_delivery_mode(value: str | None) -> str: + normalized = str(value or "").strip().lower() + if normalized in {"ui", "oneliner", "hybrid"}: + return normalized + return "hybrid" + + def _normalize_scheduling_mode(value: str | None) -> str: + normalized = str(value or "").strip().lower() + if normalized == "parallel": + return "parallel" + return "queued" + + def _ensure_run_session( + account: dict[str, Any], + *, + project_id: str, + requested_session_id: str, + title: str, + preferred_platform: str, + ) -> dict[str, Any]: + if requested_session_id: + return _load_owned_session(requested_session_id, account) + latest_row = legacy.db.fetch_one( + """ + SELECT * FROM oneliner_sessions + WHERE user_id = ? AND project_id = ? + ORDER BY updated_at DESC, created_at DESC + LIMIT 1 + """, + (account["id"], project_id), + ) + if latest_row: + return latest_row + profile = _fetch_profile_row(account, project_id) + session_id = make_id("oline") + timestamp = now() + legacy.db.execute( + """ + INSERT INTO oneliner_sessions ( + id, user_id, project_id, profile_id, title, status, preferred_platform, + last_platform, last_intent_key, last_message_at, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, 'active', ?, '', '', ?, ?, ?) + """, + ( + session_id, + account["id"], + project_id, + (profile or {}).get("id"), + title.strip() or "新的 OneLiner 会话", + preferred_platform, + timestamp, + timestamp, + timestamp, + ), + ) + created = legacy.db.fetch_one("SELECT * FROM oneliner_sessions WHERE id = ?", (session_id,)) + assert created is not None + return created + + def _touch_session_for_run(session_id: str, *, platform: str, intent_key: str) -> None: + timestamp = now() + legacy.db.execute( + """ + UPDATE oneliner_sessions + SET last_platform = ?, last_intent_key = ?, last_message_at = ?, updated_at = ? + WHERE id = ? + """, + (platform, intent_key, timestamp, timestamp, session_id), + ) + + def _log_agent_run_event( + run_id: str, + *, + event_type: str, + summary: str, + details: dict[str, Any] | None = None, + ) -> dict[str, Any]: + event_id = make_id("run_evt") + legacy.db.execute( + """ + INSERT INTO agent_run_events (id, run_id, event_type, summary, details_json, created_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + event_id, + run_id, + event_type, + summary, + _dump(details or {}), + now(), + ), + ) + row = legacy.db.fetch_one("SELECT * FROM agent_run_events WHERE id = ?", (event_id,)) + assert row is not None + return _agent_run_event_payload(row) + + def _agent_run_plan_payload( + request: AgentRunCreateRequest, + *, + governance: dict[str, Any], + platform: str, + platform_scope: str, + ) -> dict[str, Any]: + requested_plan = dict(request.plan_request or {}) + raw_steps = requested_plan.get("steps") or [] + if not isinstance(raw_steps, list): + raw_steps = [raw_steps] + steps = [str(item).strip() for item in raw_steps if str(item).strip()] + if not steps: + steps = ["读取当前项目上下文", "结合治理层生成执行计划", "等待用户确认后执行"] + return { + **requested_plan, + "goal": str(requested_plan.get("goal") or request.title or "主 Agent 任务").strip() or "主 Agent 任务", + "steps": steps, + "intent_key": str(request.intent_key or "custom").strip() or "custom", + "platform": platform, + "platform_scope": platform_scope, + "source_screen": str(request.source_screen or "").strip(), + "source_action_key": str(request.source_action_key or "").strip(), + "summary": str(request.summary or requested_plan.get("summary") or "").strip(), + "requested_delivery_mode": _normalize_delivery_mode(request.delivery_mode), + "active_admin_override_notice": governance.get("active_admin_override_notice") or {}, + } + + def _has_other_active_runs(*, account_id: str, project_id: str, run_id: str) -> bool: + row = legacy.db.fetch_one( + """ + SELECT id FROM agent_runs + WHERE user_id = ? AND project_id = ? AND id != ? AND run_status IN ('queued', 'running', 'blocked') + ORDER BY updated_at DESC + LIMIT 1 + """, + (account_id, project_id, run_id), + ) + return bool(row) + def _deterministic_intent(message: str, platform_hint: str, account: dict[str, Any]) -> dict[str, Any]: text = message.strip() lowered = text.lower() @@ -4207,6 +4479,206 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: **result, } + @app.post("/v2/oneliner/runs") + def create_oneliner_run( + request: AgentRunCreateRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project(account, request.project_id or None) + normalized_platform = _safe_platform(request.platform, fallback="") if str(request.platform or "").strip() else "" + platform_scope = _normalize_platform_scope(request.platform_scope) + governance = _effective_policy_payload( + subject_account=account, + subject_project_id=project["id"], + platform=normalized_platform, + ) + plan = _agent_run_plan_payload( + request, + governance=governance, + platform=normalized_platform, + platform_scope=platform_scope, + ) + session = _ensure_run_session( + account, + project_id=project["id"], + requested_session_id=request.session_id, + title=request.title.strip() or plan["goal"], + preferred_platform=normalized_platform or "douyin", + ) + _touch_session_for_run(session["id"], platform=normalized_platform, intent_key=plan.get("intent_key", "custom")) + run_id = make_id("oline_run") + timestamp = now() + active_admin_override_notice = governance.get("active_admin_override_notice") or {} + legacy.db.execute( + """ + INSERT INTO agent_runs ( + id, user_id, project_id, session_id, source_screen, source_action_key, title, summary, + intent_key, platform, platform_scope, delivery_mode, run_status, scheduling_mode, + active_executor_key, plan_json, governance_json, result_json, status_summary, + needs_user_input, blocked_reason, active_admin_override_notice_json, + created_at, updated_at, started_at, finished_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'needs_confirmation', ?, 'main_agent', ?, ?, '{}', ?, 1, '', ?, ?, ?, '', '') + """, + ( + run_id, + account["id"], + project["id"], + session["id"], + str(request.source_screen or "").strip(), + str(request.source_action_key or "").strip(), + request.title.strip() or plan["goal"], + request.summary.strip(), + str(request.intent_key or "custom").strip() or "custom", + normalized_platform, + platform_scope, + _normalize_delivery_mode(request.delivery_mode), + _normalize_scheduling_mode(request.scheduling_mode), + _dump(plan), + _dump(governance), + "等待你确认执行计划", + _dump(active_admin_override_notice), + timestamp, + timestamp, + ), + ) + _log_agent_run_event( + run_id, + event_type="run.created", + summary=f"已创建待确认任务:{request.title.strip() or plan['goal']}", + details={ + "run_status": "needs_confirmation", + "source_screen": str(request.source_screen or "").strip(), + "source_action_key": str(request.source_action_key or "").strip(), + "platform": normalized_platform, + "platform_scope": platform_scope, + }, + ) + row = legacy.db.fetch_one("SELECT * FROM agent_runs WHERE id = ?", (run_id,)) + assert row is not None + return _agent_run_payload(row) + + @app.get("/v2/oneliner/runs") + def list_oneliner_runs( + project_id: str | None = Query(default=None), + limit: int = Query(default=20, ge=1, le=100), + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + project = _resolve_project_for_read(account, project_id or None) if (project_id or "").strip() else _resolve_project_for_read(account, None) + if project: + rows = legacy.db.fetch_all( + """ + SELECT * FROM agent_runs + WHERE user_id = ? AND project_id = ? + ORDER BY created_at DESC + LIMIT ? + """, + (account["id"], project["id"], limit), + ) + else: + rows = legacy.db.fetch_all( + """ + SELECT * FROM agent_runs + WHERE user_id = ? + ORDER BY created_at DESC + LIMIT ? + """, + (account["id"], limit), + ) + items = [_agent_run_payload(row, include_events=False) for row in rows] + return {"items": items, "count": len(items)} + + @app.get("/v2/oneliner/runs/{run_id}") + def get_oneliner_run( + run_id: str, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + row = _load_owned_agent_run(run_id, account) + return _agent_run_payload(row) + + @app.get("/v2/oneliner/runs/{run_id}/events") + def list_oneliner_run_events( + run_id: str, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + row = _load_owned_agent_run(run_id, account) + items = _list_agent_run_events(row["id"]) + return {"run": _agent_run_payload(row, include_events=False), "items": items, "count": len(items)} + + @app.post("/v2/oneliner/runs/{run_id}/confirm") + def confirm_oneliner_run( + run_id: str, + request: AgentRunConfirmRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + row = _load_owned_agent_run(run_id, account) + current_status = str(row.get("run_status") or "needs_confirmation") + if current_status not in {"needs_confirmation", "queued", "running"}: + raise HTTPException(status_code=409, detail="Run can no longer be confirmed") + if current_status == "needs_confirmation": + _log_agent_run_event( + run_id, + event_type="run.confirmed", + summary=request.reason.strip() or "用户已确认执行计划", + details={"reason": request.reason.strip()}, + ) + next_status = "queued" if _has_other_active_runs(account_id=account["id"], project_id=row.get("project_id", ""), run_id=run_id) else "running" + timestamp = now() + started_at = row.get("started_at", "") + event_type = "run.queued" + event_summary = "已进入主 Agent 等待队列" + status_summary = "等待主 Agent 执行" + if next_status == "running": + started_at = timestamp + event_type = "run.started" + event_summary = "主 Agent 已开始执行" + status_summary = "主 Agent 正在执行" + legacy.db.execute( + """ + UPDATE agent_runs + SET run_status = ?, status_summary = ?, needs_user_input = 0, updated_at = ?, started_at = ? + WHERE id = ? + """, + (next_status, status_summary, timestamp, started_at, run_id), + ) + _log_agent_run_event( + run_id, + event_type=event_type, + summary=event_summary, + details={"run_status": next_status}, + ) + updated = legacy.db.fetch_one("SELECT * FROM agent_runs WHERE id = ?", (run_id,)) + assert updated is not None + return _agent_run_payload(updated) + + @app.post("/v2/oneliner/runs/{run_id}/cancel") + def cancel_oneliner_run( + run_id: str, + request: AgentRunCancelRequest, + account: dict[str, Any] = Depends(legacy.require_approved), + ) -> dict[str, Any]: + row = _load_owned_agent_run(run_id, account) + current_status = str(row.get("run_status") or "") + if current_status not in {"needs_confirmation", "queued"}: + raise HTTPException(status_code=409, detail="Run can no longer be cancelled") + timestamp = now() + legacy.db.execute( + """ + UPDATE agent_runs + SET run_status = 'cancelled', status_summary = ?, needs_user_input = 0, updated_at = ?, finished_at = ? + WHERE id = ? + """, + (request.reason.strip() or "任务已取消", timestamp, timestamp, run_id), + ) + _log_agent_run_event( + run_id, + event_type="run.cancelled", + summary=request.reason.strip() or "用户取消了当前任务", + details={"from_status": current_status}, + ) + updated = legacy.db.fetch_one("SELECT * FROM agent_runs WHERE id = ?", (run_id,)) + assert updated is not None + return _agent_run_payload(updated) + @app.get("/v2/oneliner/profile") def get_oneliner_profile( project_id: str | None = Query(default=None), diff --git a/scripts/deploy_fnos_storyforge_collector.sh b/scripts/deploy_fnos_storyforge_collector.sh index dff67a4..095aff3 100755 --- a/scripts/deploy_fnos_storyforge_collector.sh +++ b/scripts/deploy_fnos_storyforge_collector.sh @@ -71,7 +71,13 @@ need_cmd() { need_cmd python3 need_cmd security need_cmd rsync -need_cmd docker + +if [ "$DEPLOY_MODE" = "prebuilt_local" ]; then + if ! command -v docker >/dev/null 2>&1 || ! docker info >/dev/null 2>&1; then + echo "[deploy] local Docker unavailable, fallback to remote_build" >&2 + DEPLOY_MODE="remote_build" + fi +fi shell_quote() { python3 - "$1" <<'PY' diff --git a/tests/test_main_agent_governance.py b/tests/test_main_agent_governance.py index ef4e071..cf97c8e 100644 --- a/tests/test_main_agent_governance.py +++ b/tests/test_main_agent_governance.py @@ -49,6 +49,8 @@ class MainAgentGovernanceTests(unittest.TestCase): def _clear_tables(self) -> None: tables = [ + "agent_run_events", + "agent_runs", "agent_policy_audit_logs", "agent_policy_effectivity", "agent_policy_versions", @@ -180,6 +182,72 @@ class MainAgentGovernanceTests(unittest.TestCase): "member_headers": {"Authorization": f"Bearer {member_token}"}, } + def test_agent_run_creation_snapshots_governance_and_needs_confirmation(self) -> None: + response = self.client.post( + "/v2/oneliner/runs", + headers=self.ctx["member_headers"], + json={ + "project_id": self.ctx["project_id"], + "source_screen": "dashboard", + "source_action_key": "homepage-primary-action", + "title": "跟进重点账号", + "summary": "先由主 Agent 评估优先级", + "intent_key": "track_account", + "platform": "douyin", + "platform_scope": "single_platform", + "plan_request": { + "goal": "跟进重点账号", + "steps": ["读取当前项目上下文", "检查重点账号变化", "决定下一步"], + }, + }, + ) + self.assertEqual(response.status_code, 200, response.text) + payload = response.json() + self.assertEqual(payload["run_status"], "needs_confirmation") + self.assertEqual(payload["source_screen"], "dashboard") + self.assertEqual(payload["platform"], "douyin") + self.assertEqual(payload["platform_scope"], "single_platform") + self.assertEqual(payload["session_id"][:5], "oline") + self.assertEqual(payload["plan"]["goal"], "跟进重点账号") + self.assertEqual(payload["governance"]["project_id"], self.ctx["project_id"]) + self.assertIn("layers", payload["governance"]) + self.assertEqual(payload["events"][0]["event_type"], "run.created") + + def test_agent_run_confirm_transitions_to_queue_or_running_and_logs_events(self) -> None: + create = self.client.post( + "/v2/oneliner/runs", + headers=self.ctx["member_headers"], + json={ + "project_id": self.ctx["project_id"], + "source_screen": "strategy", + "source_action_key": "handoff-to-main-agent", + "title": "调整当前平台策略", + "summary": "让主 Agent 先给执行计划", + "intent_key": "custom", + "platform": "douyin", + "platform_scope": "single_platform", + "plan_request": { + "goal": "调整当前平台策略", + "steps": ["读取当前平台策略", "生成调整建议"], + }, + }, + ) + self.assertEqual(create.status_code, 200, create.text) + run_id = create.json()["id"] + + confirm = self.client.post( + f"/v2/oneliner/runs/{run_id}/confirm", + headers=self.ctx["member_headers"], + json={"reason": "user confirmed"}, + ) + self.assertEqual(confirm.status_code, 200, confirm.text) + payload = confirm.json() + self.assertIn(payload["run_status"], {"queued", "running"}) + event_types = [item["event_type"] for item in payload["events"]] + self.assertIn("run.created", event_types) + self.assertIn("run.confirmed", event_types) + self.assertTrue("run.queued" in event_types or "run.started" in event_types) + def test_effective_policy_merges_system_user_global_and_platform_layers(self) -> None: system_response = self.client.put( "/v2/admin/oneliner/governance/system/main-agent", diff --git a/web/storyforge-web-v4/assets/app.js b/web/storyforge-web-v4/assets/app.js index 7bc4de5..7779efa 100644 --- a/web/storyforge-web-v4/assets/app.js +++ b/web/storyforge-web-v4/assets/app.js @@ -52,6 +52,8 @@ const appState = { onelinerProfile: null, onelinerSessions: [], selectedOnelinerSessionId: "", + onelinerRuns: [], + selectedOnelinerRunId: "", onelinerMessages: [], onelinerActionRegistry: [], platformAgents: [], @@ -209,6 +211,15 @@ function safeArray(value) { return Array.isArray(value) ? value : []; } +function parseJsonSafe(value, fallback) { + if (typeof value !== "string" || !value.trim()) return fallback; + try { + return JSON.parse(value); + } catch { + return fallback; + } +} + function getRuntimePlatformValues() { return PLATFORM_RUNTIME.getRuntimePlatformValues(); } @@ -918,6 +929,7 @@ function ensureOneLinerUi() {
+