Integrate master agent runtime orchestration updates

This commit is contained in:
kris
2026-04-16 04:41:46 +08:00
parent e0c0ea1814
commit 39be49630f
81 changed files with 9283 additions and 448 deletions

View File

@@ -2,7 +2,7 @@ import test from "node:test";
import assert from "node:assert/strict";
import os from "node:os";
import path from "node:path";
import { mkdtemp, rm } from "node:fs/promises";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { NextRequest } from "next/server";
let runtimeRoot = "";
@@ -11,6 +11,7 @@ let completeMasterTaskRoute: (typeof import("../src/app/api/v1/master-agent/task
let createAuthSession: (typeof import("../src/lib/boss-data"))["createAuthSession"];
let readState: (typeof import("../src/lib/boss-data"))["readState"];
let writeState: (typeof import("../src/lib/boss-data"))["writeState"];
let updateProjectAgentControls: (typeof import("../src/lib/boss-data"))["updateProjectAgentControls"];
let AUTH_SESSION_COOKIE = "";
async function setup() {
@@ -34,6 +35,7 @@ async function setup() {
createAuthSession = data.createAuthSession;
readState = data.readState;
writeState = data.writeState;
updateProjectAgentControls = data.updateProjectAgentControls;
AUTH_SESSION_COOKIE = auth.AUTH_SESSION_COOKIE;
}
@@ -43,6 +45,12 @@ test.after(async () => {
}
});
// Per-test reset hook: runs `setup()` and then replaces the runtime root
// directory with an empty one, so files left behind by one test are not
// visible to the next.
// NOTE(review): presumably `setup()` is what populates `runtimeRoot` (it is
// initialised to "" at module level) — confirm against the full file.
test.beforeEach(async function resetRuntimeRoot() {
  await setup();

  // `force: true` keeps the removal from failing when the directory is absent.
  await rm(runtimeRoot, { force: true, recursive: true });
  await mkdir(runtimeRoot, { recursive: true });
});
async function createAuthedRequest(url: string, method: "POST", body: unknown) {
const session = await createAuthSession({
account: "17600003315",
@@ -61,10 +69,27 @@ async function createAuthedRequest(url: string, method: "POST", body: unknown) {
});
}
/**
 * Polls an async predicate until it reports success or the time budget runs out.
 *
 * The predicate is evaluated, and if it is not yet truthy the function sleeps
 * for 50 ms before checking the clock again.
 *
 * @param predicate Async check that resolves to `true` once the awaited condition holds.
 * @param timeoutMs Maximum time to keep polling, in milliseconds (defaults to 5000).
 * @throws Error with message "waitFor timed out" when the budget elapses first.
 */
async function waitFor(predicate: () => Promise<boolean>, timeoutMs = 5_000) {
  const pollIntervalMs = 50;
  const deadline = Date.now() + timeoutMs;
  const sleep = (ms: number) => new Promise<void>((wake) => setTimeout(wake, ms));

  while (Date.now() < deadline) {
    const satisfied = await predicate();
    if (satisfied) {
      return;
    }
    await sleep(pollIntervalMs);
  }

  throw new Error("waitFor timed out");
}
/**
 * Looks up an ordinary single-thread project in a state snapshot.
 *
 * A project qualifies when its id is not "master-agent" and it is not a group.
 * When `projectId` is supplied (and non-empty) the project must also have
 * exactly that id; otherwise the first qualifying project is returned.
 *
 * The stale unfiltered `return` that preceded the filtered one has been
 * removed: it made the function ignore `projectId` entirely.
 *
 * @param state State snapshot as returned by `readState()`.
 * @param projectId Optional id the project must match.
 * @returns The matching project, or `undefined` when none qualifies.
 */
function findSingleThreadProject(
  state: Awaited<ReturnType<typeof readState>>,
  projectId?: string,
) {
  return state.projects.find(
    (project) =>
      project.id !== "master-agent" &&
      !project.isGroup &&
      (projectId ? project.id === projectId : true),
  );
}
function buildSingleThreadProject(projectId: string) {
@@ -105,19 +130,19 @@ function buildProjectFolderKey(project: ReturnType<typeof buildSingleThreadProje
return `${project.deviceIds[0]}:${folderRef}`;
}
/**
 * Returns the single-thread project with the given id, seeding it when absent.
 *
 * Reads the current state and returns the project if
 * `findSingleThreadProject` already locates it. Otherwise a project is built
 * with `buildSingleThreadProject(projectId)`, appended to `state.projects`,
 * persisted through `writeState`, and looked up again in a fresh read.
 *
 * The duplicated pre-change lines (parameterless signature, unfiltered lookups,
 * hard-coded "single-thread-test" id) have been dropped so that every lookup
 * and the seeded project all use the same `projectId`.
 *
 * @param projectId Id of the project to find or create (defaults to "single-thread-test").
 * @returns The project, or `undefined` if it still cannot be found after the
 *   write — callers in this file guard against that with `assert.ok`.
 */
async function ensureSingleThreadProject(projectId = "single-thread-test") {
  const state = await readState();
  const existing = findSingleThreadProject(state, projectId);
  if (existing) {
    return existing;
  }

  const project = buildSingleThreadProject(projectId);
  await writeState({
    ...state,
    projects: state.projects.concat(project),
  });

  // Re-read rather than returning `project` directly so the caller gets the
  // project exactly as `readState()` reports it after the write.
  const nextState = await readState();
  return findSingleThreadProject(nextState, projectId);
}
test("POST /api/v1/projects/[projectId]/messages enqueues a conversation task for single-thread projects", async () => {
@@ -137,7 +162,8 @@ test("POST /api/v1/projects/[projectId]/messages enqueues a conversation task fo
const payload = (await response.json()) as {
ok: boolean;
task?: { taskId: string; taskType: string; status: string } | null;
message: { id: string };
task?: { taskId: string; taskType: string; status: string; requestMessageId: string } | null;
dispatchPlan: null;
};
@@ -146,6 +172,7 @@ test("POST /api/v1/projects/[projectId]/messages enqueues a conversation task fo
assert.ok(payload.task, "expected single-thread message to return a queued task");
assert.equal(payload.task?.taskType, "conversation_reply");
assert.equal(payload.task?.status, "queued");
assert.equal(payload.task?.requestMessageId, payload.message.id);
const nextState = await readState();
const task = nextState.masterAgentTasks.find(
@@ -164,6 +191,333 @@ test("POST /api/v1/projects/[projectId]/messages enqueues a conversation task fo
assert.ok(!task?.executionPrompt?.includes("deviceIds:"), "thread prompt should not include device id labels");
});
// With no backend override on the project, posting a message must queue a
// conversation_reply task addressed to the project's first device and carrying
// no account id or account label.
test("POST /api/v1/projects/[projectId]/messages preserves default local-agent path when ordinary thread has no backend override", async () => {
  await setup();

  const project = await ensureSingleThreadProject();
  assert.ok(project, "expected a seeded single-thread project");

  const messageBody = "继续走默认线程回复链";
  const request = await createAuthedRequest(
    `http://127.0.0.1:3000/api/v1/projects/${project.id}/messages`,
    "POST",
    { body: messageBody },
  );
  const response = await postMessageRoute(request, {
    params: Promise.resolve({ projectId: project.id }),
  });
  assert.equal(response.status, 200);

  // Locate the task this request produced by type, project and request text.
  const { masterAgentTasks } = await readState();
  const queuedTask = masterAgentTasks.find(
    (candidate) =>
      candidate.taskType === "conversation_reply" &&
      candidate.projectId === project.id &&
      candidate.requestText === messageBody,
  );

  assert.ok(queuedTask, "expected a queued conversation task");
  assert.equal(queuedTask?.deviceId, project.deviceIds[0]);
  assert.equal(queuedTask?.accountId, undefined);
  assert.equal(queuedTask?.accountLabel, undefined);
});
// With `backendOverride: "hermes-runtime"` on the project, posting a message
// must queue a conversation_reply task addressed to the Hermes device/account
// and targeting the originating project and thread.
//
// The BOSS_HERMES_* variables point at a stub script run with the current Node
// binary. NOTE(review): presumably the Hermes runtime launcher reads these
// variables to decide what to spawn — confirm against the runtime module.
test("POST /api/v1/projects/[projectId]/messages routes ordinary thread conversation_reply to hermes-runtime when backendOverride is set", async () => {
  await setup();
  const singleProject = await ensureSingleThreadProject("single-thread-hermes-test");
  assert.ok(singleProject, "expected a seeded single-thread project");

  // Snapshot before touching anything so `finally` can restore the exact state.
  const previousEnv = {
    BOSS_HERMES_ENABLED: process.env.BOSS_HERMES_ENABLED,
    BOSS_HERMES_COMMAND: process.env.BOSS_HERMES_COMMAND,
    BOSS_HERMES_ARGS: process.env.BOSS_HERMES_ARGS,
    BOSS_HERMES_TIMEOUT_MS: process.env.BOSS_HERMES_TIMEOUT_MS,
  };
  const hermesDir = await mkdtemp(path.join(os.tmpdir(), "boss-thread-hermes-route-"));

  try {
    // Written inside `try` so the temp directory is removed even if this fails.
    const hermesScriptPath = path.join(hermesDir, "hermes-thread-route-runtime.mjs");
    await writeFile(
      hermesScriptPath,
      `
process.stdout.write("Hermes 路由测试已执行\\n\\n");
process.stdout.write("session_id: hermes-thread-route-123\\n");
`,
      "utf8",
    );

    process.env.BOSS_HERMES_ENABLED = "true";
    process.env.BOSS_HERMES_COMMAND = process.execPath;
    process.env.BOSS_HERMES_ARGS = hermesScriptPath;
    process.env.BOSS_HERMES_TIMEOUT_MS = "1000";

    await updateProjectAgentControls(
      singleProject.id,
      {
        backendOverride: "hermes-runtime",
      },
      "17600003315",
    );

    const response = await postMessageRoute(
      await createAuthedRequest(
        `http://127.0.0.1:3000/api/v1/projects/${singleProject.id}/messages`,
        "POST",
        { body: "请让 Hermes 接管当前线程回复" },
      ),
      { params: Promise.resolve({ projectId: singleProject.id }) },
    );
    assert.equal(response.status, 200);

    const nextState = await readState();
    const task = nextState.masterAgentTasks.find(
      (item) =>
        item.taskType === "conversation_reply" &&
        item.projectId === singleProject.id &&
        item.requestText === "请让 Hermes 接管当前线程回复",
    );
    assert.ok(task, "expected a queued conversation task");
    assert.equal(task?.deviceId, "master-agent-hermes");
    assert.equal(task?.accountId, "hermes-runtime");
    assert.equal(task?.accountLabel, "Hermes Runtime");
    assert.equal(task?.targetProjectId, singleProject.id);
    assert.equal(task?.targetThreadId, singleProject.threadMeta.threadId);

    // Wait for the task to reach "completed" before the stub script and env
    // are torn down in `finally`.
    await waitFor(async () => {
      const state = await readState();
      const currentTask = state.masterAgentTasks.find((item) => item.taskId === task?.taskId);
      return currentTask?.status === "completed";
    });
  } finally {
    // Assigning `undefined` to a process.env key stores the string "undefined",
    // so variables that were originally unset must be deleted instead.
    for (const [name, value] of Object.entries(previousEnv)) {
      if (value === undefined) {
        delete process.env[name];
      } else {
        process.env[name] = value;
      }
    }
    await rm(hermesDir, { recursive: true, force: true });
  }
});
// A project that still has `backendOverride: "hermes-runtime"` saved, but with
// every BOSS_HERMES_* variable removed from the environment, must queue its
// conversation_reply task on the default path: first project device, no
// account id, no account label.
test("POST /api/v1/projects/[projectId]/messages falls back to the default local-agent path when a saved hermes override is no longer available", async () => {
  await setup();
  const singleProject = await ensureSingleThreadProject("single-thread-hermes-fallback-test");
  assert.ok(singleProject, "expected a seeded single-thread project");

  // Snapshot before touching anything so `finally` can restore the exact state.
  const previousEnv = {
    BOSS_HERMES_ENABLED: process.env.BOSS_HERMES_ENABLED,
    BOSS_HERMES_COMMAND: process.env.BOSS_HERMES_COMMAND,
    BOSS_HERMES_ARGS: process.env.BOSS_HERMES_ARGS,
    BOSS_HERMES_TIMEOUT_MS: process.env.BOSS_HERMES_TIMEOUT_MS,
  };

  try {
    // The override is saved first, under whatever environment the test
    // inherited; only afterwards is the Hermes configuration removed.
    await updateProjectAgentControls(
      singleProject.id,
      {
        backendOverride: "hermes-runtime",
      },
      "17600003315",
    );

    delete process.env.BOSS_HERMES_ENABLED;
    delete process.env.BOSS_HERMES_COMMAND;
    delete process.env.BOSS_HERMES_ARGS;
    delete process.env.BOSS_HERMES_TIMEOUT_MS;

    const response = await postMessageRoute(
      await createAuthedRequest(
        `http://127.0.0.1:3000/api/v1/projects/${singleProject.id}/messages`,
        "POST",
        { body: "Hermes 不可用时请回退到默认线程链路" },
      ),
      { params: Promise.resolve({ projectId: singleProject.id }) },
    );
    assert.equal(response.status, 200);

    const nextState = await readState();
    const task = nextState.masterAgentTasks.find(
      (item) =>
        item.taskType === "conversation_reply" &&
        item.projectId === singleProject.id &&
        item.requestText === "Hermes 不可用时请回退到默认线程链路",
    );
    assert.ok(task, "expected a queued conversation task");
    assert.equal(task?.deviceId, singleProject.deviceIds[0]);
    assert.equal(task?.accountId, undefined);
    assert.equal(task?.accountLabel, undefined);
  } finally {
    // Assigning `undefined` to a process.env key stores the string "undefined",
    // so variables that were originally unset must be deleted instead.
    for (const [name, value] of Object.entries(previousEnv)) {
      if (value === undefined) {
        delete process.env[name];
      } else {
        process.env[name] = value;
      }
    }
  }
});
// End-to-end check of the asynchronous Hermes path: after posting a message to
// a project with `backendOverride: "hermes-runtime"`, the queued task must
// reach "completed" with the stub's reply text and session id, and the reply
// must appear in the project's messages with sender "device".
//
// The stub script echoes whatever follows a `-q` / `--query` argument and then
// prints a `session_id:` line.
test("POST /api/v1/projects/[projectId]/messages lets Hermes asynchronously complete ordinary thread replies when backendOverride is set", async () => {
  await setup();
  const singleProject = await ensureSingleThreadProject("single-thread-hermes-async-test");
  assert.ok(singleProject, "expected a seeded single-thread project");

  // Snapshot before touching anything so `finally` can restore the exact state.
  const previousEnv = {
    BOSS_HERMES_ENABLED: process.env.BOSS_HERMES_ENABLED,
    BOSS_HERMES_COMMAND: process.env.BOSS_HERMES_COMMAND,
    BOSS_HERMES_ARGS: process.env.BOSS_HERMES_ARGS,
    BOSS_HERMES_TIMEOUT_MS: process.env.BOSS_HERMES_TIMEOUT_MS,
  };
  const hermesDir = await mkdtemp(path.join(os.tmpdir(), "boss-thread-hermes-queue-"));

  try {
    // Written inside `try` so the temp directory is removed even if this fails.
    const hermesScriptPath = path.join(hermesDir, "hermes-thread-runtime.mjs");
    await writeFile(
      hermesScriptPath,
      `
const args = process.argv.slice(2);
const queryIndex = args.findIndex((item) => item === "-q" || item === "--query");
const query = queryIndex >= 0 ? args[queryIndex + 1] ?? "" : "";
process.stdout.write("Hermes 线程已接管:" + query + "\\n\\n");
process.stdout.write("session_id: hermes-thread-session-123\\n");
`,
      "utf8",
    );

    process.env.BOSS_HERMES_ENABLED = "true";
    process.env.BOSS_HERMES_COMMAND = process.execPath;
    process.env.BOSS_HERMES_ARGS = hermesScriptPath;
    process.env.BOSS_HERMES_TIMEOUT_MS = "1000";

    await updateProjectAgentControls(
      singleProject.id,
      {
        backendOverride: "hermes-runtime",
      },
      "17600003315",
    );

    const response = await postMessageRoute(
      await createAuthedRequest(
        `http://127.0.0.1:3000/api/v1/projects/${singleProject.id}/messages`,
        "POST",
        { body: "请让 Hermes 真正回复当前线程" },
      ),
      { params: Promise.resolve({ projectId: singleProject.id }) },
    );
    assert.equal(response.status, 200);

    const queuedState = await readState();
    const task = queuedState.masterAgentTasks.find(
      (item) =>
        item.taskType === "conversation_reply" &&
        item.projectId === singleProject.id &&
        item.requestText === "请让 Hermes 真正回复当前线程",
    );
    assert.ok(task, "expected a queued Hermes conversation task");

    // The task finishes out of band, so poll persisted state until it does.
    await waitFor(async () => {
      const state = await readState();
      const currentTask = state.masterAgentTasks.find((item) => item.taskId === task?.taskId);
      return currentTask?.status === "completed";
    });

    const nextState = await readState();
    const completedTask = nextState.masterAgentTasks.find((item) => item.taskId === task?.taskId);
    assert.equal(completedTask?.status, "completed");
    assert.match(completedTask?.replyBody ?? "", /Hermes 线程已接管:/);
    assert.equal(completedTask?.sessionId, "hermes-thread-session-123");

    const updatedProject = nextState.projects.find((project) => project.id === singleProject.id);
    const mirroredReply = updatedProject?.messages.find((message) =>
      message.body.includes("Hermes 线程已接管:"),
    );
    assert.ok(mirroredReply, "expected Hermes reply to be written back to the thread project");
    assert.equal(mirroredReply?.sender, "device");
  } finally {
    // Assigning `undefined` to a process.env key stores the string "undefined",
    // so variables that were originally unset must be deleted instead.
    for (const [name, value] of Object.entries(previousEnv)) {
      if (value === undefined) {
        delete process.env[name];
      } else {
        process.env[name] = value;
      }
    }
    await rm(hermesDir, { recursive: true, force: true });
  }
});
// When the Hermes stub replies with text describing its own sandbox / working
// directory, the task must end as "failed" with THREAD_ENVIRONMENT_INVALID in
// its error message, the raw reply must not appear in the project's messages,
// and a user-facing notice must appear instead.
test("ordinary thread Hermes async execution blocks leaked environment diagnostics from the chat transcript", async () => {
  await setup();
  const singleProject = await ensureSingleThreadProject("single-thread-hermes-env-test");
  assert.ok(singleProject, "expected a seeded single-thread project");

  // Snapshot before touching anything so `finally` can restore the exact state.
  const previousEnv = {
    BOSS_HERMES_ENABLED: process.env.BOSS_HERMES_ENABLED,
    BOSS_HERMES_COMMAND: process.env.BOSS_HERMES_COMMAND,
    BOSS_HERMES_ARGS: process.env.BOSS_HERMES_ARGS,
    BOSS_HERMES_TIMEOUT_MS: process.env.BOSS_HERMES_TIMEOUT_MS,
  };
  const hermesDir = await mkdtemp(path.join(os.tmpdir(), "boss-thread-hermes-env-"));

  try {
    // Written inside `try` so the temp directory is removed even if this fails.
    const hermesScriptPath = path.join(hermesDir, "hermes-thread-env-runtime.mjs");
    await writeFile(
      hermesScriptPath,
      `
process.stdout.write("我不能直接把当前会话环境从只读改回可写也不能替你修改这层运行配置。cwd 我可以在命令里指向 /Users/kris/code/gptpluscontrol。\\n\\n");
process.stdout.write("session_id: hermes-thread-env-123\\n");
`,
      "utf8",
    );

    process.env.BOSS_HERMES_ENABLED = "true";
    process.env.BOSS_HERMES_COMMAND = process.execPath;
    process.env.BOSS_HERMES_ARGS = hermesScriptPath;
    process.env.BOSS_HERMES_TIMEOUT_MS = "1000";

    await updateProjectAgentControls(
      singleProject.id,
      {
        backendOverride: "hermes-runtime",
      },
      "17600003315",
    );

    const response = await postMessageRoute(
      await createAuthedRequest(
        `http://127.0.0.1:3000/api/v1/projects/${singleProject.id}/messages`,
        "POST",
        { body: "请继续推进当前线程" },
      ),
      { params: Promise.resolve({ projectId: singleProject.id }) },
    );
    assert.equal(response.status, 200);

    const queuedState = await readState();
    const task = queuedState.masterAgentTasks.find(
      (item) =>
        item.taskType === "conversation_reply" &&
        item.projectId === singleProject.id &&
        item.requestText === "请继续推进当前线程",
    );
    assert.ok(task, "expected a queued Hermes conversation task");

    // The task is processed out of band; here the expected terminal state is "failed".
    await waitFor(async () => {
      const state = await readState();
      const currentTask = state.masterAgentTasks.find((item) => item.taskId === task?.taskId);
      return currentTask?.status === "failed";
    });

    const nextState = await readState();
    const failedTask = nextState.masterAgentTasks.find((item) => item.taskId === task?.taskId);
    assert.equal(failedTask?.status, "failed");
    assert.match(failedTask?.errorMessage ?? "", /THREAD_ENVIRONMENT_INVALID/);

    const updatedProject = nextState.projects.find((project) => project.id === singleProject.id);
    const leakedReply = updatedProject?.messages.find((message) =>
      message.body.includes("当前会话环境从只读改回可写"),
    );
    assert.equal(leakedReply, undefined);

    const opsNotice = updatedProject?.messages.find((message) =>
      message.body.includes("线程环境异常,请重新绑定到正确项目或工作目录后再试。"),
    );
    assert.ok(opsNotice, "expected a user-facing system notice instead of raw environment diagnostics");
  } finally {
    // Assigning `undefined` to a process.env key stores the string "undefined",
    // so variables that were originally unset must be deleted instead.
    for (const [name, value] of Object.entries(previousEnv)) {
      if (value === undefined) {
        delete process.env[name];
      } else {
        process.env[name] = value;
      }
    }
    await rm(hermesDir, { recursive: true, force: true });
  }
});
test("POST /api/v1/projects/[projectId]/messages blocks single-thread sends when the target device prefers gui mode", async () => {
await setup();
const singleProject = await ensureSingleThreadProject();
@@ -389,3 +743,73 @@ test("POST /api/v1/master-agent/tasks/[taskId]/complete blocks leaked thread env
);
assert.ok(opsNotice, "expected a user-facing system notice instead of raw environment diagnostics");
});
// Completing a conversation_reply task with a `warnings` array must persist
// exactly one execution-warning record for that task: the entry with real
// text. The second submitted entry, whose title and summary are a single
// space each, is expected to leave no record.
test("POST /api/v1/master-agent/tasks/[taskId]/complete persists remote warnings onto execution warning records", async () => {
  await setup();

  const project = await ensureSingleThreadProject("single-thread-warning-test");
  assert.ok(project, "expected a seeded single-thread project");

  // Step 1: queue a conversation_reply task by posting a message to the thread.
  const requestText = "请同步当前线程的风险点";
  await postMessageRoute(
    await createAuthedRequest(
      `http://127.0.0.1:3000/api/v1/projects/${project.id}/messages`,
      "POST",
      { body: requestText },
    ),
    { params: Promise.resolve({ projectId: project.id }) },
  );

  const { masterAgentTasks } = await readState();
  const task = masterAgentTasks.find(
    (candidate) =>
      candidate.taskType === "conversation_reply" &&
      candidate.projectId === project.id &&
      candidate.targetProjectId === project.id &&
      candidate.requestText === requestText,
  );
  assert.ok(task, "expected a queued conversation_reply task");

  // Step 2: report completion for that task, attaching one real warning and
  // one whitespace-only warning.
  const threadId = project.threadMeta.threadId;
  const requestId = "req-thread-warning-1";
  const realWarning = {
    title: "上下文接近上限",
    summary: "本轮回复过长,建议尽快压缩。",
  };
  const blankWarning = {
    title: " ",
    summary: " ",
  };

  const completionRequest = await createAuthedRequest(
    `http://127.0.0.1:3000/api/v1/master-agent/tasks/${task.taskId}/complete`,
    "POST",
    {
      deviceId: task.deviceId,
      status: "completed",
      targetProjectId: project.id,
      targetThreadId: threadId,
      requestId,
      warnings: [realWarning, blankWarning],
      replyBody: "当前风险点已同步。",
    },
  );
  const response = await completeMasterTaskRoute(completionRequest, {
    params: Promise.resolve({ taskId: task.taskId }),
  });
  assert.equal(response.status, 200);

  // Step 3: inspect the warning records stored for this task.
  const { threadExecutionWarnings } = await readState();
  const persisted = threadExecutionWarnings.filter((entry) => entry.taskId === task.taskId);
  const [firstPersisted] = persisted;

  // `warningId` and `createdAt` are generated values, so the expectation
  // echoes whatever was stored for them.
  assert.deepEqual(persisted, [
    {
      warningId: firstPersisted?.warningId,
      taskId: task.taskId,
      requestMessageId: task.requestMessageId,
      projectId: project.id,
      targetProjectId: project.id,
      targetThreadId: threadId,
      sessionId: undefined,
      requestId,
      title: realWarning.title,
      summary: realWarning.summary,
      createdAt: firstPersisted?.createdAt,
    },
  ]);
});