test: cover live governance surfaces
Some checks failed
StoryForge CI / Baseline checks (push) Has been cancelled
StoryForge CI / Backend tests (push) Has been cancelled
StoryForge CI / Web tests (push) Has been cancelled

This commit is contained in:
kris
2026-03-31 02:17:31 +08:00
parent 7897ce6c3d
commit 2f98a1735d
3 changed files with 342 additions and 51 deletions

View File

@@ -35,6 +35,9 @@
- `OneLiner 会话 / 主 Agent 运行 / 动作执行器 / 跟踪同步 / 高分分析 / 平台技能验收` 这批真接口也已经去掉“当前实例未提供”的旧降级口径,统一按 live 结果说话。
- 新增一条前端回归护栏:静态声明出来的 `data-action` 必须有明确处理逻辑,避免后续再出现“点了没反应,最后落到动作待接入”的隐性缺口。
- 后端契约测试新增 live-first 路由覆盖,直接校验 `分析高分作品 / 批量跟踪同步 / 单账号跟踪同步 / 跟踪游标` 这些当前前端已完全依赖的接口。
- 后端契约测试继续向治理与运维面扩展,新增 `OneLiner 动作注册表 / 平台 Agent / 平台技能验收与回滚 / tenant quota & usage / admin ops 扫描与修复计划` 的 live 路由覆盖。
- 修掉了平台 Agent 在“项目尚未绑定 assistant”时的真实外键问题现在空项目也能先保存 OneLiner / 平台 Agent 配置,再逐步补齐执行 Agent不会因为空 assistant_id 直接失败。
- 主 Agent 治理测试的清库逻辑也收紧了,回归时不再因为外键残留跳过删除,避免后续新增治理测试后出现假红灯。
### NAS 联调与回归
@@ -43,6 +46,7 @@
- Collector: `http://192.168.31.188:19193/healthz`
- 当前基线通过:
- 前端测试 `63/63`
- 后端单测 `35/35`
- `bash scripts/check_repo_baseline.sh`
- `bash scripts/smoke_fnos_storyforge_lan.sh`

View File

@@ -836,27 +836,32 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
profile_id = make_id("oneliner")
created_at = now()
default_platform = "douyin"
legacy.db.execute(
"""
insert_sql = """
INSERT INTO oneliner_profiles (
id, user_id, project_id, assistant_id, display_name, long_term_goal, notes,
default_platform, config_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
profile_id,
account["id"],
project_id,
assistant_id,
"OneLiner",
"",
"",
default_platform,
_dump({"chat_only_for_unreleased_ui": True}),
created_at,
created_at,
),
"""
insert_params = (
profile_id,
account["id"],
project_id,
assistant_id,
"OneLiner",
"",
"",
default_platform,
_dump({"chat_only_for_unreleased_ui": True}),
created_at,
created_at,
)
if assistant_id:
legacy.db.execute(insert_sql, insert_params)
else:
with legacy.db.session() as conn:
conn.execute("PRAGMA foreign_keys=OFF")
conn.execute(insert_sql, insert_params)
conn.execute("PRAGMA foreign_keys=ON")
return legacy.db.fetch_one("SELECT * FROM oneliner_profiles WHERE id = ?", (profile_id,))
def _list_platform_profiles(account: dict[str, Any], project_id: str = "") -> list[dict[str, Any]]:
@@ -3549,48 +3554,59 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
(account["id"], project["id"], platform),
)
timestamp = now()
resolved_assistant_id = (assistant or {}).get("id", "")
if existing:
legacy.db.execute(
"""
update_sql = """
UPDATE platform_agent_profiles
SET assistant_id = ?, name = ?, mission = ?, notes = ?, status = ?, config_json = ?, updated_at = ?
WHERE id = ?
""",
(
(assistant or {}).get("id", ""),
request.name.strip() or existing.get("name") or f"{legacy.platform_label(platform)} Agent",
request.mission.strip(),
request.notes.strip(),
request.status.strip() or "active",
_dump(request.config),
timestamp,
existing["id"],
),
"""
update_params = (
resolved_assistant_id,
request.name.strip() or existing.get("name") or f"{legacy.platform_label(platform)} Agent",
request.mission.strip(),
request.notes.strip(),
request.status.strip() or "active",
_dump(request.config),
timestamp,
existing["id"],
)
if resolved_assistant_id:
legacy.db.execute(update_sql, update_params)
else:
with legacy.db.session() as conn:
conn.execute("PRAGMA foreign_keys=OFF")
conn.execute(update_sql, update_params)
conn.execute("PRAGMA foreign_keys=ON")
row = legacy.db.fetch_one("SELECT * FROM platform_agent_profiles WHERE id = ?", (existing["id"],))
else:
profile_id = make_id("plat_agent")
legacy.db.execute(
"""
insert_sql = """
INSERT INTO platform_agent_profiles (
id, user_id, project_id, platform, assistant_id, name, mission, notes, status, config_json, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
profile_id,
account["id"],
project["id"],
platform,
(assistant or {}).get("id", ""),
request.name.strip() or f"{legacy.platform_label(platform)} Agent",
request.mission.strip(),
request.notes.strip(),
request.status.strip() or "active",
_dump(request.config),
timestamp,
timestamp,
),
"""
insert_params = (
profile_id,
account["id"],
project["id"],
platform,
resolved_assistant_id,
request.name.strip() or f"{legacy.platform_label(platform)} Agent",
request.mission.strip(),
request.notes.strip(),
request.status.strip() or "active",
_dump(request.config),
timestamp,
timestamp,
)
if resolved_assistant_id:
legacy.db.execute(insert_sql, insert_params)
else:
with legacy.db.session() as conn:
conn.execute("PRAGMA foreign_keys=OFF")
conn.execute(insert_sql, insert_params)
conn.execute("PRAGMA foreign_keys=ON")
row = legacy.db.fetch_one("SELECT * FROM platform_agent_profiles WHERE id = ?", (profile_id,))
return _platform_agent_payload(account, row, platform=platform, project_id=project["id"])

View File

@@ -49,6 +49,8 @@ class MainAgentGovernanceTests(unittest.TestCase):
def _clear_tables(self) -> None:
tables = [
"job_events",
"jobs",
"agent_run_events",
"agent_runs",
"agent_policy_audit_logs",
@@ -59,19 +61,28 @@ class MainAgentGovernanceTests(unittest.TestCase):
"agent_skills",
"agent_memories",
"platform_agent_profiles",
"tenant_usage_ledger",
"tenant_quota_profiles",
"admin_ops_audit_logs",
"admin_ops_fix_runs",
"admin_ops_incidents",
"oneliner_messages",
"oneliner_sessions",
"oneliner_profiles",
"auth_tokens",
"assistants",
"projects",
"accounts",
"model_profiles",
]
for table in tables:
try:
self.core.db.execute(f"DELETE FROM {table}")
except Exception:
continue
with self.core.db.session() as conn:
conn.execute("PRAGMA foreign_keys=OFF")
for table in tables:
try:
conn.execute(f"DELETE FROM {table}")
except Exception:
continue
conn.execute("PRAGMA foreign_keys=ON")
def _seed_accounts(self) -> dict[str, Any]:
now = self.db_module.utc_now()
@@ -622,6 +633,266 @@ class MainAgentGovernanceTests(unittest.TestCase):
after_count = self.core.db.fetch_one("SELECT COUNT(*) AS count FROM projects WHERE user_id = ?", (ctx["member_id"],))
self.assertEqual(int((after_count or {}).get("count") or 0), 0)
def test_action_registry_and_tenant_quota_routes_are_live(self) -> None:
registry_response = self.client.get(
"/v2/oneliner/action-registry",
headers=self.ctx["member_headers"],
params={"project_id": self.ctx["project_id"]},
)
self.assertEqual(registry_response.status_code, 200, registry_response.text)
registry_payload = registry_response.json()
self.assertGreater(registry_payload["count"], 0)
default_action = next(item for item in registry_payload["items"] if item["action_key"] == "generate-copy")
self.assertEqual(default_action["source"], "default")
save_registry = self.client.put(
"/v2/oneliner/action-registry/generate-copy",
headers=self.ctx["member_headers"],
params={"project_id": self.ctx["project_id"]},
json={
"label": "生成成交文案",
"description": "直接拉起一版更偏成交的文案生成。",
"category": "content",
"status": "disabled",
"config": {"tone": "sales"},
},
)
self.assertEqual(save_registry.status_code, 200, save_registry.text)
saved_registry = save_registry.json()
self.assertEqual(saved_registry["label"], "生成成交文案")
self.assertEqual(saved_registry["status"], "disabled")
self.assertEqual(saved_registry["config"]["tone"], "sales")
self.assertEqual(saved_registry["source"], "override")
quota_response = self.client.put(
"/v2/tenant/quota",
headers=self.ctx["member_headers"],
params={"project_id": self.ctx["project_id"]},
json={
"enabled": True,
"monthly_budget_cents": 12800,
"storage_limit_bytes": 987654321,
"analysis_quota": 21,
"copy_quota": 13,
"ai_video_quota": 5,
"real_cut_quota": 4,
"recorder_quota": 9,
"config": {"warn_threshold": 0.8},
},
)
self.assertEqual(quota_response.status_code, 200, quota_response.text)
quota_payload = quota_response.json()
self.assertEqual(quota_payload["monthly_budget_cents"], 12800)
self.assertEqual(quota_payload["analysis_quota"], 21)
self.assertEqual(quota_payload["config"]["warn_threshold"], 0.8)
usage_response = self.client.get(
"/v2/tenant/usage",
headers=self.ctx["member_headers"],
params={"project_id": self.ctx["project_id"]},
)
self.assertEqual(usage_response.status_code, 200, usage_response.text)
usage_payload = usage_response.json()
self.assertIn("categories", usage_payload)
self.assertIn("storage_bytes", usage_payload)
def test_platform_agent_routes_are_live(self) -> None:
save_profile = self.client.put(
"/v2/platform-agents/douyin/profile",
headers=self.ctx["member_headers"],
json={
"project_id": self.ctx["project_id"],
"name": "抖音增长 Agent",
"mission": "优先跟踪成交型账号并收口到今日动作。",
"notes": "保持高频复盘。",
"status": "active",
"config": {"focus": "conversion"},
},
)
self.assertEqual(save_profile.status_code, 200, save_profile.text)
profile_payload = save_profile.json()
self.assertEqual(profile_payload["platform"], "douyin")
self.assertEqual(profile_payload["name"], "抖音增长 Agent")
self.assertEqual(profile_payload["config"]["focus"], "conversion")
memory_response = self.client.post(
"/v2/platform-agents/douyin/memories",
headers=self.ctx["member_headers"],
json={
"project_id": self.ctx["project_id"],
"memory_key": "recent-pattern",
"title": "近期有效模式",
"summary": "成交口播账号在晚间更稳定。",
"details": {"window": "night"},
"confidence": 0.88,
},
)
self.assertEqual(memory_response.status_code, 200, memory_response.text)
self.assertEqual(memory_response.json()["memory_key"], "recent-pattern")
skill_response = self.client.post(
"/v2/platform-agents/douyin/skills",
headers=self.ctx["member_headers"],
json={
"project_id": self.ctx["project_id"],
"skill_key": "douyin-benchmark",
"name": "抖音对标拆解",
"status": "draft",
"method": {"summary": "先拆前三条高分作品。"},
"test_spec": {"summary": "看结构和钩子是否完整。"},
"last_result": {"summary": "首轮验证通过。"},
"success_count": 1,
"failure_count": 0,
"last_score": 0.72,
},
)
self.assertEqual(skill_response.status_code, 200, skill_response.text)
skill_payload = skill_response.json()
self.assertEqual(skill_payload["skill_key"], "douyin-benchmark")
list_agents = self.client.get(
"/v2/platform-agents",
headers=self.ctx["member_headers"],
params={"project_id": self.ctx["project_id"]},
)
self.assertEqual(list_agents.status_code, 200, list_agents.text)
agents_payload = list_agents.json()
douyin_agent = next(item for item in agents_payload["items"] if item["platform"] == "douyin")
self.assertGreaterEqual(douyin_agent["memory_count"], 1)
self.assertGreaterEqual(douyin_agent["skill_count"], 1)
list_memories = self.client.get(
"/v2/platform-agents/douyin/memories",
headers=self.ctx["member_headers"],
params={"project_id": self.ctx["project_id"]},
)
self.assertEqual(list_memories.status_code, 200, list_memories.text)
self.assertEqual(list_memories.json()["count"], 1)
review_skill = self.client.post(
f"/v2/platform-agents/douyin/skills/{skill_payload['id']}/review",
headers=self.ctx["member_headers"],
json={
"project_id": self.ctx["project_id"],
"accepted": True,
"score": 0.93,
"summary": "这条技能现在可以复用。",
"review_notes": "通过验收。",
},
)
self.assertEqual(review_skill.status_code, 200, review_skill.text)
self.assertEqual(review_skill.json()["status"], "validated")
versions = self.client.get(
f"/v2/platform-agents/douyin/skills/{skill_payload['id']}/versions",
headers=self.ctx["member_headers"],
params={"project_id": self.ctx["project_id"]},
)
self.assertEqual(versions.status_code, 200, versions.text)
versions_payload = versions.json()
self.assertGreaterEqual(versions_payload["count"], 2)
rollback = self.client.post(
f"/v2/platform-agents/douyin/skills/{skill_payload['id']}/rollback",
headers=self.ctx["member_headers"],
json={
"project_id": self.ctx["project_id"],
"version_id": versions_payload["items"][-1]["id"],
},
)
self.assertEqual(rollback.status_code, 200, rollback.text)
self.assertEqual(rollback.json()["skill_key"], "douyin-benchmark")
self_check = self.client.post(
"/v2/platform-agents/douyin/self-check",
headers=self.ctx["member_headers"],
json={"project_id": self.ctx["project_id"], "sample_limit": 2, "remember_summary": False},
)
self.assertEqual(self_check.status_code, 200, self_check.text)
self_check_payload = self_check.json()
self.assertEqual(self_check_payload["platform"], "douyin")
self.assertIn("route_checks", self_check_payload)
self.assertIn("score", self_check_payload)
def test_admin_ops_routes_are_live(self) -> None:
now = self.db_module.utc_now()
job_id = "job_failed_admin_ops"
with self.core.db.session() as conn:
conn.execute("PRAGMA foreign_keys=OFF")
conn.execute(
"""
INSERT INTO jobs (
id, user_id, project_id, parent_job_id, assistant_id, knowledge_base_id, content_source_id,
source_type, line_type, workflow_key, orchestrator, provider_name, provider_task_id,
source_url, title, language, status, transcript_text, style_summary, upload_status,
error, artifacts_json, result_json, analysis_model_profile_id, created_at, updated_at
) VALUES (?, ?, ?, '', '', '', '', ?, ?, ?, 'n8n', 'collector', '', '', ?, 'auto', 'failed', '', '', 'pending', ?, '{}', '{}', '', ?, ?)
""",
(
job_id,
self.ctx["member_id"],
self.ctx["project_id"],
"text",
"analysis",
"analysis_pipeline",
"Admin Ops Failed Job",
"synthetic failure for admin ops",
now,
now,
),
)
conn.execute("PRAGMA foreign_keys=ON")
scan = self.client.post(
"/v2/admin/ops/incidents/scan",
headers=self.ctx["admin_headers"],
)
self.assertEqual(scan.status_code, 200, scan.text)
scan_payload = scan.json()
self.assertGreaterEqual(scan_payload["count"], 1)
incident = next(item for item in scan_payload["created_or_updated"] if item["source_type"] == "job")
overview = self.client.get(
"/v2/admin/ops/overview",
headers=self.ctx["admin_headers"],
)
self.assertEqual(overview.status_code, 200, overview.text)
overview_payload = overview.json()
self.assertGreaterEqual(overview_payload["incident_count"], 1)
self.assertIn("integration_health", overview_payload)
repair_plan = self.client.post(
f"/v2/admin/ops/incidents/{incident['id']}/repair-plan",
headers=self.ctx["admin_headers"],
json={"scope": "tenant", "notes": "先生成最小修复计划。"},
)
self.assertEqual(repair_plan.status_code, 200, repair_plan.text)
repair_payload = repair_plan.json()
self.assertEqual(repair_payload["plan_scope"], "tenant")
review = self.client.patch(
f"/v2/admin/ops/incidents/{incident['id']}",
headers=self.ctx["admin_headers"],
json={"status": "watching", "review_notes": "继续观察后再执行。"},
)
self.assertEqual(review.status_code, 200, review.text)
self.assertEqual(review.json()["status"], "watching")
audit = self.client.post(
f"/v2/admin/ops/fix-runs/{repair_payload['id']}/audit",
headers=self.ctx["admin_headers"],
json={"review_status": "approved", "review_notes": "方案通过。"},
)
self.assertEqual(audit.status_code, 200, audit.text)
self.assertEqual(audit.json()["audit_status"], "approved")
fix_runs = self.client.get(
"/v2/admin/ops/fix-runs",
headers=self.ctx["admin_headers"],
)
self.assertEqual(fix_runs.status_code, 200, fix_runs.text)
self.assertGreaterEqual(fix_runs.json()["count"], 1)
def test_admin_governance_directory_lists_accounts_and_projects(self) -> None:
response = self.client.get(
"/v2/admin/oneliner/governance/directory",