From 22f6e6e68688bc291d6c41aee3d5e69a4d3325bd Mon Sep 17 00:00:00 2001 From: kris Date: Sun, 5 Apr 2026 06:28:10 +0800 Subject: [PATCH] feat: add direct tracking pool sync actions --- CHANGELOG.md | 19 ++ collector-service/app/oneliner_features.py | 353 +++++++++++++++++++++ tests/test_main_agent_governance.py | 144 +++++++++ 3 files changed, 516 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba61c1f..357a625 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,25 @@ ## 2026-04-05 +### 主 Agent 可直接执行分析账号、加入跟踪、创建 Agent + +- `OneLiner / 主 Agent` 的动作执行器现在新增了三条真实动作: + - `直接分析账号` + - `直接加入跟踪` + - `直接创建 Agent` +- 这三条链不再只是“建议 + 跳页”,而是会直接调用当前 live 后端接口完成动作,再把结果落回工作台。 +- `分析账号` 现在会直接调用对应平台的账号分析接口,并把结果回到当前对象详情。 +- `加入跟踪` 会直接创建跟踪对象,并在支持任务同步的平台上立即触发一次同步,再把落点带回任务详情或跟踪对象。 +- `创建 Agent` 会直接在当前项目下创建 Agent,并把工作流继续落到编辑页。 +- 治理回归新增了这三条执行器的 live 断言,锁住动作注册表、分析执行、跟踪执行和 Agent 创建这条链不能退回成假执行器。 + +### 主 Agent 可直接批量同步跟踪池 + +- `OneLiner / 主 Agent` 现在新增了 `直接同步跟踪池` 动作,会批量触发当前平台已跟踪账号的同步任务。 +- 这条链会直接调用 `/v2/{platform}/tracking/refresh`,不再只是建议用户先跳去跟踪页再手动点同步。 +- 如果本轮只生成了一条同步任务,结果会直接落到任务详情;如果是多条批量同步,则回到跟踪工作区继续看结果。 +- 治理回归补上了这条动作的断言,锁住动作注册表、批量同步执行和推荐落点都必须保持 live。 + ### 额度编辑弹层补成真正的套餐配置器 - `编辑租户额度` 不再只是裸数字表单,而是会即时预览当前套餐的预算、动作池和预警阈值。 diff --git a/collector-service/app/oneliner_features.py b/collector-service/app/oneliner_features.py index 23a57ed..8b5bf91 100644 --- a/collector-service/app/oneliner_features.py +++ b/collector-service/app/oneliner_features.py @@ -1,5 +1,6 @@ from __future__ import annotations +import httpx import json import re from datetime import datetime, timezone @@ -315,6 +316,46 @@ ACTION_REGISTRY_DEFAULTS: dict[str, dict[str, Any]] = { "requires_platform": True, "config": {"top_video_count": 4}, }, + "analyze-account": { + "label": "直接分析账号", + "description": "基于当前平台账号直接生成账号分析报告与下一步建议。", + "category": "analysis", + "handler_key": "analyze-account", + "status": "enabled", + "admin_only": False, + "requires_platform": True, + "config": {"max_videos": 6}, + }, + "track-account": { + "label": "直接加入跟踪", + "description": "把当前平台账号加入跟踪,并立即触发一次同步。", + "category": "tracking", + "handler_key": "track-account", + "status": "enabled", + "admin_only": False, + "requires_platform": True, + "config": {"refresh_now": True}, + }, + "refresh-tracking": { + "label": "直接同步跟踪池", + "description": "批量触发当前平台已跟踪账号的同步任务。", + "category": "tracking", + "handler_key": "refresh-tracking", + "status": "enabled", + "admin_only": False, + "requires_platform": True, + "config": {}, + }, + "create-assistant": { + "label": "直接创建 Agent", + "description": "根据当前项目和平台上下文,直接创建可继续编辑的 Agent。", + "category": "agent", + "handler_key": "create-assistant", + "status": "enabled", + "admin_only": False, + "requires_platform": False, + "config": {}, + }, "create-ai-video": { "label": "直接创建 AI 视频", "description": "基于最近可用源任务直接创建 AI 视频链任务。", @@ -404,6 +445,9 @@ ACTION_USAGE_KEYS: dict[str, str] = { "review-draft": "review", "import-homepage": "content_source_sync", "analyze-top-videos": "analysis", + "analyze-account": "analysis", + "track-account": "content_source_sync", + "refresh-tracking": "content_source_sync", "create-ai-video": "ai_video", "create-real-cut": "real_cut", "save-live-recorder-source": "live_recorder", @@ -3173,6 +3217,81 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: (account["id"], project_id, platform), ) + def _latest_douyin_account( + account: dict[str, Any], + *, + project_id: str, + ) -> dict[str, Any] | None: + return legacy.db.fetch_one( + """ + SELECT * FROM douyin_accounts + WHERE user_id = ? AND project_id = ? + ORDER BY updated_at DESC, created_at DESC + LIMIT 1 + """, + (account["id"], project_id), + ) + + def _resolve_platform_target_account( + account: dict[str, Any], + *, + project_id: str, + platform: str, + requested_account_id: str = "", + ) -> dict[str, Any] | None: + normalized_platform = _safe_platform(platform, fallback="douyin") + normalized_requested = str(requested_account_id or "").strip() + if normalized_platform == "douyin": + if normalized_requested: + return legacy.db.fetch_one( + "SELECT * FROM douyin_accounts WHERE id = ? AND user_id = ? AND project_id = ?", + (normalized_requested, account["id"], project_id), + ) + return _latest_douyin_account(account, project_id=project_id) + if normalized_requested: + return legacy.db.fetch_one( + """ + SELECT * FROM content_sources + WHERE id = ? AND user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'creator_account' + """, + (normalized_requested, account["id"], project_id, normalized_platform), + ) + return _latest_platform_account(account, project_id=project_id, platform=normalized_platform) + + async def _call_local_api( + account: dict[str, Any], + *, + method: str, + path: str, + json_body: dict[str, Any] | None = None, + query: dict[str, Any] | None = None, + ) -> Any: + issued = legacy.issue_auth_token(account, mode="internal") + token = issued["token"] + transport = httpx.ASGITransport(app=app) + try: + async with httpx.AsyncClient(transport=transport, base_url="http://storyforge.internal") as client: + response = await client.request( + method.upper(), + path, + params=query or None, + json=json_body, + headers={"Authorization": f"Bearer {token}"}, + timeout=60.0, + ) + if response.status_code >= 400: + try: + payload = response.json() + except Exception: + payload = {"detail": response.text} + detail = payload.get("detail", payload) + raise HTTPException(status_code=response.status_code, detail=detail) + if not response.content: + return {} + return response.json() + finally: + legacy.db.execute("DELETE FROM auth_tokens WHERE token = ?", (token,)) + def _load_owned_job(account: dict[str, Any], job_id: str) -> dict[str, Any] | None: normalized_job_id = str(job_id or "").strip() if not normalized_job_id: @@ -4396,6 +4515,18 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: platform=plan.get("platform", ""), ) if plan.get("platform") and latest_platform_account and plan.get("intent_key") in {"analyze_top_videos", "analyze_account", "custom"}: + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接分析账号", + "kind": "api_action", + "executor_key": "analyze-account", + "platform": plan.get("platform", ""), + "payload": { + "target_account_id": latest_platform_account["id"], + }, + } + ) secondary_actions.append( { "key": "run-oneliner-action", @@ -4408,6 +4539,30 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: }, } ) + if plan.get("platform") and latest_platform_account and plan.get("intent_key") in {"track_account", "custom"}: + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接加入跟踪", + "kind": "api_action", + "executor_key": "track-account", + "platform": plan.get("platform", ""), + "payload": { + "target_account_id": latest_platform_account["id"], + "refresh_now": True, + }, + } + ) + if plan.get("platform") and plan.get("intent_key") in {"track_account", "custom"}: + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接同步跟踪池", + "kind": "api_action", + "executor_key": "refresh-tracking", + "platform": plan.get("platform", ""), + } + ) if context_assistant: secondary_actions.append( { @@ -4418,6 +4573,21 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "platform": plan.get("platform", ""), } ) + if not context_assistant and plan.get("intent_key") in {"create_assistant", "custom"}: + secondary_actions.append( + { + "key": "run-oneliner-action", + "label": "直接创建 Agent", + "kind": "api_action", + "executor_key": "create-assistant", + "platform": plan.get("platform", ""), + "payload": { + "name": plan.get("platform_label") + and f"{plan.get('platform_label')} 执行 Agent" + or "项目执行 Agent", + }, + } + ) latest_job = _latest_project_job(account, project_id=project_id or "") if latest_job: secondary_actions.append( @@ -5552,6 +5722,153 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ), } + async def _run_analyze_account() -> dict[str, Any]: + if not normalized_platform: + raise HTTPException(status_code=400, detail="Platform is required for account analysis") + target_account = _resolve_platform_target_account( + account, + project_id=project["id"], + platform=normalized_platform, + requested_account_id=str(requested_payload.get("target_account_id") or requested_payload.get("targetAccountId") or ""), + ) + if not target_account: + raise HTTPException(status_code=404, detail="No platform account available for analysis") + body = { + "model_profile_ids": list(requested_payload.get("model_profile_ids") or requested_payload.get("modelProfileIds") or []), + "linked_account_ids": list(requested_payload.get("linked_account_ids") or requested_payload.get("linkedAccountIds") or []), + "include_linked_accounts": bool(requested_payload.get("include_linked_accounts", requested_payload.get("includeLinkedAccounts", True))), + "include_recent_similar_candidates": bool(requested_payload.get("include_recent_similar_candidates", requested_payload.get("includeRecentSimilarCandidates", True))), + "max_videos": max(1, min(int(requested_payload.get("max_videos") or requested_payload.get("maxVideos") or 6), 20)), + "extra_focus": str(requested_payload.get("extra_focus") or requested_payload.get("extraFocus") or latest_user_message or ""), + "temperature": float(requested_payload.get("temperature") or 0.35), + } + if normalized_platform == "douyin": + body["auto_analyze_top_videos"] = bool(requested_payload.get("auto_analyze_top_videos", requested_payload.get("autoAnalyzeTopVideos", True))) + body["top_video_analysis_count"] = max(1, min(int(requested_payload.get("top_video_analysis_count") or requested_payload.get("topVideoAnalysisCount") or 6), 10)) + else: + body["auto_analyze_top_videos"] = bool(requested_payload.get("auto_analyze_top_videos", requested_payload.get("autoAnalyzeTopVideos", False))) + body["top_video_analysis_count"] = max(1, min(int(requested_payload.get("top_video_analysis_count") or requested_payload.get("topVideoAnalysisCount") or 4), 10)) + payload = await _call_local_api( + account, + method="POST", + path=f"/v2/{normalized_platform}/accounts/{target_account['id']}/analysis", + json_body=body, + ) + return { + "title": "OneLiner 已分析账号", + "summary": f"已为 {legacy.platform_label(normalized_platform)} 账号生成分析结论和下一步建议。", + "payload": { + "platform": normalized_platform, + "account_id": target_account["id"], + "analysis": payload, + }, + "recommended_action": _recommended_action( + "select-account", + label="打开当前对象", + summary=f"继续查看 {legacy.platform_label(normalized_platform)} 账号分析报告、快照和拆解结果。", + screen="discovery", + account_id=target_account["id"], + ), + } + + async def _run_track_account() -> dict[str, Any]: + if not normalized_platform: + raise HTTPException(status_code=400, detail="Platform is required for tracking") + target_account = _resolve_platform_target_account( + account, + project_id=project["id"], + platform=normalized_platform, + requested_account_id=str(requested_payload.get("target_account_id") or requested_payload.get("targetAccountId") or ""), + ) + if not target_account: + raise HTTPException(status_code=404, detail="No platform account available for tracking") + assistant = _resolve_execution_assistant(account, project_id=project["id"], platform=normalized_platform) + tracked_payload = await _call_local_api( + account, + method="POST", + path=f"/v2/{normalized_platform}/tracking/accounts", + json_body={ + "tracked_account_id": target_account["id"], + "assistant_id": (assistant or {}).get("id", ""), + "note": str(requested_payload.get("note") or "由 OneLiner 直接加入跟踪。"), + }, + ) + refreshed_payload = None + if bool(requested_payload.get("refresh_now", True)): + refreshed_payload = await _call_local_api( + account, + method="POST", + path=f"/v2/{normalized_platform}/tracking/accounts/{target_account['id']}/refresh", + json_body={}, + ) + sync_job_id = str((refreshed_payload or {}).get("sync_job_id") or "").strip() + recommended_action = ( + _recommended_action( + "open-job-detail", + label="看任务详情", + summary="继续查看这条跟踪同步任务的执行进度。", + screen="production", + job_id=sync_job_id, + ) + if sync_job_id + else _recommended_action( + "refresh-tracked-account", + label="继续同步当前账号", + summary="继续查看当前已跟踪账号的最新同步和日报。", + screen="tracking", + tracked_account_id=target_account["id"], + ) + ) + return { + "title": "OneLiner 已加入跟踪", + "summary": f"已把当前 {legacy.platform_label(normalized_platform)} 账号加入跟踪,并触发一次同步。", + "payload": { + "platform": normalized_platform, + "account_id": target_account["id"], + "tracking": tracked_payload, + "refresh": refreshed_payload or {}, + }, + "recommended_action": recommended_action, + } + + async def _run_refresh_tracking() -> dict[str, Any]: + if not normalized_platform: + raise HTTPException(status_code=400, detail="Platform is required for tracking refresh") + payload = await _call_local_api( + account, + method="POST", + path=f"/v2/{normalized_platform}/tracking/refresh", + json_body={}, + ) + items = list(payload.get("items") or []) + sync_job_id = str((items[0] or {}).get("sync_job_id") or "").strip() if len(items) == 1 else "" + recommended_action = ( + _recommended_action( + "open-job-detail", + label="看同步任务", + summary="继续查看这批跟踪同步任务里最新的一条执行进度。", + screen="production", + job_id=sync_job_id, + ) + if sync_job_id + else _recommended_action( + "goto-tracking", + label="回到跟踪工作区", + summary="继续查看当前平台跟踪池的同步结果和日报变化。", + screen="tracking", + platform=normalized_platform, + ) + ) + return { + "title": "OneLiner 已同步跟踪池", + "summary": f"已为 {legacy.platform_label(normalized_platform)} 跟踪池触发同步,成功 {int(payload.get('refreshed') or 0)} 条。", + "payload": { + "platform": normalized_platform, + "refresh": payload, + }, + "recommended_action": recommended_action, + } + async def _run_analyze_top_videos() -> dict[str, Any]: if not normalized_platform: raise HTTPException(status_code=400, detail="Platform is required for top video analysis") @@ -5723,6 +6040,38 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: ), } + async def _run_create_assistant() -> dict[str, Any]: + default_name = str( + requested_payload.get("name") + or requested_payload.get("assistant_name") + or requested_payload.get("assistantName") + or (f"{legacy.platform_label(normalized_platform)} 执行 Agent" if normalized_platform else f"{project.get('name') or '项目'} Agent") + ).strip() + assistant = legacy.create_assistant( + legacy.AssistantCreateRequest( + project_id=project["id"], + name=default_name, + description=str(requested_payload.get("description") or f"由 OneLiner 基于当前项目上下文创建,用于承接{legacy.platform_label(normalized_platform) if normalized_platform else '当前项目'}任务。"), + generation_goal=str(requested_payload.get("generation_goal") or requested_payload.get("goal") or f"承接{legacy.platform_label(normalized_platform) if normalized_platform else '当前项目'}内容生产和分析动作。"), + system_prompt=str(requested_payload.get("system_prompt") or requested_payload.get("systemPrompt") or ""), + knowledge_base_ids=list(requested_payload.get("knowledge_base_ids") or requested_payload.get("knowledgeBaseIds") or []), + model_profile_id=str(requested_payload.get("model_profile_id") or requested_payload.get("modelProfileId") or ""), + ), + account, + ) + return { + "title": "OneLiner 已创建 Agent", + "summary": f"已在当前项目下创建 Agent「{assistant.get('name') or '新 Agent'}」。", + "payload": {"assistant": assistant}, + "recommended_action": _recommended_action( + "open-edit-assistant", + label="继续编辑 Agent", + summary="继续补齐这条 Agent 的说明、目标和知识绑定。", + screen="playbook", + assistant_id=assistant.get("id", ""), + ), + } + async def _run_create_real_cut() -> dict[str, Any]: source_job = _load_owned_job(account, str(requested_payload.get("source_job_id") or requested_payload.get("sourceJobId") or "")) or _latest_derivable_job( account, @@ -5822,7 +6171,11 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: "generate-copy": _run_generate_copy, "review-draft": _run_review_draft, "import-homepage": _run_import_homepage, + "analyze-account": _run_analyze_account, + "track-account": _run_track_account, + "refresh-tracking": _run_refresh_tracking, "analyze-top-videos": _run_analyze_top_videos, + "create-assistant": _run_create_assistant, "create-ai-video": _run_create_ai_video, "create-real-cut": _run_create_real_cut, "save-live-recorder-source": _run_save_live_recorder_source, diff --git a/tests/test_main_agent_governance.py b/tests/test_main_agent_governance.py index 7a1000f..e5a4e8d 100644 --- a/tests/test_main_agent_governance.py +++ b/tests/test_main_agent_governance.py @@ -216,6 +216,35 @@ class MainAgentGovernanceTests(unittest.TestCase): ) return assistant_id + def _insert_content_source_account( + self, + *, + source_id: str = "source_member_platform", + platform: str = "kuaishou", + title: str = "平台账号", + source_url: str = "https://example.com/account", + ) -> str: + self.core.create_content_source( + account_id=self.ctx["member_id"], + project_id=self.ctx["project_id"], + source_kind="creator_account", + platform=platform, + handle=f"{platform}_handle", + source_url=source_url, + title=title, + metadata={"nickname": title, "platform": platform}, + ) + created = self.core.db.fetch_one( + """ + SELECT * FROM content_sources + WHERE user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'creator_account' + ORDER BY created_at DESC + LIMIT 1 + """, + (self.ctx["member_id"], self.ctx["project_id"], platform), + ) + return created["id"] + def _seed_approved_member_without_project(self) -> dict[str, Any]: now = self.db_module.utc_now() admin_id = "acct_admin" @@ -767,6 +796,11 @@ class MainAgentGovernanceTests(unittest.TestCase): self.assertGreater(registry_payload["count"], 0) default_action = next(item for item in registry_payload["items"] if item["action_key"] == "generate-copy") self.assertEqual(default_action["source"], "default") + action_keys = {item["action_key"] for item in registry_payload["items"]} + self.assertIn("analyze-account", action_keys) + self.assertIn("track-account", action_keys) + self.assertIn("refresh-tracking", action_keys) + self.assertIn("create-assistant", action_keys) save_registry = self.client.put( "/v2/oneliner/action-registry/generate-copy", @@ -999,6 +1033,116 @@ class MainAgentGovernanceTests(unittest.TestCase): self.assertEqual(payload["payload"]["job"]["artifacts"]["video_provider"], "seedance2") self.assertEqual(payload["payload"]["job"]["artifacts"]["video_model"], "seedance-2.0-pro") + def test_direct_oneliner_actions_execute_real_account_and_agent_flows(self) -> None: + source_id = self._insert_content_source_account( + platform="kuaishou", + title="快手测试账号", + source_url="https://www.kuaishou.com/profile/test-account", + ) + captured_model_calls: list[dict[str, Any]] = [] + + async def fake_call_model(profile: dict[str, Any], *, system_prompt: str, user_prompt: str, temperature: float = 0.3, **_: Any) -> str: + captured_model_calls.append( + { + "profile_id": profile["id"], + "system_prompt": system_prompt, + "user_prompt": user_prompt, + "temperature": temperature, + } + ) + return ( + '{"executive_summary":"账号可继续做增长拆解","borrow_points":["封面结构稳定"],' + '"risks":["更新频率偏低"],"next_actions":["继续同步并补日报"]}' + ) + + with patch.object(self.core, "call_model", new=AsyncMock(side_effect=fake_call_model)): + analyze_response = self.client.post( + "/v2/oneliner/actions/execute", + headers=self.ctx["member_headers"], + json={ + "action_key": "analyze-account", + "project_id": self.ctx["project_id"], + "platform": "kuaishou", + "payload": { + "target_account_id": source_id, + "extra_focus": "重点看商业化动作", + "max_videos": 4, + }, + }, + ) + self.assertEqual(analyze_response.status_code, 200, analyze_response.text) + analyze_payload = analyze_response.json() + self.assertEqual(analyze_payload["recommended_action"]["action"], "select-account") + self.assertEqual(analyze_payload["recommended_action"]["screen"], "discovery") + self.assertEqual(analyze_payload["recommended_action"]["account_id"], source_id) + self.assertEqual(analyze_payload["payload"]["platform"], "kuaishou") + self.assertEqual(analyze_payload["payload"]["account_id"], source_id) + self.assertTrue(captured_model_calls) + self.assertIn("重点看商业化动作", captured_model_calls[0]["user_prompt"]) + + track_response = self.client.post( + "/v2/oneliner/actions/execute", + headers=self.ctx["member_headers"], + json={ + "action_key": "track-account", + "project_id": self.ctx["project_id"], + "platform": "kuaishou", + "payload": { + "target_account_id": source_id, + "note": "由主 Agent 直接加入跟踪", + "refresh_now": True, + }, + }, + ) + self.assertEqual(track_response.status_code, 200, track_response.text) + track_payload = track_response.json() + self.assertEqual(track_payload["recommended_action"]["action"], "open-job-detail") + self.assertEqual(track_payload["recommended_action"]["screen"], "production") + self.assertTrue(track_payload["recommended_action"]["job_id"]) + self.assertEqual(track_payload["payload"]["platform"], "kuaishou") + self.assertEqual(track_payload["payload"]["account_id"], source_id) + self.assertEqual(track_payload["payload"]["refresh"]["tracked_account_id"], source_id) + self.assertTrue(track_payload["payload"]["refresh"]["sync_job_id"]) + + refresh_tracking_response = self.client.post( + "/v2/oneliner/actions/execute", + headers=self.ctx["member_headers"], + json={ + "action_key": "refresh-tracking", + "project_id": self.ctx["project_id"], + "platform": "kuaishou", + "payload": {}, + }, + ) + self.assertEqual(refresh_tracking_response.status_code, 200, refresh_tracking_response.text) + refresh_tracking_payload = refresh_tracking_response.json() + self.assertEqual(refresh_tracking_payload["recommended_action"]["action"], "open-job-detail") + self.assertEqual(refresh_tracking_payload["recommended_action"]["screen"], "production") + self.assertTrue(refresh_tracking_payload["recommended_action"]["job_id"]) + self.assertEqual(refresh_tracking_payload["payload"]["platform"], "kuaishou") + self.assertGreaterEqual(int(refresh_tracking_payload["payload"]["refresh"]["refreshed"] or 0), 1) + + create_response = self.client.post( + "/v2/oneliner/actions/execute", + headers=self.ctx["member_headers"], + json={ + "action_key": "create-assistant", + "project_id": self.ctx["project_id"], + "platform": "kuaishou", + "payload": { + "name": "快手增长 Agent", + "description": "负责承接快手平台增长动作", + "generation_goal": "围绕快手账号分析和同步动作持续执行", + }, + }, + ) + self.assertEqual(create_response.status_code, 200, create_response.text) + create_payload = create_response.json() + self.assertEqual(create_payload["recommended_action"]["action"], "open-edit-assistant") + self.assertEqual(create_payload["recommended_action"]["screen"], "playbook") + self.assertTrue(create_payload["recommended_action"]["assistant_id"]) + self.assertEqual(create_payload["payload"]["assistant"]["name"], "快手增长 Agent") + def test_platform_agent_routes_are_live(self) -> None: save_profile = self.client.put( "/v2/platform-agents/douyin/profile",