Files
storyforge/tests/test_main_agent_governance.py
kris 22f6e6e686
Some checks failed
StoryForge CI / Baseline checks (push) Has been cancelled
StoryForge CI / Backend tests (push) Has been cancelled
StoryForge CI / Web tests (push) Has been cancelled
feat: add direct tracking pool sync actions
2026-04-05 06:28:10 +08:00

1908 lines
87 KiB
Python

from __future__ import annotations
import importlib
import os
import sys
import tempfile
import unittest
from unittest.mock import AsyncMock, patch
from pathlib import Path
from typing import Any
from fastapi.testclient import TestClient
# ``parents[1]`` is the directory above the one that contains this file.
ROOT = Path(__file__).resolve().parents[1]
# The service sources live under ``<ROOT>/collector-service``; that directory is
# put at the front of ``sys.path`` (once) so the ``app.*`` modules loaded by the
# tests below can be imported.
APP_ROOT = ROOT.joinpath("collector-service")
if sys.path.count(str(APP_ROOT)) == 0:
    sys.path.insert(0, str(APP_ROOT))
class MainAgentGovernanceTests(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
    """Build one isolated application instance shared by every test in the class.

    A temporary directory backs all storage paths, the environment is pointed
    at it, and only then are the ``app`` modules imported and reloaded
    (presumably so they re-read the environment at import time — confirm
    against ``app.database``). Finally the schema is initialised and a
    ``TestClient`` is opened on the reloaded app.
    """
    cls.tempdir = tempfile.TemporaryDirectory()
    workspace = Path(cls.tempdir.name)
    data_dir = workspace / "data"

    # These values are always overwritten for the test run.
    os.environ.update(
        {
            "DATA_DIR": str(data_dir),
            "DATABASE_PATH": str(data_dir / "storyforge.db"),
            "DOWNLOADS_DIR": str(workspace / "downloads"),
            "JOBS_DIR": str(workspace / "jobs"),
            "MODELS_DIR": str(workspace / "models"),
            "ORCHESTRATOR_SHARED_SECRET": "test-secret",
            "WEB_AUTOLOGIN_ENABLED": "0",
        }
    )
    # Bootstrap credentials default to empty but keep any value already set.
    for optional_name in ("BOOTSTRAP_SUPERADMIN_USERNAME", "BOOTSTRAP_SUPERADMIN_PASSWORD"):
        os.environ.setdefault(optional_name, "")

    def load_fresh(module_name: str):
        """Import ``module_name`` and force a reload of it."""
        return importlib.reload(importlib.import_module(module_name))

    # Keep this reload order: database, then core, then the entry point.
    cls.db_module = load_fresh("app.database")
    cls.core = load_fresh("app.core_main")
    cls.app_main = load_fresh("app.main")

    cls.core.db.init_schema()
    cls.client = TestClient(cls.app_main.app)
@classmethod
def tearDownClass(cls) -> None:
cls.client.close()
cls.tempdir.cleanup()
def setUp(self) -> None:
self._clear_tables()
self.ctx = self._seed_accounts()
def _clear_tables(self) -> None:
tables = [
"job_events",
"jobs",
"publish_reviews",
"live_recorder_bindings",
"live_recorder_sources",
"agent_run_events",
"agent_runs",
"agent_policy_audit_logs",
"agent_policy_effectivity",
"agent_policy_versions",
"agent_policy_scopes",
"agent_skill_versions",
"agent_skills",
"agent_memories",
"platform_agent_profiles",
"oneliner_action_definitions",
"tenant_usage_ledger",
"tenant_quota_profiles",
"admin_ops_audit_logs",
"admin_ops_fix_runs",
"admin_ops_incidents",
"oneliner_messages",
"oneliner_sessions",
"oneliner_profiles",
"auth_tokens",
"assistants",
"projects",
"accounts",
"model_profiles",
]
with self.core.db.session() as conn:
conn.execute("PRAGMA foreign_keys=OFF")
for table in tables:
try:
conn.execute(f"DELETE FROM {table}")
except Exception:
continue
conn.execute("PRAGMA foreign_keys=ON")
def _seed_accounts(self) -> dict[str, Any]:
now = self.db_module.utc_now()
admin_id = "acct_admin"
member_id = "acct_member"
project_id = "proj_member"
model_id = "model_default"
admin_token = "token_admin"
member_token = "token_member"
self.core.db.execute(
"""
INSERT INTO accounts (
id, username, password_hash, password_salt, display_name, role, approval_status,
approved_by, approved_at, preferred_analysis_model_id, created_at, updated_at
) VALUES (?, ?, 'hash', 'salt', ?, ?, 'approved', ?, ?, ?, ?, ?)
""",
(admin_id, "admin", "Admin", "super_admin", admin_id, now, model_id, now, now),
)
self.core.db.execute(
"""
INSERT INTO accounts (
id, username, password_hash, password_salt, display_name, role, approval_status,
approved_by, approved_at, preferred_analysis_model_id, created_at, updated_at
) VALUES (?, ?, 'hash', 'salt', ?, ?, 'approved', ?, ?, ?, ?, ?)
""",
(member_id, "member", "Member", "operator", admin_id, now, model_id, now, now),
)
self.core.db.execute(
"""
INSERT INTO projects (id, user_id, name, description, created_at, updated_at)
VALUES (?, ?, ?, '', ?, ?)
""",
(project_id, member_id, "Member Project", now, now),
)
self.core.db.execute(
"""
INSERT INTO model_profiles (
id, owner_account_id, name, provider, base_url, api_key, model_name,
is_system, is_default, created_at, updated_at
) VALUES (?, NULL, 'Default Model', 'openai_compat', 'http://127.0.0.1:8317/v1', '', 'GLM-5', 1, 1, ?, ?)
""",
(model_id, now, now),
)
self.core.db.execute(
"INSERT INTO auth_tokens (token, account_id, created_at) VALUES (?, ?, ?)",
(admin_token, admin_id, now),
)
self.core.db.execute(
"INSERT INTO auth_tokens (token, account_id, created_at) VALUES (?, ?, ?)",
(member_token, member_id, now),
)
return {
"admin_id": admin_id,
"member_id": member_id,
"project_id": project_id,
"admin_headers": {"Authorization": f"Bearer {admin_token}"},
"member_headers": {"Authorization": f"Bearer {member_token}"},
}
def _insert_completed_job(
self,
*,
job_id: str = "job_completed",
title: str = "Completed Job",
result_json: str = '{"summary":"done"}',
) -> str:
now = self.db_module.utc_now()
knowledge_base_id = "kb_member_default"
existing_kb = self.core.db.fetch_one("SELECT id FROM knowledge_bases WHERE id = ?", (knowledge_base_id,))
if not existing_kb:
self.core.db.execute(
"""
INSERT INTO knowledge_bases (id, user_id, project_id, name, description, sync_status, created_at, updated_at)
VALUES (?, ?, ?, 'Default KB', '', 'ready', ?, ?)
""",
(knowledge_base_id, self.ctx["member_id"], self.ctx["project_id"], now, now),
)
self.core.db.execute(
"""
INSERT INTO jobs (
id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id,
source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id,
source_url, title, language, status, transcript_text, style_summary, upload_status,
error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at
) VALUES (?, ?, ?, '', NULL, ?, NULL, ?, ?, ?, 'n8n', 'collector', '', '', ?, 'auto', 'completed', '', '', 'completed', '', '{}', ?, '', ?, ?)
""",
(
job_id,
self.ctx["member_id"],
self.ctx["project_id"],
knowledge_base_id,
"text",
"analysis",
"analysis_pipeline",
title,
result_json,
now,
now,
),
)
return job_id
def _insert_assistant(
self,
*,
assistant_id: str = "asst_member_default",
name: str = "Default Assistant",
) -> str:
now = self.db_module.utc_now()
self.core.db.execute(
"""
INSERT INTO assistants (
id, user_id, project_id, name, description, system_prompt, generation_goal, config_json,
model_profile_id, created_at, updated_at
) VALUES (?, ?, ?, ?, '', '', '', '{}', '', ?, ?)
""",
(
assistant_id,
self.ctx["member_id"],
self.ctx["project_id"],
name,
now,
now,
),
)
return assistant_id
def _insert_content_source_account(
self,
*,
source_id: str = "source_member_platform",
platform: str = "kuaishou",
title: str = "平台账号",
source_url: str = "https://example.com/account",
) -> str:
self.core.create_content_source(
account_id=self.ctx["member_id"],
project_id=self.ctx["project_id"],
source_kind="creator_account",
platform=platform,
handle=f"{platform}_handle",
source_url=source_url,
title=title,
metadata={"nickname": title, "platform": platform},
)
created = self.core.db.fetch_one(
"""
SELECT * FROM content_sources
WHERE user_id = ? AND project_id = ? AND platform = ? AND source_kind = 'creator_account'
ORDER BY created_at DESC
LIMIT 1
""",
(self.ctx["member_id"], self.ctx["project_id"], platform),
)
return created["id"]
def _seed_approved_member_without_project(self) -> dict[str, Any]:
now = self.db_module.utc_now()
admin_id = "acct_admin"
member_id = "acct_member_noproject"
model_id = "model_default"
admin_token = "token_admin"
member_token = "token_member_noproject"
self.core.db.execute(
"""
INSERT INTO accounts (
id, username, password_hash, password_salt, display_name, role, approval_status,
approved_by, approved_at, preferred_analysis_model_id, created_at, updated_at
) VALUES (?, ?, 'hash', 'salt', ?, ?, 'approved', ?, ?, ?, ?, ?)
""",
(admin_id, "admin", "Admin", "super_admin", admin_id, now, model_id, now, now),
)
self.core.db.execute(
"""
INSERT INTO accounts (
id, username, password_hash, password_salt, display_name, role, approval_status,
approved_by, approved_at, preferred_analysis_model_id, created_at, updated_at
) VALUES (?, ?, 'hash', 'salt', ?, ?, 'approved', ?, ?, ?, ?, ?)
""",
(member_id, "member_noproject", "Member No Project", "operator", admin_id, now, model_id, now, now),
)
self.core.db.execute(
"""
INSERT INTO model_profiles (
id, owner_account_id, name, provider, base_url, api_key, model_name,
is_system, is_default, created_at, updated_at
) VALUES (?, NULL, 'Default Model', 'openai_compat', 'http://127.0.0.1:8317/v1', '', 'GLM-5', 1, 1, ?, ?)
""",
(model_id, now, now),
)
self.core.db.execute(
"INSERT INTO auth_tokens (token, account_id, created_at) VALUES (?, ?, ?)",
(admin_token, admin_id, now),
)
self.core.db.execute(
"INSERT INTO auth_tokens (token, account_id, created_at) VALUES (?, ?, ?)",
(member_token, member_id, now),
)
return {
"admin_id": admin_id,
"member_id": member_id,
"admin_headers": {"Authorization": f"Bearer {admin_token}"},
"member_headers": {"Authorization": f"Bearer {member_token}"},
}
def test_agent_run_creation_snapshots_governance_and_needs_confirmation(self) -> None:
    """A new run starts unconfirmed and carries its plan, preview action and governance data."""
    project_id = self.ctx["project_id"]
    goal = "跟进重点账号"
    run_request = {
        "project_id": project_id,
        "source_screen": "dashboard",
        "source_action_key": "homepage-primary-action",
        "title": goal,
        "summary": "先由主 Agent 评估优先级",
        "intent_key": "track_account",
        "platform": "douyin",
        "platform_scope": "single_platform",
        "plan_request": {
            "goal": goal,
            "steps": ["读取当前项目上下文", "检查重点账号变化", "决定下一步"],
        },
    }

    created = self.client.post(
        "/v2/oneliner/runs",
        headers=self.ctx["member_headers"],
        json=run_request,
    )
    self.assertEqual(created.status_code, 200, created.text)
    run = created.json()

    # Top-level fields echo the request; the run waits for confirmation.
    for field, expected in (
        ("run_status", "needs_confirmation"),
        ("source_screen", "dashboard"),
        ("platform", "douyin"),
        ("platform_scope", "single_platform"),
    ):
        self.assertEqual(run[field], expected)
    self.assertEqual(run["session_id"][:5], "oline")
    self.assertEqual(run["plan"]["goal"], goal)

    preview = run["recommended_preview_action"]
    self.assertEqual(preview["action"], "goto-discovery")
    self.assertEqual(preview["screen"], "discovery")

    # Governance data attached to the run.
    governance = run["governance"]
    self.assertEqual(governance["project_id"], project_id)
    for key in ("layers", "oneliner_profile", "oneliner_profile_version"):
        self.assertIn(key, governance)
    self.assertGreaterEqual(governance["oneliner_profile_version"]["version_no"], 1)
    self.assertIn("platform_agent_profile", governance)
    self.assertEqual(governance["platform_agent_profile"]["platform"], "douyin")

    self.assertEqual(run["events"][0]["event_type"], "run.created")
def test_agent_run_confirm_transitions_to_queue_or_running_and_logs_events(self) -> None:
    """Confirming a run moves it to ``queued`` or ``running`` and records the matching events."""
    member_headers = self.ctx["member_headers"]
    goal = "调整当前平台策略"

    created = self.client.post(
        "/v2/oneliner/runs",
        headers=member_headers,
        json={
            "project_id": self.ctx["project_id"],
            "source_screen": "strategy",
            "source_action_key": "handoff-to-main-agent",
            "title": goal,
            "summary": "让主 Agent 先给执行计划",
            "intent_key": "custom",
            "platform": "douyin",
            "platform_scope": "single_platform",
            "plan_request": {
                "goal": goal,
                "steps": ["读取当前平台策略", "生成调整建议"],
            },
        },
    )
    self.assertEqual(created.status_code, 200, created.text)
    run_id = created.json()["id"]

    confirmed = self.client.post(
        f"/v2/oneliner/runs/{run_id}/confirm",
        headers=member_headers,
        json={"reason": "user confirmed"},
    )
    self.assertEqual(confirmed.status_code, 200, confirmed.text)
    run = confirmed.json()

    self.assertIn(run["run_status"], {"queued", "running"})
    preview = run["recommended_preview_action"]
    self.assertEqual(preview["action"], "goto-strategy")
    self.assertEqual(preview["screen"], "strategy")

    logged = [event["event_type"] for event in run["events"]]
    for required in ("run.created", "run.confirmed"):
        self.assertIn(required, logged)
    # Either follow-up event is acceptable, mirroring the two allowed statuses.
    self.assertTrue(any(name in logged for name in ("run.queued", "run.started")))
def test_running_agent_run_detail_advances_to_progress_and_done(self) -> None:
    """Fetching a confirmed run's detail shows it finished with a structured plan result."""
    member_headers = self.ctx["member_headers"]
    goal = "安排今日动作"

    created = self.client.post(
        "/v2/oneliner/runs",
        headers=member_headers,
        json={
            "project_id": self.ctx["project_id"],
            "source_screen": "dashboard",
            "source_action_key": "homepage-primary-action",
            "title": goal,
            "summary": "让主 Agent 给出执行收口",
            "intent_key": "custom",
            "platform": "douyin",
            "platform_scope": "single_platform",
            "plan_request": {
                "goal": goal,
                "steps": ["读取当前项目上下文", "给出执行建议", "输出下一步"],
            },
        },
    )
    self.assertEqual(created.status_code, 200, created.text)
    run_url = f"/v2/oneliner/runs/{created.json()['id']}"

    confirmed = self.client.post(
        f"{run_url}/confirm",
        headers=member_headers,
        json={"reason": "user confirmed"},
    )
    self.assertEqual(confirmed.status_code, 200, confirmed.text)

    detail = self.client.get(run_url, headers=member_headers)
    self.assertEqual(detail.status_code, 200, detail.text)
    run = detail.json()

    self.assertEqual(run["run_status"], "done")
    self.assertTrue(run["finished_at"])

    result = run["result"]
    self.assertEqual(result["result_kind"], "main_agent_plan")
    self.assertEqual(result["recommended_action"]["action"], "goto-discovery")
    self.assertEqual(result["recommended_action"]["screen"], "discovery")

    sections = result["result_sections"]
    self.assertEqual(sections["workstream_key"], "discovery")
    self.assertGreaterEqual(len(sections["cards"]), 2)
    self.assertEqual(sections["cards"][0]["title"], "当前焦点")

    card = result["execution_card"]
    self.assertGreaterEqual(card["oneliner_profile_version"]["version_no"], 1)
    self.assertEqual(card["platform_agent_profile"]["platform"], "douyin")

    logged = [event["event_type"] for event in run["events"]]
    for required in ("run.progress", "run.done"):
        self.assertIn(required, logged)
def test_agent_run_result_recommended_action_preserves_requested_object_context(self) -> None:
    """The finished run's recommended action points back at the account sent in the payload."""
    member_headers = self.ctx["member_headers"]
    target_account = "acct_focus_target"
    goal = "继续分析当前对象"

    created = self.client.post(
        "/v2/oneliner/runs",
        headers=member_headers,
        json={
            "project_id": self.ctx["project_id"],
            "source_screen": "discovery",
            "source_action_key": "selected-account-handoff",
            "title": goal,
            "summary": "让主 Agent 沿着当前账号继续推进。",
            "intent_key": "analyze_account",
            "platform": "douyin",
            "platform_scope": "single_platform",
            "plan_request": {
                "goal": goal,
                "steps": ["读取当前账号上下文", "结合当前策略生成下一步建议"],
            },
            "payload": {
                "target_account_id": target_account,
            },
        },
    )
    self.assertEqual(created.status_code, 200, created.text)
    run_url = f"/v2/oneliner/runs/{created.json()['id']}"

    confirmed = self.client.post(
        f"{run_url}/confirm",
        headers=member_headers,
        json={"reason": "继续围绕当前对象推进"},
    )
    self.assertEqual(confirmed.status_code, 200, confirmed.text)

    detail = self.client.get(run_url, headers=member_headers)
    self.assertEqual(detail.status_code, 200, detail.text)
    run = detail.json()

    self.assertEqual(run["run_status"], "done")
    action = run["result"]["recommended_action"]
    self.assertEqual(action["action"], "select-account")
    self.assertEqual(action["screen"], "discovery")
    self.assertEqual(action["account_id"], target_account)
def test_cancelled_run_can_be_retried_as_a_new_pending_run(self) -> None:
    """Retrying a cancelled run yields a different, unconfirmed run with the same title and plan."""
    member_headers = self.ctx["member_headers"]
    project_id = self.ctx["project_id"]
    goal = "恢复失败任务"

    created = self.client.post(
        "/v2/oneliner/runs",
        headers=member_headers,
        json={
            "project_id": project_id,
            "source_screen": "production",
            "source_action_key": "handoff-to-main-agent",
            "title": goal,
            "summary": "让主 Agent 重新接单",
            "intent_key": "custom",
            "platform": "douyin",
            "platform_scope": "single_platform",
            "plan_request": {
                "goal": goal,
                "steps": ["检查失败原因", "重新安排执行", "确认落点"],
            },
        },
    )
    self.assertEqual(created.status_code, 200, created.text)
    first_run = created.json()
    first_id = first_run["id"]

    cancelled = self.client.post(
        f"/v2/oneliner/runs/{first_id}/cancel",
        headers=member_headers,
        json={"reason": "user cancelled"},
    )
    self.assertEqual(cancelled.status_code, 200, cancelled.text)
    self.assertEqual(cancelled.json()["run_status"], "cancelled")

    retried = self.client.post(
        f"/v2/oneliner/runs/{first_id}/retry",
        headers=member_headers,
        json={"reason": "retry from runtime"},
    )
    self.assertEqual(retried.status_code, 200, retried.text)
    new_run = retried.json()

    # The retry is a separate run that starts over from confirmation.
    self.assertNotEqual(new_run["id"], first_id)
    self.assertEqual(new_run["run_status"], "needs_confirmation")
    self.assertEqual(new_run["title"], first_run["title"])
    self.assertEqual(new_run["project_id"], project_id)
    self.assertEqual(new_run["plan"]["goal"], goal)
    self.assertEqual(new_run["recommended_preview_action"]["screen"], "production")

    logged = [event["event_type"] for event in new_run["events"]]
    for required in ("run.created", "run.retried"):
        self.assertIn(required, logged)
def test_effective_policy_merges_system_user_global_and_platform_layers(self) -> None:
    """The effective policy combines system, user-global and user-platform layers in that order."""
    project_id = self.ctx["project_id"]
    member_headers = self.ctx["member_headers"]

    system_body = {
        "title": "System main agent",
        "summary": "Default baseline",
        "policy": {
            "tone": {"style": "default"},
            "homepage": {"focus": "ops"},
            "actions": {"max_cards": 3},
        },
        "reason": "seed system baseline",
    }
    system_write = self.client.put(
        "/v2/admin/oneliner/governance/system/main-agent",
        headers=self.ctx["admin_headers"],
        json=system_body,
    )
    self.assertEqual(system_write.status_code, 200, system_write.text)

    global_body = {
        "project_id": project_id,
        "title": "Member global strategy",
        "summary": "Personal operating style",
        "policy": {
            "tone": {"style": "analytical"},
            "memory": {"default_window": "30d"},
            "actions": {"max_cards": 2},
        },
        "reason": "personalize global defaults",
    }
    global_write = self.client.put(
        "/v2/oneliner/governance/user/global",
        headers=member_headers,
        json=global_body,
    )
    self.assertEqual(global_write.status_code, 200, global_write.text)

    platform_body = {
        "project_id": project_id,
        "title": "Douyin strategy",
        "summary": "Tighter benchmark workflow",
        "policy": {
            "actions": {"max_cards": 1},
            "douyin": {"benchmark_mode": "strict"},
        },
        "reason": "tighten douyin execution",
    }
    platform_write = self.client.put(
        "/v2/oneliner/governance/user/platforms/douyin",
        headers=member_headers,
        json=platform_body,
    )
    self.assertEqual(platform_write.status_code, 200, platform_write.text)

    effective = self.client.get(
        "/v2/oneliner/governance/effective",
        headers=member_headers,
        params={"project_id": project_id, "platform": "douyin"},
    )
    self.assertEqual(effective.status_code, 200, effective.text)
    body = effective.json()

    layer_kinds = [layer["scope_kind"] for layer in body["layers"]]
    self.assertEqual(layer_kinds, ["system_main", "user_global", "user_platform"])

    merged = body["effective_policy"]
    # tone: user-global replaces the system value.
    self.assertEqual(merged["tone"]["style"], "analytical")
    # homepage: only the system layer sets it.
    self.assertEqual(merged["homepage"]["focus"], "ops")
    # max_cards: set by all three layers; the platform layer's value is expected.
    self.assertEqual(merged["actions"]["max_cards"], 1)
    self.assertEqual(merged["douyin"]["benchmark_mode"], "strict")
def test_admin_override_takes_precedence_in_effective_policy(self) -> None:
    """An admin override is the last layer and its values win in the effective policy.

    The system baseline and the user-platform policy are written first and
    both writes are asserted to succeed; otherwise the final expectations
    could pass without any lower layer actually being present to override.
    """
    project_id = self.ctx["project_id"]
    member_headers = self.ctx["member_headers"]
    admin_headers = self.ctx["admin_headers"]

    baseline = self.client.put(
        "/v2/admin/oneliner/governance/system/main-agent",
        headers=admin_headers,
        json={
            "title": "System main agent",
            "policy": {"actions": {"max_cards": 3}},
            "reason": "seed baseline",
        },
    )
    self.assertEqual(baseline.status_code, 200, baseline.text)

    user_platform = self.client.put(
        "/v2/oneliner/governance/user/platforms/douyin",
        headers=member_headers,
        json={
            "project_id": project_id,
            "title": "Douyin strategy",
            "policy": {"actions": {"max_cards": 1}},
            "reason": "tighten douyin execution",
        },
    )
    self.assertEqual(user_platform.status_code, 200, user_platform.text)

    override_response = self.client.post(
        "/v2/admin/oneliner/governance/overrides",
        headers=admin_headers,
        json={
            "target_user_id": self.ctx["member_id"],
            "target_project_id": project_id,
            "platform": "douyin",
            "title": "Safety override",
            "summary": "Require review after recent drift",
            "policy": {
                "actions": {"max_cards": 5},
                "guardrails": {"require_admin_review": True},
            },
            "reason": "contain unexpected drift",
        },
    )
    self.assertEqual(override_response.status_code, 200, override_response.text)

    effective_response = self.client.get(
        "/v2/oneliner/governance/effective",
        headers=member_headers,
        params={"project_id": project_id, "platform": "douyin"},
    )
    self.assertEqual(effective_response.status_code, 200, effective_response.text)
    payload = effective_response.json()

    self.assertEqual(payload["layers"][-1]["scope_kind"], "admin_override")
    self.assertEqual(payload["effective_policy"]["actions"]["max_cards"], 5)
    self.assertTrue(payload["effective_policy"]["guardrails"]["require_admin_review"])
def test_admin_override_without_target_project_applies_to_member_projects(self) -> None:
    """An override created without ``target_project_id`` still shows up for the member's project."""
    override_title = "Global safety override"
    override_body = {
        "target_user_id": self.ctx["member_id"],
        "title": override_title,
        "summary": "Apply guardrails across every project",
        "policy": {
            "guardrails": {"require_admin_review": True},
            "actions": {"max_cards": 4},
        },
        "reason": "global containment",
    }
    created = self.client.post(
        "/v2/admin/oneliner/governance/overrides",
        headers=self.ctx["admin_headers"],
        json=override_body,
    )
    self.assertEqual(created.status_code, 200, created.text)

    effective = self.client.get(
        "/v2/oneliner/governance/effective",
        headers=self.ctx["member_headers"],
        params={"project_id": self.ctx["project_id"], "platform": "douyin"},
    )
    self.assertEqual(effective.status_code, 200, effective.text)
    body = effective.json()

    self.assertEqual(body["layers"][-1]["scope_kind"], "admin_override")
    merged = body["effective_policy"]
    self.assertEqual(merged["actions"]["max_cards"], 4)
    self.assertTrue(merged["guardrails"]["require_admin_review"])
    self.assertEqual(body["active_admin_override_notice"]["title"], override_title)
def test_effective_policy_skips_future_scheduled_versions_until_window_opens(self) -> None:
    """A version scheduled for 2099 does not replace the currently active system baseline."""
    system_url = "/v2/admin/oneliner/governance/system/main-agent"
    admin_headers = self.ctx["admin_headers"]

    active_write = self.client.put(
        system_url,
        headers=admin_headers,
        json={
            "title": "Current system baseline",
            "summary": "Active now",
            "policy": {"tone": {"style": "default"}},
            "reason": "baseline",
        },
    )
    self.assertEqual(active_write.status_code, 200, active_write.text)

    scheduled_write = self.client.put(
        system_url,
        headers=admin_headers,
        json={
            "title": "Future strategy",
            "summary": "Should not be active yet",
            "policy": {"tone": {"style": "future"}},
            "effect_mode": "scheduled",
            "starts_at": "2099-01-01T00:00:00Z",
            "reason": "future rollout",
        },
    )
    self.assertEqual(scheduled_write.status_code, 200, scheduled_write.text)

    effective = self.client.get(
        "/v2/oneliner/governance/effective",
        headers=self.ctx["member_headers"],
        params={"project_id": self.ctx["project_id"], "platform": "douyin"},
    )
    self.assertEqual(effective.status_code, 200, effective.text)
    body = effective.json()

    self.assertEqual(body["effective_policy"]["tone"]["style"], "default")
    self.assertEqual(body["layers"][0]["current_version"]["title"], "Current system baseline")
def test_scope_read_endpoints_keep_current_version_on_active_release_not_future_schedule(self) -> None:
    """Scope reads report the active version as current even after a 2099-scheduled version is saved.

    The same baseline-then-scheduled sequence is exercised first on the
    member's user-global scope and then on the admin system scope.
    """
    project_id = self.ctx["project_id"]
    member_headers = self.ctx["member_headers"]
    admin_headers = self.ctx["admin_headers"]
    user_url = "/v2/oneliner/governance/user/global"
    system_url = "/v2/admin/oneliner/governance/system/main-agent"
    far_future = "2099-01-01T00:00:00Z"

    # --- user-global scope ---
    user_baseline = self.client.put(
        user_url,
        headers=member_headers,
        json={
            "project_id": project_id,
            "title": "User global baseline",
            "policy": {"tone": {"style": "baseline"}},
            "reason": "seed baseline",
        },
    )
    self.assertEqual(user_baseline.status_code, 200, user_baseline.text)

    user_scheduled = self.client.put(
        user_url,
        headers=member_headers,
        json={
            "project_id": project_id,
            "title": "User global future",
            "policy": {"tone": {"style": "future"}},
            "effect_mode": "scheduled",
            "starts_at": far_future,
            "reason": "future rollout",
        },
    )
    self.assertEqual(user_scheduled.status_code, 200, user_scheduled.text)

    user_scope = self.client.get(
        user_url,
        headers=member_headers,
        params={"project_id": project_id},
    )
    self.assertEqual(user_scope.status_code, 200, user_scope.text)
    self.assertEqual(user_scope.json()["current_version"]["title"], "User global baseline")

    # --- system scope ---
    system_baseline = self.client.put(
        system_url,
        headers=admin_headers,
        json={
            "title": "System baseline",
            "policy": {"homepage": {"focus": "stable"}},
            "reason": "stable baseline",
        },
    )
    self.assertEqual(system_baseline.status_code, 200, system_baseline.text)

    system_scheduled = self.client.put(
        system_url,
        headers=admin_headers,
        json={
            "title": "System future",
            "policy": {"homepage": {"focus": "future"}},
            "effect_mode": "scheduled",
            "starts_at": far_future,
            "reason": "future rollout",
        },
    )
    self.assertEqual(system_scheduled.status_code, 200, system_scheduled.text)

    system_scope = self.client.get(system_url, headers=admin_headers)
    self.assertEqual(system_scope.status_code, 200, system_scope.text)
    self.assertEqual(system_scope.json()["current_version"]["title"], "System baseline")
def test_governance_read_endpoints_do_not_create_default_project_when_project_is_missing(self) -> None:
    """Governance reads for a member with no project succeed and leave the projects table untouched."""
    # Replace the default fixture from setUp with a member that owns no project.
    self._clear_tables()
    ctx = self._seed_approved_member_without_project()
    member_headers = ctx["member_headers"]

    def member_project_count() -> int:
        """Number of project rows owned by the seeded member (0 when the row is missing)."""
        row = self.core.db.fetch_one(
            "SELECT COUNT(*) AS count FROM projects WHERE user_id = ?",
            (ctx["member_id"],),
        )
        return int((row or {}).get("count") or 0)

    self.assertEqual(member_project_count(), 0)

    effective = self.client.get(
        "/v2/oneliner/governance/effective",
        headers=member_headers,
        params={"platform": "douyin"},
    )
    self.assertEqual(effective.status_code, 200, effective.text)
    effective_body = effective.json()
    self.assertEqual(effective_body["project_id"], "")
    self.assertEqual(effective_body["layers"], [])

    user_global = self.client.get(
        "/v2/oneliner/governance/user/global",
        headers=member_headers,
    )
    self.assertEqual(user_global.status_code, 200, user_global.text)
    self.assertEqual(user_global.json()["scope"]["subject_project_id"], "")
    self.assertIsNone(user_global.json()["current_version"])

    # Neither read may have created a project as a side effect.
    self.assertEqual(member_project_count(), 0)
def test_action_registry_and_tenant_quota_routes_are_live(self) -> None:
    """Smoke-test the action registry, tenant quota and tenant usage routes for the member.

    Covered in order:
      1. Listing the action registry and overriding ``generate-copy``.
      2. Saving a quota with ``package_label`` ``growth``: the response is
         expected to carry the growth preset limits, not the submitted ones.
      3. Reading the quota back.
      4. Saving a ``custom`` quota: the submitted limits are expected back as-is.
      5. Reading tenant usage.
    """
    member_headers = self.ctx["member_headers"]
    project_params = {"project_id": self.ctx["project_id"]}

    # 1. Action registry ------------------------------------------------
    registry_response = self.client.get(
        "/v2/oneliner/action-registry",
        headers=member_headers,
        params=project_params,
    )
    self.assertEqual(registry_response.status_code, 200, registry_response.text)
    registry = registry_response.json()
    self.assertGreater(registry["count"], 0)

    # A default of None avoids an opaque StopIteration when the entry is missing.
    default_action = next(
        (item for item in registry["items"] if item["action_key"] == "generate-copy"),
        None,
    )
    self.assertIsNotNone(default_action, "generate-copy is missing from the action registry")
    self.assertEqual(default_action["source"], "default")

    action_keys = {item["action_key"] for item in registry["items"]}
    for expected_key in ("analyze-account", "track-account", "refresh-tracking", "create-assistant"):
        self.assertIn(expected_key, action_keys)

    save_registry = self.client.put(
        "/v2/oneliner/action-registry/generate-copy",
        headers=member_headers,
        params=project_params,
        json={
            "label": "生成成交文案",
            "description": "直接拉起一版更偏成交的文案生成。",
            "category": "content",
            "status": "disabled",
            "config": {"tone": "sales"},
        },
    )
    self.assertEqual(save_registry.status_code, 200, save_registry.text)
    saved_registry = save_registry.json()
    self.assertEqual(saved_registry["label"], "生成成交文案")
    self.assertEqual(saved_registry["status"], "disabled")
    self.assertEqual(saved_registry["config"]["tone"], "sales")
    self.assertEqual(saved_registry["source"], "override")

    # 2. Preset ("growth") quota ---------------------------------------
    growth_preset = {
        "monthly_budget_cents": 49900,
        "storage_limit_bytes": 21474836480,
        "analysis_quota": 160,
        "copy_quota": 320,
        "ai_video_quota": 12,
        "real_cut_quota": 8,
        "recorder_quota": 20,
    }
    quota_response = self.client.put(
        "/v2/tenant/quota",
        headers=member_headers,
        params=project_params,
        json={
            "enabled": True,
            # Deliberately tiny values: the preset is expected to replace them.
            "monthly_budget_cents": 1,
            "storage_limit_bytes": 2,
            "analysis_quota": 3,
            "copy_quota": 4,
            "ai_video_quota": 5,
            "real_cut_quota": 6,
            "recorder_quota": 7,
            "config": {"package_label": "growth", "warn_threshold": 0.8, "custom_note": "keep"},
        },
    )
    self.assertEqual(quota_response.status_code, 200, quota_response.text)
    quota_payload = quota_response.json()
    self.assertEqual(quota_payload["package_label"], "growth")
    for limit_name, preset_value in growth_preset.items():
        self.assertEqual(quota_payload[limit_name], preset_value)
    quota_config = quota_payload["config"]
    self.assertEqual(quota_config["package_label"], "growth")
    self.assertEqual(quota_config["warn_threshold"], 0.8)
    self.assertEqual(quota_config["package_title"], "增长套餐")
    self.assertTrue(quota_config["package_is_preset"])
    self.assertEqual(quota_config["custom_note"], "keep")

    # 3. Read the quota back -------------------------------------------
    quota_get_response = self.client.get(
        "/v2/tenant/quota",
        headers=member_headers,
        params=project_params,
    )
    self.assertEqual(quota_get_response.status_code, 200, quota_get_response.text)
    quota_get_payload = quota_get_response.json()
    self.assertEqual(quota_get_payload["package_label"], "growth")
    self.assertEqual(quota_get_payload["monthly_budget_cents"], growth_preset["monthly_budget_cents"])
    self.assertEqual(quota_get_payload["config"]["package_label"], "growth")
    self.assertEqual(quota_get_payload["config"]["warn_threshold"], 0.8)
    self.assertEqual(quota_get_payload["config"]["package_title"], "增长套餐")

    # 4. Custom quota ---------------------------------------------------
    custom_limits = {
        "monthly_budget_cents": 9100,
        "storage_limit_bytes": 111,
        "analysis_quota": 22,
        "copy_quota": 33,
        "ai_video_quota": 44,
        "real_cut_quota": 55,
        "recorder_quota": 66,
    }
    custom_response = self.client.put(
        "/v2/tenant/quota",
        headers=member_headers,
        params=project_params,
        json={
            "enabled": False,
            **custom_limits,
            "config": {"package_label": "custom", "warn_threshold": 0.55, "custom_note": "manual"},
        },
    )
    self.assertEqual(custom_response.status_code, 200, custom_response.text)
    custom_payload = custom_response.json()
    self.assertEqual(custom_payload["package_label"], "custom")
    for limit_name, submitted_value in custom_limits.items():
        self.assertEqual(custom_payload[limit_name], submitted_value)
    self.assertFalse(custom_payload["enabled"])
    custom_config = custom_payload["config"]
    self.assertEqual(custom_config["package_label"], "custom")
    self.assertEqual(custom_config["warn_threshold"], 0.55)
    self.assertEqual(custom_config["package_title"], "自定义套餐")
    self.assertFalse(custom_config["package_is_preset"])
    self.assertEqual(custom_config["custom_note"], "manual")

    # 5. Usage ----------------------------------------------------------
    usage_response = self.client.get(
        "/v2/tenant/usage",
        headers=member_headers,
        params=project_params,
    )
    self.assertEqual(usage_response.status_code, 200, usage_response.text)
    usage_payload = usage_response.json()
    self.assertIn("categories", usage_payload)
    self.assertIn("storage_bytes", usage_payload)
def test_direct_oneliner_actions_return_structured_followup_targets(self) -> None:
    """Direct one-liner actions reply with a structured ``recommended_action``.

    Executes ``review-draft``, ``platform-self-check`` and ``generate-copy``
    for douyin, in that order, and checks the follow-up target of each reply.
    """
    self._insert_completed_job(job_id="job_review_source", title="Review Source Job")
    self._insert_assistant()

    def run_action(action_key: str, action_payload: dict[str, Any]) -> dict[str, Any]:
        # Execute one action for the seeded project, require HTTP 200 and
        # hand back the decoded JSON body.
        reply = self.client.post(
            "/v2/oneliner/actions/execute",
            headers=self.ctx["member_headers"],
            json={
                "action_key": action_key,
                "project_id": self.ctx["project_id"],
                "platform": "douyin",
                "payload": action_payload,
            },
        )
        self.assertEqual(reply.status_code, 200, reply.text)
        return reply.json()

    # review-draft should point at the review editor for the seeded job.
    review_body = run_action("review-draft", {})
    self.assertEqual(review_body["recommended_action"]["action"], "open-review-edit")
    self.assertEqual(review_body["recommended_action"]["screen"], "review")
    self.assertEqual(review_body["recommended_action"]["job_id"], "job_review_source")
    self.assertTrue(review_body["recommended_action"]["review_id"])

    # platform-self-check should point at the platform agent detail view.
    self_check_body = run_action("platform-self-check", {})
    self.assertEqual(self_check_body["recommended_action"]["action"], "open-platform-agent-detail")
    self.assertEqual(self_check_body["recommended_action"]["screen"], "playbook")
    self.assertEqual(self_check_body["recommended_action"]["platform"], "douyin")

    # generate-copy should point at the copy generator on the playbook screen.
    copy_body = run_action("generate-copy", {"brief": "给我一版成交向短视频文案"})
    self.assertEqual(copy_body["recommended_action"]["action"], "open-generate-copy")
    self.assertEqual(copy_body["recommended_action"]["screen"], "playbook")
    self.assertEqual(copy_body["recommended_action"]["platform"], "douyin")
def test_create_ai_video_action_passes_provider_and_model_through_oneliner(self) -> None:
    """``create-ai-video`` forwards the requested provider and model.

    ``create_ai_video_job`` on the core module is replaced by a recording
    stub, so the test can inspect the request object the action built and
    the job payload echoed back in the response.
    """
    self._insert_completed_job(job_id="job_ai_video_source", title="AI Video Source Job")
    self._insert_assistant()

    recorded_fields = (
        "project_id",
        "assistant_id",
        "source_job_id",
        "title",
        "brief",
        "style",
        "shots",
        "duration",
        "video_provider",
        "video_model",
    )
    captured_request: dict[str, Any] = {}

    async def fake_create_ai_video_job(request: Any, account: dict[str, Any]) -> dict[str, Any]:
        # Snapshot the request attributes in one step, then return a
        # minimal job record that echoes the provider and model.
        captured_request.update({name: getattr(request, name) for name in recorded_fields})
        artifacts = {
            "source_job_id": request.source_job_id,
            "video_provider": request.video_provider,
            "video_model": request.video_model,
        }
        return {"id": "job_ai_video_seedance", "title": request.title, "artifacts": artifacts}

    action_body = {
        "action_key": "create-ai-video",
        "project_id": self.ctx["project_id"],
        "platform": "douyin",
        "payload": {
            "source_job_id": "job_ai_video_source",
            "title": "Seedance 2.0 测试任务",
            "brief": "做一条节奏强、镜头推进明确的短视频。",
            "style": "cinematic",
            "shots": 5,
            "duration": 6,
            "video_provider": "seedance2",
            "video_model": "seedance-2.0-pro",
        },
    }
    stub = AsyncMock(side_effect=fake_create_ai_video_job)
    with patch.object(self.core, "create_ai_video_job", new=stub):
        response = self.client.post(
            "/v2/oneliner/actions/execute",
            headers=self.ctx["member_headers"],
            json=action_body,
        )
    self.assertEqual(response.status_code, 200, response.text)
    payload = response.json()

    # What the stub received.
    self.assertEqual(captured_request["source_job_id"], "job_ai_video_source")
    self.assertEqual(captured_request["video_provider"], "seedance2")
    self.assertEqual(captured_request["video_model"], "seedance-2.0-pro")

    # What the endpoint handed back to the caller.
    job_artifacts = payload["payload"]["job"]["artifacts"]
    self.assertEqual(job_artifacts["video_provider"], "seedance2")
    self.assertEqual(job_artifacts["video_model"], "seedance-2.0-pro")
def test_direct_oneliner_actions_execute_real_account_and_agent_flows(self) -> None:
    """Run four direct one-liner actions against a seeded kuaishou account.

    In order: ``analyze-account`` (with ``call_model`` on the core module
    patched), ``track-account``, ``refresh-tracking`` and ``create-assistant``.
    Each reply is checked for HTTP 200, its ``recommended_action`` and the
    relevant parts of its ``payload``.
    """
    source_id = self._insert_content_source_account(
        platform="kuaishou",
        title="快手测试账号",
        source_url="https://www.kuaishou.com/profile/test-account",
    )
    # Every call made through the patched model function is recorded here.
    captured_model_calls: list[dict[str, Any]] = []
    async def fake_call_model(profile: dict[str, Any], *, system_prompt: str, user_prompt: str, temperature: float = 0.3, **_: Any) -> str:
        captured_model_calls.append(
            {
                "profile_id": profile["id"],
                "system_prompt": system_prompt,
                "user_prompt": user_prompt,
                "temperature": temperature,
            }
        )
        # Canned reply: a JSON document encoded as a plain string.
        return (
            '{"executive_summary":"账号可继续做增长拆解","borrow_points":["封面结构稳定"],'
            '"risks":["更新频率偏低"],"next_actions":["继续同步并补日报"]}'
        )
    # NOTE(review): the extent of this `with` block was reconstructed from a
    # view without indentation; it is assumed to wrap only the analyze-account
    # request. Confirm against the original file before applying.
    with patch.object(self.core, "call_model", new=AsyncMock(side_effect=fake_call_model)):
        analyze_response = self.client.post(
            "/v2/oneliner/actions/execute",
            headers=self.ctx["member_headers"],
            json={
                "action_key": "analyze-account",
                "project_id": self.ctx["project_id"],
                "platform": "kuaishou",
                "payload": {
                    "target_account_id": source_id,
                    "extra_focus": "重点看商业化动作",
                    "max_videos": 4,
                },
            },
        )
    self.assertEqual(analyze_response.status_code, 200, analyze_response.text)
    analyze_payload = analyze_response.json()
    self.assertEqual(analyze_payload["recommended_action"]["action"], "select-account")
    self.assertEqual(analyze_payload["recommended_action"]["screen"], "discovery")
    self.assertEqual(analyze_payload["recommended_action"]["account_id"], source_id)
    self.assertEqual(analyze_payload["payload"]["platform"], "kuaishou")
    self.assertEqual(analyze_payload["payload"]["account_id"], source_id)
    # The model must have been called, and the caller's extra focus text must
    # appear in the first user prompt.
    self.assertTrue(captured_model_calls)
    self.assertIn("重点看商业化动作", captured_model_calls[0]["user_prompt"])
    # track-account with refresh_now=True: the reply must carry a sync job id.
    track_response = self.client.post(
        "/v2/oneliner/actions/execute",
        headers=self.ctx["member_headers"],
        json={
            "action_key": "track-account",
            "project_id": self.ctx["project_id"],
            "platform": "kuaishou",
            "payload": {
                "target_account_id": source_id,
                "note": "由主 Agent 直接加入跟踪",
                "refresh_now": True,
            },
        },
    )
    self.assertEqual(track_response.status_code, 200, track_response.text)
    track_payload = track_response.json()
    self.assertEqual(track_payload["recommended_action"]["action"], "open-job-detail")
    self.assertEqual(track_payload["recommended_action"]["screen"], "production")
    self.assertTrue(track_payload["recommended_action"]["job_id"])
    self.assertEqual(track_payload["payload"]["platform"], "kuaishou")
    self.assertEqual(track_payload["payload"]["account_id"], source_id)
    self.assertEqual(track_payload["payload"]["refresh"]["tracked_account_id"], source_id)
    self.assertTrue(track_payload["payload"]["refresh"]["sync_job_id"])
    # refresh-tracking with an empty payload: at least one refresh is expected.
    refresh_tracking_response = self.client.post(
        "/v2/oneliner/actions/execute",
        headers=self.ctx["member_headers"],
        json={
            "action_key": "refresh-tracking",
            "project_id": self.ctx["project_id"],
            "platform": "kuaishou",
            "payload": {},
        },
    )
    self.assertEqual(refresh_tracking_response.status_code, 200, refresh_tracking_response.text)
    refresh_tracking_payload = refresh_tracking_response.json()
    self.assertEqual(refresh_tracking_payload["recommended_action"]["action"], "open-job-detail")
    self.assertEqual(refresh_tracking_payload["recommended_action"]["screen"], "production")
    self.assertTrue(refresh_tracking_payload["recommended_action"]["job_id"])
    self.assertEqual(refresh_tracking_payload["payload"]["platform"], "kuaishou")
    # `or 0` lets a null/absent-valued counter compare as zero instead of raising.
    self.assertGreaterEqual(int(refresh_tracking_payload["payload"]["refresh"]["refreshed"] or 0), 1)
    # create-assistant: the reply must point at the assistant editor.
    create_response = self.client.post(
        "/v2/oneliner/actions/execute",
        headers=self.ctx["member_headers"],
        json={
            "action_key": "create-assistant",
            "project_id": self.ctx["project_id"],
            "platform": "kuaishou",
            "payload": {
                "name": "快手增长 Agent",
                "description": "负责承接快手平台增长动作",
                "generation_goal": "围绕快手账号分析和同步动作持续执行",
            },
        },
    )
    self.assertEqual(create_response.status_code, 200, create_response.text)
    create_payload = create_response.json()
    self.assertEqual(create_payload["recommended_action"]["action"], "open-edit-assistant")
    self.assertEqual(create_payload["recommended_action"]["screen"], "playbook")
    self.assertTrue(create_payload["recommended_action"]["assistant_id"])
    self.assertEqual(create_payload["payload"]["assistant"]["name"], "快手增长 Agent")
def test_platform_agent_routes_are_live(self) -> None:
    """Walk the douyin platform-agent routes end to end.

    Covers profile save / versions / update / rollback / audits, memories,
    skills (create, review, versions, rollback), the self-check route, and
    finally a confirmed one-liner run whose result must be written back as
    the agent's ``recent_execution``.
    """
    # --- profile: first save creates version 1 -------------------------
    save_profile = self.client.put(
        "/v2/platform-agents/douyin/profile",
        headers=self.ctx["member_headers"],
        json={
            "project_id": self.ctx["project_id"],
            "name": "抖音增长 Agent",
            "mission": "优先跟踪成交型账号并收口到今日动作。",
            "notes": "保持高频复盘。",
            "status": "active",
            "config": {"focus": "conversion"},
        },
    )
    self.assertEqual(save_profile.status_code, 200, save_profile.text)
    profile_payload = save_profile.json()
    self.assertEqual(profile_payload["platform"], "douyin")
    self.assertEqual(profile_payload["name"], "抖音增长 Agent")
    self.assertEqual(profile_payload["config"]["focus"], "conversion")
    self.assertEqual(profile_payload["current_version"]["version_no"], 1)
    first_profile_version_id = profile_payload["current_version"]["id"]

    profile_versions = self.client.get(
        "/v2/platform-agents/douyin/profile/versions",
        headers=self.ctx["member_headers"],
        params={"project_id": self.ctx["project_id"]},
    )
    self.assertEqual(profile_versions.status_code, 200, profile_versions.text)
    self.assertEqual(profile_versions.json()["count"], 1)

    # --- memory and skill creation --------------------------------------
    memory_response = self.client.post(
        "/v2/platform-agents/douyin/memories",
        headers=self.ctx["member_headers"],
        json={
            "project_id": self.ctx["project_id"],
            "memory_key": "recent-pattern",
            "title": "近期有效模式",
            "summary": "成交口播账号在晚间更稳定。",
            "details": {"window": "night"},
            "confidence": 0.88,
        },
    )
    self.assertEqual(memory_response.status_code, 200, memory_response.text)
    self.assertEqual(memory_response.json()["memory_key"], "recent-pattern")

    skill_response = self.client.post(
        "/v2/platform-agents/douyin/skills",
        headers=self.ctx["member_headers"],
        json={
            "project_id": self.ctx["project_id"],
            "skill_key": "douyin-benchmark",
            "name": "抖音对标拆解",
            "status": "draft",
            "method": {"summary": "先拆前三条高分作品。"},
            "test_spec": {"summary": "看结构和钩子是否完整。"},
            "last_result": {"summary": "首轮验证通过。"},
            "success_count": 1,
            "failure_count": 0,
            "last_score": 0.72,
        },
    )
    self.assertEqual(skill_response.status_code, 200, skill_response.text)
    skill_payload = skill_response.json()
    self.assertEqual(skill_payload["skill_key"], "douyin-benchmark")

    # --- listing reflects the new memory and skill ----------------------
    list_agents = self.client.get(
        "/v2/platform-agents",
        headers=self.ctx["member_headers"],
        params={"project_id": self.ctx["project_id"]},
    )
    self.assertEqual(list_agents.status_code, 200, list_agents.text)
    agents_payload = list_agents.json()
    # Use a default so a missing entry is reported as an assertion failure
    # instead of an uncaught StopIteration.
    douyin_agent = next(
        (item for item in agents_payload["items"] if item["platform"] == "douyin"),
        None,
    )
    self.assertIsNotNone(douyin_agent, "douyin agent missing from /v2/platform-agents")
    self.assertGreaterEqual(douyin_agent["memory_count"], 1)
    self.assertGreaterEqual(douyin_agent["skill_count"], 1)

    list_memories = self.client.get(
        "/v2/platform-agents/douyin/memories",
        headers=self.ctx["member_headers"],
        params={"project_id": self.ctx["project_id"]},
    )
    self.assertEqual(list_memories.status_code, 200, list_memories.text)
    self.assertEqual(list_memories.json()["count"], 1)

    # --- skill review: accepting the skill marks it validated -----------
    review_skill = self.client.post(
        f"/v2/platform-agents/douyin/skills/{skill_payload['id']}/review",
        headers=self.ctx["member_headers"],
        json={
            "project_id": self.ctx["project_id"],
            "accepted": True,
            "score": 0.93,
            "summary": "这条技能现在可以复用。",
            "review_notes": "通过验收。",
        },
    )
    self.assertEqual(review_skill.status_code, 200, review_skill.text)
    self.assertEqual(review_skill.json()["status"], "validated")

    # --- profile: second save creates version 2, then roll back to v1 ---
    update_profile = self.client.put(
        "/v2/platform-agents/douyin/profile",
        headers=self.ctx["member_headers"],
        json={
            "project_id": self.ctx["project_id"],
            "name": "抖音增长 Agent",
            "mission": "改成优先对标高互动账号。",
            "notes": "先压缩近期重点方向。",
            "status": "active",
            "config": {"focus": "engagement"},
            "reason": "调整当前抖音平台策略",
        },
    )
    self.assertEqual(update_profile.status_code, 200, update_profile.text)
    self.assertEqual(update_profile.json()["current_version"]["version_no"], 2)

    rollback_profile = self.client.post(
        "/v2/platform-agents/douyin/profile/rollback",
        headers=self.ctx["member_headers"],
        json={
            "project_id": self.ctx["project_id"],
            "version_id": first_profile_version_id,
            "reason": "恢复到第一版平台 Agent 配置",
        },
    )
    self.assertEqual(rollback_profile.status_code, 200, rollback_profile.text)
    rollback_profile_payload = rollback_profile.json()
    self.assertEqual(rollback_profile_payload["current_version"]["rollback_from_version_id"], first_profile_version_id)
    self.assertEqual(rollback_profile_payload["config"]["focus"], "conversion")
    # Referenced several times below as the version that must be current.
    expected_version_no = rollback_profile_payload["current_version"]["version_no"]

    profile_audits = self.client.get(
        "/v2/platform-agents/douyin/profile/audits",
        headers=self.ctx["member_headers"],
        params={"project_id": self.ctx["project_id"]},
    )
    self.assertEqual(profile_audits.status_code, 200, profile_audits.text)
    self.assertGreaterEqual(profile_audits.json()["count"], 3)

    # --- skill versions and rollback -------------------------------------
    versions = self.client.get(
        f"/v2/platform-agents/douyin/skills/{skill_payload['id']}/versions",
        headers=self.ctx["member_headers"],
        params={"project_id": self.ctx["project_id"]},
    )
    self.assertEqual(versions.status_code, 200, versions.text)
    versions_payload = versions.json()
    self.assertGreaterEqual(versions_payload["count"], 2)

    rollback = self.client.post(
        f"/v2/platform-agents/douyin/skills/{skill_payload['id']}/rollback",
        headers=self.ctx["member_headers"],
        json={
            "project_id": self.ctx["project_id"],
            # Roll back to the last entry of the returned version list.
            "version_id": versions_payload["items"][-1]["id"],
        },
    )
    self.assertEqual(rollback.status_code, 200, rollback.text)
    self.assertEqual(rollback.json()["skill_key"], "douyin-benchmark")

    # --- self-check -------------------------------------------------------
    self_check = self.client.post(
        "/v2/platform-agents/douyin/self-check",
        headers=self.ctx["member_headers"],
        json={"project_id": self.ctx["project_id"], "sample_limit": 2, "remember_summary": False},
    )
    self.assertEqual(self_check.status_code, 200, self_check.text)
    self_check_payload = self_check.json()
    self.assertEqual(self_check_payload["platform"], "douyin")
    self.assertIn("route_checks", self_check_payload)
    self.assertIn("score", self_check_payload)

    # --- one-liner run: create, confirm, inspect --------------------------
    run_response = self.client.post(
        "/v2/oneliner/runs",
        headers=self.ctx["member_headers"],
        json={
            "project_id": self.ctx["project_id"],
            "source_screen": "playbook",
            "source_action_key": "platform-agent-handoff",
            "title": "验证平台 Agent 执行回写",
            "summary": "确认主 Agent 完成态会回写最近平台执行信息。",
            "intent_key": "governance_review",
            "platform": "douyin",
            "delivery_mode": "confirm_first",
            "plan": {
                "goal": "验证平台 Agent 执行回写",
                "summary": "检查主 Agent 完成态后平台 Agent 是否记录最近执行。",
                "steps": ["读取当前主配置", "读取当前平台 Agent", "生成执行结果"],
            },
        },
    )
    self.assertEqual(run_response.status_code, 200, run_response.text)
    run_payload = run_response.json()

    confirm_response = self.client.post(
        f"/v2/oneliner/runs/{run_payload['id']}/confirm",
        headers=self.ctx["member_headers"],
        json={"note": "执行平台 Agent 回写验证"},
    )
    self.assertEqual(confirm_response.status_code, 200, confirm_response.text)

    detail_response = self.client.get(
        f"/v2/oneliner/runs/{run_payload['id']}",
        headers=self.ctx["member_headers"],
    )
    self.assertEqual(detail_response.status_code, 200, detail_response.text)
    detail_payload = detail_response.json()
    self.assertEqual(detail_payload["run_status"], "done")

    # Each level may be absent or null; fall back to {} so a missing level
    # ends in a failed assertion rather than an AttributeError.
    execution_card = (detail_payload.get("result") or {}).get("execution_card") or {}
    card_platform_profile = execution_card.get("platform_agent_profile") or {}
    self.assertTrue((execution_card.get("oneliner_profile_version") or {}).get("version_id"))
    self.assertTrue(card_platform_profile.get("version_id"))
    self.assertEqual(card_platform_profile.get("version_no"), expected_version_no)

    # --- the run is written back onto the platform agent -------------------
    refreshed_agents = self.client.get(
        "/v2/platform-agents",
        headers=self.ctx["member_headers"],
        params={"project_id": self.ctx["project_id"]},
    )
    self.assertEqual(refreshed_agents.status_code, 200, refreshed_agents.text)
    refreshed_douyin = next(
        (item for item in refreshed_agents.json()["items"] if item["platform"] == "douyin"),
        None,
    )
    self.assertIsNotNone(refreshed_douyin, "douyin agent missing from refreshed /v2/platform-agents")
    self.assertIn("recent_execution", refreshed_douyin)
    self.assertEqual(refreshed_douyin["current_version"]["version_no"], expected_version_no)

    recent_execution = refreshed_douyin["recent_execution"]
    self.assertEqual(recent_execution["run_id"], run_payload["id"])
    self.assertEqual(recent_execution["title"], "验证平台 Agent 执行回写")
    self.assertEqual(recent_execution["goal"], "验证平台 Agent 执行回写")
    self.assertEqual(recent_execution["intent_key"], "governance_review")
    self.assertEqual(recent_execution["platform_scope"], "single_platform")
    # NOTE(review): the run was created with delivery_mode "confirm_first" but
    # "hybrid" is expected here; presumably the service rewrites it. Confirm.
    self.assertEqual(recent_execution["delivery_mode"], "hybrid")
    self.assertEqual(recent_execution["source_action_key"], "platform-agent-handoff")
    self.assertGreaterEqual(recent_execution["oneliner_profile_version_no"], 1)
    self.assertTrue(recent_execution["oneliner_profile_version_id"])
    self.assertTrue(recent_execution["platform_agent_profile_version_id"])
    self.assertEqual(recent_execution["platform_agent_profile_version_no"], expected_version_no)
    self.assertEqual(recent_execution["recommended_action"]["action"], "goto-playbook")
    self.assertEqual(recent_execution["recommended_action"]["screen"], "playbook")
    self.assertEqual(recent_execution["workstream_key"], "playbook")
    self.assertEqual(recent_execution["workstream_label"], "Agent 治理")
def test_admin_ops_routes_are_live(self) -> None:
    """Exercise the admin ops routes around one synthetic failed job.

    Inserts a ``failed`` job row directly, then runs: incident scan,
    overview, repair-plan creation, incident review, fix-run audit and the
    fix-run listing, all with admin credentials.
    """
    now = self.db_module.utc_now()
    job_id = "job_failed_admin_ops"
    with self.core.db.session() as conn:
        # The row leaves its reference columns as empty strings, so foreign
        # key enforcement is switched off for the insert (presumably those
        # columns are FK-constrained; verify against the schema).
        conn.execute("PRAGMA foreign_keys=OFF")
        try:
            conn.execute(
                """
                INSERT INTO jobs (
                    id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id,
                    source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id,
                    source_url, title, language, status, transcript_text, style_summary, upload_status,
                    error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at
                ) VALUES (?, ?, ?, '', '', '', '', ?, ?, ?, 'n8n', 'collector', '', '', ?, 'auto', 'failed', '', '', 'pending', ?, '{}', '{}', '', ?, ?)
                """,
                (
                    job_id,
                    self.ctx["member_id"],
                    self.ctx["project_id"],
                    "text",
                    "analysis",
                    "analysis_pipeline",
                    "Admin Ops Failed Job",
                    "synthetic failure for admin ops",
                    now,
                    now,
                ),
            )
        finally:
            # Re-enable enforcement even when the insert raises, so the
            # connection is never left with foreign keys switched off.
            conn.execute("PRAGMA foreign_keys=ON")

    # The scan must surface at least one incident, including a job-sourced one.
    scan = self.client.post(
        "/v2/admin/ops/incidents/scan",
        headers=self.ctx["admin_headers"],
    )
    self.assertEqual(scan.status_code, 200, scan.text)
    scan_payload = scan.json()
    self.assertGreaterEqual(scan_payload["count"], 1)
    # Use a default so a missing incident is reported as an assertion failure
    # instead of an uncaught StopIteration.
    incident = next(
        (item for item in scan_payload["created_or_updated"] if item["source_type"] == "job"),
        None,
    )
    self.assertIsNotNone(incident, "scan reported no incident with source_type 'job'")

    overview = self.client.get(
        "/v2/admin/ops/overview",
        headers=self.ctx["admin_headers"],
    )
    self.assertEqual(overview.status_code, 200, overview.text)
    overview_payload = overview.json()
    self.assertGreaterEqual(overview_payload["incident_count"], 1)
    self.assertIn("integration_health", overview_payload)

    # Repair plan for the incident; its id is used for the audit call below.
    repair_plan = self.client.post(
        f"/v2/admin/ops/incidents/{incident['id']}/repair-plan",
        headers=self.ctx["admin_headers"],
        json={"scope": "tenant", "notes": "先生成最小修复计划。"},
    )
    self.assertEqual(repair_plan.status_code, 200, repair_plan.text)
    repair_payload = repair_plan.json()
    self.assertEqual(repair_payload["plan_scope"], "tenant")

    review = self.client.patch(
        f"/v2/admin/ops/incidents/{incident['id']}",
        headers=self.ctx["admin_headers"],
        json={"status": "watching", "review_notes": "继续观察后再执行。"},
    )
    self.assertEqual(review.status_code, 200, review.text)
    self.assertEqual(review.json()["status"], "watching")

    audit = self.client.post(
        f"/v2/admin/ops/fix-runs/{repair_payload['id']}/audit",
        headers=self.ctx["admin_headers"],
        json={"review_status": "approved", "review_notes": "方案通过。"},
    )
    self.assertEqual(audit.status_code, 200, audit.text)
    self.assertEqual(audit.json()["audit_status"], "approved")

    fix_runs = self.client.get(
        "/v2/admin/ops/fix-runs",
        headers=self.ctx["admin_headers"],
    )
    self.assertEqual(fix_runs.status_code, 200, fix_runs.text)
    self.assertGreaterEqual(fix_runs.json()["count"], 1)
def test_admin_governance_directory_lists_accounts_and_projects(self) -> None:
    """The admin governance directory lists the seeded member and its project."""
    directory = self.client.get(
        "/v2/admin/oneliner/governance/directory",
        headers=self.ctx["admin_headers"],
    )
    self.assertEqual(directory.status_code, 200, directory.text)
    payload = directory.json()
    self.assertGreaterEqual(payload["count"], 2)

    # First directory entry whose id is the seeded member, or None.
    member = None
    for entry in payload["items"]:
        if entry["id"] == self.ctx["member_id"]:
            member = entry
            break
    self.assertIsNotNone(member)
    assert member is not None  # narrows the Optional for type checkers

    self.assertEqual(member["project_count"], 1)
    self.assertEqual(member["projects"][0]["id"], self.ctx["project_id"])
def test_admin_override_versions_support_rollback(self) -> None:
    """Rolling back an admin override restores v1's policy as a third version."""
    overrides_url = "/v2/admin/oneliner/governance/overrides"
    admin_headers = self.ctx["admin_headers"]
    # Identifies the override being edited; shared by every request below.
    target = {
        "target_user_id": self.ctx["member_id"],
        "target_project_id": self.ctx["project_id"],
        "platform": "douyin",
    }

    def save_override(title: str, note: str, max_cards: int) -> Any:
        # `note` is sent as both the summary and the reason.
        saved = self.client.post(
            overrides_url,
            headers=admin_headers,
            json={
                **target,
                "title": title,
                "summary": note,
                "policy": {"actions": {"max_cards": max_cards}},
                "reason": note,
            },
        )
        self.assertEqual(saved.status_code, 200, saved.text)
        return saved

    def expect_version_count(expected: int) -> None:
        listing = self.client.get(
            f"{overrides_url}/versions",
            headers=admin_headers,
            params=dict(target),
        )
        self.assertEqual(listing.status_code, 200, listing.text)
        self.assertEqual(listing.json()["count"], expected)

    first_version_id = save_override("Override v1", "first override", 2).json()["current_version"]["id"]
    save_override("Override v2", "second override", 5)
    expect_version_count(2)

    rollback_response = self.client.post(
        f"{overrides_url}/rollback",
        headers=admin_headers,
        json={**target, "version_id": first_version_id, "reason": "rollback to v1"},
    )
    self.assertEqual(rollback_response.status_code, 200, rollback_response.text)
    rolled_back = rollback_response.json()
    self.assertEqual(rolled_back["current_version"]["rollback_from_version_id"], first_version_id)
    self.assertEqual(rolled_back["effective_policy"]["actions"]["max_cards"], 2)

    # The rollback is stored as an additional version.
    expect_version_count(3)
def test_user_global_versions_support_rollback_by_creating_new_version(self) -> None:
    """Rolling back the user-global policy restores v1 and adds a third version."""
    global_url = "/v2/oneliner/governance/user/global"
    member_headers = self.ctx["member_headers"]
    project_id = self.ctx["project_id"]

    def save_policy(title: str, tone_style: str, reason: str) -> Any:
        saved = self.client.put(
            global_url,
            headers=member_headers,
            json={
                "project_id": project_id,
                "title": title,
                "policy": {"tone": {"style": tone_style}},
                "reason": reason,
            },
        )
        self.assertEqual(saved.status_code, 200, saved.text)
        return saved

    def expect_version_count(expected: int) -> None:
        listing = self.client.get(
            f"{global_url}/versions",
            headers=member_headers,
            params={"project_id": project_id},
        )
        self.assertEqual(listing.status_code, 200, listing.text)
        self.assertEqual(listing.json()["count"], expected)

    first_saved = save_policy("Global strategy v1", "analytical", "first pass")
    first_version_id = first_saved.json()["current_version"]["id"]
    save_policy("Global strategy v2", "decisive", "refine tone")
    expect_version_count(2)

    rollback_response = self.client.post(
        f"{global_url}/rollback",
        headers=member_headers,
        json={
            "project_id": project_id,
            "version_id": first_version_id,
            "reason": "restore best baseline",
        },
    )
    self.assertEqual(rollback_response.status_code, 200, rollback_response.text)
    rolled_back = rollback_response.json()
    self.assertEqual(rolled_back["current_version"]["rollback_from_version_id"], first_version_id)
    self.assertEqual(rolled_back["effective_policy"]["tone"]["style"], "analytical")

    # The rollback is stored as an additional version.
    expect_version_count(3)
def test_user_platform_versions_support_rollback_by_creating_new_version(self) -> None:
    """Rolling back the douyin user policy restores v1 and adds a third version."""
    platform_url = "/v2/oneliner/governance/user/platforms/douyin"
    member_headers = self.ctx["member_headers"]
    project_id = self.ctx["project_id"]

    def save_policy(title: str, benchmark_mode: str, reason: str) -> Any:
        saved = self.client.put(
            platform_url,
            headers=member_headers,
            json={
                "project_id": project_id,
                "title": title,
                "policy": {"douyin": {"benchmark_mode": benchmark_mode}},
                "reason": reason,
            },
        )
        self.assertEqual(saved.status_code, 200, saved.text)
        return saved

    def expect_version_count(expected: int) -> None:
        listing = self.client.get(
            f"{platform_url}/versions",
            headers=member_headers,
            params={"project_id": project_id},
        )
        self.assertEqual(listing.status_code, 200, listing.text)
        self.assertEqual(listing.json()["count"], expected)

    first_saved = save_policy("Douyin strategy v1", "strict", "first platform pass")
    first_version_id = first_saved.json()["current_version"]["id"]
    save_policy("Douyin strategy v2", "aggressive", "push harder")
    expect_version_count(2)

    rollback_response = self.client.post(
        f"{platform_url}/rollback",
        headers=member_headers,
        json={
            "project_id": project_id,
            "version_id": first_version_id,
            "reason": "restore previous platform strategy",
        },
    )
    self.assertEqual(rollback_response.status_code, 200, rollback_response.text)
    rolled_back = rollback_response.json()
    self.assertEqual(rolled_back["current_version"]["rollback_from_version_id"], first_version_id)
    self.assertEqual(rolled_back["effective_policy"]["douyin"]["benchmark_mode"], "strict")

    # The rollback is stored as an additional version.
    expect_version_count(3)
def test_user_policy_audits_include_personal_and_admin_layers_for_project(self) -> None:
    """The member's audit feed lists both their own and the admin's changes.

    Seeds one user-global policy (as the member) and one admin override (as
    the admin), then reads the member-facing audit list and checks that both
    scope kinds appear and that actor/policy/reason fields are not exposed.
    """
    # Setup responses are asserted too, so a failed seed request is reported
    # at its source instead of as a missing scope kind further down.
    seed_global = self.client.put(
        "/v2/oneliner/governance/user/global",
        headers=self.ctx["member_headers"],
        json={
            "project_id": self.ctx["project_id"],
            "title": "Global strategy",
            "policy": {"tone": {"style": "analytical"}},
            "reason": "personalize defaults",
        },
    )
    self.assertEqual(seed_global.status_code, 200, seed_global.text)
    seed_override = self.client.post(
        "/v2/admin/oneliner/governance/overrides",
        headers=self.ctx["admin_headers"],
        json={
            "target_user_id": self.ctx["member_id"],
            "target_project_id": self.ctx["project_id"],
            "platform": "douyin",
            "title": "Admin override",
            "policy": {"actions": {"max_cards": 4}},
            "reason": "contain drift",
        },
    )
    self.assertEqual(seed_override.status_code, 200, seed_override.text)

    response = self.client.get(
        "/v2/oneliner/governance/user/audits",
        headers=self.ctx["member_headers"],
        params={"project_id": self.ctx["project_id"], "platform": "douyin"},
    )
    self.assertEqual(response.status_code, 200, response.text)
    payload = response.json()
    self.assertGreaterEqual(payload["count"], 2)
    scope_kinds = [item["scope_kind"] for item in payload["items"]]
    self.assertIn("user_global", scope_kinds)
    self.assertIn("admin_override", scope_kinds)

    # The member-facing item carries version/scope summaries only; the actor,
    # the raw policy and the reason must not be present.
    first_item = payload["items"][0]
    self.assertIn("version", first_item)
    self.assertIn("scope", first_item)
    self.assertNotIn("actor_user_id", first_item)
    self.assertNotIn("policy", first_item["version"])
    self.assertNotIn("reason", first_item["version"])
    self.assertNotIn("actor_user_id", first_item["version"])
def test_admin_policy_audits_include_target_and_system_layers(self) -> None:
    """The admin audit feed lists system-level and target-level changes.

    Seeds a system main-agent policy and an admin override for the member,
    then reads the admin audit list with ``include_system`` set and checks
    that both scope kinds are present.
    """
    # Setup responses are asserted too, so a failed seed request is reported
    # at its source instead of as a missing scope kind further down.
    seed_system = self.client.put(
        "/v2/admin/oneliner/governance/system/main-agent",
        headers=self.ctx["admin_headers"],
        json={
            "title": "System main",
            "policy": {"homepage": {"focus": "ops"}},
            "reason": "seed system baseline",
        },
    )
    # Only success is required here; the exact code is not what this test pins.
    self.assertLess(seed_system.status_code, 400, seed_system.text)
    seed_override = self.client.post(
        "/v2/admin/oneliner/governance/overrides",
        headers=self.ctx["admin_headers"],
        json={
            "target_user_id": self.ctx["member_id"],
            "target_project_id": self.ctx["project_id"],
            "platform": "douyin",
            "title": "Admin override",
            "policy": {"actions": {"max_cards": 5}},
            "reason": "focus target account",
        },
    )
    self.assertEqual(seed_override.status_code, 200, seed_override.text)

    response = self.client.get(
        "/v2/admin/oneliner/governance/audits",
        headers=self.ctx["admin_headers"],
        params={
            "target_user_id": self.ctx["member_id"],
            "target_project_id": self.ctx["project_id"],
            "platform": "douyin",
            "include_system": "1",
        },
    )
    self.assertEqual(response.status_code, 200, response.text)
    payload = response.json()
    self.assertGreaterEqual(payload["count"], 2)
    scope_kinds = [item["scope_kind"] for item in payload["items"]]
    self.assertIn("system_main", scope_kinds)
    self.assertIn("admin_override", scope_kinds)
def test_non_admin_cannot_change_system_defaults(self) -> None:
    """A member token gets 403 when writing the system main-agent policy."""
    rejected_update = {
        "title": "Not allowed",
        "policy": {"tone": {"style": "rogue"}},
        "reason": "should be blocked",
    }
    reply = self.client.put(
        "/v2/admin/oneliner/governance/system/main-agent",
        headers=self.ctx["member_headers"],
        json=rejected_update,
    )
    self.assertEqual(reply.status_code, 403, reply.text)
def test_oneliner_profile_versions_and_rollback_are_available(self) -> None:
    """One-liner profile edits are versioned, reversible and audited.

    Reads the initial profile, saves two updates, lists versions, rolls back
    to the first saved update and checks the audit trail for both the update
    and the rollback action keys.
    """
    member_headers = self.ctx["member_headers"]
    project_id = self.ctx["project_id"]

    def fetch(path: str) -> dict[str, Any]:
        # GET a project-scoped profile resource and return its JSON body.
        reply = self.client.get(path, headers=member_headers, params={"project_id": project_id})
        self.assertEqual(reply.status_code, 200, reply.text)
        return reply.json()

    def save_profile(**changes: Any) -> dict[str, Any]:
        # PUT the profile with the fields common to both updates plus the
        # per-update `changes`; return the JSON body.
        reply = self.client.put(
            "/v2/oneliner/profile",
            headers=member_headers,
            json={
                "project_id": project_id,
                "display_name": "增长总控 OneLiner",
                "assistant_id": "",
                **changes,
            },
        )
        self.assertEqual(reply.status_code, 200, reply.text)
        return reply.json()

    initial_payload = fetch("/v2/oneliner/profile")
    self.assertIn("current_version", initial_payload)
    initial_version_id = initial_payload["current_version"]["id"]

    # First update: douyin as the default platform.
    first_payload = save_profile(
        default_platform="douyin",
        long_term_goal="围绕创业内容完成多平台增长",
        notes="先做抖音与小红书联动",
        config={"commercial_ready": True},
        reason="对齐新的增长目标",
    )
    first_saved_version_id = first_payload["current_version"]["id"]
    self.assertNotEqual(first_saved_version_id, initial_version_id)

    # Second update: switch the default platform to xiaohongshu.
    second_payload = save_profile(
        default_platform="xiaohongshu",
        long_term_goal="先把小红书对标拆解做深",
        notes="首页动作只保留一条主动作",
        config={"commercial_ready": True, "tenant_isolation_required": True},
        reason="阶段性切到小红书主战场",
    )
    second_version_id = second_payload["current_version"]["id"]
    self.assertNotEqual(second_version_id, first_saved_version_id)
    self.assertEqual(second_payload["default_platform"], "xiaohongshu")

    versions_payload = fetch("/v2/oneliner/profile/versions")
    self.assertGreaterEqual(versions_payload["count"], 3)

    # Roll back to the first saved update and check its values are restored.
    rollback_response = self.client.post(
        "/v2/oneliner/profile/rollback",
        headers=member_headers,
        json={
            "project_id": project_id,
            "version_id": first_saved_version_id,
            "reason": "回到抖音主战场配置",
        },
    )
    self.assertEqual(rollback_response.status_code, 200, rollback_response.text)
    rolled_back = rollback_response.json()
    self.assertEqual(rolled_back["default_platform"], "douyin")
    self.assertEqual(rolled_back["long_term_goal"], "围绕创业内容完成多平台增长")
    self.assertEqual(rolled_back["current_version"]["rollback_from_version_id"], first_saved_version_id)

    audits_payload = fetch("/v2/oneliner/profile/audits")
    self.assertGreaterEqual(audits_payload["count"], 3)
    action_keys = [entry["action_key"] for entry in audits_payload["items"]]
    self.assertIn("update-oneliner-profile", action_keys)
    self.assertIn("rollback-oneliner-profile", action_keys)
def test_oneliner_message_execution_card_tracks_config_versions(self) -> None:
    # Scenario: save a OneLiner profile and a Douyin platform-agent profile,
    # open a session, send one message, then check that the execution card in
    # the assistant reply points at exactly the versions returned by the two
    # profile saves.
    member_headers = self.ctx["member_headers"]
    project_id = self.ctx["project_id"]

    # 1) Save the OneLiner profile and remember the version it reports.
    oneliner_body = {
        "project_id": project_id,
        "display_name": "增长总控 OneLiner",
        "assistant_id": "",
        "default_platform": "douyin",
        "long_term_goal": "优先分析当前平台账号并收口到下一步动作",
        "notes": "验证消息卡里的配置追溯链",
        "config": {"analysis_mode": "fast"},
        "reason": "给消息卡提供明确的主配置版本",
    }
    oneliner_saved = self.client.put(
        "/v2/oneliner/profile",
        headers=member_headers,
        json=oneliner_body,
    )
    self.assertEqual(oneliner_saved.status_code, 200, oneliner_saved.text)
    oneliner_version = oneliner_saved.json()["current_version"]

    # 2) Save the Douyin platform-agent profile and remember its version.
    agent_body = {
        "project_id": project_id,
        "name": "抖音增长 Agent",
        "mission": "优先分析当前账号和高分作品",
        "notes": "验证消息卡里的平台配置追溯链",
        "status": "active",
        "config": {"focus": "analysis"},
    }
    agent_saved = self.client.put(
        "/v2/platform-agents/douyin/profile",
        headers=member_headers,
        json=agent_body,
    )
    self.assertEqual(agent_saved.status_code, 200, agent_saved.text)
    agent_version = agent_saved.json()["current_version"]

    # 3) Open a OneLiner session that prefers the Douyin platform.
    session_created = self.client.post(
        "/v2/oneliner/sessions",
        headers=member_headers,
        json={
            "project_id": project_id,
            "preferred_platform": "douyin",
            "title": "消息卡配置追溯",
        },
    )
    self.assertEqual(session_created.status_code, 200, session_created.text)
    session_id = session_created.json()["id"]

    # 4) Post a message into that session.
    message_sent = self.client.post(
        f"/v2/oneliner/sessions/{session_id}/messages",
        headers=member_headers,
        json={
            "project_id": project_id,
            "platform": "douyin",
            "content": "帮我创建 Agent",
        },
    )
    self.assertEqual(message_sent.status_code, 200, message_sent.text)

    # Walk assistant_message -> result -> execution_card; a missing or falsy
    # level collapses to an empty dict so the assertions below fail on a
    # value mismatch rather than on a lookup error.
    reply = message_sent.json()
    assistant_message = reply.get("assistant_message") or {}
    result = assistant_message.get("result") or {}
    card = result.get("execution_card") or {}

    # The card must offer the "open-create-assistant" primary action.
    primary_action = card.get("primary_action") or {}
    self.assertEqual(primary_action.get("key"), "open-create-assistant")

    # The card must reference the OneLiner profile version saved in step 1.
    oneliner_ref = card.get("oneliner_profile_version") or {}
    self.assertEqual(oneliner_ref.get("version_id"), oneliner_version["id"])
    self.assertEqual(oneliner_ref.get("version_no"), oneliner_version["version_no"])

    # The card must reference the Douyin agent profile version saved in step 2.
    agent_ref = card.get("platform_agent_profile") or {}
    self.assertEqual(agent_ref.get("platform"), "douyin")
    self.assertEqual(agent_ref.get("version_id"), agent_version["id"])
    self.assertEqual(agent_ref.get("version_no"), agent_version["version_no"])