Files
boss/local-agent/server.mjs

1546 lines
52 KiB
JavaScript
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env node
import { spawn } from "node:child_process";
import { constants as fsConstants } from "node:fs";
import { access, readFile, readdir, rm } from "node:fs/promises";
import { createServer } from "node:http";
import os from "node:os";
import { delimiter, isAbsolute, join, resolve } from "node:path";
import { discoverCodexProjectCandidatesInWorker } from "./codex-session-discovery.mjs";
import { prepareCodexTaskExecution } from "./codex-task-runner.mjs";
import {
  discoverCodexAppServerCapabilities,
  executeCodexAppServerTask,
  getCodexAppServerRunnerConfig,
  shouldUseCodexAppServerTaskRunner,
} from "./codex-app-server-runner.mjs";
import { appendBossUserMessageToCodexThreadRollout } from "./codex-thread-rollout-writer.mjs";
import {
  executeOmxTeamTask,
  getOmxTeamTaskRunnerConfig,
  shouldUseOmxTeamTaskRunner,
} from "./omx-team-task-runner.mjs";
import {
  canHandleBrowserControlTask,
  executeBrowserControlTask,
  getBrowserControlTaskRunnerConfig,
} from "./browser-control-task-runner.mjs";
import {
  canHandleComputerUseTask,
  executeComputerUseTask,
  getComputerUseTaskRunnerConfig,
} from "./computer-use-task-runner.mjs";
import {
  executeCodexDesktopRefreshBridge,
} from "./codex-desktop-refresh-bridge.mjs";
import {
  executeSkillLifecycleRequest,
  getSkillLifecycleRunnerConfig,
} from "./skill-lifecycle-runner.mjs";
import {
  applyBossAgentOtaUpdate,
  checkBossAgentOtaUpdate,
  getBossAgentOtaRunnerConfig,
} from "./boss-agent-ota-runner.mjs";
import {
  sanitizeSensitiveTaskFailureDetailForLog,
  sanitizeSensitiveTaskFailureDetailForTransport,
} from "./master-task-output-sanitizer.mjs";
import {
  resolveMasterAgentTaskTimeoutMs,
  runWithTaskTimeout,
} from "./master-task-timeout.mjs";
import {
  buildBossAgentStatus,
  detectLocalComputerPermissions,
  mergeBossAgentNativePermissionOverrides,
  normalizeBossAgentTab,
  openBossAgentPermissionSettings,
  renderBossAgentHtmlWithQr,
} from "./boss-agent-status.mjs";
import {
  buildComputerUseCompletionPayload,
  buildMasterAgentTaskCompletionRequestBody,
  buildRemoteExecutionCompletionPayload,
} from "./master-task-completion.mjs";
import { createSerializedRunner } from "./serialized-runner.mjs";
/**
 * Reads the agent configuration file and parses it as JSON.
 *
 * @param {string} configPath - Path to the config file; resolved against the current working directory.
 * @returns {Promise<any>} The parsed JSON value.
 * @throws If the file cannot be read or its content is not valid JSON.
 */
async function loadConfig(configPath) {
  const absolutePath = resolve(configPath);
  const fileText = await readFile(absolutePath, "utf8");
  return JSON.parse(fileText);
}
/**
 * Builds the project list and thread candidates reported in the device heartbeat.
 *
 * Static entries from `config` are merged with the result of the Codex session
 * discovery worker. When discovery is disabled only the static entries are
 * returned; when discovery throws, the failure is logged through `postAppLog`
 * and the static entries are returned with `guiConnected: false`.
 *
 * Side effects: updates `runtime.lastProjectDiscoveryAt`, `lastProjectDiscoveryOk`,
 * `lastProjectDiscoverySummary` and (on success) `lastCodexGuiConnected`.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state object.
 * @returns {Promise<{projects: any[], projectCandidates: any[], guiConnected?: boolean}>}
 */
async function resolveHeartbeatProjects(config, runtime) {
  const staticProjects = Array.isArray(config.projects) ? config.projects : [];
  const staticCandidates = Array.isArray(config.projectCandidates) ? config.projectCandidates : [];
  if (config.codexSessionDiscoveryEnabled === false) {
    return {
      projects: staticProjects,
      projectCandidates: staticCandidates,
    };
  }
  try {
    const discovered = await discoverCodexProjectCandidatesInWorker({
      stateDbPath: config.codexStateDbPath,
      logsDbPath: config.codexLogsDbPath,
      sessionIndexPath: config.codexSessionIndexPath,
      globalStatePath: config.codexGlobalStatePath,
      sessionsDir: config.codexSessionsDir,
      lookbackHours: config.codexSessionLookbackHours,
    });
    // Later entries win for the same thread reference, so discovered candidates
    // override static ones. Candidates without any identifier get a unique key:
    // otherwise they would all share the key `undefined` and overwrite each other.
    const candidateMap = new Map();
    for (const candidate of [...staticCandidates, ...discovered.projectCandidates]) {
      const key = candidate.codexThreadRef ?? candidate.threadId ?? Symbol("unkeyed-candidate");
      candidateMap.set(key, candidate);
    }
    const mergedCandidates = [...candidateMap.values()];
    // Candidates are not guaranteed to carry a folder name; never report a
    // missing one as a project.
    const candidateFolders = mergedCandidates
      .map((candidate) => candidate.folderName)
      .filter((folderName) => folderName != null && folderName !== "");
    const mergedProjects = [
      ...new Set([...staticProjects, ...discovered.projects, ...candidateFolders]),
    ];
    runtime.lastProjectDiscoveryAt = new Date().toISOString();
    runtime.lastProjectDiscoveryOk = true;
    runtime.lastProjectDiscoverySummary = `${mergedCandidates.length} threads / ${mergedProjects.length} folders`;
    runtime.lastCodexGuiConnected = discovered.guiConnected === true;
    return {
      projects: mergedProjects,
      projectCandidates: mergedCandidates,
      guiConnected: discovered.guiConnected === true,
    };
  } catch (error) {
    runtime.lastProjectDiscoveryAt = new Date().toISOString();
    runtime.lastProjectDiscoveryOk = false;
    runtime.lastProjectDiscoverySummary = error instanceof Error ? error.message : String(error);
    await postAppLog(config, runtime, {
      level: "error",
      category: "local_agent.codex_discovery_failed",
      message: "Codex 线程扫描失败,已退回静态项目配置。",
      detail: runtime.lastProjectDiscoverySummary,
      mirrorToMaster: true,
    });
    return {
      projects: staticProjects,
      projectCandidates: staticCandidates,
      guiConnected: false,
    };
  }
}
/**
 * Sends the device heartbeat to the control plane.
 *
 * The request carries identity fields from `config`, the connection state of
 * each capability (gui, cli, browserAutomation, computerUse, codexAppServer)
 * and the project data in `heartbeatProjects`. The token issued at runtime is
 * preferred over the configured one; the pairing code is only sent while no
 * token has been issued.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state (reads `issuedToken`).
 * @param {{projects: any[], projectCandidates: any[], guiConnected?: boolean}} heartbeatProjects
 * @returns {Promise<{ok: boolean, status: number, body: string, json: any}>}
 *   `json` is the parsed response body, or `null` when it is not valid JSON.
 */
async function postHeartbeat(config, runtime, heartbeatProjects) {
  const seenAt = new Date().toISOString();
  const requestedMode = config.preferredExecutionMode;
  const preferredExecutionMode =
    requestedMode === "gui" || requestedMode === "cli" ? requestedMode : undefined;

  const browserControlRuntime = getBrowserControlTaskRunnerConfig(process.env, config);
  const computerUseRuntime = getComputerUseTaskRunnerConfig(process.env, config);
  const codexAppServerRuntime = getCodexAppServerRunnerConfig(process.env, config);
  const computerUseConnected = await resolveComputerUseCapabilityConnected(config, computerUseRuntime);
  const codexAppServerConnected = await resolveCodexAppServerCapabilityConnected(codexAppServerRuntime);
  const codexAppServerMetadata = await resolveCodexAppServerCapabilityMetadata(
    config,
    runtime,
    codexAppServerRuntime,
    codexAppServerConnected,
  );

  // An explicit boolean in the config wins; otherwise fall back to discovery.
  let guiConnected;
  if (config.guiConnected === true) {
    guiConnected = true;
  } else if (config.guiConnected === false) {
    guiConnected = false;
  } else {
    guiConnected = heartbeatProjects.guiConnected === true;
  }

  const browserAutomationConnected =
    config.browserAutomationConnected !== false ||
    Boolean(browserControlRuntime.enabled && browserControlRuntime.command);

  // Every capability entry shares the same timestamp and an empty active project.
  const capability = (connected, extra = {}) => ({
    connected,
    lastSeenAt: seenAt,
    lastActiveProjectId: "",
    ...extra,
  });

  const payload = {
    deviceId: config.deviceId,
    token: runtime.issuedToken ?? config.token,
    pairingCode: runtime.issuedToken ? undefined : config.pairingCode,
    name: config.name,
    avatar: config.avatar,
    account: config.account,
    status: config.status,
    quota5h: config.quota5h,
    quota7d: config.quota7d,
    capabilities: {
      gui: capability(guiConnected),
      cli: capability(config.cliConnected !== false),
      browserAutomation: capability(browserAutomationConnected),
      computerUse: capability(computerUseConnected),
      codexAppServer: capability(codexAppServerConnected, { metadata: codexAppServerMetadata }),
    },
    preferredExecutionMode,
    projects: heartbeatProjects.projects,
    projectCandidates: heartbeatProjects.projectCandidates,
    endpoint: config.endpoint,
  };

  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const response = await fetch(`${baseUrl}/api/device-heartbeat`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  });

  const text = await response.text();
  let json;
  try {
    json = JSON.parse(text);
  } catch {
    // Non-JSON responses are still returned verbatim through `body`.
    json = null;
  }
  return {
    ok: response.ok,
    status: response.status,
    body: text,
    json,
  };
}
/**
 * Tells whether a computer-use runtime is launched through the CUA driver
 * wrapper script, i.e. one of its args mentions
 * `cua-driver-computer-use-runtime.mjs`.
 *
 * @param {{args?: unknown}|null|undefined} runtime - Computer-use runner config.
 * @returns {boolean} `false` when `args` is missing or not an array.
 */
function isCuaDriverRuntime(runtime) {
  const launchArgs = runtime?.args;
  if (!Array.isArray(launchArgs)) {
    return false;
  }
  return launchArgs.some((arg) => String(arg).includes("cua-driver-computer-use-runtime.mjs"));
}
/**
 * Checks whether a command can be found and is executable by this process.
 *
 * A command containing a path separator (or an absolute path) is checked at
 * that single location, resolved against `cwd` when relative. A bare command
 * name is searched in every `PATH` entry, then in `~/.local/bin`,
 * `/usr/local/bin` and `/opt/homebrew/bin`; `cua-driver` additionally falls
 * back to its macOS application bundle location.
 *
 * Each candidate is probed with `X_OK`, so a file that merely exists without
 * execute permission does not count as runnable.
 *
 * @param {string} command - Command name or path; blank values yield `false`.
 * @param {string} [cwd] - Base directory for relative paths (defaults to `process.cwd()`).
 * @returns {Promise<boolean>} `true` when at least one candidate is executable.
 */
async function canExecuteCommand(command, cwd) {
  const normalizedCommand = String(command || "").trim();
  if (!normalizedCommand) return false;
  const commandHasPath = normalizedCommand.includes("/") || isAbsolute(normalizedCommand);
  const candidatePaths = [];
  if (commandHasPath) {
    candidatePaths.push(
      isAbsolute(normalizedCommand)
        ? normalizedCommand
        : resolve(cwd || process.cwd(), normalizedCommand),
    );
  } else {
    const pathDirs = String(process.env.PATH || "")
      .split(delimiter)
      .filter(Boolean);
    // Common install locations that may be absent from the PATH this process inherited.
    const fallbackDirs = [join(os.homedir(), ".local", "bin"), "/usr/local/bin", "/opt/homebrew/bin"];
    for (const dir of [...pathDirs, ...fallbackDirs]) {
      candidatePaths.push(join(dir, normalizedCommand));
    }
    if (normalizedCommand === "cua-driver") {
      candidatePaths.push("/Applications/CuaDriver.app/Contents/MacOS/cua-driver");
    }
  }
  for (const candidate of candidatePaths) {
    try {
      await access(candidate, fsConstants.X_OK);
      return true;
    } catch {
      // Try the next PATH entry.
    }
  }
  return false;
}
/**
 * Decides whether the computer-use capability is reported as connected.
 *
 * A runtime that is disabled or has no command is never connected. For the
 * CUA driver runtime the driver binary (`cuaDriverCommand`, default
 * `cua-driver`) must be found by `canExecuteCommand`; for any other runtime
 * the configured flag or the presence of a command is sufficient.
 *
 * @param {object} config - Agent configuration (reads `computerUseConnected`).
 * @param {object|null|undefined} computerUseRuntime - Computer-use runner config.
 * @returns {Promise<boolean>}
 */
async function resolveComputerUseCapabilityConnected(config, computerUseRuntime) {
  const isRunnable = Boolean(computerUseRuntime?.enabled) && Boolean(computerUseRuntime?.command);
  if (!isRunnable) {
    return false;
  }
  if (isCuaDriverRuntime(computerUseRuntime)) {
    const driverCommand = computerUseRuntime.cuaDriverCommand || "cua-driver";
    const workingDir = computerUseRuntime.cwd || process.cwd();
    return canExecuteCommand(driverCommand, workingDir);
  }
  return Boolean(config.computerUseConnected) || Boolean(computerUseRuntime.command);
}
/**
 * Decides whether the Codex App Server capability is reported as connected.
 *
 * Disabled runtimes are never connected. For the `ws` and `unix` transports a
 * configured `url` is enough; for every other transport the configured
 * `command` must be found by `canExecuteCommand`.
 *
 * @param {object|null|undefined} codexAppServerRuntime - App-server runner config.
 * @returns {Promise<boolean>}
 */
async function resolveCodexAppServerCapabilityConnected(codexAppServerRuntime) {
  if (!codexAppServerRuntime?.enabled) {
    return false;
  }
  const { transport, url, command, cwd } = codexAppServerRuntime;
  const usesRemoteTransport = transport === "ws" || transport === "unix";
  if (usesRemoteTransport) {
    return Boolean(url);
  }
  if (!command) {
    return false;
  }
  return canExecuteCommand(command, cwd || process.cwd());
}
/**
 * Returns the Codex App Server capability metadata to attach to the heartbeat,
 * using a time-limited cache stored on `runtime`.
 *
 * Nothing is returned when the capability is not connected, the runtime is
 * disabled, or discovery is explicitly turned off. A cached value younger than
 * `discoveryTtlMs` (default 300000 ms) is reused; otherwise discovery runs
 * again. When discovery fails the error is recorded on `runtime`, a warning is
 * sent through `postAppLog`, and the previously cached value (possibly
 * `undefined`) is returned.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state holding the cache fields.
 * @param {object|null|undefined} codexAppServerRuntime - App-server runner config.
 * @param {boolean} connected - Whether the capability is currently connected.
 * @returns {Promise<any>} The metadata, or `undefined`.
 */
async function resolveCodexAppServerCapabilityMetadata(config, runtime, codexAppServerRuntime, connected) {
  const discoveryAllowed =
    connected && codexAppServerRuntime?.enabled && codexAppServerRuntime.discoveryEnabled !== false;
  if (!discoveryAllowed) {
    return undefined;
  }

  const checkedAtMs = Date.now();
  const ttlMs = codexAppServerRuntime.discoveryTtlMs ?? 300_000;
  const cachedMetadata = runtime.codexAppServerCapabilityMetadata;
  const cachedAtMs = runtime.codexAppServerCapabilityMetadataAtMs;
  if (cachedMetadata && cachedAtMs && checkedAtMs - cachedAtMs < ttlMs) {
    return cachedMetadata;
  }

  let freshMetadata;
  try {
    freshMetadata = await discoverCodexAppServerCapabilities(codexAppServerRuntime);
  } catch (error) {
    const failureMessage = error instanceof Error ? error.message : String(error);
    runtime.codexAppServerCapabilityMetadataError = failureMessage;
    await postAppLog(config, runtime, {
      level: "warn",
      category: "local_agent.codex_app_server_capability_discovery_failed",
      message: "Codex App Server 能力清单发现失败,设备心跳继续上报连接状态。",
      detail: failureMessage,
      mirrorToMaster: false,
    });
    // Fall back to whatever is cached at this point, even if it is stale.
    return runtime.codexAppServerCapabilityMetadata;
  }
  runtime.codexAppServerCapabilityMetadata = freshMetadata;
  runtime.codexAppServerCapabilityMetadataAtMs = checkedAtMs;
  runtime.codexAppServerCapabilityMetadataError = "";
  return freshMetadata;
}
/**
 * Builds the device-token request header.
 *
 * @param {object} config - Agent configuration (reads `token`).
 * @param {object} runtime - Runtime state (reads `issuedToken`, which takes precedence).
 * @returns {Record<string, string>} `{ "x-boss-device-token": token }`, or `{}` when no token is available.
 */
function deviceTokenHeaders(config, runtime) {
  const token = runtime.issuedToken ?? config.token;
  if (!token) {
    return {};
  }
  return { "x-boss-device-token": token };
}
/**
 * Posts a thread-context snapshot for a worker to the control plane.
 *
 * The worker id is taken from the snapshot, then the config, and finally
 * derived as `<deviceId>-worker`. Optional counters and flags missing from the
 * snapshot are sent with zero / `false` / empty defaults.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state, used for the device-token header.
 * @param {object} snapshot - Thread context snapshot to report.
 * @returns {Promise<{ok: boolean, status: number, body: string, workerId: string, threadId: any}>}
 */
async function postThreadContext(config, runtime, snapshot) {
  const workerId = snapshot.workerId ?? config.workerId ?? `${config.deviceId}-worker`;
  const endpoint = `${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/workers/${workerId}/thread-context`;
  const headers = {
    "Content-Type": "application/json",
    ...deviceTokenHeaders(config, runtime),
  };
  const payload = {
    nodeId: config.deviceId,
    projectId: snapshot.projectId,
    taskId: snapshot.taskId,
    threadId: snapshot.threadId,
    title: snapshot.title,
    summary: snapshot.summary,
    sourceKind: snapshot.sourceKind ?? "worker_estimator",
    status: snapshot.status ?? "running",
    contextBudgetRemainingPct: snapshot.contextBudgetRemainingPct,
    contextBudgetLevel: snapshot.contextBudgetLevel,
    compactionExpectedAt: snapshot.compactionExpectedAt,
    mustFinishBeforeCompaction: snapshot.mustFinishBeforeCompaction,
    estimatedRemainingTurns: snapshot.estimatedRemainingTurns ?? 0,
    estimatedRemainingLargeMessages: snapshot.estimatedRemainingLargeMessages ?? 0,
    lastCompactionAt: snapshot.lastCompactionAt,
    compactionCount: snapshot.compactionCount ?? 0,
    patchPending: snapshot.patchPending ?? false,
    testsPending: snapshot.testsPending ?? false,
    evidencePending: snapshot.evidencePending ?? false,
    checklist: snapshot.checklist ?? [],
    capturedAt: new Date().toISOString(),
  };
  const response = await fetch(endpoint, {
    method: "POST",
    headers,
    body: JSON.stringify(payload),
  });
  const { ok, status } = response;
  const body = await response.text();
  return { ok, status, body, workerId, threadId: snapshot.threadId };
}
/**
 * Extracts a human-readable description from the text of a SKILL.md file.
 *
 * The first `description:` entry wins (surrounding quotes are stripped).
 * Without one, the first non-empty line that is neither a `---` fence nor a
 * `#` heading is used, and a fixed placeholder is returned when nothing fits.
 *
 * @param {string} content - Raw file content.
 * @returns {string} The description text.
 */
function parseSkillDescription(content) {
  const declared = /description:\s*(.+)/.exec(content)?.[1];
  if (declared) {
    return declared.trim().replace(/^["']|["']$/g, "");
  }
  for (const rawLine of content.split("\n")) {
    const line = rawLine.trim();
    if (line && !line.startsWith("---") && !line.startsWith("#")) {
      return line;
    }
  }
  return "未提供说明";
}
/**
 * Recursively scans the skills directory for `SKILL.md` files and describes
 * each one.
 *
 * The directory defaults to `~/.codex/skills`. Unreadable directories and
 * files are skipped. A skill is named after the directory containing its
 * `SKILL.md`; its category is the path between the skills root and that
 * directory (segments joined with " / "), falling back to `config.name` when
 * the skill sits directly under the root or in the root itself.
 *
 * NOTE: paths are split on "/", so this assumes POSIX-style separators.
 *
 * @param {object} config - Agent configuration (reads `skillsDir` and `name`).
 * @returns {Promise<Array<{name: string, description: string, path: string, invocation: string, category: any}>>}
 *   Skills sorted by name.
 */
async function discoverSkills(config) {
  const skillsDir = resolve(config.skillsDir ?? join(os.homedir(), ".codex/skills"));
  const rootPrefix = skillsDir.endsWith("/") ? skillsDir : `${skillsDir}/`;
  const pending = [skillsDir];
  const skills = [];
  while (pending.length > 0) {
    const currentDir = pending.pop();
    if (!currentDir) continue;
    let entries = [];
    try {
      entries = await readdir(currentDir, { withFileTypes: true });
    } catch {
      // Missing or unreadable directory: nothing to report from here.
      continue;
    }
    for (const entry of entries) {
      if (entry.isDirectory()) {
        pending.push(join(currentDir, entry.name));
        continue;
      }
      if (!entry.isFile() || entry.name !== "SKILL.md") continue;
      const skillPath = join(currentDir, entry.name);
      try {
        // readFile already fails for missing/unreadable files, so no separate
        // access() probe is needed.
        const content = await readFile(skillPath, "utf8");
        const skillName = currentDir.split("/").pop() ?? "unknown-skill";
        // Path of the skill directory relative to the root. A SKILL.md placed in
        // the root itself has no relative path; without this guard its category
        // would be assembled from the segments of the absolute path.
        const relativeDir = currentDir.startsWith(rootPrefix)
          ? currentDir.slice(rootPrefix.length)
          : "";
        const relativeCategory = relativeDir.split("/").slice(0, -1).join(" / ");
        skills.push({
          name: skillName,
          description: parseSkillDescription(content),
          path: skillPath,
          invocation: `[$${skillName}](${skillPath})`,
          category: relativeCategory || config.name,
        });
      } catch {
        continue;
      }
    }
  }
  return skills.sort((a, b) => a.name.localeCompare(b.name));
}
/**
 * Uploads the discovered skill list for this device to the control plane.
 *
 * @param {object} config - Agent configuration (reads `controlPlaneUrl`, `deviceId`).
 * @param {object} runtime - Runtime state, used for the device-token header.
 * @param {Array<object>} skills - Skills to report.
 * @returns {Promise<{ok: boolean, status: number, body: string, count: number}>}
 */
async function postSkills(config, runtime, skills) {
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const endpoint = `${baseUrl}/api/v1/devices/${config.deviceId}/skills`;
  const requestInit = {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify({ skills }),
  };
  const response = await fetch(endpoint, requestInit);
  const { ok, status } = response;
  const body = await response.text();
  return { ok, status, body, count: skills.length };
}
/**
 * Sends a log entry to the control plane on a best-effort basis.
 *
 * Any failure while building or sending the request is swallowed, so logging
 * can never interrupt the caller.
 *
 * @param {object} config - Agent configuration (reads `controlPlaneUrl`, `deviceId`).
 * @param {object} runtime - Runtime state, used for the device-token header.
 * @param {object} payload - Log fields; they are merged last, so they override
 *   the default `deviceId` and `source` values.
 * @returns {Promise<void>}
 */
async function postAppLog(config, runtime, payload) {
  try {
    const endpoint = `${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/app-logs`;
    const headers = {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    };
    const entry = { deviceId: config.deviceId, source: "local_agent", ...payload };
    await fetch(endpoint, { method: "POST", headers, body: JSON.stringify(entry) });
  } catch {
    // Ignore log transport failures to avoid blocking the agent loop.
  }
}
/**
 * Asks the control plane for the next master-agent task for this device.
 *
 * The wait time sent with the claim comes from `masterAgentClaimWaitMs`, then
 * `masterAgentLongPollMs`, defaulting to 25000 ms. Finite values are floored
 * and clamped to the 0..30000 ms range; non-numeric values use the default.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state, used for the device-token header.
 * @returns {Promise<{ok: boolean, status: number, body: string}>} Raw response text in `body`.
 */
async function claimMasterAgentTask(config, runtime) {
  const DEFAULT_WAIT_MS = 25_000;
  const MAX_WAIT_MS = 30_000;
  const requestedWaitMs = Number(
    config.masterAgentClaimWaitMs ?? config.masterAgentLongPollMs ?? DEFAULT_WAIT_MS,
  );
  let waitMs = DEFAULT_WAIT_MS;
  if (Number.isFinite(requestedWaitMs)) {
    const cappedWaitMs = Math.min(MAX_WAIT_MS, Math.floor(requestedWaitMs));
    waitMs = Math.max(0, cappedWaitMs);
  }
  const endpoint = `${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/master-agent/tasks/claim`;
  const response = await fetch(endpoint, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify({ deviceId: config.deviceId, waitMs }),
  });
  const { ok, status } = response;
  const body = await response.text();
  return { ok, status, body };
}
/**
 * Reports the final outcome of a master-agent task to the control plane.
 *
 * The request body is produced by `buildMasterAgentTaskCompletionRequestBody`.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state, used for the device-token header.
 * @param {object} payload - Completion payload; `payload.taskId` selects the task.
 * @returns {Promise<{ok: boolean, status: number, body: string}>} Raw response text in `body`.
 */
async function completeMasterAgentTask(config, runtime, payload) {
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const endpoint = `${baseUrl}/api/v1/master-agent/tasks/${payload.taskId}/complete`;
  const headers = {
    "Content-Type": "application/json",
    ...deviceTokenHeaders(config, runtime),
  };
  const requestBody = JSON.stringify(buildMasterAgentTaskCompletionRequestBody(config, payload));
  const response = await fetch(endpoint, { method: "POST", headers, body: requestBody });
  const { ok, status } = response;
  const body = await response.text();
  return { ok, status, body };
}
/**
 * Pushes an intermediate progress update for a master-agent task.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state, used for the device-token header.
 * @param {{taskId: string, status?: string, requestId?: string, executionProgress?: any}} payload
 *   Progress data; an empty `status` is reported as "running".
 * @returns {Promise<{ok: boolean, status: number, body: string}>} Raw response text in `body`.
 */
async function postMasterAgentTaskProgress(config, runtime, payload) {
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const endpoint = `${baseUrl}/api/v1/master-agent/tasks/${payload.taskId}/progress`;
  const progressUpdate = {
    deviceId: config.deviceId,
    status: payload.status || "running",
    requestId: payload.requestId,
    executionProgress: payload.executionProgress,
  };
  const response = await fetch(endpoint, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify(progressUpdate),
  });
  const { ok, status } = response;
  const body = await response.text();
  return { ok, status, body };
}
/**
 * Fetches the control state of a master-agent task from the control plane.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state, used for the device-token header.
 * @param {{taskId: string}} task - Task whose control state is requested.
 * @returns {Promise<{ok: boolean, status: number, body: any}>} On success
 *   `body` is the parsed JSON response; on a non-OK response it is the raw text.
 */
async function fetchMasterAgentTaskControlState(config, runtime, task) {
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const endpoint = `${baseUrl}/api/v1/master-agent/tasks/${task.taskId}/control-state`;
  const response = await fetch(endpoint, {
    method: "GET",
    headers: { ...deviceTokenHeaders(config, runtime) },
  });
  const { status } = response;
  if (response.ok) {
    return { ok: true, status, body: await response.json() };
  }
  return { ok: false, status, body: await response.text() };
}
/**
 * Reads the interrupt poll interval from the config.
 *
 * @param {object} config - Agent configuration (reads `masterAgentInterruptPollIntervalMs`).
 * @returns {number} The configured value floored to an integer, or 750 when it
 *   is missing, not a finite number, or negative.
 */
function normalizeInterruptPollIntervalMs(config) {
  const DEFAULT_INTERVAL_MS = 750;
  const requested = Number(config.masterAgentInterruptPollIntervalMs ?? DEFAULT_INTERVAL_MS);
  if (!Number.isFinite(requested) || requested < 0) {
    return DEFAULT_INTERVAL_MS;
  }
  return Math.floor(requested);
}
/**
 * Asks the control plane for the next pending skill lifecycle request for
 * this device.
 *
 * @param {object} config - Agent configuration (reads `controlPlaneUrl`, `deviceId`).
 * @param {object} runtime - Runtime state, used for the device-token header.
 * @returns {Promise<{ok: boolean, status: number, body: string}>} Raw response text in `body`.
 */
async function claimSkillLifecycleRequest(config, runtime) {
  const { deviceId } = config;
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const endpoint = `${baseUrl}/api/v1/devices/${deviceId}/skill-requests/claim`;
  const response = await fetch(endpoint, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify({ deviceId }),
  });
  const { ok, status } = response;
  const body = await response.text();
  return { ok, status, body };
}
/**
 * Reports the outcome of a skill lifecycle request to the control plane.
 *
 * @param {object} config - Agent configuration (reads `controlPlaneUrl`, `deviceId`).
 * @param {object} runtime - Runtime state, used for the device-token header.
 * @param {{requestId: string}} request - The request being completed.
 * @param {{status?: string, resultSummary?: any, error?: any}} result - Runner
 *   result; every status other than "failed" is reported as "completed".
 * @returns {Promise<{ok: boolean, status: number, body: string}>} Raw response text in `body`.
 */
async function completeSkillLifecycleRequest(config, runtime, request, result) {
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const endpoint = `${baseUrl}/api/v1/devices/${config.deviceId}/skill-requests/${request.requestId}/complete`;
  const outcome = {
    status: result.status === "failed" ? "failed" : "completed",
    resultSummary: result.resultSummary,
    error: result.error,
  };
  const response = await fetch(endpoint, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify(outcome),
  });
  const { ok, status } = response;
  const body = await response.text();
  return { ok, status, body };
}
/**
 * Interprets the raw output of a dispatch execution.
 *
 * When the output (optionally wrapped in a Markdown code fence) is a JSON
 * object carrying a non-empty string `rawThreadReply`, that value and the
 * optional string `replyBody` are returned trimmed. In every other case the
 * whole trimmed output is treated as the raw thread reply.
 *
 * @param {unknown} rawOutput - Output text produced by the execution.
 * @returns {{rawThreadReply: string, replyBody: string|undefined}}
 */
function parseDispatchExecutionCompletion(rawOutput) {
  const trimmed = String(rawOutput || "").trim();
  const plainReply = { rawThreadReply: trimmed, replyBody: undefined };
  if (!trimmed) {
    return plainReply;
  }

  const fenced = /```(?:json)?\s*([\s\S]*?)```/i.exec(trimmed);
  const jsonCandidate = fenced?.[1]?.trim() ?? trimmed;

  let parsed;
  try {
    parsed = JSON.parse(jsonCandidate);
  } catch {
    // Fall back to treating the full output as the raw thread reply.
    return plainReply;
  }
  if (!parsed || typeof parsed !== "object") {
    return plainReply;
  }

  const rawThreadReply = typeof parsed.rawThreadReply === "string" ? parsed.rawThreadReply.trim() : "";
  if (!rawThreadReply) {
    return plainReply;
  }
  const replyBody = typeof parsed.replyBody === "string" ? parsed.replyBody.trim() : undefined;
  return { rawThreadReply, replyBody: replyBody || undefined };
}
/**
 * Runs a short-lived command and captures its output. The returned promise
 * always fulfils; failures are reported through `ok: false`.
 *
 * The child is killed with SIGKILL when it is still running after
 * `options.timeoutMs` (default 1500 ms).
 *
 * @param {string} command - Executable to spawn.
 * @param {string[]} args - Arguments passed to the executable.
 * @param {{cwd?: string, timeoutMs?: number}} [options] - Working directory
 *   (defaults to `process.cwd()`) and kill timeout.
 * @returns {Promise<{ok: boolean, stdout: string, stderr: string}>} `ok` is
 *   true only for exit code 0. When the process cannot be spawned, `stderr`
 *   carries the spawn error message.
 */
function runShortCommand(command, args, options = {}) {
  return new Promise((settle) => {
    const child = spawn(command, args, {
      cwd: options.cwd || process.cwd(),
      env: process.env,
    });
    const captured = { stdout: "", stderr: "" };

    const killTimer = setTimeout(() => {
      if (!child.killed) {
        child.kill("SIGKILL");
      }
    }, options.timeoutMs || 1500);

    const capture = (streamName) => (chunk) => {
      captured[streamName] += String(chunk);
    };
    child.stdout.on("data", capture("stdout"));
    child.stderr.on("data", capture("stderr"));

    child.on("error", (error) => {
      clearTimeout(killTimer);
      settle({ ok: false, stdout: captured.stdout, stderr: error.message });
    });
    child.on("close", (code) => {
      clearTimeout(killTimer);
      settle({
        ok: code === 0,
        stdout: captured.stdout.trim(),
        stderr: captured.stderr.trim(),
      });
    });
  });
}
/**
 * Extracts the counters from `git diff --shortstat` output.
 *
 * @param {unknown} shortstat - Text such as
 *   "2 files changed, 10 insertions(+), 3 deletions(-)".
 * @returns {{changedFiles: number|undefined, additions: number|undefined, deletions: number|undefined}}
 *   Each counter is `undefined` when the text does not mention it.
 */
function parseGitShortstat(shortstat) {
  const text = String(shortstat || "");
  const readCounter = (pattern) => {
    const count = Number(pattern.exec(text)?.[1]);
    return Number.isFinite(count) ? count : undefined;
  };
  return {
    changedFiles: readCounter(/(\d+)\s+files?\s+changed/),
    additions: readCounter(/(\d+)\s+insertions?\(\+\)/),
    deletions: readCounter(/(\d+)\s+deletions?\(-\)/),
  };
}
/**
 * Lists the file names mentioned in a reply text.
 *
 * Paths ending in one of the recognised extensions are matched, reduced to
 * their last segment and de-duplicated; scanning stops once 12 distinct names
 * were collected. Image extensions are labelled "image", everything else "file".
 *
 * @param {unknown} text - Reply text to scan.
 * @returns {Array<{label: string, kind: "image"|"file"}>} In order of first appearance.
 */
function collectArtifactsFromReply(text) {
  const labels = new Set();
  const pathPattern = /(?:[\w.-]+\/)*[\w.-]+\.(?:md|txt|ts|tsx|js|mjs|java|kt|json|png|jpe?g|webp|svg|apk|aab)\b/gi;
  for (const [mentionedPath] of String(text || "").matchAll(pathPattern)) {
    if (labels.size >= 12) {
      break;
    }
    const label = mentionedPath.split("/").filter(Boolean).pop();
    if (label) {
      labels.add(label);
    }
  }
  return [...labels].map((label) => {
    const isImage = /\.(png|jpe?g|webp|svg)$/i.test(label);
    return { label, kind: isImage ? "image" : "file" };
  });
}
/**
 * Gathers local progress information after a task ran: the git working-tree
 * state of `cwd`, whether the GitHub CLI is available, and the artifacts
 * mentioned in the reply.
 *
 * @param {string} cwd - Directory in which `git` and `gh` are probed.
 * @param {unknown} replyBody - Reply text scanned for artifact names.
 * @returns {Promise<{branch: object, artifacts: Array<object>}>} `branch`
 *   always carries `githubCliStatus`; diff counters and `gitStatus` are added
 *   only when at least one git command succeeded.
 */
async function collectLocalExecutionProgress(cwd, replyBody) {
  const probe = (command, args) => runShortCommand(command, args, { cwd });
  const [diffShortstat, statusShort, ghVersion] = await Promise.all([
    probe("git", ["diff", "--shortstat"]),
    probe("git", ["status", "--short"]),
    probe("gh", ["--version"]),
  ]);

  const githubCliStatus = ghVersion.ok ? "available" : "unavailable";
  let branch;
  if (diffShortstat.ok || statusShort.ok) {
    const diffCounters = diffShortstat.ok ? parseGitShortstat(diffShortstat.stdout) : {};
    const hasPendingChanges = statusShort.ok && statusShort.stdout;
    branch = {
      ...diffCounters,
      gitStatus: hasPendingChanges ? "有未提交变更" : "工作区干净",
      githubCliStatus,
    };
  } else {
    // Not a git checkout (or git is unavailable): report only the CLI status.
    branch = { githubCliStatus };
  }
  return {
    branch,
    artifacts: collectArtifactsFromReply(replyBody),
  };
}
/**
 * Merges two execution-progress objects, with `primary` taking precedence.
 *
 * Top-level and `branch` fields of `primary` override those of `secondary`.
 * Artifacts are combined and de-duplicated by trimmed `label`; agents are
 * combined and de-duplicated by trimmed `name` plus `role`. Entries without a
 * label / name are dropped. `branch`, `artifacts` and `agents` are `undefined`
 * when there is nothing to report for them.
 *
 * @param {object|null|undefined} primary - Preferred progress data.
 * @param {object|null|undefined} secondary - Fallback progress data.
 * @returns {object} The merged progress object.
 */
function mergeExecutionProgress(primary, secondary) {
  const first = primary && typeof primary === "object" ? primary : {};
  const second = secondary && typeof secondary === "object" ? secondary : {};

  // Keeps the first item for every key; a null key means "skip this item".
  const uniqueBy = (items, keyOf) => {
    const seenKeys = new Set();
    const kept = [];
    for (const item of items) {
      const key = keyOf(item);
      if (key === null || seenKeys.has(key)) {
        continue;
      }
      seenKeys.add(key);
      kept.push(item);
    }
    return kept;
  };

  const artifacts = uniqueBy(
    [...(first.artifacts || []), ...(second.artifacts || [])],
    (artifact) => String(artifact?.label || "").trim() || null,
  );
  const agents = uniqueBy([...(first.agents || []), ...(second.agents || [])], (agent) => {
    const name = String(agent?.name || "").trim();
    return name ? `${name}:${String(agent?.role || "").trim()}` : null;
  });

  const hasBranch = Boolean(first.branch || second.branch);
  return {
    ...second,
    ...first,
    branch: hasBranch ? { ...(second.branch || {}), ...(first.branch || {}) } : undefined,
    artifacts: artifacts.length > 0 ? artifacts : undefined,
    agents: agents.length > 0 ? agents : undefined,
  };
}
/**
 * Executes one claimed master-agent task and reports its outcome.
 *
 * The task is routed to the first matching runner, in this order: browser
 * control, computer use, OMX team, Codex App Server (optionally falling back
 * to the CLI), and finally a spawned `codex` CLI process whose reply is read
 * from a temporary output file.
 *
 * Outcomes:
 * - success: the completion is posted and `runtime.activeMasterTask` records
 *   "completed" (or "complete_failed" when the completion request was rejected);
 * - computer-use runner needs user action: a waiting completion is posted and
 *   the status becomes "needs_user_action";
 * - app-server turn interrupted: the status becomes "canceled" and no
 *   completion is posted;
 * - any thrown error: a sanitized "failed" completion is posted and logged.
 *
 * The temporary output file is always removed and `runtime.masterTaskBusy` is
 * always reset to `false`. Errors are handled internally and not rethrown.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state.
 * @param {object} task - Task claimed from the control plane (must carry `taskId`).
 * @returns {Promise<void>}
 */
async function runMasterAgentTask(config, runtime, task) {
  const outputFile = join(os.tmpdir(), `${task.taskId}.reply.txt`);
  const stderrChunks = [];
  const taskTimeoutMs = resolveMasterAgentTaskTimeoutMs(config, task);
  runtime.activeMasterTask = {
    taskId: task.taskId,
    status: "running",
    startedAt: new Date().toISOString(),
  };
  try {
    let activeChild = null;
    const executionResult = await (async () => {
      if (canHandleBrowserControlTask(task)) {
        const browserResult = await executeBrowserControlTask(task, config);
        if (browserResult.status === "failed") {
          throw new Error(browserResult.errorMessage || "BROWSER_CONTROL_FAILED");
        }
        return {
          replyBody: browserResult.replyBody,
          dispatchExecutionCompletion: {
            targetUrl: browserResult.targetUrl,
          },
        };
      }
      if (canHandleComputerUseTask(task)) {
        const computerUseResult = await executeComputerUseTask(task, config);
        if (computerUseResult.status === "failed") {
          throw new Error(computerUseResult.errorMessage || "COMPUTER_USE_FAILED");
        }
        if (computerUseResult.status === "needs_user_action") {
          return {
            waitingUserActionCompletion: buildComputerUseCompletionPayload(task, computerUseResult),
          };
        }
        return {
          replyBody: computerUseResult.replyBody,
          dispatchExecutionCompletion: {
            targetApp: computerUseResult.targetApp,
            computerUseProvider: computerUseResult.computerUseProvider,
          },
        };
      }
      if (shouldUseOmxTeamTaskRunner(task)) {
        const omxResult = await executeOmxTeamTask(getOmxTeamTaskRunnerConfig(process.env, config), task);
        if (omxResult.status === "failed") {
          throw new Error(omxResult.errorMessage || "OMX_EXECUTION_FAILED");
        }
        return {
          replyBody: omxResult.replyBody ?? omxResult.rawThreadReply,
          dispatchExecutionCompletion: {
            rawThreadReply: omxResult.rawThreadReply,
            replyBody: omxResult.replyBody,
          },
        };
      }
      const codexAppServerRunner = getCodexAppServerRunnerConfig(process.env, config);
      if (shouldUseCodexAppServerTaskRunner(codexAppServerRunner, task)) {
        const appServerResult = await executeCodexAppServerTask(
          {
            ...codexAppServerRunner,
            interruptPollIntervalMs: normalizeInterruptPollIntervalMs(config),
            // Polled by the runner: interrupt the turn once the control plane
            // marks the task as canceled. A failed poll never interrupts.
            shouldInterruptActiveTurn: async () => {
              const controlState = await fetchMasterAgentTaskControlState(config, runtime, task);
              if (!controlState.ok) {
                return false;
              }
              if (controlState.body?.canceled === true || controlState.body?.status === "canceled") {
                return {
                  interrupt: true,
                  reason: controlState.body?.cancelReason || "USER_CANCELED_TASK",
                };
              }
              return false;
            },
            onProgress: async (executionProgress) => {
              const progressResult = await postMasterAgentTaskProgress(config, runtime, {
                taskId: task.taskId,
                status: "running",
                executionProgress,
              });
              if (!progressResult.ok) {
                await postAppLog(config, runtime, {
                  projectId: task.projectId,
                  level: "warn",
                  category: "local_agent.codex_app_server_progress_failed",
                  message: "Codex App Server 进度实时回写失败,完成回写仍会携带最终进度。",
                  detail: progressResult.body,
                  mirrorToMaster: false,
                });
              }
            },
          },
          task,
        );
        if (appServerResult.status === "interrupted") {
          return {
            interruptedCompletion: {
              replyBody: appServerResult.replyBody,
              interruptReason: appServerResult.interruptReason,
              executionProgress: appServerResult.executionProgress,
            },
          };
        }
        if (appServerResult.status === "completed") {
          const localExecutionProgress = await collectLocalExecutionProgress(
            appServerResult.cwd || task.targetCodexFolderRef || config.masterAgentWorkdir || process.cwd(),
            appServerResult.replyBody,
          );
          const executionProgress = mergeExecutionProgress(
            appServerResult.executionProgress,
            localExecutionProgress,
          );
          try {
            if (task.targetCodexThreadRef || task.targetThreadId) {
              await executeCodexDesktopRefreshBridge(
                {
                  targetThreadRef: task.targetCodexThreadRef || task.targetThreadId,
                  sourceMessageId: task.sourceMessageId || task.requestMessageId,
                  threadTouchStatus: "app_server_turn_started",
                },
                config,
              );
            }
          } catch {
            // Desktop refresh is only a visibility hint; app-server already owns the thread turn.
          }
          return {
            replyBody: appServerResult.replyBody,
            executionProgress,
            dispatchExecutionCompletion:
              task.taskType === "dispatch_execution"
                ? parseDispatchExecutionCompletion(appServerResult.replyBody)
                : null,
          };
        }
        if (appServerResult.canFallbackToCli !== true || config.codexAppServerFallbackToCli === false) {
          throw new Error(appServerResult.errorMessage || "CODEX_APP_SERVER_EXECUTION_FAILED");
        }
        await postAppLog(config, runtime, {
          projectId: task.projectId,
          level: "warn",
          category: "local_agent.codex_app_server_fallback",
          message: "Codex App Server 本轮不可用,已回退到 CLI resume。",
          detail: appServerResult.errorMessage,
          mirrorToMaster: false,
        });
      }
      const codexPreparation = await prepareCodexTaskExecution(config, task, outputFile);
      if (!codexPreparation.ok) {
        throw new Error(codexPreparation.error.message);
      }
      const codexExecution = codexPreparation.execution;
      if (codexExecution.desktopMirror?.enabled) {
        const mirrorResult = await appendBossUserMessageToCodexThreadRollout({
          stateDbPath: config.codexStateDbPath,
          sessionsDir: config.codexSessionsDir,
          targetThreadRef: codexExecution.desktopMirror.targetThreadRef,
          sourceMessageId: codexExecution.desktopMirror.sourceMessageId,
          message: codexExecution.desktopMirror.sourceMessageBody,
          sentAt: codexExecution.desktopMirror.sourceMessageSentAt,
        });
        try {
          const refreshResult = await executeCodexDesktopRefreshBridge(
            {
              targetThreadRef: codexExecution.desktopMirror.targetThreadRef,
              sourceMessageId: codexExecution.desktopMirror.sourceMessageId,
              rolloutPath: mirrorResult.rolloutPath,
              threadTouchStatus: mirrorResult.threadTouch?.status,
            },
            config,
          );
          if (refreshResult.status === "failed") {
            await postAppLog(config, runtime, {
              projectId: task.projectId,
              level: "warn",
              category: "local_agent.codex_desktop_refresh_failed",
              message: "Codex 桌面刷新提示未完成,消息已写入线程记录。",
              detail: refreshResult.detail,
              mirrorToMaster: false,
            });
          }
        } catch (error) {
          await postAppLog(config, runtime, {
            projectId: task.projectId,
            level: "warn",
            category: "local_agent.codex_desktop_refresh_failed",
            message: "Codex 桌面刷新提示执行失败,消息已写入线程记录。",
            detail: error instanceof Error ? error.message : String(error),
            mirrorToMaster: false,
          });
        }
      }
      await runWithTaskTimeout(
        {
          timeoutMs: taskTimeoutMs,
          label: `master task ${task.taskId}`,
          onTimeout: async () => {
            if (activeChild && !activeChild.killed) {
              activeChild.kill("SIGKILL");
            }
          },
        },
        async () =>
          await new Promise((resolveTask, rejectTask) => {
            const child = spawn("codex", codexExecution.args, {
              cwd: codexExecution.cwd,
              env: process.env,
            });
            activeChild = child;
            // The reply is read from the output file, so stdout is not needed.
            // It still has to be drained: an unread pipe fills up and would
            // block the child until the task timeout kills it.
            child.stdout.resume();
            child.stderr.on("data", (chunk) => {
              stderrChunks.push(String(chunk));
            });
            child.on("error", (error) => {
              activeChild = null;
              rejectTask(error);
            });
            child.on("close", (code) => {
              activeChild = null;
              if (code === 0) {
                resolveTask();
                return;
              }
              rejectTask(new Error(stderrChunks.join("").trim() || `codex exit code ${code}`));
            });
          }),
      );
      const replyBody = (await readFile(outputFile, "utf8")).trim();
      const executionProgress = await collectLocalExecutionProgress(codexExecution.cwd, replyBody);
      return {
        replyBody,
        executionProgress,
        dispatchExecutionCompletion:
          task.taskType === "dispatch_execution"
            ? parseDispatchExecutionCompletion(replyBody)
            : null,
      };
    })();
    if (executionResult.waitingUserActionCompletion) {
      const completion = await completeMasterAgentTask(
        config,
        runtime,
        executionResult.waitingUserActionCompletion,
      );
      if (!completion.ok) {
        throw new Error(`DIALOG_GUARD_COMPLETION_FAILED:${completion.status}:${completion.body}`);
      }
      runtime.activeMasterTask = {
        taskId: task.taskId,
        status: "needs_user_action",
        completedAt: new Date().toISOString(),
        detail: completion.body,
      };
      await postAppLog(config, runtime, {
        projectId: "master-agent",
        level: "info",
        category: "local_agent.desktop_dialog_guard_waiting_user_action",
        message: `Master Codex Node 等待用户处理桌面弹窗:${task.taskId}`,
        detail: executionResult.waitingUserActionCompletion.summary,
        mirrorToMaster: false,
      });
      return;
    }
    if (executionResult.interruptedCompletion) {
      runtime.activeMasterTask = {
        taskId: task.taskId,
        status: "canceled",
        completedAt: new Date().toISOString(),
        detail: executionResult.interruptedCompletion.replyBody,
      };
      await postAppLog(config, runtime, {
        projectId: "master-agent",
        level: "info",
        category: "local_agent.codex_app_server_turn_interrupted",
        message: `Master Codex Node 已按取消状态中断 Codex App Server turn:${task.taskId}`,
        detail: executionResult.interruptedCompletion.interruptReason,
        mirrorToMaster: false,
      });
      return;
    }
    const { replyBody, dispatchExecutionCompletion, executionProgress } = executionResult;
    const completion = await completeMasterAgentTask(
      config,
      runtime,
      buildRemoteExecutionCompletionPayload(task, {
        status: "completed",
        replyBody: dispatchExecutionCompletion?.replyBody ?? replyBody,
        dispatchExecutionId: task.dispatchExecutionId,
        targetProjectId: task.targetProjectId,
        targetThreadId: task.targetThreadId,
        targetUrl: dispatchExecutionCompletion?.targetUrl,
        targetApp: dispatchExecutionCompletion?.targetApp,
        computerUseProvider: dispatchExecutionCompletion?.computerUseProvider,
        rawThreadReply: dispatchExecutionCompletion?.rawThreadReply,
        executionProgress,
      }),
    );
    runtime.activeMasterTask = {
      taskId: task.taskId,
      status: completion.ok ? "completed" : "complete_failed",
      completedAt: new Date().toISOString(),
      detail: completion.body,
    };
    await postAppLog(config, runtime, {
      projectId: "master-agent",
      level: "info",
      category: "local_agent.master_agent_task_completed",
      message: `Master Codex Node 已完成主 Agent 任务 ${task.taskId}`,
      // Runners may finish without a reply body. Reading `.slice` off
      // `undefined` here would throw after the task was already completed and
      // make the catch block report the same task as failed.
      detail: String(replyBody ?? "").slice(0, 280),
      mirrorToMaster: false,
    });
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    const transportDetail = sanitizeSensitiveTaskFailureDetailForTransport(detail);
    const logDetail = sanitizeSensitiveTaskFailureDetailForLog(detail);
    runtime.activeMasterTask = {
      taskId: task.taskId,
      status: "failed",
      completedAt: new Date().toISOString(),
      detail: logDetail ?? transportDetail ?? "MASTER_AGENT_TASK_FAILED",
    };
    // Reporting the failure is best effort; a transport error must not mask the original one.
    await completeMasterAgentTask(
      config,
      runtime,
      buildRemoteExecutionCompletionPayload(task, {
        status: "failed",
        errorMessage: transportDetail,
        dispatchExecutionId: task.dispatchExecutionId,
        targetProjectId: task.targetProjectId,
        targetThreadId: task.targetThreadId,
      }),
    ).catch(() => null);
    await postAppLog(config, runtime, {
      projectId: "master-agent",
      level: "error",
      category: "local_agent.master_agent_task_failed",
      message: `Master Codex Node 执行主 Agent 任务失败:${task.taskId}`,
      detail: logDetail,
      mirrorToMaster: true,
    });
  } finally {
    await rm(outputFile, { force: true }).catch(() => null);
    runtime.masterTaskBusy = false;
  }
}
/**
 * Claims at most one pending master-agent task from the control plane and runs it.
 *
 * Does nothing when the master agent is disabled in config or a task is
 * already in flight. The outcome of every claim attempt is recorded on
 * `runtime.lastMasterTaskPoll`; a thrown error is recorded with `status: 0`.
 *
 * @param {object} config - Agent configuration; `masterAgentEnabled === false` disables polling.
 * @param {object} runtime - Mutable runtime state (`masterTaskBusy`, `lastMasterTaskPoll`).
 * @returns {Promise<void>} Settles once the claimed task, if any, has finished.
 */
async function pollMasterAgentTasks(config, runtime) {
  if (config.masterAgentEnabled === false || runtime.masterTaskBusy) {
    return;
  }
  try {
    const claim = await claimMasterAgentTask(config, runtime);
    if (!claim.ok) {
      runtime.lastMasterTaskPoll = {
        at: new Date().toISOString(),
        ok: false,
        status: claim.status,
        body: claim.body,
      };
      return;
    }
    const parsed = JSON.parse(claim.body);
    runtime.lastMasterTaskPoll = {
      at: new Date().toISOString(),
      ok: true,
      status: claim.status,
      body: claim.body,
    };
    if (!parsed.task) {
      return;
    }
    runtime.masterTaskBusy = true;
    try {
      await runMasterAgentTask(config, runtime, parsed.task);
    } finally {
      // Release the flag here as well: if the task runner rejects before its
      // own cleanup runs, a stuck `true` would make every later poll return
      // early and no task would ever be claimed again.
      runtime.masterTaskBusy = false;
    }
  } catch (error) {
    runtime.lastMasterTaskPoll = {
      at: new Date().toISOString(),
      ok: false,
      status: 0,
      body: error instanceof Error ? error.message : String(error),
    };
  }
}
/**
 * Claims at most one pending skill lifecycle request from the control plane,
 * executes it, re-syncs the local skill inventory and reports the outcome.
 *
 * Skipped when the lifecycle runner is disabled or a request is already in
 * flight. An error thrown while executing the request or re-syncing skills is
 * converted into a `failed` result so that the claimed request is still
 * completed remotely instead of being left in the `running` state.
 *
 * @param {object} config - Agent configuration passed through to the helpers.
 * @param {object} runtime - Mutable runtime state; updates `lastSkillLifecyclePoll`,
 *   `activeSkillLifecycleRequest`, `skillLifecycleBusy` and the `lastSkill*` sync fields.
 * @returns {Promise<void>}
 */
async function pollSkillLifecycleRequests(config, runtime) {
  const runnerConfig = getSkillLifecycleRunnerConfig(process.env, config);
  if (!runnerConfig.enabled || runtime.skillLifecycleBusy) {
    return;
  }
  try {
    const claim = await claimSkillLifecycleRequest(config, runtime);
    runtime.lastSkillLifecyclePoll = {
      at: new Date().toISOString(),
      ok: claim.ok,
      status: claim.status,
      body: claim.body,
    };
    if (!claim.ok) {
      return;
    }
    const parsed = JSON.parse(claim.body);
    if (!parsed.request) {
      return;
    }
    const { request } = parsed;
    runtime.skillLifecycleBusy = true;
    runtime.activeSkillLifecycleRequest = {
      requestId: request.requestId,
      action: request.action,
      status: "running",
      startedAt: new Date().toISOString(),
    };

    let result;
    try {
      result = await executeSkillLifecycleRequest(request, config, runtime);
      if (result.status !== "failed") {
        // Re-scan and push the skill inventory so the control plane sees the
        // effect of the request; a rejected sync downgrades the result.
        const skills = await discoverSkills(config);
        runtime.lastSkills = skills;
        const skillSyncResult = await postSkills(config, runtime, skills);
        runtime.lastSkillSyncAt = new Date().toISOString();
        runtime.lastSkillSyncOk = skillSyncResult.ok;
        runtime.lastSkillSyncStatus = skillSyncResult.status;
        runtime.lastSkillSyncBody = skillSyncResult.body;
        if (!skillSyncResult.ok) {
          result = {
            status: "failed",
            error: `SKILL_SYNC_FAILED:${skillSyncResult.status}:${skillSyncResult.body}`,
          };
        }
      }
    } catch (error) {
      // The request has already been claimed, so it must still be completed;
      // report the exception as a failure rather than abandoning the request.
      result = {
        status: "failed",
        error: error instanceof Error ? error.message : String(error),
      };
    }

    const completion = await completeSkillLifecycleRequest(config, runtime, request, result);
    runtime.activeSkillLifecycleRequest = {
      requestId: request.requestId,
      action: request.action,
      status: completion.ok ? result.status : "complete_failed",
      completedAt: new Date().toISOString(),
      detail: completion.body,
    };
    const failed = result.status === "failed";
    await postAppLog(config, runtime, {
      level: failed ? "error" : "info",
      category: failed
        ? "local_agent.skill_lifecycle_failed"
        : "local_agent.skill_lifecycle_completed",
      message: `Skill 远程治理任务${failed ? "失败" : "完成"}:${request.action}`,
      detail: result.resultSummary ?? result.error,
      mirrorToMaster: failed,
    });
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    runtime.lastSkillLifecyclePoll = {
      at: new Date().toISOString(),
      ok: false,
      status: 0,
      body: message,
    };
    // Reaching this point while still "running" means the completion call
    // itself threw; do not leave a stale "running" entry behind.
    if (runtime.activeSkillLifecycleRequest?.status === "running") {
      runtime.activeSkillLifecycleRequest = {
        requestId: runtime.activeSkillLifecycleRequest.requestId,
        action: runtime.activeSkillLifecycleRequest.action,
        status: "complete_failed",
        completedAt: new Date().toISOString(),
        detail: message,
      };
    }
  } finally {
    runtime.skillLifecycleBusy = false;
  }
}
/**
 * Performs one background OTA availability check for boss-agent.
 *
 * Returns immediately when the OTA runner is disabled or a check is already
 * running. When an update is reported, an informational app log is posted.
 * When the check throws, a synthetic "no update" status carrying the error
 * text is stored on `runtime.lastBossAgentOtaStatus`.
 *
 * @param {object} config - Agent configuration forwarded to the OTA helpers.
 * @param {object} runtime - Mutable runtime state (`bossAgentOtaBusy`, `lastBossAgentOtaStatus`).
 * @returns {Promise<void>}
 */
async function checkBossAgentOta(config, runtime) {
  const { enabled } = getBossAgentOtaRunnerConfig(process.env, config);
  const canRun = enabled && !runtime.bossAgentOtaBusy;
  if (!canRun) {
    return;
  }

  runtime.bossAgentOtaBusy = true;
  try {
    const updateStatus = await checkBossAgentOtaUpdate(config, runtime);
    if (!updateStatus?.hasUpdate) {
      return;
    }
    const latestVersion = updateStatus.latest?.version ?? "未知版本";
    await postAppLog(config, runtime, {
      level: "info",
      category: "local_agent.boss_agent_ota_available",
      message: `boss-agent 发现可用更新:${latestVersion}`,
      detail: updateStatus.latest?.fileName,
      mirrorToMaster: false,
    });
  } catch (failure) {
    // The runner config is read again here, as the current version is only
    // needed on the failure path.
    const { currentVersion } = getBossAgentOtaRunnerConfig(process.env, config);
    runtime.lastBossAgentOtaStatus = {
      enabled: true,
      currentVersion,
      hasUpdate: false,
      latest: null,
      checkedAt: new Date().toISOString(),
      error: failure instanceof Error ? failure.message : String(failure),
    };
  } finally {
    runtime.bossAgentOtaBusy = false;
  }
}
// CLI entry point: the first positional argument is the path to the JSON config.
const [, , configPath] = process.argv;
if (!configPath) {
  console.error("Usage: node local-agent/server.mjs <config.json>");
  process.exit(1);
}

const config = await loadConfig(configPath);

// Mutable in-memory state shared by the pollers and the HTTP handlers.
// Key order is kept stable because the object is serialized as-is by some
// HTTP endpoints of this server.
const runtime = {
  // Outcome of the most recent heartbeat.
  lastHeartbeatAt: null,
  lastHeartbeatOk: false,
  lastHeartbeatStatus: null,
  lastHeartbeatBody: null,

  // Outcome of the most recent skill inventory sync.
  lastSkillSyncAt: null,
  lastSkillSyncOk: false,
  lastSkillSyncStatus: null,
  lastSkillSyncBody: null,
  lastSkills: [],

  // Credentials seeded from the config file.
  issuedToken: config.token ?? null,
  pairingCodeUsed: config.pairingCode ?? null,

  lastThreadContextResults: [],

  // Master-agent task polling.
  masterTaskBusy: false,
  activeMasterTask: null,
  lastMasterTaskPoll: null,

  // Skill lifecycle request polling.
  skillLifecycleBusy: false,
  activeSkillLifecycleRequest: null,
  lastSkillLifecyclePoll: null,

  // boss-agent OTA checks.
  bossAgentOtaBusy: false,
  lastBossAgentOtaStatus: null,
  lastBossAgentOtaApply: null,

  // Project discovery bookkeeping.
  lastProjectDiscoveryAt: null,
  lastProjectDiscoveryOk: false,
  lastProjectDiscoverySummary: null,
};
/**
 * Runs one full heartbeat cycle using the module-level `config` and `runtime`.
 *
 * Steps, in order: post the heartbeat (adopting a token returned in the
 * response, if any), post every configured thread-context snapshot one at a
 * time, then scan and sync the skill inventory. Each outcome is written to
 * `runtime`, and failures are reported through `postAppLog`. A skill
 * scan/sync exception is handled locally; any other exception marks the
 * heartbeat itself as failed with status 0.
 *
 * @returns {Promise<void>}
 */
async function performHeartbeat() {
  const nowIso = () => new Date().toISOString();
  const messageOf = (failure) => (failure instanceof Error ? failure.message : String(failure));

  try {
    const projects = await resolveHeartbeatProjects(config, runtime);
    const beat = await postHeartbeat(config, runtime, projects);
    Object.assign(runtime, {
      lastHeartbeatAt: nowIso(),
      lastHeartbeatOk: beat.ok,
      lastHeartbeatStatus: beat.status,
      lastHeartbeatBody: beat.body,
    });
    if (beat.json?.token) {
      runtime.issuedToken = beat.json.token;
    }
    if (!beat.ok) {
      await postAppLog(config, runtime, {
        level: "error",
        category: "local_agent.heartbeat_failed",
        message: "local-agent 心跳返回失败。",
        detail: beat.body,
        mirrorToMaster: true,
      });
    }

    // Thread-context snapshots are posted sequentially, in config order.
    const threadSnapshots = Array.isArray(config.threadContexts) ? config.threadContexts : [];
    runtime.lastThreadContextResults = [];
    for (const snapshot of threadSnapshots) {
      const outcome = await postThreadContext(config, runtime, snapshot);
      runtime.lastThreadContextResults.push(outcome);
      if (outcome.ok) {
        continue;
      }
      await postAppLog(config, runtime, {
        projectId: snapshot.projectId,
        level: "error",
        category: "local_agent.thread_context_failed",
        message: `线程预算上报失败:${snapshot.threadId}`,
        detail: outcome.body,
        mirrorToMaster: true,
      });
    }

    // Skill scan/sync failures are contained so they do not flip the
    // heartbeat result recorded above.
    try {
      const skills = await discoverSkills(config);
      runtime.lastSkills = skills;
      const sync = await postSkills(config, runtime, skills);
      Object.assign(runtime, {
        lastSkillSyncAt: nowIso(),
        lastSkillSyncOk: sync.ok,
        lastSkillSyncStatus: sync.status,
        lastSkillSyncBody: sync.body,
      });
    } catch (skillFailure) {
      Object.assign(runtime, {
        lastSkillSyncAt: nowIso(),
        lastSkillSyncOk: false,
        lastSkillSyncStatus: 0,
        lastSkillSyncBody: messageOf(skillFailure),
      });
      await postAppLog(config, runtime, {
        level: "error",
        category: "local_agent.skills_sync_failed",
        message: "Skill 扫描或同步失败。",
        detail: runtime.lastSkillSyncBody,
        mirrorToMaster: true,
      });
    }
  } catch (heartbeatFailure) {
    Object.assign(runtime, {
      lastHeartbeatAt: nowIso(),
      lastHeartbeatOk: false,
      lastHeartbeatStatus: 0,
      lastHeartbeatBody: messageOf(heartbeatFailure),
    });
    await postAppLog(config, runtime, {
      level: "error",
      category: "local_agent.heartbeat_exception",
      message: "local-agent 心跳执行异常。",
      detail: runtime.lastHeartbeatBody,
      mirrorToMaster: true,
    });
  }
}
// Periodic jobs, each wrapped by createSerializedRunner and bound to the
// module-level `config` and `runtime`. These wrapped functions are what the
// startup sequence, the interval timers and the manual heartbeat HTTP endpoint
// invoke.
// NOTE(review): presumably createSerializedRunner prevents overlapping runs of
// the same job — confirm against its definition.
const heartbeat = createSerializedRunner(performHeartbeat);
// Claims and runs at most one master-agent task per invocation.
const masterTaskPoll = createSerializedRunner(async () => {
await pollMasterAgentTasks(config, runtime);
});
// Claims and runs at most one skill lifecycle request per invocation.
const skillLifecyclePoll = createSerializedRunner(async () => {
await pollSkillLifecycleRequests(config, runtime);
});
// Background check for an available boss-agent OTA update.
const bossAgentOtaPoll = createSerializedRunner(async () => {
await checkBossAgentOta(config, runtime);
});
/**
 * Local HTTP server of the agent: serves the boss-agent status page and a
 * small JSON API (status, OTA check/apply, permission settings, health,
 * device, skills, manual heartbeat). Unknown paths return a JSON 404.
 *
 * The handler is async, so every failure is converted into an HTTP response
 * here: an unparsable request target yields 400 and any error thrown by a
 * route yields 500. Without this, a rejected handler promise would become an
 * unhandled rejection and the client connection would be left hanging.
 */
const server = createServer(async (request, response) => {
  let requestUrl;
  try {
    // Only `pathname` and `searchParams` are read below, so a constant base is
    // enough to resolve the request target. Building the base from
    // `config.bindHost` would make every request fail for a bare IPv6 host.
    requestUrl = new URL(request.url || "/", "http://localhost");
  } catch {
    // For example a request target of "//" has no host and cannot be parsed.
    response.writeHead(400, { "Content-Type": "application/json" });
    response.end(JSON.stringify({ ok: false, message: "bad_request" }));
    return;
  }
  const { pathname, searchParams } = requestUrl;

  try {
    if (pathname === "/" || pathname === "/boss-agent") {
      const permissions = mergeBossAgentNativePermissionOverrides(
        await detectLocalComputerPermissions(),
        searchParams,
      );
      const status = buildBossAgentStatus(config, runtime, { permissions });
      const activeTab = normalizeBossAgentTab(searchParams.get("tab") ?? "overview");
      // Render before writing the status line so a render failure can still
      // be answered with a 500.
      const html = await renderBossAgentHtmlWithQr(status, { activeTab });
      response.writeHead(200, { "Content-Type": "text/html; charset=utf-8" });
      response.end(html);
      return;
    }

    if (pathname === "/favicon.ico") {
      response.writeHead(204);
      response.end();
      return;
    }

    if (pathname === "/api/v1/boss-agent/status") {
      const permissions = await detectLocalComputerPermissions();
      const status = buildBossAgentStatus(config, runtime, { permissions });
      response.writeHead(200, { "Content-Type": "application/json" });
      response.end(JSON.stringify({ ok: true, status }));
      return;
    }

    if (pathname === "/api/v1/boss-agent/ota/check") {
      const status = await checkBossAgentOtaUpdate(config, runtime);
      const wantsJson = String(request.headers.accept || "").includes("application/json");
      if (!wantsJson) {
        // Non-JSON clients are sent back to the overview page.
        response.writeHead(302, { Location: "/boss-agent?tab=overview" });
        response.end();
        return;
      }
      response.writeHead(200, { "Content-Type": "application/json" });
      response.end(JSON.stringify({ ok: true, status }));
      return;
    }

    if (pathname === "/api/v1/boss-agent/ota/apply" && request.method === "POST") {
      // The installer is launched unless the caller passes `launch=0`.
      const launchInstaller = searchParams.get("launch") !== "0";
      const result = await applyBossAgentOtaUpdate(config, runtime, { launchInstaller });
      await postAppLog(config, runtime, {
        level: result.status === "failed" ? "error" : "info",
        category: result.status === "failed"
          ? "local_agent.boss_agent_ota_failed"
          : "local_agent.boss_agent_ota_applied",
        message: result.status === "failed"
          ? "boss-agent OTA 更新失败"
          : `boss-agent OTA 已${result.status === "installer_launched" ? "拉起安装器" : "下载暂存"}`,
        detail: result.version ?? result.error ?? result.archivePath,
        mirrorToMaster: result.status === "failed",
      });
      const wantsJson = String(request.headers.accept || "").includes("application/json");
      if (!wantsJson) {
        response.writeHead(302, { Location: "/boss-agent?tab=overview" });
        response.end();
        return;
      }
      response.writeHead(result.status === "failed" ? 400 : 200, { "Content-Type": "application/json" });
      response.end(JSON.stringify({ ok: result.status !== "failed", result }));
      return;
    }

    if (pathname === "/api/v1/boss-agent/permissions/open") {
      const target = searchParams.get("target") || "core";
      const returnTab = normalizeBossAgentTab(searchParams.get("returnTab") ?? "permissions");
      const result = await openBossAgentPermissionSettings(target);
      const wantsJson = String(request.headers.accept || "").includes("application/json");
      if (wantsJson) {
        response.writeHead(result.ok ? 200 : 500, { "Content-Type": "application/json" });
        response.end(JSON.stringify(result));
        return;
      }
      response.writeHead(302, { Location: `/boss-agent?tab=${encodeURIComponent(returnTab)}` });
      response.end();
      return;
    }

    if (pathname === "/health") {
      // NOTE(review): `runtime` includes `issuedToken` and `pairingCodeUsed`,
      // and this endpoint returns it unredacted without authentication —
      // confirm this exposure is intended.
      response.writeHead(200, { "Content-Type": "application/json" });
      response.end(
        JSON.stringify({
          ok: true,
          service: "boss-local-agent",
          runtime,
        }),
      );
      return;
    }

    if (pathname === "/api/v1/device") {
      // NOTE(review): returns the full `config` (which may carry `token` and
      // `pairingCode`) together with `runtime`, unredacted and without
      // authentication — confirm this exposure is intended.
      response.writeHead(200, { "Content-Type": "application/json" });
      response.end(JSON.stringify({ config, runtime }));
      return;
    }

    if (pathname === "/api/v1/skills") {
      response.writeHead(200, { "Content-Type": "application/json" });
      response.end(
        JSON.stringify({
          ok: true,
          deviceId: config.deviceId,
          skills: runtime.lastSkills,
          sync: {
            at: runtime.lastSkillSyncAt,
            ok: runtime.lastSkillSyncOk,
            status: runtime.lastSkillSyncStatus,
            body: runtime.lastSkillSyncBody,
          },
        }),
      );
      return;
    }

    if (pathname === "/api/v1/heartbeat" && request.method === "POST") {
      await heartbeat();
      response.writeHead(200, { "Content-Type": "application/json" });
      response.end(JSON.stringify({ ok: runtime.lastHeartbeatOk, runtime }));
      return;
    }

    response.writeHead(404, { "Content-Type": "application/json" });
    response.end(JSON.stringify({ ok: false, message: "not_found" }));
  } catch (error) {
    console.error(
      JSON.stringify({
        ok: false,
        service: "boss-local-agent",
        path: pathname,
        error: error instanceof Error ? error.message : String(error),
      }),
    );
    if (response.headersSent) {
      // The status line is already out; the only thing left is to close the response.
      if (!response.writableEnded) {
        response.end();
      }
      return;
    }
    // The error text is deliberately not echoed to the client.
    response.writeHead(500, { "Content-Type": "application/json" });
    response.end(JSON.stringify({ ok: false, message: "internal_error" }));
  }
});
// Start listening. When `bindHost` is not configured, fall back to loopback:
// passing an undefined host to `listen` binds every interface, and the
// startup log below would print "undefined:<port>".
const resolvedBindHost = config.bindHost || "127.0.0.1";
server.listen(config.port, resolvedBindHost, () => {
  console.log(
    JSON.stringify({
      ok: true,
      service: "boss-local-agent",
      bind: `${resolvedBindHost}:${config.port}`,
      controlPlaneUrl: config.controlPlaneUrl,
      workerId: config.workerId,
    }),
  );
});

// Run every job once at startup, strictly in this order, before relying on the timers.
void (async () => {
  await heartbeat();
  await masterTaskPoll();
  await skillLifecyclePoll();
  await bossAgentOtaPoll();
})();

// Periodic schedules; each interval can be overridden in the config file.
setInterval(() => {
  void heartbeat();
}, config.heartbeatIntervalMs ?? 15000);
setInterval(() => {
  void masterTaskPoll();
}, config.masterAgentPollIntervalMs ?? 1000);
setInterval(() => {
  void skillLifecyclePoll();
}, config.skillLifecyclePollIntervalMs ?? 5000);
// NOTE(review): unlike the intervals above there is no fallback here; this
// assumes `checkIntervalMs` is always a positive number — confirm against
// getBossAgentOtaRunnerConfig.
setInterval(() => {
  void bossAgentOtaPoll();
}, getBossAgentOtaRunnerConfig(process.env, config).checkIntervalMs);