Files
boss/local-agent/codex-task-runner.mjs

380 lines
10 KiB
JavaScript

import os from "node:os";
import { readFileSync } from "node:fs";
import { constants } from "node:fs";
import { access, readFile, readdir, stat } from "node:fs/promises";
import { DatabaseSync } from "node:sqlite";
import { dirname, resolve } from "node:path";
function trimToDefined(value) {
const trimmed = String(value ?? "").trim();
return trimmed ? trimmed : undefined;
}
function parseSandboxPolicyType(value) {
const raw = trimToDefined(value);
if (!raw) {
return undefined;
}
try {
const parsed = JSON.parse(raw);
return trimToDefined(parsed?.type) || raw;
} catch {
return raw;
}
}
function isReadOnlySandboxPolicy(value) {
return parseSandboxPolicyType(value) === "read-only";
}
function resolveResumeTarget(config, task) {
const targetThreadRef = trimToDefined(task?.targetCodexThreadRef || task?.targetThreadId);
const targetFolderRef = trimToDefined(
task?.targetCodexFolderRef || config.masterAgentWorkdir || process.cwd(),
);
const cwd = resolve(targetFolderRef || process.cwd());
return {
targetThreadRef,
targetFolderRef: targetFolderRef || process.cwd(),
cwd,
};
}
function defaultCodexPath(relativePath) {
return resolve(os.homedir(), ".codex", relativePath);
}
function defaultSessionsDirForStateDb(stateDbPath) {
const resolvedStateDbPath = trimToDefined(stateDbPath);
if (resolvedStateDbPath) {
return resolve(dirname(resolve(resolvedStateDbPath)), "sessions");
}
return defaultCodexPath("sessions");
}
function loadThreadWorkspaceHints(globalStatePath) {
try {
const raw = readFileSync(resolve(globalStatePath), "utf8");
const parsed = JSON.parse(raw);
return new Map(Object.entries(parsed["thread-workspace-root-hints"] ?? {}));
} catch {
return new Map();
}
}
function shouldPreflightResumeTask(task) {
const taskType = String(task?.taskType || "").trim();
if (taskType === "dispatch_execution") {
return true;
}
if (taskType !== "conversation_reply") {
return false;
}
return Boolean(
trimToDefined(task?.targetCodexThreadRef || task?.targetThreadId) ||
trimToDefined(task?.targetCodexFolderRef) ||
trimToDefined(task?.targetProjectId) ||
trimToDefined(task?.targetThreadDisplayName),
);
}
function buildDesktopMirrorPlan(task, targetThreadRef) {
if (task?.taskType !== "conversation_reply") {
return { enabled: false };
}
if (task?.mirrorBossUserMessageToCodexDesktop !== true) {
return { enabled: false };
}
const sourceMessageId = trimToDefined(task?.sourceMessageId || task?.requestMessageId);
const sourceMessageBody = trimToDefined(task?.sourceMessageBody || task?.requestText);
if (!targetThreadRef || !sourceMessageId || !sourceMessageBody) {
return { enabled: false };
}
return {
enabled: true,
targetThreadRef,
sourceMessageId,
sourceMessageBody,
sourceMessageSentAt: trimToDefined(task?.sourceMessageSentAt),
};
}
function buildStructuredTaskBindingError(code, message, details) {
return {
code,
message,
details,
};
}
function parseSessionMetaLine(line) {
try {
const parsed = JSON.parse(line);
if (parsed?.type !== "session_meta" || !parsed?.payload?.id || !parsed?.payload?.cwd) {
return null;
}
return {
id: String(parsed.payload.id),
cwd: String(parsed.payload.cwd),
};
} catch {
return null;
}
}
async function findSessionThreadBinding(config, targetThreadRef) {
const root = trimToDefined(
config?.codexSessionsDir || defaultSessionsDirForStateDb(config?.codexStateDbPath),
);
if (!root) {
return {
status: "missing",
};
}
const stack = [resolve(root)];
const suffix = `-${targetThreadRef}.jsonl`;
while (stack.length > 0) {
const current = stack.pop();
if (!current) continue;
let entries = [];
try {
entries = await readdir(current, { withFileTypes: true });
} catch {
continue;
}
for (const entry of entries) {
const entryPath = resolve(current, entry.name);
if (entry.isDirectory()) {
stack.push(entryPath);
continue;
}
if (!entry.isFile() || !entry.name.endsWith(suffix)) {
continue;
}
try {
const raw = await readFile(entryPath, "utf8");
const meta = parseSessionMetaLine(raw.split(/\r?\n/, 1)[0] ?? "");
if (meta?.id === targetThreadRef) {
return {
status: "ok",
threadCwd: trimToDefined(meta.cwd) || "",
};
}
} catch {
continue;
}
}
}
return {
status: "missing",
};
}
async function inspectCodexThreadBinding(config, targetThreadRef, targetFolderRef) {
const stateDbPath = trimToDefined(config?.codexStateDbPath || defaultCodexPath("state_5.sqlite"));
if (!stateDbPath) {
return {
status: "skipped",
};
}
try {
const db = new DatabaseSync(stateDbPath, { readonly: true });
try {
const row = db
.prepare("SELECT id, cwd, archived, sandbox_policy FROM threads WHERE id = ? LIMIT 1")
.get(targetThreadRef);
if (!row) {
return await findSessionThreadBinding(config, targetThreadRef);
}
if (row.archived) {
return {
status: "missing",
};
}
const sandboxPolicyType = parseSandboxPolicyType(row.sandbox_policy);
if (isReadOnlySandboxPolicy(row.sandbox_policy)) {
return {
status: "read_only",
sandboxPolicyType,
};
}
const workspaceHints = loadThreadWorkspaceHints(
trimToDefined(config?.codexGlobalStatePath || defaultCodexPath(".codex-global-state.json")),
);
const threadCwd = trimToDefined(workspaceHints.get(targetThreadRef)) || trimToDefined(row.cwd) || "";
if (threadCwd && resolve(threadCwd) !== resolve(targetFolderRef)) {
return {
status: "mismatch",
threadCwd,
};
}
return {
status: "ok",
threadCwd,
};
} finally {
db.close();
}
} catch {
return await findSessionThreadBinding(config, targetThreadRef);
}
}
export async function prepareCodexTaskExecution(config, task, outputFile) {
if (!shouldPreflightResumeTask(task)) {
return {
ok: true,
execution: buildCodexTaskExecution(config, task, outputFile),
};
}
const targetThreadRef = trimToDefined(task?.targetCodexThreadRef || task?.targetThreadId);
if (!targetThreadRef) {
return {
ok: false,
error: buildStructuredTaskBindingError(
"LOCAL_AGENT_CODEX_THREAD_BINDING_MISSING",
"LOCAL_AGENT_CODEX_THREAD_BINDING_MISSING: 目标线程绑定缺失,已拒绝 codex exec resume。",
{
taskType: String(task?.taskType || "").trim() || undefined,
targetProjectId: trimToDefined(task?.targetProjectId),
targetCodexFolderRef: trimToDefined(task?.targetCodexFolderRef),
targetThreadDisplayName: trimToDefined(task?.targetThreadDisplayName),
},
),
};
}
const resumeTarget = resolveResumeTarget(config, task);
const bindingInspection = await inspectCodexThreadBinding(config, targetThreadRef, resumeTarget.cwd);
if (bindingInspection.status === "missing") {
return {
ok: false,
error: buildStructuredTaskBindingError(
"LOCAL_AGENT_CODEX_THREAD_BINDING_STALE",
"LOCAL_AGENT_CODEX_THREAD_BINDING_STALE: 目标线程绑定在 Codex 状态库中不存在或已归档,已拒绝 codex exec resume。",
{
targetThreadRef,
targetCodexFolderRef: resumeTarget.targetFolderRef,
},
),
};
}
if (bindingInspection.status === "read_only") {
return {
ok: false,
error: buildStructuredTaskBindingError(
"LOCAL_AGENT_CODEX_THREAD_READ_ONLY",
`LOCAL_AGENT_CODEX_THREAD_READ_ONLY: 目标线程当前是只读会话,已拒绝 codex exec resume。thread=${targetThreadRef} sandbox=${bindingInspection.sandboxPolicyType ?? "read-only"}`,
{
targetThreadRef,
targetCodexFolderRef: resumeTarget.targetFolderRef,
sandboxPolicyType: bindingInspection.sandboxPolicyType,
},
),
};
}
try {
const folderStat = await stat(resumeTarget.cwd);
if (!folderStat.isDirectory()) {
throw new Error("NOT_A_DIRECTORY");
}
await access(resumeTarget.cwd, constants.R_OK | constants.X_OK);
} catch {
return {
ok: false,
error: buildStructuredTaskBindingError(
"LOCAL_AGENT_CODEX_WORKDIR_INVALID",
`LOCAL_AGENT_CODEX_WORKDIR_INVALID: 线程工作目录不可访问,已拒绝 codex exec resume。cwd=${resumeTarget.cwd}`,
{
cwd: resumeTarget.cwd,
targetThreadRef,
targetCodexFolderRef: resumeTarget.targetFolderRef,
},
),
};
}
if (bindingInspection.status === "mismatch") {
return {
ok: false,
error: buildStructuredTaskBindingError(
"LOCAL_AGENT_CODEX_THREAD_BINDING_MISMATCH",
`LOCAL_AGENT_CODEX_THREAD_BINDING_MISMATCH: 目标线程绑定的 cwd 与当前目录不一致,已拒绝 codex exec resume。cwd=${resumeTarget.cwd} liveCwd=${bindingInspection.threadCwd}`,
{
cwd: resumeTarget.cwd,
liveCwd: bindingInspection.threadCwd,
targetThreadRef,
targetCodexFolderRef: resumeTarget.targetFolderRef,
},
),
};
}
return {
ok: true,
execution: buildCodexTaskExecution(config, task, outputFile),
};
}
export function buildCodexTaskExecution(config, task, outputFile) {
const { targetThreadRef, cwd } = resolveResumeTarget(config, task);
const prompt = String(task?.executionPrompt || "");
if (
targetThreadRef &&
(task?.taskType === "conversation_reply" || task?.taskType === "dispatch_execution")
) {
const args = [
"exec",
"resume",
"--skip-git-repo-check",
"-o",
outputFile,
];
if (config.masterAgentModel) {
args.push("-m", config.masterAgentModel);
}
args.push(targetThreadRef, prompt);
return {
mode: "resume",
cwd,
args,
desktopMirror: buildDesktopMirrorPlan(task, targetThreadRef),
};
}
const args = [
"exec",
"--ephemeral",
"--skip-git-repo-check",
"-C",
config.masterAgentWorkdir || process.cwd(),
"-s",
config.masterAgentSandbox || "workspace-write",
"-o",
outputFile,
];
if (config.masterAgentModel) {
args.push("-m", config.masterAgentModel);
}
args.push(prompt);
return {
mode: "ephemeral",
cwd: config.masterAgentWorkdir || process.cwd(),
args,
desktopMirror: { enabled: false },
};
}