Files
storyforge/collector-service/app/oneliner_features.py
2026-03-23 15:31:36 +08:00

1964 lines
85 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
from typing import Any
from fastapi import Depends, HTTPException, Query
from pydantic import BaseModel, Field
class OneLinerProfileRequest(BaseModel):
project_id: str = ""
assistant_id: str = ""
display_name: str = "OneLiner"
long_term_goal: str = ""
notes: str = ""
default_platform: str = ""
config: dict[str, Any] = Field(default_factory=dict)
class OneLinerSessionCreateRequest(BaseModel):
project_id: str = ""
title: str = ""
preferred_platform: str = ""
initial_message: str = ""
class OneLinerMessageRequest(BaseModel):
content: str
project_id: str = ""
platform: str = ""
target_account_id: str = ""
remember_preference: bool = False
class PlatformAgentProfileRequest(BaseModel):
project_id: str = ""
assistant_id: str = ""
name: str = ""
mission: str = ""
notes: str = ""
status: str = "active"
config: dict[str, Any] = Field(default_factory=dict)
class AgentMemoryUpsertRequest(BaseModel):
project_id: str = ""
subject_type: str = "project"
subject_id: str = ""
memory_key: str
title: str = ""
summary: str
details: dict[str, Any] = Field(default_factory=dict)
confidence: float = Field(default=0.7, ge=0.0, le=1.0)
class AgentSkillUpsertRequest(BaseModel):
project_id: str = ""
skill_key: str
name: str
status: str = "draft"
method: dict[str, Any] = Field(default_factory=dict)
test_spec: dict[str, Any] = Field(default_factory=dict)
last_result: dict[str, Any] = Field(default_factory=dict)
success_count: int = Field(default=0, ge=0)
failure_count: int = Field(default=0, ge=0)
last_score: float = 0.0
class AdminIncidentReviewRequest(BaseModel):
status: str = "reviewed"
review_notes: str = ""
class PlatformAgentSelfCheckRequest(BaseModel):
project_id: str = ""
sample_limit: int = Field(default=3, ge=1, le=12)
remember_summary: bool = True
class PlatformSkillReviewRequest(BaseModel):
project_id: str = ""
accepted: bool = True
score: float = Field(default=0.8, ge=0.0, le=1.0)
status: str = ""
summary: str = ""
review_notes: str = ""
class OneLinerActionExecuteRequest(BaseModel):
action_key: str
project_id: str = ""
platform: str = ""
session_id: str = ""
payload: dict[str, Any] = Field(default_factory=dict)
INTENT_ACTIONS: dict[str, list[dict[str, Any]]] = {
"create_project": [{"key": "goto-intake", "label": "去我的项目", "kind": "navigate"}],
"create_assistant": [{"key": "open-create-assistant", "label": "创建 Agent", "kind": "ui_action"}],
"import_homepage": [{"key": "open-import-homepage", "label": "导入主页", "kind": "ui_action"}],
"track_account": [{"key": "open-track-selected-account", "label": "跟踪当前账号", "kind": "ui_action"}],
"analyze_account": [{"key": "analyze-selected-account", "label": "分析当前账号", "kind": "ui_action"}],
"analyze_top_videos": [{"key": "analyze-top-videos", "label": "分析高分作品", "kind": "ui_action"}],
"generate_copy": [{"key": "open-generate-copy", "label": "生成文案", "kind": "ui_action"}],
"ai_video": [{"key": "open-ai-video", "label": "做 AI 视频", "kind": "ui_action"}],
"real_cut": [{"key": "open-real-cut", "label": "做实拍剪辑", "kind": "ui_action"}],
"review": [{"key": "goto-review", "label": "去发布与复盘", "kind": "navigate"}],
"live_recorder": [{"key": "open-live-recorder", "label": "打开录制控制", "kind": "ui_action"}],
"storage_status": [{"key": "goto-production", "label": "查看生产与存储", "kind": "navigate"}],
"ops_admin": [{"key": "goto-automation", "label": "去自动流程", "kind": "navigate"}],
}
INTENT_LABELS = {
"create_project": "创建项目",
"create_assistant": "创建 Agent",
"import_homepage": "导入主页",
"track_account": "跟踪账号",
"analyze_account": "分析账号",
"analyze_top_videos": "分析高分作品",
"generate_copy": "生成文案",
"ai_video": "生成 AI 视频",
"real_cut": "实拍剪辑",
"review": "发布复盘",
"live_recorder": "直播录制",
"storage_status": "查看存储",
"ops_admin": "运维巡检",
"custom": "自定义任务",
}
def register_oneliner_routes(app: Any, legacy: Any) -> None:
def now() -> str:
return legacy.utc_now()
def make_id(prefix: str) -> str:
return legacy.make_id(prefix)
def _parse_json(raw: str | None, fallback: Any) -> Any:
cleaned = str(raw or "").strip()
if not cleaned:
return fallback
try:
return json.loads(cleaned)
except json.JSONDecodeError:
return fallback
def _dump(value: Any) -> str:
return json.dumps(value or {}, ensure_ascii=False)
def ensure_schema() -> None:
schema = """
CREATE TABLE IF NOT EXISTS oneliner_profiles (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
assistant_id TEXT NOT NULL DEFAULT '',
display_name TEXT NOT NULL DEFAULT 'OneLiner',
long_term_goal TEXT NOT NULL DEFAULT '',
notes TEXT NOT NULL DEFAULT '',
default_platform TEXT NOT NULL DEFAULT '',
config_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, project_id),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS oneliner_sessions (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
profile_id TEXT,
title TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'active',
preferred_platform TEXT NOT NULL DEFAULT '',
last_platform TEXT NOT NULL DEFAULT '',
last_intent_key TEXT NOT NULL DEFAULT '',
last_message_at TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(profile_id) REFERENCES oneliner_profiles(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS oneliner_messages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
user_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL DEFAULT '',
plan_json TEXT NOT NULL DEFAULT '{}',
result_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(session_id) REFERENCES oneliner_sessions(id) ON DELETE CASCADE,
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS platform_agent_profiles (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
platform TEXT NOT NULL,
assistant_id TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
mission TEXT NOT NULL DEFAULT '',
notes TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'active',
config_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, project_id, platform),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(assistant_id) REFERENCES assistants(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS agent_memories (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
agent_scope TEXT NOT NULL,
platform TEXT NOT NULL DEFAULT '',
subject_type TEXT NOT NULL DEFAULT 'project',
subject_id TEXT NOT NULL DEFAULT '',
memory_key TEXT NOT NULL,
title TEXT NOT NULL DEFAULT '',
summary TEXT NOT NULL DEFAULT '',
details_json TEXT NOT NULL DEFAULT '{}',
confidence REAL NOT NULL DEFAULT 0.7,
last_validated_at TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, project_id, agent_scope, platform, subject_type, subject_id, memory_key),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS agent_skills (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
project_id TEXT NOT NULL DEFAULT '',
agent_scope TEXT NOT NULL,
platform TEXT NOT NULL DEFAULT '',
parent_skill_id TEXT NOT NULL DEFAULT '',
skill_key TEXT NOT NULL,
name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'draft',
method_json TEXT NOT NULL DEFAULT '{}',
test_spec_json TEXT NOT NULL DEFAULT '{}',
last_result_json TEXT NOT NULL DEFAULT '{}',
success_count INTEGER NOT NULL DEFAULT 0,
failure_count INTEGER NOT NULL DEFAULT 0,
last_score REAL NOT NULL DEFAULT 0,
last_validated_at TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(user_id, project_id, agent_scope, platform, skill_key),
FOREIGN KEY(user_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS admin_ops_incidents (
id TEXT PRIMARY KEY,
tenant_user_id TEXT NOT NULL DEFAULT '',
tenant_project_id TEXT NOT NULL DEFAULT '',
source_type TEXT NOT NULL,
source_id TEXT NOT NULL DEFAULT '',
severity TEXT NOT NULL DEFAULT 'warn',
title TEXT NOT NULL,
summary TEXT NOT NULL DEFAULT '',
payload_json TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'open',
assigned_to TEXT NOT NULL DEFAULT '',
reviewed_by TEXT NOT NULL DEFAULT '',
review_notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(source_type, source_id, title),
FOREIGN KEY(tenant_user_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY(tenant_project_id) REFERENCES projects(id) ON DELETE SET NULL,
FOREIGN KEY(assigned_to) REFERENCES accounts(id) ON DELETE SET NULL,
FOREIGN KEY(reviewed_by) REFERENCES accounts(id) ON DELETE SET NULL
);
"""
with legacy.db.session() as conn:
conn.executescript(schema)
ensure_schema()
@app.on_event("startup")
def _startup_oneliner_schema() -> None:
ensure_schema()
def _resolve_project(account: dict[str, Any], project_id: str | None) -> dict[str, Any]:
return legacy.resolve_target_project(account["id"], project_id or None, username=account["username"])
def _resolve_assistant(account: dict[str, Any], assistant_id: str | None, project_id: str = "") -> dict[str, Any] | None:
return legacy.resolve_target_assistant(account["id"], assistant_id or None, project_id)
def _safe_platform(platform_value: str | None, fallback: str = "douyin") -> str:
return legacy.ensure_domestic_platform(platform_value or fallback, allow_blank=not fallback) or fallback
def _route_supported(path: str) -> bool:
return any(getattr(route, "path", "") == path for route in app.routes)
def _platform_route_checks(platform: str) -> list[dict[str, Any]]:
checks = [
("accounts", f"/v2/{platform}/accounts"),
("workspace", f"/v2/{platform}/accounts/{{account_id}}/workspace"),
("videos", f"/v2/{platform}/accounts/{{account_id}}/videos"),
("analyze_account", f"/v2/{platform}/accounts/{{account_id}}/analysis"),
("analyze_top_videos", f"/v2/{platform}/accounts/{{account_id}}/videos/analyze-top"),
("similar_searches", f"/v2/{platform}/similar-searches"),
("benchmark_links", f"/v2/{platform}/accounts/{{account_id}}/benchmark-links"),
]
return [
{
"key": key,
"path": path,
"ok": _route_supported(path),
}
for key, path in checks
]
def _fetch_profile_row(account: dict[str, Any], project_id: str = "") -> dict[str, Any] | None:
return legacy.db.fetch_one(
"SELECT * FROM oneliner_profiles WHERE user_id = ? AND project_id = ?",
(account["id"], project_id),
)
def _profile_payload(row: dict[str, Any], *, account: dict[str, Any] | None = None) -> dict[str, Any]:
assistant = None
if row.get("assistant_id"):
assistant_row = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],))
if assistant_row and (not account or assistant_row.get("user_id") == account["id"]):
assistant = legacy.assistant_payload(assistant_row)
return {
"id": row["id"],
"user_id": row["user_id"],
"project_id": row.get("project_id", ""),
"assistant_id": row.get("assistant_id", ""),
"display_name": row.get("display_name", "OneLiner"),
"long_term_goal": row.get("long_term_goal", ""),
"notes": row.get("notes", ""),
"default_platform": row.get("default_platform", ""),
"config": _parse_json(row.get("config_json"), {}),
"assistant": assistant,
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def _ensure_oneliner_profile(account: dict[str, Any], project_id: str = "") -> dict[str, Any]:
row = _fetch_profile_row(account, project_id)
if row:
return row
assistant_id = ""
if project_id:
assistant_row = legacy.db.fetch_one(
"SELECT * FROM assistants WHERE user_id = ? AND (project_id = ? OR project_id = '') ORDER BY created_at ASC LIMIT 1",
(account["id"], project_id),
)
else:
assistant_row = legacy.db.fetch_one(
"SELECT * FROM assistants WHERE user_id = ? ORDER BY created_at ASC LIMIT 1",
(account["id"],),
)
if assistant_row:
assistant_id = assistant_row["id"]
profile_id = make_id("oneliner")
created_at = now()
default_platform = "douyin"
legacy.db.execute(
"""
INSERT INTO oneliner_profiles (
id, user_id, project_id, assistant_id, display_name, long_term_goal, notes,
default_platform, config_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
profile_id,
account["id"],
project_id,
assistant_id,
"OneLiner",
"",
"",
default_platform,
_dump({"chat_only_for_unreleased_ui": True}),
created_at,
created_at,
),
)
return legacy.db.fetch_one("SELECT * FROM oneliner_profiles WHERE id = ?", (profile_id,))
def _list_platform_profiles(account: dict[str, Any], project_id: str = "") -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
"SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? ORDER BY platform ASC",
(account["id"], project_id),
)
mapping = {row["platform"]: row for row in rows}
results: list[dict[str, Any]] = []
for platform in sorted(legacy.DOMESTIC_PLATFORMS):
row = mapping.get(platform)
payload = _platform_agent_payload(account, row, platform=platform, project_id=project_id)
results.append(payload)
return results
def _platform_agent_payload(
account: dict[str, Any],
row: dict[str, Any] | None,
*,
platform: str,
project_id: str = "",
) -> dict[str, Any]:
assistant = None
if row and row.get("assistant_id"):
assistant_row = legacy.db.fetch_one("SELECT * FROM assistants WHERE id = ?", (row["assistant_id"],))
if assistant_row and assistant_row.get("user_id") == account["id"]:
assistant = legacy.assistant_payload(assistant_row)
memory_count = legacy.db.fetch_one(
"""
SELECT COUNT(*) AS count FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
""",
(account["id"], project_id, platform),
)["count"]
skill_count = legacy.db.fetch_one(
"""
SELECT COUNT(*) AS count FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
""",
(account["id"], project_id, platform),
)["count"]
recent_memory_row = legacy.db.fetch_one(
"""
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY updated_at DESC
LIMIT 1
""",
(account["id"], project_id, platform),
)
recent_skill_row = legacy.db.fetch_one(
"""
SELECT * FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY
CASE WHEN status = 'validated' THEN 0 WHEN status = 'draft' THEN 1 ELSE 2 END,
updated_at DESC
LIMIT 1
""",
(account["id"], project_id, platform),
)
readiness_items = [
bool(row and row.get("status") == "active"),
bool(assistant),
bool(memory_count),
bool(skill_count),
]
readiness_score = int(sum(1 for item in readiness_items if item) * 25)
if readiness_score >= 100:
readiness_label = "就绪"
elif readiness_score >= 50:
readiness_label = "可用"
else:
readiness_label = "待补全"
return {
"id": row["id"] if row else "",
"user_id": account["id"],
"project_id": project_id,
"platform": platform,
"platform_label": legacy.platform_label(platform),
"assistant_id": row.get("assistant_id", "") if row else "",
"name": row.get("name", f"{legacy.platform_label(platform)} Agent") if row else f"{legacy.platform_label(platform)} Agent",
"mission": row.get("mission", "") if row else "",
"notes": row.get("notes", "") if row else "",
"status": row.get("status", "draft") if row else "draft",
"config": _parse_json((row or {}).get("config_json"), {}),
"memory_count": memory_count,
"skill_count": skill_count,
"recent_memory": _memory_payload(recent_memory_row) if recent_memory_row else None,
"recent_skill": _skill_payload(recent_skill_row) if recent_skill_row else None,
"readiness_score": readiness_score,
"readiness_label": readiness_label,
"assistant": assistant,
"created_at": (row or {}).get("created_at", ""),
"updated_at": (row or {}).get("updated_at", ""),
}
def _memory_payload(row: dict[str, Any]) -> dict[str, Any]:
return {
"id": row["id"],
"user_id": row["user_id"],
"project_id": row.get("project_id", ""),
"agent_scope": row.get("agent_scope", ""),
"platform": row.get("platform", ""),
"platform_label": legacy.platform_label(row.get("platform", "")) if row.get("platform") else "",
"subject_type": row.get("subject_type", ""),
"subject_id": row.get("subject_id", ""),
"memory_key": row.get("memory_key", ""),
"title": row.get("title", ""),
"summary": row.get("summary", ""),
"details": _parse_json(row.get("details_json"), {}),
"confidence": float(row.get("confidence") or 0),
"last_validated_at": row.get("last_validated_at", ""),
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def _skill_payload(row: dict[str, Any]) -> dict[str, Any]:
return {
"id": row["id"],
"user_id": row["user_id"],
"project_id": row.get("project_id", ""),
"agent_scope": row.get("agent_scope", ""),
"platform": row.get("platform", ""),
"platform_label": legacy.platform_label(row.get("platform", "")) if row.get("platform") else "",
"parent_skill_id": row.get("parent_skill_id", ""),
"skill_key": row.get("skill_key", ""),
"name": row.get("name", ""),
"status": row.get("status", "draft"),
"method": _parse_json(row.get("method_json"), {}),
"test_spec": _parse_json(row.get("test_spec_json"), {}),
"last_result": _parse_json(row.get("last_result_json"), {}),
"success_count": int(row.get("success_count") or 0),
"failure_count": int(row.get("failure_count") or 0),
"last_score": float(row.get("last_score") or 0),
"last_validated_at": row.get("last_validated_at", ""),
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def _session_payload(row: dict[str, Any]) -> dict[str, Any]:
return {
"id": row["id"],
"user_id": row["user_id"],
"project_id": row.get("project_id", ""),
"profile_id": row.get("profile_id", ""),
"title": row.get("title", ""),
"status": row.get("status", "active"),
"preferred_platform": row.get("preferred_platform", ""),
"last_platform": row.get("last_platform", ""),
"last_intent_key": row.get("last_intent_key", ""),
"last_message_at": row.get("last_message_at", ""),
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def _message_payload(row: dict[str, Any]) -> dict[str, Any]:
return {
"id": row["id"],
"session_id": row["session_id"],
"user_id": row["user_id"],
"role": row.get("role", "user"),
"content": row.get("content", ""),
"plan": _parse_json(row.get("plan_json"), {}),
"result": _parse_json(row.get("result_json"), {}),
"created_at": row["created_at"],
}
def _incident_payload(row: dict[str, Any]) -> dict[str, Any]:
return {
"id": row["id"],
"tenant_user_id": row.get("tenant_user_id", ""),
"tenant_project_id": row.get("tenant_project_id", ""),
"source_type": row.get("source_type", ""),
"source_id": row.get("source_id", ""),
"severity": row.get("severity", "warn"),
"title": row.get("title", ""),
"summary": row.get("summary", ""),
"payload": _parse_json(row.get("payload_json"), {}),
"status": row.get("status", "open"),
"assigned_to": row.get("assigned_to", ""),
"reviewed_by": row.get("reviewed_by", ""),
"review_notes": row.get("review_notes", ""),
"created_at": row["created_at"],
"updated_at": row["updated_at"],
}
def _platform_source_samples(
account: dict[str, Any],
*,
project_id: str,
platform: str,
limit: int = 3,
) -> list[dict[str, Any]]:
safe_limit = max(1, min(int(limit or 3), 12))
rows = legacy.db.fetch_all(
f"""
SELECT * FROM content_sources
WHERE user_id = ? AND project_id = ? AND platform = ?
ORDER BY updated_at DESC
LIMIT {safe_limit}
""",
(account["id"], project_id, platform),
)
return [legacy.content_source_payload(row) for row in rows]
def _load_owned_session(session_id: str, account: dict[str, Any]) -> dict[str, Any]:
row = legacy.db.fetch_one(
"SELECT * FROM oneliner_sessions WHERE id = ? AND user_id = ?",
(session_id, account["id"]),
)
if not row:
raise HTTPException(status_code=404, detail="OneLiner session not found")
return row
def _deterministic_intent(message: str, platform_hint: str, account: dict[str, Any]) -> dict[str, Any]:
text = message.strip()
lowered = text.lower()
platform = normalize_platform_from_text(text) or _safe_platform(platform_hint or "", fallback="")
intent_key = "custom"
confidence = 0.45
summary = "先理解目标,再把任务路由到合适的平台 Agent 或固定能力。"
if any(keyword in text for keyword in ("新建项目", "创建项目", "建项目")):
intent_key = "create_project"
confidence = 0.96
summary = "这是一个新项目启动诉求,优先进入项目创建流。"
elif any(keyword in lowered for keyword in ("create agent",)) or any(keyword in text for keyword in ("创建agent", "创建 Agent", "新建agent", "新建 Agent")):
intent_key = "create_assistant"
confidence = 0.96
summary = "这是定义新 Agent 的需求,适合直接进入 Agent 创建流。"
elif any(keyword in text for keyword in ("导入主页", "主页链接", "账号主页", "主页账号")):
intent_key = "import_homepage"
confidence = 0.9
summary = "这是账号主页导入诉求,适合进入内容源接入。"
elif any(keyword in text for keyword in ("跟踪", "日报", "更新提醒", "持续跟")):
intent_key = "track_account"
confidence = 0.9
summary = "这是持续跟踪类任务,适合交给平台 Agent 和跟踪摘要链。"
elif any(keyword in text for keyword in ("高分", "爆款", "优质作品", "高表现")):
intent_key = "analyze_top_videos"
confidence = 0.88
summary = "这是高表现内容拆解诉求,优先分析高分作品。"
elif any(keyword in text for keyword in ("对标", "分析账号", "调研", "拆账号")):
intent_key = "analyze_account"
confidence = 0.86
summary = "这是账号层面的调研任务,优先交给对应平台 Agent。"
elif any(keyword in text for keyword in ("文案", "脚本", "口播", "改写")):
intent_key = "generate_copy"
confidence = 0.88
summary = "这是文案/脚本生成任务,适合走 Agent 生成链。"
elif any(keyword in text for keyword in ("AI视频", "AI 视频", "生成视频")):
intent_key = "ai_video"
confidence = 0.9
summary = "这是 AI 视频生产任务,适合走 AI 视频链。"
elif any(keyword in text for keyword in ("实拍", "剪辑", "混剪")):
intent_key = "real_cut"
confidence = 0.9
summary = "这是实拍剪辑任务,适合走 cutvideo 链。"
elif any(keyword in text for keyword in ("直播", "录制", "开录")):
intent_key = "live_recorder"
confidence = 0.9
summary = "这是直播录制任务,适合走 NAS 录制能力。"
elif any(keyword in text for keyword in ("复盘", "发布总结", "回看数据")):
intent_key = "review"
confidence = 0.84
summary = "这是发布复盘任务,适合进入复盘工作台。"
elif any(keyword in text for keyword in ("空间", "缓存", "存储", "NAS")):
intent_key = "storage_status"
confidence = 0.82
summary = "这是存储状态问题,适合查看租户存储面板。"
elif account.get("role") == "super_admin" and any(keyword in text for keyword in ("报错", "日志", "故障", "运维", "修复")):
intent_key = "ops_admin"
confidence = 0.84
summary = "这是平台级运维诉求,只能交给管理员运维/审计能力。"
return {
"intent_key": intent_key,
"platform": platform or "",
"confidence": confidence,
"summary": summary,
"reasoning_mode": "deterministic-first",
}
def normalize_platform_from_text(text: str) -> str:
for key, value in legacy.PLATFORM_ALIASES.items():
if key and key in text.lower():
if value in legacy.DOMESTIC_PLATFORMS:
return value
for key, value in legacy.PLATFORM_ALIASES.items():
if key and key in text:
if value in legacy.DOMESTIC_PLATFORMS:
return value
return ""
async def _model_refine_intent(
account: dict[str, Any],
*,
project_id: str,
message: str,
platform_hint: str,
) -> dict[str, Any]:
profile = legacy.model_profile_for_account(account["id"], None)
if not profile:
raise HTTPException(status_code=503, detail="No model profile available")
system_prompt = (
"你是 StoryForge 的 OneLiner 总控主Agent只负责把用户目标分类成安全的系统动作。"
"必须输出 JSON不要输出 Markdown。"
)
user_prompt = (
f"用户角色:{account.get('role','user')}\n"
f"项目:{project_id or '默认项目'}\n"
f"平台提示:{platform_hint or '未指定'}\n"
f"用户原话:{message}\n\n"
"请输出 JSON"
"{"
'"intent_key":"","platform":"","confidence":0.0,'
'"summary":"","needs_oneliner_only":false,'
'"remember_preference":false'
"}\n"
"intent_key 只能取create_project, create_assistant, import_homepage, track_account, analyze_account, analyze_top_videos, generate_copy, ai_video, real_cut, review, live_recorder, storage_status, ops_admin, custom。"
"如果前端 UI 还没有明确产品化needs_oneliner_only 返回 true。"
)
raw = await legacy.call_model(profile, system_prompt, user_prompt, temperature=0.1)
parsed = legacy.parse_json_object(raw)
if not parsed:
raise HTTPException(status_code=502, detail="OneLiner planner returned empty result")
return {
"intent_key": str(parsed.get("intent_key") or "custom").strip() or "custom",
"platform": normalize_platform_from_text(str(parsed.get("platform") or "")) or _safe_platform(platform_hint or "", fallback=""),
"confidence": float(parsed.get("confidence") or 0),
"summary": str(parsed.get("summary") or "").strip() or "已按模型判断用户目标。",
"needs_oneliner_only": bool(parsed.get("needs_oneliner_only")),
"remember_preference": bool(parsed.get("remember_preference")),
"reasoning_mode": "model-refine",
}
async def _plan_oneliner_request(
account: dict[str, Any],
*,
project_id: str,
message: str,
platform_hint: str,
) -> dict[str, Any]:
plan = _deterministic_intent(message, platform_hint, account)
if plan["confidence"] < 0.82:
try:
refined = await _model_refine_intent(account, project_id=project_id, message=message, platform_hint=platform_hint)
if refined.get("confidence", 0) >= plan.get("confidence", 0):
plan = {**plan, **refined}
except Exception:
pass
intent_key = plan.get("intent_key") or "custom"
actions = INTENT_ACTIONS.get(intent_key, [])
if intent_key == "ops_admin" and account.get("role") != "super_admin":
actions = []
plan["summary"] = "这是平台级运维诉求,但当前账号没有管理员权限。"
plan["needs_oneliner_only"] = True
ui_supported = bool(actions)
if intent_key == "custom":
plan["needs_oneliner_only"] = True
plan["ui_supported"] = ui_supported
plan["delivery_mode"] = "ui" if ui_supported and not plan.get("needs_oneliner_only") else "oneliner"
plan["suggested_actions"] = actions
plan["intent_label"] = INTENT_LABELS.get(intent_key, intent_key)
plan["platform_label"] = legacy.platform_label(plan.get("platform")) if plan.get("platform") else "待判断"
plan["economicity"] = {
"policy": "deterministic-first",
"explanation": "先走固定流程,再走平台 Agent最后才升级到 OneLiner 深度调度。",
}
return plan
def _remember_message_preference(
account: dict[str, Any],
*,
project_id: str,
plan: dict[str, Any],
message: str,
) -> dict[str, Any] | None:
cues = ("记住", "以后", "长期", "默认", "一直", "优先")
if not plan.get("remember_preference") and not any(cue in message for cue in cues):
return None
agent_scope = "oneliner"
platform = plan.get("platform") or ""
subject_type = "project" if project_id else "account"
subject_id = project_id or account["id"]
memory_key = f"preference::{plan.get('intent_key') or 'custom'}"
existing = legacy.db.fetch_one(
"""
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = ? AND platform = ?
AND subject_type = ? AND subject_id = ? AND memory_key = ?
""",
(account["id"], project_id, agent_scope, platform, subject_type, subject_id, memory_key),
)
timestamp = now()
details = {
"captured_from": "oneliner_chat",
"intent_key": plan.get("intent_key", "custom"),
"source_message": message,
"platform": platform,
}
if existing:
legacy.db.execute(
"""
UPDATE agent_memories
SET title = ?, summary = ?, details_json = ?, confidence = ?, last_validated_at = ?, updated_at = ?
WHERE id = ?
""",
(
f"{INTENT_LABELS.get(plan.get('intent_key') or 'custom', '偏好')}偏好",
message.strip()[:280],
_dump(details),
0.82,
timestamp,
timestamp,
existing["id"],
),
)
stored = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (existing["id"],))
else:
memory_id = make_id("mem")
legacy.db.execute(
"""
INSERT INTO agent_memories (
id, user_id, project_id, agent_scope, platform, subject_type, subject_id,
memory_key, title, summary, details_json, confidence, last_validated_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
memory_id,
account["id"],
project_id,
agent_scope,
platform,
subject_type,
subject_id,
memory_key,
f"{INTENT_LABELS.get(plan.get('intent_key') or 'custom', '偏好')}偏好",
message.strip()[:280],
_dump(details),
0.82,
timestamp,
timestamp,
timestamp,
),
)
stored = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (memory_id,))
return _memory_payload(stored) if stored else None
def _remember_platform_observation(
account: dict[str, Any],
*,
project_id: str,
platform: str,
memory_key: str,
title: str,
summary: str,
details: dict[str, Any],
confidence: float = 0.82,
) -> dict[str, Any]:
request = AgentMemoryUpsertRequest(
project_id=project_id,
subject_type="project",
subject_id=project_id,
memory_key=memory_key,
title=title,
summary=summary,
details=details,
confidence=confidence,
)
return _upsert_memory(account, agent_scope="platform", platform=platform, request=request)
def _session_context_summary(account: dict[str, Any], project_id: str, platform: str) -> dict[str, Any]:
project = _resolve_project(account, project_id or None)
assistant = None
profile_row = _fetch_profile_row(account, project["id"])
if profile_row and profile_row.get("assistant_id"):
assistant = _resolve_assistant(account, profile_row.get("assistant_id"), project["id"])
platform_profile = legacy.db.fetch_one(
"SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?",
(account["id"], project["id"], platform),
) if platform else None
oneliner_memory_rows = legacy.db.fetch_all(
"""
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'oneliner'
ORDER BY updated_at DESC
LIMIT 3
""",
(account["id"], project["id"]),
)
platform_memory_rows = legacy.db.fetch_all(
"""
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY updated_at DESC
LIMIT 3
""",
(account["id"], project["id"], platform),
) if platform else []
platform_skill_rows = legacy.db.fetch_all(
"""
SELECT * FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY
CASE WHEN status = 'validated' THEN 0 WHEN status = 'draft' THEN 1 ELSE 2 END,
updated_at DESC
LIMIT 3
""",
(account["id"], project["id"], platform),
) if platform else []
return {
"project": legacy.project_payload(project),
"oneliner_profile": _profile_payload(profile_row, account=account) if profile_row else None,
"platform_agent": _platform_agent_payload(account, platform_profile, platform=platform, project_id=project["id"]) if platform else None,
"assistant": legacy.assistant_payload(assistant) if assistant else None,
"oneliner_memories": [_memory_payload(row) for row in oneliner_memory_rows],
"platform_memories": [_memory_payload(row) for row in platform_memory_rows],
"platform_skills": [_skill_payload(row) for row in platform_skill_rows],
}
async def _generate_oneliner_reply(
account: dict[str, Any],
*,
project_id: str,
message: str,
plan: dict[str, Any],
) -> dict[str, Any]:
context = _session_context_summary(account, project_id or "", plan.get("platform") or "")
platform_agent = context.get("platform_agent") or {}
primary_action = (plan.get("suggested_actions") or [{}])[0] if plan.get("suggested_actions") else None
evidence = []
if platform_agent.get("recent_memory"):
evidence.append(
{
"kind": "memory",
"title": platform_agent["recent_memory"].get("title") or platform_agent["recent_memory"].get("memory_key") or "最近记忆",
"summary": platform_agent["recent_memory"].get("summary", ""),
}
)
if platform_agent.get("recent_skill"):
evidence.append(
{
"kind": "skill",
"title": platform_agent["recent_skill"].get("name") or platform_agent["recent_skill"].get("skill_key") or "最近技能",
"summary": platform_agent["recent_skill"].get("test_spec", {}).get("summary")
or platform_agent["recent_skill"].get("method", {}).get("summary")
or "",
"score": platform_agent["recent_skill"].get("last_score", 0),
}
)
blocked_reason = ""
if plan.get("intent_key") == "ops_admin" and account.get("role") != "super_admin":
blocked_reason = "当前账号不是平台最高权限用户,所以不会开放运维 Agent。"
elif plan.get("delivery_mode") == "oneliner":
blocked_reason = "当前更适合由 OneLiner 对话承接,等前端产品化后再下沉到固定 UI。"
next_steps = []
if primary_action:
next_steps.append(f"优先执行「{primary_action.get('label', primary_action.get('key', '下一步'))}」。")
if platform_agent.get("assistant", {}).get("name"):
next_steps.append(f"默认调度 {platform_agent['assistant']['name']} 作为执行 Agent。")
if evidence:
next_steps.append("我会优先参考该平台 Agent 最近沉淀的方法与技能。")
summary_lines = [
f"我理解你的目标是:{plan.get('intent_label', '自定义任务')}",
f"建议优先处理的平台:{plan.get('platform_label', '待判断')}",
plan.get("summary", ""),
]
if plan.get("delivery_mode") == "oneliner":
summary_lines.append("这项能力当前更适合先由 OneLiner 对话承接,而不是要求你先理解前端功能树。")
if plan.get("intent_key") == "ops_admin" and account.get("role") != "super_admin":
summary_lines.append("当前账号不是平台最高权限用户,所以我不会放出运维 Agent 入口。")
if context.get("platform_agent"):
summary_lines.append(f"当前 {context['platform_agent']['platform_label']} Agent 已绑定:{context['platform_agent'].get('assistant', {}).get('name') or '未绑定执行 Agent'}")
if platform_agent.get("recent_memory"):
summary_lines.append(f"最近有效经验:{platform_agent['recent_memory'].get('title') or '一条平台记忆'}")
if platform_agent.get("recent_skill"):
summary_lines.append(f"最近有效技能:{platform_agent['recent_skill'].get('name') or '一条技能'}")
secondary_actions: list[dict[str, Any]] = []
if plan.get("platform"):
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "运行平台自检",
"kind": "api_action",
"executor_key": "platform-self-check",
"platform": plan.get("platform", ""),
}
)
secondary_actions.append(
{
"key": "open-platform-agent-detail",
"label": f"查看{plan.get('platform_label', '平台')} Agent",
"kind": "ui_action",
"platform": plan.get("platform", ""),
}
)
if plan.get("intent_key") in {"storage_status", "custom"}:
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "查看当前存储状态",
"kind": "api_action",
"executor_key": "storage-status",
"platform": plan.get("platform", ""),
}
)
if plan.get("intent_key") == "live_recorder":
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "查看录制状态",
"kind": "api_action",
"executor_key": "live-recorder-status",
"platform": plan.get("platform", ""),
}
)
if account.get("role") == "super_admin":
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "重新扫描故障",
"kind": "api_action",
"executor_key": "scan-admin-ops",
"platform": "",
}
)
return {
"summary_text": "\n".join([line for line in summary_lines if line.strip()]),
"context": context,
"execution_card": {
"intent_key": plan.get("intent_key", "custom"),
"intent_label": plan.get("intent_label", "自定义任务"),
"delivery_mode": plan.get("delivery_mode", "oneliner"),
"platform": plan.get("platform", ""),
"platform_label": plan.get("platform_label", "待判断"),
"platform_agent_name": platform_agent.get("name") or "",
"assistant_name": platform_agent.get("assistant", {}).get("name") or context.get("assistant", {}).get("name") or "",
"readiness_label": platform_agent.get("readiness_label") or "",
"readiness_score": platform_agent.get("readiness_score") or 0,
"primary_action": primary_action or {},
"blocked_reason": blocked_reason,
"evidence": evidence,
"next_steps": next_steps,
"secondary_actions": secondary_actions,
},
"safe_boundary": {
"core_code_locked": True,
"tenant_isolation": True,
"ops_admin_only": True,
},
}
def _insert_message(session_id: str, account_id: str, role: str, content: str, plan: dict[str, Any], result: dict[str, Any]) -> dict[str, Any]:
message_id = make_id("oline_msg")
created_at = now()
legacy.db.execute(
"""
INSERT INTO oneliner_messages (id, session_id, user_id, role, content, plan_json, result_json, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(message_id, session_id, account_id, role, content, _dump(plan), _dump(result), created_at),
)
return legacy.db.fetch_one("SELECT * FROM oneliner_messages WHERE id = ?", (message_id,))
def _upsert_platform_profile(account: dict[str, Any], platform: str, request: PlatformAgentProfileRequest) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
assistant = _resolve_assistant(account, request.assistant_id or None, project["id"])
if not assistant:
fallback_profile = _fetch_profile_row(account, project["id"]) or _ensure_oneliner_profile(account, project["id"])
if fallback_profile.get("assistant_id"):
assistant = _resolve_assistant(account, fallback_profile.get("assistant_id"), project["id"])
if not assistant:
assistant = _resolve_assistant(account, None, project["id"])
existing = legacy.db.fetch_one(
"SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?",
(account["id"], project["id"], platform),
)
timestamp = now()
if existing:
legacy.db.execute(
"""
UPDATE platform_agent_profiles
SET assistant_id = ?, name = ?, mission = ?, notes = ?, status = ?, config_json = ?, updated_at = ?
WHERE id = ?
""",
(
(assistant or {}).get("id", ""),
request.name.strip() or existing.get("name") or f"{legacy.platform_label(platform)} Agent",
request.mission.strip(),
request.notes.strip(),
request.status.strip() or "active",
_dump(request.config),
timestamp,
existing["id"],
),
)
row = legacy.db.fetch_one("SELECT * FROM platform_agent_profiles WHERE id = ?", (existing["id"],))
else:
profile_id = make_id("plat_agent")
legacy.db.execute(
"""
INSERT INTO platform_agent_profiles (
id, user_id, project_id, platform, assistant_id, name, mission, notes, status, config_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
profile_id,
account["id"],
project["id"],
platform,
(assistant or {}).get("id", ""),
request.name.strip() or f"{legacy.platform_label(platform)} Agent",
request.mission.strip(),
request.notes.strip(),
request.status.strip() or "active",
_dump(request.config),
timestamp,
timestamp,
),
)
row = legacy.db.fetch_one("SELECT * FROM platform_agent_profiles WHERE id = ?", (profile_id,))
return _platform_agent_payload(account, row, platform=platform, project_id=project["id"])
def _upsert_memory(
account: dict[str, Any],
*,
agent_scope: str,
platform: str,
request: AgentMemoryUpsertRequest,
) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
subject_type = request.subject_type.strip() or "project"
subject_id = request.subject_id.strip() or (project["id"] if subject_type == "project" else account["id"])
existing = legacy.db.fetch_one(
"""
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = ? AND platform = ?
AND subject_type = ? AND subject_id = ? AND memory_key = ?
""",
(account["id"], project["id"], agent_scope, platform, subject_type, subject_id, request.memory_key.strip()),
)
timestamp = now()
if existing:
legacy.db.execute(
"""
UPDATE agent_memories
SET title = ?, summary = ?, details_json = ?, confidence = ?, last_validated_at = ?, updated_at = ?
WHERE id = ?
""",
(
request.title.strip(),
request.summary.strip(),
_dump(request.details),
request.confidence,
timestamp,
timestamp,
existing["id"],
),
)
row = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (existing["id"],))
else:
memory_id = make_id("mem")
legacy.db.execute(
"""
INSERT INTO agent_memories (
id, user_id, project_id, agent_scope, platform, subject_type, subject_id,
memory_key, title, summary, details_json, confidence, last_validated_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
memory_id,
account["id"],
project["id"],
agent_scope,
platform,
subject_type,
subject_id,
request.memory_key.strip(),
request.title.strip(),
request.summary.strip(),
_dump(request.details),
request.confidence,
timestamp,
timestamp,
timestamp,
),
)
row = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (memory_id,))
return _memory_payload(row)
def _upsert_skill(
account: dict[str, Any],
*,
agent_scope: str,
platform: str,
request: AgentSkillUpsertRequest,
skill_id: str | None = None,
) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
existing = None
if skill_id:
existing = legacy.db.fetch_one(
"SELECT * FROM agent_skills WHERE id = ? AND user_id = ?",
(skill_id, account["id"]),
)
if not existing:
raise HTTPException(status_code=404, detail="Agent skill not found")
else:
existing = legacy.db.fetch_one(
"""
SELECT * FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = ? AND platform = ? AND skill_key = ?
""",
(account["id"], project["id"], agent_scope, platform, request.skill_key.strip()),
)
timestamp = now()
if existing:
legacy.db.execute(
"""
UPDATE agent_skills
SET name = ?, status = ?, method_json = ?, test_spec_json = ?, last_result_json = ?,
success_count = ?, failure_count = ?, last_score = ?, last_validated_at = ?, updated_at = ?
WHERE id = ?
""",
(
request.name.strip(),
request.status.strip() or "draft",
_dump(request.method),
_dump(request.test_spec),
_dump(request.last_result),
request.success_count,
request.failure_count,
request.last_score,
timestamp,
timestamp,
existing["id"],
),
)
row = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (existing["id"],))
else:
new_id = make_id("skill")
legacy.db.execute(
"""
INSERT INTO agent_skills (
id, user_id, project_id, agent_scope, platform, parent_skill_id, skill_key, name, status,
method_json, test_spec_json, last_result_json, success_count, failure_count, last_score,
last_validated_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, '', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
new_id,
account["id"],
project["id"],
agent_scope,
platform,
request.skill_key.strip(),
request.name.strip(),
request.status.strip() or "draft",
_dump(request.method),
_dump(request.test_spec),
_dump(request.last_result),
request.success_count,
request.failure_count,
request.last_score,
timestamp,
timestamp,
timestamp,
),
)
row = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (new_id,))
return _skill_payload(row)
def _create_or_update_incident(
*,
tenant_user_id: str,
tenant_project_id: str,
source_type: str,
source_id: str,
severity: str,
title: str,
summary: str,
payload: dict[str, Any],
) -> dict[str, Any]:
existing = legacy.db.fetch_one(
"SELECT * FROM admin_ops_incidents WHERE source_type = ? AND source_id = ? AND title = ?",
(source_type, source_id, title),
)
timestamp = now()
if existing:
legacy.db.execute(
"""
UPDATE admin_ops_incidents
SET tenant_user_id = ?, tenant_project_id = ?, severity = ?, summary = ?, payload_json = ?, updated_at = ?
WHERE id = ?
""",
(
tenant_user_id,
tenant_project_id,
severity,
summary,
_dump(payload),
timestamp,
existing["id"],
),
)
row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (existing["id"],))
else:
incident_id = make_id("incident")
legacy.db.execute(
"""
INSERT INTO admin_ops_incidents (
id, tenant_user_id, tenant_project_id, source_type, source_id, severity, title,
summary, payload_json, status, assigned_to, reviewed_by, review_notes, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'open', '', '', '', ?, ?)
""",
(
incident_id,
tenant_user_id,
tenant_project_id,
source_type,
source_id,
severity,
title,
summary,
_dump(payload),
timestamp,
timestamp,
),
)
row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,))
return _incident_payload(row)
def _scan_admin_incidents(admin: dict[str, Any]) -> dict[str, Any]:
_ = admin
created: list[dict[str, Any]] = []
failed_jobs = legacy.db.fetch_all(
"SELECT * FROM jobs WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 20"
)
for job in failed_jobs:
created.append(
_create_or_update_incident(
tenant_user_id=job.get("user_id", "") or "",
tenant_project_id=job.get("project_id", "") or "",
source_type="job",
source_id=job["id"],
severity="error",
title=f"失败任务:{job.get('title') or job['id']}",
summary=job.get("error", "")[:280] or "任务失败,需要检查执行链。",
payload=legacy.job_payload(job),
)
)
try:
integration_health = legacy.integrations_health(admin)
except Exception as exc:
integration_health = {"collector": {"reachable": False, "error": str(exc)}}
for key, payload in integration_health.items():
reachable = bool(payload.get("reachable", False))
if reachable and key != "cutvideo":
continue
if key == "cutvideo" and payload.get("supports_uploads", True):
continue
created.append(
_create_or_update_incident(
tenant_user_id="",
tenant_project_id="",
source_type="integration",
source_id=key,
severity="warn" if key in {"cutvideo", "live_recorder"} else "error",
title=f"集成异常:{key}",
summary=str(payload.get("error") or payload.get("upload_error") or "集成健康检查未通过")[:280],
payload=payload,
)
)
return {
"created_or_updated": created,
"count": len(created),
}
def _admin_ops_overview_payload(admin: dict[str, Any]) -> dict[str, Any]:
incidents = [
_incident_payload(row)
for row in legacy.db.fetch_all(
"SELECT * FROM admin_ops_incidents ORDER BY updated_at DESC LIMIT 50"
)
]
open_incidents = [item for item in incidents if item.get("status") in {"open", "watching", ""}]
severity_counts = {
"error": len([item for item in incidents if item.get("severity") == "error"]),
"warn": len([item for item in incidents if item.get("severity") == "warn"]),
"info": len([item for item in incidents if item.get("severity") == "info"]),
}
failed_jobs = [
legacy.job_payload(row)
for row in legacy.db.fetch_all(
"SELECT * FROM jobs WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 12"
)
]
pending_accounts = [
legacy.normalize_account(row)
for row in legacy.db.fetch_all("SELECT * FROM accounts WHERE approval_status = 'pending' ORDER BY created_at ASC LIMIT 20")
]
return {
"incidents": incidents,
"incident_count": len(incidents),
"open_incident_count": len(open_incidents),
"severity_counts": severity_counts,
"failed_jobs": failed_jobs,
"failed_job_count": len(failed_jobs),
"pending_accounts": pending_accounts,
"pending_account_count": len(pending_accounts),
"integration_health": legacy.integrations_health(admin),
}
def _platform_self_check(
account: dict[str, Any],
*,
platform: str,
project_id: str,
sample_limit: int = 3,
remember_summary: bool = True,
) -> dict[str, Any]:
project = _resolve_project(account, project_id or None)
normalized_platform = _safe_platform(platform)
profile = _platform_agent_payload(
account,
legacy.db.fetch_one(
"SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?",
(account["id"], project["id"], normalized_platform),
),
platform=normalized_platform,
project_id=project["id"],
)
route_checks = _platform_route_checks(normalized_platform)
route_ok_count = len([item for item in route_checks if item["ok"]])
route_ratio = (route_ok_count / len(route_checks)) if route_checks else 0
source_samples = _platform_source_samples(account, project_id=project["id"], platform=normalized_platform, limit=sample_limit)
signal_checks = [
("配置激活", bool(profile.get("status") == "active")),
("已绑定执行 Agent", bool(profile.get("assistant_id"))),
("已有平台记忆", bool(profile.get("memory_count"))),
("已有平台技能", bool(profile.get("skill_count"))),
("已有平台账号源", bool(source_samples)),
]
signal_score = sum(1 for _, ok in signal_checks if ok) * 12
route_score = int(route_ratio * 40)
score = min(100, signal_score + route_score)
if score >= 85:
verdict = "validated"
label = "稳定"
elif score >= 60:
verdict = "usable"
label = "可用"
else:
verdict = "needs_work"
label = "待加强"
suggestions = []
if not profile.get("assistant_id"):
suggestions.append("先给平台 Agent 绑定一个执行 Agent。")
if not profile.get("memory_count"):
suggestions.append("补一条平台记忆,沉淀最近有效经验。")
if not profile.get("skill_count"):
suggestions.append("补一条可验收的平台技能。")
if not source_samples:
suggestions.append("先导入至少一个该平台账号源,避免空跑。")
if route_ratio < 1:
suggestions.append("补齐当前平台 workbench 路由,避免调度时出现断点。")
payload = {
"platform": normalized_platform,
"platform_label": legacy.platform_label(normalized_platform),
"project_id": project["id"],
"score": score,
"readiness_label": label,
"verdict": verdict,
"route_checks": route_checks,
"signals": [{"label": name, "ok": ok} for name, ok in signal_checks],
"source_count": len(source_samples),
"source_samples": source_samples,
"checked_at": now(),
"suggestions": suggestions,
"profile": profile,
}
if remember_summary:
_remember_platform_observation(
account,
project_id=project["id"],
platform=normalized_platform,
memory_key=f"self_check::{normalized_platform}",
title=f"{legacy.platform_label(normalized_platform)} Agent 自检",
summary=f"平台自检得分 {score},当前判定为{label}",
details=payload,
confidence=0.88 if score >= 85 else 0.72,
)
return payload
def _review_platform_skill(
account: dict[str, Any],
*,
platform: str,
skill_id: str,
request: PlatformSkillReviewRequest,
) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
normalized_platform = _safe_platform(platform)
current = legacy.db.fetch_one(
"""
SELECT * FROM agent_skills
WHERE id = ? AND user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
""",
(skill_id, account["id"], project["id"], normalized_platform),
)
if not current:
raise HTTPException(status_code=404, detail="Platform skill not found")
accepted = bool(request.accepted)
next_status = (request.status or "").strip() or ("validated" if accepted else "needs_revision")
timestamp = now()
next_success = int(current.get("success_count") or 0) + (1 if accepted else 0)
next_failure = int(current.get("failure_count") or 0) + (0 if accepted else 1)
result_payload = {
**_parse_json(current.get("last_result_json"), {}),
"accepted": accepted,
"review_notes": request.review_notes.strip(),
"summary": request.summary.strip(),
"reviewed_at": timestamp,
"reviewed_by": account["id"],
}
legacy.db.execute(
"""
UPDATE agent_skills
SET status = ?, last_result_json = ?, success_count = ?, failure_count = ?, last_score = ?, last_validated_at = ?, updated_at = ?
WHERE id = ?
""",
(
next_status,
_dump(result_payload),
next_success,
next_failure,
request.score,
timestamp,
timestamp,
skill_id,
),
)
updated = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (skill_id,))
feedback_summary = (request.summary or request.review_notes or "").strip()
feedback_memory = None
if feedback_summary:
feedback_memory = _remember_platform_observation(
account,
project_id=project["id"],
platform=normalized_platform,
memory_key=f"skill_feedback::{current.get('skill_key')}",
title=f"{current.get('name') or current.get('skill_key') or '技能'}·{'已验证' if accepted else '待优化'}",
summary=feedback_summary[:280],
details={
"skill_id": skill_id,
"skill_key": current.get("skill_key", ""),
"accepted": accepted,
"score": request.score,
"review_notes": request.review_notes.strip(),
"status": next_status,
},
confidence=0.9 if accepted else 0.66,
)
payload = _skill_payload(updated)
if feedback_memory:
payload["feedback_memory"] = feedback_memory
return payload
async def _execute_oneliner_action(
account: dict[str, Any],
request: OneLinerActionExecuteRequest,
) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
normalized_platform = normalize_platform_from_text(request.platform) or _safe_platform(request.platform or "", fallback="")
action_key = (request.action_key or "").strip()
if not action_key:
raise HTTPException(status_code=400, detail="Action key is required")
async def _run_platform_self_check() -> dict[str, Any]:
if not normalized_platform:
raise HTTPException(status_code=400, detail="Platform is required for self-check")
payload = _platform_self_check(
account,
platform=normalized_platform,
project_id=project["id"],
sample_limit=int((request.payload or {}).get("sample_limit") or 3),
remember_summary=True,
)
return {
"title": f"{payload['platform_label']} Agent 自检",
"summary": f"平台自检得分 {payload['score']},当前状态:{payload['readiness_label']}",
"payload": payload,
}
async def _run_storage_status() -> dict[str, Any]:
payload = legacy.storage_status(project_id=project["id"], account=account)
tenant_usage = payload.get("tenant_usage", {})
return {
"title": "当前存储状态",
"summary": (
f"项目 jobs 占用 {tenant_usage.get('project_jobs', {}).get('human_size', '0B')}"
f"downloads 占用 {tenant_usage.get('project_downloads', {}).get('human_size', '0B')}"
),
"payload": payload,
}
async def _run_live_recorder_status() -> dict[str, Any]:
payload = legacy.live_recorder_status(project_id=project["id"], account=account)
return {
"title": "直播录制状态",
"summary": f"当前共 {len(payload.get('items', []))} 条录制源,最近文件 {len(payload.get('files', []))} 个。",
"payload": payload,
}
async def _run_ops_scan() -> dict[str, Any]:
admin = legacy.require_super_admin(account)
payload = _scan_admin_incidents(admin)
return {
"title": "运维 Agent 故障扫描",
"summary": f"本轮共归集 {payload.get('count', 0)} 条事件。",
"payload": payload,
}
executors = {
"platform-self-check": _run_platform_self_check,
"storage-status": _run_storage_status,
"live-recorder-status": _run_live_recorder_status,
"scan-admin-ops": _run_ops_scan,
}
executor = executors.get(action_key)
if not executor:
raise HTTPException(status_code=400, detail=f"Unsupported OneLiner action: {action_key}")
result = await executor()
if request.session_id:
session = _load_owned_session(request.session_id, account)
_insert_message(
session["id"],
account["id"],
"assistant",
result["summary"],
{
"intent_key": "custom",
"delivery_mode": "oneliner",
"platform": normalized_platform,
"suggested_actions": [],
},
{
"summary_text": result["summary"],
"execution_result": result,
},
)
return {
"action_key": action_key,
"project_id": project["id"],
"platform": normalized_platform,
"executed_at": now(),
**result,
}
@app.get("/v2/oneliner/profile")
def get_oneliner_profile(
project_id: str | None = Query(default=None),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
project = _resolve_project(account, project_id or None)
row = _ensure_oneliner_profile(account, project["id"])
return _profile_payload(row, account=account)
@app.put("/v2/oneliner/profile")
def put_oneliner_profile(
request: OneLinerProfileRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
assistant = _resolve_assistant(account, request.assistant_id or None, project["id"])
existing = _ensure_oneliner_profile(account, project["id"])
timestamp = now()
legacy.db.execute(
"""
UPDATE oneliner_profiles
SET assistant_id = ?, display_name = ?, long_term_goal = ?, notes = ?, default_platform = ?, config_json = ?, updated_at = ?
WHERE id = ?
""",
(
(assistant or {}).get("id", ""),
request.display_name.strip() or "OneLiner",
request.long_term_goal.strip(),
request.notes.strip(),
_safe_platform(request.default_platform or existing.get("default_platform") or "douyin"),
_dump(request.config),
timestamp,
existing["id"],
),
)
row = legacy.db.fetch_one("SELECT * FROM oneliner_profiles WHERE id = ?", (existing["id"],))
return _profile_payload(row, account=account)
@app.get("/v2/oneliner/sessions")
def list_oneliner_sessions(
project_id: str | None = Query(default=None),
limit: int = Query(default=20, ge=1, le=100),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
project = _resolve_project(account, project_id or None)
rows = legacy.db.fetch_all(
"""
SELECT * FROM oneliner_sessions
WHERE user_id = ? AND project_id = ?
ORDER BY updated_at DESC
LIMIT ?
""",
(account["id"], project["id"], limit),
)
items = [_session_payload(row) for row in rows]
return {"items": items, "count": len(items)}
@app.post("/v2/oneliner/sessions")
async def create_oneliner_session(
request: OneLinerSessionCreateRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
profile = _ensure_oneliner_profile(account, project["id"])
session_id = make_id("oline")
timestamp = now()
preferred_platform = _safe_platform(request.preferred_platform or profile.get("default_platform") or "douyin")
title = request.title.strip() or "新的 OneLiner 会话"
legacy.db.execute(
"""
INSERT INTO oneliner_sessions (
id, user_id, project_id, profile_id, title, status, preferred_platform,
last_platform, last_intent_key, last_message_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, 'active', ?, '', '', ?, ?, ?)
""",
(
session_id,
account["id"],
project["id"],
profile["id"],
title,
preferred_platform,
timestamp,
timestamp,
timestamp,
),
)
session_row = legacy.db.fetch_one("SELECT * FROM oneliner_sessions WHERE id = ?", (session_id,))
if request.initial_message.strip():
await post_oneliner_message(
session_id,
OneLinerMessageRequest(
content=request.initial_message,
project_id=project["id"],
platform=preferred_platform,
),
account,
)
session_row = legacy.db.fetch_one("SELECT * FROM oneliner_sessions WHERE id = ?", (session_id,))
return _session_payload(session_row)
@app.get("/v2/oneliner/sessions/{session_id}/messages")
def list_oneliner_messages(
session_id: str,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
session = _load_owned_session(session_id, account)
rows = legacy.db.fetch_all(
"SELECT * FROM oneliner_messages WHERE session_id = ? ORDER BY created_at ASC",
(session["id"],),
)
items = [_message_payload(row) for row in rows]
return {"session": _session_payload(session), "items": items, "count": len(items)}
@app.post("/v2/oneliner/sessions/{session_id}/messages")
async def post_oneliner_message(
session_id: str,
request: OneLinerMessageRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
session = _load_owned_session(session_id, account)
project = _resolve_project(account, request.project_id or session.get("project_id") or None)
plan = await _plan_oneliner_request(
account,
project_id=project["id"],
message=request.content,
platform_hint=request.platform or session.get("preferred_platform") or "",
)
user_message = _insert_message(session["id"], account["id"], "user", request.content.strip(), {}, {})
remembered = _remember_message_preference(account, project_id=project["id"], plan=plan, message=request.content)
result = await _generate_oneliner_reply(account, project_id=project["id"], message=request.content, plan=plan)
if remembered:
result["remembered_memory"] = remembered
assistant_message = _insert_message(session["id"], account["id"], "assistant", result["summary_text"], plan, result)
session_title = session.get("title") or request.content.strip()[:28] or "新的 OneLiner 会话"
timestamp = now()
legacy.db.execute(
"""
UPDATE oneliner_sessions
SET title = ?, last_platform = ?, last_intent_key = ?, last_message_at = ?, updated_at = ?
WHERE id = ?
""",
(
session_title,
plan.get("platform", ""),
plan.get("intent_key", ""),
timestamp,
timestamp,
session["id"],
),
)
updated_session = legacy.db.fetch_one("SELECT * FROM oneliner_sessions WHERE id = ?", (session["id"],))
return {
"session": _session_payload(updated_session),
"user_message": _message_payload(user_message),
"assistant_message": _message_payload(assistant_message),
"plan": plan,
"result": result,
}
@app.post("/v2/oneliner/actions/execute")
async def execute_oneliner_action(
request: OneLinerActionExecuteRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
return await _execute_oneliner_action(account, request)
@app.get("/v2/platform-agents")
def list_platform_agents(
project_id: str | None = Query(default=None),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
project = _resolve_project(account, project_id or None)
items = _list_platform_profiles(account, project["id"])
return {"items": items, "count": len(items)}
@app.put("/v2/platform-agents/{platform}/profile")
def update_platform_agent(
platform: str,
request: PlatformAgentProfileRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
normalized_platform = _safe_platform(platform)
return _upsert_platform_profile(account, normalized_platform, request)
@app.get("/v2/platform-agents/{platform}/memories")
def list_platform_memories(
platform: str,
project_id: str | None = Query(default=None),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
project = _resolve_project(account, project_id or None)
normalized_platform = _safe_platform(platform)
rows = legacy.db.fetch_all(
"""
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY updated_at DESC
""",
(account["id"], project["id"], normalized_platform),
)
items = [_memory_payload(row) for row in rows]
return {"items": items, "count": len(items)}
@app.post("/v2/platform-agents/{platform}/memories")
def upsert_platform_memory(
platform: str,
request: AgentMemoryUpsertRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
normalized_platform = _safe_platform(platform)
return _upsert_memory(account, agent_scope="platform", platform=normalized_platform, request=request)
@app.get("/v2/platform-agents/{platform}/skills")
def list_platform_skills(
platform: str,
project_id: str | None = Query(default=None),
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
project = _resolve_project(account, project_id or None)
normalized_platform = _safe_platform(platform)
rows = legacy.db.fetch_all(
"""
SELECT * FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY updated_at DESC
""",
(account["id"], project["id"], normalized_platform),
)
items = [_skill_payload(row) for row in rows]
return {"items": items, "count": len(items)}
@app.post("/v2/platform-agents/{platform}/skills")
def create_platform_skill(
platform: str,
request: AgentSkillUpsertRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
normalized_platform = _safe_platform(platform)
return _upsert_skill(account, agent_scope="platform", platform=normalized_platform, request=request)
@app.patch("/v2/platform-agents/{platform}/skills/{skill_id}")
def update_platform_skill(
platform: str,
skill_id: str,
request: AgentSkillUpsertRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
normalized_platform = _safe_platform(platform)
return _upsert_skill(account, agent_scope="platform", platform=normalized_platform, request=request, skill_id=skill_id)
@app.post("/v2/platform-agents/{platform}/self-check")
def run_platform_agent_self_check(
platform: str,
request: PlatformAgentSelfCheckRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
normalized_platform = _safe_platform(platform)
return _platform_self_check(
account,
platform=normalized_platform,
project_id=request.project_id,
sample_limit=request.sample_limit,
remember_summary=request.remember_summary,
)
@app.post("/v2/platform-agents/{platform}/skills/{skill_id}/review")
def review_platform_skill(
platform: str,
skill_id: str,
request: PlatformSkillReviewRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
normalized_platform = _safe_platform(platform)
return _review_platform_skill(account, platform=normalized_platform, skill_id=skill_id, request=request)
@app.get("/v2/admin/ops/overview")
def admin_ops_overview(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]:
return _admin_ops_overview_payload(admin)
@app.post("/v2/admin/ops/incidents/scan")
def admin_ops_scan(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]:
return _scan_admin_incidents(admin)
@app.patch("/v2/admin/ops/incidents/{incident_id}")
def review_admin_incident(
incident_id: str,
request: AdminIncidentReviewRequest,
admin: dict[str, Any] = Depends(legacy.require_super_admin),
) -> dict[str, Any]:
current = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,))
if not current:
raise HTTPException(status_code=404, detail="Incident not found")
timestamp = now()
legacy.db.execute(
"""
UPDATE admin_ops_incidents
SET status = ?, reviewed_by = ?, review_notes = ?, updated_at = ?
WHERE id = ?
""",
(
request.status.strip() or "reviewed",
admin["id"],
request.review_notes.strip(),
timestamp,
incident_id,
),
)
row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,))
return _incident_payload(row)