Files
boss/local-agent/server.mjs
2026-06-08 12:22:50 +08:00

2040 lines
70 KiB
JavaScript
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env node
import { spawn } from "node:child_process";
import { createServer } from "node:http";
import { access, readFile, readdir, rm } from "node:fs/promises";
import os from "node:os";
import { delimiter, isAbsolute, join, resolve } from "node:path";
import { discoverCodexProjectCandidatesInWorker } from "./codex-session-discovery.mjs";
import { prepareCodexTaskExecution } from "./codex-task-runner.mjs";
import {
discoverCodexAppServerCapabilities,
executeCodexAppServerTask,
getCodexAppServerRunnerConfig,
shouldUseCodexAppServerTaskRunner,
} from "./codex-app-server-runner.mjs";
import {
shouldSkipCodexAppServerDiscovery,
} from "./codex-app-server-discovery-guard.mjs";
import {
buildLongRunningCodexProgressSnapshot,
normalizeLongRunningProgressIntervalMs,
} from "./master-task-progress-heartbeat.mjs";
import {
resolveHeartbeatProjectsFromSnapshot,
runHeartbeatProjectDiscoveryWithTimeout,
storeHeartbeatProjectsSnapshot,
} from "./heartbeat-project-snapshot.mjs";
import {
recordHeartbeatRunnerError,
} from "./heartbeat-error-state.mjs";
import { appendBossUserMessageToCodexThreadRollout } from "./codex-thread-rollout-writer.mjs";
import {
executeOmxTeamTask,
getOmxTeamTaskRunnerConfig,
shouldUseOmxTeamTaskRunner,
} from "./omx-team-task-runner.mjs";
import {
canHandleBrowserControlTask,
executeBrowserControlTask,
getBrowserControlTaskRunnerConfig,
} from "./browser-control-task-runner.mjs";
import {
canHandleComputerUseTask,
executeComputerUseTask,
getComputerUseTaskRunnerConfig,
} from "./computer-use-task-runner.mjs";
import {
executeCodexDesktopRefreshBridge,
} from "./codex-desktop-refresh-bridge.mjs";
import {
executeSkillLifecycleRequest,
getSkillLifecycleRunnerConfig,
} from "./skill-lifecycle-runner.mjs";
import {
applyBossAgentOtaUpdate,
checkBossAgentOtaUpdate,
getBossAgentOtaRunnerConfig,
} from "./boss-agent-ota-runner.mjs";
import {
runCodexRemoteControlDaemonAction,
} from "./codex-remote-control-daemon.mjs";
import {
sanitizeSensitiveTaskFailureDetailForLog,
sanitizeSensitiveTaskFailureDetailForTransport,
} from "./master-task-output-sanitizer.mjs";
import {
resolveMasterAgentTaskTimeoutMs,
runWithTaskTimeout,
} from "./master-task-timeout.mjs";
import {
buildBossAgentStatus,
detectLocalComputerPermissions,
mergeBossAgentNativePermissionOverrides,
normalizeBossAgentTab,
openBossAgentPermissionSettings,
renderBossAgentHtmlWithQr,
} from "./boss-agent-status.mjs";
import {
buildComputerUseCompletionPayload,
buildMasterAgentTaskCompletionRequestBody,
buildRemoteExecutionCompletionPayload,
} from "./master-task-completion.mjs";
import {
postThroughReliableOutbox,
replayReliableOutbox,
} from "./reliable-outbox.mjs";
import {
buildLocalAgentHealthSummary,
} from "./health-summary.mjs";
import { createSerializedRunner } from "./serialized-runner.mjs";
import { fetchWithTimeout } from "./fetch-timeout.mjs";
async function loadConfig(configPath) {
const raw = await readFile(resolve(configPath), "utf8");
return JSON.parse(raw);
}
async function resolveHeartbeatProjects(config, runtime) {
const staticProjects = Array.isArray(config.projects) ? config.projects : [];
const staticCandidates = Array.isArray(config.projectCandidates) ? config.projectCandidates : [];
const snapshotFallback = runtime.lastHeartbeatProjectsSnapshot && typeof runtime.lastHeartbeatProjectsSnapshot === "object"
? runtime.lastHeartbeatProjectsSnapshot
: {
projects: staticProjects,
projectCandidates: staticCandidates,
guiConnected: runtime.lastCodexGuiConnected === true,
};
const snapshotDecision = resolveHeartbeatProjectsFromSnapshot({ config, runtime });
if (snapshotDecision.shouldUseSnapshot) {
runtime.lastProjectDiscoverySkippedAt = new Date().toISOString();
runtime.lastProjectDiscoverySkipReason = "master_task_running";
return {
projects: snapshotDecision.projects,
projectCandidates: snapshotDecision.projectCandidates,
guiConnected: snapshotDecision.guiConnected,
};
}
if (config.codexSessionDiscoveryEnabled === false) {
return {
projects: staticProjects,
projectCandidates: staticCandidates,
};
}
try {
const discoveryTimeoutMs = config.codexSessionDiscoveryTimeoutMs ?? 3_500;
const discoveryResult = await runHeartbeatProjectDiscoveryWithTimeout({
timeoutMs: discoveryTimeoutMs,
fallback: snapshotFallback,
discover: () => discoverCodexProjectCandidatesInWorker({
stateDbPath: config.codexStateDbPath,
logsDbPath: config.codexLogsDbPath,
sessionIndexPath: config.codexSessionIndexPath,
globalStatePath: config.codexGlobalStatePath,
sessionsDir: config.codexSessionsDir,
lookbackHours: config.codexSessionLookbackHours,
timeoutMs: discoveryTimeoutMs,
}),
});
if (discoveryResult.error) {
runtime.lastProjectDiscoveryAt = new Date().toISOString();
runtime.lastProjectDiscoveryOk = false;
runtime.lastProjectDiscoverySummary = discoveryResult.error instanceof Error
? discoveryResult.error.message
: String(discoveryResult.error);
runtime.lastCodexGuiConnected = discoveryResult.value.guiConnected === true;
postAppLog(config, runtime, {
level: "warning",
category: "local_agent.codex_discovery_degraded",
message: "Codex 线程扫描超时或失败,已使用缓存项目继续心跳。",
detail: runtime.lastProjectDiscoverySummary,
mirrorToMaster: false,
}).catch(() => null);
return discoveryResult.value;
}
const discovered = discoveryResult.value;
const candidateMap = new Map();
for (const candidate of [...staticCandidates, ...discovered.projectCandidates]) {
candidateMap.set(candidate.codexThreadRef ?? candidate.threadId, candidate);
}
const mergedCandidates = [...candidateMap.values()];
const mergedProjects = [...new Set([...staticProjects, ...discovered.projects, ...mergedCandidates.map((candidate) => candidate.folderName)])];
runtime.lastProjectDiscoveryAt = new Date().toISOString();
runtime.lastProjectDiscoveryOk = true;
runtime.lastProjectDiscoverySummary = `${mergedCandidates.length} threads / ${mergedProjects.length} folders`;
runtime.lastCodexGuiConnected = discovered.guiConnected === true;
const heartbeatProjects = {
projects: mergedProjects,
projectCandidates: mergedCandidates,
guiConnected: discovered.guiConnected === true,
};
storeHeartbeatProjectsSnapshot(runtime, heartbeatProjects);
return heartbeatProjects;
} catch (error) {
runtime.lastProjectDiscoveryAt = new Date().toISOString();
runtime.lastProjectDiscoveryOk = false;
runtime.lastProjectDiscoverySummary = error instanceof Error ? error.message : String(error);
postAppLog(config, runtime, {
level: "error",
category: "local_agent.codex_discovery_failed",
message: "Codex 线程扫描失败,已退回静态项目配置。",
detail: runtime.lastProjectDiscoverySummary,
mirrorToMaster: true,
}).catch(() => null);
return {
projects: staticProjects,
projectCandidates: staticCandidates,
guiConnected: false,
};
}
}
function trimToDefined(value) {
const trimmed = String(value ?? "").trim();
return trimmed ? trimmed : undefined;
}
function mergeRecentAssistantMessages(left = [], right = []) {
const messages = new Map();
for (const message of [...left, ...right]) {
const messageId = trimToDefined(message?.messageId);
const body = trimToDefined(message?.body);
if (!messageId || !body) {
continue;
}
messages.set(messageId, {
messageId,
body,
sentAt: trimToDefined(message?.sentAt) || new Date().toISOString(),
...(trimToDefined(message?.phase) ? { phase: trimToDefined(message.phase) } : {}),
});
}
return [...messages.values()].sort((a, b) => a.sentAt.localeCompare(b.sentAt)).slice(-6);
}
function resolveCandidateKey(candidate) {
return trimToDefined(candidate?.codexThreadRef) || trimToDefined(candidate?.threadId);
}
function mergeHeartbeatProjectCandidates(existingCandidates = [], appServerCandidates = []) {
const candidateMap = new Map();
for (const candidate of [...existingCandidates, ...appServerCandidates]) {
const key = resolveCandidateKey(candidate);
if (!key) {
continue;
}
const existing = candidateMap.get(key);
if (!existing) {
candidateMap.set(key, candidate);
continue;
}
const recentAssistantMessages = mergeRecentAssistantMessages(
existing.recentAssistantMessages,
candidate.recentAssistantMessages,
);
candidateMap.set(key, {
...candidate,
...existing,
lastActiveAt: [existing.lastActiveAt, candidate.lastActiveAt].filter(Boolean).sort().at(-1),
suggestedImport: existing.suggestedImport ?? candidate.suggestedImport ?? true,
...(recentAssistantMessages.length > 0 ? { recentAssistantMessages } : {}),
});
}
return [...candidateMap.values()].sort((a, b) =>
String(b.lastActiveAt ?? "").localeCompare(String(a.lastActiveAt ?? "")),
);
}
function buildCodexAppServerProjectCandidates(metadata) {
const visibleThreads = Array.isArray(metadata?.threadSummary?.visibleThreads)
? metadata.threadSummary.visibleThreads
: [];
const turnSummaries = Array.isArray(metadata?.threadTurnSummary?.threads)
? metadata.threadTurnSummary.threads
: [];
const turnSummaryByThreadId = new Map(
turnSummaries
.map((thread) => [trimToDefined(thread?.threadId), thread])
.filter(([threadId]) => Boolean(threadId)),
);
return visibleThreads
.map((thread) => {
const threadId = trimToDefined(thread?.id);
if (!threadId) {
return null;
}
const turnSummary = turnSummaryByThreadId.get(threadId);
const threadName = trimToDefined(thread?.name) || threadId;
const recentAssistantMessages = mergeRecentAssistantMessages(
[],
turnSummary?.recentAssistantMessages,
);
return {
folderName: threadName,
threadId,
threadDisplayName: threadName,
codexThreadRef: threadId,
lastActiveAt:
trimToDefined(turnSummary?.latestTurnUpdatedAt) ||
trimToDefined(thread?.updatedAt) ||
new Date().toISOString(),
suggestedImport: true,
...(recentAssistantMessages.length > 0 ? { recentAssistantMessages } : {}),
};
})
.filter(Boolean);
}
async function postHeartbeat(config, runtime, heartbeatProjects) {
const now = new Date().toISOString();
const preferredExecutionMode =
config.preferredExecutionMode === "gui" || config.preferredExecutionMode === "cli"
? config.preferredExecutionMode
: undefined;
const browserControlRuntime = getBrowserControlTaskRunnerConfig(process.env, config);
const computerUseRuntime = getComputerUseTaskRunnerConfig(process.env, config);
const codexAppServerRuntime = getCodexAppServerRunnerConfig(process.env, config);
const computerUseConnected = await resolveComputerUseCapabilityConnected(config, computerUseRuntime);
const codexAppServerConnected = await resolveCodexAppServerCapabilityConnected(codexAppServerRuntime);
const codexAppServerMetadata = await resolveCodexAppServerCapabilityMetadata(
config,
runtime,
codexAppServerRuntime,
codexAppServerConnected,
);
const guiConnected =
config.guiConnected === true ||
(config.guiConnected !== false && heartbeatProjects.guiConnected === true);
const appServerCandidates = buildCodexAppServerProjectCandidates(codexAppServerMetadata);
const mergedProjectCandidates = mergeHeartbeatProjectCandidates(
heartbeatProjects.projectCandidates,
appServerCandidates,
);
const mergedProjects = [
...new Set([
...heartbeatProjects.projects,
...mergedProjectCandidates.map((candidate) => candidate.folderName).filter(Boolean),
]),
];
const response = await fetchWithTimeout(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/device-heartbeat`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
deviceId: config.deviceId,
token: runtime.issuedToken ?? config.token,
pairingCode: runtime.issuedToken ? undefined : config.pairingCode,
name: config.name,
avatar: config.avatar,
account: config.account,
status: config.status,
quota5h: config.quota5h,
quota7d: config.quota7d,
capabilities: {
gui: {
connected: guiConnected,
lastSeenAt: now,
lastActiveProjectId: "",
},
cli: {
connected: config.cliConnected !== false,
lastSeenAt: now,
lastActiveProjectId: "",
},
browserAutomation: {
connected: config.browserAutomationConnected !== false || Boolean(browserControlRuntime.enabled && browserControlRuntime.command),
lastSeenAt: now,
lastActiveProjectId: "",
},
computerUse: {
connected: computerUseConnected,
lastSeenAt: now,
lastActiveProjectId: "",
},
codexAppServer: {
connected: codexAppServerConnected,
lastSeenAt: now,
lastActiveProjectId: "",
metadata: codexAppServerMetadata,
},
},
preferredExecutionMode,
projects: mergedProjects,
projectCandidates: mergedProjectCandidates,
endpoint: config.endpoint,
}),
},
{
timeoutMs: config.heartbeatPostTimeoutMs ?? 4_000,
timeoutMessage: "LOCAL_AGENT_HEARTBEAT_POST_TIMEOUT",
},
);
const text = await response.text();
let json = null;
try {
json = JSON.parse(text);
} catch {
json = null;
}
return {
ok: response.ok,
status: response.status,
body: text,
json,
};
}
function isCuaDriverRuntime(runtime) {
return (
Array.isArray(runtime?.args) &&
runtime.args.some((item) => String(item).includes("cua-driver-computer-use-runtime.mjs"))
);
}
async function canExecuteCommand(command, cwd) {
const normalizedCommand = String(command || "").trim();
if (!normalizedCommand) return false;
const commandHasPath = normalizedCommand.includes("/") || isAbsolute(normalizedCommand);
const pathCandidates = commandHasPath
? [isAbsolute(normalizedCommand) ? normalizedCommand : resolve(cwd || process.cwd(), normalizedCommand)]
: String(process.env.PATH || "")
.split(delimiter)
.filter(Boolean)
.map((item) => join(item, normalizedCommand));
const candidatePaths = [
...pathCandidates,
commandHasPath ? undefined : join(os.homedir(), ".local", "bin", normalizedCommand),
commandHasPath ? undefined : join("/usr/local/bin", normalizedCommand),
commandHasPath ? undefined : join("/opt/homebrew/bin", normalizedCommand),
normalizedCommand === "cua-driver" ? "/Applications/CuaDriver.app/Contents/MacOS/cua-driver" : undefined,
].filter(Boolean);
for (const candidate of candidatePaths) {
try {
await access(candidate);
return true;
} catch {
// Try the next PATH entry.
}
}
return false;
}
async function resolveComputerUseCapabilityConnected(config, computerUseRuntime) {
if (!computerUseRuntime?.enabled || !computerUseRuntime?.command) {
return false;
}
if (!isCuaDriverRuntime(computerUseRuntime)) {
return Boolean(config.computerUseConnected) || Boolean(computerUseRuntime.command);
}
const driverCommand = computerUseRuntime.cuaDriverCommand || "cua-driver";
return canExecuteCommand(driverCommand, computerUseRuntime.cwd || process.cwd());
}
async function resolveCodexAppServerCapabilityConnected(codexAppServerRuntime) {
if (!codexAppServerRuntime?.enabled) {
return false;
}
if (codexAppServerRuntime.transport === "ws" || codexAppServerRuntime.transport === "unix") {
return Boolean(codexAppServerRuntime.url);
}
if (!codexAppServerRuntime.command) {
return false;
}
return canExecuteCommand(codexAppServerRuntime.command, codexAppServerRuntime.cwd || process.cwd());
}
function refreshCodexAppServerCapabilityMetadataInBackground(config, runtime, codexAppServerRuntime, now) {
if (runtime.codexAppServerCapabilityMetadataRefreshBusy) {
return;
}
runtime.codexAppServerCapabilityMetadataRefreshBusy = true;
runtime.codexAppServerCapabilityMetadataRefreshStartedAt = new Date(now).toISOString();
void (async () => {
try {
const metadata = await discoverCodexAppServerCapabilities(codexAppServerRuntime);
runtime.codexAppServerCapabilityMetadata = metadata;
runtime.codexAppServerCapabilityMetadataAtMs = Date.now();
runtime.codexAppServerCapabilityMetadataError = "";
runtime.codexAppServerCapabilityMetadataRefreshCompletedAt = new Date().toISOString();
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
runtime.codexAppServerCapabilityMetadataError = message;
runtime.codexAppServerCapabilityMetadataRefreshFailedAt = new Date().toISOString();
await postAppLog(config, runtime, {
level: "warn",
category: "local_agent.codex_app_server_capability_discovery_failed",
message: "Codex App Server 能力清单发现失败,设备心跳继续上报连接状态。",
detail: message,
mirrorToMaster: false,
});
} finally {
runtime.codexAppServerCapabilityMetadataRefreshBusy = false;
}
})();
}
async function resolveCodexAppServerCapabilityMetadata(config, runtime, codexAppServerRuntime, connected) {
if (!connected || !codexAppServerRuntime?.enabled || codexAppServerRuntime.discoveryEnabled === false) {
return undefined;
}
const now = Date.now();
const discoveryGuard = shouldSkipCodexAppServerDiscovery({ config, runtime });
if (discoveryGuard.skip) {
runtime.codexAppServerCapabilityMetadataSkippedAt = new Date(now).toISOString();
runtime.codexAppServerCapabilityMetadataSkipReason = discoveryGuard.reason;
runtime.codexAppServerCapabilityMetadataSkipTaskId = discoveryGuard.activeTaskId;
return runtime.codexAppServerCapabilityMetadata;
}
const ttlMs = codexAppServerRuntime.discoveryTtlMs ?? 300_000;
if (
runtime.codexAppServerCapabilityMetadata &&
runtime.codexAppServerCapabilityMetadataAtMs &&
now - runtime.codexAppServerCapabilityMetadataAtMs < ttlMs
) {
return runtime.codexAppServerCapabilityMetadata;
}
if (config.codexAppServerDiscoveryInlineInHeartbeat === true) {
try {
const metadata = await discoverCodexAppServerCapabilities(codexAppServerRuntime);
runtime.codexAppServerCapabilityMetadata = metadata;
runtime.codexAppServerCapabilityMetadataAtMs = now;
runtime.codexAppServerCapabilityMetadataError = "";
return metadata;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
runtime.codexAppServerCapabilityMetadataError = message;
postAppLog(config, runtime, {
level: "warn",
category: "local_agent.codex_app_server_capability_discovery_failed",
message: "Codex App Server 能力清单发现失败,设备心跳继续上报连接状态。",
detail: message,
mirrorToMaster: false,
}).catch(() => null);
return runtime.codexAppServerCapabilityMetadata;
}
}
refreshCodexAppServerCapabilityMetadataInBackground(config, runtime, codexAppServerRuntime, now);
runtime.codexAppServerCapabilityMetadataSkippedAt = new Date(now).toISOString();
runtime.codexAppServerCapabilityMetadataSkipReason = "background_refresh";
return runtime.codexAppServerCapabilityMetadata;
}
function deviceTokenHeaders(config, runtime) {
const token = runtime.issuedToken ?? config.token;
return token ? { "x-boss-device-token": token } : {};
}
async function postThreadContext(config, runtime, snapshot) {
const workerId = snapshot.workerId ?? config.workerId ?? `${config.deviceId}-worker`;
const response = await fetchWithTimeout(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/workers/${workerId}/thread-context`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({
nodeId: config.deviceId,
projectId: snapshot.projectId,
taskId: snapshot.taskId,
threadId: snapshot.threadId,
title: snapshot.title,
summary: snapshot.summary,
sourceKind: snapshot.sourceKind ?? "worker_estimator",
status: snapshot.status ?? "running",
contextBudgetRemainingPct: snapshot.contextBudgetRemainingPct,
contextBudgetLevel: snapshot.contextBudgetLevel,
compactionExpectedAt: snapshot.compactionExpectedAt,
mustFinishBeforeCompaction: snapshot.mustFinishBeforeCompaction,
estimatedRemainingTurns: snapshot.estimatedRemainingTurns ?? 0,
estimatedRemainingLargeMessages: snapshot.estimatedRemainingLargeMessages ?? 0,
lastCompactionAt: snapshot.lastCompactionAt,
compactionCount: snapshot.compactionCount ?? 0,
patchPending: snapshot.patchPending ?? false,
testsPending: snapshot.testsPending ?? false,
evidencePending: snapshot.evidencePending ?? false,
checklist: snapshot.checklist ?? [],
capturedAt: new Date().toISOString(),
}),
},
{
timeoutMs: config.threadContextPostTimeoutMs ?? 3_000,
timeoutMessage: "LOCAL_AGENT_THREAD_CONTEXT_POST_TIMEOUT",
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
workerId,
threadId: snapshot.threadId,
};
}
function parseSkillDescription(content) {
const descriptionMatch = content.match(/description:\s*(.+)/);
if (descriptionMatch?.[1]) {
return descriptionMatch[1].trim().replace(/^["']|["']$/g, "");
}
const lines = content
.split("\n")
.map((line) => line.trim())
.filter(Boolean);
return lines.find((line) => !line.startsWith("---") && !line.startsWith("#")) ?? "未提供说明";
}
async function discoverSkills(config) {
const skillsDir = resolve(config.skillsDir ?? join(os.homedir(), ".codex/skills"));
const pending = [skillsDir];
const skills = [];
while (pending.length > 0) {
const currentDir = pending.pop();
if (!currentDir) continue;
let entries = [];
try {
entries = await readdir(currentDir, { withFileTypes: true });
} catch {
continue;
}
for (const entry of entries) {
if (entry.isDirectory()) {
pending.push(join(currentDir, entry.name));
continue;
}
if (!entry.isFile() || entry.name !== "SKILL.md") continue;
const skillPath = join(currentDir, entry.name);
try {
await access(skillPath);
const content = await readFile(skillPath, "utf8");
const skillName = currentDir.split("/").pop() ?? "unknown-skill";
const relativeCategory = currentDir
.replace(`${skillsDir}/`, "")
.split("/")
.slice(0, -1)
.join(" / ");
skills.push({
name: skillName,
description: parseSkillDescription(content),
path: skillPath,
invocation: `[$${skillName}](${skillPath})`,
category: relativeCategory || config.name,
});
} catch {
continue;
}
}
}
return skills.sort((a, b) => a.name.localeCompare(b.name));
}
async function postSkills(config, runtime, skills) {
const response = await fetchWithTimeout(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/devices/${config.deviceId}/skills`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({ skills }),
},
{
timeoutMs: config.skillsPostTimeoutMs ?? 3_000,
timeoutMessage: "LOCAL_AGENT_SKILLS_POST_TIMEOUT",
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
count: skills.length,
};
}
async function postAppLog(config, runtime, payload) {
try {
await postThroughReliableOutbox(config, {
kind: "app.log",
url: `${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/app-logs`,
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: {
deviceId: config.deviceId,
source: "local_agent",
...payload,
},
});
} catch {
// Ignore log transport failures to avoid blocking the agent loop.
}
}
async function claimMasterAgentTask(config, runtime) {
const configuredWaitMs = Number(
config.masterAgentClaimWaitMs ?? config.masterAgentLongPollMs ?? 25_000,
);
const waitMs = Number.isFinite(configuredWaitMs)
? Math.max(0, Math.min(30_000, Math.floor(configuredWaitMs)))
: 25_000;
const response = await fetchWithTimeout(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/master-agent/tasks/claim`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({ deviceId: config.deviceId, waitMs }),
},
{
timeoutMs: waitMs + Number(config.masterAgentClaimTimeoutPaddingMs ?? 5_000),
timeoutMessage: "LOCAL_AGENT_MASTER_TASK_CLAIM_TIMEOUT",
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
};
}
async function completeMasterAgentTask(config, runtime, payload) {
const result = await postThroughReliableOutbox(config, {
kind: "task.complete",
url: `${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/master-agent/tasks/${payload.taskId}/complete`,
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: buildMasterAgentTaskCompletionRequestBody(config, payload),
});
return result;
}
async function postMasterAgentTaskProgress(config, runtime, payload) {
const result = await postThroughReliableOutbox(config, {
kind: "task.progress",
url: `${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/master-agent/tasks/${payload.taskId}/progress`,
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: {
deviceId: config.deviceId,
status: payload.status || "running",
phase: payload.phase,
requestId: payload.requestId,
executionProgress: payload.executionProgress,
},
});
return result;
}
async function fetchMasterAgentTaskControlState(config, runtime, task) {
const response = await fetchWithTimeout(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/master-agent/tasks/${task.taskId}/control-state`,
{
method: "GET",
headers: {
...deviceTokenHeaders(config, runtime),
},
},
{
timeoutMs: config.masterAgentControlStateTimeoutMs ?? 3_000,
timeoutMessage: "LOCAL_AGENT_MASTER_TASK_CONTROL_STATE_TIMEOUT",
},
);
if (!response.ok) {
return {
ok: false,
status: response.status,
body: await response.text(),
};
}
return {
ok: true,
status: response.status,
body: await response.json(),
};
}
function normalizeInterruptPollIntervalMs(config) {
const value = Number(config.masterAgentInterruptPollIntervalMs ?? 750);
return Number.isFinite(value) && value >= 0 ? Math.floor(value) : 750;
}
function canHandleCodexRemoteControlMaintenanceTask(task) {
return (
task?.taskType === "device_maintenance" &&
task?.maintenanceKind === "codex_remote_control" &&
(task?.codexRemoteControlAction === "start" || task?.codexRemoteControlAction === "stop")
);
}
function buildCodexRemoteControlMaintenanceReply(task, result) {
const actionLabel = task.codexRemoteControlAction === "start" ? "已启动" : "已停止";
const lines = [`Codex Remote Control ${actionLabel}`];
if (result.commandLabel) {
lines.push(`本机命令:${result.commandLabel}`);
}
if (result.outputSummary) {
lines.push(`返回摘要:${result.outputSummary}`);
}
return lines.join("\n");
}
async function claimSkillLifecycleRequest(config, runtime) {
const response = await fetchWithTimeout(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/devices/${config.deviceId}/skill-requests/claim`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({ deviceId: config.deviceId }),
},
{
timeoutMs: config.skillLifecycleClaimTimeoutMs ?? 5_000,
timeoutMessage: "LOCAL_AGENT_SKILL_REQUEST_CLAIM_TIMEOUT",
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
};
}
async function completeSkillLifecycleRequest(config, runtime, request, result) {
const response = await fetchWithTimeout(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/devices/${config.deviceId}/skill-requests/${request.requestId}/complete`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({
status: result.status === "failed" ? "failed" : "completed",
resultSummary: result.resultSummary,
error: result.error,
}),
},
{
timeoutMs: config.skillLifecycleCompleteTimeoutMs ?? 5_000,
timeoutMessage: "LOCAL_AGENT_SKILL_REQUEST_COMPLETE_TIMEOUT",
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
};
}
function parseDispatchExecutionCompletion(rawOutput) {
const trimmed = String(rawOutput || "").trim();
if (!trimmed) {
return {
rawThreadReply: "",
replyBody: undefined,
};
}
const fencedMatch = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i);
const jsonCandidate = fencedMatch?.[1]?.trim() ?? trimmed;
try {
const parsed = JSON.parse(jsonCandidate);
if (parsed && typeof parsed === "object") {
const rawThreadReply =
typeof parsed.rawThreadReply === "string" ? parsed.rawThreadReply.trim() : "";
const replyBody =
typeof parsed.replyBody === "string" ? parsed.replyBody.trim() : undefined;
if (rawThreadReply) {
return {
rawThreadReply,
replyBody: replyBody || undefined,
};
}
}
} catch {
// Fall back to treating the full output as the raw thread reply.
}
return {
rawThreadReply: trimmed,
replyBody: undefined,
};
}
function runShortCommand(command, args, options = {}) {
return new Promise((resolve) => {
const child = spawn(command, args, {
cwd: options.cwd || process.cwd(),
env: process.env,
});
let stdout = "";
let stderr = "";
const timeout = setTimeout(() => {
if (!child.killed) {
child.kill("SIGKILL");
}
}, options.timeoutMs || 1500);
child.stdout.on("data", (chunk) => {
stdout += String(chunk);
});
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
child.on("error", (error) => {
clearTimeout(timeout);
resolve({ ok: false, stdout, stderr: error.message });
});
child.on("close", (code) => {
clearTimeout(timeout);
resolve({
ok: code === 0,
stdout: stdout.trim(),
stderr: stderr.trim(),
});
});
});
}
function parseGitShortstat(shortstat) {
const text = String(shortstat || "");
const changedFiles = Number((text.match(/(\d+)\s+files?\s+changed/) || [])[1]);
const additions = Number((text.match(/(\d+)\s+insertions?\(\+\)/) || [])[1]);
const deletions = Number((text.match(/(\d+)\s+deletions?\(-\)/) || [])[1]);
return {
changedFiles: Number.isFinite(changedFiles) ? changedFiles : undefined,
additions: Number.isFinite(additions) ? additions : undefined,
deletions: Number.isFinite(deletions) ? deletions : undefined,
};
}
function collectArtifactsFromReply(text) {
const matches = new Set();
const source = String(text || "");
const pattern = /(?:[\w.-]+\/)*[\w.-]+\.(?:md|txt|ts|tsx|js|mjs|java|kt|json|png|jpe?g|webp|svg|apk|aab)\b/gi;
let match;
while ((match = pattern.exec(source)) && matches.size < 12) {
const label = match[0].split("/").filter(Boolean).pop();
if (label) {
matches.add(label);
}
}
return Array.from(matches).map((label) => ({
label,
kind: /\.(png|jpe?g|webp|svg)$/i.test(label) ? "image" : "file",
}));
}
async function collectLocalExecutionProgress(cwd, replyBody) {
const [diffShortstat, statusShort, ghVersion] = await Promise.all([
runShortCommand("git", ["diff", "--shortstat"], { cwd }),
runShortCommand("git", ["status", "--short"], { cwd }),
runShortCommand("gh", ["--version"], { cwd }),
]);
const parsedDiff = diffShortstat.ok ? parseGitShortstat(diffShortstat.stdout) : {};
const hasGitState = diffShortstat.ok || statusShort.ok;
return {
branch: hasGitState
? {
...parsedDiff,
gitStatus: statusShort.ok && statusShort.stdout ? "有未提交变更" : "工作区干净",
githubCliStatus: ghVersion.ok ? "available" : "unavailable",
}
: {
githubCliStatus: ghVersion.ok ? "available" : "unavailable",
},
artifacts: collectArtifactsFromReply(replyBody),
};
}
function mergeExecutionProgress(primary, secondary) {
const first = primary && typeof primary === "object" ? primary : {};
const second = secondary && typeof secondary === "object" ? secondary : {};
const artifacts = [];
const seenArtifacts = new Set();
for (const artifact of [...(first.artifacts || []), ...(second.artifacts || [])]) {
const label = String(artifact?.label || "").trim();
if (!label || seenArtifacts.has(label)) {
continue;
}
seenArtifacts.add(label);
artifacts.push(artifact);
}
const agents = [];
const seenAgents = new Set();
for (const agent of [...(first.agents || []), ...(second.agents || [])]) {
const name = String(agent?.name || "").trim();
const key = `${name}:${String(agent?.role || "").trim()}`;
if (!name || seenAgents.has(key)) {
continue;
}
seenAgents.add(key);
agents.push(agent);
}
return {
...(second || {}),
...(first || {}),
branch:
first.branch || second.branch
? {
...(second.branch || {}),
...(first.branch || {}),
}
: undefined,
artifacts: artifacts.length > 0 ? artifacts : undefined,
agents: agents.length > 0 ? agents : undefined,
};
}
async function runMasterAgentTask(config, runtime, task) {
const outputFile = join(os.tmpdir(), `${task.taskId}.reply.txt`);
const stderrChunks = [];
const taskTimeoutMs = resolveMasterAgentTaskTimeoutMs(config, task);
runtime.activeMasterTask = {
taskId: task.taskId,
status: "running",
startedAt: new Date().toISOString(),
};
const emitTaskPhase = async (phase, executionProgress) => {
try {
const result = await postMasterAgentTaskProgress(config, runtime, {
taskId: task.taskId,
status: "running",
phase,
executionProgress: {
...(executionProgress || {}),
phase,
},
});
return result;
} catch (error) {
return {
ok: false,
status: 0,
body: error instanceof Error ? error.message : String(error),
};
}
};
const createLongRunningProgressHeartbeat = ({ phase = "awaiting_reply", getProgress } = {}) => {
const intervalMs = normalizeLongRunningProgressIntervalMs(
config.masterAgentLongTaskProgressIntervalMs ?? config.masterAgentProgressHeartbeatIntervalMs,
);
if (intervalMs <= 0) {
return () => {};
}
const startedAtMs = Date.now();
let heartbeatCount = 0;
const sendHeartbeat = async () => {
heartbeatCount += 1;
await emitTaskPhase(
phase,
buildLongRunningCodexProgressSnapshot({
task,
phase,
startedAtMs,
nowMs: Date.now(),
baseProgress: typeof getProgress === "function" ? getProgress() : undefined,
heartbeatCount,
}),
);
};
const timer = setInterval(() => {
void sendHeartbeat();
}, intervalMs);
return () => {
clearInterval(timer);
};
};
try {
let activeChild = null;
await emitTaskPhase("executor_starting");
const executionResult = await (async () => {
if (canHandleCodexRemoteControlMaintenanceTask(task)) {
const daemonResult = await runCodexRemoteControlDaemonAction(
task.codexRemoteControlAction,
config,
);
if (daemonResult.status === "failed") {
throw new Error(
daemonResult.error ||
daemonResult.outputSummary ||
`CODEX_REMOTE_CONTROL_${String(task.codexRemoteControlAction).toUpperCase()}_FAILED`,
);
}
return {
replyBody: buildCodexRemoteControlMaintenanceReply(task, daemonResult),
};
}
if (canHandleBrowserControlTask(task)) {
const browserResult = await executeBrowserControlTask(task, config);
if (browserResult.status === "failed") {
throw new Error(browserResult.errorMessage || "BROWSER_CONTROL_FAILED");
}
return {
replyBody: browserResult.replyBody,
dispatchExecutionCompletion: {
targetUrl: browserResult.targetUrl,
},
};
}
if (canHandleComputerUseTask(task)) {
const computerUseResult = await executeComputerUseTask(task, config);
if (computerUseResult.status === "failed") {
throw new Error(computerUseResult.errorMessage || "COMPUTER_USE_FAILED");
}
if (computerUseResult.status === "needs_user_action") {
return {
waitingUserActionCompletion: buildComputerUseCompletionPayload(task, computerUseResult),
};
}
return {
replyBody: computerUseResult.replyBody,
dispatchExecutionCompletion: {
targetApp: computerUseResult.targetApp,
computerUseProvider: computerUseResult.computerUseProvider,
},
};
}
if (shouldUseOmxTeamTaskRunner(task)) {
const omxResult = await executeOmxTeamTask(getOmxTeamTaskRunnerConfig(process.env, config), task);
if (omxResult.status === "failed") {
throw new Error(omxResult.errorMessage || "OMX_EXECUTION_FAILED");
}
return {
replyBody: omxResult.replyBody ?? omxResult.rawThreadReply,
dispatchExecutionCompletion: {
rawThreadReply: omxResult.rawThreadReply,
replyBody: omxResult.replyBody,
},
};
}
const codexAppServerRunner = getCodexAppServerRunnerConfig(process.env, config);
if (shouldUseCodexAppServerTaskRunner(codexAppServerRunner, task)) {
let latestCodexExecutionProgress;
const stopLongRunningProgressHeartbeat = createLongRunningProgressHeartbeat({
phase: "awaiting_reply",
getProgress: () => latestCodexExecutionProgress,
});
let appServerResult;
try {
appServerResult = await executeCodexAppServerTask(
{
...codexAppServerRunner,
interruptPollIntervalMs: normalizeInterruptPollIntervalMs(config),
shouldInterruptActiveTurn: async () => {
const controlState = await fetchMasterAgentTaskControlState(config, runtime, task);
if (!controlState.ok) {
return false;
}
if (controlState.body?.canceled === true || controlState.body?.status === "canceled") {
return {
interrupt: true,
reason: controlState.body?.cancelReason || "USER_CANCELED_TASK",
};
}
return false;
},
onProgress: async (executionProgress) => {
latestCodexExecutionProgress = executionProgress;
const progressResult = await postMasterAgentTaskProgress(config, runtime, {
taskId: task.taskId,
status: "running",
phase: "awaiting_reply",
executionProgress,
});
if (!progressResult.ok) {
await postAppLog(config, runtime, {
projectId: task.projectId,
level: "warn",
category: "local_agent.codex_app_server_progress_failed",
message: "Codex App Server 进度实时回写失败,完成回写仍会携带最终进度。",
detail: progressResult.body,
mirrorToMaster: false,
});
}
},
},
task,
);
} finally {
stopLongRunningProgressHeartbeat();
}
if (appServerResult.status === "interrupted") {
return {
interruptedCompletion: {
replyBody: appServerResult.replyBody,
interruptReason: appServerResult.interruptReason,
executionProgress: appServerResult.executionProgress,
},
};
}
if (appServerResult.status === "completed") {
const localExecutionProgress = await collectLocalExecutionProgress(
appServerResult.cwd || task.targetCodexFolderRef || config.masterAgentWorkdir || process.cwd(),
appServerResult.replyBody,
);
const executionProgress = mergeExecutionProgress(
appServerResult.executionProgress,
localExecutionProgress,
);
try {
if (task.targetCodexThreadRef || task.targetThreadId) {
await executeCodexDesktopRefreshBridge(
{
targetThreadRef: task.targetCodexThreadRef || task.targetThreadId,
sourceMessageId: task.sourceMessageId || task.requestMessageId,
threadTouchStatus: "app_server_turn_started",
},
config,
);
}
} catch {
// Desktop refresh is only a visibility hint; app-server already owns the thread turn.
}
return {
replyBody: appServerResult.replyBody,
executionProgress,
dispatchExecutionCompletion:
task.taskType === "dispatch_execution"
? parseDispatchExecutionCompletion(appServerResult.replyBody)
: null,
};
}
if (appServerResult.canFallbackToCli !== true || config.codexAppServerFallbackToCli === false) {
throw new Error(appServerResult.errorMessage || "CODEX_APP_SERVER_EXECUTION_FAILED");
}
await postAppLog(config, runtime, {
projectId: task.projectId,
level: "warn",
category: "local_agent.codex_app_server_fallback",
message: "Codex App Server 本轮不可用,已回退到 CLI resume。",
detail: appServerResult.errorMessage,
mirrorToMaster: false,
});
}
const codexPreparation = await prepareCodexTaskExecution(config, task, outputFile);
if (!codexPreparation.ok) {
throw new Error(codexPreparation.error.message);
}
const codexExecution = codexPreparation.execution;
if (codexExecution.desktopMirror?.enabled) {
const mirrorResult = await appendBossUserMessageToCodexThreadRollout({
stateDbPath: config.codexStateDbPath,
sessionsDir: config.codexSessionsDir,
targetThreadRef: codexExecution.desktopMirror.targetThreadRef,
sourceMessageId: codexExecution.desktopMirror.sourceMessageId,
message: codexExecution.desktopMirror.sourceMessageBody,
sentAt: codexExecution.desktopMirror.sourceMessageSentAt,
});
try {
const refreshResult = await executeCodexDesktopRefreshBridge(
{
targetThreadRef: codexExecution.desktopMirror.targetThreadRef,
sourceMessageId: codexExecution.desktopMirror.sourceMessageId,
rolloutPath: mirrorResult.rolloutPath,
threadTouchStatus: mirrorResult.threadTouch?.status,
},
config,
);
if (refreshResult.status === "failed") {
await postAppLog(config, runtime, {
projectId: task.projectId,
level: "warn",
category: "local_agent.codex_desktop_refresh_failed",
message: "Codex 桌面刷新提示未完成,消息已写入线程记录。",
detail: refreshResult.detail,
mirrorToMaster: false,
});
}
} catch (error) {
await postAppLog(config, runtime, {
projectId: task.projectId,
level: "warn",
category: "local_agent.codex_desktop_refresh_failed",
message: "Codex 桌面刷新提示执行失败,消息已写入线程记录。",
detail: error instanceof Error ? error.message : String(error),
mirrorToMaster: false,
});
}
}
await runWithTaskTimeout(
{
timeoutMs: taskTimeoutMs,
label: `master task ${task.taskId}`,
onTimeout: async () => {
if (activeChild && !activeChild.killed) {
activeChild.kill("SIGKILL");
}
},
},
async () =>
await new Promise((resolveTask, rejectTask) => {
void emitTaskPhase("turn_started");
const child = spawn("codex", codexExecution.args, {
cwd: codexExecution.cwd,
env: process.env,
});
activeChild = child;
child.stderr.on("data", (chunk) => {
stderrChunks.push(String(chunk));
});
child.on("error", (error) => {
activeChild = null;
rejectTask(error);
});
child.on("close", (code) => {
activeChild = null;
if (code === 0) {
resolveTask();
return;
}
rejectTask(new Error(stderrChunks.join("").trim() || `codex exit code ${code}`));
});
}),
);
const replyBody = (await readFile(outputFile, "utf8")).trim();
const executionProgress = await collectLocalExecutionProgress(codexExecution.cwd, replyBody);
return {
replyBody,
executionProgress,
dispatchExecutionCompletion:
task.taskType === "dispatch_execution"
? parseDispatchExecutionCompletion(replyBody)
: null,
};
})();
if (executionResult.waitingUserActionCompletion) {
const completion = await completeMasterAgentTask(
config,
runtime,
executionResult.waitingUserActionCompletion,
);
if (!completion.ok) {
throw new Error(`DIALOG_GUARD_COMPLETION_FAILED:${completion.status}:${completion.body}`);
}
runtime.activeMasterTask = {
taskId: task.taskId,
status: "needs_user_action",
completedAt: new Date().toISOString(),
detail: completion.body,
};
await postAppLog(config, runtime, {
projectId: "master-agent",
level: "info",
category: "local_agent.desktop_dialog_guard_waiting_user_action",
message: `Master Codex Node 等待用户处理桌面弹窗:${task.taskId}`,
detail: executionResult.waitingUserActionCompletion.summary,
mirrorToMaster: false,
});
return;
}
if (executionResult.interruptedCompletion) {
runtime.activeMasterTask = {
taskId: task.taskId,
status: "canceled",
completedAt: new Date().toISOString(),
detail: executionResult.interruptedCompletion.replyBody,
};
await postAppLog(config, runtime, {
projectId: "master-agent",
level: "info",
category: "local_agent.codex_app_server_turn_interrupted",
message: `Master Codex Node 已按取消状态中断 Codex App Server turn${task.taskId}`,
detail: executionResult.interruptedCompletion.interruptReason,
mirrorToMaster: false,
});
return;
}
const { replyBody, dispatchExecutionCompletion, executionProgress } = executionResult;
await emitTaskPhase("completing", executionProgress);
const completion = await completeMasterAgentTask(
config,
runtime,
buildRemoteExecutionCompletionPayload(task, {
status: "completed",
replyBody: dispatchExecutionCompletion?.replyBody ?? replyBody,
dispatchExecutionId: task.dispatchExecutionId,
targetProjectId: task.targetProjectId,
targetThreadId: task.targetThreadId,
targetUrl: dispatchExecutionCompletion?.targetUrl,
targetApp: dispatchExecutionCompletion?.targetApp,
computerUseProvider: dispatchExecutionCompletion?.computerUseProvider,
rawThreadReply: dispatchExecutionCompletion?.rawThreadReply,
executionProgress,
}),
);
if (!completion.ok) {
await emitTaskPhase("completing", {
...(executionProgress && typeof executionProgress === "object" ? executionProgress : {}),
title: "结果已生成,正在同步",
warnings: [
...(
Array.isArray(executionProgress?.warnings)
? executionProgress.warnings.filter(Boolean).slice(0, 6)
: []
),
{
id: "task-complete-sync-retrying",
severity: "warning",
message: "本机已生成任务结果,正在重试同步到 Boss 对话窗口。",
},
],
});
}
runtime.activeMasterTask = {
taskId: task.taskId,
status: completion.ok ? "completed" : "complete_failed",
completedAt: new Date().toISOString(),
detail: completion.body,
};
await postAppLog(config, runtime, {
projectId: "master-agent",
level: completion.ok ? "info" : "warn",
category: completion.ok
? "local_agent.master_agent_task_completed"
: "local_agent.master_agent_task_completion_sync_retrying",
message: completion.ok
? `Master Codex Node 已完成主 Agent 任务 ${task.taskId}`
: `Master Codex Node 已生成结果,正在重试同步主 Agent 任务 ${task.taskId}`,
detail: completion.ok ? replyBody.slice(0, 280) : completion.body,
mirrorToMaster: false,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
const transportDetail = sanitizeSensitiveTaskFailureDetailForTransport(detail);
const logDetail = sanitizeSensitiveTaskFailureDetailForLog(detail);
runtime.activeMasterTask = {
taskId: task.taskId,
status: "failed",
completedAt: new Date().toISOString(),
detail: logDetail ?? transportDetail ?? "MASTER_AGENT_TASK_FAILED",
};
await completeMasterAgentTask(
config,
runtime,
buildRemoteExecutionCompletionPayload(task, {
status: "failed",
errorMessage: transportDetail,
dispatchExecutionId: task.dispatchExecutionId,
targetProjectId: task.targetProjectId,
targetThreadId: task.targetThreadId,
}),
).catch(() => null);
await postAppLog(config, runtime, {
projectId: "master-agent",
level: "error",
category: "local_agent.master_agent_task_failed",
message: `Master Codex Node 执行主 Agent 任务失败:${task.taskId}`,
detail: logDetail,
mirrorToMaster: true,
});
} finally {
await rm(outputFile, { force: true }).catch(() => null);
runtime.masterTaskBusy = false;
}
}
async function pollMasterAgentTasks(config, runtime) {
if (config.masterAgentEnabled === false || runtime.masterTaskBusy) {
return;
}
try {
const claim = await claimMasterAgentTask(config, runtime);
if (!claim.ok) {
runtime.lastMasterTaskPoll = {
at: new Date().toISOString(),
ok: false,
status: claim.status,
body: claim.body,
};
return;
}
const parsed = JSON.parse(claim.body);
runtime.lastMasterTaskPoll = {
at: new Date().toISOString(),
ok: true,
status: claim.status,
body: claim.body,
};
if (!parsed.task) {
return;
}
runtime.masterTaskBusy = true;
await runMasterAgentTask(config, runtime, parsed.task);
} catch (error) {
runtime.lastMasterTaskPoll = {
at: new Date().toISOString(),
ok: false,
status: 0,
body: error instanceof Error ? error.message : String(error),
};
}
}
async function pollSkillLifecycleRequests(config, runtime) {
const runnerConfig = getSkillLifecycleRunnerConfig(process.env, config);
if (!runnerConfig.enabled || runtime.skillLifecycleBusy) {
return;
}
try {
const claim = await claimSkillLifecycleRequest(config, runtime);
runtime.lastSkillLifecyclePoll = {
at: new Date().toISOString(),
ok: claim.ok,
status: claim.status,
body: claim.body,
};
if (!claim.ok) {
return;
}
const parsed = JSON.parse(claim.body);
if (!parsed.request) {
return;
}
runtime.skillLifecycleBusy = true;
runtime.activeSkillLifecycleRequest = {
requestId: parsed.request.requestId,
action: parsed.request.action,
status: "running",
startedAt: new Date().toISOString(),
};
let result = await executeSkillLifecycleRequest(parsed.request, config, runtime);
if (result.status !== "failed") {
const skills = await discoverSkills(config);
runtime.lastSkills = skills;
const skillSyncResult = await postSkills(config, runtime, skills);
runtime.lastSkillSyncAt = new Date().toISOString();
runtime.lastSkillSyncOk = skillSyncResult.ok;
runtime.lastSkillSyncStatus = skillSyncResult.status;
runtime.lastSkillSyncBody = skillSyncResult.body;
if (!skillSyncResult.ok) {
result = {
status: "failed",
error: `SKILL_SYNC_FAILED:${skillSyncResult.status}:${skillSyncResult.body}`,
};
}
}
const completion = await completeSkillLifecycleRequest(config, runtime, parsed.request, result);
runtime.activeSkillLifecycleRequest = {
requestId: parsed.request.requestId,
action: parsed.request.action,
status: completion.ok ? result.status : "complete_failed",
completedAt: new Date().toISOString(),
detail: completion.body,
};
await postAppLog(config, runtime, {
level: result.status === "failed" ? "error" : "info",
category: result.status === "failed"
? "local_agent.skill_lifecycle_failed"
: "local_agent.skill_lifecycle_completed",
message: `Skill 远程治理任务${result.status === "failed" ? "失败" : "完成"}${parsed.request.action}`,
detail: result.resultSummary ?? result.error,
mirrorToMaster: result.status === "failed",
});
} catch (error) {
runtime.lastSkillLifecyclePoll = {
at: new Date().toISOString(),
ok: false,
status: 0,
body: error instanceof Error ? error.message : String(error),
};
} finally {
runtime.skillLifecycleBusy = false;
}
}
async function checkBossAgentOta(config, runtime) {
const runnerConfig = getBossAgentOtaRunnerConfig(process.env, config);
if (!runnerConfig.enabled || runtime.bossAgentOtaBusy) {
return;
}
runtime.bossAgentOtaBusy = true;
try {
const status = await checkBossAgentOtaUpdate(config, runtime);
if (status?.hasUpdate) {
await postAppLog(config, runtime, {
level: "info",
category: "local_agent.boss_agent_ota_available",
message: `boss-agent 发现可用更新:${status.latest?.version ?? "未知版本"}`,
detail: status.latest?.fileName,
mirrorToMaster: false,
});
}
} catch (error) {
runtime.lastBossAgentOtaStatus = {
enabled: true,
currentVersion: getBossAgentOtaRunnerConfig(process.env, config).currentVersion,
hasUpdate: false,
latest: null,
checkedAt: new Date().toISOString(),
error: error instanceof Error ? error.message : String(error),
};
} finally {
runtime.bossAgentOtaBusy = false;
}
}
const configPath = process.argv[2];
if (!configPath) {
console.error("Usage: node local-agent/server.mjs <config.json>");
process.exit(1);
}
const config = await loadConfig(configPath);
const runtime = {
lastHeartbeatAt: null,
lastHeartbeatOk: false,
lastHeartbeatStatus: null,
lastHeartbeatBody: null,
lastSkillSyncAt: null,
lastSkillSyncOk: false,
lastSkillSyncStatus: null,
lastSkillSyncBody: null,
lastSkills: [],
issuedToken: config.token ?? null,
pairingCodeUsed: config.pairingCode ?? null,
lastThreadContextResults: [],
masterTaskBusy: false,
activeMasterTask: null,
lastMasterTaskPoll: null,
skillLifecycleBusy: false,
activeSkillLifecycleRequest: null,
lastSkillLifecyclePoll: null,
bossAgentOtaBusy: false,
lastBossAgentOtaStatus: null,
lastBossAgentOtaApply: null,
lastProjectDiscoveryAt: null,
lastProjectDiscoveryOk: false,
lastProjectDiscoverySummary: null,
lastReliableOutboxReplay: null,
};
function replayReliableOutboxInBackground(config, runtime) {
if (runtime.reliableOutboxReplayBusy) {
return;
}
runtime.reliableOutboxReplayBusy = true;
runtime.lastReliableOutboxReplayStartedAt = new Date().toISOString();
void replayReliableOutbox(config, {
limit: config.heartbeatOutboxReplayLimit ?? 5,
requestTimeoutMs: config.heartbeatOutboxRequestTimeoutMs ?? 1_000,
maxDurationMs: config.heartbeatOutboxReplayBudgetMs ?? 2_500,
})
.then((result) => {
runtime.lastReliableOutboxReplay = result;
runtime.lastReliableOutboxReplayAt = new Date().toISOString();
})
.catch((error) => {
runtime.lastReliableOutboxReplay = {
attempted: 0,
sent: 0,
retained: 0,
stoppedByBudget: false,
error: error instanceof Error ? error.message : String(error),
};
runtime.lastReliableOutboxReplayAt = new Date().toISOString();
})
.finally(() => {
runtime.reliableOutboxReplayBusy = false;
});
}
function syncThreadContextsInBackground(config, runtime, snapshots) {
if (runtime.threadContextSyncBusy || !Array.isArray(snapshots) || snapshots.length === 0) {
return;
}
runtime.threadContextSyncBusy = true;
runtime.lastThreadContextSyncStartedAt = new Date().toISOString();
void (async () => {
const results = [];
for (const snapshot of snapshots) {
let threadResult;
try {
threadResult = await postThreadContext(config, runtime, snapshot);
} catch (error) {
threadResult = {
ok: false,
status: 0,
body: error instanceof Error ? error.message : String(error),
workerId: snapshot.workerId ?? config.workerId ?? `${config.deviceId}-worker`,
threadId: snapshot.threadId,
};
}
results.push(threadResult);
if (!threadResult.ok) {
postAppLog(config, runtime, {
projectId: snapshot.projectId,
level: "error",
category: "local_agent.thread_context_failed",
message: `线程预算上报失败:${snapshot.threadId}`,
detail: threadResult.body,
mirrorToMaster: true,
}).catch(() => null);
}
}
runtime.lastThreadContextResults = results;
runtime.lastThreadContextSyncAt = new Date().toISOString();
})()
.catch((error) => {
runtime.lastThreadContextResults = [{
ok: false,
status: 0,
body: error instanceof Error ? error.message : String(error),
}];
runtime.lastThreadContextSyncAt = new Date().toISOString();
})
.finally(() => {
runtime.threadContextSyncBusy = false;
});
}
function syncSkillsInBackground(config, runtime) {
if (runtime.skillSyncBusy) {
return;
}
runtime.skillSyncBusy = true;
runtime.lastSkillSyncStartedAt = new Date().toISOString();
void (async () => {
const skills = await discoverSkills(config);
runtime.lastSkills = skills;
const skillSyncResult = await postSkills(config, runtime, skills);
runtime.lastSkillSyncAt = new Date().toISOString();
runtime.lastSkillSyncOk = skillSyncResult.ok;
runtime.lastSkillSyncStatus = skillSyncResult.status;
runtime.lastSkillSyncBody = skillSyncResult.body;
})()
.catch((error) => {
runtime.lastSkillSyncAt = new Date().toISOString();
runtime.lastSkillSyncOk = false;
runtime.lastSkillSyncStatus = 0;
runtime.lastSkillSyncBody = error instanceof Error ? error.message : String(error);
postAppLog(config, runtime, {
level: "error",
category: "local_agent.skills_sync_failed",
message: "Skill 扫描或同步失败。",
detail: runtime.lastSkillSyncBody,
mirrorToMaster: true,
}).catch(() => null);
})
.finally(() => {
runtime.skillSyncBusy = false;
});
}
async function performHeartbeat() {
try {
replayReliableOutboxInBackground(config, runtime);
const heartbeatProjects = await resolveHeartbeatProjects(config, runtime);
const result = await postHeartbeat(config, runtime, heartbeatProjects);
runtime.lastHeartbeatAt = new Date().toISOString();
runtime.lastHeartbeatOk = result.ok;
runtime.lastHeartbeatStatus = result.status;
runtime.lastHeartbeatBody = result.body;
if (result.json?.token) {
runtime.issuedToken = result.json.token;
}
if (!result.ok) {
await postAppLog(config, runtime, {
level: "error",
category: "local_agent.heartbeat_failed",
message: "local-agent 心跳返回失败。",
detail: result.body,
mirrorToMaster: true,
});
}
const snapshots = Array.isArray(config.threadContexts) ? config.threadContexts : [];
syncThreadContextsInBackground(config, runtime, snapshots);
syncSkillsInBackground(config, runtime);
} catch (error) {
runtime.lastHeartbeatAt = new Date().toISOString();
runtime.lastHeartbeatOk = false;
runtime.lastHeartbeatStatus = 0;
runtime.lastHeartbeatBody = error instanceof Error ? error.message : String(error);
await postAppLog(config, runtime, {
level: "error",
category: "local_agent.heartbeat_exception",
message: "local-agent 心跳执行异常。",
detail: runtime.lastHeartbeatBody,
mirrorToMaster: true,
});
}
}
const heartbeat = createSerializedRunner(performHeartbeat, {
timeoutMs: config.heartbeatTimeoutMs ?? 12_000,
timeoutErrorMessage: "LOCAL_AGENT_HEARTBEAT_TIMEOUT",
});
const masterTaskPoll = createSerializedRunner(async () => {
await pollMasterAgentTasks(config, runtime);
});
const skillLifecyclePoll = createSerializedRunner(async () => {
await pollSkillLifecycleRequests(config, runtime);
});
const bossAgentOtaPoll = createSerializedRunner(async () => {
await checkBossAgentOta(config, runtime);
});
const server = createServer(async (request, response) => {
const requestUrl = new URL(request.url || "/", `http://${config.bindHost || "127.0.0.1"}`);
if (requestUrl.pathname === "/" || requestUrl.pathname === "/boss-agent") {
const permissions = mergeBossAgentNativePermissionOverrides(
await detectLocalComputerPermissions(),
requestUrl.searchParams,
);
const status = buildBossAgentStatus(config, runtime, { permissions });
const activeTab = normalizeBossAgentTab(requestUrl.searchParams.get("tab") ?? "overview");
response.writeHead(200, { "Content-Type": "text/html; charset=utf-8" });
response.end(await renderBossAgentHtmlWithQr(status, { activeTab }));
return;
}
if (requestUrl.pathname === "/favicon.ico") {
response.writeHead(204);
response.end();
return;
}
if (requestUrl.pathname === "/api/v1/boss-agent/status") {
const permissions = await detectLocalComputerPermissions();
const status = buildBossAgentStatus(config, runtime, { permissions });
response.writeHead(200, { "Content-Type": "application/json" });
response.end(JSON.stringify({ ok: true, status }));
return;
}
if (requestUrl.pathname === "/api/v1/boss-agent/ota/check") {
const status = await checkBossAgentOtaUpdate(config, runtime);
const wantsJson = String(request.headers.accept || "").includes("application/json");
if (!wantsJson) {
response.writeHead(302, { Location: "/boss-agent?tab=overview" });
response.end();
return;
}
response.writeHead(200, { "Content-Type": "application/json" });
response.end(JSON.stringify({ ok: true, status }));
return;
}
if (requestUrl.pathname === "/api/v1/boss-agent/ota/apply" && request.method === "POST") {
const launchInstaller = requestUrl.searchParams.get("launch") !== "0";
const result = await applyBossAgentOtaUpdate(config, runtime, { launchInstaller });
await postAppLog(config, runtime, {
level: result.status === "failed" ? "error" : "info",
category: result.status === "failed"
? "local_agent.boss_agent_ota_failed"
: "local_agent.boss_agent_ota_applied",
message: result.status === "failed"
? "boss-agent OTA 更新失败"
: `boss-agent OTA 已${result.status === "installer_launched" ? "拉起安装器" : "下载暂存"}`,
detail: result.version ?? result.error ?? result.archivePath,
mirrorToMaster: result.status === "failed",
});
const wantsJson = String(request.headers.accept || "").includes("application/json");
if (!wantsJson) {
response.writeHead(302, { Location: "/boss-agent?tab=overview" });
response.end();
return;
}
response.writeHead(result.status === "failed" ? 400 : 200, { "Content-Type": "application/json" });
response.end(JSON.stringify({ ok: result.status !== "failed", result }));
return;
}
const codexRemoteControlMatch = requestUrl.pathname.match(
/^\/api\/v1\/boss-agent\/codex-remote-control\/(start|stop)$/,
);
if (codexRemoteControlMatch && request.method === "POST") {
const action = codexRemoteControlMatch[1];
const result = await runCodexRemoteControlDaemonAction(action, config);
runtime.lastCodexRemoteControlAction = {
action,
status: result.status,
at: new Date().toISOString(),
commandLabel: result.commandLabel,
outputSummary: result.outputSummary,
};
await postAppLog(config, runtime, {
level: result.status === "failed" ? "error" : "info",
category: result.status === "failed"
? "local_agent.codex_remote_control_failed"
: "local_agent.codex_remote_control_changed",
message: result.status === "failed"
? `Codex Remote Control ${action} 失败`
: `Codex Remote Control 已${action === "start" ? "启动" : "停止"}`,
detail: result.outputSummary,
mirrorToMaster: result.status === "failed",
});
const wantsJson = String(request.headers.accept || "").includes("application/json");
if (!wantsJson) {
response.writeHead(302, { Location: "/boss-agent?tab=overview" });
response.end();
return;
}
response.writeHead(result.status === "failed" ? 400 : 200, { "Content-Type": "application/json" });
response.end(JSON.stringify({ ok: result.status !== "failed", result }));
return;
}
if (requestUrl.pathname === "/api/v1/boss-agent/permissions/open") {
const target = requestUrl.searchParams.get("target") || "core";
const returnTab = normalizeBossAgentTab(requestUrl.searchParams.get("returnTab") ?? "permissions");
const result = await openBossAgentPermissionSettings(target);
const wantsJson = String(request.headers.accept || "").includes("application/json");
if (wantsJson) {
response.writeHead(result.ok ? 200 : 500, { "Content-Type": "application/json" });
response.end(JSON.stringify(result));
return;
}
response.writeHead(302, { Location: `/boss-agent?tab=${encodeURIComponent(returnTab)}` });
response.end();
return;
}
if (requestUrl.pathname === "/health") {
response.writeHead(200, { "Content-Type": "application/json" });
if (requestUrl.searchParams.get("verbose") === "1") {
response.end(
JSON.stringify({
ok: true,
service: "boss-local-agent",
runtime,
}),
);
return;
}
response.end(
JSON.stringify(buildLocalAgentHealthSummary(config, runtime)),
);
return;
}
if (requestUrl.pathname === "/api/v1/device") {
response.writeHead(200, { "Content-Type": "application/json" });
response.end(JSON.stringify({ config, runtime }));
return;
}
if (requestUrl.pathname === "/api/v1/skills") {
response.writeHead(200, { "Content-Type": "application/json" });
response.end(
JSON.stringify({
ok: true,
deviceId: config.deviceId,
skills: runtime.lastSkills,
sync: {
at: runtime.lastSkillSyncAt,
ok: runtime.lastSkillSyncOk,
status: runtime.lastSkillSyncStatus,
body: runtime.lastSkillSyncBody,
},
}),
);
return;
}
if (requestUrl.pathname === "/api/v1/heartbeat" && request.method === "POST") {
await heartbeat().catch((error) => {
recordHeartbeatRunnerError(runtime, error);
});
response.writeHead(200, { "Content-Type": "application/json" });
response.end(JSON.stringify(buildLocalAgentHealthSummary(config, runtime)));
return;
}
response.writeHead(404, { "Content-Type": "application/json" });
response.end(JSON.stringify({ ok: false, message: "not_found" }));
});
server.listen(config.port, config.bindHost, () => {
console.log(
JSON.stringify({
ok: true,
service: "boss-local-agent",
bind: `${config.bindHost}:${config.port}`,
controlPlaneUrl: config.controlPlaneUrl,
workerId: config.workerId,
}),
);
});
void (async () => {
await heartbeat().catch((error) => {
recordHeartbeatRunnerError(runtime, error);
});
await masterTaskPoll();
await skillLifecyclePoll();
await bossAgentOtaPoll();
})();
setInterval(() => {
void heartbeat().catch((error) => {
recordHeartbeatRunnerError(runtime, error);
});
}, config.heartbeatIntervalMs ?? 15000);
setInterval(() => {
void masterTaskPoll();
}, config.masterAgentPollIntervalMs ?? 1000);
setInterval(() => {
void skillLifecyclePoll();
}, config.skillLifecyclePollIntervalMs ?? 5000);
setInterval(() => {
void bossAgentOtaPoll();
}, getBossAgentOtaRunnerConfig(process.env, config).checkIntervalMs);