feat: queue master-agent chat replies
This commit is contained in:
236
tests/master-agent-message-queue.test.ts
Normal file
236
tests/master-agent-message-queue.test.ts
Normal file
@@ -0,0 +1,236 @@
|
||||
import test from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { mkdtemp, rm } from "node:fs/promises";
|
||||
import { NextRequest } from "next/server";
|
||||
|
||||
let runtimeRoot = "";
|
||||
let POST: (typeof import("../src/app/api/v1/projects/[projectId]/messages/route"))["POST"];
|
||||
let saveAiAccount: (typeof import("../src/lib/boss-data"))["saveAiAccount"];
|
||||
let updateProjectAgentControls: (typeof import("../src/lib/boss-data"))["updateProjectAgentControls"];
|
||||
let readState: (typeof import("../src/lib/boss-data"))["readState"];
|
||||
let createAuthSession: (typeof import("../src/lib/boss-data"))["createAuthSession"];
|
||||
let AUTH_SESSION_COOKIE = "";
|
||||
|
||||
async function setup() {
|
||||
if (runtimeRoot) {
|
||||
return;
|
||||
}
|
||||
|
||||
runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-master-agent-message-queue-"));
|
||||
process.env.BOSS_RUNTIME_ROOT = runtimeRoot;
|
||||
process.env.BOSS_STATE_FILE = path.join(runtimeRoot, "boss-state.json");
|
||||
|
||||
const [messageRoute, data, auth] = await Promise.all([
|
||||
import("../src/app/api/v1/projects/[projectId]/messages/route.ts"),
|
||||
import("../src/lib/boss-data.ts"),
|
||||
import("../src/lib/boss-auth.ts"),
|
||||
]);
|
||||
|
||||
POST = messageRoute.POST;
|
||||
saveAiAccount = data.saveAiAccount;
|
||||
updateProjectAgentControls = data.updateProjectAgentControls;
|
||||
readState = data.readState;
|
||||
createAuthSession = data.createAuthSession;
|
||||
AUTH_SESSION_COOKIE = auth.AUTH_SESSION_COOKIE;
|
||||
}
|
||||
|
||||
async function createAuthedRequest(projectId: string, body: unknown) {
|
||||
const session = await createAuthSession({
|
||||
account: "17600003315",
|
||||
role: "highest_admin",
|
||||
displayName: "Boss 超级管理员",
|
||||
loginMethod: "password",
|
||||
});
|
||||
|
||||
return new NextRequest(`http://127.0.0.1:3000/api/v1/projects/${projectId}/messages`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
cookie: `${AUTH_SESSION_COOKIE}=${session.sessionToken}`,
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
});
|
||||
}
|
||||
|
||||
async function waitFor(predicate: () => Promise<boolean>, timeoutMs = 5_000) {
|
||||
const startedAt = Date.now();
|
||||
while (Date.now() - startedAt < timeoutMs) {
|
||||
if (await predicate()) {
|
||||
return;
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
}
|
||||
throw new Error("waitFor timed out");
|
||||
}
|
||||
|
||||
test.after(async () => {
|
||||
if (runtimeRoot) {
|
||||
await rm(runtimeRoot, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("POST /api/v1/projects/master-agent/messages 快速返回队列态并在异步实际回复时继承当前会话覆盖", async () => {
|
||||
await setup();
|
||||
|
||||
await saveAiAccount({
|
||||
accountId: "openai-master-agent-queue",
|
||||
label: "API 容灾",
|
||||
role: "api_fallback",
|
||||
provider: "openai_api",
|
||||
displayName: "OpenAI API 队列测试",
|
||||
model: "gpt-5.4",
|
||||
apiKey: "sk-test-openai-queue",
|
||||
enabled: true,
|
||||
setActive: true,
|
||||
loginStatusNote: "用于 master-agent 队列测试。",
|
||||
});
|
||||
|
||||
await updateProjectAgentControls("master-agent", {
|
||||
modelOverride: "gpt-4.1-mini",
|
||||
reasoningEffortOverride: "high",
|
||||
});
|
||||
|
||||
const fetchCalls: Array<{ url: string; body: unknown }> = [];
|
||||
const originalFetch = globalThis.fetch;
|
||||
globalThis.fetch = (async (input, init) => {
|
||||
const body = typeof init?.body === "string" ? JSON.parse(init.body) : init?.body ?? null;
|
||||
fetchCalls.push({ url: String(input), body });
|
||||
return new Response(JSON.stringify({ output_text: "已切到异步队列回复。" }), {
|
||||
status: 200,
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
"x-request-id": "req-master-agent-queue",
|
||||
},
|
||||
});
|
||||
}) as typeof fetch;
|
||||
|
||||
try {
|
||||
const response = await POST(
|
||||
await createAuthedRequest("master-agent", {
|
||||
body: "请同步 master-agent 当前阻塞点",
|
||||
}),
|
||||
{ params: Promise.resolve({ projectId: "master-agent" }) },
|
||||
);
|
||||
|
||||
assert.equal(response.status, 200);
|
||||
|
||||
const payload = (await response.json()) as {
|
||||
ok: boolean;
|
||||
task?: { taskId: string; taskType: string; status: string } | null;
|
||||
masterReplyState?: "queued" | "running" | "completed";
|
||||
masterReply?: unknown;
|
||||
};
|
||||
|
||||
assert.equal(payload.ok, true);
|
||||
assert.equal(payload.masterReplyState, "queued");
|
||||
assert.ok(payload.task, "expected master-agent message to return a task envelope");
|
||||
assert.equal(payload.task?.taskType, "conversation_reply");
|
||||
assert.equal(payload.task?.status, "queued");
|
||||
assert.ok(payload.task?.taskId, "expected a stable taskId in the response");
|
||||
|
||||
await waitFor(async () => {
|
||||
const state = await readState();
|
||||
const task = state.masterAgentTasks.find((item) => item.taskId === payload.task?.taskId);
|
||||
return task?.status === "completed";
|
||||
});
|
||||
|
||||
const nextState = await readState();
|
||||
const task = nextState.masterAgentTasks.find((item) => item.taskId === payload.task?.taskId);
|
||||
assert.ok(task, "expected the queued task to remain in state");
|
||||
assert.equal(task?.status, "completed");
|
||||
assert.equal(task?.replyBody, "已切到异步队列回复。");
|
||||
|
||||
const masterProject = nextState.projects.find((project) => project.id === "master-agent");
|
||||
const mirroredReply = masterProject?.messages.at(-1);
|
||||
assert.ok(mirroredReply, "expected the async reply to be written back to the master-agent ledger");
|
||||
assert.match(mirroredReply?.body ?? "", /已切到异步队列回复/);
|
||||
|
||||
assert.equal(fetchCalls.length, 1);
|
||||
assert.equal(fetchCalls[0]?.url, "https://api.openai.com/v1/responses");
|
||||
const requestBody = fetchCalls[0]?.body as {
|
||||
model?: string;
|
||||
reasoning?: { effort?: string };
|
||||
};
|
||||
assert.equal(requestBody?.model, "gpt-4.1-mini");
|
||||
assert.equal(requestBody?.reasoning?.effort, "high");
|
||||
} finally {
|
||||
globalThis.fetch = originalFetch;
|
||||
}
|
||||
});
|
||||
|
||||
test("master-agent enqueue 在主节点离线时会自动切到 OpenAI 后台队列而不是挂到本机设备队列", async () => {
|
||||
await setup();
|
||||
|
||||
await saveAiAccount({
|
||||
accountId: "master-codex-primary-offline",
|
||||
label: "主 GPT",
|
||||
role: "primary",
|
||||
provider: "master_codex_node",
|
||||
displayName: "离线 Master Codex Node",
|
||||
nodeId: "offline-node",
|
||||
nodeLabel: "离线节点",
|
||||
model: "gpt-5.4",
|
||||
enabled: true,
|
||||
setActive: true,
|
||||
loginStatusNote: "离线主节点",
|
||||
});
|
||||
|
||||
await saveAiAccount({
|
||||
accountId: "openai-backup-queue",
|
||||
label: "备用 GPT",
|
||||
role: "backup",
|
||||
provider: "openai_api",
|
||||
displayName: "OpenAI 备用账号",
|
||||
accountIdentifier: "sk-queue-demo",
|
||||
model: "gpt-5.4",
|
||||
apiKey: "sk-queue-demo",
|
||||
enabled: true,
|
||||
setActive: false,
|
||||
loginStatusNote: "备用 API 账号",
|
||||
});
|
||||
|
||||
const originalFetch = globalThis.fetch;
|
||||
globalThis.fetch = (async () =>
|
||||
new Response(JSON.stringify({ output_text: "离线主节点已切到 API 后台队列。" }), {
|
||||
status: 200,
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
"x-request-id": "req-master-agent-offline-fallback-queue",
|
||||
},
|
||||
})) as typeof fetch;
|
||||
|
||||
try {
|
||||
const response = await POST(
|
||||
await createAuthedRequest("master-agent", {
|
||||
body: "请走备用 API 队列",
|
||||
}),
|
||||
{ params: Promise.resolve({ projectId: "master-agent" }) },
|
||||
);
|
||||
|
||||
assert.equal(response.status, 200);
|
||||
const payload = (await response.json()) as {
|
||||
ok: boolean;
|
||||
task?: { taskId: string; taskType: string; status: string } | null;
|
||||
masterReplyState?: "queued" | "running" | "completed";
|
||||
};
|
||||
assert.equal(payload.ok, true);
|
||||
assert.equal(payload.masterReplyState, "queued");
|
||||
assert.equal(payload.task?.taskType, "conversation_reply");
|
||||
|
||||
await waitFor(async () => {
|
||||
const state = await readState();
|
||||
const task = state.masterAgentTasks.find((item) => item.taskId === payload.task?.taskId);
|
||||
return task?.status === "completed";
|
||||
});
|
||||
|
||||
const nextState = await readState();
|
||||
const task = nextState.masterAgentTasks.find((item) => item.taskId === payload.task?.taskId);
|
||||
assert.equal(task?.deviceId, "master-agent-openai");
|
||||
assert.equal(task?.status, "completed");
|
||||
assert.equal(task?.accountId, "openai-backup-queue");
|
||||
} finally {
|
||||
globalThis.fetch = originalFetch;
|
||||
}
|
||||
});
|
||||
Reference in New Issue
Block a user