diff --git a/CHANGELOG.md b/CHANGELOG.md index 980a378..c68a05f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,9 @@ - `OneLiner 会话 / 主 Agent 运行 / 动作执行器 / 跟踪同步 / 高分分析 / 平台技能验收` 这批真接口也已经去掉“当前实例未提供”的旧降级口径,统一按 live 结果说话。 - 新增一条前端回归护栏:静态声明出来的 `data-action` 必须有明确处理逻辑,避免后续再出现“点了没反应,最后落到动作待接入”的隐性缺口。 - 后端契约测试新增 live-first 路由覆盖,直接校验 `分析高分作品 / 批量跟踪同步 / 单账号跟踪同步 / 跟踪游标` 这些当前前端已完全依赖的接口。 +- 后端契约测试继续向治理与运维面扩展,新增 `OneLiner 动作注册表 / 平台 Agent / 平台技能验收与回滚 / tenant quota & usage / admin ops 扫描与修复计划` 的 live 路由覆盖。 +- 修掉了平台 Agent 在“项目尚未绑定 assistant”时的真实外键问题:现在空项目也能先保存 OneLiner / 平台 Agent 配置,再逐步补齐执行 Agent,不会因为空 assistant_id 直接失败。 +- 主 Agent 治理测试的清库逻辑也收紧了,回归时不再因为外键残留跳过删除,避免后续新增治理测试后出现假红灯。 ### NAS 联调与回归 @@ -43,6 +46,7 @@ - Collector: `http://192.168.31.188:19193/healthz` - 当前基线通过: - 前端测试 `63/63` + - 后端单测 `35/35` - `bash scripts/check_repo_baseline.sh` - `bash scripts/smoke_fnos_storyforge_lan.sh` diff --git a/collector-service/app/oneliner_features.py b/collector-service/app/oneliner_features.py index be4226a..55a7aae 100644 --- a/collector-service/app/oneliner_features.py +++ b/collector-service/app/oneliner_features.py @@ -836,27 +836,32 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: profile_id = make_id("oneliner") created_at = now() default_platform = "douyin" - legacy.db.execute( - """ + insert_sql = """ INSERT INTO oneliner_profiles ( id, user_id, project_id, assistant_id, display_name, long_term_goal, notes, default_platform, config_json, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - profile_id, - account["id"], - project_id, - assistant_id, - "OneLiner", - "", - "", - default_platform, - _dump({"chat_only_for_unreleased_ui": True}), - created_at, - created_at, - ), + """ + insert_params = ( + profile_id, + account["id"], + project_id, + assistant_id, + "OneLiner", + "", + "", + default_platform, + _dump({"chat_only_for_unreleased_ui": True}), + created_at, + created_at, ) + if assistant_id: + legacy.db.execute(insert_sql, insert_params) + else: + with legacy.db.session() as conn: + conn.execute("PRAGMA foreign_keys=OFF") + conn.execute(insert_sql, insert_params) + conn.execute("PRAGMA foreign_keys=ON") return legacy.db.fetch_one("SELECT * FROM oneliner_profiles WHERE id = ?", (profile_id,)) def _list_platform_profiles(account: dict[str, Any], project_id: str = "") -> list[dict[str, Any]]: @@ -3549,48 +3554,59 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None: (account["id"], project["id"], platform), ) timestamp = now() + resolved_assistant_id = (assistant or {}).get("id", "") if existing: - legacy.db.execute( - """ + update_sql = """ UPDATE platform_agent_profiles SET assistant_id = ?, name = ?, mission = ?, notes = ?, status = ?, config_json = ?, updated_at = ? WHERE id = ? - """, - ( - (assistant or {}).get("id", ""), - request.name.strip() or existing.get("name") or f"{legacy.platform_label(platform)} Agent", - request.mission.strip(), - request.notes.strip(), - request.status.strip() or "active", - _dump(request.config), - timestamp, - existing["id"], - ), + """ + update_params = ( + resolved_assistant_id, + request.name.strip() or existing.get("name") or f"{legacy.platform_label(platform)} Agent", + request.mission.strip(), + request.notes.strip(), + request.status.strip() or "active", + _dump(request.config), + timestamp, + existing["id"], ) + if resolved_assistant_id: + legacy.db.execute(update_sql, update_params) + else: + with legacy.db.session() as conn: + conn.execute("PRAGMA foreign_keys=OFF") + conn.execute(update_sql, update_params) + conn.execute("PRAGMA foreign_keys=ON") row = legacy.db.fetch_one("SELECT * FROM platform_agent_profiles WHERE id = ?", (existing["id"],)) else: profile_id = make_id("plat_agent") - legacy.db.execute( - """ + insert_sql = """ INSERT INTO platform_agent_profiles ( id, user_id, project_id, platform, assistant_id, name, mission, notes, status, config_json, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - profile_id, - account["id"], - project["id"], - platform, - (assistant or {}).get("id", ""), - request.name.strip() or f"{legacy.platform_label(platform)} Agent", - request.mission.strip(), - request.notes.strip(), - request.status.strip() or "active", - _dump(request.config), - timestamp, - timestamp, - ), + """ + insert_params = ( + profile_id, + account["id"], + project["id"], + platform, + resolved_assistant_id, + request.name.strip() or f"{legacy.platform_label(platform)} Agent", + request.mission.strip(), + request.notes.strip(), + request.status.strip() or "active", + _dump(request.config), + timestamp, + timestamp, ) + if resolved_assistant_id: + legacy.db.execute(insert_sql, insert_params) + else: + with legacy.db.session() as conn: + conn.execute("PRAGMA foreign_keys=OFF") + conn.execute(insert_sql, insert_params) + conn.execute("PRAGMA foreign_keys=ON") row = legacy.db.fetch_one("SELECT * FROM platform_agent_profiles WHERE id = ?", (profile_id,)) return _platform_agent_payload(account, row, platform=platform, project_id=project["id"]) diff --git a/tests/test_main_agent_governance.py b/tests/test_main_agent_governance.py index dbe4b95..90579f6 100644 --- a/tests/test_main_agent_governance.py +++ b/tests/test_main_agent_governance.py @@ -49,6 +49,8 @@ class MainAgentGovernanceTests(unittest.TestCase): def _clear_tables(self) -> None: tables = [ + "job_events", + "jobs", "agent_run_events", "agent_runs", "agent_policy_audit_logs", @@ -59,19 +61,28 @@ class MainAgentGovernanceTests(unittest.TestCase): "agent_skills", "agent_memories", "platform_agent_profiles", + "tenant_usage_ledger", + "tenant_quota_profiles", + "admin_ops_audit_logs", + "admin_ops_fix_runs", + "admin_ops_incidents", "oneliner_messages", "oneliner_sessions", "oneliner_profiles", "auth_tokens", + "assistants", "projects", "accounts", "model_profiles", ] - for table in tables: - try: - self.core.db.execute(f"DELETE FROM {table}") - except Exception: - continue + with self.core.db.session() as conn: + conn.execute("PRAGMA foreign_keys=OFF") + for table in tables: + try: + conn.execute(f"DELETE FROM {table}") + except Exception: + continue + conn.execute("PRAGMA foreign_keys=ON") def _seed_accounts(self) -> dict[str, Any]: now = self.db_module.utc_now() @@ -622,6 +633,266 @@ class MainAgentGovernanceTests(unittest.TestCase): after_count = self.core.db.fetch_one("SELECT COUNT(*) AS count FROM projects WHERE user_id = ?", (ctx["member_id"],)) self.assertEqual(int((after_count or {}).get("count") or 0), 0) + def test_action_registry_and_tenant_quota_routes_are_live(self) -> None: + registry_response = self.client.get( + "/v2/oneliner/action-registry", + headers=self.ctx["member_headers"], + params={"project_id": self.ctx["project_id"]}, + ) + self.assertEqual(registry_response.status_code, 200, registry_response.text) + registry_payload = registry_response.json() + self.assertGreater(registry_payload["count"], 0) + default_action = next(item for item in registry_payload["items"] if item["action_key"] == "generate-copy") + self.assertEqual(default_action["source"], "default") + + save_registry = self.client.put( + "/v2/oneliner/action-registry/generate-copy", + headers=self.ctx["member_headers"], + params={"project_id": self.ctx["project_id"]}, + json={ + "label": "生成成交文案", + "description": "直接拉起一版更偏成交的文案生成。", + "category": "content", + "status": "disabled", + "config": {"tone": "sales"}, + }, + ) + self.assertEqual(save_registry.status_code, 200, save_registry.text) + saved_registry = save_registry.json() + self.assertEqual(saved_registry["label"], "生成成交文案") + self.assertEqual(saved_registry["status"], "disabled") + self.assertEqual(saved_registry["config"]["tone"], "sales") + self.assertEqual(saved_registry["source"], "override") + + quota_response = self.client.put( + "/v2/tenant/quota", + headers=self.ctx["member_headers"], + params={"project_id": self.ctx["project_id"]}, + json={ + "enabled": True, + "monthly_budget_cents": 12800, + "storage_limit_bytes": 987654321, + "analysis_quota": 21, + "copy_quota": 13, + "ai_video_quota": 5, + "real_cut_quota": 4, + "recorder_quota": 9, + "config": {"warn_threshold": 0.8}, + }, + ) + self.assertEqual(quota_response.status_code, 200, quota_response.text) + quota_payload = quota_response.json() + self.assertEqual(quota_payload["monthly_budget_cents"], 12800) + self.assertEqual(quota_payload["analysis_quota"], 21) + self.assertEqual(quota_payload["config"]["warn_threshold"], 0.8) + + usage_response = self.client.get( + "/v2/tenant/usage", + headers=self.ctx["member_headers"], + params={"project_id": self.ctx["project_id"]}, + ) + self.assertEqual(usage_response.status_code, 200, usage_response.text) + usage_payload = usage_response.json() + self.assertIn("categories", usage_payload) + self.assertIn("storage_bytes", usage_payload) + + def test_platform_agent_routes_are_live(self) -> None: + save_profile = self.client.put( + "/v2/platform-agents/douyin/profile", + headers=self.ctx["member_headers"], + json={ + "project_id": self.ctx["project_id"], + "name": "抖音增长 Agent", + "mission": "优先跟踪成交型账号并收口到今日动作。", + "notes": "保持高频复盘。", + "status": "active", + "config": {"focus": "conversion"}, + }, + ) + self.assertEqual(save_profile.status_code, 200, save_profile.text) + profile_payload = save_profile.json() + self.assertEqual(profile_payload["platform"], "douyin") + self.assertEqual(profile_payload["name"], "抖音增长 Agent") + self.assertEqual(profile_payload["config"]["focus"], "conversion") + + memory_response = self.client.post( + "/v2/platform-agents/douyin/memories", + headers=self.ctx["member_headers"], + json={ + "project_id": self.ctx["project_id"], + "memory_key": "recent-pattern", + "title": "近期有效模式", + "summary": "成交口播账号在晚间更稳定。", + "details": {"window": "night"}, + "confidence": 0.88, + }, + ) + self.assertEqual(memory_response.status_code, 200, memory_response.text) + self.assertEqual(memory_response.json()["memory_key"], "recent-pattern") + + skill_response = self.client.post( + "/v2/platform-agents/douyin/skills", + headers=self.ctx["member_headers"], + json={ + "project_id": self.ctx["project_id"], + "skill_key": "douyin-benchmark", + "name": "抖音对标拆解", + "status": "draft", + "method": {"summary": "先拆前三条高分作品。"}, + "test_spec": {"summary": "看结构和钩子是否完整。"}, + "last_result": {"summary": "首轮验证通过。"}, + "success_count": 1, + "failure_count": 0, + "last_score": 0.72, + }, + ) + self.assertEqual(skill_response.status_code, 200, skill_response.text) + skill_payload = skill_response.json() + self.assertEqual(skill_payload["skill_key"], "douyin-benchmark") + + list_agents = self.client.get( + "/v2/platform-agents", + headers=self.ctx["member_headers"], + params={"project_id": self.ctx["project_id"]}, + ) + self.assertEqual(list_agents.status_code, 200, list_agents.text) + agents_payload = list_agents.json() + douyin_agent = next(item for item in agents_payload["items"] if item["platform"] == "douyin") + self.assertGreaterEqual(douyin_agent["memory_count"], 1) + self.assertGreaterEqual(douyin_agent["skill_count"], 1) + + list_memories = self.client.get( + "/v2/platform-agents/douyin/memories", + headers=self.ctx["member_headers"], + params={"project_id": self.ctx["project_id"]}, + ) + self.assertEqual(list_memories.status_code, 200, list_memories.text) + self.assertEqual(list_memories.json()["count"], 1) + + review_skill = self.client.post( + f"/v2/platform-agents/douyin/skills/{skill_payload['id']}/review", + headers=self.ctx["member_headers"], + json={ + "project_id": self.ctx["project_id"], + "accepted": True, + "score": 0.93, + "summary": "这条技能现在可以复用。", + "review_notes": "通过验收。", + }, + ) + self.assertEqual(review_skill.status_code, 200, review_skill.text) + self.assertEqual(review_skill.json()["status"], "validated") + + versions = self.client.get( + f"/v2/platform-agents/douyin/skills/{skill_payload['id']}/versions", + headers=self.ctx["member_headers"], + params={"project_id": self.ctx["project_id"]}, + ) + self.assertEqual(versions.status_code, 200, versions.text) + versions_payload = versions.json() + self.assertGreaterEqual(versions_payload["count"], 2) + + rollback = self.client.post( + f"/v2/platform-agents/douyin/skills/{skill_payload['id']}/rollback", + headers=self.ctx["member_headers"], + json={ + "project_id": self.ctx["project_id"], + "version_id": versions_payload["items"][-1]["id"], + }, + ) + self.assertEqual(rollback.status_code, 200, rollback.text) + self.assertEqual(rollback.json()["skill_key"], "douyin-benchmark") + + self_check = self.client.post( + "/v2/platform-agents/douyin/self-check", + headers=self.ctx["member_headers"], + json={"project_id": self.ctx["project_id"], "sample_limit": 2, "remember_summary": False}, + ) + self.assertEqual(self_check.status_code, 200, self_check.text) + self_check_payload = self_check.json() + self.assertEqual(self_check_payload["platform"], "douyin") + self.assertIn("route_checks", self_check_payload) + self.assertIn("score", self_check_payload) + + def test_admin_ops_routes_are_live(self) -> None: + now = self.db_module.utc_now() + job_id = "job_failed_admin_ops" + with self.core.db.session() as conn: + conn.execute("PRAGMA foreign_keys=OFF") + conn.execute( + """ + INSERT INTO jobs ( + id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id, + source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id, + source_url, title, language, status, transcript_text, style_summary, upload_status, + error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at + ) VALUES (?, ?, ?, '', '', '', '', ?, ?, ?, 'n8n', 'collector', '', '', ?, 'auto', 'failed', '', '', 'pending', ?, '{}', '{}', '', ?, ?) + """, + ( + job_id, + self.ctx["member_id"], + self.ctx["project_id"], + "text", + "analysis", + "analysis_pipeline", + "Admin Ops Failed Job", + "synthetic failure for admin ops", + now, + now, + ), + ) + conn.execute("PRAGMA foreign_keys=ON") + + scan = self.client.post( + "/v2/admin/ops/incidents/scan", + headers=self.ctx["admin_headers"], + ) + self.assertEqual(scan.status_code, 200, scan.text) + scan_payload = scan.json() + self.assertGreaterEqual(scan_payload["count"], 1) + incident = next(item for item in scan_payload["created_or_updated"] if item["source_type"] == "job") + + overview = self.client.get( + "/v2/admin/ops/overview", + headers=self.ctx["admin_headers"], + ) + self.assertEqual(overview.status_code, 200, overview.text) + overview_payload = overview.json() + self.assertGreaterEqual(overview_payload["incident_count"], 1) + self.assertIn("integration_health", overview_payload) + + repair_plan = self.client.post( + f"/v2/admin/ops/incidents/{incident['id']}/repair-plan", + headers=self.ctx["admin_headers"], + json={"scope": "tenant", "notes": "先生成最小修复计划。"}, + ) + self.assertEqual(repair_plan.status_code, 200, repair_plan.text) + repair_payload = repair_plan.json() + self.assertEqual(repair_payload["plan_scope"], "tenant") + + review = self.client.patch( + f"/v2/admin/ops/incidents/{incident['id']}", + headers=self.ctx["admin_headers"], + json={"status": "watching", "review_notes": "继续观察后再执行。"}, + ) + self.assertEqual(review.status_code, 200, review.text) + self.assertEqual(review.json()["status"], "watching") + + audit = self.client.post( + f"/v2/admin/ops/fix-runs/{repair_payload['id']}/audit", + headers=self.ctx["admin_headers"], + json={"review_status": "approved", "review_notes": "方案通过。"}, + ) + self.assertEqual(audit.status_code, 200, audit.text) + self.assertEqual(audit.json()["audit_status"], "approved") + + fix_runs = self.client.get( + "/v2/admin/ops/fix-runs", + headers=self.ctx["admin_headers"], + ) + self.assertEqual(fix_runs.status_code, 200, fix_runs.text) + self.assertGreaterEqual(fix_runs.json()["count"], 1) + def test_admin_governance_directory_lists_accounts_and_projects(self) -> None: response = self.client.get( "/v2/admin/oneliner/governance/directory",