test: cover live-first platform routes
Some checks failed
StoryForge CI / Baseline checks (push) Has been cancelled
StoryForge CI / Backend tests (push) Has been cancelled
StoryForge CI / Web tests (push) Has been cancelled

This commit is contained in:
kris
2026-03-31 01:11:57 +08:00
parent 7a928b5df9
commit 7897ce6c3d
2 changed files with 100 additions and 0 deletions

View File

@@ -34,6 +34,7 @@
- 工作台前端已经清掉浏览器 `alert` 弹窗,缺对象、权限不足、刷新失败和加载失败都会回到站内反馈,不再把用户从当前流程里打断出去。
- `OneLiner 会话 / 主 Agent 运行 / 动作执行器 / 跟踪同步 / 高分分析 / 平台技能验收` 这批真接口也已经去掉“当前实例未提供”的旧降级口径,统一按 live 结果说话。
- 新增一条前端回归护栏:静态声明出来的 `data-action` 必须有明确处理逻辑,避免后续再出现“点了没反应,最后落到动作待接入”的隐性缺口。
- 后端契约测试新增 live-first 路由覆盖,直接校验 `分析高分作品 / 批量跟踪同步 / 单账号跟踪同步 / 跟踪游标` 这些当前前端已完全依赖的接口。
### NAS 联调与回归

View File

@@ -96,6 +96,28 @@ def _make_legacy(db: Database, account_row: dict[str, object]) -> SimpleNamespac
def job_payload(row: dict[str, object]) -> dict[str, object]:
return row
def create_job_record(**kwargs: object) -> dict[str, object]:
created_at = utc_now()
return {
"id": make_id("job"),
"account_id": kwargs.get("account_id", ""),
"project_id": kwargs.get("project_id", ""),
"knowledge_base_id": kwargs.get("knowledge_base_id", ""),
"content_source_id": kwargs.get("content_source_id", ""),
"assistant_id": kwargs.get("assistant_id", ""),
"source_type": kwargs.get("source_type", ""),
"line_type": kwargs.get("line_type", ""),
"workflow_key": kwargs.get("workflow_key", ""),
"title": kwargs.get("title", ""),
"language": kwargs.get("language", "auto"),
"source_url": kwargs.get("source_url", ""),
"artifacts": kwargs.get("artifacts", {}),
"analysis_model_profile_id": kwargs.get("analysis_model_profile_id", ""),
"status": "queued",
"created_at": created_at,
"updated_at": created_at,
}
async def trigger_orchestrated_job(job_row: dict[str, object]) -> dict[str, object]:
return job_row
@@ -111,6 +133,7 @@ def _make_legacy(db: Database, account_row: dict[str, object]) -> SimpleNamespac
resolve_target_kb=resolve_target_kb,
resolve_target_assistant=resolve_target_assistant,
call_model=call_model,
create_job_record=create_job_record,
job_payload=job_payload,
trigger_orchestrated_job=trigger_orchestrated_job,
)
@@ -570,6 +593,82 @@ class PlatformContractTests(unittest.TestCase):
self.assertIn("account", digest_item)
self.assertIn("video", digest_item)
def test_douyin_live_first_mutation_routes_are_available(self) -> None:
app, legacy, seed = _build_app(["xiaohongshu"])
douyin_account_id = _seed_douyin(legacy.db, seed["owner"], seed["model"])
with TestClient(app) as client:
analyze = client.post(
f"/v2/douyin/accounts/{douyin_account_id}/videos/analyze-top",
headers={"Authorization": "Bearer dummy"},
json={"top_video_count": 2, "min_score": 0, "temperature": 0.25, "model_profile_id": seed["model"]["id"]},
)
self.assertEqual(analyze.status_code, 200, analyze.text)
analyze_payload = analyze.json()
self.assertEqual(analyze_payload["account_id"], douyin_account_id)
self.assertIn("analyzed_count", analyze_payload)
refresh_all = client.post("/v2/douyin/tracking/refresh", headers={"Authorization": "Bearer dummy"})
self.assertEqual(refresh_all.status_code, 200, refresh_all.text)
refresh_all_payload = refresh_all.json()
self.assertIn("refreshed", refresh_all_payload)
self.assertIn("items", refresh_all_payload)
refresh_one = client.post(
f"/v2/douyin/tracking/accounts/{douyin_account_id}/refresh",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(refresh_one.status_code, 200, refresh_one.text)
refresh_one_payload = refresh_one.json()
self.assertTrue(refresh_one_payload["success"])
self.assertEqual(refresh_one_payload["tracked_account_id"], douyin_account_id)
self.assertIn("account", refresh_one_payload)
self.assertIn("sync_errors", refresh_one_payload)
cursor = client.post(
"/v2/douyin/tracking/cursor",
headers={"Authorization": "Bearer dummy"},
json={"last_seen_at": "2026-03-30T10:00:00+00:00"},
)
self.assertEqual(cursor.status_code, 200, cursor.text)
self.assertEqual(cursor.json()["last_seen_at"], "2026-03-30T10:00:00+00:00")
def test_domestic_live_first_mutation_routes_are_available(self) -> None:
app, legacy, seed = _build_app(["xiaohongshu"])
xhs_account_id = _seed_domestic(legacy.db, seed["owner"], seed["project"], "xiaohongshu")
with TestClient(app) as client:
analyze = client.post(
f"/v2/xiaohongshu/accounts/{xhs_account_id}/videos/analyze-top",
headers={"Authorization": "Bearer dummy"},
json={"top_video_count": 2, "min_score": 0, "temperature": 0.25, "model_profile_id": seed["model"]["id"]},
)
self.assertEqual(analyze.status_code, 200, analyze.text)
analyze_payload = analyze.json()
self.assertEqual(analyze_payload["account_id"], xhs_account_id)
self.assertIn("analyzed_count", analyze_payload)
refresh_all = client.post("/v2/xiaohongshu/tracking/refresh", headers={"Authorization": "Bearer dummy"})
self.assertEqual(refresh_all.status_code, 200, refresh_all.text)
refresh_all_payload = refresh_all.json()
self.assertIn("refreshed", refresh_all_payload)
self.assertIn("items", refresh_all_payload)
refresh_one = client.post(
f"/v2/xiaohongshu/tracking/accounts/{xhs_account_id}/refresh",
headers={"Authorization": "Bearer dummy"},
)
self.assertEqual(refresh_one.status_code, 200, refresh_one.text)
refresh_one_payload = refresh_one.json()
self.assertEqual(refresh_one_payload["tracked_account_id"], xhs_account_id)
self.assertIn("sync_job_id", refresh_one_payload)
cursor = client.post(
"/v2/xiaohongshu/tracking/cursor",
headers={"Authorization": "Bearer dummy"},
json={"last_seen_at": "2026-03-30T10:00:00+00:00"},
)
self.assertEqual(cursor.status_code, 200, cursor.text)
self.assertEqual(cursor.json()["last_seen_at"], "2026-03-30T10:00:00+00:00")
if __name__ == "__main__":
unittest.main()