fix: tighten governance audit exposure

This commit is contained in:
kris
2026-03-29 17:21:02 +08:00
parent 8bb58be5ff
commit f813b6e5c0
2 changed files with 148 additions and 4 deletions

View File

@@ -1277,6 +1277,24 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"created_at": row.get("created_at", ""),
}
def _policy_version_public_payload(row: dict[str, Any] | None) -> dict[str, Any] | None:
if not row:
return None
return {
"id": row["id"],
"scope_id": row.get("scope_id", ""),
"scope_kind": row.get("scope_kind", ""),
"subject_user_id": row.get("subject_user_id", ""),
"subject_project_id": row.get("subject_project_id", ""),
"platform": row.get("platform", ""),
"platform_label": legacy.platform_label(row.get("platform", "")) if row.get("platform") else "",
"version_no": int(row.get("version_no") or 0),
"title": row.get("title", ""),
"summary": row.get("summary", ""),
"source_type": row.get("source_type", ""),
"created_at": row.get("created_at", ""),
}
def _policy_effectivity_payload(row: dict[str, Any] | None) -> dict[str, Any] | None:
if not row:
return None
@@ -1307,6 +1325,11 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"created_at": row.get("created_at", ""),
}
def _policy_public_audit_details(details: dict[str, Any] | None) -> dict[str, Any]:
data = details or {}
public_keys = {"project_id", "platform", "target_project_id"}
return {key: data.get(key) for key in public_keys if data.get(key) not in (None, "")}
def _ensure_policy_scope(
*,
scope_kind: str,
@@ -1509,7 +1532,24 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
)
return payload
def _fetch_policy_audit_records(where_sql: str, params: tuple[Any, ...], *, limit: int = 20) -> list[dict[str, Any]]:
def _policy_audit_record_public_payload(row: dict[str, Any] | None) -> dict[str, Any] | None:
payload = _policy_audit_record_payload(row)
if not payload:
return None
payload.pop("actor_user_id", None)
payload["details"] = _policy_public_audit_details(payload.get("details"))
payload["version"] = _policy_version_public_payload(
legacy.db.fetch_one("SELECT * FROM agent_policy_versions WHERE id = ?", (payload.get("version_id", ""),))
) if payload.get("version_id") else None
return payload
def _fetch_policy_audit_records(
where_sql: str,
params: tuple[Any, ...],
*,
limit: int = 20,
public_view: bool = False,
) -> list[dict[str, Any]]:
rows = legacy.db.fetch_all(
f"""
SELECT audit.*
@@ -1521,7 +1561,8 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
""",
params + (limit,),
)
return [_policy_audit_record_payload(row) for row in rows if row]
builder = _policy_audit_record_public_payload if public_view else _policy_audit_record_payload
return [builder(row) for row in rows if row]
def _create_policy_version(
scope_row: dict[str, Any],
@@ -1781,6 +1822,23 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"effectivity": bundle.get("effectivity"),
}
)
active_admin_override_notice = next(
(
{
"scope_id": layer["scope"].get("id", ""),
"version_id": (layer.get("current_version") or {}).get("id", ""),
"title": (layer.get("current_version") or {}).get("title") or layer["scope"].get("title") or "管理员覆盖生效中",
"summary": (layer.get("current_version") or {}).get("summary") or layer["scope"].get("summary") or "",
"platform": layer["scope"].get("platform", ""),
"platform_label": layer["scope"].get("platform_label", ""),
"subject_project_id": layer["scope"].get("subject_project_id", ""),
"created_at": (layer.get("current_version") or {}).get("created_at", ""),
}
for layer in reversed(layers)
if layer.get("scope_kind") == "admin_override"
),
None,
)
return {
"user_id": subject_account["id"],
"project_id": subject_project_id,
@@ -1788,6 +1846,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"platform_label": legacy.platform_label(normalized_platform) if normalized_platform else "",
"layers": layers,
"effective_policy": effective_policy,
"active_admin_override_notice": active_admin_override_notice,
}
def _bundle_with_versions(
@@ -1797,6 +1856,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
fallback_platform: str = "",
fallback_user_id: str = "",
fallback_project_id: str = "",
active_version_only: bool = False,
) -> dict[str, Any]:
bundle = _policy_scope_bundle(
scope_row,
@@ -1804,6 +1864,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
fallback_platform=fallback_platform,
fallback_user_id=fallback_user_id,
fallback_project_id=fallback_project_id,
active_only=active_version_only,
)
versions = _list_policy_versions(scope_row)
bundle["versions"] = {"items": versions, "count": len(versions)}
@@ -2667,6 +2728,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
fallback_kind="user_global",
fallback_user_id=account["id"],
fallback_project_id=project["id"],
active_version_only=True,
),
"user_platform": _bundle_with_versions(
user_platform_scope,
@@ -2674,6 +2736,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
fallback_platform=_normalize_policy_platform(platform),
fallback_user_id=account["id"],
fallback_project_id=project["id"],
active_version_only=True,
) if platform else None,
},
}
@@ -2690,6 +2753,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
governance = context.get("governance") or {}
effective_policy = (governance.get("effective") or {}).get("effective_policy") or {}
governance_layers = (governance.get("effective") or {}).get("layers") or []
active_admin_override_notice = (governance.get("effective") or {}).get("active_admin_override_notice") or {}
primary_action = (plan.get("suggested_actions") or [{}])[0] if plan.get("suggested_actions") else None
evidence = []
if platform_agent.get("recent_memory"):
@@ -2725,6 +2789,8 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
next_steps.append("我会优先参考该平台 Agent 最近沉淀的方法与技能。")
if governance_layers:
next_steps.append(f"当前会话已叠加 {len(governance_layers)} 层策略,我会先按生效策略执行。")
if active_admin_override_notice.get("title"):
next_steps.append(f"当前存在管理员覆盖:{active_admin_override_notice.get('title')}")
summary_lines = [
f"我理解你的目标是:{plan.get('intent_label', '自定义任务')}",
f"建议优先处理的平台:{plan.get('platform_label', '待判断')}",
@@ -2742,6 +2808,8 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
summary_lines.append(f"最近有效技能:{platform_agent['recent_skill'].get('name') or '一条技能'}")
if effective_policy.get("guardrails", {}).get("require_admin_review"):
summary_lines.append("当前策略层要求管理员复核,所以我会把高风险动作先压成待确认。")
if active_admin_override_notice.get("title"):
summary_lines.append(f"管理员覆盖生效中:{active_admin_override_notice.get('title')}")
secondary_actions: list[dict[str, Any]] = []
if plan.get("platform"):
secondary_actions.append(
@@ -2905,6 +2973,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
"readiness_score": platform_agent.get("readiness_score") or 0,
"primary_action": primary_action or {},
"blocked_reason": blocked_reason,
"active_admin_override_notice": active_admin_override_notice,
"evidence": evidence,
"next_steps": next_steps,
"secondary_actions": secondary_actions,
@@ -4476,6 +4545,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
fallback_kind="user_global",
fallback_user_id=account["id"],
fallback_project_id=project["id"],
active_version_only=True,
)
payload["effective_policy"] = _effective_policy_payload(
subject_account=account,
@@ -4556,7 +4626,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
if normalized_platform:
where_clauses.append("AND (scope.platform = '' OR scope.platform = ?)")
params.append(normalized_platform)
items = _fetch_policy_audit_records(" ".join(where_clauses), tuple(params), limit=limit)
items = _fetch_policy_audit_records(" ".join(where_clauses), tuple(params), limit=limit, public_view=True)
return {"items": items, "count": len(items)}
@app.post("/v2/oneliner/governance/user/global/rollback")
@@ -4614,6 +4684,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
fallback_platform=normalized_platform,
fallback_user_id=account["id"],
fallback_project_id=project["id"],
active_version_only=True,
)
payload["effective_policy"] = _effective_policy_payload(
subject_account=account,
@@ -4725,7 +4796,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
def get_system_main_policy(admin: dict[str, Any] = Depends(legacy.require_super_admin)) -> dict[str, Any]:
_ = admin
scope_row = _policy_scope_row(scope_kind="system_main")
return _bundle_with_versions(scope_row, fallback_kind="system_main")
return _bundle_with_versions(scope_row, fallback_kind="system_main", active_version_only=True)
@app.put("/v2/admin/oneliner/governance/system/main-agent")
def put_system_main_policy(
@@ -4803,6 +4874,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
scope_row,
fallback_kind="system_platform",
fallback_platform=normalized_platform,
active_version_only=True,
)
@app.put("/v2/admin/oneliner/governance/system/platforms/{platform}")
@@ -4950,6 +5022,7 @@ def register_oneliner_routes(app: Any, legacy: Any) -> None:
fallback_platform=normalized_platform,
fallback_user_id=subject_account["id"],
fallback_project_id=subject_project_id,
active_version_only=True,
)
payload.update(
_effective_policy_payload(

View File

@@ -273,6 +273,7 @@ class MainAgentGovernanceTests(unittest.TestCase):
self.assertEqual(payload["layers"][-1]["scope_kind"], "admin_override")
self.assertEqual(payload["effective_policy"]["actions"]["max_cards"], 4)
self.assertTrue(payload["effective_policy"]["guardrails"]["require_admin_review"])
self.assertEqual(payload["active_admin_override_notice"]["title"], "Global safety override")
def test_effective_policy_skips_future_scheduled_versions_until_window_opens(self) -> None:
first_response = self.client.put(
@@ -311,6 +312,72 @@ class MainAgentGovernanceTests(unittest.TestCase):
self.assertEqual(payload["effective_policy"]["tone"]["style"], "default")
self.assertEqual(payload["layers"][0]["current_version"]["title"], "Current system baseline")
def test_scope_read_endpoints_keep_current_version_on_active_release_not_future_schedule(self) -> None:
user_first = self.client.put(
"/v2/oneliner/governance/user/global",
headers=self.ctx["member_headers"],
json={
"project_id": self.ctx["project_id"],
"title": "User global baseline",
"policy": {"tone": {"style": "baseline"}},
"reason": "seed baseline",
},
)
self.assertEqual(user_first.status_code, 200, user_first.text)
user_future = self.client.put(
"/v2/oneliner/governance/user/global",
headers=self.ctx["member_headers"],
json={
"project_id": self.ctx["project_id"],
"title": "User global future",
"policy": {"tone": {"style": "future"}},
"effect_mode": "scheduled",
"starts_at": "2099-01-01T00:00:00Z",
"reason": "future rollout",
},
)
self.assertEqual(user_future.status_code, 200, user_future.text)
user_read = self.client.get(
"/v2/oneliner/governance/user/global",
headers=self.ctx["member_headers"],
params={"project_id": self.ctx["project_id"]},
)
self.assertEqual(user_read.status_code, 200, user_read.text)
self.assertEqual(user_read.json()["current_version"]["title"], "User global baseline")
system_first = self.client.put(
"/v2/admin/oneliner/governance/system/main-agent",
headers=self.ctx["admin_headers"],
json={
"title": "System baseline",
"policy": {"homepage": {"focus": "stable"}},
"reason": "stable baseline",
},
)
self.assertEqual(system_first.status_code, 200, system_first.text)
system_future = self.client.put(
"/v2/admin/oneliner/governance/system/main-agent",
headers=self.ctx["admin_headers"],
json={
"title": "System future",
"policy": {"homepage": {"focus": "future"}},
"effect_mode": "scheduled",
"starts_at": "2099-01-01T00:00:00Z",
"reason": "future rollout",
},
)
self.assertEqual(system_future.status_code, 200, system_future.text)
system_read = self.client.get(
"/v2/admin/oneliner/governance/system/main-agent",
headers=self.ctx["admin_headers"],
)
self.assertEqual(system_read.status_code, 200, system_read.text)
self.assertEqual(system_read.json()["current_version"]["title"], "System baseline")
def test_admin_governance_directory_lists_accounts_and_projects(self) -> None:
response = self.client.get(
"/v2/admin/oneliner/governance/directory",
@@ -547,6 +614,10 @@ class MainAgentGovernanceTests(unittest.TestCase):
first_item = payload["items"][0]
self.assertIn("version", first_item)
self.assertIn("scope", first_item)
self.assertNotIn("actor_user_id", first_item)
self.assertNotIn("policy", first_item["version"])
self.assertNotIn("reason", first_item["version"])
self.assertNotIn("actor_user_id", first_item["version"])
def test_admin_policy_audits_include_target_and_system_layers(self) -> None:
self.client.put(