// Files
// boss/tests/master-agent-message-queue.test.ts
// 2026-03-31 19:59:08 +08:00
//
// 237 lines
// 8.0 KiB
// TypeScript

import test from "node:test";
import assert from "node:assert/strict";
import os from "node:os";
import path from "node:path";
import { mkdtemp, rm } from "node:fs/promises";
import { NextRequest } from "next/server";
// Module shapes of the project files that setup() imports dynamically; used only to type the bindings below.
type MessageRouteModule = typeof import("../src/app/api/v1/projects/[projectId]/messages/route");
type BossDataModule = typeof import("../src/lib/boss-data");

// Temporary runtime directory created by setup(); stays "" until setup() has created it.
let runtimeRoot = "";

// Route handler under test, assigned by setup().
let POST: MessageRouteModule["POST"];

// Data-layer helpers, assigned by setup().
let saveAiAccount: BossDataModule["saveAiAccount"];
let updateProjectAgentControls: BossDataModule["updateProjectAgentControls"];
let readState: BossDataModule["readState"];
let createAuthSession: BossDataModule["createAuthSession"];

// Name of the session cookie, copied from the auth module by setup().
let AUTH_SESSION_COOKIE = "";
// In-flight (or finished) bootstrap shared by every setup() caller.
let setupPromise: Promise<void> | undefined;

/**
 * Prepares the shared test runtime once per process.
 *
 * Creates a temporary directory, points BOSS_RUNTIME_ROOT / BOSS_STATE_FILE at
 * it, then dynamically imports the route, data and auth modules and stores the
 * pieces the tests use in the module-level bindings.
 *
 * The bootstrap promise is memoized instead of guarding on `runtimeRoot`:
 * `runtimeRoot` is assigned before the imports finish, so a guard on it would
 * let later callers return early after a failed (or still running) bootstrap
 * with `POST` and the other bindings unassigned. Awaiting the shared promise
 * makes every caller wait for completion and re-surfaces the original error.
 */
async function setup() {
  if (!setupPromise) {
    setupPromise = (async () => {
      runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-master-agent-message-queue-"));

      // The environment is set before the imports below are started.
      // NOTE(review): presumably the imported modules read these variables — confirm.
      process.env.BOSS_RUNTIME_ROOT = runtimeRoot;
      process.env.BOSS_STATE_FILE = path.join(runtimeRoot, "boss-state.json");

      const [messageRoute, data, auth] = await Promise.all([
        import("../src/app/api/v1/projects/[projectId]/messages/route.ts"),
        import("../src/lib/boss-data.ts"),
        import("../src/lib/boss-auth.ts"),
      ]);

      POST = messageRoute.POST;
      saveAiAccount = data.saveAiAccount;
      updateProjectAgentControls = data.updateProjectAgentControls;
      readState = data.readState;
      createAuthSession = data.createAuthSession;
      AUTH_SESSION_COOKIE = auth.AUTH_SESSION_COOKIE;
    })();
  }
  await setupPromise;
}
/**
 * Builds an authenticated POST request for a project's messages endpoint.
 *
 * A fresh auth session is created for a fixed `highest_admin` account and its
 * token is sent in the session cookie.
 *
 * @param projectId Project id interpolated into the request URL.
 * @param body Value serialized with JSON.stringify as the request body.
 * @returns A NextRequest carrying the JSON body and the session cookie.
 */
async function createAuthedRequest(projectId: string, body: unknown) {
  const { sessionToken } = await createAuthSession({
    account: "17600003315",
    role: "highest_admin",
    displayName: "Boss 超级管理员",
    loginMethod: "password",
  });

  const url = `http://127.0.0.1:3000/api/v1/projects/${projectId}/messages`;
  const headers = {
    "content-type": "application/json",
    cookie: `${AUTH_SESSION_COOKIE}=${sessionToken}`,
  };
  const serializedBody = JSON.stringify(body);

  return new NextRequest(url, { method: "POST", headers, body: serializedBody });
}
/**
 * Polls `predicate` until it resolves to `true` or the time budget runs out.
 *
 * The predicate is always evaluated at least once (even with `timeoutMs` of 0)
 * and is evaluated one last time once the deadline has been reached, so a
 * condition that becomes true during the final sleep is not reported as a
 * timeout. Sleeps are clamped to the remaining budget so the wait does not
 * overshoot the deadline by a full poll interval.
 *
 * @param predicate Async check; resolving to `true` ends the wait.
 * @param timeoutMs Maximum time to keep polling, in milliseconds.
 * @throws Error with message "waitFor timed out" when the predicate never passed.
 */
async function waitFor(predicate: () => Promise<boolean>, timeoutMs = 5_000) {
  const pollIntervalMs = 50;
  const deadline = Date.now() + timeoutMs;

  for (;;) {
    if (await predicate()) {
      return;
    }
    const remainingMs = deadline - Date.now();
    if (remainingMs <= 0) {
      // The check above already ran at or after the deadline; give up.
      break;
    }
    await new Promise((resolve) => setTimeout(resolve, Math.min(pollIntervalMs, remainingMs)));
  }

  throw new Error("waitFor timed out");
}
// After all tests: delete the temporary runtime directory, if setup() created one.
test.after(async () => {
  if (!runtimeRoot) {
    return;
  }
  await rm(runtimeRoot, { force: true, recursive: true });
});
/**
 * Posting a message to the master-agent project must return a queued
 * `conversation_reply` task envelope. Once the task reaches `completed`, the
 * reply text must be stored on the task and be the latest message of the
 * master-agent project, and the single outbound API call must carry the model
 * and reasoning effort passed to updateProjectAgentControls.
 */
test("POST /api/v1/projects/master-agent/messages 快速返回队列态并在异步实际回复时继承当前会话覆盖", async () => {
  type QueuedReplyPayload = {
    ok: boolean;
    task?: { taskId: string; taskType: string; status: string } | null;
    masterReplyState?: "queued" | "running" | "completed";
    masterReply?: unknown;
  };
  type OutboundRequestBody = {
    model?: string;
    reasoning?: { effort?: string };
  };

  await setup();

  await saveAiAccount({
    accountId: "openai-master-agent-queue",
    label: "API 容灾",
    role: "api_fallback",
    provider: "openai_api",
    displayName: "OpenAI API 队列测试",
    model: "gpt-5.4",
    apiKey: "sk-test-openai-queue",
    enabled: true,
    setActive: true,
    loginStatusNote: "用于 master-agent 队列测试。",
  });
  await updateProjectAgentControls("master-agent", {
    modelOverride: "gpt-4.1-mini",
    reasoningEffortOverride: "high",
  });

  // Replace global fetch with a stub that records each call and returns a canned reply.
  const recordedCalls: Array<{ url: string; body: unknown }> = [];
  const realFetch = globalThis.fetch;
  globalThis.fetch = (async (input, init) => {
    let decodedBody: unknown;
    if (typeof init?.body === "string") {
      decodedBody = JSON.parse(init.body);
    } else {
      decodedBody = init?.body ?? null;
    }
    recordedCalls.push({ url: String(input), body: decodedBody });

    const cannedReply = JSON.stringify({ output_text: "已切到异步队列回复。" });
    return new Response(cannedReply, {
      status: 200,
      headers: {
        "content-type": "application/json",
        "x-request-id": "req-master-agent-queue",
      },
    });
  }) as typeof fetch;

  try {
    const request = await createAuthedRequest("master-agent", {
      body: "请同步 master-agent 当前阻塞点",
    });
    const response = await POST(request, {
      params: Promise.resolve({ projectId: "master-agent" }),
    });
    assert.equal(response.status, 200);

    // The immediate response only describes the queued task.
    const envelope = (await response.json()) as QueuedReplyPayload;
    const queuedTask = envelope.task;
    assert.equal(envelope.ok, true);
    assert.equal(envelope.masterReplyState, "queued");
    assert.ok(queuedTask, "expected master-agent message to return a task envelope");
    assert.equal(queuedTask?.taskType, "conversation_reply");
    assert.equal(queuedTask?.status, "queued");
    assert.ok(queuedTask?.taskId, "expected a stable taskId in the response");
    const queuedTaskId = queuedTask?.taskId;

    // Poll persisted state until the task is marked completed.
    await waitFor(async () => {
      const { masterAgentTasks } = await readState();
      const candidate = masterAgentTasks.find((entry) => entry.taskId === queuedTaskId);
      return candidate?.status === "completed";
    });

    const settledState = await readState();
    const settledTask = settledState.masterAgentTasks.find((entry) => entry.taskId === queuedTaskId);
    assert.ok(settledTask, "expected the queued task to remain in state");
    assert.equal(settledTask?.status, "completed");
    assert.equal(settledTask?.replyBody, "已切到异步队列回复。");

    // The reply must also be the last message of the master-agent project.
    const ledgerProject = settledState.projects.find((candidate) => candidate.id === "master-agent");
    const latestLedgerMessage = ledgerProject?.messages.at(-1);
    assert.ok(latestLedgerMessage, "expected the async reply to be written back to the master-agent ledger");
    assert.match(latestLedgerMessage?.body ?? "", /已切到异步队列回复/);

    // Exactly one outbound call, carrying the overridden model and reasoning effort.
    assert.equal(recordedCalls.length, 1);
    const firstCall = recordedCalls[0];
    assert.equal(firstCall?.url, "https://api.openai.com/v1/responses");
    const outbound = firstCall?.body as OutboundRequestBody;
    assert.equal(outbound?.model, "gpt-4.1-mini");
    assert.equal(outbound?.reasoning?.effort, "high");
  } finally {
    globalThis.fetch = realFetch;
  }
});
/**
 * With an active `master_codex_node` primary account and an inactive
 * `openai_api` backup account saved, a master-agent message must be queued as
 * a `conversation_reply` task. The completed task must be attributed to device
 * "master-agent-openai" and account "openai-backup-queue".
 *
 * NOTE(review): the primary account is only named as an offline node here;
 * presumably no such node is connected in the test runtime — confirm.
 */
test("master-agent enqueue 在主节点离线时会自动切到 OpenAI 后台队列而不是挂到本机设备队列", async () => {
  type EnqueuePayload = {
    ok: boolean;
    task?: { taskId: string; taskType: string; status: string } | null;
    masterReplyState?: "queued" | "running" | "completed";
  };

  await setup();

  // Primary account: a master codex node.
  await saveAiAccount({
    accountId: "master-codex-primary-offline",
    label: "主 GPT",
    role: "primary",
    provider: "master_codex_node",
    displayName: "离线 Master Codex Node",
    nodeId: "offline-node",
    nodeLabel: "离线节点",
    model: "gpt-5.4",
    enabled: true,
    setActive: true,
    loginStatusNote: "离线主节点",
  });
  // Backup account: OpenAI API, enabled but not set active.
  await saveAiAccount({
    accountId: "openai-backup-queue",
    label: "备用 GPT",
    role: "backup",
    provider: "openai_api",
    displayName: "OpenAI 备用账号",
    accountIdentifier: "sk-queue-demo",
    model: "gpt-5.4",
    apiKey: "sk-queue-demo",
    enabled: true,
    setActive: false,
    loginStatusNote: "备用 API 账号",
  });

  // Replace global fetch with a stub that always returns the same canned reply.
  const realFetch = globalThis.fetch;
  globalThis.fetch = (async () => {
    const cannedReply = JSON.stringify({ output_text: "离线主节点已切到 API 后台队列。" });
    return new Response(cannedReply, {
      status: 200,
      headers: {
        "content-type": "application/json",
        "x-request-id": "req-master-agent-offline-fallback-queue",
      },
    });
  }) as typeof fetch;

  try {
    const request = await createAuthedRequest("master-agent", {
      body: "请走备用 API 队列",
    });
    const response = await POST(request, {
      params: Promise.resolve({ projectId: "master-agent" }),
    });
    assert.equal(response.status, 200);

    const envelope = (await response.json()) as EnqueuePayload;
    assert.equal(envelope.ok, true);
    assert.equal(envelope.masterReplyState, "queued");
    assert.equal(envelope.task?.taskType, "conversation_reply");
    const queuedTaskId = envelope.task?.taskId;

    // Poll persisted state until the task is marked completed.
    await waitFor(async () => {
      const { masterAgentTasks } = await readState();
      const candidate = masterAgentTasks.find((entry) => entry.taskId === queuedTaskId);
      return candidate?.status === "completed";
    });

    const settledState = await readState();
    const settledTask = settledState.masterAgentTasks.find((entry) => entry.taskId === queuedTaskId);
    assert.equal(settledTask?.deviceId, "master-agent-openai");
    assert.equal(settledTask?.status, "completed");
    assert.equal(settledTask?.accountId, "openai-backup-queue");
  } finally {
    globalThis.fetch = realFetch;
  }
});