Files
storyforge/collector-service/app/oneliner_features.py
2026-03-23 15:01:15 +08:00

1407 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
from typing import Any
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
class OneLinerProfileRequest(BaseModel):
    """Request body carrying the editable fields of a OneLiner profile.

    The field names mirror the columns of the ``oneliner_profiles`` table
    (``config`` is stored there as ``config_json``). Every field is optional.
    NOTE(review): the route consuming this model is not visible in this part
    of the file -- confirm how blank strings are interpreted there.
    """

    project_id: str = ""
    assistant_id: str = ""
    display_name: str = "OneLiner"
    long_term_goal: str = ""
    notes: str = ""
    default_platform: str = ""
    # Free-form settings; ``default_factory`` gives each instance its own dict.
    config: dict[str, Any] = Field(default_factory=dict)
class OneLinerSessionCreateRequest(BaseModel):
    """Request body for opening a OneLiner chat session.

    All fields are optional and default to an empty string.
    NOTE(review): the consuming route is outside the visible part of the
    file; ``initial_message`` is presumably the first user turn -- confirm.
    """

    project_id: str = ""
    title: str = ""
    preferred_platform: str = ""
    initial_message: str = ""
class OneLinerMessageRequest(BaseModel):
    """Request body for posting one user message to a OneLiner session.

    ``content`` is the only required field.
    NOTE(review): the consuming route is outside the visible part of the
    file -- confirm how ``target_account_id`` and ``remember_preference``
    are used there.
    """

    content: str
    project_id: str = ""
    platform: str = ""
    target_account_id: str = ""
    remember_preference: bool = False
class PlatformAgentProfileRequest(BaseModel):
    """Request body for creating or updating a per-platform agent profile.

    Consumed by ``_upsert_platform_profile``; the platform itself is not part
    of the body and is passed to that helper separately.
    """

    project_id: str = ""
    assistant_id: str = ""
    # Blank name: the upsert helper falls back to the stored name or to
    # "<platform label> Agent".
    name: str = ""
    mission: str = ""
    notes: str = ""
    status: str = "active"
    # Free-form settings; ``default_factory`` gives each instance its own dict.
    config: dict[str, Any] = Field(default_factory=dict)
class AgentMemoryUpsertRequest(BaseModel):
    """Request body for creating or updating one agent memory entry.

    Consumed by ``_upsert_memory``. ``memory_key`` and ``summary`` are
    required; they are not checked for being non-blank.
    """

    project_id: str = ""
    subject_type: str = "project"
    # Blank subject_id: the upsert helper substitutes the project id (for
    # subject_type "project") or the account id (otherwise).
    subject_id: str = ""
    memory_key: str
    title: str = ""
    summary: str
    details: dict[str, Any] = Field(default_factory=dict)
    # Validated by pydantic to lie within [0.0, 1.0].
    confidence: float = Field(default=0.7, ge=0.0, le=1.0)
class AgentSkillUpsertRequest(BaseModel):
    """Request body for creating or updating one agent skill record.

    Consumed by ``_upsert_skill``. ``skill_key`` and ``name`` are required.
    """

    project_id: str = ""
    skill_key: str
    name: str
    status: str = "draft"
    # The three dicts below are stored as JSON text columns
    # (method_json / test_spec_json / last_result_json).
    method: dict[str, Any] = Field(default_factory=dict)
    test_spec: dict[str, Any] = Field(default_factory=dict)
    last_result: dict[str, Any] = Field(default_factory=dict)
    # Counters are validated to be non-negative.
    success_count: int = Field(default=0, ge=0)
    failure_count: int = Field(default=0, ge=0)
    # NOTE(review): unlike the counters, last_score has no range validation.
    last_score: float = 0.0
class AdminIncidentReviewRequest(BaseModel):
    """Request body for reviewing an admin ops incident.

    Field names match the ``status`` / ``review_notes`` columns of the
    ``admin_ops_incidents`` table.
    NOTE(review): the consuming route is outside the visible part of the file.
    """

    status: str = "reviewed"
    review_notes: str = ""
# Suggested front-end actions per intent key. ``_plan_oneliner_request`` reads
# this with ``.get(intent_key, [])``, so an intent without an entry (such as
# "custom") yields no suggested actions. Each action carries a ``key``, a
# user-facing ``label`` and a ``kind`` ("navigate" or "ui_action").
INTENT_ACTIONS: dict[str, list[dict[str, Any]]] = {
    "create_project": [{"key": "goto-intake", "label": "去我的项目", "kind": "navigate"}],
    "create_assistant": [{"key": "open-create-assistant", "label": "创建 Agent", "kind": "ui_action"}],
    "import_homepage": [{"key": "open-import-homepage", "label": "导入主页", "kind": "ui_action"}],
    "track_account": [{"key": "open-track-selected-account", "label": "跟踪当前账号", "kind": "ui_action"}],
    "analyze_account": [{"key": "analyze-selected-account", "label": "分析当前账号", "kind": "ui_action"}],
    "analyze_top_videos": [{"key": "analyze-top-videos", "label": "分析高分作品", "kind": "ui_action"}],
    "generate_copy": [{"key": "open-generate-copy", "label": "生成文案", "kind": "ui_action"}],
    "ai_video": [{"key": "open-ai-video", "label": "做 AI 视频", "kind": "ui_action"}],
    "real_cut": [{"key": "open-real-cut", "label": "做实拍剪辑", "kind": "ui_action"}],
    "review": [{"key": "goto-review", "label": "去发布与复盘", "kind": "navigate"}],
    "live_recorder": [{"key": "open-live-recorder", "label": "打开录制控制", "kind": "ui_action"}],
    "storage_status": [{"key": "goto-production", "label": "查看生产与存储", "kind": "navigate"}],
    "ops_admin": [{"key": "goto-automation", "label": "去自动流程", "kind": "navigate"}],
}
# User-facing label per intent key. Looked up with ``.get`` by
# ``_plan_oneliner_request`` (falling back to the raw key) and by
# ``_remember_message_preference`` (falling back to "偏好"). The key set here
# is the same list of intent keys that the model prompt in
# ``_model_refine_intent`` enumerates, including the catch-all "custom".
INTENT_LABELS = {
    "create_project": "创建项目",
    "create_assistant": "创建 Agent",
    "import_homepage": "导入主页",
    "track_account": "跟踪账号",
    "analyze_account": "分析账号",
    "analyze_top_videos": "分析高分作品",
    "generate_copy": "生成文案",
    "ai_video": "生成 AI 视频",
    "real_cut": "实拍剪辑",
    "review": "发布复盘",
    "live_recorder": "直播录制",
    "storage_status": "查看存储",
    "ops_admin": "运维巡检",
    "custom": "自定义任务",
}
def register_oneliner_routes(app: Any, legacy: Any) -> None:
def now() -> str:
    """Return the current timestamp as produced by ``legacy.utc_now``."""
    current = legacy.utc_now()
    return current
def make_id(prefix: str) -> str:
    """Return a new identifier for ``prefix``, delegating to ``legacy.make_id``."""
    generated = legacy.make_id(prefix)
    return generated
def _parse_json(raw: str | None, fallback: Any) -> Any:
cleaned = str(raw or "").strip()
if not cleaned:
return fallback
try:
return json.loads(cleaned)
except json.JSONDecodeError:
return fallback
def _dump(value: Any) -> str:
return json.dumps(value or {}, ensure_ascii=False)
def ensure_schema() -> None:
    """Create the OneLiner / agent tables when they do not exist yet.

    Runs one DDL script through ``conn.executescript`` on a connection
    obtained from ``legacy.db.session()``. Every statement is
    ``CREATE TABLE IF NOT EXISTS``, so calling this repeatedly is harmless,
    but it also means an existing table is never altered to match a changed
    definition.
    """
    # NOTE(review): several columns are declared NOT NULL (mostly with
    # DEFAULT '') while their foreign key says ON DELETE SET NULL, e.g.
    # project_id / assistant_id in most tables and assigned_to / reviewed_by
    # in admin_ops_incidents. If foreign-key enforcement is enabled on the
    # connection (not visible here), deleting the parent row would try to
    # store NULL in a NOT NULL column, and the '' defaults would not
    # reference any parent row. Confirm the intended behaviour; fixing it
    # needs a migration, not just an edit to this script.
    # NOTE(review): agent_skills has a project_id column but, unlike the
    # other tables, declares no foreign key for it.
    schema = """
CREATE TABLE IF NOT EXISTS oneliner_profiles (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
assistant_id TEXT NOT NULL DEFAULT '',
display_name TEXT NOT NULL DEFAULT 'OneLiner',
long_term_goal TEXT NOT NULL DEFAULT '',
notes TEXT NOT NULL DEFAULT '',
default_platform TEXT NOT NULL DEFAULT '',
config_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, project_id),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS oneliner_sessions (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
profile_id TEXT,
title TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'active',
preferred_platform TEXT NOT NULL DEFAULT '',
last_platform TEXT NOT NULL DEFAULT '',
last_intent_key TEXT NOT NULL DEFAULT '',
last_message_at TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(profile_id) REFERENCES oneliner_profiles(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS oneliner_messages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
user_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL DEFAULT '',
plan_json TEXT NOT NULL DEFAULT '{}',
result_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(session_id) REFERENCES oneliner_sessions(id) ON DELETE CASCADE,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS platform_agent_profiles (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
platform TEXT NOT NULL,
assistant_id TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
mission TEXT NOT NULL DEFAULT '',
notes TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'active',
config_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, project_id, platform),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS agent_memories (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
agent_scope TEXT NOT NULL,
platform TEXT NOT NULL DEFAULT '',
subject_type TEXT NOT NULL DEFAULT 'project',
subject_id TEXT NOT NULL DEFAULT '',
memory_key TEXT NOT NULL,
title TEXT NOT NULL DEFAULT '',
summary TEXT NOT NULL DEFAULT '',
details_json TEXT NOT NULL DEFAULT '{}',
confidence REAL NOT NULL DEFAULT 0.7,
last_validated_at TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, project_id, agent_scope, platform, subject_type, subject_id, memory_key),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS agent_skills (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
agent_scope TEXT NOT NULL,
platform TEXT NOT NULL DEFAULT '',
parent_skill_id TEXT NOT NULL DEFAULT '',
skill_key TEXT NOT NULL,
name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'draft',
method_json TEXT NOT NULL DEFAULT '{}',
test_spec_json TEXT NOT NULL DEFAULT '{}',
last_result_json TEXT NOT NULL DEFAULT '{}',
success_count INTEGER NOT NULL DEFAULT 0,
failure_count INTEGER NOT NULL DEFAULT 0,
last_score REAL NOT NULL DEFAULT 0,
last_validated_at TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, project_id, agent_scope, platform, skill_key),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS admin_ops_incidents (
id TEXT PRIMARY KEY,
tenant_user_id TEXT NOT NULL DEFAULT '',
tenant_project_id TEXT NOT NULL DEFAULT '',
source_type TEXT NOT NULL,
source_id TEXT NOT NULL DEFAULT '',
severity TEXT NOT NULL DEFAULT 'warn',
title TEXT NOT NULL,
summary TEXT NOT NULL DEFAULT '',
payload_json TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'open',
assigned_to TEXT NOT NULL DEFAULT '',
reviewed_by TEXT NOT NULL DEFAULT '',
review_notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(source_type, source_id, title),
FOREIGN KEY(tenant_user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(tenant_project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(assigned_to) REFERENCES accounts(id) ON DELETE SET NULL,
FOREIGN KEY(reviewed_by) REFERENCES accounts(id) ON DELETE SET NULL
);
"""
    with legacy.db.session() as conn:
        conn.executescript(schema)
# Create the tables immediately, while routes are being registered ...
ensure_schema()


# ... and once more when the application starts. Both calls are safe because
# the DDL only uses CREATE TABLE IF NOT EXISTS.
# NOTE(review): recent FastAPI versions document ``on_event`` as deprecated in
# favour of lifespan handlers -- confirm against the pinned FastAPI version.
@app.on_event("startup")
def _startup_oneliner_schema() -> None:
    """Startup hook that re-runs ``ensure_schema``."""
    ensure_schema()
def _resolve_project(account: dict[str, Any], project_id: str | None) -> dict[str, Any]:
    """Resolve the project to operate on for ``account``.

    A blank ``project_id`` is passed on as ``None``; the actual selection is
    done by ``legacy.resolve_target_project``.
    """
    requested_id = project_id or None
    return legacy.resolve_target_project(
        account["id"],
        requested_id,
        username=account["username"],
    )
def _resolve_assistant(account: dict[str, Any], assistant_id: str | None, project_id: str = "") -> dict[str, Any] | None:
    """Resolve the assistant to use for ``account`` within ``project_id``.

    A blank ``assistant_id`` is passed on as ``None``; the lookup itself is
    delegated to ``legacy.resolve_target_assistant``.
    """
    requested_id = assistant_id or None
    return legacy.resolve_target_assistant(account["id"], requested_id, project_id)
def _safe_platform(platform_value: str | None, fallback: str = "douyin") -> str:
    """Normalize a platform value through ``legacy.ensure_domestic_platform``.

    ``fallback`` replaces a blank ``platform_value`` before validation and a
    blank validation result afterwards. Blank values are only allowed by the
    validator when ``fallback`` itself is blank.
    """
    candidate = platform_value or fallback
    validated = legacy.ensure_domestic_platform(candidate, allow_blank=not fallback)
    return validated or fallback
def _fetch_profile_row(account: dict[str, Any], project_id: str = "") -> dict[str, Any] | None:
    """Fetch the OneLiner profile row for ``account`` and ``project_id``, if any."""
    query = "SELECT * FROM oneliner_profiles WHERE user_id = ? AND project_id = ?"
    params = (account["id"], project_id)
    return legacy.db.fetch_one(query, params)
def _profile_payload(row: dict[str, Any], *, account: dict[str, Any] | None = None) -> dict[str, Any]:
    """Convert an ``oneliner_profiles`` row into its API payload.

    The bound assistant is embedded only when the row references one, the
    assistant row exists and, if ``account`` is given, the assistant belongs
    to that account. Otherwise ``assistant`` is ``None``.
    """
    assistant_payload = None
    bound_id = row.get("assistant_id")
    if bound_id:
        found = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],))
        if found:
            owned = not account or found.get("user_id") == account["id"]
            if owned:
                assistant_payload = legacy.assistant_payload(found)

    payload: dict[str, Any] = {"id": row["id"], "user_id": row["user_id"]}
    payload["project_id"] = row.get("project_id", "")
    payload["assistant_id"] = row.get("assistant_id", "")
    payload["display_name"] = row.get("display_name", "OneLiner")
    payload["long_term_goal"] = row.get("long_term_goal", "")
    payload["notes"] = row.get("notes", "")
    payload["default_platform"] = row.get("default_platform", "")
    payload["config"] = _parse_json(row.get("config_json"), {})
    payload["assistant"] = assistant_payload
    payload["created_at"] = row["created_at"]
    payload["updated_at"] = row["updated_at"]
    return payload
def _ensure_oneliner_profile(account: dict[str, Any], project_id: str = "") -> dict[str, Any]:
    """Return the OneLiner profile row for the account/project, creating it if missing.

    A new profile is bound to the account's oldest assistant (restricted to
    the project or project-less assistants when ``project_id`` is given), or
    to no assistant when none exists. It starts with the display name
    "OneLiner" and the default platform "douyin".
    """
    current = _fetch_profile_row(account, project_id)
    if current:
        return current

    owner_id = account["id"]
    if project_id:
        lookup_sql = "SELECT * FROM assistants WHERE user_id = ? AND (project_id = ? OR project_id = '') ORDER BY created_at ASC LIMIT 1"
        lookup_params: tuple[Any, ...] = (owner_id, project_id)
    else:
        lookup_sql = "SELECT * FROM assistants WHERE user_id = ? ORDER BY created_at ASC LIMIT 1"
        lookup_params = (owner_id,)
    oldest_assistant = legacy.db.fetch_one(lookup_sql, lookup_params)
    bound_assistant_id = oldest_assistant["id"] if oldest_assistant else ""

    new_id = make_id("oneliner")
    stamp = now()
    values = (
        new_id,
        owner_id,
        project_id,
        bound_assistant_id,
        "OneLiner",
        "",
        "",
        "douyin",
        _dump({"chat_only_for_unreleased_ui": True}),
        stamp,
        stamp,
    )
    legacy.db.execute(
        """
INSERT INTO oneliner_profiles (
id, user_id, project_id, assistant_id, display_name, long_term_goal, notes,
default_platform, config_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
        values,
    )
    return legacy.db.fetch_one("SELECT * FROM oneliner_profiles WHERE id = ?", (new_id,))
def _list_platform_profiles(account: dict[str, Any], project_id: str = "") -> list[dict[str, Any]]:
    """Return one platform-agent payload per entry of ``legacy.DOMESTIC_PLATFORMS``.

    Platforms are emitted in sorted order. A platform without a stored
    profile row still gets a payload, built from a ``None`` row.
    """
    stored = legacy.db.fetch_all(
        "SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? ORDER BY platform ASC",
        (account["id"], project_id),
    )
    by_platform: dict[Any, dict[str, Any]] = {}
    for record in stored:
        by_platform[record["platform"]] = record
    return [
        _platform_agent_payload(account, by_platform.get(name), platform=name, project_id=project_id)
        for name in sorted(legacy.DOMESTIC_PLATFORMS)
    ]
def _platform_agent_payload(
    account: dict[str, Any],
    row: dict[str, Any] | None,
    *,
    platform: str,
    project_id: str = "",
) -> dict[str, Any]:
    """Build the API payload for one platform agent.

    ``row`` may be ``None`` (no stored profile); the payload then carries
    blank identifiers, the status "draft" and the default name
    "<platform label> Agent". Memory and skill counts for the platform are
    always queried. The bound assistant is embedded only when it exists and
    belongs to ``account``; otherwise ``assistant`` is ``None``.
    """
    record = row or {}
    owner_id = account["id"]

    assistant_payload = None
    if row and row.get("assistant_id"):
        found = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],))
        if found and found.get("user_id") == owner_id:
            assistant_payload = legacy.assistant_payload(found)

    scope_params = (owner_id, project_id, platform)
    memory_row = legacy.db.fetch_one(
        """
SELECT COUNT(*) AS count FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
""",
        scope_params,
    )
    memory_total = memory_row["count"]
    skill_row = legacy.db.fetch_one(
        """
SELECT COUNT(*) AS count FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
""",
        scope_params,
    )
    skill_total = skill_row["count"]

    # The label is reused for the default name; assumes
    # legacy.platform_label is a pure lookup.
    label = legacy.platform_label(platform)
    default_name = f"{label} Agent"
    return {
        "id": row["id"] if row else "",
        "user_id": owner_id,
        "project_id": project_id,
        "platform": platform,
        "platform_label": label,
        "assistant_id": record.get("assistant_id", ""),
        "name": record.get("name", default_name),
        "mission": record.get("mission", ""),
        "notes": record.get("notes", ""),
        "status": record.get("status", "draft"),
        "config": _parse_json(record.get("config_json"), {}),
        "memory_count": memory_total,
        "skill_count": skill_total,
        "assistant": assistant_payload,
        "created_at": record.get("created_at", ""),
        "updated_at": record.get("updated_at", ""),
    }
def _memory_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Convert an ``agent_memories`` row into its API payload.

    ``details_json`` is decoded into ``details``, ``confidence`` is coerced
    to ``float`` and ``platform_label`` is blank when the row has no platform.
    """
    platform = row.get("platform", "")
    payload: dict[str, Any] = {
        "id": row["id"],
        "user_id": row["user_id"],
        "project_id": row.get("project_id", ""),
        "agent_scope": row.get("agent_scope", ""),
        "platform": platform,
        "platform_label": legacy.platform_label(platform) if platform else "",
    }
    for column in ("subject_type", "subject_id", "memory_key", "title", "summary"):
        payload[column] = row.get(column, "")
    payload["details"] = _parse_json(row.get("details_json"), {})
    payload["confidence"] = float(row.get("confidence") or 0)
    payload["last_validated_at"] = row.get("last_validated_at", "")
    payload["created_at"] = row["created_at"]
    payload["updated_at"] = row["updated_at"]
    return payload
def _skill_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Convert an ``agent_skills`` row into its API payload.

    The three ``*_json`` columns are decoded into ``method``, ``test_spec``
    and ``last_result``; counters are coerced to ``int`` and ``last_score``
    to ``float``. ``platform_label`` is blank when the row has no platform.
    """
    platform = row.get("platform", "")
    payload: dict[str, Any] = {
        "id": row["id"],
        "user_id": row["user_id"],
        "project_id": row.get("project_id", ""),
        "agent_scope": row.get("agent_scope", ""),
        "platform": platform,
        "platform_label": legacy.platform_label(platform) if platform else "",
        "parent_skill_id": row.get("parent_skill_id", ""),
        "skill_key": row.get("skill_key", ""),
        "name": row.get("name", ""),
        "status": row.get("status", "draft"),
    }
    for field in ("method", "test_spec", "last_result"):
        payload[field] = _parse_json(row.get(f"{field}_json"), {})
    for counter in ("success_count", "failure_count"):
        payload[counter] = int(row.get(counter) or 0)
    payload["last_score"] = float(row.get("last_score") or 0)
    payload["last_validated_at"] = row.get("last_validated_at", "")
    payload["created_at"] = row["created_at"]
    payload["updated_at"] = row["updated_at"]
    return payload
def _session_payload(row: dict[str, Any]) -> dict[str, Any]:
return {
"id": row["id"],
"user_id": row["user_id"],
"project_id": row.get("project_id", ""),
"profile_id": row.get("profile_id", ""),
"title": row.get("title", ""),
"status": row.get("status", "active"),
"preferred_platform": row.get("preferred_platform", ""),
"last_platform": row.get("last_platform", ""),
"last_intent_key": row.get("last_intent_key", ""),
"last_message_at": row.get("last_message_at", ""),
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def _message_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Convert an ``oneliner_messages`` row into its API payload.

    ``plan_json`` and ``result_json`` are decoded into ``plan`` and
    ``result``; an undecodable value becomes an empty dict.
    """
    payload: dict[str, Any] = {
        "id": row["id"],
        "session_id": row["session_id"],
        "user_id": row["user_id"],
    }
    payload["role"] = row.get("role", "user")
    payload["content"] = row.get("content", "")
    for field in ("plan", "result"):
        payload[field] = _parse_json(row.get(f"{field}_json"), {})
    payload["created_at"] = row["created_at"]
    return payload
def _incident_payload(row: dict[str, Any]) -> dict[str, Any]:
    """Convert an ``admin_ops_incidents`` row into its API payload.

    ``payload_json`` is decoded into ``payload``; missing optional columns
    fall back to blank strings, severity "warn" and status "open".
    """
    leading_defaults = (
        ("tenant_user_id", ""),
        ("tenant_project_id", ""),
        ("source_type", ""),
        ("source_id", ""),
        ("severity", "warn"),
        ("title", ""),
        ("summary", ""),
    )
    trailing_defaults = (
        ("status", "open"),
        ("assigned_to", ""),
        ("reviewed_by", ""),
        ("review_notes", ""),
    )
    result: dict[str, Any] = {"id": row["id"]}
    for column, default in leading_defaults:
        result[column] = row.get(column, default)
    result["payload"] = _parse_json(row.get("payload_json"), {})
    for column, default in trailing_defaults:
        result[column] = row.get(column, default)
    result["created_at"] = row["created_at"]
    result["updated_at"] = row["updated_at"]
    return result
def _load_owned_session(session_id: str, account: dict[str, Any]) -> dict[str, Any]:
    """Load a OneLiner session that belongs to ``account``.

    Raises:
        HTTPException: 404 when no session with that id exists for the
            account, so another user's session is indistinguishable from a
            missing one.
    """
    lookup_params = (session_id, account["id"])
    session_row = legacy.db.fetch_one(
        "SELECT * FROM oneliner_sessions WHERE id = ? AND user_id = ?",
        lookup_params,
    )
    if session_row:
        return session_row
    raise HTTPException(status_code=404, detail="OneLiner session not found")
def _deterministic_intent(message: str, platform_hint: str, account: dict[str, Any]) -> dict[str, Any]:
    """Classify a user message into an intent by keyword rules, without a model.

    Rules are tried in priority order and the first one with a matching
    keyword wins. Matching is case-insensitive for every rule, so Latin
    keywords such as "Agent", "AI" or "NAS" match in any casing; previously
    only "create agent" was case-insensitive and e.g. "创建Agent" fell
    through to "custom". The ops rule is only considered for accounts whose
    role is "super_admin".

    Args:
        message: Raw user text.
        platform_hint: Platform suggested by the caller; used when the text
            itself names no platform.
        account: Account dict; only ``role`` is read here.

    Returns:
        A plan dict with ``intent_key``, ``platform``, ``confidence``,
        ``summary`` and ``reasoning_mode`` ("deterministic-first"). When no
        rule matches, the intent is "custom" with confidence 0.45.
    """
    text = message.strip()
    lowered = text.lower()
    platform = normalize_platform_from_text(text) or _safe_platform(platform_hint or "", fallback="")
    is_super_admin = account.get("role") == "super_admin"

    # (intent_key, confidence, summary, lowercase keywords, admin_only).
    # Order is priority: the first matching rule wins.
    rules = (
        ("create_project", 0.96, "这是一个新项目启动诉求,优先进入项目创建流。",
         ("新建项目", "创建项目", "建项目"), False),
        ("create_assistant", 0.96, "这是定义新 Agent 的需求,适合直接进入 Agent 创建流。",
         ("create agent", "创建agent", "创建 agent", "新建agent", "新建 agent"), False),
        ("import_homepage", 0.9, "这是账号主页导入诉求,适合进入内容源接入。",
         ("导入主页", "主页链接", "账号主页", "主页账号"), False),
        ("track_account", 0.9, "这是持续跟踪类任务,适合交给平台 Agent 和跟踪摘要链。",
         ("跟踪", "日报", "更新提醒", "持续跟"), False),
        ("analyze_top_videos", 0.88, "这是高表现内容拆解诉求,优先分析高分作品。",
         ("高分", "爆款", "优质作品", "高表现"), False),
        ("analyze_account", 0.86, "这是账号层面的调研任务,优先交给对应平台 Agent。",
         ("对标", "分析账号", "调研", "拆账号"), False),
        ("generate_copy", 0.88, "这是文案/脚本生成任务,适合走 Agent 生成链。",
         ("文案", "脚本", "口播", "改写"), False),
        ("ai_video", 0.9, "这是 AI 视频生产任务,适合走 AI 视频链。",
         ("ai视频", "ai 视频", "生成视频"), False),
        ("real_cut", 0.9, "这是实拍剪辑任务,适合走 cutvideo 链。",
         ("实拍", "剪辑", "混剪"), False),
        ("live_recorder", 0.9, "这是直播录制任务,适合走 NAS 录制能力。",
         ("直播", "录制", "开录"), False),
        ("review", 0.84, "这是发布复盘任务,适合进入复盘工作台。",
         ("复盘", "发布总结", "回看数据"), False),
        ("storage_status", 0.82, "这是存储状态问题,适合查看租户存储面板。",
         ("空间", "缓存", "存储", "nas"), False),
        ("ops_admin", 0.84, "这是平台级运维诉求,只能交给管理员运维/审计能力。",
         ("报错", "日志", "故障", "运维", "修复"), True),
    )

    # Defaults used when no rule matches.
    intent_key = "custom"
    confidence = 0.45
    summary = "先理解目标,再把任务路由到合适的平台 Agent 或固定能力。"
    for rule_key, rule_confidence, rule_summary, keywords, admin_only in rules:
        if admin_only and not is_super_admin:
            continue
        if any(keyword in lowered for keyword in keywords):
            intent_key, confidence, summary = rule_key, rule_confidence, rule_summary
            break

    return {
        "intent_key": intent_key,
        "platform": platform or "",
        "confidence": confidence,
        "summary": summary,
        "reasoning_mode": "deterministic-first",
    }
def normalize_platform_from_text(text: str) -> str:
    """Return the domestic platform named in ``text``, or "" when none is.

    Aliases from ``legacy.PLATFORM_ALIASES`` are checked in dict order, first
    against the lower-cased text and then against the text as written (which
    only matters for aliases containing upper-case characters). An alias
    counts only when it maps to a member of ``legacy.DOMESTIC_PLATFORMS``.
    The lower-cased text is computed once rather than once per alias.
    """
    lowered = text.lower()
    domestic = legacy.DOMESTIC_PLATFORMS
    # Same priority as before: every alias on the lowered text, then every
    # alias on the raw text.
    for haystack in (lowered, text):
        for alias, platform in legacy.PLATFORM_ALIASES.items():
            if alias and alias in haystack and platform in domestic:
                return platform
    return ""
async def _model_refine_intent(
    account: dict[str, Any],
    *,
    project_id: str,
    message: str,
    platform_hint: str,
) -> dict[str, Any]:
    """Ask the account's model to classify ``message`` into an intent plan.

    The model's JSON answer is sanitized before it is returned:

    * ``intent_key`` must be a key of ``INTENT_LABELS`` (the same set the
      prompt enumerates); anything else becomes "custom".
    * ``confidence`` is coerced to a float in [0.0, 1.0]; a non-numeric or
      NaN value becomes 0.0.
    * Boolean flags given as strings are read by their text, so "false" is
      ``False`` rather than truthy.

    Raises:
        HTTPException: 503 when the account has no model profile, 502 when
            the model answer yields no JSON object.
    """

    def _as_flag(value: Any) -> bool:
        # bool("false") is True, so string answers are compared by text.
        if isinstance(value, str):
            return value.strip().lower() in {"true", "1", "yes", "y"}
        return bool(value)

    profile = legacy.model_profile_for_account(account["id"], None)
    if not profile:
        raise HTTPException(status_code=503, detail="No model profile available")
    system_prompt = (
        "你是 StoryForge 的 OneLiner 总控主Agent只负责把用户目标分类成安全的系统动作。"
        "必须输出 JSON不要输出 Markdown。"
    )
    user_prompt = (
        f"用户角色:{account.get('role','user')}\n"
        f"项目:{project_id or '默认项目'}\n"
        f"平台提示:{platform_hint or '未指定'}\n"
        f"用户原话:{message}\n\n"
        "请输出 JSON"
        "{"
        '"intent_key":"","platform":"","confidence":0.0,'
        '"summary":"","needs_oneliner_only":false,'
        '"remember_preference":false'
        "}\n"
        "intent_key 只能取create_project, create_assistant, import_homepage, track_account, analyze_account, analyze_top_videos, generate_copy, ai_video, real_cut, review, live_recorder, storage_status, ops_admin, custom。"
        "如果前端 UI 还没有明确产品化needs_oneliner_only 返回 true。"
    )
    raw = await legacy.call_model(profile, system_prompt, user_prompt, temperature=0.1)
    parsed = legacy.parse_json_object(raw)
    if not parsed:
        raise HTTPException(status_code=502, detail="OneLiner planner returned empty result")

    intent_key = str(parsed.get("intent_key") or "custom").strip() or "custom"
    if intent_key not in INTENT_LABELS:
        # The model strayed from the allowed list; use the catch-all intent.
        intent_key = "custom"

    try:
        confidence = float(parsed.get("confidence") or 0)
    except (TypeError, ValueError):
        confidence = 0.0
    if confidence != confidence:  # NaN never compares equal to itself
        confidence = 0.0
    confidence = min(max(confidence, 0.0), 1.0)

    platform = normalize_platform_from_text(str(parsed.get("platform") or "")) or _safe_platform(platform_hint or "", fallback="")
    return {
        "intent_key": intent_key,
        "platform": platform,
        "confidence": confidence,
        "summary": str(parsed.get("summary") or "").strip() or "已按模型判断用户目标。",
        "needs_oneliner_only": _as_flag(parsed.get("needs_oneliner_only")),
        "remember_preference": _as_flag(parsed.get("remember_preference")),
        "reasoning_mode": "model-refine",
    }
async def _plan_oneliner_request(
    account: dict[str, Any],
    *,
    project_id: str,
    message: str,
    platform_hint: str,
) -> dict[str, Any]:
    """Build the full OneLiner plan for one user message.

    The keyword classifier runs first. Only when its confidence is below
    0.82 is the model consulted, and the model's plan replaces the keyword
    plan only if its confidence is at least as high. Model refinement is
    best-effort: a failure is logged (it was previously swallowed silently)
    and the keyword plan is kept. A platform detected from the message is
    not overwritten by a blank platform from the model.

    The plan is then completed with ``ui_supported``, ``delivery_mode``,
    ``suggested_actions``, ``intent_label``, ``platform_label`` and
    ``economicity``. An "ops_admin" intent for a non-super-admin account
    loses its actions and is forced to conversation-only delivery.
    """
    import logging

    plan = _deterministic_intent(message, platform_hint, account)
    if plan["confidence"] < 0.82:
        try:
            refined = await _model_refine_intent(account, project_id=project_id, message=message, platform_hint=platform_hint)
        except Exception:
            # Best-effort: keep the deterministic plan, but leave a trace.
            logging.getLogger(__name__).warning(
                "OneLiner model refinement failed; keeping deterministic plan",
                exc_info=True,
            )
        else:
            if refined.get("confidence", 0) >= plan.get("confidence", 0):
                merged = {**plan, **refined}
                if not refined.get("platform"):
                    # Keep the platform found in the message text.
                    merged["platform"] = plan.get("platform") or ""
                plan = merged

    intent_key = plan.get("intent_key") or "custom"
    actions = INTENT_ACTIONS.get(intent_key, [])
    if intent_key == "ops_admin" and account.get("role") != "super_admin":
        # The model may propose ops_admin for anyone; never expose it.
        actions = []
        plan["summary"] = "这是平台级运维诉求,但当前账号没有管理员权限。"
        plan["needs_oneliner_only"] = True
    ui_supported = bool(actions)
    if intent_key == "custom":
        plan["needs_oneliner_only"] = True
    plan["ui_supported"] = ui_supported
    plan["delivery_mode"] = "ui" if ui_supported and not plan.get("needs_oneliner_only") else "oneliner"
    plan["suggested_actions"] = actions
    plan["intent_label"] = INTENT_LABELS.get(intent_key, intent_key)
    plan["platform_label"] = legacy.platform_label(plan.get("platform")) if plan.get("platform") else "待判断"
    plan["economicity"] = {
        "policy": "deterministic-first",
        "explanation": "先走固定流程,再走平台 Agent最后才升级到 OneLiner 深度调度。",
    }
    return plan
def _remember_message_preference(
    account: dict[str, Any],
    *,
    project_id: str,
    plan: dict[str, Any],
    message: str,
) -> dict[str, Any] | None:
    """Store the message as a preference memory when it looks like one.

    A preference is recorded when the plan sets ``remember_preference`` or
    the message contains one of the cue words. The memory is keyed by
    ``preference::<intent_key>`` in the "oneliner" scope; an existing entry
    with the same key is overwritten, otherwise a new one is inserted. The
    stored summary is the stripped message truncated to 280 characters.

    Returns:
        The memory payload, or ``None`` when nothing was remembered.
    """
    cue_words = ("记住", "以后", "长期", "默认", "一直", "优先")
    wants_memory = plan.get("remember_preference") or any(cue in message for cue in cue_words)
    if not wants_memory:
        return None

    scope = "oneliner"
    platform = plan.get("platform") or ""
    if project_id:
        subject_type, subject_id = "project", project_id
    else:
        subject_type, subject_id = "account", account["id"]
    intent_key = plan.get("intent_key") or "custom"
    memory_key = f"preference::{intent_key}"

    existing = legacy.db.fetch_one(
        """
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = ? AND platform = ?
AND subject_type = ? AND subject_id = ? AND memory_key = ?
""",
        (account["id"], project_id, scope, platform, subject_type, subject_id, memory_key),
    )
    stamp = now()
    details_json = _dump(
        {
            "captured_from": "oneliner_chat",
            "intent_key": plan.get("intent_key", "custom"),
            "source_message": message,
            "platform": platform,
        }
    )
    title = f"{INTENT_LABELS.get(intent_key, '偏好')}偏好"
    summary = message.strip()[:280]

    if existing:
        target_id = existing["id"]
        legacy.db.execute(
            """
UPDATE agent_memories
SET title = ?, summary = ?, details_json = ?, confidence = ?, last_validated_at = ?, updated_at = ?
WHERE id = ?
""",
            (title, summary, details_json, 0.82, stamp, stamp, target_id),
        )
    else:
        target_id = make_id("mem")
        legacy.db.execute(
            """
INSERT INTO agent_memories (
id, user_id, project_id, agent_scope, platform, subject_type, subject_id,
memory_key, title, summary, details_json, confidence, last_validated_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (
                target_id,
                account["id"],
                project_id,
                scope,
                platform,
                subject_type,
                subject_id,
                memory_key,
                title,
                summary,
                details_json,
                0.82,
                stamp,
                stamp,
                stamp,
            ),
        )
    stored = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (target_id,))
    if not stored:
        return None
    return _memory_payload(stored)
def _session_context_summary(account: dict[str, Any], project_id: str, platform: str) -> dict[str, Any]:
    """Collect the project, profile, platform-agent and assistant context.

    The project is resolved first and its id is used for all later lookups.
    ``oneliner_profile`` and ``assistant`` are ``None`` when no profile or
    bound assistant exists; ``platform_agent`` is ``None`` when ``platform``
    is blank.
    """
    project = _resolve_project(account, project_id or None)
    resolved_project_id = project["id"]

    profile_row = _fetch_profile_row(account, resolved_project_id)
    assistant = None
    if profile_row and profile_row.get("assistant_id"):
        assistant = _resolve_assistant(account, profile_row.get("assistant_id"), resolved_project_id)

    platform_profile = None
    if platform:
        platform_profile = legacy.db.fetch_one(
            "SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?",
            (account["id"], resolved_project_id, platform),
        )

    summary: dict[str, Any] = {"project": legacy.project_payload(project)}
    summary["oneliner_profile"] = _profile_payload(profile_row, account=account) if profile_row else None
    if platform:
        summary["platform_agent"] = _platform_agent_payload(
            account, platform_profile, platform=platform, project_id=resolved_project_id
        )
    else:
        summary["platform_agent"] = None
    summary["assistant"] = legacy.assistant_payload(assistant) if assistant else None
    return summary
async def _generate_oneliner_reply(
    account: dict[str, Any],
    *,
    project_id: str,
    message: str,
    plan: dict[str, Any],
) -> dict[str, Any]:
    """Compose the assistant-side reply for a planned OneLiner request.

    Fixes a crash when the platform agent has no bound assistant: the agent
    payload always carries an ``assistant`` key whose value may be ``None``,
    so ``.get("assistant", {})`` returned ``None`` and the chained
    ``.get("name")`` raised ``AttributeError``. A ``None`` summary in the
    plan is also tolerated now.

    Args:
        account: Account dict; ``role`` is read for the ops-admin notice.
        project_id: Project to build the context for; may be blank.
        message: The user's message. Not used when composing the reply, but
            kept for interface compatibility.
        plan: Plan produced by ``_plan_oneliner_request``.

    Returns:
        A dict with ``summary_text`` (non-blank lines joined by newlines),
        the collected ``context`` and a static ``safe_boundary`` block.
    """
    context = _session_context_summary(account, project_id or "", plan.get("platform") or "")
    summary_lines = [
        f"我理解你的目标是:{plan.get('intent_label', '自定义任务')}",
        f"建议优先处理的平台:{plan.get('platform_label', '待判断')}",
        plan.get("summary", ""),
    ]
    if plan.get("delivery_mode") == "oneliner":
        summary_lines.append("这项能力当前更适合先由 OneLiner 对话承接,而不是要求你先理解前端功能树。")
    if plan.get("intent_key") == "ops_admin" and account.get("role") != "super_admin":
        summary_lines.append("当前账号不是平台最高权限用户,所以我不会放出运维 Agent 入口。")
    platform_agent = context.get("platform_agent")
    if platform_agent:
        # "assistant" is present but None for an unbound agent, so a .get
        # default would not apply; fall back explicitly.
        bound_assistant = platform_agent.get("assistant") or {}
        summary_lines.append(
            f"当前 {platform_agent['platform_label']} Agent 已绑定:{bound_assistant.get('name') or '未绑定执行 Agent'}"
        )
    return {
        "summary_text": "\n".join(line for line in summary_lines if line and line.strip()),
        "context": context,
        "safe_boundary": {
            "core_code_locked": True,
            "tenant_isolation": True,
            "ops_admin_only": True,
        },
    }
def _insert_message(session_id: str, account_id: str, role: str, content: str, plan: dict[str, Any], result: dict[str, Any]) -> dict[str, Any]:
    """Insert one message into ``oneliner_messages`` and return the stored row.

    ``plan`` and ``result`` are serialized to JSON text; the row is read back
    by its freshly generated id.
    """
    new_id = make_id("oline_msg")
    stamp = now()
    values = (new_id, session_id, account_id, role, content, _dump(plan), _dump(result), stamp)
    legacy.db.execute(
        """
INSERT INTO oneliner_messages (id, session_id, user_id, role, content, plan_json, result_json, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
        values,
    )
    return legacy.db.fetch_one("SELECT * FROM oneliner_messages WHERE id = ?", (new_id,))
def _upsert_platform_profile(account: dict[str, Any], platform: str, request: PlatformAgentProfileRequest) -> dict[str, Any]:
    """Create or update the platform-agent profile for account/project/platform.

    On update, a blank ``name`` keeps the stored name (or the default
    "<platform label> Agent"), whereas ``mission`` and ``notes`` are always
    overwritten with the stripped request values. A blank status becomes
    "active".

    Returns:
        The platform-agent payload built from the stored row.
    """
    project = _resolve_project(account, request.project_id or None)
    assistant = _resolve_assistant(account, request.assistant_id or None, project["id"])
    project_id = project["id"]
    existing = legacy.db.fetch_one(
        "SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?",
        (account["id"], project_id, platform),
    )
    stamp = now()

    # Values shared by the update and insert paths.
    assistant_id = (assistant or {}).get("id", "")
    mission = request.mission.strip()
    notes = request.notes.strip()
    status = request.status.strip() or "active"
    config_json = _dump(request.config)

    if existing:
        row_id = existing["id"]
        name = request.name.strip() or existing.get("name") or f"{legacy.platform_label(platform)} Agent"
        legacy.db.execute(
            """
UPDATE platform_agent_profiles
SET assistant_id = ?, name = ?, mission = ?, notes = ?, status = ?, config_json = ?, updated_at = ?
WHERE id = ?
""",
            (assistant_id, name, mission, notes, status, config_json, stamp, row_id),
        )
    else:
        row_id = make_id("plat_agent")
        name = request.name.strip() or f"{legacy.platform_label(platform)} Agent"
        legacy.db.execute(
            """
INSERT INTO platform_agent_profiles (
id, user_id, project_id, platform, assistant_id, name, mission, notes, status, config_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (
                row_id,
                account["id"],
                project_id,
                platform,
                assistant_id,
                name,
                mission,
                notes,
                status,
                config_json,
                stamp,
                stamp,
            ),
        )
    row = legacy.db.fetch_one("SELECT * FROM platform_agent_profiles WHERE id = ?", (row_id,))
    return _platform_agent_payload(account, row, platform=platform, project_id=project_id)
def _upsert_memory(
    account: dict[str, Any],
    *,
    agent_scope: str,
    platform: str,
    request: AgentMemoryUpsertRequest,
) -> dict[str, Any]:
    """Create or update one agent memory entry and return its payload.

    The entry is identified by account, resolved project, scope, platform,
    subject and the stripped ``memory_key``. A blank subject type becomes
    "project"; a blank subject id becomes the project id for project
    subjects and the account id otherwise. Both the update and the insert
    path set ``last_validated_at`` to the current timestamp.
    """
    project = _resolve_project(account, request.project_id or None)
    project_id = project["id"]
    subject_type = request.subject_type.strip() or "project"
    default_subject = project_id if subject_type == "project" else account["id"]
    subject_id = request.subject_id.strip() or default_subject
    memory_key = request.memory_key.strip()

    existing = legacy.db.fetch_one(
        """
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = ? AND platform = ?
AND subject_type = ? AND subject_id = ? AND memory_key = ?
""",
        (account["id"], project_id, agent_scope, platform, subject_type, subject_id, memory_key),
    )
    stamp = now()

    # Values shared by the update and insert paths.
    title = request.title.strip()
    summary = request.summary.strip()
    details_json = _dump(request.details)

    if existing:
        target_id = existing["id"]
        legacy.db.execute(
            """
UPDATE agent_memories
SET title = ?, summary = ?, details_json = ?, confidence = ?, last_validated_at = ?, updated_at = ?
WHERE id = ?
""",
            (title, summary, details_json, request.confidence, stamp, stamp, target_id),
        )
    else:
        target_id = make_id("mem")
        legacy.db.execute(
            """
INSERT INTO agent_memories (
id, user_id, project_id, agent_scope, platform, subject_type, subject_id,
memory_key, title, summary, details_json, confidence, last_validated_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (
                target_id,
                account["id"],
                project_id,
                agent_scope,
                platform,
                subject_type,
                subject_id,
                memory_key,
                title,
                summary,
                details_json,
                request.confidence,
                stamp,
                stamp,
                stamp,
            ),
        )
    stored = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (target_id,))
    return _memory_payload(stored)
def _upsert_skill(
    account: dict[str, Any],
    *,
    agent_scope: str,
    platform: str,
    request: AgentSkillUpsertRequest,
    skill_id: str | None = None,
) -> dict[str, Any]:
    """Create or overwrite an ``agent_skills`` row and return its payload.

    Target selection:

    * ``skill_id`` given: the row with that id owned by the account; a miss
      raises 404.
    * otherwise: the row matching (user, project, agent scope, platform,
      skill key), if there is one.

    An existing target has its name, status, method / test-spec / last-result
    JSON, counters, score and validation/update timestamps replaced. With no
    target, a new row with a fresh ``skill`` id and an empty
    ``parent_skill_id`` is inserted.

    NOTE(review): the ``skill_id`` lookup filters on owner only; project,
    ``agent_scope`` and ``platform`` are not compared. Confirm this is
    intended.

    Args:
        account: Account dict of the caller; ``account["id"]`` is the owner.
        agent_scope: Scope used for the key lookup and stored on insert.
        platform: Platform used for the key lookup and stored on insert.
        request: Skill fields supplied by the client.
        skill_id: Optional id of the row to overwrite.

    Returns:
        The stored row converted by ``_skill_payload``.

    Raises:
        HTTPException: 404 when ``skill_id`` is given but no owned row matches.
    """
    project = _resolve_project(account, request.project_id or None)
    owner_id = account["id"]
    skill_key = request.skill_key.strip()

    if skill_id:
        current = legacy.db.fetch_one(
            "SELECT * FROM agent_skills WHERE id = ? AND user_id = ?",
            (skill_id, owner_id),
        )
        if not current:
            raise HTTPException(status_code=404, detail="Agent skill not found")
    else:
        current = legacy.db.fetch_one(
            """
SELECT * FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = ? AND platform = ? AND skill_key = ?
""",
            (owner_id, project["id"], agent_scope, platform, skill_key),
        )

    stamp = now()
    # Columns that are written identically by both the UPDATE and the INSERT.
    editable = (
        request.name.strip(),
        request.status.strip() or "draft",
        _dump(request.method),
        _dump(request.test_spec),
        _dump(request.last_result),
        request.success_count,
        request.failure_count,
        request.last_score,
    )

    if current:
        row_id = current["id"]
        legacy.db.execute(
            """
UPDATE agent_skills
SET name = ?, status = ?, method_json = ?, test_spec_json = ?, last_result_json = ?,
success_count = ?, failure_count = ?, last_score = ?, last_validated_at = ?, updated_at = ?
WHERE id = ?
""",
            (*editable, stamp, stamp, row_id),
        )
    else:
        row_id = make_id("skill")
        legacy.db.execute(
            """
INSERT INTO agent_skills (
id, user_id, project_id, agent_scope, platform, parent_skill_id, skill_key, name, status,
method_json, test_spec_json, last_result_json, success_count, failure_count, last_score,
last_validated_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, '', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (
                row_id,
                owner_id,
                project["id"],
                agent_scope,
                platform,
                skill_key,
                *editable,
                stamp,
                stamp,
                stamp,
            ),
        )

    stored = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (row_id,))
    return _skill_payload(stored)
def _create_or_update_incident(
    *,
    tenant_user_id: str,
    tenant_project_id: str,
    source_type: str,
    source_id: str,
    severity: str,
    title: str,
    summary: str,
    payload: dict[str, Any],
) -> dict[str, Any]:
    """Record an ops incident, de-duplicated on (source_type, source_id, title).

    If a row with the same source type, source id and title already exists,
    its tenant ids, severity, summary, payload JSON and ``updated_at`` are
    overwritten; status and review columns are left as they are. Otherwise a
    new row is inserted with status ``'open'`` and empty assignment/review
    columns.

    Returns:
        The stored row converted by ``_incident_payload``.
    """
    known = legacy.db.fetch_one(
        "SELECT * FROM admin_ops_incidents WHERE source_type = ? AND source_id = ? AND title = ?",
        (source_type, source_id, title),
    )
    stamp = now()

    if known:
        incident_id = known["id"]
        statement = """
UPDATE admin_ops_incidents
SET tenant_user_id = ?, tenant_project_id = ?, severity = ?, summary = ?, payload_json = ?, updated_at = ?
WHERE id = ?
"""
        params = (
            tenant_user_id,
            tenant_project_id,
            severity,
            summary,
            _dump(payload),
            stamp,
            incident_id,
        )
    else:
        incident_id = make_id("incident")
        statement = """
INSERT INTO admin_ops_incidents (
id, tenant_user_id, tenant_project_id, source_type, source_id, severity, title,
summary, payload_json, status, assigned_to, reviewed_by, review_notes, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'open', '', '', '', ?, ?)
"""
        params = (
            incident_id,
            tenant_user_id,
            tenant_project_id,
            source_type,
            source_id,
            severity,
            title,
            summary,
            _dump(payload),
            stamp,
            stamp,
        )

    legacy.db.execute(statement, params)
    stored = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,))
    return _incident_payload(stored)
def _scan_admin_incidents(admin: dict[str, Any]) -> dict[str, Any]:
    """Turn recent failed jobs and unhealthy integrations into ops incidents.

    Two sources are scanned:

    * the 20 most recently updated ``jobs`` rows with ``status = 'failed'``;
    * every entry returned by ``legacy.integrations_health(admin)``. If that
      call raises, a synthetic unreachable ``collector`` entry carrying the
      exception text is used instead, so the probe failure itself is recorded.

    Every finding is passed to ``_create_or_update_incident``.

    Args:
        admin: Account dict forwarded to ``legacy.integrations_health``.

    Returns:
        ``{"created_or_updated": [<incident payload>, ...], "count": <int>}``.
    """
    recorded: list[dict[str, Any]] = []

    failed_jobs = legacy.db.fetch_all(
        "SELECT * FROM jobs WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 20"
    )
    for job in failed_jobs:
        # ``dict.get(key, default)`` only uses the default when the key is
        # missing; a stored NULL comes back as None and cannot be sliced, so
        # normalise to text before truncating.
        error_text = str(job.get("error") or "")
        recorded.append(
            _create_or_update_incident(
                tenant_user_id=job.get("user_id", "") or "",
                tenant_project_id=job.get("project_id", "") or "",
                source_type="job",
                source_id=job["id"],
                severity="error",
                title=f"失败任务:{job.get('title') or job['id']}",
                summary=error_text[:280] or "任务失败,需要检查执行链。",
                payload=legacy.job_payload(job),
            )
        )

    try:
        integration_health = legacy.integrations_health(admin)
    except Exception as exc:
        # Deliberate best-effort: a failing health probe becomes a finding
        # instead of aborting the scan.
        integration_health = {"collector": {"reachable": False, "error": str(exc)}}

    for key, payload in integration_health.items():
        reachable = bool(payload.get("reachable", False))
        # Reachable integrations are healthy, except "cutvideo", which is
        # judged on its upload support below.
        if reachable and key != "cutvideo":
            continue
        # NOTE(review): "supports_uploads" defaults to True when absent, so an
        # unreachable "cutvideo" entry without that key is skipped — confirm.
        if key == "cutvideo" and payload.get("supports_uploads", True):
            continue
        recorded.append(
            _create_or_update_incident(
                tenant_user_id="",
                tenant_project_id="",
                source_type="integration",
                source_id=key,
                severity="warn" if key in {"cutvideo", "live_recorder"} else "error",
                title=f"集成异常:{key}",
                summary=str(payload.get("error") or payload.get("upload_error") or "集成健康检查未通过")[:280],
                payload=payload,
            )
        )

    return {
        "created_or_updated": recorded,
        "count": len(recorded),
    }
@app.get("/v2/oneliner/profile")
def get_oneliner_profile(
    project_id: str | None = Query(default=None),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Return the caller's OneLiner profile for one project.

    Args:
        project_id: Optional query parameter; an empty value is passed to
            ``_resolve_project`` as ``None``.
        account: Account dict injected by ``legacy.require_approved``.

    Returns:
        The row from ``_ensure_oneliner_profile`` rendered by
        ``_profile_payload``.
    """
    resolved = _resolve_project(account, project_id or None)
    profile_row = _ensure_oneliner_profile(account, resolved["id"])
    return _profile_payload(profile_row, account=account)
@app.put("/v2/oneliner/profile")
def put_oneliner_profile(
    request: OneLinerProfileRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Overwrite the editable fields of the caller's OneLiner profile.

    The profile row is obtained through ``_ensure_oneliner_profile`` and then
    updated in place. Fallbacks applied while writing:

    * ``assistant_id``: empty string when ``_resolve_assistant`` returns
      nothing truthy;
    * ``display_name``: ``"OneLiner"`` when blank;
    * ``default_platform``: request value, else the stored value, else
      ``"douyin"``, passed through ``_safe_platform``.

    Returns:
        The updated row rendered by ``_profile_payload``.
    """
    project = _resolve_project(account, request.project_id or None)
    assistant = _resolve_assistant(account, request.assistant_id or None, project["id"])
    profile = _ensure_oneliner_profile(account, project["id"])
    stamp = now()

    assistant_id = assistant.get("id", "") if assistant else ""
    display_name = request.display_name.strip() or "OneLiner"
    platform_choice = request.default_platform or profile.get("default_platform") or "douyin"

    legacy.db.execute(
        """
UPDATE oneliner_profiles
SET assistant_id = ?, display_name = ?, long_term_goal = ?, notes = ?, default_platform = ?, config_json = ?, updated_at = ?
WHERE id = ?
""",
        (
            assistant_id,
            display_name,
            request.long_term_goal.strip(),
            request.notes.strip(),
            _safe_platform(platform_choice),
            _dump(request.config),
            stamp,
            profile["id"],
        ),
    )
    refreshed = legacy.db.fetch_one("SELECT * FROM oneliner_profiles WHERE id = ?", (profile["id"],))
    return _profile_payload(refreshed, account=account)
@app.get("/v2/oneliner/sessions")
def list_oneliner_sessions(
    project_id: str | None = Query(default=None),
    limit: int = Query(default=20, ge=1, le=100),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """List the caller's OneLiner sessions in a project, newest update first.

    Args:
        project_id: Optional query parameter; empty is treated as ``None``.
        limit: Maximum number of sessions to return (1-100, default 20).
        account: Account dict injected by ``legacy.require_approved``.

    Returns:
        ``{"items": [<session payload>, ...], "count": <int>}``.
    """
    resolved = _resolve_project(account, project_id or None)
    query = """
SELECT * FROM oneliner_sessions
WHERE user_id = ? AND project_id = ?
ORDER BY updated_at DESC
LIMIT ?
"""
    session_rows = legacy.db.fetch_all(query, (account["id"], resolved["id"], limit))
    sessions = list(map(_session_payload, session_rows))
    return {"items": sessions, "count": len(sessions)}
@app.post("/v2/oneliner/sessions")
async def create_oneliner_session(
    request: OneLinerSessionCreateRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Create an active OneLiner session and optionally post its first message.

    The session is inserted with a fresh ``oline`` id. Its preferred platform
    is the request value, else the profile's ``default_platform``, else
    ``"douyin"``, passed through ``_safe_platform``. A blank title becomes
    the default session title.

    When ``initial_message`` is non-blank, it is sent through
    ``post_oneliner_message`` (called directly, not over HTTP) and the session
    row is re-read so the response reflects that message.

    Returns:
        The session row rendered by ``_session_payload``.
    """
    project = _resolve_project(account, request.project_id or None)
    profile = _ensure_oneliner_profile(account, project["id"])
    new_session_id = make_id("oline")
    stamp = now()
    platform_choice = _safe_platform(
        request.preferred_platform or profile.get("default_platform") or "douyin"
    )
    session_title = request.title.strip() or "新的 OneLiner 会话"

    def _reload() -> Any:
        # Read the session row back by its id.
        return legacy.db.fetch_one("SELECT * FROM oneliner_sessions WHERE id = ?", (new_session_id,))

    legacy.db.execute(
        """
INSERT INTO oneliner_sessions (
id, user_id, project_id, profile_id, title, status, preferred_platform,
last_platform, last_intent_key, last_message_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, 'active', ?, '', '', ?, ?, ?)
""",
        (
            new_session_id,
            account["id"],
            project["id"],
            profile["id"],
            session_title,
            platform_choice,
            stamp,
            stamp,
            stamp,
        ),
    )
    session_row = _reload()

    if request.initial_message.strip():
        first_message = OneLinerMessageRequest(
            content=request.initial_message,
            project_id=project["id"],
            platform=platform_choice,
        )
        await post_oneliner_message(new_session_id, first_message, account)
        # Posting the message updates the session row; read it again.
        session_row = _reload()

    return _session_payload(session_row)
@app.get("/v2/oneliner/sessions/{session_id}/messages")
def list_oneliner_messages(
    session_id: str,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Return a session together with its messages, oldest first.

    The session is loaded through ``_load_owned_session`` before any message
    is read.

    Returns:
        ``{"session": <session payload>, "items": [<message payload>, ...],
        "count": <int>}``.
    """
    owned_session = _load_owned_session(session_id, account)
    message_rows = legacy.db.fetch_all(
        "SELECT * FROM oneliner_messages WHERE session_id = ? ORDER BY created_at ASC",
        (owned_session["id"],),
    )
    messages = list(map(_message_payload, message_rows))
    return {
        "session": _session_payload(owned_session),
        "items": messages,
        "count": len(messages),
    }
@app.post("/v2/oneliner/sessions/{session_id}/messages")
async def post_oneliner_message(
    session_id: str,
    request: OneLinerMessageRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Handle one user turn in a OneLiner session.

    Steps, in order:

    1. load the session via ``_load_owned_session`` and resolve the project
       (request value, else the session's project);
    2. build a plan with ``_plan_oneliner_request``;
    3. store the stripped user message;
    4. call ``_remember_message_preference`` and ``_generate_oneliner_reply``;
       a truthy remembered value is attached to the result under
       ``"remembered_memory"``;
    5. store the assistant message using ``result["summary_text"]``;
    6. update the session's title, last platform/intent and timestamps.

    NOTE(review): ``request.remember_preference`` and
    ``request.target_account_id`` are not read in this function — confirm
    whether they are meant to gate step 4.
    NOTE(review): the title only falls back to the message text when the
    stored title is empty; ``create_oneliner_session`` always stores a
    non-empty title, so that fallback looks unreachable for such sessions.

    Returns:
        Dict with ``session``, ``user_message``, ``assistant_message``,
        ``plan`` and ``result``.
    """
    session = _load_owned_session(session_id, account)
    project = _resolve_project(account, request.project_id or session.get("project_id") or None)
    raw_text = request.content

    platform_hint = request.platform or session.get("preferred_platform") or ""
    plan = await _plan_oneliner_request(
        account,
        project_id=project["id"],
        message=raw_text,
        platform_hint=platform_hint,
    )

    session_key = session["id"]
    owner_id = account["id"]
    user_row = _insert_message(session_key, owner_id, "user", raw_text.strip(), {}, {})

    memory = _remember_message_preference(account, project_id=project["id"], plan=plan, message=raw_text)
    result = await _generate_oneliner_reply(account, project_id=project["id"], message=raw_text, plan=plan)
    if memory:
        result["remembered_memory"] = memory
    assistant_row = _insert_message(session_key, owner_id, "assistant", result["summary_text"], plan, result)

    title = session.get("title")
    if not title:
        title = raw_text.strip()[:28] or "新的 OneLiner 会话"

    stamp = now()
    legacy.db.execute(
        """
UPDATE oneliner_sessions
SET title = ?, last_platform = ?, last_intent_key = ?, last_message_at = ?, updated_at = ?
WHERE id = ?
""",
        (
            title,
            plan.get("platform", ""),
            plan.get("intent_key", ""),
            stamp,
            stamp,
            session_key,
        ),
    )
    refreshed = legacy.db.fetch_one("SELECT * FROM oneliner_sessions WHERE id = ?", (session_key,))

    return {
        "session": _session_payload(refreshed),
        "user_message": _message_payload(user_row),
        "assistant_message": _message_payload(assistant_row),
        "plan": plan,
        "result": result,
    }
@app.get("/v2/platform-agents")
def list_platform_agents(
    project_id: str | None = Query(default=None),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """List the platform agent profiles for the caller's project.

    Returns:
        ``{"items": <result of _list_platform_profiles>, "count": <int>}``.
    """
    resolved = _resolve_project(account, project_id or None)
    profiles = _list_platform_profiles(account, resolved["id"])
    return {"items": profiles, "count": len(profiles)}
@app.put("/v2/platform-agents/{platform}/profile")
def update_platform_agent(
    platform: str,
    request: PlatformAgentProfileRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Upsert the agent profile of one platform.

    The path ``platform`` is passed through ``_safe_platform`` and the work is
    delegated to ``_upsert_platform_profile``, whose result is returned.
    """
    return _upsert_platform_profile(account, _safe_platform(platform), request)
@app.get("/v2/platform-agents/{platform}/memories")
def list_platform_memories(
    platform: str,
    project_id: str | None = Query(default=None),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """List platform-scoped memories of one platform, newest update first.

    Only rows with ``agent_scope = 'platform'`` belonging to the caller and
    the resolved project are returned.

    Returns:
        ``{"items": [<memory payload>, ...], "count": <int>}``.
    """
    resolved = _resolve_project(account, project_id or None)
    platform_key = _safe_platform(platform)
    query = """
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY updated_at DESC
"""
    memory_rows = legacy.db.fetch_all(query, (account["id"], resolved["id"], platform_key))
    memories = list(map(_memory_payload, memory_rows))
    return {"items": memories, "count": len(memories)}
@app.post("/v2/platform-agents/{platform}/memories")
def upsert_platform_memory(
    platform: str,
    request: AgentMemoryUpsertRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Create or update a platform-scoped memory.

    Delegates to ``_upsert_memory`` with ``agent_scope="platform"`` and the
    path ``platform`` passed through ``_safe_platform``.
    """
    return _upsert_memory(
        account,
        agent_scope="platform",
        platform=_safe_platform(platform),
        request=request,
    )
@app.get("/v2/platform-agents/{platform}/skills")
def list_platform_skills(
    platform: str,
    project_id: str | None = Query(default=None),
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """List platform-scoped skills of one platform, newest update first.

    Only rows with ``agent_scope = 'platform'`` belonging to the caller and
    the resolved project are returned.

    Returns:
        ``{"items": [<skill payload>, ...], "count": <int>}``.
    """
    resolved = _resolve_project(account, project_id or None)
    platform_key = _safe_platform(platform)
    query = """
SELECT * FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY updated_at DESC
"""
    skill_rows = legacy.db.fetch_all(query, (account["id"], resolved["id"], platform_key))
    skills = list(map(_skill_payload, skill_rows))
    return {"items": skills, "count": len(skills)}
@app.post("/v2/platform-agents/{platform}/skills")
def create_platform_skill(
    platform: str,
    request: AgentSkillUpsertRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Create a platform-scoped skill, or update the one with the same key.

    Delegates to ``_upsert_skill`` without a ``skill_id``, so an existing row
    is matched by (user, project, scope, platform, skill key).
    """
    return _upsert_skill(
        account,
        agent_scope="platform",
        platform=_safe_platform(platform),
        request=request,
    )
@app.patch("/v2/platform-agents/{platform}/skills/{skill_id}")
def update_platform_skill(
    platform: str,
    skill_id: str,
    request: AgentSkillUpsertRequest,
    account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
    """Overwrite the skill identified by ``skill_id``.

    Delegates to ``_upsert_skill`` with ``agent_scope="platform"``, the path
    ``platform`` passed through ``_safe_platform``, and the given
    ``skill_id``.
    """
    return _upsert_skill(
        account,
        agent_scope="platform",
        platform=_safe_platform(platform),
        request=request,
        skill_id=skill_id,
    )
@app.get("/v2/admin/ops/overview")
def admin_ops_overview(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]:
    """Assemble the admin operations dashboard.

    Collects, in this order:

    * the 50 most recently updated incidents;
    * the 12 most recently updated failed jobs;
    * up to 20 accounts with ``approval_status = 'pending'``, oldest first;
    * the result of ``legacy.integrations_health(admin)``.

    If the health probe raises, the overview is still returned. The
    ``integration_health`` entry then holds a synthetic unreachable
    ``collector`` record carrying the exception text, the same fallback
    ``_scan_admin_incidents`` uses.

    Args:
        admin: Account dict injected by ``legacy.require_super_admin``.

    Returns:
        Dict with the three lists, their counts, and ``integration_health``.
    """
    incidents = [
        _incident_payload(row)
        for row in legacy.db.fetch_all(
            "SELECT * FROM admin_ops_incidents ORDER BY updated_at DESC LIMIT 50"
        )
    ]
    failed_jobs = [
        legacy.job_payload(row)
        for row in legacy.db.fetch_all(
            "SELECT * FROM jobs WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 12"
        )
    ]
    pending_accounts = [
        legacy.normalize_account(row)
        for row in legacy.db.fetch_all(
            "SELECT * FROM accounts WHERE approval_status = 'pending' ORDER BY created_at ASC LIMIT 20"
        )
    ]

    try:
        integration_health = legacy.integrations_health(admin)
    except Exception as exc:
        # Best-effort, as in the incident scan: a broken health probe must not
        # hide the incidents, failed jobs and pending accounts.
        integration_health = {"collector": {"reachable": False, "error": str(exc)}}

    return {
        "incidents": incidents,
        "incident_count": len(incidents),
        "failed_jobs": failed_jobs,
        "failed_job_count": len(failed_jobs),
        "pending_accounts": pending_accounts,
        "pending_account_count": len(pending_accounts),
        "integration_health": integration_health,
    }
@app.post("/v2/admin/ops/incidents/scan")
def admin_ops_scan(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]:
    """Run the incident scan and return the summary from ``_scan_admin_incidents``."""
    scan_summary = _scan_admin_incidents(admin)
    return scan_summary
@app.patch("/v2/admin/ops/incidents/{incident_id}")
def review_admin_incident(
    incident_id: str,
    request: AdminIncidentReviewRequest,
    admin: dict[str, Any] = Depends(legacy.require_super_admin),
) -> dict[str, Any]:
    """Record an admin's review of an incident.

    Sets the incident's status (``"reviewed"`` when the request value is
    blank), stamps ``reviewed_by`` with the admin's id, stores the stripped
    review notes and bumps ``updated_at``.

    Returns:
        The updated row rendered by ``_incident_payload``.

    Raises:
        HTTPException: 404 when no incident has the given id.
    """
    select_by_id = "SELECT * FROM admin_ops_incidents WHERE id = ?"

    if not legacy.db.fetch_one(select_by_id, (incident_id,)):
        raise HTTPException(status_code=404, detail="Incident not found")

    stamp = now()
    new_status = request.status.strip() or "reviewed"
    reviewer_id = admin["id"]
    notes = request.review_notes.strip()

    legacy.db.execute(
        """
UPDATE admin_ops_incidents
SET status = ?, reviewed_by = ?, review_notes = ?, updated_at = ?
WHERE id = ?
""",
        (new_status, reviewer_id, notes, stamp, incident_id),
    )
    return _incident_payload(legacy.db.fetch_one(select_by_id, (incident_id,)))