From 628eeb0d08b48f9132a8284aa31db09af9196c47 Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 23 Mar 2026 15:43:34 +0800 Subject: [PATCH] feat: extend oneliner actions and ops workspace --- collector-service/app/oneliner_features.py | 222 ++++++++++++++++++++- web/storyforge-web-v4/assets/app.js | 44 ++++ 2 files changed, 265 insertions(+), 1 deletion(-) diff --git a/collector-service/app/oneliner_features.py b/collector-service/app/oneliner_features.py index b3f41d3..077281b 100644 --- a/collector-service/app/oneliner_features.py +++ b/collector-service/app/oneliner_features.py @@ -283,6 +283,19 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: FOREIGN KEY(assigned_to) REFERENCES accounts(id) ON DELETE SET NULL, FOREIGN KEY(reviewed_by) REFERENCES accounts(id) ON DELETE SET NULL ); + + CREATE TABLE IF NOT EXISTS admin_ops_audit_logs ( + id TEXT PRIMARY KEY, + actor_user_id TEXT NOT NULL DEFAULT '', + incident_id TEXT NOT NULL DEFAULT '', + action_key TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'recorded', + summary TEXT NOT NULL DEFAULT '', + details_json TEXT NOT NULL DEFAULT '{}', + created_at TEXT NOT NULL, + FOREIGN KEY(actor_user_id) REFERENCES accounts(id) ON DELETE SET NULL, + FOREIGN KEY(incident_id) REFERENCES admin_ops_incidents(id) ON DELETE CASCADE + ); """ with legacy.db.session() as conn: conn.executescript(schema) @@ -579,6 +592,54 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "updated_at": row["updated_at"], } + def _admin_audit_payload(row: dict[str, Any]) -> dict[str, Any]: + return { + "id": row["id"], + "actor_user_id": row.get("actor_user_id", ""), + "incident_id": row.get("incident_id", ""), + "action_key": row.get("action_key", ""), + "status": row.get("status", "recorded"), + "summary": row.get("summary", ""), + "details": _parse_json(row.get("details_json"), {}), + "created_at": row.get("created_at", ""), + } + + def _log_admin_audit_event( + *, + actor_user_id: str, + incident_id: str = "", + action_key: str, + status: str, + summary: str, + details: dict[str, Any] | None = None, + ) -> dict[str, Any]: + audit_id = make_id("ops_audit") + timestamp = now() + sql = """ + INSERT INTO admin_ops_audit_logs ( + id, actor_user_id, incident_id, action_key, status, summary, details_json, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """ + params = ( + audit_id, + actor_user_id, + incident_id, + action_key, + status, + summary, + _dump(details or {}), + timestamp, + ) + if incident_id: + legacy.db.execute(sql, params) + else: + with legacy.db.session() as conn: + conn.execute("PRAGMA foreign_keys=OFF") + conn.execute(sql, params) + conn.execute("PRAGMA foreign_keys=ON") + row = legacy.db.fetch_one("SELECT * FROM admin_ops_audit_logs WHERE id = ?", (audit_id,)) + return _admin_audit_payload(row) + def _platform_source_samples( account: dict[str, Any], *, @@ -598,6 +659,47 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ) return [legacy.content_source_payload(row) for row in rows] + def _resolve_execution_assistant(account: dict[str, Any], *, project_id: str, platform: str = "") -> dict[str, Any] | None: + normalized_platform = _safe_platform(platform or "", fallback="") + if normalized_platform: + profile_row = legacy.db.fetch_one( + "SELECT * FROM platform_agent_profiles WHERE user_id = ? AND project_id = ? AND platform = ?", + (account["id"], project_id, normalized_platform), + ) + if profile_row and profile_row.get("assistant_id"): + assistant = _resolve_assistant(account, profile_row.get("assistant_id"), project_id) + if assistant: + return assistant + profile_row = _fetch_profile_row(account, project_id) or _ensure_oneliner_profile(account, project_id) + if profile_row.get("assistant_id"): + assistant = _resolve_assistant(account, profile_row.get("assistant_id"), project_id) + if assistant: + return assistant + return _resolve_assistant(account, None, project_id) + + def _latest_project_job(account: dict[str, Any], *, project_id: str) -> dict[str, Any] | None: + return legacy.db.fetch_one( + """ + SELECT * FROM jobs + WHERE user_id = ? AND project_id = ? AND status IN ('completed', 'done', 'succeeded') + ORDER BY updated_at DESC, created_at DESC + LIMIT 1 + """, + (account["id"], project_id), + ) + + def _last_user_message_text(session_id: str, account_id: str) -> str: + row = legacy.db.fetch_one( + """ + SELECT * FROM oneliner_messages + WHERE session_id = ? AND user_id = ? AND role = 'user' + ORDER BY created_at DESC + LIMIT 1 + """, + (session_id, account_id), + ) + return str((row or {}).get("content") or "").strip() + def _load_owned_session(session_id: str, account: dict[str, Any]) -> dict[str, Any]: row = legacy.db.fetch_one( "SELECT * FROM oneliner_sessions WHERE id = ? AND user_id = ?", @@ -1008,6 +1110,27 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "platform": plan.get("platform", ""), } ) + if context.get("assistant"): + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接生成一版文案", + "kind": "api_action", + "executor_key": "generate-copy", + "platform": plan.get("platform", ""), + } + ) + latest_job = _latest_project_job(account, project_id=project_id or "") + if latest_job: + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "生成复盘草稿", + "kind": "api_action", + "executor_key": "review-draft", + "platform": plan.get("platform", ""), + } + ) if account.get("role") == "super_admin": secondary_actions.append( { @@ -1368,6 +1491,13 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: return { "created_or_updated": created, "count": len(created), + "audit": _log_admin_audit_event( + actor_user_id=admin["id"], + action_key="scan", + status="completed", + summary=f"本轮扫描归集 {len(created)} 条事件。", + details={"count": len(created)}, + ), } def _admin_ops_overview_payload(admin: dict[str, Any]) -> dict[str, Any]: @@ -1393,6 +1523,12 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: legacy.normalize_account(row) for row in legacy.db.fetch_all("SELECT * FROM accounts WHERE approval_status = 'pending' ORDER BY created_at ASC LIMIT 20") ] + recent_audits = [ + _admin_audit_payload(row) + for row in legacy.db.fetch_all( + "SELECT * FROM admin_ops_audit_logs ORDER BY created_at DESC LIMIT 20" + ) + ] return { "incidents": incidents, "incident_count": len(incidents), @@ -1402,6 +1538,8 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "failed_job_count": len(failed_jobs), "pending_accounts": pending_accounts, "pending_account_count": len(pending_accounts), + "recent_audits": recent_audits, + "audit_count": len(recent_audits), "integration_health": legacy.integrations_health(admin), } @@ -1569,6 +1707,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: action_key = (request.action_key or "").strip() if not action_key: raise HTTPException(status_code=400, detail="Action key is required") + latest_user_message = _last_user_message_text(request.session_id, account["id"]) if request.session_id else "" async def _run_platform_self_check() -> dict[str, Any]: if not normalized_platform: @@ -1615,11 +1754,79 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "payload": payload, } + async def _run_generate_copy() -> dict[str, Any]: + assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=normalized_platform) + if not assistant: + raise HTTPException(status_code=404, detail="No execution assistant available") + brief = str((request.payload or {}).get("brief") or latest_user_message or "").strip() + if not brief: + brief = f"请基于当前项目目标,输出一版适合{legacy.platform_label(normalized_platform or 'douyin')}发布的短视频文案。" + payload = await legacy.generate_copy( + assistant["id"], + legacy.GenerateCopyRequest( + brief=brief, + platform=normalized_platform or "douyin", + audience=str((request.payload or {}).get("audience") or "创业者"), + extra_requirements=str((request.payload or {}).get("extra_requirements") or ""), + knowledge_base_ids=list((request.payload or {}).get("knowledge_base_ids") or []), + ), + account, + ) + return { + "title": "OneLiner 已生成文案", + "summary": f"已用 {assistant.get('name') or '默认 Agent'} 生成一版可发布文案。", + "payload": payload, + } + + async def _run_review_draft() -> dict[str, Any]: + latest_job = _latest_project_job(account, project_id=project["id"]) + if not latest_job: + raise HTTPException(status_code=404, detail="No completed job available for review draft") + existing = legacy.db.fetch_one( + "SELECT * FROM publish_reviews WHERE user_id = ? AND source_job_id = ? ORDER BY created_at DESC LIMIT 1", + (account["id"], latest_job["id"]), + ) + if existing: + payload = legacy.review_payload(existing) + return { + "title": "OneLiner 找到已有复盘", + "summary": f"任务「{latest_job.get('title') or latest_job['id']}」已经有复盘记录。", + "payload": payload, + } + assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=normalized_platform) + result = latest_job.get("result_json") or "{}" + try: + result_map = json.loads(result) + except json.JSONDecodeError: + result_map = {} + payload = legacy.create_review( + legacy.ReviewCreateRequest( + project_id=project["id"], + source_job_id=latest_job["id"], + assistant_id=(assistant or {}).get("id", ""), + title=f"{latest_job.get('title') or '任务'} 复盘草稿", + platform=normalized_platform or "douyin", + content_type="video", + verdict="待补充", + highlights=str(result_map.get("summary") or result_map.get("headline_summary") or "")[:400], + next_actions="补充发布结果、完善指标、确认下一步动作。", + notes=str((request.payload or {}).get("notes") or "由 OneLiner 自动生成复盘草稿。"), + ), + account, + ) + return { + "title": "OneLiner 已生成复盘草稿", + "summary": f"已基于最近完成任务「{latest_job.get('title') or latest_job['id']}」生成复盘草稿。", + "payload": payload, + } + executors = { "platform-self-check": _run_platform_self_check, "storage-status": _run_storage_status, "live-recorder-status": _run_live_recorder_status, "scan-admin-ops": _run_ops_scan, + "generate-copy": _run_generate_copy, + "review-draft": _run_review_draft, } executor = executors.get(action_key) if not executor: @@ -1960,4 +2167,17 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ), ) row = legacy.db.fetch_one("SELECT * FROM admin_ops_incidents WHERE id = ?", (incident_id,)) - return _incident_payload(row) + payload = _incident_payload(row) + payload["audit"] = _log_admin_audit_event( + actor_user_id=admin["id"], + incident_id=incident_id, + action_key="review", + status=payload.get("status", "reviewed"), + summary=f"事件「{payload.get('title', incident_id)}」已更新为 {payload.get('status', 'reviewed')}。", + details={ + "review_notes": request.review_notes.strip(), + "severity": payload.get("severity", ""), + "source_type": payload.get("source_type", ""), + }, + ) + return payload diff --git a/web/storyforge-web-v4/assets/app.js b/web/storyforge-web-v4/assets/app.js index d1caf7d..07978af 100644 --- a/web/storyforge-web-v4/assets/app.js +++ b/web/storyforge-web-v4/assets/app.js @@ -1173,6 +1173,32 @@ function renderOneLinerExecutionPayloadHtml(payload) { `; } + if (payload.content) { + return ` +
+

生成文案

+

${escapeHtml(brief(payload.content, 1200))}

+
+ ${payload.assistant_id ? `${escapeHtml(payload.assistant_id)}` : ""} + ${escapeHtml(formatNumber(safeArray(payload.used_documents).length))} 个参考素材 +
+
+ `; + } + if (payload.verdict !== undefined || payload.next_actions !== undefined || payload.highlights !== undefined) { + return ` +
+

${escapeHtml(payload.title || "复盘草稿")}

+

${escapeHtml(payload.highlights || payload.notes || "已生成一版待补充的复盘草稿。")}

+
+ ${escapeHtml(platformLabel(payload.platform || "douyin"))} + ${escapeHtml(payload.verdict || "待补充")} + ${payload.source_job_id ? `看任务详情` : ""} + ${payload.id ? `打开复盘` : ""} +
+
+ `; + } return `

原始结果

@@ -2123,6 +2149,7 @@ function renderAdminOpsPanel() { `; } const incidents = safeArray(overview.incidents).slice(0, 6); + const audits = safeArray(overview.recent_audits).slice(0, 5); return `
@@ -2156,6 +2183,23 @@ function renderAdminOpsPanel() {
`).join("") || `

当前没有待处理事件

最近主链比较稳定,继续观察即可。

`}
+
+
+

最近审计记录

+

保留管理员扫描、放行、驳回等动作,方便商业化量产时追责和复盘。

+
+ ${audits.map((item) => ` +
+

${escapeHtml(item.summary || item.action_key || "审计记录")}

+

${escapeHtml(formatDateTime(item.created_at))}

+
+ ${escapeHtml(item.action_key || "audit")} + ${escapeHtml(item.status || "recorded")} + ${item.incident_id ? `事件 ${escapeHtml(brief(item.incident_id, 10))}` : ""} +
+
+ `).join("") || `

还没有审计记录

等管理员做一次扫描或审计处理后,这里会自动出现。

`} +
`; }