diff --git a/docs/architecture/ai_handoff_index_cn.md b/docs/architecture/ai_handoff_index_cn.md index 06a782b..7064caf 100644 --- a/docs/architecture/ai_handoff_index_cn.md +++ b/docs/architecture/ai_handoff_index_cn.md @@ -164,6 +164,7 @@ - 第二十六批另补 `runtimeEventSummary / extensionEventSummary / threadLifecycleEventSummary` 运行事件、扩展事件和线程生命周期事件能力摘要:设备详情页会显示 process output/exited、raw response completed、skills changed、plugin installed、thread started/closed/archived/unarchived/name updated 等能力分组;这些字段只读,不在 heartbeat 中主动触发进程、插件、Skill 或线程生命周期动作。 - 第二十七批另补 `streamDeltaEventSummary` 流式增量事件能力摘要:设备详情页会显示 agent delta、plan delta、reasoning delta、MCP progress、command output、terminal interaction 和 file output 等能力分组;该字段只读,不保存原始增量文本、命令输出、推理正文或文件输出。 - 当前任务执行态也已补 `executionProgress.streamEvents`:App Server runner 会把 agent / plan / reasoning / MCP / command / terminal / file 的流式 delta 归一成计数,Android 进度卡展示“流式增量”,不保存或渲染原始 delta、命令输出、终端输入、推理正文或文件输出。 +- 当前 App Server 任务取消已从“服务端标记”升级为“真实 turn 中断”:`POST /api/v1/master-agent/tasks/[taskId]/cancel` 仍负责把任务置为 `canceled`,新增 `GET /api/v1/master-agent/tasks/[taskId]/control-state` 供设备端轮询;`local-agent` 在 App Server turn 启动后会按取消状态调用 `turn/interrupt`,并把 `interrupted` 作为干净取消处理,避免取消后长任务继续跑或被误写成失败日志。 - 当前 boss-agent 已支持 Mac OTA:`local-agent/boss-agent-ota-runner.mjs` 默认开启,每 5 分钟检查服务端最新包;状态页可手动检查或下载并安装,安装时保留原绑定配置,只更新版本号和本机 runtime 路径。最新验证版本为 `20260516221619`,已在 MacBook Air `macbook-air` 上确认 OTA 下载校验、暂存、覆盖安装后不会误切到默认 `config.cloud.json`。正式分发脚本已预留 Developer ID 公证路径:`BOSS_AGENT_NOTARIZE=1` 配合 notary profile 或 Apple ID 凭据。 - 当前量产治理已补设备撤权和任务可靠性底座:`revoke_device` 会清空设备 token、标记离线并阻断 heartbeat / 任务认领 / Skill 同步 / 日志上报 / boss-agent OTA;`MasterAgentTask` claim 会记录 attempt 和 lease,运行中任务可按租约重试,超过上限转 `timed_out`,用户或管理员可通过 cancel 接口转 `canceled` 且迟到 complete 不覆盖终态。 - 当前群聊 `dispatch_execution` 完成回写已补幂等,重复完成不会再向群聊重复追加结果 diff --git a/docs/architecture/api_and_service_inventory_cn.md b/docs/architecture/api_and_service_inventory_cn.md index 1d9993d..133376a 100644 --- a/docs/architecture/api_and_service_inventory_cn.md +++ b/docs/architecture/api_and_service_inventory_cn.md @@ -1249,6 +1249,14 @@ - 权限:任务请求账号、`highest_admin`,或具备目标设备 `device.manage` 的账号 - 当前行为:任务转为 `canceled`,写入 `canceledAt / canceledBy / cancelReason`,清除租约;如果之后设备端迟到回写成功,服务端不会覆盖取消终态 +#### `GET /api/v1/master-agent/tasks/[taskId]/control-state` + +- 用途:设备端在执行中轮询任务控制状态,用于把 APP / Web 的取消动作同步到正在运行的本地 runtime +- 权限:目标设备 token,或具备目标设备写权限的会话 +- 返回:`taskId / status / canceled / cancelReason / canceledAt` +- 安全边界:不返回 `requestText / executionPrompt / sourceMessageBody`,避免把用户消息、内部提示词或调度字段泄露给设备外的调用方 +- 当前 App Server 行为:`local-agent` 在 `turn/start` 后轮询该接口;如果返回 `canceled=true`,会调用当前 App Server 连接的 `turn/interrupt`,把 Codex 活跃 turn 真实中断 + #### `GET /api/v1/integrations/telegram` - 用途:读取 Telegram Bot 接入配置 diff --git a/docs/architecture/codex_server_progress_card_cn.md b/docs/architecture/codex_server_progress_card_cn.md index ebc032d..036db06 100644 --- a/docs/architecture/codex_server_progress_card_cn.md +++ b/docs/architecture/codex_server_progress_card_cn.md @@ -147,6 +147,7 @@ UI 参考: - 新增实时进度入口 `POST /api/v1/master-agent/tasks/[taskId]/progress`,设备端可在任务执行中持续刷新同一张 `execution_progress` 卡;`local-agent` 的 App Server runner 已在收到协议进度事件时调用该接口,complete 仍携带最终进度作为兜底 - 新增服务端线程协作入口 `POST /api/v1/projects/[projectId]/thread-collaboration`,由 Boss 校验源/目标项目权限并创建 `intentCategory=thread_collaboration` 的 `conversation_reply` 任务;设备端继续通过 App Server runner 执行 `thread/read -> thread/inject_items -> turn/start`,避免把“线程互通”误做成无监管 P2P - 新增活跃 turn 干预:任务携带 `targetCodexTurnId` / `targetTurnId` 时,App Server runner 会调用 `turn/steer`,并把 `turnControl=steer`、`turnId` 写回执行结果;没有活跃 turn id 时仍使用 `turn/start` +- 新增活跃 turn 中断:用户或管理员通过任务取消接口把任务转为 `canceled` 后,设备端会轮询 `GET /api/v1/master-agent/tasks/[taskId]/control-state`;如果当前任务已经启动 App Server turn,runner 会在同一个 JSON-RPC 连接上调用 `turn/interrupt`,并把返回的 `interrupted` 处理成干净取消,不再把用户主动取消误判成 runtime failure - `getCodexAppServerRunnerConfig` 已识别 `codexAppServerTransport` / `BOSS_CODEX_APP_SERVER_TRANSPORT`、`codexAppServerUrl` / `BOSS_CODEX_APP_SERVER_URL`、`codexAppServerAuthTokenFile` / `BOSS_CODEX_APP_SERVER_AUTH_TOKEN_FILE`;`local-agent/codex-app-server-runner.mjs` 现已支持 `stdio`、`ws://127.0.0.1:` 与 `unix://PATH` 三种 JSON-RPC transport,默认仍是 stdio,ws/unix 适合作为同机长驻 App Server 灰度路径 - 新增 App Server 过载退避:单个 JSON-RPC 请求收到 `-32001` 或 `retry later` 文案时,会在同一个任务生命周期内重试,超出上限后才进入失败/CLI fallback 判定 - 新增 App Server capability discovery:`local-agent` 会把可用模型、默认/快速/深度模型建议、provider 能力、Skill、Plugin、App 摘要写入设备 heartbeat;Web 设备详情已显示 App Server、模型和扩展数量,为后续 APP/后台模型配置页提供真实数据来源 diff --git a/docs/architecture/current_runtime_and_deploy_status_cn.md b/docs/architecture/current_runtime_and_deploy_status_cn.md index a8d9a3c..b56de5b 100644 --- a/docs/architecture/current_runtime_and_deploy_status_cn.md +++ b/docs/architecture/current_runtime_and_deploy_status_cn.md @@ -267,6 +267,7 @@ cd /Users/kris/code/boss - 当前 `local-agent` 会在 Codex 任务执行中和完成时回传 `executionProgress`:服务端把同一任务的进度卡从 queued / running 更新到 completed / failed,Android 原生聊天页会显示“进度 / 线程状态 / 实时状态 / 线程配置 / 线程协作 / 工具活动 / 思考摘要 / 账号状态 / 运行状态 / 安全提醒 / 审批状态 / 文件变更 / 分支详情 / 生成结果 / 后台智能体”。2026-05-31 起,Codex App Server 的 `turn/plan/updated`、`turn/diff/updated`、`item/started|completed`、`thread/started` 会直接映射为进度步骤、变更统计、生成产物和后台智能体;第二批已把 `item/*/requestApproval`、`item/autoApprovalReview/*`、`guardianWarning`、`serverRequest/resolved`、`item/fileChange/patchUpdated` 映射为审批、安全提醒和文件变更摘要;第三批已把 `thread/status/changed` 与 `thread/realtime/*` 安全映射为线程状态和实时状态摘要;第四批已把 `model/rerouted`、`thread/tokenUsage/updated`、`mcpServer/startupStatus/updated`、`remoteControl/status/changed` 安全映射为运行状态摘要;第五批已把 `thread/goal/*`、`thread/settings/updated` 和 `thread/compacted` 映射为线程配置摘要;第六批已把 `account/updated`、`account/rateLimits/updated`、`model/verification`、`warning`、`configWarning`、`deprecationNotice` 映射为账号状态、模型校验和安全提醒摘要;第七批已把 `ThreadItem.collabToolCall` 和 `ThreadItem.contextCompaction` 映射为线程协作和上下文压缩摘要;第八批已把 `mcpToolCall`、`dynamicToolCall`、`webSearch`、`imageView`、`enteredReviewMode`、`exitedReviewMode`、`commandExecution` 映射为工具活动摘要;第九批已把 `ThreadItem.plan` 和 `ThreadItem.reasoning.summary` 映射为计划步骤与思考摘要;第十批已把 `ThreadItem.imageGeneration` 映射为图像生成工具活动和图片产物;第十一批已把 `hook/started|completed` 映射为钩子生命周期工具活动;第十二批已把 `windowsSandbox/setupCompleted` 映射为 Windows 沙箱准备状态摘要;第十七批已把新版 `ThreadItem.collabToolCall.receiverThreadIds / agentsStates` 安全映射为线程协作目标数量和 agent 状态集合。所有进度均通过 `POST /api/v1/master-agent/tasks/[taskId]/progress` 实时刷新;字段白名单会剥离 cwd、turnId、配置文件路径、内部 prompt、collab 源/目标线程 ID、receiverThreadIds、agentsStates 私有消息、共享 Skill 根绝对路径、tool arguments/result、web URL token、命令正文/输出、raw reasoning content、reasoning item id、图像生成 revisedPrompt/result、hook sourcePath/statusMessage/entries、Windows sandbox sourcePath/samplePaths/本地绝对路径和未清洗密钥,complete 回写仍会携带最终进度兜底 - 当前 `local-agent` heartbeat 已新增 Codex App Server capability discovery:按 TTL 拉取模型、provider 能力、Skill、Hook、Plugin、App 摘要,并附加只读线程操作、插件治理、账号治理、配置治理、文件治理、命令会话、外部 Agent 迁移、Marketplace、实验特性、审查、Windows 沙箱、文件搜索事件、MCP、用户交互、Guardian、运行事件、扩展事件、线程生命周期和流式增量能力 catalog,写入 `capabilities.codexAppServer.metadata`;Web 设备详情会展示 App Server 连接状态、模型数量、默认/快速/深度模型、扩展数量、Hook 治理摘要、线程操作摘要、插件治理摘要、账号治理摘要、配置治理摘要、文件治理摘要、命令会话摘要、迁移治理摘要、市场治理摘要、实验特性治理摘要、审查治理摘要、Windows 沙箱摘要、文件搜索事件摘要、MCP 治理摘要、用户交互摘要、Guardian 治理摘要、运行事件摘要、扩展事件摘要、线程生命周期摘要和流式增量摘要 - 当前 `MasterAgentTask` 已具备服务端租约和取消基础状态机:claim 会写入 `attemptCount / maxAttempts / leaseExpiresAt`,运行中任务租约过期后可被重新认领,超过重试上限会转 `timed_out`;`POST /api/v1/master-agent/tasks/[taskId]/cancel` 会把任务转 `canceled`,迟到的成功 complete 不会覆盖终态 +- 当前 App Server 执行中的任务取消已补真实中断链路:服务端新增 `GET /api/v1/master-agent/tasks/[taskId]/control-state` 给设备端按 token 轮询取消状态;`local-agent` 在 turn 启动后按 `masterAgentInterruptPollIntervalMs` 检查该接口,发现任务已取消会在同一个 App Server 连接上调用 `turn/interrupt`,并把 interrupted 作为干净取消结果处理,不再等长任务自然结束或把取消刷成失败日志 - 当前 `local-agent` 对 `browser_control / desktop_control` 已从占位骨架升级成外部 runtime 桥:当本机配置了 `browserControlEnabled + browserControlCommand` 或 `computerUseEnabled + computerUseCommand` 时,会把标准化 JSON 请求透传给外部进程,并解析单行 JSON 结果;未启用时会 fail closed,返回明确的 runtime disabled 错误,不再假装执行成功 - 远程电脑控制链路当前已有可复用压测基线:`npm run stress:remote-control` 可按参数压测 `local-agent -> MasterAgentTask -> browser_control / desktop_control runtime -> complete 回写` 全链路;`npm run stress:remote-control:ci` 固定 120 条链路任务和 360 条 runtime 并发任务,并用 p95 延迟预算判断是否退化。压测报告可通过 `--report-json=PATH` 落盘,便于后续接入真实 macOS AX / Windows UIA helper 后复用同一套稳定性判断。 - 当前历史脏群如果不再包含真实线程成员,群聊消息不会再表现成“无响应”;服务端会在群内追加明确 `system_notice`,提示先重新添加线程成员 diff --git a/local-agent/codex-app-server-runner.mjs b/local-agent/codex-app-server-runner.mjs index 8d6d6d2..a2bf3d3 100644 --- a/local-agent/codex-app-server-runner.mjs +++ b/local-agent/codex-app-server-runner.mjs @@ -72,6 +72,11 @@ function normalizePositiveInteger(value, fallback) { return Number.isFinite(numeric) && numeric > 0 ? Math.floor(numeric) : fallback; } +function normalizeNonNegativeInteger(value, fallback) { + const numeric = Number(value); + return Number.isFinite(numeric) && numeric >= 0 ? Math.floor(numeric) : fallback; +} + function boolFromConfigOrEnv(configValue, envValue, fallback) { if (configValue === true || configValue === false) { return configValue; @@ -2809,6 +2814,9 @@ export async function executeCodexAppServerTask(runnerConfig, task) { const progressCollector = createProgressCollector(); const progressEmits = []; let lastProgressSnapshotJson = ""; + let interruptRequested = false; + let interruptReason = ""; + let interruptPollTimer; const pending = new Map(); const retryTimers = new Set(); let resolveTurnCompleted; @@ -2836,6 +2844,10 @@ export async function executeCodexAppServerTask(runnerConfig, task) { const cleanup = () => { clearTimeout(timeout); + if (interruptPollTimer) { + clearInterval(interruptPollTimer); + interruptPollTimer = undefined; + } for (const timer of retryTimers) { clearTimeout(timer); } @@ -2984,12 +2996,67 @@ export async function executeCodexAppServerTask(runnerConfig, task) { const status = message.params?.turn?.status ?? message.params?.status ?? "completed"; if (status === "completed") { resolveTurnCompleted(message.params ?? {}); + } else if (status === "interrupted" && interruptRequested) { + resolveTurnCompleted({ ...(message.params ?? {}), interrupted: true }); } else { rejectTurnCompleted(new Error(`CODEX_APP_SERVER_TURN_${String(status).toUpperCase()}`)); } } }; + const startActiveTurnInterruptPolling = ({ threadId, turnId }) => { + if ( + !threadId || + !turnId || + typeof runnerConfig.shouldInterruptActiveTurn !== "function" + ) { + return; + } + + const pollIntervalMs = normalizeNonNegativeInteger( + runnerConfig.interruptPollIntervalMs, + 750, + ); + const checkAndInterrupt = async () => { + if (interruptRequested || turnSettled || closed) { + return; + } + let decision; + try { + decision = await runnerConfig.shouldInterruptActiveTurn({ + taskId: task?.taskId, + task, + threadId, + turnId, + }); + } catch { + return; + } + const shouldInterrupt = + decision === true || + decision?.interrupt === true || + decision?.canceled === true || + decision?.status === "canceled"; + if (!shouldInterrupt || interruptRequested || turnSettled || closed) { + return; + } + interruptRequested = true; + interruptReason = trimToDefined(decision?.reason) || "USER_CANCELED_TASK"; + try { + await request("turn/interrupt", { threadId, turnId }); + } catch (error) { + rejectTurnCompleted(error); + } + }; + + void checkAndInterrupt(); + if (pollIntervalMs > 0) { + interruptPollTimer = setInterval(() => { + void checkAndInterrupt(); + }, pollIntervalMs); + } + }; + try { rpcTransport = await openCodexAppServerTransport(runnerConfig, cwd, { onLine: handleTransportLine, @@ -3041,17 +3108,34 @@ export async function executeCodexAppServerTask(runnerConfig, task) { model: runnerConfig.model, }); activeTurnStarted = true; + const activeTurnId = trimToDefined(turnResult?.turn?.id) || targetTurnRef; + startActiveTurnInterruptPolling({ threadId, turnId: activeTurnId }); await turnCompleted; if (progressEmits.length > 0) { await Promise.allSettled(progressEmits); } const normalizedReply = (replyBody || completedMessageText).trim(); + if (interruptRequested) { + return { + status: "interrupted", + replyBody: "已按用户要求中断当前 Codex turn。", + threadId, + turnId: activeTurnId, + turnControl: "interrupt", + interruptReason, + cwd, + transport: runnerConfig.transport, + executionProgress: progressCollector.snapshot(), + interThreadBroker, + canFallbackToCli: false, + }; + } return { status: "completed", replyBody: normalizedReply, threadId, - turnId: trimToDefined(turnResult?.turn?.id) || targetTurnRef, + turnId: activeTurnId, turnControl, cwd, transport: runnerConfig.transport, diff --git a/local-agent/server.mjs b/local-agent/server.mjs index 73ea872..83d7094 100755 --- a/local-agent/server.mjs +++ b/local-agent/server.mjs @@ -525,6 +525,35 @@ async function postMasterAgentTaskProgress(config, runtime, payload) { }; } +async function fetchMasterAgentTaskControlState(config, runtime, task) { + const response = await fetch( + `${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/master-agent/tasks/${task.taskId}/control-state`, + { + method: "GET", + headers: { + ...deviceTokenHeaders(config, runtime), + }, + }, + ); + if (!response.ok) { + return { + ok: false, + status: response.status, + body: await response.text(), + }; + } + return { + ok: true, + status: response.status, + body: await response.json(), + }; +} + +function normalizeInterruptPollIntervalMs(config) { + const value = Number(config.masterAgentInterruptPollIntervalMs ?? 750); + return Number.isFinite(value) && value >= 0 ? Math.floor(value) : 750; +} + async function claimSkillLifecycleRequest(config, runtime) { const response = await fetch( `${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/devices/${config.deviceId}/skill-requests/claim`, @@ -793,6 +822,20 @@ async function runMasterAgentTask(config, runtime, task) { const appServerResult = await executeCodexAppServerTask( { ...codexAppServerRunner, + interruptPollIntervalMs: normalizeInterruptPollIntervalMs(config), + shouldInterruptActiveTurn: async () => { + const controlState = await fetchMasterAgentTaskControlState(config, runtime, task); + if (!controlState.ok) { + return false; + } + if (controlState.body?.canceled === true || controlState.body?.status === "canceled") { + return { + interrupt: true, + reason: controlState.body?.cancelReason || "USER_CANCELED_TASK", + }; + } + return false; + }, onProgress: async (executionProgress) => { const progressResult = await postMasterAgentTaskProgress(config, runtime, { taskId: task.taskId, @@ -813,6 +856,15 @@ async function runMasterAgentTask(config, runtime, task) { }, task, ); + if (appServerResult.status === "interrupted") { + return { + interruptedCompletion: { + replyBody: appServerResult.replyBody, + interruptReason: appServerResult.interruptReason, + executionProgress: appServerResult.executionProgress, + }, + }; + } if (appServerResult.status === "completed") { const localExecutionProgress = await collectLocalExecutionProgress( appServerResult.cwd || task.targetCodexFolderRef || config.masterAgentWorkdir || process.cwd(), @@ -976,6 +1028,23 @@ async function runMasterAgentTask(config, runtime, task) { }); return; } + if (executionResult.interruptedCompletion) { + runtime.activeMasterTask = { + taskId: task.taskId, + status: "canceled", + completedAt: new Date().toISOString(), + detail: executionResult.interruptedCompletion.replyBody, + }; + await postAppLog(config, runtime, { + projectId: "master-agent", + level: "info", + category: "local_agent.codex_app_server_turn_interrupted", + message: `Master Codex Node 已按取消状态中断 Codex App Server turn:${task.taskId}`, + detail: executionResult.interruptedCompletion.interruptReason, + mirrorToMaster: false, + }); + return; + } const { replyBody, dispatchExecutionCompletion, executionProgress } = executionResult; const completion = await completeMasterAgentTask( diff --git a/src/app/api/v1/master-agent/tasks/[taskId]/control-state/route.ts b/src/app/api/v1/master-agent/tasks/[taskId]/control-state/route.ts new file mode 100644 index 0000000..ec34535 --- /dev/null +++ b/src/app/api/v1/master-agent/tasks/[taskId]/control-state/route.ts @@ -0,0 +1,29 @@ +import { NextRequest } from "next/server"; +import { authorizeDeviceWriteRequest } from "@/lib/boss-device-auth"; +import { getMasterAgentTask } from "@/lib/boss-data"; +import { jsonNoStore } from "@/lib/api-response"; + +export async function GET( + request: NextRequest, + context: { params: Promise<{ taskId: string }> }, +) { + const { taskId } = await context.params; + const task = await getMasterAgentTask(taskId); + if (!task) { + return jsonNoStore({ ok: false, message: "MASTER_AGENT_TASK_NOT_FOUND" }, { status: 404 }); + } + + const auth = await authorizeDeviceWriteRequest(request, task.deviceId); + if (!auth.ok) { + return jsonNoStore({ ok: false, message: "FORBIDDEN" }, { status: 403 }); + } + + return jsonNoStore({ + ok: true, + taskId: task.taskId, + status: task.status, + canceled: task.status === "canceled", + cancelReason: task.cancelReason, + canceledAt: task.canceledAt, + }); +} diff --git a/tests/fixtures/codex-app-server-runtime.mjs b/tests/fixtures/codex-app-server-runtime.mjs index b4fea46..d2c5387 100644 --- a/tests/fixtures/codex-app-server-runtime.mjs +++ b/tests/fixtures/codex-app-server-runtime.mjs @@ -7,6 +7,7 @@ const received = []; let injectedItems = []; let extraSkillRoots = []; let overloadedTurnStartEmitted = false; +let interruptibleTurn = null; function send(message) { process.stdout.write(`${JSON.stringify(message)}\n`); @@ -645,6 +646,13 @@ rl.on("line", (line) => { if (process.env.BOSS_CODEX_APP_SERVER_FIXTURE_EXIT_AFTER_TURN_START === "1") { process.exit(0); } + if (process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT === "1") { + interruptibleTurn = { + threadId: message.params?.threadId, + turnId: "turn-fixture", + }; + return; + } if (process.env.BOSS_CODEX_APP_SERVER_FIXTURE_EMIT_PROGRESS === "1") { send({ method: "turn/plan/updated", @@ -1503,6 +1511,25 @@ rl.on("line", (line) => { return; } + if (message.method === "turn/interrupt") { + send({ + id: message.id, + result: {}, + }); + send({ + method: "turn/completed", + params: { + threadId: message.params?.threadId ?? interruptibleTurn?.threadId, + turn: { + id: message.params?.turnId ?? interruptibleTurn?.turnId, + status: "interrupted", + }, + }, + }); + process.stderr.write(`${JSON.stringify({ received })}\n`); + return; + } + if (message.method === "turn/steer") { const text = message.params?.input?.find?.((item) => item?.type === "text")?.text ?? ""; send({ diff --git a/tests/local-agent-codex-app-server-runner.test.mjs b/tests/local-agent-codex-app-server-runner.test.mjs index e5aa7a9..54aca80 100644 --- a/tests/local-agent-codex-app-server-runner.test.mjs +++ b/tests/local-agent-codex-app-server-runner.test.mjs @@ -1284,6 +1284,54 @@ test("codex app-server runner steers an active turn when a target turn id is pre } }); +test("codex app-server runner interrupts the active turn when the task is canceled while running", async () => { + const previous = process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT; + process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT = "1"; + try { + const interruptChecks = []; + const runnerConfig = getCodexAppServerRunnerConfig(process.env, { + codexAppServerEnabled: true, + codexAppServerCommand: process.execPath, + codexAppServerArgs: ["tests/fixtures/codex-app-server-runtime.mjs"], + codexAppServerWorkdir: repoRoot, + codexAppServerTimeoutMs: 5000, + }); + + const result = await executeCodexAppServerTask( + { + ...runnerConfig, + interruptPollIntervalMs: 10, + shouldInterruptActiveTurn: async (activeTurn) => { + interruptChecks.push(activeTurn); + return activeTurn.taskId === "task-turn-interrupt"; + }, + }, + { + taskId: "task-turn-interrupt", + taskType: "conversation_reply", + targetCodexThreadRef: "active-thread-2", + targetCodexFolderRef: repoRoot, + executionPrompt: "继续执行一个可取消的长任务", + }, + ); + + assert.equal(result.status, "interrupted"); + assert.equal(result.threadId, "active-thread-2"); + assert.equal(result.turnId, "turn-fixture"); + assert.equal(result.turnControl, "interrupt"); + assert.equal(result.replyBody, "已按用户要求中断当前 Codex turn。"); + assert.equal(interruptChecks[0]?.taskId, "task-turn-interrupt"); + assert.equal(interruptChecks[0]?.threadId, "active-thread-2"); + assert.equal(interruptChecks[0]?.turnId, "turn-fixture"); + } finally { + if (previous === undefined) { + delete process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT; + } else { + process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT = previous; + } + } +}); + test("codex app-server config exposes ws transport without mutating stdio defaults", () => { const previousTransport = process.env.BOSS_CODEX_APP_SERVER_TRANSPORT; const previousUrl = process.env.BOSS_CODEX_APP_SERVER_URL; diff --git a/tests/local-agent-server-codex-app-server-flow.test.mjs b/tests/local-agent-server-codex-app-server-flow.test.mjs index feef4dc..e022e0e 100644 --- a/tests/local-agent-server-codex-app-server-flow.test.mjs +++ b/tests/local-agent-server-codex-app-server-flow.test.mjs @@ -242,3 +242,151 @@ process.exit(0); await rm(runtimeRoot, { recursive: true, force: true }); } }); + +test("local-agent interrupts an active Codex App Server turn after server-side task cancellation", async () => { + const runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-local-agent-app-server-interrupt-")); + const skillsDir = path.join(runtimeRoot, "skills"); + const projectDir = path.join(runtimeRoot, "project"); + await mkdir(skillsDir, { recursive: true }); + await mkdir(projectDir, { recursive: true }); + + const completeBodies = []; + const controlStateBodies = []; + let claimCount = 0; + const controlPlane = createServer(async (request, response) => { + const url = request.url || ""; + if (request.method === "POST" && url === "/api/v1/master-agent/tasks/claim") { + claimCount += 1; + await readJsonBody(request); + response.writeHead(200, { "content-type": "application/json" }); + response.end( + JSON.stringify({ + ok: true, + task: + claimCount === 1 + ? { + taskId: "conversation-app-server-interrupt-task", + taskType: "conversation_reply", + projectId: "app-server-project", + requestMessageId: "msg-app-server-interrupt", + requestText: "继续长任务", + executionPrompt: "执行一个可以被取消的长任务", + requestedByAccount: "krisolo", + targetCodexThreadRef: "019d-app-server-thread", + targetCodexFolderRef: projectDir, + requestedAt: "2026-05-16T10:00:00.000Z", + } + : null, + }), + ); + return; + } + if ( + request.method === "GET" && + url === "/api/v1/master-agent/tasks/conversation-app-server-interrupt-task/control-state" + ) { + controlStateBodies.push({ at: Date.now() }); + response.writeHead(200, { "content-type": "application/json" }); + response.end( + JSON.stringify({ + ok: true, + taskId: "conversation-app-server-interrupt-task", + status: "canceled", + canceled: true, + cancelReason: "用户取消当前 Codex turn", + }), + ); + return; + } + if ( + request.method === "POST" && + url === "/api/v1/master-agent/tasks/conversation-app-server-interrupt-task/complete" + ) { + completeBodies.push(await readJsonBody(request)); + response.writeHead(200, { "content-type": "application/json" }); + response.end(JSON.stringify({ ok: true })); + return; + } + if (request.method === "POST" && url === "/api/device-heartbeat") { + response.writeHead(200, { "content-type": "application/json" }); + response.end(JSON.stringify({ ok: true, token: "server-token" })); + return; + } + if (request.method === "POST" && url === "/api/v1/app-logs") { + await readJsonBody(request); + response.writeHead(200, { "content-type": "application/json" }); + response.end(JSON.stringify({ ok: true })); + return; + } + if (request.method === "POST" && url === "/api/v1/devices/mac-studio/skills") { + response.writeHead(200, { "content-type": "application/json" }); + response.end(JSON.stringify({ ok: true })); + return; + } + response.writeHead(404, { "content-type": "application/json" }); + response.end(JSON.stringify({ ok: false, message: "not_found", url })); + }); + + const controlPort = await listen(controlPlane); + const agentServer = createServer(); + const agentPort = await listen(agentServer); + await new Promise((resolve) => agentServer.close(resolve)); + + const configPath = path.join(runtimeRoot, "local-agent-config.json"); + await writeFile( + configPath, + JSON.stringify({ + port: agentPort, + bindHost: "127.0.0.1", + controlPlaneUrl: `http://127.0.0.1:${controlPort}`, + deviceId: "mac-studio", + token: "local-token", + name: "Mac Studio", + account: "krisolo", + status: "online", + codexSessionDiscoveryEnabled: false, + skillsDir, + masterAgentEnabled: true, + masterAgentPollIntervalMs: 60_000, + masterAgentInterruptPollIntervalMs: 10, + heartbeatIntervalMs: 60_000, + skillLifecycleEnabled: false, + masterAgentWorkdir: projectDir, + masterAgentModel: "gpt-5.4", + codexAppServerEnabled: true, + codexAppServerCommand: process.execPath, + codexAppServerArgs: ["tests/fixtures/codex-app-server-runtime.mjs"], + codexAppServerWorkdir: repoRoot, + codexAppServerTimeoutMs: 5000, + }), + "utf8", + ); + + const previous = process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT; + process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT = "1"; + const child = spawn(process.execPath, ["local-agent/server.mjs", configPath], { + cwd: repoRoot, + env: { + ...process.env, + }, + stdio: ["ignore", "pipe", "pipe"], + }); + + try { + await waitFor(() => controlStateBodies.length > 0); + await new Promise((resolve) => setTimeout(resolve, 250)); + assert.equal(completeBodies.length, 0); + } finally { + if (previous === undefined) { + delete process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT; + } else { + process.env.BOSS_CODEX_APP_SERVER_FIXTURE_WAIT_FOR_INTERRUPT = previous; + } + child.kill("SIGTERM"); + await new Promise((resolve) => { + child.once("close", resolve); + }).catch(() => null); + controlPlane.close(); + await rm(runtimeRoot, { recursive: true, force: true }); + } +}); diff --git a/tests/master-agent-task-control-state-route.test.ts b/tests/master-agent-task-control-state-route.test.ts new file mode 100644 index 0000000..53e24c9 --- /dev/null +++ b/tests/master-agent-task-control-state-route.test.ts @@ -0,0 +1,100 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import os from "node:os"; +import path from "node:path"; +import { mkdtemp, rm } from "node:fs/promises"; +import { NextRequest } from "next/server"; + +let runtimeRoot = ""; +let data: typeof import("../src/lib/boss-data.ts"); +let getControlState: (typeof import("../src/app/api/v1/master-agent/tasks/[taskId]/control-state/route.ts"))["GET"]; + +async function setup() { + if (runtimeRoot) return; + runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-master-task-control-state-")); + process.env.BOSS_RUNTIME_ROOT = runtimeRoot; + process.env.BOSS_STATE_FILE = path.join(runtimeRoot, "boss-state.json"); + + const [dataModule, routeModule] = await Promise.all([ + import("../src/lib/boss-data.ts"), + import("../src/app/api/v1/master-agent/tasks/[taskId]/control-state/route.ts"), + ]); + data = dataModule; + getControlState = routeModule.GET; +} + +test.beforeEach(async () => { + await setup(); + await rm(runtimeRoot, { recursive: true, force: true }); +}); + +test.after(async () => { + if (runtimeRoot) await rm(runtimeRoot, { recursive: true, force: true }); +}); + +test("GET task control-state lets the owning device see cancellation without exposing prompt text", async () => { + const task = await data.queueMasterAgentTask({ + taskId: "control-state-task", + projectId: "master-agent", + taskType: "conversation_reply", + requestMessageId: "msg-control-state", + requestText: "private user request should not leak", + executionPrompt: "private execution prompt should not leak", + requestedBy: "krisolo", + requestedByAccount: "krisolo", + deviceId: "mac-studio", + targetCodexThreadRef: "thread-control-state", + }); + await data.claimNextMasterAgentTask("mac-studio"); + await data.cancelMasterAgentTask({ + taskId: task.taskId, + actorAccount: "krisolo", + reason: "用户在 APP 取消了当前任务", + }); + + const response = await getControlState( + new NextRequest(`http://127.0.0.1:3000/api/v1/master-agent/tasks/${task.taskId}/control-state`, { + method: "GET", + headers: { + "x-boss-device-token": "boss-mac-studio-token", + }, + }), + { params: Promise.resolve({ taskId: task.taskId }) }, + ); + const payload = await response.json(); + + assert.equal(response.status, 200); + assert.equal(payload.ok, true); + assert.equal(payload.taskId, "control-state-task"); + assert.equal(payload.status, "canceled"); + assert.equal(payload.canceled, true); + assert.equal(payload.cancelReason, "用户在 APP 取消了当前任务"); + assert.equal(JSON.stringify(payload).includes("private user request"), false); + assert.equal(JSON.stringify(payload).includes("private execution prompt"), false); +}); + +test("GET task control-state rejects a different device token", async () => { + const task = await data.queueMasterAgentTask({ + taskId: "control-state-forbidden-task", + projectId: "master-agent", + taskType: "conversation_reply", + requestMessageId: "msg-control-state-forbidden", + requestText: "继续", + executionPrompt: "继续", + requestedBy: "krisolo", + requestedByAccount: "krisolo", + deviceId: "mac-studio", + }); + + const response = await getControlState( + new NextRequest(`http://127.0.0.1:3000/api/v1/master-agent/tasks/${task.taskId}/control-state`, { + method: "GET", + headers: { + "x-boss-device-token": "wrong-token", + }, + }), + { params: Promise.resolve({ taskId: task.taskId }) }, + ); + + assert.equal(response.status, 403); +});