Files
boss/local-agent/codex-app-server-runner.mjs
2026-05-17 02:20:08 +08:00

361 lines
9.9 KiB
JavaScript

import { spawn } from "node:child_process";
import readline from "node:readline";
import { resolve } from "node:path";
function trimToDefined(value) {
const trimmed = String(value ?? "").trim();
return trimmed ? trimmed : undefined;
}
function boolFromEnv(value) {
const normalized = trimToDefined(value)?.toLowerCase();
return normalized === "1" || normalized === "true" || normalized === "yes";
}
function listFromEnv(value) {
if (!value) return undefined;
try {
const parsed = JSON.parse(String(value));
if (Array.isArray(parsed)) {
return parsed.map((item) => String(item));
}
} catch {
// Fall through to whitespace splitting.
}
return String(value).split(/\s+/).filter(Boolean);
}
function resolveTaskThreadRef(task) {
return trimToDefined(task?.targetCodexThreadRef || task?.targetThreadId);
}
function resolveTaskCwd(config, task) {
return resolve(
trimToDefined(task?.targetCodexFolderRef) ||
trimToDefined(config.masterAgentWorkdir) ||
process.cwd(),
);
}
function resolvePrompt(task) {
return String(task?.executionPrompt || task?.requestText || "").trim();
}
function normalizeTimeoutMs(value) {
const numeric = Number(value);
return Number.isFinite(numeric) && numeric > 0 ? Math.floor(numeric) : 120_000;
}
export function getCodexAppServerRunnerConfig(env = process.env, config = {}) {
const argsFromConfig = Array.isArray(config.codexAppServerArgs)
? config.codexAppServerArgs.map((item) => String(item))
: undefined;
const args =
argsFromConfig ??
listFromEnv(env.BOSS_CODEX_APP_SERVER_ARGS) ??
["app-server"];
return {
enabled:
config.codexAppServerEnabled === true ||
boolFromEnv(env.BOSS_CODEX_APP_SERVER_ENABLED),
command:
trimToDefined(config.codexAppServerCommand) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_COMMAND) ||
"codex",
args,
cwd:
trimToDefined(config.codexAppServerWorkdir) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_WORKDIR) ||
trimToDefined(config.masterAgentWorkdir) ||
process.cwd(),
timeoutMs: normalizeTimeoutMs(
config.codexAppServerTimeoutMs ?? env.BOSS_CODEX_APP_SERVER_TIMEOUT_MS,
),
clientName:
trimToDefined(config.codexAppServerClientName) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_CLIENT_NAME) ||
"boss_local_agent",
clientTitle:
trimToDefined(config.codexAppServerClientTitle) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_CLIENT_TITLE) ||
"Boss Local Agent",
clientVersion:
trimToDefined(config.codexAppServerClientVersion) ||
trimToDefined(env.BOSS_CODEX_APP_SERVER_CLIENT_VERSION) ||
"0.1.0",
model: trimToDefined(config.masterAgentModel || env.BOSS_MASTER_AGENT_MODEL),
sandbox: trimToDefined(config.masterAgentSandbox),
transport: "stdio",
};
}
export function shouldUseCodexAppServerTaskRunner(runnerConfig, task) {
if (!runnerConfig?.enabled || !runnerConfig.command) {
return false;
}
const taskType = trimToDefined(task?.taskType);
if (taskType !== "conversation_reply" && taskType !== "dispatch_execution") {
return false;
}
return Boolean(resolvePrompt(task));
}
function extractAgentTextFromContent(value) {
if (typeof value === "string") {
return value;
}
if (Array.isArray(value)) {
return value.map(extractAgentTextFromContent).filter(Boolean).join("");
}
if (!value || typeof value !== "object") {
return "";
}
return (
extractAgentTextFromContent(value.text) ||
extractAgentTextFromContent(value.outputText) ||
extractAgentTextFromContent(value.content) ||
extractAgentTextFromContent(value.delta)
);
}
function extractAgentDelta(params) {
if (!params || typeof params !== "object") {
return "";
}
return (
extractAgentTextFromContent(params.delta) ||
extractAgentTextFromContent(params.text) ||
extractAgentTextFromContent(params.messageDelta) ||
extractAgentTextFromContent(params.item?.delta)
);
}
function extractCompletedAgentMessage(params) {
const item = params?.item;
if (!item || typeof item !== "object") {
return "";
}
const itemType = String(item.type || item.kind || "");
if (!/agent|assistant/i.test(itemType)) {
return "";
}
return (
extractAgentTextFromContent(item.text) ||
extractAgentTextFromContent(item.content) ||
extractAgentTextFromContent(item.message) ||
extractAgentTextFromContent(item.output)
);
}
function createFailure(error, extra = {}) {
return {
status: "failed",
errorMessage: error instanceof Error ? error.message : String(error),
transport: "stdio",
...extra,
};
}
export async function executeCodexAppServerTask(runnerConfig, task) {
if (!shouldUseCodexAppServerTaskRunner(runnerConfig, task)) {
return createFailure("CODEX_APP_SERVER_DISABLED", { canFallbackToCli: true });
}
const cwd = resolveTaskCwd(runnerConfig, task);
const targetThreadRef = resolveTaskThreadRef(task);
const prompt = resolvePrompt(task);
const child = spawn(runnerConfig.command, runnerConfig.args, {
cwd: runnerConfig.cwd || cwd,
env: process.env,
stdio: ["pipe", "pipe", "pipe"],
});
let closed = false;
let stderr = "";
let nextId = 1;
let activeTurnStarted = false;
let turnSettled = false;
let replyBody = "";
let completedMessageText = "";
const pending = new Map();
let resolveTurnCompleted;
let rejectTurnCompleted;
const turnCompleted = new Promise((resolveTurn, rejectTurn) => {
resolveTurnCompleted = (value) => {
turnSettled = true;
resolveTurn(value);
};
rejectTurnCompleted = (error) => {
turnSettled = true;
rejectTurn(error);
};
});
const timeout = setTimeout(() => {
rejectTurnCompleted(new Error("CODEX_APP_SERVER_TIMEOUT"));
for (const { reject } of pending.values()) {
reject(new Error("CODEX_APP_SERVER_TIMEOUT"));
}
pending.clear();
if (!child.killed) {
child.kill("SIGKILL");
}
}, runnerConfig.timeoutMs);
const rl = readline.createInterface({ input: child.stdout });
const cleanup = () => {
clearTimeout(timeout);
rl.close();
if (!child.killed) {
child.kill("SIGTERM");
}
};
const request = (method, params = {}) => {
if (closed) {
return Promise.reject(new Error("CODEX_APP_SERVER_CLOSED"));
}
const id = nextId++;
const message = { method, id, params };
return new Promise((resolveRequest, rejectRequest) => {
pending.set(id, { resolve: resolveRequest, reject: rejectRequest });
child.stdin.write(`${JSON.stringify(message)}\n`, (error) => {
if (error) {
pending.delete(id);
rejectRequest(error);
}
});
});
};
const notify = (method, params = {}) => {
if (closed) return;
child.stdin.write(`${JSON.stringify({ method, params })}\n`);
};
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
child.on("error", (error) => {
closed = true;
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
rejectTurnCompleted(error);
});
child.on("close", (code) => {
closed = true;
const error = new Error(
stderr.trim() || `CODEX_APP_SERVER_EXITED:${code ?? "unknown"}`,
);
for (const { reject } of pending.values()) {
reject(error);
}
pending.clear();
if (code !== 0 || (activeTurnStarted && !turnSettled)) {
rejectTurnCompleted(error);
}
});
rl.on("line", (line) => {
if (!line.trim()) return;
let message;
try {
message = JSON.parse(line);
} catch {
return;
}
if (Object.hasOwn(message, "id")) {
const pendingRequest = pending.get(message.id);
if (pendingRequest) {
pending.delete(message.id);
if (message.error) {
pendingRequest.reject(new Error(message.error.message || JSON.stringify(message.error)));
} else {
pendingRequest.resolve(message.result ?? {});
}
}
return;
}
if (message.method === "item/agentMessage/delta") {
replyBody += extractAgentDelta(message.params);
return;
}
if (message.method === "item/completed") {
completedMessageText += extractCompletedAgentMessage(message.params);
return;
}
if (message.method === "turn/completed") {
const status = message.params?.turn?.status ?? message.params?.status ?? "completed";
if (status === "completed") {
resolveTurnCompleted(message.params ?? {});
} else {
rejectTurnCompleted(new Error(`CODEX_APP_SERVER_TURN_${String(status).toUpperCase()}`));
}
}
});
try {
await request("initialize", {
clientInfo: {
name: runnerConfig.clientName,
title: runnerConfig.clientTitle,
version: runnerConfig.clientVersion,
},
});
notify("initialized", {});
const threadResult = targetThreadRef
? await request("thread/resume", {
threadId: targetThreadRef,
model: runnerConfig.model,
})
: await request("thread/start", {
model: runnerConfig.model,
cwd,
sandbox: runnerConfig.sandbox,
serviceName: runnerConfig.clientName,
});
const threadId = trimToDefined(threadResult?.thread?.id) || targetThreadRef;
if (!threadId) {
throw new Error("CODEX_APP_SERVER_THREAD_ID_MISSING");
}
await request("turn/start", {
threadId,
input: [{ type: "text", text: prompt }],
cwd,
model: runnerConfig.model,
});
activeTurnStarted = true;
await turnCompleted;
const normalizedReply = (replyBody || completedMessageText).trim();
return {
status: "completed",
replyBody: normalizedReply,
threadId,
cwd,
transport: "stdio",
canFallbackToCli: false,
};
} catch (error) {
return createFailure(error, {
stderr: stderr.trim(),
cwd,
threadId: targetThreadRef,
canFallbackToCli: !activeTurnStarted,
});
} finally {
cleanup();
}
}