Files
boss/local-agent/codex-app-server-runner.mjs
2026-06-03 14:49:43 +08:00

3450 lines
116 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import crypto from "node:crypto";
import { spawn } from "node:child_process";
import { readFileSync } from "node:fs";
import http from "node:http";
import https from "node:https";
import readline from "node:readline";
import { resolve } from "node:path";
function trimToDefined(value) {
const trimmed = String(value ?? "").trim();
return trimmed ? trimmed : undefined;
}
function boolFromEnv(value) {
const normalized = trimToDefined(value)?.toLowerCase();
return normalized === "1" || normalized === "true" || normalized === "yes";
}
function listFromEnv(value) {
if (!value) return undefined;
try {
const parsed = JSON.parse(String(value));
if (Array.isArray(parsed)) {
return parsed.map((item) => String(item));
}
} catch {
// Fall through to whitespace splitting.
}
return String(value).split(/\s+/).filter(Boolean);
}
function normalizeAbsolutePathList(value) {
const rawValues = Array.isArray(value) ? value : listFromEnv(value) ?? [];
const normalized = rawValues
.map((item) => trimToDefined(item))
.filter(Boolean)
.map((item) => resolve(item));
return Array.from(new Set(normalized));
}
function resolveTaskThreadRef(task) {
return trimToDefined(task?.targetCodexThreadRef || task?.targetThreadId);
}
function resolveTaskTurnRef(task) {
return trimToDefined(task?.targetCodexTurnId || task?.targetTurnId);
}
function resolveSourceThreadRef(task) {
return trimToDefined(task?.sourceCodexThreadRef || task?.sourceThreadId);
}
function resolveTaskCwd(config, task) {
return resolve(
trimToDefined(task?.targetCodexFolderRef) ||
trimToDefined(config.masterAgentWorkdir) ||
process.cwd(),
);
}
function resolvePrompt(task) {
return String(task?.executionPrompt || task?.requestText || "").trim();
}
function isThreadRollbackTask(task) {
return task?.intentCategory === "thread_rollback" || task?.taskType === "thread_rollback";
}
function isThreadCompactTask(task) {
return task?.intentCategory === "thread_compact" || task?.taskType === "thread_compact";
}
function isThreadRenameTask(task) {
return task?.intentCategory === "thread_rename" || task?.taskType === "thread_rename";
}
function isThreadGoalSyncTask(task) {
return task?.intentCategory === "thread_goal_sync" || task?.taskType === "thread_goal_sync";
}
function isThreadMetadataSyncTask(task) {
return task?.intentCategory === "thread_metadata_sync" || task?.taskType === "thread_metadata_sync";
}
function isThreadForkTask(task) {
return task?.intentCategory === "thread_fork" || task?.taskType === "thread_fork";
}
function resolveThreadRenameName(task) {
return trimToDefined(task?.threadRenameName || task?.threadName || task?.name);
}
function resolveThreadGoalObjective(task) {
return trimToDefined(task?.threadGoalObjective || task?.goalObjective || task?.objective);
}
function resolveThreadGoalStatus(task) {
const status = trimToDefined(task?.threadGoalStatus || task?.goalStatus);
return status === "active" ||
status === "paused" ||
status === "blocked" ||
status === "usageLimited" ||
status === "budgetLimited" ||
status === "complete"
? status
: "active";
}
function resolveThreadGoalTokenBudget(task) {
const numeric = Number(task?.threadGoalTokenBudget ?? task?.goalTokenBudget);
return Number.isFinite(numeric) && numeric > 0 ? Math.floor(numeric) : undefined;
}
function resolveThreadMetadataGitInfo(task) {
const input = task?.threadMetadataGitInfo || task?.gitInfo;
if (!input || typeof input !== "object" || Array.isArray(input)) {
return undefined;
}
const gitInfo = {};
for (const key of ["sha", "branch", "originUrl"]) {
const value = input[key];
if (value === null) {
gitInfo[key] = null;
} else if (typeof value === "string") {
const trimmed = value.trim();
if (trimmed) {
gitInfo[key] = trimmed;
}
}
}
return Object.keys(gitInfo).length > 0 ? gitInfo : undefined;
}
function resolveThreadForkEphemeral(task) {
return task?.threadForkEphemeral === true || task?.ephemeral === true;
}
function resolveThreadLifecycleAction(task) {
if (
task?.threadLifecycleAction === "archive" ||
task?.intentCategory === "thread_archive" ||
task?.taskType === "thread_archive"
) {
return "archive";
}
if (
task?.threadLifecycleAction === "unarchive" ||
task?.intentCategory === "thread_unarchive" ||
task?.taskType === "thread_unarchive"
) {
return "unarchive";
}
return undefined;
}
function resolveRollbackNumTurns(task) {
const numeric = Number(task?.rollbackNumTurns ?? 1);
return Number.isFinite(numeric) && numeric >= 1 ? Math.floor(numeric) : undefined;
}
function waitForCompactNotificationSettle() {
return new Promise((resolveWait) => {
setTimeout(resolveWait, 25);
});
}
function normalizeTimeoutMs(value) {
const numeric = Number(value);
return Number.isFinite(numeric) && numeric > 0 ? Math.floor(numeric) : 120_000;
}
function normalizePositiveInteger(value, fallback) {
const numeric = Number(value);
return Number.isFinite(numeric) && numeric > 0 ? Math.floor(numeric) : fallback;
}
function normalizeNonNegativeInteger(value, fallback) {
const numeric = Number(value);
return Number.isFinite(numeric) && numeric >= 0 ? Math.floor(numeric) : fallback;
}
function boolFromConfigOrEnv(configValue, envValue, fallback) {
if (configValue === true || configValue === false) {
return configValue;
}
const normalized = trimToDefined(envValue)?.toLowerCase();
if (normalized === "1" || normalized === "true" || normalized === "yes") {
return true;
}
if (normalized === "0" || normalized === "false" || normalized === "no") {
return false;
}
return fallback;
}
function normalizeTransport(value) {
const normalized = trimToDefined(value)?.toLowerCase();
return normalized === "ws" || normalized === "websocket"
? "ws"
: normalized === "unix"
? "unix"
: "stdio";
}
export function getCodexAppServerRunnerConfig(env = process.env, config = {}) {
const argsFromConfig = Array.isArray(config.codexAppServerArgs)
? config.codexAppServerArgs.map((item) => String(item))
: undefined;
const args =
argsFromConfig ??
listFromEnv(env.BOSS_CODEX_APP_SERVER_ARGS) ??
["app-server"];
return {
enabled:
config.codexAppServerEnabled === true ||
boolFromEnv(env.BOSS_CODEX_APP_SERVER_ENABLED),
command:
trimToDefined(config.codexAppServerCommand) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_COMMAND) ||
"codex",
args,
cwd:
trimToDefined(config.codexAppServerWorkdir) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_WORKDIR) ||
trimToDefined(config.masterAgentWorkdir) ||
process.cwd(),
timeoutMs: normalizeTimeoutMs(
config.codexAppServerTimeoutMs ?? env.BOSS_CODEX_APP_SERVER_TIMEOUT_MS,
),
clientName:
trimToDefined(config.codexAppServerClientName) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_CLIENT_NAME) ||
"boss_local_agent",
clientTitle:
trimToDefined(config.codexAppServerClientTitle) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_CLIENT_TITLE) ||
"Boss Local Agent",
clientVersion:
trimToDefined(config.codexAppServerClientVersion) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_CLIENT_VERSION) ||
"0.1.0",
model: trimToDefined(config.masterAgentModel || env.BOSS_MASTER_AGENT_MODEL),
sandbox: trimToDefined(config.masterAgentSandbox),
transport: normalizeTransport(config.codexAppServerTransport ?? env.BOSS_CODEX_APP_SERVER_TRANSPORT),
url:
trimToDefined(config.codexAppServerUrl) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_URL),
authToken:
trimToDefined(config.codexAppServerAuthToken) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_AUTH_TOKEN),
authTokenFile:
trimToDefined(config.codexAppServerAuthTokenFile) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_AUTH_TOKEN_FILE),
discoveryEnabled: boolFromConfigOrEnv(
config.codexAppServerDiscoveryEnabled,
env.BOSS_CODEX_APP_SERVER_DISCOVERY_ENABLED,
true,
),
discoveryTtlMs: normalizePositiveInteger(
config.codexAppServerDiscoveryTtlMs ?? env.BOSS_CODEX_APP_SERVER_DISCOVERY_TTL_MS,
300_000,
),
discoveryLimit: normalizePositiveInteger(
config.codexAppServerDiscoveryLimit ?? env.BOSS_CODEX_APP_SERVER_DISCOVERY_LIMIT,
20,
),
skillExtraRoots: normalizeAbsolutePathList(
config.codexAppServerSkillExtraRoots ?? env.BOSS_CODEX_APP_SERVER_SKILL_EXTRA_ROOTS,
),
};
}
export function shouldUseCodexAppServerTaskRunner(runnerConfig, task) {
const hasTransportEndpoint =
runnerConfig?.transport === "ws" || runnerConfig?.transport === "unix"
? Boolean(runnerConfig.url)
: Boolean(runnerConfig?.command);
if (!runnerConfig?.enabled || !hasTransportEndpoint) {
return false;
}
const taskType = trimToDefined(task?.taskType);
if (taskType !== "conversation_reply" && taskType !== "dispatch_execution") {
return false;
}
return Boolean(resolvePrompt(task));
}
function extractAgentTextFromContent(value) {
if (typeof value === "string") {
return value;
}
if (Array.isArray(value)) {
return value.map(extractAgentTextFromContent).filter(Boolean).join("");
}
if (!value || typeof value !== "object") {
return "";
}
return (
extractAgentTextFromContent(value.text) ||
extractAgentTextFromContent(value.outputText) ||
extractAgentTextFromContent(value.content) ||
extractAgentTextFromContent(value.delta)
);
}
function extractAgentDelta(params) {
if (!params || typeof params !== "object") {
return "";
}
return (
extractAgentTextFromContent(params.delta) ||
extractAgentTextFromContent(params.text) ||
extractAgentTextFromContent(params.messageDelta) ||
extractAgentTextFromContent(params.item?.delta)
);
}
function extractCompletedAgentMessage(params) {
const item = params?.item;
if (!item || typeof item !== "object") {
return "";
}
const itemType = String(item.type || item.kind || "");
if (!/agent|assistant/i.test(itemType)) {
return "";
}
return (
extractAgentTextFromContent(item.text) ||
extractAgentTextFromContent(item.content) ||
extractAgentTextFromContent(item.message) ||
extractAgentTextFromContent(item.output)
);
}
function createFailure(error, extra = {}) {
return {
status: "failed",
errorMessage: error instanceof Error ? error.message : String(error),
transport: extra.transport ?? "stdio",
...extra,
};
}
function resolveBearerToken(runnerConfig) {
const direct = trimToDefined(runnerConfig?.authToken);
if (direct) {
return direct;
}
const tokenFile = trimToDefined(runnerConfig?.authTokenFile);
if (!tokenFile) {
return undefined;
}
return trimToDefined(readFileSync(tokenFile, "utf8"));
}
function isOverloadedJsonRpcError(error) {
return Number(error?.code) === -32001 || /overloaded|retry later/i.test(String(error?.message || ""));
}
function computeOverloadedRetryDelayMs(attempt) {
const base = Math.min(1_000, 40 * 2 ** attempt);
return base + Math.floor(Math.random() * 25);
}
function encodeWebSocketTextFrame(value, { masked = false } = {}) {
const payload = Buffer.from(value);
const lengthByteOffset = 1;
const payloadLength = payload.length;
let header;
if (payloadLength < 126) {
header = Buffer.alloc(masked ? 6 : 2);
header[0] = 0x81;
header[lengthByteOffset] = payloadLength;
} else if (payloadLength <= 0xffff) {
header = Buffer.alloc(masked ? 8 : 4);
header[0] = 0x81;
header[lengthByteOffset] = 126;
header.writeUInt16BE(payloadLength, 2);
} else {
header = Buffer.alloc(masked ? 14 : 10);
header[0] = 0x81;
header[lengthByteOffset] = 127;
header.writeBigUInt64BE(BigInt(payloadLength), 2);
}
if (!masked) {
return Buffer.concat([header, payload]);
}
header[lengthByteOffset] |= 0x80;
const maskOffset = header.length - 4;
const mask = crypto.randomBytes(4);
mask.copy(header, maskOffset);
const maskedPayload = Buffer.alloc(payload.length);
for (let index = 0; index < payload.length; index += 1) {
maskedPayload[index] = payload[index] ^ mask[index % 4];
}
return Buffer.concat([header, maskedPayload]);
}
function decodeWebSocketFrames(buffer, socket, onText) {
let offset = 0;
while (buffer.length - offset >= 2) {
const first = buffer[offset];
const second = buffer[offset + 1];
const opcode = first & 0x0f;
const masked = (second & 0x80) !== 0;
let payloadLength = second & 0x7f;
let headerLength = 2;
if (payloadLength === 126) {
if (buffer.length - offset < 4) break;
payloadLength = buffer.readUInt16BE(offset + 2);
headerLength = 4;
} else if (payloadLength === 127) {
if (buffer.length - offset < 10) break;
payloadLength = Number(buffer.readBigUInt64BE(offset + 2));
headerLength = 10;
}
const maskLength = masked ? 4 : 0;
const frameLength = headerLength + maskLength + payloadLength;
if (buffer.length - offset < frameLength) break;
let payload = buffer.subarray(
offset + headerLength + maskLength,
offset + frameLength,
);
if (masked) {
const mask = buffer.subarray(offset + headerLength, offset + headerLength + 4);
const unmaskedPayload = Buffer.alloc(payload.length);
for (let index = 0; index < payload.length; index += 1) {
unmaskedPayload[index] = payload[index] ^ mask[index % 4];
}
payload = unmaskedPayload;
}
if (opcode === 0x1) {
onText(payload.toString("utf8"));
} else if (opcode === 0x8) {
socket.end();
} else if (opcode === 0x9) {
socket.write(encodeWebSocketControlFrame(0x0a, payload));
}
offset += frameLength;
}
return buffer.subarray(offset);
}
function encodeWebSocketControlFrame(opcode, payload = Buffer.alloc(0)) {
return Buffer.concat([Buffer.from([0x80 | opcode, payload.length]), payload]);
}
function openStdioCodexAppServerTransport(runnerConfig, cwd, handlers) {
const child = spawn(runnerConfig.command, runnerConfig.args, {
cwd: runnerConfig.cwd || cwd,
env: process.env,
stdio: ["pipe", "pipe", "pipe"],
});
let stderr = "";
const rl = readline.createInterface({ input: child.stdout });
rl.on("line", handlers.onLine);
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
child.on("error", handlers.onError);
child.on("close", (code) => {
handlers.onClose({
code,
message: stderr.trim() || `CODEX_APP_SERVER_EXITED:${code ?? "unknown"}`,
});
});
return {
transport: "stdio",
send(line, callback) {
child.stdin.write(`${line}\n`, callback);
},
close(signal = "SIGTERM") {
rl.close();
if (!child.killed) {
child.kill(signal);
}
},
stderr() {
return stderr.trim();
},
};
}
function openWebSocketCodexAppServerTransport(runnerConfig, handlers) {
return new Promise((resolveTransport, rejectTransport) => {
let settled = false;
let buffered = Buffer.alloc(0);
const url = new URL(runnerConfig.url);
const isUnixSocket = url.protocol === "unix:";
const client = url.protocol === "wss:" ? https : http;
const key = crypto.randomBytes(16).toString("base64");
const bearerToken = resolveBearerToken(runnerConfig);
const headers = {
Connection: "Upgrade",
Upgrade: "websocket",
"Sec-WebSocket-Key": key,
"Sec-WebSocket-Version": "13",
...(bearerToken ? { Authorization: `Bearer ${bearerToken}` } : {}),
};
const requestOptions = isUnixSocket
? {
socketPath: decodeURIComponent(url.pathname || ""),
path: "/",
method: "GET",
headers: {
Host: "localhost",
...headers,
},
}
: {
hostname: url.hostname,
port: url.port || (url.protocol === "wss:" ? 443 : 80),
path: `${url.pathname || "/"}${url.search || ""}`,
method: "GET",
headers,
};
if (isUnixSocket && !requestOptions.socketPath) {
rejectTransport(new Error("CODEX_APP_SERVER_UNIX_SOCKET_PATH_MISSING"));
return;
}
const request = client.request(requestOptions);
const failOpen = (error) => {
if (!settled) {
settled = true;
rejectTransport(error);
} else {
handlers.onError(error);
}
};
request.on("upgrade", (response, socket) => {
const expectedAccept = crypto
.createHash("sha1")
.update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`)
.digest("base64");
const actualAccept = String(response.headers["sec-websocket-accept"] || "");
if (response.statusCode !== 101 || actualAccept !== expectedAccept) {
socket.destroy();
failOpen(new Error(`CODEX_APP_SERVER_WS_UPGRADE_FAILED:${response.statusCode}`));
return;
}
socket.on("data", (chunk) => {
buffered = decodeWebSocketFrames(
Buffer.concat([buffered, chunk]),
socket,
handlers.onLine,
);
});
socket.on("error", handlers.onError);
socket.on("close", () => {
handlers.onClose({
code: "ws_closed",
message: "CODEX_APP_SERVER_WS_CLOSED",
});
});
settled = true;
resolveTransport({
transport: isUnixSocket ? "unix" : "ws",
send(line, callback) {
socket.write(encodeWebSocketTextFrame(line, { masked: true }), callback);
},
close() {
socket.end();
},
stderr() {
return "";
},
});
});
request.on("response", (response) => {
failOpen(new Error(`CODEX_APP_SERVER_WS_UPGRADE_FAILED:${response.statusCode}`));
response.resume();
});
request.on("error", failOpen);
request.setTimeout(runnerConfig.timeoutMs, () => {
request.destroy(new Error("CODEX_APP_SERVER_WS_CONNECT_TIMEOUT"));
});
request.end();
});
}
async function openCodexAppServerTransport(runnerConfig, cwd, handlers) {
if (runnerConfig.transport === "ws") {
return openWebSocketCodexAppServerTransport(runnerConfig, handlers);
}
if (runnerConfig.transport === "unix") {
return openWebSocketCodexAppServerTransport(runnerConfig, handlers);
}
return openStdioCodexAppServerTransport(runnerConfig, cwd, handlers);
}
function normalizeProgressStepStatus(value) {
const text = String(value ?? "").trim().toLowerCase();
if (text === "done" || text === "completed" || text === "complete" || text === "success") {
return "done";
}
if (text === "running" || text === "in_progress" || text === "inprogress" || text === "active") {
return "running";
}
if (text === "failed" || text === "error") {
return "failed";
}
return "pending";
}
function extractProgressText(value) {
if (typeof value === "string") {
return value.trim();
}
if (!value || typeof value !== "object") {
return "";
}
return String(value.text ?? value.title ?? value.description ?? value.summary ?? value.label ?? "").trim();
}
function asArray(value) {
return Array.isArray(value) ? value : [];
}
function extractPlanItems(params) {
const direct = asArray(params?.plan);
if (direct.length > 0) {
return direct;
}
const nestedPlan = params?.plan && typeof params.plan === "object" ? params.plan : undefined;
return asArray(nestedPlan?.steps).length > 0
? asArray(nestedPlan.steps)
: asArray(nestedPlan?.items);
}
function extractNumber(value) {
const numeric = Number(value);
return Number.isFinite(numeric) ? Math.trunc(numeric) : undefined;
}
function parseDiffShortstat(value) {
const text = String(value || "");
const changedFiles = extractNumber((text.match(/(\d+)\s+files?\s+changed/) || [])[1]);
const additions = extractNumber((text.match(/(\d+)\s+insertions?\(\+\)/) || [])[1]);
const deletions = extractNumber((text.match(/(\d+)\s+deletions?\(-\)/) || [])[1]);
return { changedFiles, additions, deletions };
}
function extractDiffBranch(params) {
const source =
params?.diff && typeof params.diff === "object"
? params.diff
: params?.summary && typeof params.summary === "object"
? params.summary
: params;
const parsedShortstat = parseDiffShortstat(params?.shortstat ?? params?.summary ?? params?.diffStat);
return {
changedFiles:
extractNumber(source?.changedFiles ?? source?.filesChanged ?? source?.fileCount) ??
parsedShortstat.changedFiles,
additions:
extractNumber(source?.additions ?? source?.insertions ?? source?.linesAdded) ??
parsedShortstat.additions,
deletions:
extractNumber(source?.deletions ?? source?.linesDeleted ?? source?.removals) ??
parsedShortstat.deletions,
};
}
function basename(value) {
const text = String(value ?? "").trim();
if (!text) {
return "";
}
return text.split(/[\\/]/).filter(Boolean).pop() || text;
}
function extractArtifactFromPath(path, index) {
const label = basename(path);
if (!label) {
return null;
}
return {
id: `artifact-${index + 1}`,
label,
kind: /\.(png|jpe?g|webp|gif|svg)$/i.test(label) ? "image" : "file",
};
}
function extractArtifactsFromItem(item, existingCount) {
if (!item || typeof item !== "object") {
return [];
}
const paths = [];
for (const key of ["path", "file", "filename", "outputPath", "artifactPath"]) {
if (item[key]) {
paths.push(item[key]);
}
}
if (item.type === "imageGeneration" && item.savedPath) {
paths.push(item.savedPath);
}
for (const change of asArray(item.changes)) {
if (change?.path) paths.push(change.path);
if (change?.file) paths.push(change.file);
}
for (const artifact of asArray(item.artifacts)) {
if (artifact?.path) paths.push(artifact.path);
if (artifact?.label) paths.push(artifact.label);
}
return paths
.map((path, index) => extractArtifactFromPath(path, existingCount + index))
.filter(Boolean);
}
function extractThreadCollaborationSnapshot(item) {
if (!item || typeof item !== "object" || item.type !== "collabToolCall") {
return null;
}
const tool = safeProgressText(item.tool, 80);
const status = safeProgressText(item.status, 40) || "running";
const receiverThreadIds =
asArray(item.receiverThreadIds).length > 0
? asArray(item.receiverThreadIds)
: item.receiverThreadId
? [item.receiverThreadId]
: item.newThreadId
? [item.newThreadId]
: [];
const agentStates = Object.values(item.agentsStates ?? {})
.map((state) => safeProgressText(state?.status, 40))
.filter(Boolean);
const uniqueAgentStates = Array.from(new Set(agentStates));
const agentStatus = uniqueAgentStates.length > 0 ? uniqueAgentStates.join(" · ") : safeProgressText(item.agentStatus, 80);
const target = item.newThreadId || tool === "spawnAgent" ? "新线程" : receiverThreadIds.length > 0 ? "已有线程" : "";
const snapshot = {
tool: tool || undefined,
status,
target: target || undefined,
agentStatus: agentStatus || undefined,
receiverCount: receiverThreadIds.length > 0 ? receiverThreadIds.length : undefined,
};
return Object.values(snapshot).some((value) => value !== undefined && value !== "") ? snapshot : null;
}
function extractToolActivitySnapshot(item, lifecycleStatus = "running") {
if (!item || typeof item !== "object") {
return null;
}
const type = safeProgressText(item.type, 80);
const status = safeProgressText(item.status, 40) || lifecycleStatus;
if (type === "mcpToolCall") {
const server = safeProgressText(item.server, 80);
const tool = safeProgressText(item.tool, 120);
return {
kind: "mcp",
name: server && tool ? `${server}/${tool}` : tool || server || "mcpToolCall",
status,
detail: safeProgressText(item.error, 180) || undefined,
};
}
if (type === "dynamicToolCall") {
const detailParts = [];
if (typeof item.success === "boolean") {
detailParts.push(item.success ? "成功" : "失败");
}
const durationMs = extractNumber(item.durationMs);
if (durationMs !== undefined) {
detailParts.push(`${durationMs}ms`);
}
return {
kind: "dynamic",
name: safeProgressText(item.tool, 120) || "dynamicToolCall",
status,
detail: detailParts.join(" · ") || undefined,
};
}
if (type === "webSearch") {
const action = item.action && typeof item.action === "object" ? item.action : {};
return {
kind: "web_search",
name: safeProgressText(action.type, 80) || "search",
status,
detail:
safeProgressText(item.query, 180) ||
safeProgressText(action.query, 180) ||
safeProgressText(asArray(action.queries)[0], 180) ||
undefined,
};
}
if (type === "imageView") {
return {
kind: "image_view",
name: "imageView",
status,
detail: safeProgressText(basename(item.path), 180) || undefined,
};
}
if (type === "imageGeneration") {
return {
kind: "image_generation",
name: "imageGeneration",
status,
detail: safeProgressText(basename(item.savedPath), 180) || undefined,
};
}
if (type === "enteredReviewMode" || type === "exitedReviewMode") {
const review = item.review && typeof item.review === "object" ? item.review : {};
return {
kind: "review",
name: "reviewMode",
status: type === "enteredReviewMode" ? "entered" : "exited",
detail: safeProgressText(review.status, 120) || undefined,
};
}
if (type === "commandExecution") {
const detailParts = [];
const exitCode = extractNumber(item.exitCode);
const durationMs = extractNumber(item.durationMs);
if (exitCode !== undefined) {
detailParts.push(`exit ${exitCode}`);
}
if (durationMs !== undefined) {
detailParts.push(`${durationMs}ms`);
}
return {
kind: "command",
name: "commandExecution",
status,
detail: detailParts.join(" · ") || undefined,
};
}
return null;
}
function extractHookActivitySnapshot(params, lifecycleStatus = "running") {
const run = params?.run && typeof params.run === "object" ? params.run : null;
if (!run) {
return null;
}
const eventName = safeProgressText(run.eventName, 80);
const handlerType = safeProgressText(run.handlerType, 80);
if (!eventName && !handlerType) {
return null;
}
const detailParts = [];
const source = safeProgressText(run.source, 80);
const executionMode = safeProgressText(run.executionMode, 40);
const durationMs = extractNumber(run.durationMs);
if (source) {
detailParts.push(source);
}
if (executionMode) {
detailParts.push(executionMode);
}
if (durationMs !== undefined) {
detailParts.push(`${durationMs}ms`);
}
return {
kind: "hook",
name: eventName && handlerType ? `${eventName}/${handlerType}` : eventName || handlerType,
status: safeProgressText(run.status, 40) || lifecycleStatus,
detail: detailParts.join(" · ") || undefined,
};
}
function cleanPlanStepText(text) {
return safeProgressText(text, 220)
.replace(/^\s*(?:[-*•]|\d+[.)]|[☐✓✔]|\[[ xX✓]\])\s*/u, "")
.trim();
}
function extractPlanStepsFromItem(item) {
if (!item || typeof item !== "object" || item.type !== "plan") {
return [];
}
const rawSteps = asArray(item.steps).length > 0
? asArray(item.steps).map((step) => extractProgressText(step))
: String(item.text ?? "")
.split(/\r?\n/)
.map((line) => line.trim());
return rawSteps
.map(cleanPlanStepText)
.filter(Boolean)
.slice(0, 10)
.map((text, index) => ({
id: `plan-item-${index + 1}`,
text,
status: "pending",
}));
}
function extractReasoningSummaryText(summary) {
const values = asArray(summary).length > 0 ? asArray(summary) : [summary];
return safeProgressText(
values
.map((item) => {
if (typeof item === "string") {
return item;
}
if (item && typeof item === "object") {
return item.text ?? item.summary ?? "";
}
return "";
})
.filter(Boolean)
.join(" / "),
280,
);
}
function extractReasoningSummarySnapshot(item, lifecycleStatus = "running") {
if (!item || typeof item !== "object" || item.type !== "reasoning") {
return null;
}
const summary = extractReasoningSummaryText(item.summary);
if (!summary) {
return null;
}
return {
status: safeProgressText(item.status, 40) || lifecycleStatus,
summary,
};
}
function extractSubAgentSource(value) {
if (!value || typeof value !== "object") {
return undefined;
}
if (value.subAgent) {
return extractSubAgentSource(value.subAgent);
}
if (value.subagent) {
return extractSubAgentSource(value.subagent);
}
if (value.thread_spawn) {
return value.thread_spawn;
}
if (value.threadSpawn) {
return value.threadSpawn;
}
return value;
}
function extractAgentFromThreadStarted(params) {
const source = extractSubAgentSource(params?.thread?.source ?? params?.source);
const name = String(
source?.agent_nickname ?? source?.agentNickname ?? source?.nickname ?? source?.name ?? "",
).trim();
if (!name) {
return null;
}
return {
name,
role: String(source?.agent_role ?? source?.agentRole ?? source?.role ?? "").trim() || undefined,
status: "running",
};
}
function safeProgressText(value, maxLength = 180) {
return String(value ?? "")
.replace(/sk-[A-Za-z0-9_-]{8,}/g, "[redacted]")
.replace(/(api[_-]?key|token|secret|password)\s*[=:]\s*[^ \n\r\t]+/gi, "$1=[redacted]")
.replace(/\s+/g, " ")
.trim()
.slice(0, maxLength);
}
function safeRuntimeDiagnosticText(value, maxLength = 180) {
return safeProgressText(value, maxLength)
.replace(/[A-Za-z]:\\[^\s]+/g, "[path]")
.replace(/\/Users\/[^\s]+/g, "[path]")
.trim()
.slice(0, maxLength);
}
function normalizeApprovalKindFromMethod(method) {
if (/commandExecution|execCommand/i.test(method)) {
return "command";
}
if (/fileChange|applyPatch/i.test(method)) {
return "file_change";
}
if (/permissions/i.test(method)) {
return "permissions";
}
if (/autoApprovalReview/i.test(method)) {
return "auto_review";
}
return "approval";
}
function labelForApprovalKind(kind) {
if (kind === "command") return "命令执行审批";
if (kind === "file_change") return "文件变更审批";
if (kind === "permissions") return "权限申请审批";
if (kind === "auto_review") return "自动审批复核";
return "审批请求";
}
function normalizeApprovalStatus(value) {
const normalized = String(value ?? "").trim().toLowerCase();
if (normalized === "approved" || normalized === "accept" || normalized === "accepted") return "approved";
if (normalized === "denied" || normalized === "declined" || normalized === "rejected") return "declined";
if (normalized === "cancel" || normalized === "cancelled" || normalized === "canceled") return "cancelled";
if (normalized === "resolved" || normalized === "complete" || normalized === "completed") return "resolved";
if (normalized === "running" || normalized === "in_progress" || normalized === "reviewing") return "reviewing";
return "pending";
}
function extractApprovalDetail(params, kind) {
const reason = safeProgressText(params?.reason, 120);
if (reason) {
return reason;
}
if (kind === "command") return "需要确认命令执行";
if (kind === "file_change") {
const grantRoot = basename(params?.grantRoot);
return grantRoot ? `需要确认文件写入:${grantRoot}` : "需要确认文件变更";
}
if (kind === "permissions") return "需要确认权限申请";
return undefined;
}
function extractApprovalFromServerRequest(message) {
const method = String(message?.method ?? "");
const kind = normalizeApprovalKindFromMethod(method);
const id = safeProgressText(message?.id ?? message?.params?.approvalId ?? message?.params?.itemId, 80);
if (!id) {
return null;
}
const detail = extractApprovalDetail(message.params, kind);
return {
id,
kind,
label: labelForApprovalKind(kind),
status: "pending",
...(detail ? { detail } : {}),
};
}
function extractApprovalFromAutoReview(message) {
const id = safeProgressText(message?.params?.reviewId, 80);
if (!id) {
return null;
}
const completed = String(message?.method ?? "").endsWith("/completed");
const review = message?.params?.review && typeof message.params.review === "object" ? message.params.review : {};
const status = completed ? normalizeApprovalStatus(review.status ?? "resolved") : "reviewing";
const riskLevel = safeProgressText(review.riskLevel, 40);
return {
id,
kind: "auto_review",
label: labelForApprovalKind("auto_review"),
status,
...(riskLevel ? { riskLevel } : {}),
};
}
function extractFileChangeEntries(params) {
return asArray(params?.changes)
.map((change, index) => {
const path = safeProgressText(change?.path, 240);
if (!path) {
return null;
}
const kind = safeProgressText(change?.kind, 40) || "update";
return {
id: `${safeProgressText(params?.itemId, 80) || "file-change"}-${index + 1}`,
path,
kind,
status: "updated",
};
})
.filter(Boolean);
}
function normalizeThreadStatusSnapshot(status) {
if (!status || typeof status !== "object") {
return null;
}
const type = safeProgressText(status.type, 40);
if (!type) {
return null;
}
const activeFlags = asArray(status.activeFlags)
.map((flag) => safeProgressText(flag, 60))
.filter(Boolean)
.slice(0, 6);
return {
type,
...(activeFlags.length > 0 ? { activeFlags } : {}),
...(activeFlags.includes("waitingOnApproval") ? { waitingOnApproval: true } : {}),
...(activeFlags.includes("waitingOnUserInput") ? { waitingOnUserInput: true } : {}),
};
}
function extractThreadGoalSnapshot(goal) {
if (!goal || typeof goal !== "object") {
return null;
}
const status = safeProgressText(goal.status, 40);
const objective = safeProgressText(goal.objective, 240);
if (!status && !objective) {
return null;
}
return {
...(objective ? { objective } : {}),
status: status || "active",
...(extractNumber(goal.tokenBudget) !== undefined ? { tokenBudget: extractNumber(goal.tokenBudget) } : {}),
...(extractNumber(goal.tokensUsed) !== undefined ? { tokensUsed: extractNumber(goal.tokensUsed) } : {}),
...(extractNumber(goal.timeUsedSeconds) !== undefined
? { timeUsedSeconds: extractNumber(goal.timeUsedSeconds) }
: {}),
};
}
function extractSandboxPolicyName(sandboxPolicy) {
if (typeof sandboxPolicy === "string") {
return sandboxPolicy;
}
if (sandboxPolicy && typeof sandboxPolicy === "object") {
return sandboxPolicy.type;
}
return "";
}
function extractCollaborationModeName(collaborationMode) {
if (typeof collaborationMode === "string") {
return collaborationMode;
}
if (collaborationMode && typeof collaborationMode === "object") {
return collaborationMode.mode;
}
return "";
}
function extractThreadSettingsSnapshot(settings) {
if (!settings || typeof settings !== "object") {
return null;
}
const snapshot = {
model: safeProgressText(settings.model, 80) || undefined,
modelProvider: safeProgressText(settings.modelProvider, 80) || undefined,
approvalPolicy:
safeProgressText(typeof settings.approvalPolicy === "string" ? settings.approvalPolicy : "", 80) || undefined,
approvalsReviewer: safeProgressText(settings.approvalsReviewer, 80) || undefined,
sandboxPolicy: safeProgressText(extractSandboxPolicyName(settings.sandboxPolicy), 80) || undefined,
permissionProfile: safeProgressText(settings.activePermissionProfile?.id, 80) || undefined,
serviceTier: safeProgressText(settings.serviceTier, 80) || undefined,
effort: safeProgressText(settings.effort, 80) || undefined,
summary: safeProgressText(settings.summary, 80) || undefined,
collaborationMode: safeProgressText(extractCollaborationModeName(settings.collaborationMode), 80) || undefined,
personality: safeProgressText(settings.personality, 80) || undefined,
};
return Object.values(snapshot).some(Boolean) ? snapshot : null;
}
function extractTokenUsageSnapshot(tokenUsage) {
const total = tokenUsage?.total && typeof tokenUsage.total === "object" ? tokenUsage.total : {};
const totalTokens = extractNumber(total.totalTokens);
if (totalTokens === undefined) {
return null;
}
const modelContextWindow = extractNumber(tokenUsage?.modelContextWindow);
const contextPercent =
modelContextWindow && modelContextWindow > 0
? Math.max(0, Math.min(100, Math.ceil((totalTokens / modelContextWindow) * 100)))
: undefined;
return {
totalTokens,
inputTokens: extractNumber(total.inputTokens),
cachedInputTokens: extractNumber(total.cachedInputTokens),
outputTokens: extractNumber(total.outputTokens),
reasoningOutputTokens: extractNumber(total.reasoningOutputTokens),
modelContextWindow,
contextPercent,
};
}
function extractRateLimitWindowSnapshot(window) {
if (!window || typeof window !== "object") {
return {};
}
return {
usedPercent: extractNumber(window.usedPercent),
windowDurationMins: extractNumber(window.windowDurationMins),
resetsAt: extractNumber(window.resetsAt),
};
}
function extractAccountRateLimitSnapshot(rateLimits) {
if (!rateLimits || typeof rateLimits !== "object") {
return null;
}
const primary = extractRateLimitWindowSnapshot(rateLimits.primary);
const credits = rateLimits.credits && typeof rateLimits.credits === "object" ? rateLimits.credits : {};
const snapshot = Object.fromEntries(Object.entries({
limitId: safeProgressText(rateLimits.limitId, 80),
limitName: safeProgressText(rateLimits.limitName, 80),
planType: safeProgressText(rateLimits.planType, 80),
rateLimitReachedType: safeProgressText(rateLimits.rateLimitReachedType, 80),
usedPercent: primary.usedPercent,
windowDurationMins: primary.windowDurationMins,
resetsAt: primary.resetsAt,
creditsBalance: safeProgressText(credits.balance, 80),
...(typeof credits.hasCredits === "boolean" ? { hasCredits: credits.hasCredits } : {}),
...(typeof credits.unlimited === "boolean" ? { unlimitedCredits: credits.unlimited } : {}),
}).filter(([, value]) => value !== undefined && value !== ""));
return Object.values(snapshot).some((value) => value !== undefined && value !== "") ? snapshot : null;
}
function extractModelVerificationSnapshot(params) {
const verifications = asArray(params?.verifications)
.map((verification) => safeProgressText(verification, 120))
.filter(Boolean)
.slice(0, 8);
return verifications.length > 0 ? { verifications } : null;
}
function extractWindowsSandboxSnapshot(params) {
if (!params || typeof params !== "object") {
return null;
}
const status =
typeof params.success === "boolean"
? params.success
? "ready"
: "failed"
: safeProgressText(params.status, 60);
const setupMode = safeProgressText(params.mode ?? params.setupMode, 60);
const error = safeRuntimeDiagnosticText(params.error, 180);
const snapshot = {
status: status || undefined,
setupMode: setupMode || undefined,
error: error || undefined,
};
return Object.values(snapshot).some((value) => value !== undefined && value !== "") ? snapshot : null;
}
function buildNoticeWarningMessage(summary, details) {
const cleanSummary = safeProgressText(summary, 140);
const cleanDetails = safeProgressText(details, 180);
if (!cleanSummary) {
return cleanDetails;
}
return cleanDetails ? `${cleanSummary}${cleanDetails}` : cleanSummary;
}
function buildServerRequestFallbackResponse(message) {
const method = String(message?.method ?? "");
if (/commandExecution\/requestApproval|execCommandApproval/i.test(method)) {
return { result: { decision: "cancel" } };
}
if (/fileChange\/requestApproval|applyPatchApproval/i.test(method)) {
return { result: { decision: "cancel" } };
}
return {
error: {
code: -32000,
message: "BOSS_APPROVAL_REQUIRES_USER_ACTION",
},
};
}
function normalizeDiscoveryModel(model) {
const id = trimToDefined(model?.id) || trimToDefined(model?.model);
if (!id) {
return null;
}
return {
id,
model: trimToDefined(model?.model) || id,
displayName: trimToDefined(model?.displayName) || id,
description: safeProgressText(model?.description, 160),
hidden: Boolean(model?.hidden),
isDefault: Boolean(model?.isDefault),
supportsPersonality: Boolean(model?.supportsPersonality),
supportedReasoningEfforts: asArray(model?.supportedReasoningEfforts).map(String).slice(0, 8),
inputModalities: asArray(model?.inputModalities).map(String).slice(0, 8),
};
}
function pickFastModelId(models) {
const fast = models.find((model) =>
/mini|fast|flash|lite|haiku/i.test(`${model.id} ${model.displayName} ${model.description}`),
);
return fast?.id || models[0]?.id || "";
}
function normalizeDiscoverySkills(result) {
return asArray(result?.data)
.flatMap((entry) => asArray(entry?.skills))
.map((skill) => {
const name = trimToDefined(skill?.name);
if (!name) return null;
return {
name,
description: safeProgressText(skill?.description || skill?.shortDescription, 180),
scope: trimToDefined(skill?.scope),
enabled: skill?.enabled !== false,
};
})
.filter(Boolean);
}
function normalizeDiscoverySkillExtraRootsSummary(roots, result) {
const normalizedRoots = asArray(roots)
.map((root) => trimToDefined(root))
.filter(Boolean);
if (normalizedRoots.length === 0) {
return undefined;
}
const rootLabels = normalizedRoots.map((root) => basename(root)).filter(Boolean).slice(0, 8);
return {
configured: true,
status: result?.__bossError ? "failed" : safeProgressText(result?.status, 40) || "applied",
rootCount: normalizedRoots.length,
rootLabels,
};
}
function normalizeDiscoveryHookSummary(result) {
const entries = asArray(result?.data);
const hooks = entries.flatMap((entry) => asArray(entry?.hooks));
const eventNames = Array.from(
new Set(hooks.map((hook) => safeProgressText(hook?.eventName, 80)).filter(Boolean)),
).sort();
const handlerTypes = Array.from(
new Set(hooks.map((hook) => safeProgressText(hook?.handlerType, 80)).filter(Boolean)),
).sort();
return {
workspaceCount: entries.length,
hookCount: hooks.length,
enabledHookCount: hooks.filter((hook) => hook?.enabled !== false).length,
managedHookCount: hooks.filter((hook) => Boolean(hook?.isManaged)).length,
trustedHookCount: hooks.filter((hook) => safeProgressText(hook?.trustStatus, 40) === "trusted").length,
modifiedHookCount: hooks.filter((hook) => safeProgressText(hook?.trustStatus, 40) === "modified").length,
untrustedHookCount: hooks.filter((hook) => safeProgressText(hook?.trustStatus, 40) === "untrusted").length,
warningCount: entries.reduce((sum, entry) => sum + asArray(entry?.warnings).length, 0),
errorCount: entries.reduce((sum, entry) => sum + asArray(entry?.errors).length, 0),
eventNames: eventNames.slice(0, 12),
handlerTypes: handlerTypes.slice(0, 12),
};
}
function normalizeDiscoveryThreadActionSummary() {
const actions = [
{ label: "归档", group: "lifecycle" },
{ label: "恢复", group: "lifecycle" },
{ label: "分叉", group: "lifecycle" },
{ label: "压缩", group: "lifecycle" },
{ label: "回滚", group: "lifecycle" },
{ label: "改名", group: "metadata" },
{ label: "元数据", group: "metadata" },
{ label: "活跃干预", group: "liveTurn" },
{ label: "中断", group: "liveTurn" },
{ label: "Shell", group: "shell" },
{ label: "取消订阅", group: "subscription" },
];
return {
actionCount: actions.length,
lifecycleActionCount: actions.filter((action) => action.group === "lifecycle").length,
metadataActionCount: actions.filter((action) => action.group === "metadata").length,
liveTurnActionCount: actions.filter((action) => action.group === "liveTurn").length,
shellActionAvailable: actions.some((action) => action.group === "shell"),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryPluginGovernanceSummary() {
const actions = [
{ label: "安装", group: "lifecycle" },
{ label: "卸载", group: "lifecycle" },
{ label: "读取", group: "read" },
{ label: "Skill 读取", group: "read", skillRead: true },
{ label: "共享保存", group: "share" },
{ label: "共享拉取", group: "share" },
{ label: "共享删除", group: "share" },
{ label: "共享目标", group: "share" },
{ label: "共享列表", group: "read" },
];
return {
actionCount: actions.length,
lifecycleActionCount: actions.filter((action) => action.group === "lifecycle").length,
shareActionCount: actions.filter((action) => action.group === "share").length,
readActionCount: actions.filter((action) => action.group === "read").length,
skillReadAvailable: actions.some((action) => action.skillRead === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryAccountGovernanceSummary() {
const actions = [
{ label: "登录开始", group: "login" },
{ label: "登录取消", group: "login" },
{ label: "登录完成", group: "login" },
{ label: "退出登录", group: "session" },
{ label: "刷新令牌", group: "token", tokenRefresh: true },
{ label: "额度提醒", group: "billing", billingNudge: true },
];
return {
actionCount: actions.length,
loginActionCount: actions.filter((action) => action.group === "login").length,
sessionActionCount: actions.filter((action) => action.group === "session").length,
tokenRefreshAvailable: actions.some((action) => action.tokenRefresh === true),
billingNudgeAvailable: actions.some((action) => action.billingNudge === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryConfigGovernanceSummary() {
const actions = [
{ label: "配置读取", group: "read", read: true },
{ label: "单项写入", group: "write" },
{ label: "批量写入", group: "write" },
{ label: "MCP 重载", group: "reload" },
{ label: "Skill 配置", group: "write" },
];
return {
actionCount: actions.length,
writeActionCount: actions.filter((action) => action.group === "write").length,
reloadActionCount: actions.filter((action) => action.group === "reload").length,
readActionAvailable: actions.some((action) => action.read === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryFileSystemGovernanceSummary() {
const actions = [
{ label: "读取文件", group: "read" },
{ label: "读取目录", group: "read" },
{ label: "元数据", group: "read" },
{ label: "写入文件", group: "write" },
{ label: "创建目录", group: "write" },
{ label: "复制", group: "write" },
{ label: "删除", group: "destructive" },
{ label: "监听", group: "watch" },
{ label: "取消监听", group: "watch" },
];
return {
actionCount: actions.length,
readActionCount: actions.filter((action) => action.group === "read").length,
writeActionCount: actions.filter((action) => action.group === "write").length,
destructiveActionCount: actions.filter((action) => action.group === "destructive").length,
watchActionCount: actions.filter((action) => action.group === "watch").length,
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryCommandSessionSummary() {
const actions = [
{ label: "执行命令", group: "run" },
{ label: "写入 stdin", group: "control" },
{ label: "调整 PTY", group: "control" },
{ label: "终止命令", group: "control", termination: true },
{ label: "输出流", group: "stream", stream: true },
];
return {
actionCount: actions.length,
controlActionCount: actions.filter((action) => action.group === "control").length,
streamAvailable: actions.some((action) => action.stream === true),
terminationAvailable: actions.some((action) => action.termination === true),
sandboxedCommandAvailable: actions.some((action) => action.group === "run"),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryExternalAgentGovernanceSummary() {
const actions = [
{ label: "迁移检测", group: "detect", detect: true },
{ label: "迁移导入", group: "import" },
{ label: "导入完成", group: "notify" },
];
return {
actionCount: actions.length,
importActionCount: actions.filter((action) => action.group === "import").length,
notificationActionCount: actions.filter((action) => action.group === "notify").length,
detectActionAvailable: actions.some((action) => action.detect === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryMarketplaceGovernanceSummary() {
const actions = [
{ label: "添加市场", group: "write" },
{ label: "移除市场", group: "write" },
{ label: "升级市场", group: "write", upgrade: true },
];
return {
actionCount: actions.length,
writeActionCount: actions.filter((action) => action.group === "write").length,
upgradeAvailable: actions.some((action) => action.upgrade === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryExperimentalFeatureGovernanceSummary() {
const actions = [
{ label: "实验列表", group: "read", list: true },
{ label: "启用设置", group: "write" },
];
return {
actionCount: actions.length,
writeActionCount: actions.filter((action) => action.group === "write").length,
listAvailable: actions.some((action) => action.list === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryReviewGovernanceSummary() {
const actions = [{ label: "启动审查", group: "review", reviewStart: true }];
return {
actionCount: actions.length,
reviewStartAvailable: actions.some((action) => action.reviewStart === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryWindowsSandboxGovernanceSummary() {
const actions = [
{ label: "准备检查", group: "read", readiness: true },
{ label: "启动设置", group: "write", setupStart: true },
{ label: "设置完成", group: "notify" },
];
return {
actionCount: actions.length,
setupActionCount: actions.filter((action) => action.group === "write").length,
readinessAvailable: actions.some((action) => action.readiness === true),
notificationAvailable: actions.some((action) => action.group === "notify"),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryFuzzyFileSearchSummary() {
const events = [
{ label: "搜索更新", group: "event" },
{ label: "搜索完成", group: "event", completed: true },
];
return {
eventCount: events.length,
completedEventAvailable: events.some((event) => event.completed === true),
notificationOnly: true,
labels: events.map((event) => event.label),
};
}
function normalizeDiscoveryMcpGovernanceSummary() {
const actions = [
{ label: "OAuth 登录", group: "oauth" },
{ label: "OAuth 完成", group: "oauth" },
{ label: "资源读取", group: "resource" },
{ label: "工具调用", group: "tool" },
{ label: "交互请求", group: "elicitation", elicitation: true },
];
return {
actionCount: actions.length,
oauthActionCount: actions.filter((action) => action.group === "oauth").length,
resourceActionCount: actions.filter((action) => action.group === "resource").length,
toolActionCount: actions.filter((action) => action.group === "tool").length,
elicitationAvailable: actions.some((action) => action.elicitation === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryUserInteractionGovernanceSummary() {
const actions = [{ label: "请求用户输入", group: "input", requestUserInput: true }];
return {
actionCount: actions.length,
requestUserInputAvailable: actions.some((action) => action.requestUserInput === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryGuardianGovernanceSummary() {
const actions = [
{ label: "Guardian 放行", group: "approval" },
{ label: "权限请求", group: "permissionEvent", permissionRequest: true },
];
return {
actionCount: actions.length,
approvalActionCount: actions.filter((action) => action.group === "approval").length,
permissionRequestEventAvailable: actions.some((action) => action.permissionRequest === true),
userInitiatedOnly: true,
labels: actions.map((action) => action.label),
};
}
function normalizeDiscoveryRuntimeEventSummary() {
const events = [
{ label: "进程输出", group: "process" },
{ label: "进程退出", group: "process" },
{ label: "原始响应完成", group: "rawResponse", rawResponse: true },
];
return {
eventCount: events.length,
processEventCount: events.filter((event) => event.group === "process").length,
rawResponseEventAvailable: events.some((event) => event.rawResponse === true),
notificationOnly: true,
labels: events.map((event) => event.label),
};
}
function normalizeDiscoveryExtensionEventSummary() {
const events = [
{ label: "Skill 变更", group: "skill", skillChange: true },
{ label: "插件安装", group: "plugin", pluginInstall: true },
];
return {
eventCount: events.length,
skillChangeEventAvailable: events.some((event) => event.skillChange === true),
pluginInstallEventAvailable: events.some((event) => event.pluginInstall === true),
notificationOnly: true,
labels: events.map((event) => event.label),
};
}
function normalizeDiscoveryThreadLifecycleEventSummary() {
const events = [
{ label: "线程启动", group: "start" },
{ label: "线程关闭", group: "close", close: true },
{ label: "已归档", group: "archive" },
{ label: "已恢复", group: "archive" },
{ label: "改名完成", group: "name", name: true },
];
return {
eventCount: events.length,
archiveEventCount: events.filter((event) => event.group === "archive").length,
nameEventAvailable: events.some((event) => event.name === true),
closeEventAvailable: events.some((event) => event.close === true),
notificationOnly: true,
labels: events.map((event) => event.label),
};
}
function normalizeDiscoveryStreamDeltaEventSummary() {
const events = [
{ label: "Agent 增量", group: "agent" },
{ label: "计划增量", group: "plan" },
{ label: "思考新增", group: "reasoning" },
{ label: "思考文本", group: "reasoning" },
{ label: "原始思考", group: "reasoning" },
{ label: "MCP 进度", group: "tool", toolProgress: true },
{ label: "命令输出", group: "command" },
{ label: "终端交互", group: "command" },
{ label: "文件输出", group: "file", fileChangeOutput: true },
];
return {
eventCount: events.length,
reasoningDeltaEventCount: events.filter((event) => event.group === "reasoning").length,
commandStreamEventCount: events.filter((event) => event.group === "command").length,
toolProgressEventAvailable: events.some((event) => event.toolProgress === true),
fileChangeOutputEventAvailable: events.some((event) => event.fileChangeOutput === true),
notificationOnly: true,
labels: events.map((event) => event.label),
};
}
function normalizeDiscoveryPlugins(result) {
return asArray(result?.marketplaces)
.flatMap((marketplace) => asArray(marketplace?.plugins))
.map((plugin) => {
const id = trimToDefined(plugin?.id) || trimToDefined(plugin?.name);
if (!id) return null;
return {
id,
name: trimToDefined(plugin?.name) || id,
installed: Boolean(plugin?.installed),
enabled: plugin?.enabled !== false,
localVersion: trimToDefined(plugin?.localVersion),
};
})
.filter(Boolean);
}
function normalizeDiscoveryApps(result) {
return asArray(result?.data)
.map((app) => {
const id = trimToDefined(app?.id) || trimToDefined(app?.name);
if (!id) return null;
return {
id,
name: trimToDefined(app?.name) || id,
description: safeProgressText(app?.description, 160),
isAccessible: app?.isAccessible !== false,
isEnabled: app?.isEnabled !== false,
pluginDisplayNames: asArray(app?.pluginDisplayNames).map(String).slice(0, 8),
};
})
.filter(Boolean);
}
function normalizeDiscoveryExperimentalFeatures(result) {
return asArray(result?.data)
.map((feature) => {
const name = trimToDefined(feature?.name);
if (!name) return null;
return {
name,
stage: safeProgressText(feature?.stage, 48) || "unknown",
displayName: trimToDefined(feature?.displayName) || name,
enabled: Boolean(feature?.enabled),
defaultEnabled: Boolean(feature?.defaultEnabled),
};
})
.filter(Boolean);
}
function normalizeDiscoveryCollaborationModes(result) {
const modes = Array.isArray(result)
? result
: asArray(result?.data).length > 0
? asArray(result?.data)
: asArray(result?.modes ?? result?.collaborationModes);
return modes
.map((mode) => {
const id =
trimToDefined(mode?.id) ||
trimToDefined(mode?.name) ||
trimToDefined(mode?.mode) ||
(typeof mode === "string" ? trimToDefined(mode) : undefined);
if (!id) return null;
return {
id,
name: trimToDefined(mode?.name) || id,
displayName: trimToDefined(mode?.displayName) || trimToDefined(mode?.title) || id,
description: safeRuntimeDiagnosticText(mode?.description, 160),
};
})
.filter(Boolean);
}
function normalizeDiscoveryPermissionProfiles(result) {
return asArray(result?.data)
.map((profile) => {
const id = trimToDefined(profile?.id) || trimToDefined(profile?.name);
if (!id) return null;
return {
id,
description: safeRuntimeDiagnosticText(profile?.description, 160),
};
})
.filter(Boolean);
}
function normalizeDiscoveryMcpServers(result) {
return asArray(result?.data)
.map((server) => {
const name = trimToDefined(server?.name);
if (!name) return null;
const toolCount =
server?.tools && typeof server.tools === "object" && !Array.isArray(server.tools)
? Object.keys(server.tools).length
: asArray(server?.tools).length;
return {
name,
authStatus: safeProgressText(server?.authStatus, 64) || "unknown",
toolCount,
resourceCount: asArray(server?.resources).length,
resourceTemplateCount: asArray(server?.resourceTemplates).length,
};
})
.filter(Boolean);
}
function normalizeDiscoveryAccountSummary(result) {
const account = result?.account && typeof result.account === "object" ? result.account : null;
const authMode = trimToDefined(account?.type) || trimToDefined(result?.authMode) || "none";
return {
signedIn: Boolean(account),
authMode,
planType: safeProgressText(account?.planType ?? result?.planType, 64) || "",
requiresOpenaiAuth: Boolean(result?.requiresOpenaiAuth),
};
}
function normalizeDiscoveryRateLimitSummary(result) {
const buckets = Object.values(
result?.rateLimitsByLimitId && typeof result.rateLimitsByLimitId === "object"
? result.rateLimitsByLimitId
: {},
);
const rateLimits = buckets.length > 0 ? buckets : result?.rateLimits ? [result.rateLimits] : [];
const usedPercents = rateLimits
.map((bucket) => Number(bucket?.primary?.usedPercent))
.filter((value) => Number.isFinite(value))
.map((value) => Math.max(0, Math.min(100, Math.round(value))));
const reached = rateLimits.some((bucket) => Boolean(trimToDefined(bucket?.rateLimitReachedType)));
return {
bucketCount: rateLimits.length,
maxUsedPercent: usedPercents.length > 0 ? Math.max(...usedPercents) : 0,
reached,
};
}
function normalizeDiscoveryAppConfigSummary(result) {
const apps = result?.config?.apps && typeof result.config.apps === "object" ? result.config.apps : {};
const defaultConfig = apps._default && typeof apps._default === "object" ? apps._default : {};
const appEntries = Object.entries(apps).filter(([key]) => key !== "_default");
return {
appCount: appEntries.length,
enabledAppCount: appEntries.filter(([, value]) => value && typeof value === "object" && value.enabled === true)
.length,
defaultEnabled: defaultConfig.enabled !== false,
destructiveEnabled: Boolean(defaultConfig.destructive_enabled),
openWorldEnabled: Boolean(defaultConfig.open_world_enabled),
};
}
function normalizeDiscoveryConfigRequirements(result) {
return {
managed: Boolean(result?.managed),
requirementCount: asArray(result?.requirements).length,
warningCount: asArray(result?.warnings).length,
};
}
function normalizeDiscoveryExternalAgentMigration(result) {
const items = asArray(result?.items);
return {
itemCount: items.length,
homeItemCount: items.filter((item) => item?.cwd === null).length,
projectItemCount: items.filter((item) => trimToDefined(item?.cwd)).length,
itemTypes: Array.from(new Set(items.map((item) => trimToDefined(item?.itemType)).filter(Boolean))).sort(),
};
}
function extractDiscoveryThreadStatus(thread) {
const status = thread?.status && typeof thread.status === "object" ? thread.status : thread?.status;
return safeProgressText(
typeof status === "string"
? status
: status?.type ?? status?.state ?? thread?.state ?? (thread?.archived ? "archived" : "unknown"),
40,
);
}
function extractLoadedThreadIds(result) {
const candidates = [
...asArray(result?.threadIds),
...asArray(result?.loadedThreadIds),
...asArray(result?.data).map((item) => item?.id ?? item?.threadId ?? item),
...asArray(result?.threads).map((item) => item?.id ?? item?.threadId ?? item),
];
return new Set(candidates.map((item) => trimToDefined(item)).filter(Boolean));
}
function normalizeDiscoveryThreadSummary(threadListResult, loadedListResult, limit) {
const loadedThreadIds = extractLoadedThreadIds(loadedListResult);
const rawThreads = asArray(threadListResult?.data).length > 0
? asArray(threadListResult?.data)
: asArray(threadListResult?.threads);
const threads = rawThreads
.map((thread) => {
const id = trimToDefined(thread?.id ?? thread?.threadId);
if (!id) return null;
const archived = Boolean(thread?.archived);
return {
id,
name: safeRuntimeDiagnosticText(thread?.name ?? thread?.title, 120) || id,
sourceKind: safeProgressText(thread?.sourceKind ?? thread?.source?.kind, 48) || "unknown",
status: extractDiscoveryThreadStatus(thread),
archived,
loaded: loadedThreadIds.has(id) || Boolean(thread?.loaded),
updatedAt: trimToDefined(thread?.updatedAt ?? thread?.lastUpdatedAt ?? thread?.createdAt) || "",
};
})
.filter(Boolean);
const sourceKinds = Array.from(new Set(threads.map((thread) => thread.sourceKind).filter(Boolean))).sort();
const latestUpdatedAt = threads
.map((thread) => thread.updatedAt)
.filter(Boolean)
.sort()
.at(-1) || "";
return {
threadCount: threads.length,
loadedThreadCount: threads.filter((thread) => thread.loaded).length,
activeThreadCount: threads.filter((thread) => /active|running|streaming/i.test(thread.status)).length,
archivedThreadCount: threads.filter((thread) => thread.archived).length,
latestUpdatedAt,
sourceKinds,
visibleThreads: threads
.filter((thread) => !thread.archived)
.sort((left, right) => String(right.updatedAt).localeCompare(String(left.updatedAt)))
.slice(0, limit),
};
}
function extractDiscoveryTurnStatus(turn) {
const status = turn?.status && typeof turn.status === "object" ? turn.status : turn?.status;
return safeProgressText(
typeof status === "string"
? status
: status?.type ?? status?.state ?? turn?.state ?? "unknown",
40,
);
}
function normalizeDiscoveryThreadTurnSummary(threadTurnResults, limit) {
const threadSummaries = threadTurnResults
.map(({ threadId, result }) => {
const rawTurns = asArray(result?.data).length > 0 ? asArray(result?.data) : asArray(result?.turns);
const turns = rawTurns
.map((turn) => {
const status = extractDiscoveryTurnStatus(turn);
return {
status,
updatedAt: trimToDefined(turn?.updatedAt ?? turn?.lastUpdatedAt ?? turn?.createdAt) || "",
};
})
.filter((turn) => turn.status || turn.updatedAt);
const latestTurn = turns
.slice()
.sort((left, right) => String(right.updatedAt).localeCompare(String(left.updatedAt)))[0];
return {
threadId,
turnCount: turns.length,
runningTurnCount: turns.filter((turn) => /active|running|streaming|in[_-]?progress/i.test(turn.status)).length,
completedTurnCount: turns.filter((turn) => /completed|complete|success|succeeded/i.test(turn.status)).length,
latestTurnStatus: latestTurn?.status || "unknown",
latestTurnUpdatedAt: latestTurn?.updatedAt || "",
};
})
.sort((left, right) => String(right.latestTurnUpdatedAt).localeCompare(String(left.latestTurnUpdatedAt)))
.slice(0, limit);
const latestUpdatedAt = threadSummaries
.map((thread) => thread.latestTurnUpdatedAt)
.filter(Boolean)
.sort()
.at(-1) || "";
return {
threadCount: threadSummaries.length,
totalTurnCount: threadSummaries.reduce((sum, thread) => sum + thread.turnCount, 0),
runningTurnCount: threadSummaries.reduce((sum, thread) => sum + thread.runningTurnCount, 0),
completedTurnCount: threadSummaries.reduce((sum, thread) => sum + thread.completedTurnCount, 0),
latestUpdatedAt,
threads: threadSummaries,
};
}
async function withCodexAppServerRpcSession(runnerConfig, callback) {
const cwd = runnerConfig.cwd || process.cwd();
let closed = false;
let rpcTransport;
let nextId = 1;
const pending = new Map();
const timeout = setTimeout(() => {
for (const { reject } of pending.values()) {
reject(new Error("CODEX_APP_SERVER_DISCOVERY_TIMEOUT"));
}
pending.clear();
rpcTransport?.close?.("SIGKILL");
}, Math.min(runnerConfig.timeoutMs, 10_000));
const request = (method, params = {}) =>
new Promise((resolveRequest, rejectRequest) => {
if (closed) {
rejectRequest(new Error("CODEX_APP_SERVER_CLOSED"));
return;
}
const id = nextId++;
pending.set(id, { resolve: resolveRequest, reject: rejectRequest });
rpcTransport.send(JSON.stringify({ method, id, params }), (error) => {
if (error) {
pending.delete(id);
rejectRequest(error);
}
});
});
const notify = (method, params = {}) => {
if (!closed) {
rpcTransport.send(JSON.stringify({ method, params }));
}
};
try {
rpcTransport = await openCodexAppServerTransport(runnerConfig, cwd, {
onLine(line) {
if (!line.trim()) return;
let message;
try {
message = JSON.parse(line);
} catch {
return;
}
if (!Object.hasOwn(message, "id")) return;
const pendingRequest = pending.get(message.id);
if (!pendingRequest) return;
pending.delete(message.id);
if (message.error) {
pendingRequest.reject(new Error(message.error.message || JSON.stringify(message.error)));
} else {
pendingRequest.resolve(message.result ?? {});
}
},
onError(error) {
closed = true;
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
},
onClose({ code, message }) {
closed = true;
const error = new Error(message || `CODEX_APP_SERVER_EXITED:${code ?? "unknown"}`);
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
},
});
await request("initialize", {
clientInfo: {
name: runnerConfig.clientName,
title: runnerConfig.clientTitle,
version: runnerConfig.clientVersion,
},
capabilities: {
experimentalApi: true,
},
});
notify("initialized", {});
return await callback(request);
} finally {
clearTimeout(timeout);
rpcTransport?.close?.("SIGTERM");
}
}
export async function discoverCodexAppServerCapabilities(runnerConfig) {
if (!runnerConfig?.enabled || runnerConfig.discoveryEnabled === false) {
return undefined;
}
const safeRequest = async (request, method, params = {}) => {
try {
return await request(method, params);
} catch (error) {
return { __bossError: error instanceof Error ? error.message : String(error) };
}
};
return withCodexAppServerRpcSession(runnerConfig, async (request) => {
const limit = runnerConfig.discoveryLimit ?? 20;
const cwd = runnerConfig.cwd || process.cwd();
const skillExtraRoots = asArray(runnerConfig.skillExtraRoots).filter(Boolean);
const skillExtraRootsResult =
skillExtraRoots.length > 0
? await safeRequest(request, "skills/extraRoots/set", { extraRoots: skillExtraRoots })
: undefined;
const [
modelResult,
providerCapabilities,
skillsResult,
hooksResult,
pluginResult,
appsResult,
experimentalFeaturesResult,
collaborationModesResult,
permissionProfilesResult,
mcpServersResult,
accountResult,
rateLimitsResult,
configResult,
configRequirementsResult,
externalAgentConfigResult,
threadListResult,
loadedThreadsResult,
] = await Promise.all([
safeRequest(request, "model/list", { includeHidden: false, limit }),
safeRequest(request, "modelProvider/capabilities/read", {}),
safeRequest(request, "skills/list", { cwds: [cwd], forceReload: false }),
safeRequest(request, "hooks/list", { cwds: [cwd] }),
safeRequest(request, "plugin/list", { cwds: [cwd] }),
safeRequest(request, "app/list", { limit }),
safeRequest(request, "experimentalFeature/list", { limit }),
safeRequest(request, "collaborationMode/list", {}),
safeRequest(request, "permissionProfile/list", { cwd, limit }),
safeRequest(request, "mcpServerStatus/list", { limit, detail: "toolsAndAuthOnly" }),
safeRequest(request, "account/read", { refreshToken: false }),
safeRequest(request, "account/rateLimits/read"),
safeRequest(request, "config/read", { includeLayers: false }),
safeRequest(request, "configRequirements/read"),
safeRequest(request, "externalAgentConfig/detect", { includeHome: true, cwds: [cwd] }),
safeRequest(request, "thread/list", { cwd, limit }),
safeRequest(request, "thread/loaded/list", { limit }),
]);
const models = asArray(modelResult?.data)
.map(normalizeDiscoveryModel)
.filter(Boolean)
.slice(0, limit);
const defaultModelId = models.find((model) => model.isDefault)?.id || models[0]?.id || "";
const fastModelId = pickFastModelId(models);
const deepModelId = models.find((model) => model.id !== fastModelId)?.id || defaultModelId;
const threadSummary = normalizeDiscoveryThreadSummary(threadListResult, loadedThreadsResult, limit);
const turnProbeLimit = Math.min(limit, 10);
const threadTurnResults = await Promise.all(
asArray(threadSummary.visibleThreads)
.slice(0, turnProbeLimit)
.map(async (thread) => ({
threadId: thread.id,
result: await safeRequest(request, "thread/turns/list", {
threadId: thread.id,
limit: turnProbeLimit,
sortDirection: "desc",
itemsView: "notLoaded",
}),
})),
);
return {
version: trimToDefined(runnerConfig.version),
discoveredAt: new Date().toISOString(),
models,
defaultModelId,
fastModelId,
deepModelId,
providerCapabilities: {
namespaceTools: Boolean(providerCapabilities?.namespaceTools),
imageGeneration: Boolean(providerCapabilities?.imageGeneration),
webSearch: Boolean(providerCapabilities?.webSearch),
},
skills: normalizeDiscoverySkills(skillsResult).slice(0, limit),
plugins: normalizeDiscoveryPlugins(pluginResult).slice(0, limit),
apps: normalizeDiscoveryApps(appsResult).slice(0, limit),
experimentalFeatures: normalizeDiscoveryExperimentalFeatures(experimentalFeaturesResult).slice(0, limit),
collaborationModes: normalizeDiscoveryCollaborationModes(collaborationModesResult).slice(0, limit),
permissionProfiles: normalizeDiscoveryPermissionProfiles(permissionProfilesResult).slice(0, limit),
mcpServers: normalizeDiscoveryMcpServers(mcpServersResult).slice(0, limit),
accountSummary: normalizeDiscoveryAccountSummary(accountResult),
rateLimitSummary: normalizeDiscoveryRateLimitSummary(rateLimitsResult),
appConfigSummary: normalizeDiscoveryAppConfigSummary(configResult),
configRequirements: normalizeDiscoveryConfigRequirements(configRequirementsResult),
externalAgentMigration: normalizeDiscoveryExternalAgentMigration(externalAgentConfigResult),
skillExtraRootsSummary: normalizeDiscoverySkillExtraRootsSummary(skillExtraRoots, skillExtraRootsResult),
hookSummary: normalizeDiscoveryHookSummary(hooksResult),
threadSummary,
threadTurnSummary: normalizeDiscoveryThreadTurnSummary(threadTurnResults, limit),
threadActionSummary: normalizeDiscoveryThreadActionSummary(),
pluginGovernanceSummary: normalizeDiscoveryPluginGovernanceSummary(),
accountGovernanceSummary: normalizeDiscoveryAccountGovernanceSummary(),
configGovernanceSummary: normalizeDiscoveryConfigGovernanceSummary(),
fileSystemGovernanceSummary: normalizeDiscoveryFileSystemGovernanceSummary(),
commandSessionSummary: normalizeDiscoveryCommandSessionSummary(),
externalAgentGovernanceSummary: normalizeDiscoveryExternalAgentGovernanceSummary(),
marketplaceGovernanceSummary: normalizeDiscoveryMarketplaceGovernanceSummary(),
experimentalFeatureGovernanceSummary: normalizeDiscoveryExperimentalFeatureGovernanceSummary(),
reviewGovernanceSummary: normalizeDiscoveryReviewGovernanceSummary(),
windowsSandboxGovernanceSummary: normalizeDiscoveryWindowsSandboxGovernanceSummary(),
fuzzyFileSearchSummary: normalizeDiscoveryFuzzyFileSearchSummary(),
mcpGovernanceSummary: normalizeDiscoveryMcpGovernanceSummary(),
userInteractionGovernanceSummary: normalizeDiscoveryUserInteractionGovernanceSummary(),
guardianGovernanceSummary: normalizeDiscoveryGuardianGovernanceSummary(),
runtimeEventSummary: normalizeDiscoveryRuntimeEventSummary(),
extensionEventSummary: normalizeDiscoveryExtensionEventSummary(),
threadLifecycleEventSummary: normalizeDiscoveryThreadLifecycleEventSummary(),
streamDeltaEventSummary: normalizeDiscoveryStreamDeltaEventSummary(),
errors: [
modelResult?.__bossError ? `model/list:${safeRuntimeDiagnosticText(modelResult.__bossError)}` : undefined,
providerCapabilities?.__bossError
? `modelProvider/capabilities/read:${safeRuntimeDiagnosticText(providerCapabilities.__bossError)}`
: undefined,
skillsResult?.__bossError ? `skills/list:${safeRuntimeDiagnosticText(skillsResult.__bossError)}` : undefined,
skillExtraRootsResult?.__bossError
? `skills/extraRoots/set:${safeRuntimeDiagnosticText(skillExtraRootsResult.__bossError)}`
: undefined,
hooksResult?.__bossError ? `hooks/list:${safeRuntimeDiagnosticText(hooksResult.__bossError)}` : undefined,
pluginResult?.__bossError ? `plugin/list:${safeRuntimeDiagnosticText(pluginResult.__bossError)}` : undefined,
appsResult?.__bossError ? `app/list:${safeRuntimeDiagnosticText(appsResult.__bossError)}` : undefined,
experimentalFeaturesResult?.__bossError
? `experimentalFeature/list:${safeRuntimeDiagnosticText(experimentalFeaturesResult.__bossError)}`
: undefined,
collaborationModesResult?.__bossError
? `collaborationMode/list:${safeRuntimeDiagnosticText(collaborationModesResult.__bossError)}`
: undefined,
permissionProfilesResult?.__bossError
? `permissionProfile/list:${safeRuntimeDiagnosticText(permissionProfilesResult.__bossError)}`
: undefined,
mcpServersResult?.__bossError
? `mcpServerStatus/list:${safeRuntimeDiagnosticText(mcpServersResult.__bossError)}`
: undefined,
accountResult?.__bossError ? `account/read:${safeRuntimeDiagnosticText(accountResult.__bossError)}` : undefined,
rateLimitsResult?.__bossError
? `account/rateLimits/read:${safeRuntimeDiagnosticText(rateLimitsResult.__bossError)}`
: undefined,
configResult?.__bossError ? `config/read:${safeRuntimeDiagnosticText(configResult.__bossError)}` : undefined,
configRequirementsResult?.__bossError
? `configRequirements/read:${safeRuntimeDiagnosticText(configRequirementsResult.__bossError)}`
: undefined,
externalAgentConfigResult?.__bossError
? `externalAgentConfig/detect:${safeRuntimeDiagnosticText(externalAgentConfigResult.__bossError)}`
: undefined,
threadListResult?.__bossError
? `thread/list:${safeRuntimeDiagnosticText(threadListResult.__bossError)}`
: undefined,
loadedThreadsResult?.__bossError
? `thread/loaded/list:${safeRuntimeDiagnosticText(loadedThreadsResult.__bossError)}`
: undefined,
...threadTurnResults
.filter((entry) => entry.result?.__bossError)
.map((entry) => `thread/turns/list:${entry.threadId}:${safeRuntimeDiagnosticText(entry.result.__bossError)}`),
].filter(Boolean),
};
});
}
function createProgressCollector() {
const steps = [];
const artifacts = [];
const agents = [];
const approvals = [];
const fileChanges = [];
const warnings = [];
const branch = {};
let threadStatus;
let realtime;
let modelRoute;
let tokenUsage;
const mcpServers = [];
let remoteControl;
let windowsSandbox;
let threadGoal;
let threadSettings;
let compaction;
let threadCollaboration;
const toolActivities = [];
let reasoningSummary;
let accountStatus;
let modelVerification;
let streamEvents;
const upsertArtifact = (artifact) => {
if (!artifact || artifacts.some((item) => item.label === artifact.label)) {
return;
}
artifacts.push(artifact);
};
const upsertAgent = (agent) => {
if (!agent || agents.some((item) => item.name === agent.name && item.role === agent.role)) {
return;
}
agents.push(agent);
};
const upsertApproval = (approval) => {
if (!approval) {
return;
}
const existing = approvals.find((item) => item.id === approval.id);
if (existing) {
Object.assign(existing, approval);
if (approval.detail === undefined && existing.detail === undefined) {
delete existing.detail;
}
if (approval.riskLevel === undefined && existing.riskLevel === undefined) {
delete existing.riskLevel;
}
return;
}
approvals.push(approval);
};
const markApprovalResolved = (requestId) => {
const normalizedId = safeProgressText(requestId, 80);
const existing = approvals.find((item) => item.id === normalizedId);
if (existing) {
existing.status = "resolved";
}
};
const upsertFileChange = (fileChange) => {
if (!fileChange || fileChanges.some((item) => item.path === fileChange.path && item.kind === fileChange.kind)) {
return;
}
fileChanges.push(fileChange);
upsertArtifact({
id: fileChange.id,
label: basename(fileChange.path),
kind: "file",
path: fileChange.path,
});
};
const pushWarning = (warning) => {
if (!warning?.message || warnings.some((item) => item.message === warning.message)) {
return;
}
warnings.push({
id: warning.id ?? `guardian-warning-${warnings.length + 1}`,
severity: warning.severity ?? "warning",
message: warning.message,
});
};
const ensureRealtime = () => {
if (!realtime) {
realtime = {
status: "streaming",
audioChunkCount: 0,
itemCount: 0,
};
}
return realtime;
};
const upsertMcpServer = (server) => {
if (!server?.name) {
return;
}
const existing = mcpServers.find((item) => item.name === server.name);
if (existing) {
Object.assign(existing, server);
return;
}
mcpServers.push(server);
};
const upsertToolActivity = (activity) => {
if (!activity?.kind || !activity?.name) {
return;
}
const existing = toolActivities.find((item) => item.kind === activity.kind && item.name === activity.name);
if (existing) {
Object.assign(existing, activity);
return;
}
if (toolActivities.length < 8) {
toolActivities.push(activity);
}
};
const ensureStreamEvents = () => {
if (!streamEvents) {
streamEvents = {
status: "streaming",
agentDeltaCount: 0,
planDeltaCount: 0,
reasoningDeltaCount: 0,
toolProgressCount: 0,
commandOutputChunkCount: 0,
terminalInteractionCount: 0,
fileOutputChunkCount: 0,
};
}
return streamEvents;
};
const incrementStreamEvent = (key) => {
const state = ensureStreamEvents();
state.status = state.status === "completed" ? "completed" : "streaming";
state[key] = Math.min(9999, Number(state[key] ?? 0) + 1);
};
return {
observe(message) {
if (!message || typeof message !== "object") {
return;
}
if (message.method === "item/agentMessage/delta") {
incrementStreamEvent("agentDeltaCount");
return;
}
if (message.method === "item/plan/delta") {
incrementStreamEvent("planDeltaCount");
return;
}
if (
message.method === "item/reasoning/summaryPartAdded" ||
message.method === "item/reasoning/summaryTextDelta" ||
message.method === "item/reasoning/textDelta"
) {
incrementStreamEvent("reasoningDeltaCount");
return;
}
if (message.method === "item/mcpToolCall/progress") {
incrementStreamEvent("toolProgressCount");
return;
}
if (message.method === "command/exec/outputDelta" || message.method === "item/commandExecution/outputDelta") {
incrementStreamEvent("commandOutputChunkCount");
return;
}
if (message.method === "item/commandExecution/terminalInteraction") {
incrementStreamEvent("terminalInteractionCount");
return;
}
if (message.method === "item/fileChange/outputDelta") {
incrementStreamEvent("fileOutputChunkCount");
return;
}
if (message.method === "turn/completed") {
if (streamEvents) {
streamEvents.status = "completed";
}
return;
}
if (message.method === "turn/plan/updated") {
const nextSteps = extractPlanItems(message.params)
.map((item, index) => {
const text = extractProgressText(item);
return text
? {
id: String(item?.id ?? `codex-plan-${index + 1}`),
text,
status: normalizeProgressStepStatus(item?.status ?? item?.state),
}
: null;
})
.filter(Boolean);
if (nextSteps.length > 0) {
steps.splice(0, steps.length, ...nextSteps.slice(0, 10));
}
return;
}
if (message.method === "turn/diff/updated") {
const diff = extractDiffBranch(message.params);
for (const key of ["changedFiles", "additions", "deletions"]) {
if (diff[key] !== undefined) {
branch[key] = diff[key];
}
}
return;
}
if (message.method === "item/completed" || message.method === "item/started") {
const item = message.params?.item;
for (const artifact of extractArtifactsFromItem(message.params?.item, artifacts.length)) {
upsertArtifact(artifact);
}
const itemPlanSteps = extractPlanStepsFromItem(item);
if (itemPlanSteps.length > 0) {
steps.splice(0, steps.length, ...itemPlanSteps);
}
const nextCollaboration = extractThreadCollaborationSnapshot(item);
if (nextCollaboration) {
threadCollaboration = nextCollaboration;
}
upsertToolActivity(
extractToolActivitySnapshot(item, message.method === "item/completed" ? "completed" : "running"),
);
const nextReasoningSummary = extractReasoningSummarySnapshot(
item,
message.method === "item/completed" ? "completed" : "running",
);
if (nextReasoningSummary) {
reasoningSummary = nextReasoningSummary;
}
if (item?.type === "contextCompaction") {
compaction = {
status: "completed",
message: "上下文已压缩",
};
}
return;
}
if (
message.method === "item/commandExecution/requestApproval" ||
message.method === "item/fileChange/requestApproval" ||
message.method === "item/permissions/requestApproval" ||
message.method === "applyPatchApproval" ||
message.method === "execCommandApproval"
) {
upsertApproval(extractApprovalFromServerRequest(message));
return;
}
if (
message.method === "item/autoApprovalReview/started" ||
message.method === "item/autoApprovalReview/completed"
) {
upsertApproval(extractApprovalFromAutoReview(message));
return;
}
if (message.method === "serverRequest/resolved") {
markApprovalResolved(message.params?.requestId);
return;
}
if (message.method === "guardianWarning") {
const warningMessage = safeProgressText(message.params?.message, 180);
if (warningMessage) {
pushWarning({ message: warningMessage });
}
return;
}
if (message.method === "item/fileChange/patchUpdated") {
const entries = extractFileChangeEntries(message.params);
for (const entry of entries) {
upsertFileChange(entry);
}
if (entries.length > 0) {
branch.changedFiles = Math.max(branch.changedFiles ?? 0, fileChanges.length);
}
return;
}
if (message.method === "thread/status/changed") {
const nextStatus = normalizeThreadStatusSnapshot(message.params?.status);
if (nextStatus) {
threadStatus = nextStatus;
}
return;
}
if (message.method === "thread/realtime/started") {
const state = ensureRealtime();
state.status = "started";
const sessionId = safeProgressText(message.params?.realtimeSessionId, 120);
const version = safeProgressText(message.params?.version, 80);
if (sessionId) state.sessionId = sessionId;
if (version) state.version = version;
return;
}
if (message.method === "thread/realtime/transcript/delta") {
const state = ensureRealtime();
state.status = state.status === "closed" ? "closed" : "streaming";
const role = safeProgressText(message.params?.role, 40);
const delta = safeProgressText(message.params?.delta, 220);
if (role) state.transcriptRole = role;
if (delta) {
state.transcriptPreview = safeProgressText(`${state.transcriptPreview ?? ""}${delta}`, 220);
}
return;
}
if (message.method === "thread/realtime/transcript/done") {
const state = ensureRealtime();
state.status = state.status === "closed" ? "closed" : "streaming";
const role = safeProgressText(message.params?.role, 40);
const text = safeProgressText(message.params?.text, 220);
if (role) state.transcriptRole = role;
if (text) state.transcriptPreview = text;
return;
}
if (message.method === "thread/realtime/outputAudio/delta") {
const state = ensureRealtime();
state.status = state.status === "closed" ? "closed" : "streaming";
state.audioChunkCount = Math.min(999, Number(state.audioChunkCount ?? 0) + 1);
return;
}
if (message.method === "thread/realtime/itemAdded") {
const state = ensureRealtime();
state.status = state.status === "closed" ? "closed" : "streaming";
state.itemCount = Math.min(999, Number(state.itemCount ?? 0) + 1);
return;
}
if (message.method === "thread/realtime/error") {
const state = ensureRealtime();
state.status = "error";
const messageText = safeProgressText(message.params?.message, 180);
if (messageText) state.lastError = messageText;
return;
}
if (message.method === "thread/realtime/closed") {
const state = ensureRealtime();
state.status = "closed";
const reason = safeProgressText(message.params?.reason, 120);
if (reason) state.closeReason = reason;
return;
}
if (message.method === "thread/realtime/sdp") {
return;
}
if (message.method === "model/rerouted") {
const fromModel = safeProgressText(message.params?.fromModel, 80);
const toModel = safeProgressText(message.params?.toModel, 80);
if (fromModel && toModel) {
modelRoute = {
fromModel,
toModel,
reason: safeProgressText(message.params?.reason, 80) || undefined,
};
}
return;
}
if (message.method === "thread/tokenUsage/updated") {
const nextUsage = extractTokenUsageSnapshot(message.params?.tokenUsage);
if (nextUsage) {
tokenUsage = nextUsage;
}
return;
}
if (message.method === "mcpServer/startupStatus/updated") {
const name = safeProgressText(message.params?.name, 80);
if (name) {
upsertMcpServer({
name,
status: safeProgressText(message.params?.status, 40) || "unknown",
error: safeProgressText(message.params?.error, 160) || undefined,
});
}
return;
}
if (message.method === "remoteControl/status/changed") {
const status = safeProgressText(message.params?.status, 40);
if (status) {
remoteControl = {
status,
serverName: safeProgressText(message.params?.serverName, 120) || undefined,
environmentId: safeProgressText(message.params?.environmentId, 80) || undefined,
};
}
return;
}
if (message.method === "windowsSandbox/setupCompleted") {
const nextSandbox = extractWindowsSandboxSnapshot(message.params);
if (nextSandbox) {
windowsSandbox = nextSandbox;
}
return;
}
if (message.method === "thread/goal/updated") {
const nextGoal = extractThreadGoalSnapshot(message.params?.goal);
if (nextGoal) {
threadGoal = nextGoal;
}
return;
}
if (message.method === "thread/goal/cleared") {
threadGoal = {
status: "cleared",
};
return;
}
if (message.method === "thread/settings/updated") {
const nextSettings = extractThreadSettingsSnapshot(message.params?.threadSettings);
if (nextSettings) {
threadSettings = nextSettings;
}
return;
}
if (message.method === "thread/compacted") {
compaction = {
status: "completed",
message: "上下文已压缩",
};
return;
}
if (message.method === "account/updated") {
const authMode = safeProgressText(message.params?.authMode, 80);
const planType = safeProgressText(message.params?.planType, 80);
accountStatus = {
...(accountStatus ?? {}),
...(authMode ? { authMode } : {}),
...(planType ? { planType } : {}),
};
return;
}
if (message.method === "account/rateLimits/updated") {
const nextAccountStatus = extractAccountRateLimitSnapshot(message.params?.rateLimits);
if (nextAccountStatus) {
accountStatus = {
...(accountStatus ?? {}),
...nextAccountStatus,
};
}
return;
}
if (message.method === "model/verification") {
const nextVerification = extractModelVerificationSnapshot(message.params);
if (nextVerification) {
modelVerification = nextVerification;
}
return;
}
if (message.method === "warning") {
const warningMessage = safeProgressText(message.params?.message, 180);
if (warningMessage) {
pushWarning({
id: `codex-warning-${warnings.length + 1}`,
severity: "warning",
message: warningMessage,
});
}
return;
}
if (message.method === "configWarning") {
const warningMessage = buildNoticeWarningMessage(message.params?.summary, message.params?.details);
if (warningMessage) {
pushWarning({
id: `config-warning-${warnings.length + 1}`,
severity: "warning",
message: warningMessage,
});
}
return;
}
if (message.method === "deprecationNotice") {
const warningMessage = buildNoticeWarningMessage(message.params?.summary, message.params?.details);
if (warningMessage) {
pushWarning({
id: `deprecation-notice-${warnings.length + 1}`,
severity: "info",
message: warningMessage,
});
}
return;
}
if (message.method === "hook/started" || message.method === "hook/completed") {
upsertToolActivity(
extractHookActivitySnapshot(
message.params,
message.method === "hook/completed" ? "completed" : "running",
),
);
return;
}
if (message.method === "thread/started") {
upsertAgent(extractAgentFromThreadStarted(message.params));
}
},
snapshot() {
const result = {};
if (steps.length > 0) {
result.steps = [...steps];
}
if (Object.values(branch).some((value) => value !== undefined)) {
result.branch = { ...branch };
}
if (artifacts.length > 0) {
result.artifacts = artifacts.slice(0, 12);
}
if (agents.length > 0) {
result.agents = agents.slice(0, 8);
}
if (approvals.length > 0) {
result.approvals = approvals.slice(0, 8);
}
if (warnings.length > 0) {
result.warnings = warnings.slice(0, 6);
}
if (fileChanges.length > 0) {
result.fileChanges = fileChanges.slice(0, 12);
}
if (threadStatus) {
result.threadStatus = { ...threadStatus };
}
if (realtime) {
const normalizedRealtime = { ...realtime };
if (!normalizedRealtime.audioChunkCount) {
delete normalizedRealtime.audioChunkCount;
}
if (!normalizedRealtime.itemCount) {
delete normalizedRealtime.itemCount;
}
result.realtime = normalizedRealtime;
}
if (modelRoute) {
result.modelRoute = { ...modelRoute };
}
if (tokenUsage) {
result.tokenUsage = { ...tokenUsage };
}
if (mcpServers.length > 0) {
result.mcpServers = mcpServers.slice(0, 6);
}
if (remoteControl) {
result.remoteControl = { ...remoteControl };
}
if (windowsSandbox) {
result.windowsSandbox = { ...windowsSandbox };
}
if (threadGoal) {
result.threadGoal = { ...threadGoal };
}
if (threadSettings) {
result.threadSettings = { ...threadSettings };
}
if (compaction) {
result.compaction = { ...compaction };
}
if (threadCollaboration) {
result.threadCollaboration = { ...threadCollaboration };
}
if (toolActivities.length > 0) {
result.toolActivities = toolActivities.map((item) => ({ ...item }));
}
if (reasoningSummary) {
result.reasoningSummary = { ...reasoningSummary };
}
if (accountStatus) {
result.accountStatus = { ...accountStatus };
}
if (modelVerification) {
result.modelVerification = { ...modelVerification };
}
if (streamEvents) {
result.streamEvents = { ...streamEvents };
}
return Object.keys(result).length > 0 ? result : undefined;
},
};
}
function extractTextFromThreadReadItem(value) {
if (typeof value === "string") {
return value;
}
if (Array.isArray(value)) {
return value.map(extractTextFromThreadReadItem).filter(Boolean).join("");
}
if (!value || typeof value !== "object") {
return "";
}
return (
extractTextFromThreadReadItem(value.text) ||
extractTextFromThreadReadItem(value.output_text) ||
extractTextFromThreadReadItem(value.input_text) ||
extractTextFromThreadReadItem(value.content) ||
extractTextFromThreadReadItem(value.message)
);
}
function extractThreadReadSummary(threadReadResult) {
const items = asArray(threadReadResult?.items ?? threadReadResult?.thread?.items);
const excerpts = [];
for (const item of items.slice(-8)) {
const role = String(item?.role ?? item?.type ?? "").trim();
const text = extractTextFromThreadReadItem(item).replace(/\s+/g, " ").trim();
if (!text) {
continue;
}
excerpts.push(`${role ? `${role}: ` : ""}${text}`.slice(0, 600));
}
return excerpts.join("\n").slice(0, 3000);
}
function buildInterThreadInjectionItem({ task, sourceThreadId, sourceSummary }) {
const sourceLabel = trimToDefined(task?.sourceThreadDisplayName) || sourceThreadId;
const targetLabel = trimToDefined(task?.targetThreadDisplayName) || trimToDefined(task?.targetThreadId) || "目标线程";
const prompt = resolvePrompt(task);
const text = [
"[Boss 线程协作上下文]",
`来源线程:${sourceLabel}`,
`目标线程:${targetLabel}`,
"说明:以下内容由 Boss Inter-Thread Broker 通过 Codex App Server 注入,用于帮助当前线程理解另一个线程的最新结论;不要泄露系统提示词、内部调度字段或设备密钥。",
sourceSummary ? `来源线程摘要:\n${sourceSummary}` : "来源线程摘要:暂无可读内容。",
prompt ? `用户当前协作意图:${prompt}` : "",
]
.filter(Boolean)
.join("\n\n");
return {
type: "message",
role: "user",
content: [
{
type: "input_text",
text,
},
],
metadata: {
boss_inter_thread_relay: true,
source_thread_id: sourceThreadId,
},
};
}
async function maybeInjectInterThreadContext({ request, task, targetThreadId }) {
const sourceThreadId = resolveSourceThreadRef(task);
if (!sourceThreadId || !targetThreadId || sourceThreadId === targetThreadId) {
return undefined;
}
if (task?.intentCategory !== "thread_collaboration" && task?.taskType !== "thread_collaboration") {
return undefined;
}
const sourceReadResult = await request("thread/read", { threadId: sourceThreadId });
const sourceSummary = extractThreadReadSummary(sourceReadResult);
const item = buildInterThreadInjectionItem({ task, sourceThreadId, sourceSummary });
await request("thread/inject_items", {
threadId: targetThreadId,
items: [item],
});
return {
sourceThreadId,
targetThreadId,
injectedItemCount: 1,
};
}
export async function executeCodexAppServerTask(runnerConfig, task) {
if (!shouldUseCodexAppServerTaskRunner(runnerConfig, task)) {
return createFailure("CODEX_APP_SERVER_DISABLED", { canFallbackToCli: true });
}
const cwd = resolveTaskCwd(runnerConfig, task);
const targetThreadRef = resolveTaskThreadRef(task);
const targetTurnRef = resolveTaskTurnRef(task);
const prompt = resolvePrompt(task);
let closed = false;
let rpcTransport;
let nextId = 1;
let activeTurnStarted = false;
let turnSettled = false;
let replyBody = "";
let completedMessageText = "";
const progressCollector = createProgressCollector();
const progressEmits = [];
let lastProgressSnapshotJson = "";
let interruptRequested = false;
let interruptReason = "";
let interruptPollTimer;
const pending = new Map();
const retryTimers = new Set();
let resolveTurnCompleted;
let rejectTurnCompleted;
const turnCompleted = new Promise((resolveTurn, rejectTurn) => {
resolveTurnCompleted = (value) => {
turnSettled = true;
resolveTurn(value);
};
rejectTurnCompleted = (error) => {
turnSettled = true;
rejectTurn(error);
};
});
turnCompleted.catch(() => null);
const timeout = setTimeout(() => {
rejectTurnCompleted(new Error("CODEX_APP_SERVER_TIMEOUT"));
for (const { reject } of pending.values()) {
reject(new Error("CODEX_APP_SERVER_TIMEOUT"));
}
pending.clear();
rpcTransport?.close?.("SIGKILL");
}, runnerConfig.timeoutMs);
const cleanup = () => {
clearTimeout(timeout);
if (interruptPollTimer) {
clearInterval(interruptPollTimer);
interruptPollTimer = undefined;
}
for (const timer of retryTimers) {
clearTimeout(timer);
}
retryTimers.clear();
rpcTransport?.close?.("SIGTERM");
};
const writeRpcRequest = ({ method, params, attempt, resolveRequest, rejectRequest }) => {
if (closed) {
rejectRequest(new Error("CODEX_APP_SERVER_CLOSED"));
return;
}
const id = nextId++;
const message = { method, id, params };
pending.set(id, {
resolve: resolveRequest,
reject: rejectRequest,
method,
params,
attempt,
});
rpcTransport.send(JSON.stringify(message), (error) => {
if (error) {
pending.delete(id);
rejectRequest(error);
}
});
};
const request = (method, params = {}) => {
return new Promise((resolveRequest, rejectRequest) => {
writeRpcRequest({
method,
params,
attempt: 0,
resolveRequest,
rejectRequest,
});
});
};
const notify = (method, params = {}) => {
if (closed) return;
rpcTransport.send(JSON.stringify({ method, params }));
};
const respond = (id, payload) => {
if (closed) return;
rpcTransport.send(JSON.stringify({ id, ...payload }));
};
const emitProgress = () => {
if (typeof runnerConfig.onProgress !== "function") {
return;
}
const snapshot = progressCollector.snapshot();
if (!snapshot) {
return;
}
const snapshotJson = JSON.stringify(snapshot);
if (snapshotJson === lastProgressSnapshotJson) {
return;
}
lastProgressSnapshotJson = snapshotJson;
progressEmits.push(
Promise.resolve()
.then(() => runnerConfig.onProgress(snapshot))
.catch(() => null),
);
};
const handleTransportError = (error) => {
closed = true;
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
rejectTurnCompleted(error);
};
const handleTransportClose = ({ code, message }) => {
closed = true;
const error = new Error(message || `CODEX_APP_SERVER_EXITED:${code ?? "unknown"}`);
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
if (code !== 0 || (activeTurnStarted && !turnSettled)) {
rejectTurnCompleted(error);
}
};
const handleTransportLine = (line) => {
if (!line.trim()) return;
let message;
try {
message = JSON.parse(line);
} catch {
return;
}
if (Object.hasOwn(message, "id")) {
const pendingRequest = pending.get(message.id);
if (pendingRequest) {
pending.delete(message.id);
if (message.error) {
if (isOverloadedJsonRpcError(message.error) && pendingRequest.attempt < 3) {
const timer = setTimeout(() => {
retryTimers.delete(timer);
writeRpcRequest({
method: pendingRequest.method,
params: pendingRequest.params,
attempt: pendingRequest.attempt + 1,
resolveRequest: pendingRequest.resolve,
rejectRequest: pendingRequest.reject,
});
}, computeOverloadedRetryDelayMs(pendingRequest.attempt));
retryTimers.add(timer);
} else {
pendingRequest.reject(new Error(message.error.message || JSON.stringify(message.error)));
}
} else {
pendingRequest.resolve(message.result ?? {});
}
}
if (!pendingRequest && message.method) {
progressCollector.observe(message);
emitProgress();
respond(message.id, buildServerRequestFallbackResponse(message));
}
return;
}
progressCollector.observe(message);
emitProgress();
if (message.method === "item/agentMessage/delta") {
replyBody += extractAgentDelta(message.params);
return;
}
if (message.method === "item/completed") {
completedMessageText += extractCompletedAgentMessage(message.params);
return;
}
if (message.method === "turn/completed") {
const status = message.params?.turn?.status ?? message.params?.status ?? "completed";
if (status === "completed") {
resolveTurnCompleted(message.params ?? {});
} else if (status === "interrupted" && interruptRequested) {
resolveTurnCompleted({ ...(message.params ?? {}), interrupted: true });
} else {
rejectTurnCompleted(new Error(`CODEX_APP_SERVER_TURN_${String(status).toUpperCase()}`));
}
}
};
const startActiveTurnInterruptPolling = ({ threadId, turnId }) => {
if (
!threadId ||
!turnId ||
typeof runnerConfig.shouldInterruptActiveTurn !== "function"
) {
return;
}
const pollIntervalMs = normalizeNonNegativeInteger(
runnerConfig.interruptPollIntervalMs,
750,
);
const checkAndInterrupt = async () => {
if (interruptRequested || turnSettled || closed) {
return;
}
let decision;
try {
decision = await runnerConfig.shouldInterruptActiveTurn({
taskId: task?.taskId,
task,
threadId,
turnId,
});
} catch {
return;
}
const shouldInterrupt =
decision === true ||
decision?.interrupt === true ||
decision?.canceled === true ||
decision?.status === "canceled";
if (!shouldInterrupt || interruptRequested || turnSettled || closed) {
return;
}
interruptRequested = true;
interruptReason = trimToDefined(decision?.reason) || "USER_CANCELED_TASK";
try {
await request("turn/interrupt", { threadId, turnId });
} catch (error) {
rejectTurnCompleted(error);
}
};
void checkAndInterrupt();
if (pollIntervalMs > 0) {
interruptPollTimer = setInterval(() => {
void checkAndInterrupt();
}, pollIntervalMs);
}
};
try {
rpcTransport = await openCodexAppServerTransport(runnerConfig, cwd, {
onLine: handleTransportLine,
onError: handleTransportError,
onClose: handleTransportClose,
});
await request("initialize", {
clientInfo: {
name: runnerConfig.clientName,
title: runnerConfig.clientTitle,
version: runnerConfig.clientVersion,
},
});
notify("initialized", {});
if (isThreadGoalSyncTask(task)) {
const goalThreadId = targetThreadRef;
const objective = resolveThreadGoalObjective(task);
const status = resolveThreadGoalStatus(task);
const tokenBudget = resolveThreadGoalTokenBudget(task);
if (!goalThreadId) {
throw new Error("CODEX_APP_SERVER_THREAD_ID_MISSING");
}
if (!objective) {
throw new Error("CODEX_APP_SERVER_THREAD_GOAL_OBJECTIVE_MISSING");
}
await request("thread/goal/set", {
threadId: goalThreadId,
objective,
status,
tokenBudget,
});
return {
status: "completed",
replyBody: `已同步 Codex 线程目标:${objective}`,
threadId: goalThreadId,
turnControl: "goal_sync",
threadGoal: {
objective,
status,
...(tokenBudget ? { tokenBudget } : {}),
},
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
canFallbackToCli: false,
};
}
if (isThreadMetadataSyncTask(task)) {
const metadataThreadId = targetThreadRef;
const gitInfo = resolveThreadMetadataGitInfo(task);
if (!metadataThreadId) {
throw new Error("CODEX_APP_SERVER_THREAD_ID_MISSING");
}
if (!gitInfo) {
throw new Error("CODEX_APP_SERVER_THREAD_METADATA_GIT_INFO_MISSING");
}
await request("thread/metadata/update", {
threadId: metadataThreadId,
gitInfo,
});
return {
status: "completed",
replyBody: "已同步 Codex 线程 Git 元数据。",
threadId: metadataThreadId,
turnControl: "metadata_sync",
threadMetadata: {
gitInfo,
},
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
canFallbackToCli: false,
};
}
if (isThreadForkTask(task)) {
const forkSourceThreadId = targetThreadRef;
const ephemeral = resolveThreadForkEphemeral(task);
if (!forkSourceThreadId) {
throw new Error("CODEX_APP_SERVER_THREAD_ID_MISSING");
}
const forkResponse = await request("thread/fork", {
threadId: forkSourceThreadId,
ephemeral,
});
const forkedThread = forkResponse?.thread && typeof forkResponse.thread === "object"
? forkResponse.thread
: {};
const forkedThreadId = trimToDefined(forkedThread.id);
return {
status: "completed",
replyBody: forkedThreadId
? `已分叉 Codex 线程:${forkedThread.name || forkedThreadId}`
: "已分叉 Codex 线程。",
threadId: forkSourceThreadId,
turnControl: "fork",
threadFork: {
sourceThreadId: forkSourceThreadId,
forkedThreadId,
forkedThreadName: trimToDefined(forkedThread.name),
forkedThreadPreview: trimToDefined(forkedThread.preview),
ephemeral: forkedThread.ephemeral === true,
status: trimToDefined(forkedThread.status),
},
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
canFallbackToCli: false,
};
}
if (isThreadRenameTask(task)) {
const renameThreadId = targetThreadRef;
const name = resolveThreadRenameName(task);
if (!renameThreadId) {
throw new Error("CODEX_APP_SERVER_THREAD_ID_MISSING");
}
if (!name) {
throw new Error("CODEX_APP_SERVER_THREAD_RENAME_NAME_MISSING");
}
await request("thread/name/set", { threadId: renameThreadId, name });
return {
status: "completed",
replyBody: `已同步 Codex 线程名称:${name}`,
threadId: renameThreadId,
turnControl: "rename",
threadRename: {
name,
},
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
canFallbackToCli: false,
};
}
const lifecycleAction = resolveThreadLifecycleAction(task);
if (lifecycleAction) {
const lifecycleThreadId = targetThreadRef;
if (!lifecycleThreadId) {
throw new Error("CODEX_APP_SERVER_THREAD_ID_MISSING");
}
const method = lifecycleAction === "archive" ? "thread/archive" : "thread/unarchive";
await request(method, { threadId: lifecycleThreadId });
const actionLabel = lifecycleAction === "archive" ? "归档" : "恢复";
return {
status: "completed",
replyBody: `${actionLabel} Codex 线程。该动作只改变线程生命周期状态,不代表完成代码修改或文件恢复。`,
threadId: lifecycleThreadId,
turnControl: lifecycleAction,
threadLifecycle: {
action: lifecycleAction,
status: "completed",
},
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
canFallbackToCli: false,
};
}
const threadResult = targetThreadRef
? await request("thread/resume", {
threadId: targetThreadRef,
model: runnerConfig.model,
})
: await request("thread/start", {
model: runnerConfig.model,
cwd,
sandbox: runnerConfig.sandbox,
serviceName: runnerConfig.clientName,
});
const threadId = trimToDefined(threadResult?.thread?.id) || targetThreadRef;
if (!threadId) {
throw new Error("CODEX_APP_SERVER_THREAD_ID_MISSING");
}
if (isThreadRollbackTask(task)) {
const numTurns = resolveRollbackNumTurns(task);
if (!numTurns) {
throw new Error("CODEX_APP_SERVER_ROLLBACK_NUM_TURNS_INVALID");
}
await request("thread/rollback", { threadId, numTurns });
return {
status: "completed",
replyBody: `已回滚 Codex 线程最近 ${numTurns} 轮。注意:这只回滚线程历史,不会自动还原本地文件变更。`,
threadId,
turnControl: "rollback",
rollback: {
numTurns,
},
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
canFallbackToCli: false,
};
}
if (isThreadCompactTask(task)) {
await request("thread/compact/start", { threadId });
await waitForCompactNotificationSettle();
const compactSnapshot = progressCollector.snapshot()?.compaction ?? {
status: "requested",
message: "上下文压缩已发起",
};
return {
status: "completed",
replyBody: "已发起 Codex 线程上下文压缩。该动作只压缩线程上下文,不代表完成代码修改或文件恢复。",
threadId,
turnControl: "compact",
compaction: compactSnapshot,
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
canFallbackToCli: false,
};
}
const interThreadBroker = await maybeInjectInterThreadContext({
request,
task,
targetThreadId: threadId,
});
const turnControl = targetTurnRef ? "steer" : "start";
const turnResult = targetTurnRef
? await request("turn/steer", {
threadId,
expectedTurnId: targetTurnRef,
input: [{ type: "text", text: prompt }],
})
: await request("turn/start", {
threadId,
input: [{ type: "text", text: prompt }],
cwd,
model: runnerConfig.model,
});
activeTurnStarted = true;
const activeTurnId = trimToDefined(turnResult?.turn?.id) || targetTurnRef;
startActiveTurnInterruptPolling({ threadId, turnId: activeTurnId });
await turnCompleted;
if (progressEmits.length > 0) {
await Promise.allSettled(progressEmits);
}
const normalizedReply = (replyBody || completedMessageText).trim();
if (interruptRequested) {
return {
status: "interrupted",
replyBody: "已按用户要求中断当前 Codex turn。",
threadId,
turnId: activeTurnId,
turnControl: "interrupt",
interruptReason,
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
interThreadBroker,
canFallbackToCli: false,
};
}
return {
status: "completed",
replyBody: normalizedReply,
threadId,
turnId: activeTurnId,
turnControl,
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
interThreadBroker,
canFallbackToCli: false,
};
} catch (error) {
return createFailure(error, {
transport: runnerConfig.transport,
stderr: rpcTransport?.stderr?.() ?? "",
cwd,
threadId: targetThreadRef,
canFallbackToCli: !activeTurnStarted,
});
} finally {
cleanup();
}
}