Files
boss/local-agent/codex-app-server-runner.mjs
2026-06-01 18:41:10 +08:00

2170 lines
68 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import crypto from "node:crypto";
import { spawn } from "node:child_process";
import { readFileSync } from "node:fs";
import http from "node:http";
import https from "node:https";
import readline from "node:readline";
import { resolve } from "node:path";
function trimToDefined(value) {
const trimmed = String(value ?? "").trim();
return trimmed ? trimmed : undefined;
}
function boolFromEnv(value) {
const normalized = trimToDefined(value)?.toLowerCase();
return normalized === "1" || normalized === "true" || normalized === "yes";
}
function listFromEnv(value) {
if (!value) return undefined;
try {
const parsed = JSON.parse(String(value));
if (Array.isArray(parsed)) {
return parsed.map((item) => String(item));
}
} catch {
// Fall through to whitespace splitting.
}
return String(value).split(/\s+/).filter(Boolean);
}
function resolveTaskThreadRef(task) {
return trimToDefined(task?.targetCodexThreadRef || task?.targetThreadId);
}
function resolveTaskTurnRef(task) {
return trimToDefined(task?.targetCodexTurnId || task?.targetTurnId);
}
function resolveSourceThreadRef(task) {
return trimToDefined(task?.sourceCodexThreadRef || task?.sourceThreadId);
}
function resolveTaskCwd(config, task) {
return resolve(
trimToDefined(task?.targetCodexFolderRef) ||
trimToDefined(config.masterAgentWorkdir) ||
process.cwd(),
);
}
function resolvePrompt(task) {
return String(task?.executionPrompt || task?.requestText || "").trim();
}
function normalizeTimeoutMs(value) {
const numeric = Number(value);
return Number.isFinite(numeric) && numeric > 0 ? Math.floor(numeric) : 120_000;
}
function normalizePositiveInteger(value, fallback) {
const numeric = Number(value);
return Number.isFinite(numeric) && numeric > 0 ? Math.floor(numeric) : fallback;
}
function boolFromConfigOrEnv(configValue, envValue, fallback) {
if (configValue === true || configValue === false) {
return configValue;
}
const normalized = trimToDefined(envValue)?.toLowerCase();
if (normalized === "1" || normalized === "true" || normalized === "yes") {
return true;
}
if (normalized === "0" || normalized === "false" || normalized === "no") {
return false;
}
return fallback;
}
function normalizeTransport(value) {
const normalized = trimToDefined(value)?.toLowerCase();
return normalized === "ws" || normalized === "websocket"
? "ws"
: normalized === "unix"
? "unix"
: "stdio";
}
export function getCodexAppServerRunnerConfig(env = process.env, config = {}) {
const argsFromConfig = Array.isArray(config.codexAppServerArgs)
? config.codexAppServerArgs.map((item) => String(item))
: undefined;
const args =
argsFromConfig ??
listFromEnv(env.BOSS_CODEX_APP_SERVER_ARGS) ??
["app-server"];
return {
enabled:
config.codexAppServerEnabled === true ||
boolFromEnv(env.BOSS_CODEX_APP_SERVER_ENABLED),
command:
trimToDefined(config.codexAppServerCommand) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_COMMAND) ||
"codex",
args,
cwd:
trimToDefined(config.codexAppServerWorkdir) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_WORKDIR) ||
trimToDefined(config.masterAgentWorkdir) ||
process.cwd(),
timeoutMs: normalizeTimeoutMs(
config.codexAppServerTimeoutMs ?? env.BOSS_CODEX_APP_SERVER_TIMEOUT_MS,
),
clientName:
trimToDefined(config.codexAppServerClientName) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_CLIENT_NAME) ||
"boss_local_agent",
clientTitle:
trimToDefined(config.codexAppServerClientTitle) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_CLIENT_TITLE) ||
"Boss Local Agent",
clientVersion:
trimToDefined(config.codexAppServerClientVersion) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_CLIENT_VERSION) ||
"0.1.0",
model: trimToDefined(config.masterAgentModel || env.BOSS_MASTER_AGENT_MODEL),
sandbox: trimToDefined(config.masterAgentSandbox),
transport: normalizeTransport(config.codexAppServerTransport ?? env.BOSS_CODEX_APP_SERVER_TRANSPORT),
url:
trimToDefined(config.codexAppServerUrl) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_URL),
authToken:
trimToDefined(config.codexAppServerAuthToken) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_AUTH_TOKEN),
authTokenFile:
trimToDefined(config.codexAppServerAuthTokenFile) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_AUTH_TOKEN_FILE),
discoveryEnabled: boolFromConfigOrEnv(
config.codexAppServerDiscoveryEnabled,
env.BOSS_CODEX_APP_SERVER_DISCOVERY_ENABLED,
true,
),
discoveryTtlMs: normalizePositiveInteger(
config.codexAppServerDiscoveryTtlMs ?? env.BOSS_CODEX_APP_SERVER_DISCOVERY_TTL_MS,
300_000,
),
discoveryLimit: normalizePositiveInteger(
config.codexAppServerDiscoveryLimit ?? env.BOSS_CODEX_APP_SERVER_DISCOVERY_LIMIT,
20,
),
};
}
export function shouldUseCodexAppServerTaskRunner(runnerConfig, task) {
const hasTransportEndpoint =
runnerConfig?.transport === "ws" || runnerConfig?.transport === "unix"
? Boolean(runnerConfig.url)
: Boolean(runnerConfig?.command);
if (!runnerConfig?.enabled || !hasTransportEndpoint) {
return false;
}
const taskType = trimToDefined(task?.taskType);
if (taskType !== "conversation_reply" && taskType !== "dispatch_execution") {
return false;
}
return Boolean(resolvePrompt(task));
}
function extractAgentTextFromContent(value) {
if (typeof value === "string") {
return value;
}
if (Array.isArray(value)) {
return value.map(extractAgentTextFromContent).filter(Boolean).join("");
}
if (!value || typeof value !== "object") {
return "";
}
return (
extractAgentTextFromContent(value.text) ||
extractAgentTextFromContent(value.outputText) ||
extractAgentTextFromContent(value.content) ||
extractAgentTextFromContent(value.delta)
);
}
function extractAgentDelta(params) {
if (!params || typeof params !== "object") {
return "";
}
return (
extractAgentTextFromContent(params.delta) ||
extractAgentTextFromContent(params.text) ||
extractAgentTextFromContent(params.messageDelta) ||
extractAgentTextFromContent(params.item?.delta)
);
}
function extractCompletedAgentMessage(params) {
const item = params?.item;
if (!item || typeof item !== "object") {
return "";
}
const itemType = String(item.type || item.kind || "");
if (!/agent|assistant/i.test(itemType)) {
return "";
}
return (
extractAgentTextFromContent(item.text) ||
extractAgentTextFromContent(item.content) ||
extractAgentTextFromContent(item.message) ||
extractAgentTextFromContent(item.output)
);
}
function createFailure(error, extra = {}) {
return {
status: "failed",
errorMessage: error instanceof Error ? error.message : String(error),
transport: extra.transport ?? "stdio",
...extra,
};
}
function resolveBearerToken(runnerConfig) {
const direct = trimToDefined(runnerConfig?.authToken);
if (direct) {
return direct;
}
const tokenFile = trimToDefined(runnerConfig?.authTokenFile);
if (!tokenFile) {
return undefined;
}
return trimToDefined(readFileSync(tokenFile, "utf8"));
}
function isOverloadedJsonRpcError(error) {
return Number(error?.code) === -32001 || /overloaded|retry later/i.test(String(error?.message || ""));
}
function computeOverloadedRetryDelayMs(attempt) {
const base = Math.min(1_000, 40 * 2 ** attempt);
return base + Math.floor(Math.random() * 25);
}
function encodeWebSocketTextFrame(value, { masked = false } = {}) {
const payload = Buffer.from(value);
const lengthByteOffset = 1;
const payloadLength = payload.length;
let header;
if (payloadLength < 126) {
header = Buffer.alloc(masked ? 6 : 2);
header[0] = 0x81;
header[lengthByteOffset] = payloadLength;
} else if (payloadLength <= 0xffff) {
header = Buffer.alloc(masked ? 8 : 4);
header[0] = 0x81;
header[lengthByteOffset] = 126;
header.writeUInt16BE(payloadLength, 2);
} else {
header = Buffer.alloc(masked ? 14 : 10);
header[0] = 0x81;
header[lengthByteOffset] = 127;
header.writeBigUInt64BE(BigInt(payloadLength), 2);
}
if (!masked) {
return Buffer.concat([header, payload]);
}
header[lengthByteOffset] |= 0x80;
const maskOffset = header.length - 4;
const mask = crypto.randomBytes(4);
mask.copy(header, maskOffset);
const maskedPayload = Buffer.alloc(payload.length);
for (let index = 0; index < payload.length; index += 1) {
maskedPayload[index] = payload[index] ^ mask[index % 4];
}
return Buffer.concat([header, maskedPayload]);
}
function decodeWebSocketFrames(buffer, socket, onText) {
let offset = 0;
while (buffer.length - offset >= 2) {
const first = buffer[offset];
const second = buffer[offset + 1];
const opcode = first & 0x0f;
const masked = (second & 0x80) !== 0;
let payloadLength = second & 0x7f;
let headerLength = 2;
if (payloadLength === 126) {
if (buffer.length - offset < 4) break;
payloadLength = buffer.readUInt16BE(offset + 2);
headerLength = 4;
} else if (payloadLength === 127) {
if (buffer.length - offset < 10) break;
payloadLength = Number(buffer.readBigUInt64BE(offset + 2));
headerLength = 10;
}
const maskLength = masked ? 4 : 0;
const frameLength = headerLength + maskLength + payloadLength;
if (buffer.length - offset < frameLength) break;
let payload = buffer.subarray(
offset + headerLength + maskLength,
offset + frameLength,
);
if (masked) {
const mask = buffer.subarray(offset + headerLength, offset + headerLength + 4);
const unmaskedPayload = Buffer.alloc(payload.length);
for (let index = 0; index < payload.length; index += 1) {
unmaskedPayload[index] = payload[index] ^ mask[index % 4];
}
payload = unmaskedPayload;
}
if (opcode === 0x1) {
onText(payload.toString("utf8"));
} else if (opcode === 0x8) {
socket.end();
} else if (opcode === 0x9) {
socket.write(encodeWebSocketControlFrame(0x0a, payload));
}
offset += frameLength;
}
return buffer.subarray(offset);
}
function encodeWebSocketControlFrame(opcode, payload = Buffer.alloc(0)) {
return Buffer.concat([Buffer.from([0x80 | opcode, payload.length]), payload]);
}
function openStdioCodexAppServerTransport(runnerConfig, cwd, handlers) {
const child = spawn(runnerConfig.command, runnerConfig.args, {
cwd: runnerConfig.cwd || cwd,
env: process.env,
stdio: ["pipe", "pipe", "pipe"],
});
let stderr = "";
const rl = readline.createInterface({ input: child.stdout });
rl.on("line", handlers.onLine);
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
child.on("error", handlers.onError);
child.on("close", (code) => {
handlers.onClose({
code,
message: stderr.trim() || `CODEX_APP_SERVER_EXITED:${code ?? "unknown"}`,
});
});
return {
transport: "stdio",
send(line, callback) {
child.stdin.write(`${line}\n`, callback);
},
close(signal = "SIGTERM") {
rl.close();
if (!child.killed) {
child.kill(signal);
}
},
stderr() {
return stderr.trim();
},
};
}
function openWebSocketCodexAppServerTransport(runnerConfig, handlers) {
return new Promise((resolveTransport, rejectTransport) => {
let settled = false;
let buffered = Buffer.alloc(0);
const url = new URL(runnerConfig.url);
const isUnixSocket = url.protocol === "unix:";
const client = url.protocol === "wss:" ? https : http;
const key = crypto.randomBytes(16).toString("base64");
const bearerToken = resolveBearerToken(runnerConfig);
const headers = {
Connection: "Upgrade",
Upgrade: "websocket",
"Sec-WebSocket-Key": key,
"Sec-WebSocket-Version": "13",
...(bearerToken ? { Authorization: `Bearer ${bearerToken}` } : {}),
};
const requestOptions = isUnixSocket
? {
socketPath: decodeURIComponent(url.pathname || ""),
path: "/",
method: "GET",
headers: {
Host: "localhost",
...headers,
},
}
: {
hostname: url.hostname,
port: url.port || (url.protocol === "wss:" ? 443 : 80),
path: `${url.pathname || "/"}${url.search || ""}`,
method: "GET",
headers,
};
if (isUnixSocket && !requestOptions.socketPath) {
rejectTransport(new Error("CODEX_APP_SERVER_UNIX_SOCKET_PATH_MISSING"));
return;
}
const request = client.request(requestOptions);
const failOpen = (error) => {
if (!settled) {
settled = true;
rejectTransport(error);
} else {
handlers.onError(error);
}
};
request.on("upgrade", (response, socket) => {
const expectedAccept = crypto
.createHash("sha1")
.update(`${key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`)
.digest("base64");
const actualAccept = String(response.headers["sec-websocket-accept"] || "");
if (response.statusCode !== 101 || actualAccept !== expectedAccept) {
socket.destroy();
failOpen(new Error(`CODEX_APP_SERVER_WS_UPGRADE_FAILED:${response.statusCode}`));
return;
}
socket.on("data", (chunk) => {
buffered = decodeWebSocketFrames(
Buffer.concat([buffered, chunk]),
socket,
handlers.onLine,
);
});
socket.on("error", handlers.onError);
socket.on("close", () => {
handlers.onClose({
code: "ws_closed",
message: "CODEX_APP_SERVER_WS_CLOSED",
});
});
settled = true;
resolveTransport({
transport: isUnixSocket ? "unix" : "ws",
send(line, callback) {
socket.write(encodeWebSocketTextFrame(line, { masked: true }), callback);
},
close() {
socket.end();
},
stderr() {
return "";
},
});
});
request.on("response", (response) => {
failOpen(new Error(`CODEX_APP_SERVER_WS_UPGRADE_FAILED:${response.statusCode}`));
response.resume();
});
request.on("error", failOpen);
request.setTimeout(runnerConfig.timeoutMs, () => {
request.destroy(new Error("CODEX_APP_SERVER_WS_CONNECT_TIMEOUT"));
});
request.end();
});
}
async function openCodexAppServerTransport(runnerConfig, cwd, handlers) {
if (runnerConfig.transport === "ws") {
return openWebSocketCodexAppServerTransport(runnerConfig, handlers);
}
if (runnerConfig.transport === "unix") {
return openWebSocketCodexAppServerTransport(runnerConfig, handlers);
}
return openStdioCodexAppServerTransport(runnerConfig, cwd, handlers);
}
function normalizeProgressStepStatus(value) {
const text = String(value ?? "").trim().toLowerCase();
if (text === "done" || text === "completed" || text === "complete" || text === "success") {
return "done";
}
if (text === "running" || text === "in_progress" || text === "inprogress" || text === "active") {
return "running";
}
if (text === "failed" || text === "error") {
return "failed";
}
return "pending";
}
function extractProgressText(value) {
if (typeof value === "string") {
return value.trim();
}
if (!value || typeof value !== "object") {
return "";
}
return String(value.text ?? value.title ?? value.description ?? value.summary ?? value.label ?? "").trim();
}
function asArray(value) {
return Array.isArray(value) ? value : [];
}
function extractPlanItems(params) {
const direct = asArray(params?.plan);
if (direct.length > 0) {
return direct;
}
const nestedPlan = params?.plan && typeof params.plan === "object" ? params.plan : undefined;
return asArray(nestedPlan?.steps).length > 0
? asArray(nestedPlan.steps)
: asArray(nestedPlan?.items);
}
function extractNumber(value) {
const numeric = Number(value);
return Number.isFinite(numeric) ? Math.trunc(numeric) : undefined;
}
function parseDiffShortstat(value) {
const text = String(value || "");
const changedFiles = extractNumber((text.match(/(\d+)\s+files?\s+changed/) || [])[1]);
const additions = extractNumber((text.match(/(\d+)\s+insertions?\(\+\)/) || [])[1]);
const deletions = extractNumber((text.match(/(\d+)\s+deletions?\(-\)/) || [])[1]);
return { changedFiles, additions, deletions };
}
function extractDiffBranch(params) {
const source =
params?.diff && typeof params.diff === "object"
? params.diff
: params?.summary && typeof params.summary === "object"
? params.summary
: params;
const parsedShortstat = parseDiffShortstat(params?.shortstat ?? params?.summary ?? params?.diffStat);
return {
changedFiles:
extractNumber(source?.changedFiles ?? source?.filesChanged ?? source?.fileCount) ??
parsedShortstat.changedFiles,
additions:
extractNumber(source?.additions ?? source?.insertions ?? source?.linesAdded) ??
parsedShortstat.additions,
deletions:
extractNumber(source?.deletions ?? source?.linesDeleted ?? source?.removals) ??
parsedShortstat.deletions,
};
}
function basename(value) {
const text = String(value ?? "").trim();
if (!text) {
return "";
}
return text.split(/[\\/]/).filter(Boolean).pop() || text;
}
function extractArtifactFromPath(path, index) {
const label = basename(path);
if (!label) {
return null;
}
return {
id: `artifact-${index + 1}`,
label,
kind: /\.(png|jpe?g|webp|gif|svg)$/i.test(label) ? "image" : "file",
};
}
function extractArtifactsFromItem(item, existingCount) {
if (!item || typeof item !== "object") {
return [];
}
const paths = [];
for (const key of ["path", "file", "filename", "outputPath", "artifactPath"]) {
if (item[key]) {
paths.push(item[key]);
}
}
if (item.type === "imageGeneration" && item.savedPath) {
paths.push(item.savedPath);
}
for (const change of asArray(item.changes)) {
if (change?.path) paths.push(change.path);
if (change?.file) paths.push(change.file);
}
for (const artifact of asArray(item.artifacts)) {
if (artifact?.path) paths.push(artifact.path);
if (artifact?.label) paths.push(artifact.label);
}
return paths
.map((path, index) => extractArtifactFromPath(path, existingCount + index))
.filter(Boolean);
}
function extractThreadCollaborationSnapshot(item) {
if (!item || typeof item !== "object" || item.type !== "collabToolCall") {
return null;
}
const tool = safeProgressText(item.tool, 80);
const status = safeProgressText(item.status, 40) || "running";
const agentStatus = safeProgressText(item.agentStatus, 80);
const target = item.receiverThreadId ? "已有线程" : item.newThreadId ? "新线程" : "";
const snapshot = {
tool: tool || undefined,
status,
target: target || undefined,
agentStatus: agentStatus || undefined,
};
return Object.values(snapshot).some((value) => value !== undefined && value !== "") ? snapshot : null;
}
function extractToolActivitySnapshot(item, lifecycleStatus = "running") {
if (!item || typeof item !== "object") {
return null;
}
const type = safeProgressText(item.type, 80);
const status = safeProgressText(item.status, 40) || lifecycleStatus;
if (type === "mcpToolCall") {
const server = safeProgressText(item.server, 80);
const tool = safeProgressText(item.tool, 120);
return {
kind: "mcp",
name: server && tool ? `${server}/${tool}` : tool || server || "mcpToolCall",
status,
detail: safeProgressText(item.error, 180) || undefined,
};
}
if (type === "dynamicToolCall") {
const detailParts = [];
if (typeof item.success === "boolean") {
detailParts.push(item.success ? "成功" : "失败");
}
const durationMs = extractNumber(item.durationMs);
if (durationMs !== undefined) {
detailParts.push(`${durationMs}ms`);
}
return {
kind: "dynamic",
name: safeProgressText(item.tool, 120) || "dynamicToolCall",
status,
detail: detailParts.join(" · ") || undefined,
};
}
if (type === "webSearch") {
const action = item.action && typeof item.action === "object" ? item.action : {};
return {
kind: "web_search",
name: safeProgressText(action.type, 80) || "search",
status,
detail:
safeProgressText(item.query, 180) ||
safeProgressText(action.query, 180) ||
safeProgressText(asArray(action.queries)[0], 180) ||
undefined,
};
}
if (type === "imageView") {
return {
kind: "image_view",
name: "imageView",
status,
detail: safeProgressText(basename(item.path), 180) || undefined,
};
}
if (type === "imageGeneration") {
return {
kind: "image_generation",
name: "imageGeneration",
status,
detail: safeProgressText(basename(item.savedPath), 180) || undefined,
};
}
if (type === "enteredReviewMode" || type === "exitedReviewMode") {
const review = item.review && typeof item.review === "object" ? item.review : {};
return {
kind: "review",
name: "reviewMode",
status: type === "enteredReviewMode" ? "entered" : "exited",
detail: safeProgressText(review.status, 120) || undefined,
};
}
if (type === "commandExecution") {
const detailParts = [];
const exitCode = extractNumber(item.exitCode);
const durationMs = extractNumber(item.durationMs);
if (exitCode !== undefined) {
detailParts.push(`exit ${exitCode}`);
}
if (durationMs !== undefined) {
detailParts.push(`${durationMs}ms`);
}
return {
kind: "command",
name: "commandExecution",
status,
detail: detailParts.join(" · ") || undefined,
};
}
return null;
}
function cleanPlanStepText(text) {
return safeProgressText(text, 220)
.replace(/^\s*(?:[-*•]|\d+[.)]|[☐✓✔]|\[[ xX✓]\])\s*/u, "")
.trim();
}
function extractPlanStepsFromItem(item) {
if (!item || typeof item !== "object" || item.type !== "plan") {
return [];
}
const rawSteps = asArray(item.steps).length > 0
? asArray(item.steps).map((step) => extractProgressText(step))
: String(item.text ?? "")
.split(/\r?\n/)
.map((line) => line.trim());
return rawSteps
.map(cleanPlanStepText)
.filter(Boolean)
.slice(0, 10)
.map((text, index) => ({
id: `plan-item-${index + 1}`,
text,
status: "pending",
}));
}
function extractReasoningSummaryText(summary) {
const values = asArray(summary).length > 0 ? asArray(summary) : [summary];
return safeProgressText(
values
.map((item) => {
if (typeof item === "string") {
return item;
}
if (item && typeof item === "object") {
return item.text ?? item.summary ?? "";
}
return "";
})
.filter(Boolean)
.join(" / "),
280,
);
}
function extractReasoningSummarySnapshot(item, lifecycleStatus = "running") {
if (!item || typeof item !== "object" || item.type !== "reasoning") {
return null;
}
const summary = extractReasoningSummaryText(item.summary);
if (!summary) {
return null;
}
return {
status: safeProgressText(item.status, 40) || lifecycleStatus,
summary,
};
}
function extractSubAgentSource(value) {
if (!value || typeof value !== "object") {
return undefined;
}
if (value.subAgent) {
return extractSubAgentSource(value.subAgent);
}
if (value.subagent) {
return extractSubAgentSource(value.subagent);
}
if (value.thread_spawn) {
return value.thread_spawn;
}
if (value.threadSpawn) {
return value.threadSpawn;
}
return value;
}
function extractAgentFromThreadStarted(params) {
const source = extractSubAgentSource(params?.thread?.source ?? params?.source);
const name = String(
source?.agent_nickname ?? source?.agentNickname ?? source?.nickname ?? source?.name ?? "",
).trim();
if (!name) {
return null;
}
return {
name,
role: String(source?.agent_role ?? source?.agentRole ?? source?.role ?? "").trim() || undefined,
status: "running",
};
}
function safeProgressText(value, maxLength = 180) {
return String(value ?? "")
.replace(/sk-[A-Za-z0-9_-]{8,}/g, "[redacted]")
.replace(/(api[_-]?key|token|secret|password)\s*[=:]\s*[^ \n\r\t]+/gi, "$1=[redacted]")
.replace(/\s+/g, " ")
.trim()
.slice(0, maxLength);
}
function normalizeApprovalKindFromMethod(method) {
if (/commandExecution|execCommand/i.test(method)) {
return "command";
}
if (/fileChange|applyPatch/i.test(method)) {
return "file_change";
}
if (/permissions/i.test(method)) {
return "permissions";
}
if (/autoApprovalReview/i.test(method)) {
return "auto_review";
}
return "approval";
}
function labelForApprovalKind(kind) {
if (kind === "command") return "命令执行审批";
if (kind === "file_change") return "文件变更审批";
if (kind === "permissions") return "权限申请审批";
if (kind === "auto_review") return "自动审批复核";
return "审批请求";
}
function normalizeApprovalStatus(value) {
const normalized = String(value ?? "").trim().toLowerCase();
if (normalized === "approved" || normalized === "accept" || normalized === "accepted") return "approved";
if (normalized === "denied" || normalized === "declined" || normalized === "rejected") return "declined";
if (normalized === "cancel" || normalized === "cancelled" || normalized === "canceled") return "cancelled";
if (normalized === "resolved" || normalized === "complete" || normalized === "completed") return "resolved";
if (normalized === "running" || normalized === "in_progress" || normalized === "reviewing") return "reviewing";
return "pending";
}
function extractApprovalDetail(params, kind) {
const reason = safeProgressText(params?.reason, 120);
if (reason) {
return reason;
}
if (kind === "command") return "需要确认命令执行";
if (kind === "file_change") {
const grantRoot = basename(params?.grantRoot);
return grantRoot ? `需要确认文件写入:${grantRoot}` : "需要确认文件变更";
}
if (kind === "permissions") return "需要确认权限申请";
return undefined;
}
function extractApprovalFromServerRequest(message) {
const method = String(message?.method ?? "");
const kind = normalizeApprovalKindFromMethod(method);
const id = safeProgressText(message?.id ?? message?.params?.approvalId ?? message?.params?.itemId, 80);
if (!id) {
return null;
}
const detail = extractApprovalDetail(message.params, kind);
return {
id,
kind,
label: labelForApprovalKind(kind),
status: "pending",
...(detail ? { detail } : {}),
};
}
function extractApprovalFromAutoReview(message) {
const id = safeProgressText(message?.params?.reviewId, 80);
if (!id) {
return null;
}
const completed = String(message?.method ?? "").endsWith("/completed");
const review = message?.params?.review && typeof message.params.review === "object" ? message.params.review : {};
const status = completed ? normalizeApprovalStatus(review.status ?? "resolved") : "reviewing";
const riskLevel = safeProgressText(review.riskLevel, 40);
return {
id,
kind: "auto_review",
label: labelForApprovalKind("auto_review"),
status,
...(riskLevel ? { riskLevel } : {}),
};
}
function extractFileChangeEntries(params) {
return asArray(params?.changes)
.map((change, index) => {
const path = safeProgressText(change?.path, 240);
if (!path) {
return null;
}
const kind = safeProgressText(change?.kind, 40) || "update";
return {
id: `${safeProgressText(params?.itemId, 80) || "file-change"}-${index + 1}`,
path,
kind,
status: "updated",
};
})
.filter(Boolean);
}
function normalizeThreadStatusSnapshot(status) {
if (!status || typeof status !== "object") {
return null;
}
const type = safeProgressText(status.type, 40);
if (!type) {
return null;
}
const activeFlags = asArray(status.activeFlags)
.map((flag) => safeProgressText(flag, 60))
.filter(Boolean)
.slice(0, 6);
return {
type,
...(activeFlags.length > 0 ? { activeFlags } : {}),
...(activeFlags.includes("waitingOnApproval") ? { waitingOnApproval: true } : {}),
...(activeFlags.includes("waitingOnUserInput") ? { waitingOnUserInput: true } : {}),
};
}
function extractThreadGoalSnapshot(goal) {
if (!goal || typeof goal !== "object") {
return null;
}
const status = safeProgressText(goal.status, 40);
const objective = safeProgressText(goal.objective, 240);
if (!status && !objective) {
return null;
}
return {
...(objective ? { objective } : {}),
status: status || "active",
...(extractNumber(goal.tokenBudget) !== undefined ? { tokenBudget: extractNumber(goal.tokenBudget) } : {}),
...(extractNumber(goal.tokensUsed) !== undefined ? { tokensUsed: extractNumber(goal.tokensUsed) } : {}),
...(extractNumber(goal.timeUsedSeconds) !== undefined
? { timeUsedSeconds: extractNumber(goal.timeUsedSeconds) }
: {}),
};
}
function extractSandboxPolicyName(sandboxPolicy) {
if (typeof sandboxPolicy === "string") {
return sandboxPolicy;
}
if (sandboxPolicy && typeof sandboxPolicy === "object") {
return sandboxPolicy.type;
}
return "";
}
function extractCollaborationModeName(collaborationMode) {
if (typeof collaborationMode === "string") {
return collaborationMode;
}
if (collaborationMode && typeof collaborationMode === "object") {
return collaborationMode.mode;
}
return "";
}
function extractThreadSettingsSnapshot(settings) {
if (!settings || typeof settings !== "object") {
return null;
}
const snapshot = {
model: safeProgressText(settings.model, 80) || undefined,
modelProvider: safeProgressText(settings.modelProvider, 80) || undefined,
approvalPolicy:
safeProgressText(typeof settings.approvalPolicy === "string" ? settings.approvalPolicy : "", 80) || undefined,
approvalsReviewer: safeProgressText(settings.approvalsReviewer, 80) || undefined,
sandboxPolicy: safeProgressText(extractSandboxPolicyName(settings.sandboxPolicy), 80) || undefined,
permissionProfile: safeProgressText(settings.activePermissionProfile?.id, 80) || undefined,
serviceTier: safeProgressText(settings.serviceTier, 80) || undefined,
effort: safeProgressText(settings.effort, 80) || undefined,
summary: safeProgressText(settings.summary, 80) || undefined,
collaborationMode: safeProgressText(extractCollaborationModeName(settings.collaborationMode), 80) || undefined,
personality: safeProgressText(settings.personality, 80) || undefined,
};
return Object.values(snapshot).some(Boolean) ? snapshot : null;
}
function extractTokenUsageSnapshot(tokenUsage) {
const total = tokenUsage?.total && typeof tokenUsage.total === "object" ? tokenUsage.total : {};
const totalTokens = extractNumber(total.totalTokens);
if (totalTokens === undefined) {
return null;
}
const modelContextWindow = extractNumber(tokenUsage?.modelContextWindow);
const contextPercent =
modelContextWindow && modelContextWindow > 0
? Math.max(0, Math.min(100, Math.ceil((totalTokens / modelContextWindow) * 100)))
: undefined;
return {
totalTokens,
inputTokens: extractNumber(total.inputTokens),
cachedInputTokens: extractNumber(total.cachedInputTokens),
outputTokens: extractNumber(total.outputTokens),
reasoningOutputTokens: extractNumber(total.reasoningOutputTokens),
modelContextWindow,
contextPercent,
};
}
function extractRateLimitWindowSnapshot(window) {
if (!window || typeof window !== "object") {
return {};
}
return {
usedPercent: extractNumber(window.usedPercent),
windowDurationMins: extractNumber(window.windowDurationMins),
resetsAt: extractNumber(window.resetsAt),
};
}
function extractAccountRateLimitSnapshot(rateLimits) {
if (!rateLimits || typeof rateLimits !== "object") {
return null;
}
const primary = extractRateLimitWindowSnapshot(rateLimits.primary);
const credits = rateLimits.credits && typeof rateLimits.credits === "object" ? rateLimits.credits : {};
const snapshot = Object.fromEntries(Object.entries({
limitId: safeProgressText(rateLimits.limitId, 80),
limitName: safeProgressText(rateLimits.limitName, 80),
planType: safeProgressText(rateLimits.planType, 80),
rateLimitReachedType: safeProgressText(rateLimits.rateLimitReachedType, 80),
usedPercent: primary.usedPercent,
windowDurationMins: primary.windowDurationMins,
resetsAt: primary.resetsAt,
creditsBalance: safeProgressText(credits.balance, 80),
...(typeof credits.hasCredits === "boolean" ? { hasCredits: credits.hasCredits } : {}),
...(typeof credits.unlimited === "boolean" ? { unlimitedCredits: credits.unlimited } : {}),
}).filter(([, value]) => value !== undefined && value !== ""));
return Object.values(snapshot).some((value) => value !== undefined && value !== "") ? snapshot : null;
}
function extractModelVerificationSnapshot(params) {
const verifications = asArray(params?.verifications)
.map((verification) => safeProgressText(verification, 120))
.filter(Boolean)
.slice(0, 8);
return verifications.length > 0 ? { verifications } : null;
}
function buildNoticeWarningMessage(summary, details) {
const cleanSummary = safeProgressText(summary, 140);
const cleanDetails = safeProgressText(details, 180);
if (!cleanSummary) {
return cleanDetails;
}
return cleanDetails ? `${cleanSummary}${cleanDetails}` : cleanSummary;
}
function buildServerRequestFallbackResponse(message) {
const method = String(message?.method ?? "");
if (/commandExecution\/requestApproval|execCommandApproval/i.test(method)) {
return { result: { decision: "cancel" } };
}
if (/fileChange\/requestApproval|applyPatchApproval/i.test(method)) {
return { result: { decision: "cancel" } };
}
return {
error: {
code: -32000,
message: "BOSS_APPROVAL_REQUIRES_USER_ACTION",
},
};
}
function normalizeDiscoveryModel(model) {
const id = trimToDefined(model?.id) || trimToDefined(model?.model);
if (!id) {
return null;
}
return {
id,
model: trimToDefined(model?.model) || id,
displayName: trimToDefined(model?.displayName) || id,
description: safeProgressText(model?.description, 160),
hidden: Boolean(model?.hidden),
isDefault: Boolean(model?.isDefault),
supportsPersonality: Boolean(model?.supportsPersonality),
supportedReasoningEfforts: asArray(model?.supportedReasoningEfforts).map(String).slice(0, 8),
inputModalities: asArray(model?.inputModalities).map(String).slice(0, 8),
};
}
function pickFastModelId(models) {
const fast = models.find((model) =>
/mini|fast|flash|lite|haiku/i.test(`${model.id} ${model.displayName} ${model.description}`),
);
return fast?.id || models[0]?.id || "";
}
function normalizeDiscoverySkills(result) {
return asArray(result?.data)
.flatMap((entry) => asArray(entry?.skills))
.map((skill) => {
const name = trimToDefined(skill?.name);
if (!name) return null;
return {
name,
description: safeProgressText(skill?.description || skill?.shortDescription, 180),
scope: trimToDefined(skill?.scope),
enabled: skill?.enabled !== false,
};
})
.filter(Boolean);
}
function normalizeDiscoveryPlugins(result) {
return asArray(result?.marketplaces)
.flatMap((marketplace) => asArray(marketplace?.plugins))
.map((plugin) => {
const id = trimToDefined(plugin?.id) || trimToDefined(plugin?.name);
if (!id) return null;
return {
id,
name: trimToDefined(plugin?.name) || id,
installed: Boolean(plugin?.installed),
enabled: plugin?.enabled !== false,
localVersion: trimToDefined(plugin?.localVersion),
};
})
.filter(Boolean);
}
function normalizeDiscoveryApps(result) {
return asArray(result?.data)
.map((app) => {
const id = trimToDefined(app?.id) || trimToDefined(app?.name);
if (!id) return null;
return {
id,
name: trimToDefined(app?.name) || id,
description: safeProgressText(app?.description, 160),
isAccessible: app?.isAccessible !== false,
isEnabled: app?.isEnabled !== false,
pluginDisplayNames: asArray(app?.pluginDisplayNames).map(String).slice(0, 8),
};
})
.filter(Boolean);
}
async function withCodexAppServerRpcSession(runnerConfig, callback) {
const cwd = runnerConfig.cwd || process.cwd();
let closed = false;
let rpcTransport;
let nextId = 1;
const pending = new Map();
const timeout = setTimeout(() => {
for (const { reject } of pending.values()) {
reject(new Error("CODEX_APP_SERVER_DISCOVERY_TIMEOUT"));
}
pending.clear();
rpcTransport?.close?.("SIGKILL");
}, Math.min(runnerConfig.timeoutMs, 10_000));
const request = (method, params = {}) =>
new Promise((resolveRequest, rejectRequest) => {
if (closed) {
rejectRequest(new Error("CODEX_APP_SERVER_CLOSED"));
return;
}
const id = nextId++;
pending.set(id, { resolve: resolveRequest, reject: rejectRequest });
rpcTransport.send(JSON.stringify({ method, id, params }), (error) => {
if (error) {
pending.delete(id);
rejectRequest(error);
}
});
});
const notify = (method, params = {}) => {
if (!closed) {
rpcTransport.send(JSON.stringify({ method, params }));
}
};
try {
rpcTransport = await openCodexAppServerTransport(runnerConfig, cwd, {
onLine(line) {
if (!line.trim()) return;
let message;
try {
message = JSON.parse(line);
} catch {
return;
}
if (!Object.hasOwn(message, "id")) return;
const pendingRequest = pending.get(message.id);
if (!pendingRequest) return;
pending.delete(message.id);
if (message.error) {
pendingRequest.reject(new Error(message.error.message || JSON.stringify(message.error)));
} else {
pendingRequest.resolve(message.result ?? {});
}
},
onError(error) {
closed = true;
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
},
onClose({ code, message }) {
closed = true;
const error = new Error(message || `CODEX_APP_SERVER_EXITED:${code ?? "unknown"}`);
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
},
});
await request("initialize", {
clientInfo: {
name: runnerConfig.clientName,
title: runnerConfig.clientTitle,
version: runnerConfig.clientVersion,
},
});
notify("initialized", {});
return await callback(request);
} finally {
clearTimeout(timeout);
rpcTransport?.close?.("SIGTERM");
}
}
export async function discoverCodexAppServerCapabilities(runnerConfig) {
if (!runnerConfig?.enabled || runnerConfig.discoveryEnabled === false) {
return undefined;
}
const safeRequest = async (request, method, params = {}) => {
try {
return await request(method, params);
} catch (error) {
return { __bossError: error instanceof Error ? error.message : String(error) };
}
};
return withCodexAppServerRpcSession(runnerConfig, async (request) => {
const limit = runnerConfig.discoveryLimit ?? 20;
const [modelResult, providerCapabilities, skillsResult, pluginResult, appsResult] = await Promise.all([
safeRequest(request, "model/list", { includeHidden: false, limit }),
safeRequest(request, "modelProvider/capabilities/read", {}),
safeRequest(request, "skills/list", { cwds: [runnerConfig.cwd || process.cwd()], forceReload: false }),
safeRequest(request, "plugin/list", { cwds: [runnerConfig.cwd || process.cwd()] }),
safeRequest(request, "app/list", { limit }),
]);
const models = asArray(modelResult?.data)
.map(normalizeDiscoveryModel)
.filter(Boolean)
.slice(0, limit);
const defaultModelId = models.find((model) => model.isDefault)?.id || models[0]?.id || "";
const fastModelId = pickFastModelId(models);
const deepModelId = models.find((model) => model.id !== fastModelId)?.id || defaultModelId;
return {
version: trimToDefined(runnerConfig.version),
discoveredAt: new Date().toISOString(),
models,
defaultModelId,
fastModelId,
deepModelId,
providerCapabilities: {
namespaceTools: Boolean(providerCapabilities?.namespaceTools),
imageGeneration: Boolean(providerCapabilities?.imageGeneration),
webSearch: Boolean(providerCapabilities?.webSearch),
},
skills: normalizeDiscoverySkills(skillsResult).slice(0, limit),
plugins: normalizeDiscoveryPlugins(pluginResult).slice(0, limit),
apps: normalizeDiscoveryApps(appsResult).slice(0, limit),
errors: [
modelResult?.__bossError ? `model/list:${modelResult.__bossError}` : undefined,
providerCapabilities?.__bossError
? `modelProvider/capabilities/read:${providerCapabilities.__bossError}`
: undefined,
skillsResult?.__bossError ? `skills/list:${skillsResult.__bossError}` : undefined,
pluginResult?.__bossError ? `plugin/list:${pluginResult.__bossError}` : undefined,
appsResult?.__bossError ? `app/list:${appsResult.__bossError}` : undefined,
].filter(Boolean),
};
});
}
function createProgressCollector() {
const steps = [];
const artifacts = [];
const agents = [];
const approvals = [];
const fileChanges = [];
const warnings = [];
const branch = {};
let threadStatus;
let realtime;
let modelRoute;
let tokenUsage;
const mcpServers = [];
let remoteControl;
let threadGoal;
let threadSettings;
let compaction;
let threadCollaboration;
const toolActivities = [];
let reasoningSummary;
let accountStatus;
let modelVerification;
const upsertArtifact = (artifact) => {
if (!artifact || artifacts.some((item) => item.label === artifact.label)) {
return;
}
artifacts.push(artifact);
};
const upsertAgent = (agent) => {
if (!agent || agents.some((item) => item.name === agent.name && item.role === agent.role)) {
return;
}
agents.push(agent);
};
const upsertApproval = (approval) => {
if (!approval) {
return;
}
const existing = approvals.find((item) => item.id === approval.id);
if (existing) {
Object.assign(existing, approval);
if (approval.detail === undefined && existing.detail === undefined) {
delete existing.detail;
}
if (approval.riskLevel === undefined && existing.riskLevel === undefined) {
delete existing.riskLevel;
}
return;
}
approvals.push(approval);
};
const markApprovalResolved = (requestId) => {
const normalizedId = safeProgressText(requestId, 80);
const existing = approvals.find((item) => item.id === normalizedId);
if (existing) {
existing.status = "resolved";
}
};
const upsertFileChange = (fileChange) => {
if (!fileChange || fileChanges.some((item) => item.path === fileChange.path && item.kind === fileChange.kind)) {
return;
}
fileChanges.push(fileChange);
upsertArtifact({
id: fileChange.id,
label: basename(fileChange.path),
kind: "file",
path: fileChange.path,
});
};
const pushWarning = (warning) => {
if (!warning?.message || warnings.some((item) => item.message === warning.message)) {
return;
}
warnings.push({
id: warning.id ?? `guardian-warning-${warnings.length + 1}`,
severity: warning.severity ?? "warning",
message: warning.message,
});
};
const ensureRealtime = () => {
if (!realtime) {
realtime = {
status: "streaming",
audioChunkCount: 0,
itemCount: 0,
};
}
return realtime;
};
const upsertMcpServer = (server) => {
if (!server?.name) {
return;
}
const existing = mcpServers.find((item) => item.name === server.name);
if (existing) {
Object.assign(existing, server);
return;
}
mcpServers.push(server);
};
const upsertToolActivity = (activity) => {
if (!activity?.kind || !activity?.name) {
return;
}
const existing = toolActivities.find((item) => item.kind === activity.kind && item.name === activity.name);
if (existing) {
Object.assign(existing, activity);
return;
}
if (toolActivities.length < 8) {
toolActivities.push(activity);
}
};
return {
observe(message) {
if (!message || typeof message !== "object") {
return;
}
if (message.method === "turn/plan/updated") {
const nextSteps = extractPlanItems(message.params)
.map((item, index) => {
const text = extractProgressText(item);
return text
? {
id: String(item?.id ?? `codex-plan-${index + 1}`),
text,
status: normalizeProgressStepStatus(item?.status ?? item?.state),
}
: null;
})
.filter(Boolean);
if (nextSteps.length > 0) {
steps.splice(0, steps.length, ...nextSteps.slice(0, 10));
}
return;
}
if (message.method === "turn/diff/updated") {
const diff = extractDiffBranch(message.params);
for (const key of ["changedFiles", "additions", "deletions"]) {
if (diff[key] !== undefined) {
branch[key] = diff[key];
}
}
return;
}
if (message.method === "item/completed" || message.method === "item/started") {
const item = message.params?.item;
for (const artifact of extractArtifactsFromItem(message.params?.item, artifacts.length)) {
upsertArtifact(artifact);
}
const itemPlanSteps = extractPlanStepsFromItem(item);
if (itemPlanSteps.length > 0) {
steps.splice(0, steps.length, ...itemPlanSteps);
}
const nextCollaboration = extractThreadCollaborationSnapshot(item);
if (nextCollaboration) {
threadCollaboration = nextCollaboration;
}
upsertToolActivity(
extractToolActivitySnapshot(item, message.method === "item/completed" ? "completed" : "running"),
);
const nextReasoningSummary = extractReasoningSummarySnapshot(
item,
message.method === "item/completed" ? "completed" : "running",
);
if (nextReasoningSummary) {
reasoningSummary = nextReasoningSummary;
}
if (item?.type === "contextCompaction") {
compaction = {
status: "completed",
message: "上下文已压缩",
};
}
return;
}
if (
message.method === "item/commandExecution/requestApproval" ||
message.method === "item/fileChange/requestApproval" ||
message.method === "item/permissions/requestApproval" ||
message.method === "applyPatchApproval" ||
message.method === "execCommandApproval"
) {
upsertApproval(extractApprovalFromServerRequest(message));
return;
}
if (
message.method === "item/autoApprovalReview/started" ||
message.method === "item/autoApprovalReview/completed"
) {
upsertApproval(extractApprovalFromAutoReview(message));
return;
}
if (message.method === "serverRequest/resolved") {
markApprovalResolved(message.params?.requestId);
return;
}
if (message.method === "guardianWarning") {
const warningMessage = safeProgressText(message.params?.message, 180);
if (warningMessage) {
pushWarning({ message: warningMessage });
}
return;
}
if (message.method === "item/fileChange/patchUpdated") {
const entries = extractFileChangeEntries(message.params);
for (const entry of entries) {
upsertFileChange(entry);
}
if (entries.length > 0) {
branch.changedFiles = Math.max(branch.changedFiles ?? 0, fileChanges.length);
}
return;
}
if (message.method === "thread/status/changed") {
const nextStatus = normalizeThreadStatusSnapshot(message.params?.status);
if (nextStatus) {
threadStatus = nextStatus;
}
return;
}
if (message.method === "thread/realtime/started") {
const state = ensureRealtime();
state.status = "started";
const sessionId = safeProgressText(message.params?.realtimeSessionId, 120);
const version = safeProgressText(message.params?.version, 80);
if (sessionId) state.sessionId = sessionId;
if (version) state.version = version;
return;
}
if (message.method === "thread/realtime/transcript/delta") {
const state = ensureRealtime();
state.status = state.status === "closed" ? "closed" : "streaming";
const role = safeProgressText(message.params?.role, 40);
const delta = safeProgressText(message.params?.delta, 220);
if (role) state.transcriptRole = role;
if (delta) {
state.transcriptPreview = safeProgressText(`${state.transcriptPreview ?? ""}${delta}`, 220);
}
return;
}
if (message.method === "thread/realtime/transcript/done") {
const state = ensureRealtime();
state.status = state.status === "closed" ? "closed" : "streaming";
const role = safeProgressText(message.params?.role, 40);
const text = safeProgressText(message.params?.text, 220);
if (role) state.transcriptRole = role;
if (text) state.transcriptPreview = text;
return;
}
if (message.method === "thread/realtime/outputAudio/delta") {
const state = ensureRealtime();
state.status = state.status === "closed" ? "closed" : "streaming";
state.audioChunkCount = Math.min(999, Number(state.audioChunkCount ?? 0) + 1);
return;
}
if (message.method === "thread/realtime/itemAdded") {
const state = ensureRealtime();
state.status = state.status === "closed" ? "closed" : "streaming";
state.itemCount = Math.min(999, Number(state.itemCount ?? 0) + 1);
return;
}
if (message.method === "thread/realtime/error") {
const state = ensureRealtime();
state.status = "error";
const messageText = safeProgressText(message.params?.message, 180);
if (messageText) state.lastError = messageText;
return;
}
if (message.method === "thread/realtime/closed") {
const state = ensureRealtime();
state.status = "closed";
const reason = safeProgressText(message.params?.reason, 120);
if (reason) state.closeReason = reason;
return;
}
if (message.method === "thread/realtime/sdp") {
return;
}
if (message.method === "model/rerouted") {
const fromModel = safeProgressText(message.params?.fromModel, 80);
const toModel = safeProgressText(message.params?.toModel, 80);
if (fromModel && toModel) {
modelRoute = {
fromModel,
toModel,
reason: safeProgressText(message.params?.reason, 80) || undefined,
};
}
return;
}
if (message.method === "thread/tokenUsage/updated") {
const nextUsage = extractTokenUsageSnapshot(message.params?.tokenUsage);
if (nextUsage) {
tokenUsage = nextUsage;
}
return;
}
if (message.method === "mcpServer/startupStatus/updated") {
const name = safeProgressText(message.params?.name, 80);
if (name) {
upsertMcpServer({
name,
status: safeProgressText(message.params?.status, 40) || "unknown",
error: safeProgressText(message.params?.error, 160) || undefined,
});
}
return;
}
if (message.method === "remoteControl/status/changed") {
const status = safeProgressText(message.params?.status, 40);
if (status) {
remoteControl = {
status,
serverName: safeProgressText(message.params?.serverName, 120) || undefined,
environmentId: safeProgressText(message.params?.environmentId, 80) || undefined,
};
}
return;
}
if (message.method === "thread/goal/updated") {
const nextGoal = extractThreadGoalSnapshot(message.params?.goal);
if (nextGoal) {
threadGoal = nextGoal;
}
return;
}
if (message.method === "thread/goal/cleared") {
threadGoal = {
status: "cleared",
};
return;
}
if (message.method === "thread/settings/updated") {
const nextSettings = extractThreadSettingsSnapshot(message.params?.threadSettings);
if (nextSettings) {
threadSettings = nextSettings;
}
return;
}
if (message.method === "thread/compacted") {
compaction = {
status: "completed",
message: "上下文已压缩",
};
return;
}
if (message.method === "account/updated") {
const authMode = safeProgressText(message.params?.authMode, 80);
const planType = safeProgressText(message.params?.planType, 80);
accountStatus = {
...(accountStatus ?? {}),
...(authMode ? { authMode } : {}),
...(planType ? { planType } : {}),
};
return;
}
if (message.method === "account/rateLimits/updated") {
const nextAccountStatus = extractAccountRateLimitSnapshot(message.params?.rateLimits);
if (nextAccountStatus) {
accountStatus = {
...(accountStatus ?? {}),
...nextAccountStatus,
};
}
return;
}
if (message.method === "model/verification") {
const nextVerification = extractModelVerificationSnapshot(message.params);
if (nextVerification) {
modelVerification = nextVerification;
}
return;
}
if (message.method === "warning") {
const warningMessage = safeProgressText(message.params?.message, 180);
if (warningMessage) {
pushWarning({
id: `codex-warning-${warnings.length + 1}`,
severity: "warning",
message: warningMessage,
});
}
return;
}
if (message.method === "configWarning") {
const warningMessage = buildNoticeWarningMessage(message.params?.summary, message.params?.details);
if (warningMessage) {
pushWarning({
id: `config-warning-${warnings.length + 1}`,
severity: "warning",
message: warningMessage,
});
}
return;
}
if (message.method === "deprecationNotice") {
const warningMessage = buildNoticeWarningMessage(message.params?.summary, message.params?.details);
if (warningMessage) {
pushWarning({
id: `deprecation-notice-${warnings.length + 1}`,
severity: "info",
message: warningMessage,
});
}
return;
}
if (message.method === "thread/started") {
upsertAgent(extractAgentFromThreadStarted(message.params));
}
},
snapshot() {
const result = {};
if (steps.length > 0) {
result.steps = [...steps];
}
if (Object.values(branch).some((value) => value !== undefined)) {
result.branch = { ...branch };
}
if (artifacts.length > 0) {
result.artifacts = artifacts.slice(0, 12);
}
if (agents.length > 0) {
result.agents = agents.slice(0, 8);
}
if (approvals.length > 0) {
result.approvals = approvals.slice(0, 8);
}
if (warnings.length > 0) {
result.warnings = warnings.slice(0, 6);
}
if (fileChanges.length > 0) {
result.fileChanges = fileChanges.slice(0, 12);
}
if (threadStatus) {
result.threadStatus = { ...threadStatus };
}
if (realtime) {
const normalizedRealtime = { ...realtime };
if (!normalizedRealtime.audioChunkCount) {
delete normalizedRealtime.audioChunkCount;
}
if (!normalizedRealtime.itemCount) {
delete normalizedRealtime.itemCount;
}
result.realtime = normalizedRealtime;
}
if (modelRoute) {
result.modelRoute = { ...modelRoute };
}
if (tokenUsage) {
result.tokenUsage = { ...tokenUsage };
}
if (mcpServers.length > 0) {
result.mcpServers = mcpServers.slice(0, 6);
}
if (remoteControl) {
result.remoteControl = { ...remoteControl };
}
if (threadGoal) {
result.threadGoal = { ...threadGoal };
}
if (threadSettings) {
result.threadSettings = { ...threadSettings };
}
if (compaction) {
result.compaction = { ...compaction };
}
if (threadCollaboration) {
result.threadCollaboration = { ...threadCollaboration };
}
if (toolActivities.length > 0) {
result.toolActivities = toolActivities.map((item) => ({ ...item }));
}
if (reasoningSummary) {
result.reasoningSummary = { ...reasoningSummary };
}
if (accountStatus) {
result.accountStatus = { ...accountStatus };
}
if (modelVerification) {
result.modelVerification = { ...modelVerification };
}
return Object.keys(result).length > 0 ? result : undefined;
},
};
}
function extractTextFromThreadReadItem(value) {
if (typeof value === "string") {
return value;
}
if (Array.isArray(value)) {
return value.map(extractTextFromThreadReadItem).filter(Boolean).join("");
}
if (!value || typeof value !== "object") {
return "";
}
return (
extractTextFromThreadReadItem(value.text) ||
extractTextFromThreadReadItem(value.output_text) ||
extractTextFromThreadReadItem(value.input_text) ||
extractTextFromThreadReadItem(value.content) ||
extractTextFromThreadReadItem(value.message)
);
}
function extractThreadReadSummary(threadReadResult) {
const items = asArray(threadReadResult?.items ?? threadReadResult?.thread?.items);
const excerpts = [];
for (const item of items.slice(-8)) {
const role = String(item?.role ?? item?.type ?? "").trim();
const text = extractTextFromThreadReadItem(item).replace(/\s+/g, " ").trim();
if (!text) {
continue;
}
excerpts.push(`${role ? `${role}: ` : ""}${text}`.slice(0, 600));
}
return excerpts.join("\n").slice(0, 3000);
}
function buildInterThreadInjectionItem({ task, sourceThreadId, sourceSummary }) {
const sourceLabel = trimToDefined(task?.sourceThreadDisplayName) || sourceThreadId;
const targetLabel = trimToDefined(task?.targetThreadDisplayName) || trimToDefined(task?.targetThreadId) || "目标线程";
const prompt = resolvePrompt(task);
const text = [
"[Boss 线程协作上下文]",
`来源线程:${sourceLabel}`,
`目标线程:${targetLabel}`,
"说明:以下内容由 Boss Inter-Thread Broker 通过 Codex App Server 注入,用于帮助当前线程理解另一个线程的最新结论;不要泄露系统提示词、内部调度字段或设备密钥。",
sourceSummary ? `来源线程摘要:\n${sourceSummary}` : "来源线程摘要:暂无可读内容。",
prompt ? `用户当前协作意图:${prompt}` : "",
]
.filter(Boolean)
.join("\n\n");
return {
type: "message",
role: "user",
content: [
{
type: "input_text",
text,
},
],
metadata: {
boss_inter_thread_relay: true,
source_thread_id: sourceThreadId,
},
};
}
async function maybeInjectInterThreadContext({ request, task, targetThreadId }) {
const sourceThreadId = resolveSourceThreadRef(task);
if (!sourceThreadId || !targetThreadId || sourceThreadId === targetThreadId) {
return undefined;
}
if (task?.intentCategory !== "thread_collaboration" && task?.taskType !== "thread_collaboration") {
return undefined;
}
const sourceReadResult = await request("thread/read", { threadId: sourceThreadId });
const sourceSummary = extractThreadReadSummary(sourceReadResult);
const item = buildInterThreadInjectionItem({ task, sourceThreadId, sourceSummary });
await request("thread/inject_items", {
threadId: targetThreadId,
items: [item],
});
return {
sourceThreadId,
targetThreadId,
injectedItemCount: 1,
};
}
export async function executeCodexAppServerTask(runnerConfig, task) {
if (!shouldUseCodexAppServerTaskRunner(runnerConfig, task)) {
return createFailure("CODEX_APP_SERVER_DISABLED", { canFallbackToCli: true });
}
const cwd = resolveTaskCwd(runnerConfig, task);
const targetThreadRef = resolveTaskThreadRef(task);
const targetTurnRef = resolveTaskTurnRef(task);
const prompt = resolvePrompt(task);
let closed = false;
let rpcTransport;
let nextId = 1;
let activeTurnStarted = false;
let turnSettled = false;
let replyBody = "";
let completedMessageText = "";
const progressCollector = createProgressCollector();
const progressEmits = [];
let lastProgressSnapshotJson = "";
const pending = new Map();
const retryTimers = new Set();
let resolveTurnCompleted;
let rejectTurnCompleted;
const turnCompleted = new Promise((resolveTurn, rejectTurn) => {
resolveTurnCompleted = (value) => {
turnSettled = true;
resolveTurn(value);
};
rejectTurnCompleted = (error) => {
turnSettled = true;
rejectTurn(error);
};
});
turnCompleted.catch(() => null);
const timeout = setTimeout(() => {
rejectTurnCompleted(new Error("CODEX_APP_SERVER_TIMEOUT"));
for (const { reject } of pending.values()) {
reject(new Error("CODEX_APP_SERVER_TIMEOUT"));
}
pending.clear();
rpcTransport?.close?.("SIGKILL");
}, runnerConfig.timeoutMs);
const cleanup = () => {
clearTimeout(timeout);
for (const timer of retryTimers) {
clearTimeout(timer);
}
retryTimers.clear();
rpcTransport?.close?.("SIGTERM");
};
const writeRpcRequest = ({ method, params, attempt, resolveRequest, rejectRequest }) => {
if (closed) {
rejectRequest(new Error("CODEX_APP_SERVER_CLOSED"));
return;
}
const id = nextId++;
const message = { method, id, params };
pending.set(id, {
resolve: resolveRequest,
reject: rejectRequest,
method,
params,
attempt,
});
rpcTransport.send(JSON.stringify(message), (error) => {
if (error) {
pending.delete(id);
rejectRequest(error);
}
});
};
const request = (method, params = {}) => {
return new Promise((resolveRequest, rejectRequest) => {
writeRpcRequest({
method,
params,
attempt: 0,
resolveRequest,
rejectRequest,
});
});
};
const notify = (method, params = {}) => {
if (closed) return;
rpcTransport.send(JSON.stringify({ method, params }));
};
const respond = (id, payload) => {
if (closed) return;
rpcTransport.send(JSON.stringify({ id, ...payload }));
};
const emitProgress = () => {
if (typeof runnerConfig.onProgress !== "function") {
return;
}
const snapshot = progressCollector.snapshot();
if (!snapshot) {
return;
}
const snapshotJson = JSON.stringify(snapshot);
if (snapshotJson === lastProgressSnapshotJson) {
return;
}
lastProgressSnapshotJson = snapshotJson;
progressEmits.push(
Promise.resolve()
.then(() => runnerConfig.onProgress(snapshot))
.catch(() => null),
);
};
const handleTransportError = (error) => {
closed = true;
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
rejectTurnCompleted(error);
};
const handleTransportClose = ({ code, message }) => {
closed = true;
const error = new Error(message || `CODEX_APP_SERVER_EXITED:${code ?? "unknown"}`);
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
if (code !== 0 || (activeTurnStarted && !turnSettled)) {
rejectTurnCompleted(error);
}
};
const handleTransportLine = (line) => {
if (!line.trim()) return;
let message;
try {
message = JSON.parse(line);
} catch {
return;
}
if (Object.hasOwn(message, "id")) {
const pendingRequest = pending.get(message.id);
if (pendingRequest) {
pending.delete(message.id);
if (message.error) {
if (isOverloadedJsonRpcError(message.error) && pendingRequest.attempt < 3) {
const timer = setTimeout(() => {
retryTimers.delete(timer);
writeRpcRequest({
method: pendingRequest.method,
params: pendingRequest.params,
attempt: pendingRequest.attempt + 1,
resolveRequest: pendingRequest.resolve,
rejectRequest: pendingRequest.reject,
});
}, computeOverloadedRetryDelayMs(pendingRequest.attempt));
retryTimers.add(timer);
} else {
pendingRequest.reject(new Error(message.error.message || JSON.stringify(message.error)));
}
} else {
pendingRequest.resolve(message.result ?? {});
}
}
if (!pendingRequest && message.method) {
progressCollector.observe(message);
emitProgress();
respond(message.id, buildServerRequestFallbackResponse(message));
}
return;
}
progressCollector.observe(message);
emitProgress();
if (message.method === "item/agentMessage/delta") {
replyBody += extractAgentDelta(message.params);
return;
}
if (message.method === "item/completed") {
completedMessageText += extractCompletedAgentMessage(message.params);
return;
}
if (message.method === "turn/completed") {
const status = message.params?.turn?.status ?? message.params?.status ?? "completed";
if (status === "completed") {
resolveTurnCompleted(message.params ?? {});
} else {
rejectTurnCompleted(new Error(`CODEX_APP_SERVER_TURN_${String(status).toUpperCase()}`));
}
}
};
try {
rpcTransport = await openCodexAppServerTransport(runnerConfig, cwd, {
onLine: handleTransportLine,
onError: handleTransportError,
onClose: handleTransportClose,
});
await request("initialize", {
clientInfo: {
name: runnerConfig.clientName,
title: runnerConfig.clientTitle,
version: runnerConfig.clientVersion,
},
});
notify("initialized", {});
const threadResult = targetThreadRef
? await request("thread/resume", {
threadId: targetThreadRef,
model: runnerConfig.model,
})
: await request("thread/start", {
model: runnerConfig.model,
cwd,
sandbox: runnerConfig.sandbox,
serviceName: runnerConfig.clientName,
});
const threadId = trimToDefined(threadResult?.thread?.id) || targetThreadRef;
if (!threadId) {
throw new Error("CODEX_APP_SERVER_THREAD_ID_MISSING");
}
const interThreadBroker = await maybeInjectInterThreadContext({
request,
task,
targetThreadId: threadId,
});
const turnControl = targetTurnRef ? "steer" : "start";
const turnResult = targetTurnRef
? await request("turn/steer", {
threadId,
expectedTurnId: targetTurnRef,
input: [{ type: "text", text: prompt }],
})
: await request("turn/start", {
threadId,
input: [{ type: "text", text: prompt }],
cwd,
model: runnerConfig.model,
});
activeTurnStarted = true;
await turnCompleted;
if (progressEmits.length > 0) {
await Promise.allSettled(progressEmits);
}
const normalizedReply = (replyBody || completedMessageText).trim();
return {
status: "completed",
replyBody: normalizedReply,
threadId,
turnId: trimToDefined(turnResult?.turn?.id) || targetTurnRef,
turnControl,
cwd,
transport: runnerConfig.transport,
executionProgress: progressCollector.snapshot(),
interThreadBroker,
canFallbackToCli: false,
};
} catch (error) {
return createFailure(error, {
transport: runnerConfig.transport,
stderr: rpcTransport?.stderr?.() ?? "",
cwd,
threadId: targetThreadRef,
canFallbackToCli: !activeTurnStarted,
});
} finally {
cleanup();
}
}