feat: interrupt canceled codex app-server turns

This commit is contained in:
AI Bot
2026-06-03 13:12:23 +08:00
parent 142fb2a4b3
commit 13201e6aee
11 changed files with 517 additions and 1 deletions

View File

@@ -164,6 +164,7 @@
- 第二十六批另补 `runtimeEventSummary / extensionEventSummary / threadLifecycleEventSummary` 运行事件、扩展事件和线程生命周期事件能力摘要:设备详情页会显示 process output/exited、raw response completed、skills changed、plugin installed、thread started/closed/archived/unarchived/name updated 等能力分组;这些字段只读,不在 heartbeat 中主动触发进程、插件、Skill 或线程生命周期动作。
- 第二十七批另补 `streamDeltaEventSummary` 流式增量事件能力摘要:设备详情页会显示 agent delta、plan delta、reasoning delta、MCP progress、command output、terminal interaction 和 file output 等能力分组;该字段只读,不保存原始增量文本、命令输出、推理正文或文件输出。
- 当前任务执行态也已补 `executionProgress.streamEvents`App Server runner 会把 agent / plan / reasoning / MCP / command / terminal / file 的流式 delta 归一成计数Android 进度卡展示“流式增量”,不保存或渲染原始 delta、命令输出、终端输入、推理正文或文件输出。
- 当前 App Server 任务取消已从“服务端标记”升级为“真实 turn 中断”:`POST /api/v1/master-agent/tasks/[taskId]/cancel` 仍负责把任务置为 `canceled`,新增 `GET /api/v1/master-agent/tasks/[taskId]/control-state` 供设备端轮询;`local-agent` 在 App Server turn 启动后会按取消状态调用 `turn/interrupt`,并把 `interrupted` 作为干净取消处理,避免取消后长任务继续跑或被误写成失败日志。
- 当前 boss-agent 已支持 Mac OTA`local-agent/boss-agent-ota-runner.mjs` 默认开启,每 5 分钟检查服务端最新包;状态页可手动检查或下载并安装,安装时保留原绑定配置,只更新版本号和本机 runtime 路径。最新验证版本为 `20260516221619`,已在 MacBook Air `macbook-air` 上确认 OTA 下载校验、暂存、覆盖安装后不会误切到默认 `config.cloud.json`。正式分发脚本已预留 Developer ID 公证路径:`BOSS_AGENT_NOTARIZE=1` 配合 notary profile 或 Apple ID 凭据。
- 当前量产治理已补设备撤权和任务可靠性底座:`revoke_device` 会清空设备 token、标记离线并阻断 heartbeat / 任务认领 / Skill 同步 / 日志上报 / boss-agent OTA`MasterAgentTask` claim 会记录 attempt 和 lease运行中任务可按租约重试超过上限转 `timed_out`,用户或管理员可通过 cancel 接口转 `canceled` 且迟到 complete 不覆盖终态。
- 当前群聊 `dispatch_execution` 完成回写已补幂等,重复完成不会再向群聊重复追加结果

View File

@@ -1249,6 +1249,14 @@
- 权限:任务请求账号、`highest_admin`,或具备目标设备 `device.manage` 的账号
- 当前行为:任务转为 `canceled`,写入 `canceledAt / canceledBy / cancelReason`,清除租约;如果之后设备端迟到回写成功,服务端不会覆盖取消终态
#### `GET /api/v1/master-agent/tasks/[taskId]/control-state`
- 用途:设备端在执行中轮询任务控制状态,用于把 APP / Web 的取消动作同步到正在运行的本地 runtime
- 权限:目标设备 token或具备目标设备写权限的会话
- 返回:`taskId / status / canceled / cancelReason / canceledAt`
- 安全边界:不返回 `requestText / executionPrompt / sourceMessageBody`,避免把用户消息、内部提示词或调度字段泄露给设备外的调用方
- 当前 App Server 行为:`local-agent``turn/start` 后轮询该接口;如果返回 `canceled=true`,会调用当前 App Server 连接的 `turn/interrupt`,把 Codex 活跃 turn 真实中断
#### `GET /api/v1/integrations/telegram`
- 用途:读取 Telegram Bot 接入配置

View File

@@ -147,6 +147,7 @@ UI 参考:
- 新增实时进度入口 `POST /api/v1/master-agent/tasks/[taskId]/progress`,设备端可在任务执行中持续刷新同一张 `execution_progress` 卡;`local-agent` 的 App Server runner 已在收到协议进度事件时调用该接口complete 仍携带最终进度作为兜底
- 新增服务端线程协作入口 `POST /api/v1/projects/[projectId]/thread-collaboration`,由 Boss 校验源/目标项目权限并创建 `intentCategory=thread_collaboration``conversation_reply` 任务;设备端继续通过 App Server runner 执行 `thread/read -> thread/inject_items -> turn/start`,避免把“线程互通”误做成无监管 P2P
- 新增活跃 turn 干预:任务携带 `targetCodexTurnId` / `targetTurnId`App Server runner 会调用 `turn/steer`,并把 `turnControl=steer``turnId` 写回执行结果;没有活跃 turn id 时仍使用 `turn/start`
- 新增活跃 turn 中断:用户或管理员通过任务取消接口把任务转为 `canceled` 后,设备端会轮询 `GET /api/v1/master-agent/tasks/[taskId]/control-state`;如果当前任务已经启动 App Server turnrunner 会在同一个 JSON-RPC 连接上调用 `turn/interrupt`,并把返回的 `interrupted` 处理成干净取消,不再把用户主动取消误判成 runtime failure
- `getCodexAppServerRunnerConfig` 已识别 `codexAppServerTransport` / `BOSS_CODEX_APP_SERVER_TRANSPORT``codexAppServerUrl` / `BOSS_CODEX_APP_SERVER_URL``codexAppServerAuthTokenFile` / `BOSS_CODEX_APP_SERVER_AUTH_TOKEN_FILE``local-agent/codex-app-server-runner.mjs` 现已支持 `stdio``ws://127.0.0.1:<port>``unix://PATH` 三种 JSON-RPC transport默认仍是 stdiows/unix 适合作为同机长驻 App Server 灰度路径
- 新增 App Server 过载退避:单个 JSON-RPC 请求收到 `-32001``retry later` 文案时,会在同一个任务生命周期内重试,超出上限后才进入失败/CLI fallback 判定
- 新增 App Server capability discovery`local-agent` 会把可用模型、默认/快速/深度模型建议、provider 能力、Skill、Plugin、App 摘要写入设备 heartbeatWeb 设备详情已显示 App Server、模型和扩展数量为后续 APP/后台模型配置页提供真实数据来源

View File

@@ -267,6 +267,7 @@ cd /Users/kris/code/boss
- 当前 `local-agent` 会在 Codex 任务执行中和完成时回传 `executionProgress`:服务端把同一任务的进度卡从 queued / running 更新到 completed / failedAndroid 原生聊天页会显示“进度 / 线程状态 / 实时状态 / 线程配置 / 线程协作 / 工具活动 / 思考摘要 / 账号状态 / 运行状态 / 安全提醒 / 审批状态 / 文件变更 / 分支详情 / 生成结果 / 后台智能体”。2026-05-31 起Codex App Server 的 `turn/plan/updated``turn/diff/updated``item/started|completed``thread/started` 会直接映射为进度步骤、变更统计、生成产物和后台智能体;第二批已把 `item/*/requestApproval``item/autoApprovalReview/*``guardianWarning``serverRequest/resolved``item/fileChange/patchUpdated` 映射为审批、安全提醒和文件变更摘要;第三批已把 `thread/status/changed``thread/realtime/*` 安全映射为线程状态和实时状态摘要;第四批已把 `model/rerouted``thread/tokenUsage/updated``mcpServer/startupStatus/updated``remoteControl/status/changed` 安全映射为运行状态摘要;第五批已把 `thread/goal/*``thread/settings/updated``thread/compacted` 映射为线程配置摘要;第六批已把 `account/updated``account/rateLimits/updated``model/verification``warning``configWarning``deprecationNotice` 映射为账号状态、模型校验和安全提醒摘要;第七批已把 `ThreadItem.collabToolCall``ThreadItem.contextCompaction` 映射为线程协作和上下文压缩摘要;第八批已把 `mcpToolCall``dynamicToolCall``webSearch``imageView``enteredReviewMode``exitedReviewMode``commandExecution` 映射为工具活动摘要;第九批已把 `ThreadItem.plan``ThreadItem.reasoning.summary` 映射为计划步骤与思考摘要;第十批已把 `ThreadItem.imageGeneration` 映射为图像生成工具活动和图片产物;第十一批已把 `hook/started|completed` 映射为钩子生命周期工具活动;第十二批已把 `windowsSandbox/setupCompleted` 映射为 Windows 沙箱准备状态摘要;第十七批已把新版 `ThreadItem.collabToolCall.receiverThreadIds / agentsStates` 安全映射为线程协作目标数量和 agent 状态集合。所有进度均通过 `POST /api/v1/master-agent/tasks/[taskId]/progress` 实时刷新;字段白名单会剥离 cwd、turnId、配置文件路径、内部 prompt、collab 源/目标线程 ID、receiverThreadIds、agentsStates 私有消息、共享 Skill 根绝对路径、tool arguments/result、web URL token、命令正文/输出、raw reasoning content、reasoning item id、图像生成 revisedPrompt/result、hook sourcePath/statusMessage/entries、Windows sandbox sourcePath/samplePaths/本地绝对路径和未清洗密钥complete 回写仍会携带最终进度兜底
- 当前 `local-agent` heartbeat 已新增 Codex App Server capability discovery按 TTL 拉取模型、provider 能力、Skill、Hook、Plugin、App 摘要,并附加只读线程操作、插件治理、账号治理、配置治理、文件治理、命令会话、外部 Agent 迁移、Marketplace、实验特性、审查、Windows 沙箱、文件搜索事件、MCP、用户交互、Guardian、运行事件、扩展事件、线程生命周期和流式增量能力 catalog写入 `capabilities.codexAppServer.metadata`Web 设备详情会展示 App Server 连接状态、模型数量、默认/快速/深度模型、扩展数量、Hook 治理摘要、线程操作摘要、插件治理摘要、账号治理摘要、配置治理摘要、文件治理摘要、命令会话摘要、迁移治理摘要、市场治理摘要、实验特性治理摘要、审查治理摘要、Windows 沙箱摘要、文件搜索事件摘要、MCP 治理摘要、用户交互摘要、Guardian 治理摘要、运行事件摘要、扩展事件摘要、线程生命周期摘要和流式增量摘要
- 当前 `MasterAgentTask` 已具备服务端租约和取消基础状态机claim 会写入 `attemptCount / maxAttempts / leaseExpiresAt`,运行中任务租约过期后可被重新认领,超过重试上限会转 `timed_out``POST /api/v1/master-agent/tasks/[taskId]/cancel` 会把任务转 `canceled`,迟到的成功 complete 不会覆盖终态
- 当前 App Server 执行中的任务取消已补真实中断链路:服务端新增 `GET /api/v1/master-agent/tasks/[taskId]/control-state` 给设备端按 token 轮询取消状态;`local-agent` 在 turn 启动后按 `masterAgentInterruptPollIntervalMs` 检查该接口,发现任务已取消会在同一个 App Server 连接上调用 `turn/interrupt`,并把 interrupted 作为干净取消结果处理,不再等长任务自然结束或把取消刷成失败日志
- 当前 `local-agent``browser_control / desktop_control` 已从占位骨架升级成外部 runtime 桥:当本机配置了 `browserControlEnabled + browserControlCommand``computerUseEnabled + computerUseCommand` 时,会把标准化 JSON 请求透传给外部进程,并解析单行 JSON 结果;未启用时会 fail closed返回明确的 runtime disabled 错误,不再假装执行成功
- 远程电脑控制链路当前已有可复用压测基线:`npm run stress:remote-control` 可按参数压测 `local-agent -> MasterAgentTask -> browser_control / desktop_control runtime -> complete 回写` 全链路;`npm run stress:remote-control:ci` 固定 120 条链路任务和 360 条 runtime 并发任务,并用 p95 延迟预算判断是否退化。压测报告可通过 `--report-json=PATH` 落盘,便于后续接入真实 macOS AX / Windows UIA helper 后复用同一套稳定性判断。
- 当前历史脏群如果不再包含真实线程成员,群聊消息不会再表现成“无响应”;服务端会在群内追加明确 `system_notice`,提示先重新添加线程成员

View File

@@ -72,6 +72,11 @@ function normalizePositiveInteger(value, fallback) {
return Number.isFinite(numeric) && numeric > 0 ? Math.floor(numeric) : fallback;
}
function normalizeNonNegativeInteger(value, fallback) {
const numeric = Number(value);
return Number.isFinite(numeric) && numeric >= 0 ? Math.floor(numeric) : fallback;
}
function boolFromConfigOrEnv(configValue, envValue, fallback) {
if (configValue === true || configValue === false) {
return configValue;
@@ -2809,6 +2814,9 @@ export async function executeCodexAppServerTask(runnerConfig, task) {
const progressCollector = createProgressCollector();
const progressEmits = [];
let lastProgressSnapshotJson = "";
let interruptRequested = false;
let interruptReason = "";
let interruptPollTimer;
const pending = new Map();
const retryTimers = new Set();
let resolveTurnCompleted;
@@ -2836,6 +2844,10 @@ export async function executeCodexAppServerTask(runnerConfig, task) {
const cleanup = () => {
clearTimeout(timeout);
if (interruptPollTimer) {
clearInterval(interruptPollTimer);
interruptPollTimer = undefined;
}
for (const timer of retryTimers) {
clearTimeout(timer);
}
@@ -2984,12 +2996,67 @@ export async function executeCodexAppServerTask(runnerConfig, task) {
const status = message.params?.turn?.status ?? message.params?.status ?? "completed";
if (status === "completed") {
resolveTurnCompleted(message.params ?? {});
} else if (status === "interrupted" && interruptRequested) {
resolveTurnCompleted({ ...(message.params ?? {}), interrupted: true });
} else {
rejectTurnCompleted(new Error(`CODEX_APP_SERVER_TURN_${String(status).toUpperCase()}`));
}
}
};
const startActiveTurnInterruptPolling = ({ threadId, turnId }) => {
if (
!threadId ||
!turnId ||
typeof runnerConfig.shouldInterruptActiveTurn !== "function"
) {
return;
}
const pollIntervalMs = normalizeNonNegativeInteger(
runnerConfig.interruptPollIntervalMs,
750,
);
const checkAndInterrupt = async () => {
if (interruptRequested || turnSettled || closed) {
return;
}
let decision;
try {
decision = await runnerConfig.shouldInterruptActiveTurn({
taskId: task?.taskId,
task,
threadId,
turnId,
});
} catch {
return;
}
const shouldInterrupt =
decision === true ||
decision?.interrupt === true ||
decision?.canceled === true ||
decision?.status === "canceled";
if (!shouldInterrupt || interruptRequested || turnSettled || closed) {
return;
}
interruptRequested = true;
interruptReason = trimToDefined(decision?.reason) || "USER_CANCELED_TASK";
try {
await request("turn/interrupt", { threadId, turnId });
} catch (error) {
rejectTurnCompleted(error);
}
};
void checkAndInterrupt();
if (pollIntervalMs > 0) {
interruptPollTimer = setInterval(() => {
void checkAndInterrupt();
}, pollIntervalMs);
}
};
try {
rpcTransport = await openCodexAppServerTransport(runnerConfig, cwd, {
onLine: handleTransportLine,
@@ -3041,17 +3108,34 @@ export async function executeCodexAppServerTask(runnerConfig, task) {
model: runnerConfig.model,
});
activeTurnStarted = true;
const activeTurnId = trimToDefined(turnResult?.turn?.id) || targetTurnRef;
startActiveTurnInterruptPolling({ threadId, turnId: activeTurnId });
await turnCompleted;
if (progressEmits.length > 0) {
await Promise.allSettled(progressEmits);
}
const normalizedReply = (replyBody || completedMessageText).trim();
if (interruptRequested) {
return {
status: "interrupted",
replyBody: "已按用户要求中断当前 Codex turn。",
threadId,
turnId: activeTurnId,
turnControl: "interrupt",
interruptReason,
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
interThreadBroker,
canFallbackToCli: false,
};
}
return {
status: "completed",
replyBody: normalizedReply,
threadId,
turnId: trimToDefined(turnResult?.turn?.id) || targetTurnRef,
turnId: activeTurnId,
turnControl,
cwd,
transport: runnerConfig.transport,

View File

@@ -525,6 +525,35 @@ async function postMasterAgentTaskProgress(config, runtime, payload) {
};
}
async function fetchMasterAgentTaskControlState(config, runtime, task) {
const response = await fetch(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/master-agent/tasks/${task.taskId}/control-state`,
{
method: "GET",
headers: {
...deviceTokenHeaders(config, runtime),
},
},
);
if (!response.ok) {
return {
ok: false,
status: response.status,
body: await response.text(),
};
}
return {
ok: true,
status: response.status,
body: await response.json(),
};
}
function normalizeInterruptPollIntervalMs(config) {
const value = Number(config.masterAgentInterruptPollIntervalMs ?? 750);
return Number.isFinite(value) && value >= 0 ? Math.floor(value) : 750;
}
async function claimSkillLifecycleRequest(config, runtime) {
const response = await fetch(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/devices/${config.deviceId}/skill-requests/claim`,
@@ -793,6 +822,20 @@ async function runMasterAgentTask(config, runtime, task) {
const appServerResult = await executeCodexAppServerTask(
{
...codexAppServerRunner,
interruptPollIntervalMs: normalizeInterruptPollIntervalMs(config),
shouldInterruptActiveTurn: async () => {
const controlState = await fetchMasterAgentTaskControlState(config, runtime, task);
if (!controlState.ok) {
return false;
}
if (controlState.body?.canceled === true || controlState.body?.status === "canceled") {
return {
interrupt: true,
reason: controlState.body?.cancelReason || "USER_CANCELED_TASK",
};
}
return false;
},
onProgress: async (executionProgress) => {
const progressResult = await postMasterAgentTaskProgress(config, runtime, {
taskId: task.taskId,
@@ -813,6 +856,15 @@ async function runMasterAgentTask(config, runtime, task) {
},
task,
);
if (appServerResult.status === "interrupted") {
return {
interruptedCompletion: {
replyBody: appServerResult.replyBody,
interruptReason: appServerResult.interruptReason,
executionProgress: appServerResult.executionProgress,
},
};
}
if (appServerResult.status === "completed") {
const localExecutionProgress = await collectLocalExecutionProgress(
appServerResult.cwd || task.targetCodexFolderRef || config.masterAgentWorkdir || process.cwd(),
@@ -976,6 +1028,23 @@ async function runMasterAgentTask(config, runtime, task) {
});
return;
}
if (executionResult.interruptedCompletion) {
runtime.activeMasterTask = {
taskId: task.taskId,
status: "canceled",
completedAt: new Date().toISOString(),
detail: executionResult.interruptedCompletion.replyBody,
};
await postAppLog(config, runtime, {
projectId: "master-agent",
level: "info",
category: "local_agent.codex_app_server_turn_interrupted",
message: `Master Codex Node 已按取消状态中断 Codex App Server turn${task.taskId}`,
detail: executionResult.interruptedCompletion.interruptReason,
mirrorToMaster: false,
});
return;
}
const { replyBody, dispatchExecutionCompletion, executionProgress } = executionResult;
const completion = await completeMasterAgentTask(

View File

@@ -0,0 +1,29 @@
import { NextRequest } from "next/server";
import { authorizeDeviceWriteRequest } from "@/lib/boss-device-auth";
import { getMasterAgentTask } from "@/lib/boss-data";
import { jsonNoStore } from "@/lib/api-response";
export async function GET(
request: NextRequest,
context: { params: Promise<{ taskId: string }> },
) {
const { taskId } = await context.params;
const task = await getMasterAgentTask(taskId);
if (!task) {
return jsonNoStore({ ok: false, message: "MASTER_AGENT_TASK_NOT_FOUND" }, { status: 404 });
}
const auth = await authorizeDeviceWriteRequest(request, task.deviceId);
if (!auth.ok) {
return jsonNoStore({ ok: false, message: "FORBIDDEN" }, { status: 403 });
}
return jsonNoStore({
ok: true,
taskId: task.taskId,
status: task.status,
canceled: task.status === "canceled",
cancelReason: task.cancelReason,
canceledAt: task.canceledAt,
});
}

View File

@@ -7,6 +7,7 @@ const received = [];
let injectedItems = [];
let extraSkillRoots = [];
let overloadedTurnStartEmitted = false;
let interruptibleTurn = null;
function send(message) {
process.stdout.write(`${JSON.stringify(message)}\n`);
@@ -645,6 +646,13 @@ rl.on("line", (line) => {
if (process.env.BOSS_CODEX_APP_SERVER_FIXTURE_EXIT_AFTER_TURN_START === "1") {
process.exit(0);
}
if (process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT === "1") {
interruptibleTurn = {
threadId: message.params?.threadId,
turnId: "turn-fixture",
};
return;
}
if (process.env.BOSS_CODEX_APP_SERVER_FIXTURE_EMIT_PROGRESS === "1") {
send({
method: "turn/plan/updated",
@@ -1503,6 +1511,25 @@ rl.on("line", (line) => {
return;
}
if (message.method === "turn/interrupt") {
send({
id: message.id,
result: {},
});
send({
method: "turn/completed",
params: {
threadId: message.params?.threadId ?? interruptibleTurn?.threadId,
turn: {
id: message.params?.turnId ?? interruptibleTurn?.turnId,
status: "interrupted",
},
},
});
process.stderr.write(`${JSON.stringify({ received })}\n`);
return;
}
if (message.method === "turn/steer") {
const text = message.params?.input?.find?.((item) => item?.type === "text")?.text ?? "";
send({

View File

@@ -1284,6 +1284,54 @@ test("codex app-server runner steers an active turn when a target turn id is pre
}
});
test("codex app-server runner interrupts the active turn when the task is canceled while running", async () => {
const previous = process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT;
process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT = "1";
try {
const interruptChecks = [];
const runnerConfig = getCodexAppServerRunnerConfig(process.env, {
codexAppServerEnabled: true,
codexAppServerCommand: process.execPath,
codexAppServerArgs: ["tests/fixtures/codex-app-server-runtime.mjs"],
codexAppServerWorkdir: repoRoot,
codexAppServerTimeoutMs: 5000,
});
const result = await executeCodexAppServerTask(
{
...runnerConfig,
interruptPollIntervalMs: 10,
shouldInterruptActiveTurn: async (activeTurn) => {
interruptChecks.push(activeTurn);
return activeTurn.taskId === "task-turn-interrupt";
},
},
{
taskId: "task-turn-interrupt",
taskType: "conversation_reply",
targetCodexThreadRef: "active-thread-2",
targetCodexFolderRef: repoRoot,
executionPrompt: "继续执行一个可取消的长任务",
},
);
assert.equal(result.status, "interrupted");
assert.equal(result.threadId, "active-thread-2");
assert.equal(result.turnId, "turn-fixture");
assert.equal(result.turnControl, "interrupt");
assert.equal(result.replyBody, "已按用户要求中断当前 Codex turn。");
assert.equal(interruptChecks[0]?.taskId, "task-turn-interrupt");
assert.equal(interruptChecks[0]?.threadId, "active-thread-2");
assert.equal(interruptChecks[0]?.turnId, "turn-fixture");
} finally {
if (previous === undefined) {
delete process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT;
} else {
process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT = previous;
}
}
});
test("codex app-server config exposes ws transport without mutating stdio defaults", () => {
const previousTransport = process.env.BOSS_CODEX_APP_SERVER_TRANSPORT;
const previousUrl = process.env.BOSS_CODEX_APP_SERVER_URL;

View File

@@ -242,3 +242,151 @@ process.exit(0);
await rm(runtimeRoot, { recursive: true, force: true });
}
});
test("local-agent interrupts an active Codex App Server turn after server-side task cancellation", async () => {
const runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-local-agent-app-server-interrupt-"));
const skillsDir = path.join(runtimeRoot, "skills");
const projectDir = path.join(runtimeRoot, "project");
await mkdir(skillsDir, { recursive: true });
await mkdir(projectDir, { recursive: true });
const completeBodies = [];
const controlStateBodies = [];
let claimCount = 0;
const controlPlane = createServer(async (request, response) => {
const url = request.url || "";
if (request.method === "POST" && url === "/api/v1/master-agent/tasks/claim") {
claimCount += 1;
await readJsonBody(request);
response.writeHead(200, { "content-type": "application/json" });
response.end(
JSON.stringify({
ok: true,
task:
claimCount === 1
? {
taskId: "conversation-app-server-interrupt-task",
taskType: "conversation_reply",
projectId: "app-server-project",
requestMessageId: "msg-app-server-interrupt",
requestText: "继续长任务",
executionPrompt: "执行一个可以被取消的长任务",
requestedByAccount: "krisolo",
targetCodexThreadRef: "019d-app-server-thread",
targetCodexFolderRef: projectDir,
requestedAt: "2026-05-16T10:00:00.000Z",
}
: null,
}),
);
return;
}
if (
request.method === "GET" &&
url === "/api/v1/master-agent/tasks/conversation-app-server-interrupt-task/control-state"
) {
controlStateBodies.push({ at: Date.now() });
response.writeHead(200, { "content-type": "application/json" });
response.end(
JSON.stringify({
ok: true,
taskId: "conversation-app-server-interrupt-task",
status: "canceled",
canceled: true,
cancelReason: "用户取消当前 Codex turn",
}),
);
return;
}
if (
request.method === "POST" &&
url === "/api/v1/master-agent/tasks/conversation-app-server-interrupt-task/complete"
) {
completeBodies.push(await readJsonBody(request));
response.writeHead(200, { "content-type": "application/json" });
response.end(JSON.stringify({ ok: true }));
return;
}
if (request.method === "POST" && url === "/api/device-heartbeat") {
response.writeHead(200, { "content-type": "application/json" });
response.end(JSON.stringify({ ok: true, token: "server-token" }));
return;
}
if (request.method === "POST" && url === "/api/v1/app-logs") {
await readJsonBody(request);
response.writeHead(200, { "content-type": "application/json" });
response.end(JSON.stringify({ ok: true }));
return;
}
if (request.method === "POST" && url === "/api/v1/devices/mac-studio/skills") {
response.writeHead(200, { "content-type": "application/json" });
response.end(JSON.stringify({ ok: true }));
return;
}
response.writeHead(404, { "content-type": "application/json" });
response.end(JSON.stringify({ ok: false, message: "not_found", url }));
});
const controlPort = await listen(controlPlane);
const agentServer = createServer();
const agentPort = await listen(agentServer);
await new Promise((resolve) => agentServer.close(resolve));
const configPath = path.join(runtimeRoot, "local-agent-config.json");
await writeFile(
configPath,
JSON.stringify({
port: agentPort,
bindHost: "127.0.0.1",
controlPlaneUrl: `http://127.0.0.1:${controlPort}`,
deviceId: "mac-studio",
token: "local-token",
name: "Mac Studio",
account: "krisolo",
status: "online",
codexSessionDiscoveryEnabled: false,
skillsDir,
masterAgentEnabled: true,
masterAgentPollIntervalMs: 60_000,
masterAgentInterruptPollIntervalMs: 10,
heartbeatIntervalMs: 60_000,
skillLifecycleEnabled: false,
masterAgentWorkdir: projectDir,
masterAgentModel: "gpt-5.4",
codexAppServerEnabled: true,
codexAppServerCommand: process.execPath,
codexAppServerArgs: ["tests/fixtures/codex-app-server-runtime.mjs"],
codexAppServerWorkdir: repoRoot,
codexAppServerTimeoutMs: 5000,
}),
"utf8",
);
const previous = process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT;
process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT = "1";
const child = spawn(process.execPath, ["local-agent/server.mjs", configPath], {
cwd: repoRoot,
env: {
...process.env,
},
stdio: ["ignore", "pipe", "pipe"],
});
try {
await waitFor(() => controlStateBodies.length > 0);
await new Promise((resolve) => setTimeout(resolve, 250));
assert.equal(completeBodies.length, 0);
} finally {
if (previous === undefined) {
delete process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT;
} else {
process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT = previous;
}
child.kill("SIGTERM");
await new Promise((resolve) => {
child.once("close", resolve);
}).catch(() => null);
controlPlane.close();
await rm(runtimeRoot, { recursive: true, force: true });
}
});

View File

@@ -0,0 +1,100 @@
import test from "node:test";
import assert from "node:assert/strict";
import os from "node:os";
import path from "node:path";
import { mkdtemp, rm } from "node:fs/promises";
import { NextRequest } from "next/server";
let runtimeRoot = "";
let data: typeof import("../src/lib/boss-data.ts");
let getControlState: (typeof import("../src/app/api/v1/master-agent/tasks/[taskId]/control-state/route.ts"))["GET"];
async function setup() {
if (runtimeRoot) return;
runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-master-task-control-state-"));
process.env.BOSS_RUNTIME_ROOT = runtimeRoot;
process.env.BOSS_STATE_FILE = path.join(runtimeRoot, "boss-state.json");
const [dataModule, routeModule] = await Promise.all([
import("../src/lib/boss-data.ts"),
import("../src/app/api/v1/master-agent/tasks/[taskId]/control-state/route.ts"),
]);
data = dataModule;
getControlState = routeModule.GET;
}
test.beforeEach(async () => {
await setup();
await rm(runtimeRoot, { recursive: true, force: true });
});
test.after(async () => {
if (runtimeRoot) await rm(runtimeRoot, { recursive: true, force: true });
});
test("GET task control-state lets the owning device see cancellation without exposing prompt text", async () => {
const task = await data.queueMasterAgentTask({
taskId: "control-state-task",
projectId: "master-agent",
taskType: "conversation_reply",
requestMessageId: "msg-control-state",
requestText: "private user request should not leak",
executionPrompt: "private execution prompt should not leak",
requestedBy: "krisolo",
requestedByAccount: "krisolo",
deviceId: "mac-studio",
targetCodexThreadRef: "thread-control-state",
});
await data.claimNextMasterAgentTask("mac-studio");
await data.cancelMasterAgentTask({
taskId: task.taskId,
actorAccount: "krisolo",
reason: "用户在 APP 取消了当前任务",
});
const response = await getControlState(
new NextRequest(`http://127.0.0.1:3000/api/v1/master-agent/tasks/${task.taskId}/control-state`, {
method: "GET",
headers: {
"x-boss-device-token": "boss-mac-studio-token",
},
}),
{ params: Promise.resolve({ taskId: task.taskId }) },
);
const payload = await response.json();
assert.equal(response.status, 200);
assert.equal(payload.ok, true);
assert.equal(payload.taskId, "control-state-task");
assert.equal(payload.status, "canceled");
assert.equal(payload.canceled, true);
assert.equal(payload.cancelReason, "用户在 APP 取消了当前任务");
assert.equal(JSON.stringify(payload).includes("private user request"), false);
assert.equal(JSON.stringify(payload).includes("private execution prompt"), false);
});
test("GET task control-state rejects a different device token", async () => {
const task = await data.queueMasterAgentTask({
taskId: "control-state-forbidden-task",
projectId: "master-agent",
taskType: "conversation_reply",
requestMessageId: "msg-control-state-forbidden",
requestText: "继续",
executionPrompt: "继续",
requestedBy: "krisolo",
requestedByAccount: "krisolo",
deviceId: "mac-studio",
});
const response = await getControlState(
new NextRequest(`http://127.0.0.1:3000/api/v1/master-agent/tasks/${task.taskId}/control-state`, {
method: "GET",
headers: {
"x-boss-device-token": "wrong-token",
},
}),
{ params: Promise.resolve({ taskId: task.taskId }) },
);
assert.equal(response.status, 403);
});