feat: add oneliner execution and platform validation

This commit is contained in:
kris
2026-03-23 15:31:36 +08:00
parent 52f20c3c3d
commit 28ac70cf8f

View File

@@ -71,6 +71,29 @@ class AdminIncidentReviewRequest(BaseModel):
review_notes: str = ""
class PlatformAgentSelfCheckRequest(BaseModel):
project_id: str = ""
sample_limit: int = Field(default=3, ge=1, le=12)
remember_summary: bool = True
class PlatformSkillReviewRequest(BaseModel):
project_id: str = ""
accepted: bool = True
score: float = Field(default=0.8, ge=0.0, le=1.0)
status: str = ""
summary: str = ""
review_notes: str = ""
class OneLinerActionExecuteRequest(BaseModel):
action_key: str
project_id: str = ""
platform: str = ""
session_id: str = ""
payload: dict[str, Any] = Field(default_factory=dict)
INTENT_ACTIONS: dict[str, list[dict[str, Any]]] = {
"create_project": [{"key": "goto-intake", "label": "去我的项目", "kind": "navigate"}],
"create_assistant": [{"key": "open-create-assistant", "label": "创建 Agent", "kind": "ui_action"}],
@@ -279,6 +302,28 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
def _safe_platform(platform_value: str | None, fallback: str = "douyin") -> str:
return legacy.ensure_domestic_platform(platform_value or fallback, allow_blank=not fallback) or fallback
def _route_supported(path: str) -> bool:
return any(getattr(route, "path", "") == path for route in app.routes)
def _platform_route_checks(platform: str) -> list[dict[str, Any]]:
checks = [
("accounts", f"/v2/{platform}/accounts"),
("workspace", f"/v2/{platform}/accounts/{{account_id}}/workspace"),
("videos", f"/v2/{platform}/accounts/{{account_id}}/videos"),
("analyze_account", f"/v2/{platform}/accounts/{{account_id}}/analysis"),
("analyze_top_videos", f"/v2/{platform}/accounts/{{account_id}}/videos/analyze-top"),
("similar_searches", f"/v2/{platform}/similar-searches"),
("benchmark_links", f"/v2/{platform}/accounts/{{account_id}}/benchmark-links"),
]
return [
{
"key": key,
"path": path,
"ok": _route_supported(path),
}
for key, path in checks
]
def _fetch_profile_row(account: dict[str, Any], project_id: str = "") -> dict[str, Any] | None:
return legacy.db.fetch_one(
"SELECT * FROM oneliner_profiles WHERE user_id = ? AND project_id = ?",
@@ -388,6 +433,39 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
""",
(account["id"], project_id, platform),
)["count"]
recent_memory_row = legacy.db.fetch_one(
"""
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY updated_at DESC
LIMIT 1
""",
(account["id"], project_id, platform),
)
recent_skill_row = legacy.db.fetch_one(
"""
SELECT * FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY
CASE WHEN status = 'validated' THEN 0 WHEN status = 'draft' THEN 1 ELSE 2 END,
updated_at DESC
LIMIT 1
""",
(account["id"], project_id, platform),
)
readiness_items = [
bool(row and row.get("status") == "active"),
bool(assistant),
bool(memory_count),
bool(skill_count),
]
readiness_score = int(sum(1 for item in readiness_items if item) * 25)
if readiness_score >= 100:
readiness_label = "就绪"
elif readiness_score >= 50:
readiness_label = "可用"
else:
readiness_label = "待补全"
return {
"id": row["id"] if row else "",
"user_id": account["id"],
@@ -402,6 +480,10 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"config": _parse_json((row or {}).get("config_json"), {}),
"memory_count": memory_count,
"skill_count": skill_count,
"recent_memory": _memory_payload(recent_memory_row) if recent_memory_row else None,
"recent_skill": _skill_payload(recent_skill_row) if recent_skill_row else None,
"readiness_score": readiness_score,
"readiness_label": readiness_label,
"assistant": assistant,
"created_at": (row or {}).get("created_at", ""),
"updated_at": (row or {}).get("updated_at", ""),
@@ -497,6 +579,25 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"updated_at": row["updated_at"],
}
def _platform_source_samples(
account: dict[str, Any],
*,
project_id: str,
platform: str,
limit: int = 3,
) -> list[dict[str, Any]]:
safe_limit = max(1, min(int(limit or 3), 12))
rows = legacy.db.fetch_all(
f"""
SELECT * FROM content_sources
WHERE user_id = ? AND project_id = ? AND platform = ?
ORDER BY updated_at DESC
LIMIT {safe_limit}
""",
(account["id"], project_id, platform),
)
return [legacy.content_source_payload(row) for row in rows]
def _load_owned_session(session_id: str, account: dict[str, Any]) -> dict[str, Any]:
row = legacy.db.fetch_one(
"SELECT * FROM oneliner_sessions WHERE id = ? AND user_id = ?",
@@ -739,6 +840,29 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
stored = legacy.db.fetch_one("SELECT * FROM agent_memories WHERE id = ?", (memory_id,))
return _memory_payload(stored) if stored else None
def _remember_platform_observation(
account: dict[str, Any],
*,
project_id: str,
platform: str,
memory_key: str,
title: str,
summary: str,
details: dict[str, Any],
confidence: float = 0.82,
) -> dict[str, Any]:
request = AgentMemoryUpsertRequest(
project_id=project_id,
subject_type="project",
subject_id=project_id,
memory_key=memory_key,
title=title,
summary=summary,
details=details,
confidence=confidence,
)
return _upsert_memory(account, agent_scope="platform", platform=platform, request=request)
def _session_context_summary(account: dict[str, Any], project_id: str, platform: str) -> dict[str, Any]:
project = _resolve_project(account, project_id or None)
assistant = None
@@ -749,11 +873,43 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?",
(account["id"], project["id"], platform),
) if platform else None
oneliner_memory_rows = legacy.db.fetch_all(
"""
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'oneliner'
ORDER BY updated_at DESC
LIMIT 3
""",
(account["id"], project["id"]),
)
platform_memory_rows = legacy.db.fetch_all(
"""
SELECT * FROM agent_memories
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY updated_at DESC
LIMIT 3
""",
(account["id"], project["id"], platform),
) if platform else []
platform_skill_rows = legacy.db.fetch_all(
"""
SELECT * FROM agent_skills
WHERE user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
ORDER BY
CASE WHEN status = 'validated' THEN 0 WHEN status = 'draft' THEN 1 ELSE 2 END,
updated_at DESC
LIMIT 3
""",
(account["id"], project["id"], platform),
) if platform else []
return {
"project": legacy.project_payload(project),
"oneliner_profile": _profile_payload(profile_row, account=account) if profile_row else None,
"platform_agent": _platform_agent_payload(account, platform_profile, platform=platform, project_id=project["id"]) if platform else None,
"assistant": legacy.assistant_payload(assistant) if assistant else None,
"oneliner_memories": [_memory_payload(row) for row in oneliner_memory_rows],
"platform_memories": [_memory_payload(row) for row in platform_memory_rows],
"platform_skills": [_skill_payload(row) for row in platform_skill_rows],
}
async def _generate_oneliner_reply(
@@ -764,6 +920,40 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
plan: dict[str, Any],
) -> dict[str, Any]:
context = _session_context_summary(account, project_id or "", plan.get("platform") or "")
platform_agent = context.get("platform_agent") or {}
primary_action = (plan.get("suggested_actions") or [{}])[0] if plan.get("suggested_actions") else None
evidence = []
if platform_agent.get("recent_memory"):
evidence.append(
{
"kind": "memory",
"title": platform_agent["recent_memory"].get("title") or platform_agent["recent_memory"].get("memory_key") or "最近记忆",
"summary": platform_agent["recent_memory"].get("summary", ""),
}
)
if platform_agent.get("recent_skill"):
evidence.append(
{
"kind": "skill",
"title": platform_agent["recent_skill"].get("name") or platform_agent["recent_skill"].get("skill_key") or "最近技能",
"summary": platform_agent["recent_skill"].get("test_spec", {}).get("summary")
or platform_agent["recent_skill"].get("method", {}).get("summary")
or "",
"score": platform_agent["recent_skill"].get("last_score", 0),
}
)
blocked_reason = ""
if plan.get("intent_key") == "ops_admin" and account.get("role") != "super_admin":
blocked_reason = "当前账号不是平台最高权限用户,所以不会开放运维 Agent。"
elif plan.get("delivery_mode") == "oneliner":
blocked_reason = "当前更适合由 OneLiner 对话承接,等前端产品化后再下沉到固定 UI。"
next_steps = []
if primary_action:
next_steps.append(f"优先执行「{primary_action.get('label', primary_action.get('key', '下一步'))}」。")
if platform_agent.get("assistant", {}).get("name"):
next_steps.append(f"默认调度 {platform_agent['assistant']['name']} 作为执行 Agent。")
if evidence:
next_steps.append("我会优先参考该平台 Agent 最近沉淀的方法与技能。")
summary_lines = [
f"我理解你的目标是:{plan.get('intent_label', '自定义任务')}",
f"建议优先处理的平台:{plan.get('platform_label', '待判断')}",
@@ -775,9 +965,78 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
summary_lines.append("当前账号不是平台最高权限用户,所以我不会放出运维 Agent 入口。")
if context.get("platform_agent"):
summary_lines.append(f"当前 {context['platform_agent']['platform_label']} Agent 已绑定:{context['platform_agent'].get('assistant', {}).get('name') or '未绑定执行 Agent'}")
if platform_agent.get("recent_memory"):
summary_lines.append(f"最近有效经验:{platform_agent['recent_memory'].get('title') or '一条平台记忆'}")
if platform_agent.get("recent_skill"):
summary_lines.append(f"最近有效技能:{platform_agent['recent_skill'].get('name') or '一条技能'}")
secondary_actions: list[dict[str, Any]] = []
if plan.get("platform"):
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "运行平台自检",
"kind": "api_action",
"executor_key": "platform-self-check",
"platform": plan.get("platform", ""),
}
)
secondary_actions.append(
{
"key": "open-platform-agent-detail",
"label": f"查看{plan.get('platform_label', '平台')} Agent",
"kind": "ui_action",
"platform": plan.get("platform", ""),
}
)
if plan.get("intent_key") in {"storage_status", "custom"}:
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "查看当前存储状态",
"kind": "api_action",
"executor_key": "storage-status",
"platform": plan.get("platform", ""),
}
)
if plan.get("intent_key") == "live_recorder":
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "查看录制状态",
"kind": "api_action",
"executor_key": "live-recorder-status",
"platform": plan.get("platform", ""),
}
)
if account.get("role") == "super_admin":
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "重新扫描故障",
"kind": "api_action",
"executor_key": "scan-admin-ops",
"platform": "",
}
)
return {
"summary_text": "\n".join([line for line in summary_lines if line.strip()]),
"context": context,
"execution_card": {
"intent_key": plan.get("intent_key", "custom"),
"intent_label": plan.get("intent_label", "自定义任务"),
"delivery_mode": plan.get("delivery_mode", "oneliner"),
"platform": plan.get("platform", ""),
"platform_label": plan.get("platform_label", "待判断"),
"platform_agent_name": platform_agent.get("name") or "",
"assistant_name": platform_agent.get("assistant", {}).get("name") or context.get("assistant", {}).get("name") or "",
"readiness_label": platform_agent.get("readiness_label") or "",
"readiness_score": platform_agent.get("readiness_score") or 0,
"primary_action": primary_action or {},
"blocked_reason": blocked_reason,
"evidence": evidence,
"next_steps": next_steps,
"secondary_actions": secondary_actions,
},
"safe_boundary": {
"core_code_locked": True,
"tenant_isolation": True,
@@ -800,6 +1059,12 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
def _upsert_platform_profile(account: dict[str, Any], platform: str, request: PlatformAgentProfileRequest) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
assistant = _resolve_assistant(account, request.assistant_id or None, project["id"])
if not assistant:
fallback_profile = _fetch_profile_row(account, project["id"]) or _ensure_oneliner_profile(account, project["id"])
if fallback_profile.get("assistant_id"):
assistant = _resolve_assistant(account, fallback_profile.get("assistant_id"), project["id"])
if not assistant:
assistant = _resolve_assistant(account, None, project["id"])
existing = legacy.db.fetch_one(
"SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?",
(account["id"], project["id"], platform),
@@ -1105,6 +1370,287 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"count": len(created),
}
def _admin_ops_overview_payload(admin: dict[str, Any]) -> dict[str, Any]:
incidents = [
_incident_payload(row)
for row in legacy.db.fetch_all(
"SELECT * FROM admin_ops_incidents ORDER BY updated_at DESC LIMIT 50"
)
]
open_incidents = [item for item in incidents if item.get("status") in {"open", "watching", ""}]
severity_counts = {
"error": len([item for item in incidents if item.get("severity") == "error"]),
"warn": len([item for item in incidents if item.get("severity") == "warn"]),
"info": len([item for item in incidents if item.get("severity") == "info"]),
}
failed_jobs = [
legacy.job_payload(row)
for row in legacy.db.fetch_all(
"SELECT * FROM jobs WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 12"
)
]
pending_accounts = [
legacy.normalize_account(row)
for row in legacy.db.fetch_all("SELECT * FROM accounts WHERE approval_status = 'pending' ORDER BY created_at ASC LIMIT 20")
]
return {
"incidents": incidents,
"incident_count": len(incidents),
"open_incident_count": len(open_incidents),
"severity_counts": severity_counts,
"failed_jobs": failed_jobs,
"failed_job_count": len(failed_jobs),
"pending_accounts": pending_accounts,
"pending_account_count": len(pending_accounts),
"integration_health": legacy.integrations_health(admin),
}
def _platform_self_check(
account: dict[str, Any],
*,
platform: str,
project_id: str,
sample_limit: int = 3,
remember_summary: bool = True,
) -> dict[str, Any]:
project = _resolve_project(account, project_id or None)
normalized_platform = _safe_platform(platform)
profile = _platform_agent_payload(
account,
legacy.db.fetch_one(
"SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?",
(account["id"], project["id"], normalized_platform),
),
platform=normalized_platform,
project_id=project["id"],
)
route_checks = _platform_route_checks(normalized_platform)
route_ok_count = len([item for item in route_checks if item["ok"]])
route_ratio = (route_ok_count / len(route_checks)) if route_checks else 0
source_samples = _platform_source_samples(account, project_id=project["id"], platform=normalized_platform, limit=sample_limit)
signal_checks = [
("配置激活", bool(profile.get("status") == "active")),
("已绑定执行 Agent", bool(profile.get("assistant_id"))),
("已有平台记忆", bool(profile.get("memory_count"))),
("已有平台技能", bool(profile.get("skill_count"))),
("已有平台账号源", bool(source_samples)),
]
signal_score = sum(1 for _, ok in signal_checks if ok) * 12
route_score = int(route_ratio * 40)
score = min(100, signal_score + route_score)
if score >= 85:
verdict = "validated"
label = "稳定"
elif score >= 60:
verdict = "usable"
label = "可用"
else:
verdict = "needs_work"
label = "待加强"
suggestions = []
if not profile.get("assistant_id"):
suggestions.append("先给平台 Agent 绑定一个执行 Agent。")
if not profile.get("memory_count"):
suggestions.append("补一条平台记忆,沉淀最近有效经验。")
if not profile.get("skill_count"):
suggestions.append("补一条可验收的平台技能。")
if not source_samples:
suggestions.append("先导入至少一个该平台账号源,避免空跑。")
if route_ratio < 1:
suggestions.append("补齐当前平台 workbench 路由,避免调度时出现断点。")
payload = {
"platform": normalized_platform,
"platform_label": legacy.platform_label(normalized_platform),
"project_id": project["id"],
"score": score,
"readiness_label": label,
"verdict": verdict,
"route_checks": route_checks,
"signals": [{"label": name, "ok": ok} for name, ok in signal_checks],
"source_count": len(source_samples),
"source_samples": source_samples,
"checked_at": now(),
"suggestions": suggestions,
"profile": profile,
}
if remember_summary:
_remember_platform_observation(
account,
project_id=project["id"],
platform=normalized_platform,
memory_key=f"self_check::{normalized_platform}",
title=f"{legacy.platform_label(normalized_platform)} Agent 自检",
summary=f"平台自检得分 {score},当前判定为{label}",
details=payload,
confidence=0.88 if score >= 85 else 0.72,
)
return payload
def _review_platform_skill(
account: dict[str, Any],
*,
platform: str,
skill_id: str,
request: PlatformSkillReviewRequest,
) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
normalized_platform = _safe_platform(platform)
current = legacy.db.fetch_one(
"""
SELECT * FROM agent_skills
WHERE id = ? AND user_id = ? AND project_id = ? AND agent_scope = 'platform' AND platform = ?
""",
(skill_id, account["id"], project["id"], normalized_platform),
)
if not current:
raise HTTPException(status_code=404, detail="Platform skill not found")
accepted = bool(request.accepted)
next_status = (request.status or "").strip() or ("validated" if accepted else "needs_revision")
timestamp = now()
next_success = int(current.get("success_count") or 0) + (1 if accepted else 0)
next_failure = int(current.get("failure_count") or 0) + (0 if accepted else 1)
result_payload = {
**_parse_json(current.get("last_result_json"), {}),
"accepted": accepted,
"review_notes": request.review_notes.strip(),
"summary": request.summary.strip(),
"reviewed_at": timestamp,
"reviewed_by": account["id"],
}
legacy.db.execute(
"""
UPDATE agent_skills
SET status = ?, last_result_json = ?, success_count = ?, failure_count = ?, last_score = ?, last_validated_at = ?, updated_at = ?
WHERE id = ?
""",
(
next_status,
_dump(result_payload),
next_success,
next_failure,
request.score,
timestamp,
timestamp,
skill_id,
),
)
updated = legacy.db.fetch_one("SELECT * FROM agent_skills WHERE id = ?", (skill_id,))
feedback_summary = (request.summary or request.review_notes or "").strip()
feedback_memory = None
if feedback_summary:
feedback_memory = _remember_platform_observation(
account,
project_id=project["id"],
platform=normalized_platform,
memory_key=f"skill_feedback::{current.get('skill_key')}",
title=f"{current.get('name') or current.get('skill_key') or '技能'}·{'已验证' if accepted else '待优化'}",
summary=feedback_summary[:280],
details={
"skill_id": skill_id,
"skill_key": current.get("skill_key", ""),
"accepted": accepted,
"score": request.score,
"review_notes": request.review_notes.strip(),
"status": next_status,
},
confidence=0.9 if accepted else 0.66,
)
payload = _skill_payload(updated)
if feedback_memory:
payload["feedback_memory"] = feedback_memory
return payload
async def _execute_oneliner_action(
account: dict[str, Any],
request: OneLinerActionExecuteRequest,
) -> dict[str, Any]:
project = _resolve_project(account, request.project_id or None)
normalized_platform = normalize_platform_from_text(request.platform) or _safe_platform(request.platform or "", fallback="")
action_key = (request.action_key or "").strip()
if not action_key:
raise HTTPException(status_code=400, detail="Action key is required")
async def _run_platform_self_check() -> dict[str, Any]:
if not normalized_platform:
raise HTTPException(status_code=400, detail="Platform is required for self-check")
payload = _platform_self_check(
account,
platform=normalized_platform,
project_id=project["id"],
sample_limit=int((request.payload or {}).get("sample_limit") or 3),
remember_summary=True,
)
return {
"title": f"{payload['platform_label']} Agent 自检",
"summary": f"平台自检得分 {payload['score']},当前状态:{payload['readiness_label']}",
"payload": payload,
}
async def _run_storage_status() -> dict[str, Any]:
payload = legacy.storage_status(project_id=project["id"], account=account)
tenant_usage = payload.get("tenant_usage", {})
return {
"title": "当前存储状态",
"summary": (
f"项目 jobs 占用 {tenant_usage.get('project_jobs', {}).get('human_size', '0B')}"
f"downloads 占用 {tenant_usage.get('project_downloads', {}).get('human_size', '0B')}"
),
"payload": payload,
}
async def _run_live_recorder_status() -> dict[str, Any]:
payload = legacy.live_recorder_status(project_id=project["id"], account=account)
return {
"title": "直播录制状态",
"summary": f"当前共 {len(payload.get('items', []))} 条录制源,最近文件 {len(payload.get('files', []))} 个。",
"payload": payload,
}
async def _run_ops_scan() -> dict[str, Any]:
admin = legacy.require_super_admin(account)
payload = _scan_admin_incidents(admin)
return {
"title": "运维 Agent 故障扫描",
"summary": f"本轮共归集 {payload.get('count', 0)} 条事件。",
"payload": payload,
}
executors = {
"platform-self-check": _run_platform_self_check,
"storage-status": _run_storage_status,
"live-recorder-status": _run_live_recorder_status,
"scan-admin-ops": _run_ops_scan,
}
executor = executors.get(action_key)
if not executor:
raise HTTPException(status_code=400, detail=f"Unsupported OneLiner action: {action_key}")
result = await executor()
if request.session_id:
session = _load_owned_session(request.session_id, account)
_insert_message(
session["id"],
account["id"],
"assistant",
result["summary"],
{
"intent_key": "custom",
"delivery_mode": "oneliner",
"platform": normalized_platform,
"suggested_actions": [],
},
{
"summary_text": result["summary"],
"execution_result": result,
},
)
return {
"action_key": action_key,
"project_id": project["id"],
"platform": normalized_platform,
"executed_at": now(),
**result,
}
@app.get("/v2/oneliner/profile")
def get_oneliner_profile(
project_id: str | None = Query(default=None),
@@ -1265,6 +1811,13 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"result": result,
}
@app.post("/v2/oneliner/actions/execute")
async def execute_oneliner_action(
request: OneLinerActionExecuteRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
return await _execute_oneliner_action(account, request)
@app.get("/v2/platform-agents")
def list_platform_agents(
project_id: str | None = Query(default=None),
@@ -1349,30 +1902,34 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
normalized_platform = _safe_platform(platform)
return _upsert_skill(account, agent_scope="platform", platform=normalized_platform, request=request, skill_id=skill_id)
@app.post("/v2/platform-agents/{platform}/self-check")
def run_platform_agent_self_check(
platform: str,
request: PlatformAgentSelfCheckRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
normalized_platform = _safe_platform(platform)
return _platform_self_check(
account,
platform=normalized_platform,
project_id=request.project_id,
sample_limit=request.sample_limit,
remember_summary=request.remember_summary,
)
@app.post("/v2/platform-agents/{platform}/skills/{skill_id}/review")
def review_platform_skill(
platform: str,
skill_id: str,
request: PlatformSkillReviewRequest,
account: dict[str, Any] = Depends(legacy.require_approved),
) -> dict[str, Any]:
normalized_platform = _safe_platform(platform)
return _review_platform_skill(account, platform=normalized_platform, skill_id=skill_id, request=request)
@app.get("/v2/admin/ops/overview")
def admin_ops_overview(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]:
incidents = [
_incident_payload(row)
for row in legacy.db.fetch_all(
"SELECT * FROM admin_ops_incidents ORDER BY updated_at DESC LIMIT 50"
)
]
failed_jobs = [
legacy.job_payload(row)
for row in legacy.db.fetch_all(
"SELECT * FROM jobs WHERE status = 'failed' ORDER BY updated_at DESC LIMIT 12"
)
]
pending_accounts = [legacy.normalize_account(row) for row in legacy.db.fetch_all("SELECT * FROM accounts WHERE approval_status = 'pending' ORDER BY created_at ASC LIMIT 20")]
return {
"incidents": incidents,
"incident_count": len(incidents),
"failed_jobs": failed_jobs,
"failed_job_count": len(failed_jobs),
"pending_accounts": pending_accounts,
"pending_account_count": len(pending_accounts),
"integration_health": legacy.integrations_health(admin),
}
return _admin_ops_overview_payload(admin)
@app.post("/v2/admin/ops/incidents/scan")
def admin_ops_scan(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]: