feat: expand oneliner execution and admin audit logs

This commit is contained in:
kris
2026-03-23 15:43:34 +08:00
parent 28ac70cf8f
commit 3b7d4f0d5b

View File

@@ -283,6 +283,19 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
FOREIGN KEY(assigned_to) REFERENCES accounts(id) ON DELETE SET NULL,
FOREIGN KEY(reviewed_by) REFERENCES accounts(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS admin_ops_audit_logs (
id TEXT PRIMARY KEY,
actor_user_id TEXT NOT NULL DEFAULT '',
incident_id TEXT NOT NULL DEFAULT '',
action_key TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'recorded',
summary TEXT NOT NULL DEFAULT '',
details_json TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL,
FOREIGN KEY(actor_user_id) REFERENCES accounts(id) ON DELETE SET NULL,
FOREIGN KEY(incident_id) REFERENCES admin_ops_incidents(id) ON DELETE CASCADE
);
"""
with legacy.db.session() as conn:
conn.executescript(schema)
@@ -579,6 +592,54 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"updated_at": row["updated_at"],
}
def _admin_audit_payload(row: dict[str, Any]) -> dict[str, Any]:
return {
"id": row["id"],
"actor_user_id": row.get("actor_user_id", ""),
"incident_id": row.get("incident_id", ""),
"action_key": row.get("action_key", ""),
"status": row.get("status", "recorded"),
"summary": row.get("summary", ""),
"details": _parse_json(row.get("details_json"), {}),
"created_at": row.get("created_at", ""),
}
def _log_admin_audit_event(
*,
actor_user_id: str,
incident_id: str = "",
action_key: str,
status: str,
summary: str,
details: dict[str, Any] | None = None,
) -> dict[str, Any]:
audit_id = make_id("ops_audit")
timestamp = now()
sql = """
INSERT INTO admin_ops_audit_logs (
id, actor_user_id, incident_id, action_key, status, summary, details_json, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""
params = (
audit_id,
actor_user_id,
incident_id,
action_key,
status,
summary,
_dump(details or {}),
timestamp,
)
if incident_id:
legacy.db.execute(sql, params)
else:
with legacy.db.session() as conn:
conn.execute("PRAGMA foreign_keys=OFF")
conn.execute(sql, params)
conn.execute("PRAGMA foreign_keys=ON")
row = legacy.db.fetch_one("SELECT * FROM admin_ops_audit_logs WHERE id = ?", (audit_id,))
return _admin_audit_payload(row)
def _platform_source_samples(
account: dict[str, Any],
*,
@@ -598,6 +659,47 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
)
return [legacy.content_source_payload(row) for row in rows]
def _resolve_execution_assistant(account: dict[str, Any], *, project_id: str, platform: str = "") -> dict[str, Any] | None:
normalized_platform = _safe_platform(platform or "", fallback="")
if normalized_platform:
profile_row = legacy.db.fetch_one(
"SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?",
(account["id"], project_id, normalized_platform),
)
if profile_row and profile_row.get("assistant_id"):
assistant = _resolve_assistant(account, profile_row.get("assistant_id"), project_id)
if assistant:
return assistant
profile_row = _fetch_profile_row(account, project_id) or _ensure_oneliner_profile(account, project_id)
if profile_row.get("assistant_id"):
assistant = _resolve_assistant(account, profile_row.get("assistant_id"), project_id)
if assistant:
return assistant
return _resolve_assistant(account, None, project_id)
def _latest_project_job(account: dict[str, Any], *, project_id: str) -> dict[str, Any] | None:
return legacy.db.fetch_one(
"""
SELECT * FROM jobs
WHERE user_id = ? AND project_id = ? AND status IN ('completed', 'done', 'succeeded')
ORDER BY updated_at DESC, created_at DESC
LIMIT 1
""",
(account["id"], project_id),
)
def _last_user_message_text(session_id: str, account_id: str) -> str:
row = legacy.db.fetch_one(
"""
SELECT * FROM oneliner_messages
WHERE session_id = ? AND user_id = ? AND role = 'user'
ORDER BY created_at DESC
LIMIT 1
""",
(session_id, account_id),
)
return str((row or {}).get("content") or "").strip()
def _load_owned_session(session_id: str, account: dict[str, Any]) -> dict[str, Any]:
row = legacy.db.fetch_one(
"SELECT * FROM oneliner_sessions WHERE id = ? AND user_id = ?",
@@ -1008,6 +1110,27 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"platform": plan.get("platform", ""),
}
)
if context.get("assistant"):
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "直接生成一版文案",
"kind": "api_action",
"executor_key": "generate-copy",
"platform": plan.get("platform", ""),
}
)
latest_job = _latest_project_job(account, project_id=project_id or "")
if latest_job:
secondary_actions.append(
{
"key": "run-oneliner-action",
"label": "生成复盘草稿",
"kind": "api_action",
"executor_key": "review-draft",
"platform": plan.get("platform", ""),
}
)
if account.get("role") == "super_admin":
secondary_actions.append(
{
@@ -1368,6 +1491,13 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
return {
"created_or_updated": created,
"count": len(created),
"audit": _log_admin_audit_event(
actor_user_id=admin["id"],
action_key="scan",
status="completed",
summary=f"本轮扫描归集 {len(created)} 条事件。",
details={"count": len(created)},
),
}
def _admin_ops_overview_payload(admin: dict[str, Any]) -> dict[str, Any]:
@@ -1393,6 +1523,12 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
legacy.normalize_account(row)
for row in legacy.db.fetch_all("SELECT * FROM accounts WHERE approval_status = 'pending' ORDER BY created_at ASC LIMIT 20")
]
recent_audits = [
_admin_audit_payload(row)
for row in legacy.db.fetch_all(
"SELECT * FROM admin_ops_audit_logs ORDER BY created_at DESC LIMIT 20"
)
]
return {
"incidents": incidents,
"incident_count": len(incidents),
@@ -1402,6 +1538,8 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"failed_job_count": len(failed_jobs),
"pending_accounts": pending_accounts,
"pending_account_count": len(pending_accounts),
"recent_audits": recent_audits,
"audit_count": len(recent_audits),
"integration_health": legacy.integrations_health(admin),
}
@@ -1569,6 +1707,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
action_key = (request.action_key or "").strip()
if not action_key:
raise HTTPException(status_code=400, detail="Action key is required")
latest_user_message = _last_user_message_text(request.session_id, account["id"]) if request.session_id else ""
async def _run_platform_self_check() -> dict[str, Any]:
if not normalized_platform:
@@ -1615,11 +1754,79 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"payload": payload,
}
async def _run_generate_copy() -> dict[str, Any]:
assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=normalized_platform)
if not assistant:
raise HTTPException(status_code=404, detail="No execution assistant available")
brief = str((request.payload or {}).get("brief") or latest_user_message or "").strip()
if not brief:
brief = f"请基于当前项目目标,输出一版适合{legacy.platform_label(normalized_platform or 'douyin')}发布的短视频文案。"
payload = await legacy.generate_copy(
assistant["id"],
legacy.GenerateCopyRequest(
brief=brief,
platform=normalized_platform or "douyin",
audience=str((request.payload or {}).get("audience") or "创业者"),
extra_requirements=str((request.payload or {}).get("extra_requirements") or ""),
knowledge_base_ids=list((request.payload or {}).get("knowledge_base_ids") or []),
),
account,
)
return {
"title": "OneLiner 已生成文案",
"summary": f"已用 {assistant.get('name') or '默认 Agent'} 生成一版可发布文案。",
"payload": payload,
}
async def _run_review_draft() -> dict[str, Any]:
latest_job = _latest_project_job(account, project_id=project["id"])
if not latest_job:
raise HTTPException(status_code=404, detail="No completed job available for review draft")
existing = legacy.db.fetch_one(
"SELECT * FROM publish_reviews WHERE user_id = ? AND source_job_id = ? ORDER BY created_at DESC LIMIT 1",
(account["id"], latest_job["id"]),
)
if existing:
payload = legacy.review_payload(existing)
return {
"title": "OneLiner 找到已有复盘",
"summary": f"任务「{latest_job.get('title') or latest_job['id']}」已经有复盘记录。",
"payload": payload,
}
assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=normalized_platform)
result = latest_job.get("result_json") or "{}"
try:
result_map = json.loads(result)
except json.JSONDecodeError:
result_map = {}
payload = legacy.create_review(
legacy.ReviewCreateRequest(
project_id=project["id"],
source_job_id=latest_job["id"],
assistant_id=(assistant or {}).get("id", ""),
title=f"{latest_job.get('title') or '任务'} 复盘草稿",
platform=normalized_platform or "douyin",
content_type="video",
verdict="待补充",
highlights=str(result_map.get("summary") or result_map.get("headline_summary") or "")[:400],
next_actions="补充发布结果、完善指标、确认下一步动作。",
notes=str((request.payload or {}).get("notes") or "由 OneLiner 自动生成复盘草稿。"),
),
account,
)
return {
"title": "OneLiner 已生成复盘草稿",
"summary": f"已基于最近完成任务「{latest_job.get('title') or latest_job['id']}」生成复盘草稿。",
"payload": payload,
}
executors = {
"platform-self-check": _run_platform_self_check,
"storage-status": _run_storage_status,
"live-recorder-status": _run_live_recorder_status,
"scan-admin-ops": _run_ops_scan,
"generate-copy": _run_generate_copy,
"review-draft": _run_review_draft,
}
executor = executors.get(action_key)
if not executor:
@@ -1960,4 +2167,17 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
),
)
row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,))
return _incident_payload(row)
payload = _incident_payload(row)
payload["audit"] = _log_admin_audit_event(
actor_user_id=admin["id"],
incident_id=incident_id,
action_key="review",
status=payload.get("status", "reviewed"),
summary=f"事件「{payload.get('title', incident_id)}」已更新为 {payload.get('status', 'reviewed')}",
details={
"review_notes": request.review_notes.strip(),
"severity": payload.get("severity", ""),
"source_type": payload.get("source_type", ""),
},
)
return payload