Files
boss/local-agent/server.mjs
2026-05-12 17:04:40 +08:00

1108 lines
36 KiB
JavaScript
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env node
import { spawn } from "node:child_process";
import { createServer } from "node:http";
import { access, readFile, readdir, rm } from "node:fs/promises";
import os from "node:os";
import { join, resolve } from "node:path";
import { discoverCodexProjectCandidatesInWorker } from "./codex-session-discovery.mjs";
import { prepareCodexTaskExecution } from "./codex-task-runner.mjs";
import { appendBossUserMessageToCodexThreadRollout } from "./codex-thread-rollout-writer.mjs";
import {
executeOmxTeamTask,
getOmxTeamTaskRunnerConfig,
shouldUseOmxTeamTaskRunner,
} from "./omx-team-task-runner.mjs";
import {
canHandleBrowserControlTask,
executeBrowserControlTask,
getBrowserControlTaskRunnerConfig,
} from "./browser-control-task-runner.mjs";
import {
canHandleComputerUseTask,
executeComputerUseTask,
getComputerUseTaskRunnerConfig,
} from "./computer-use-task-runner.mjs";
import {
executeCodexDesktopRefreshBridge,
} from "./codex-desktop-refresh-bridge.mjs";
import {
executeSkillLifecycleRequest,
getSkillLifecycleRunnerConfig,
} from "./skill-lifecycle-runner.mjs";
import {
sanitizeSensitiveTaskFailureDetailForLog,
sanitizeSensitiveTaskFailureDetailForTransport,
} from "./master-task-output-sanitizer.mjs";
import {
resolveMasterAgentTaskTimeoutMs,
runWithTaskTimeout,
} from "./master-task-timeout.mjs";
import {
buildBossAgentStatus,
detectLocalComputerPermissions,
renderBossAgentHtmlWithQr,
} from "./boss-agent-status.mjs";
import {
buildComputerUseCompletionPayload,
buildMasterAgentTaskCompletionRequestBody,
buildRemoteExecutionCompletionPayload,
} from "./master-task-completion.mjs";
import { createSerializedRunner } from "./serialized-runner.mjs";
async function loadConfig(configPath) {
const raw = await readFile(resolve(configPath), "utf8");
return JSON.parse(raw);
}
async function resolveHeartbeatProjects(config, runtime) {
const staticProjects = Array.isArray(config.projects) ? config.projects : [];
const staticCandidates = Array.isArray(config.projectCandidates) ? config.projectCandidates : [];
if (config.codexSessionDiscoveryEnabled === false) {
return {
projects: staticProjects,
projectCandidates: staticCandidates,
};
}
try {
const discovered = await discoverCodexProjectCandidatesInWorker({
stateDbPath: config.codexStateDbPath,
logsDbPath: config.codexLogsDbPath,
sessionIndexPath: config.codexSessionIndexPath,
globalStatePath: config.codexGlobalStatePath,
sessionsDir: config.codexSessionsDir,
lookbackHours: config.codexSessionLookbackHours,
});
const candidateMap = new Map();
for (const candidate of [...staticCandidates, ...discovered.projectCandidates]) {
candidateMap.set(candidate.codexThreadRef ?? candidate.threadId, candidate);
}
const mergedCandidates = [...candidateMap.values()];
const mergedProjects = [...new Set([...staticProjects, ...discovered.projects, ...mergedCandidates.map((candidate) => candidate.folderName)])];
runtime.lastProjectDiscoveryAt = new Date().toISOString();
runtime.lastProjectDiscoveryOk = true;
runtime.lastProjectDiscoverySummary = `${mergedCandidates.length} threads / ${mergedProjects.length} folders`;
runtime.lastCodexGuiConnected = discovered.guiConnected === true;
return {
projects: mergedProjects,
projectCandidates: mergedCandidates,
guiConnected: discovered.guiConnected === true,
};
} catch (error) {
runtime.lastProjectDiscoveryAt = new Date().toISOString();
runtime.lastProjectDiscoveryOk = false;
runtime.lastProjectDiscoverySummary = error instanceof Error ? error.message : String(error);
await postAppLog(config, runtime, {
level: "error",
category: "local_agent.codex_discovery_failed",
message: "Codex 线程扫描失败,已退回静态项目配置。",
detail: runtime.lastProjectDiscoverySummary,
mirrorToMaster: true,
});
return {
projects: staticProjects,
projectCandidates: staticCandidates,
guiConnected: false,
};
}
}
async function postHeartbeat(config, runtime, heartbeatProjects) {
const now = new Date().toISOString();
const preferredExecutionMode =
config.preferredExecutionMode === "gui" || config.preferredExecutionMode === "cli"
? config.preferredExecutionMode
: undefined;
const browserControlRuntime = getBrowserControlTaskRunnerConfig(process.env, config);
const computerUseRuntime = getComputerUseTaskRunnerConfig(process.env, config);
const guiConnected =
config.guiConnected === true ||
(config.guiConnected !== false && heartbeatProjects.guiConnected === true);
const response = await fetch(`${config.controlPlaneUrl.replace(/\/$/, "")}/api/device-heartbeat`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
deviceId: config.deviceId,
token: runtime.issuedToken ?? config.token,
pairingCode: runtime.issuedToken ? undefined : config.pairingCode,
name: config.name,
avatar: config.avatar,
account: config.account,
status: config.status,
quota5h: config.quota5h,
quota7d: config.quota7d,
capabilities: {
gui: {
connected: guiConnected,
lastSeenAt: now,
lastActiveProjectId: "",
},
cli: {
connected: config.cliConnected !== false,
lastSeenAt: now,
lastActiveProjectId: "",
},
browserAutomation: {
connected: config.browserAutomationConnected !== false || Boolean(browserControlRuntime.enabled && browserControlRuntime.command),
lastSeenAt: now,
lastActiveProjectId: "",
},
computerUse: {
connected: Boolean(config.computerUseConnected) || Boolean(computerUseRuntime.enabled && computerUseRuntime.command),
lastSeenAt: now,
lastActiveProjectId: "",
},
},
preferredExecutionMode,
projects: heartbeatProjects.projects,
projectCandidates: heartbeatProjects.projectCandidates,
endpoint: config.endpoint,
}),
});
const text = await response.text();
let json = null;
try {
json = JSON.parse(text);
} catch {
json = null;
}
return {
ok: response.ok,
status: response.status,
body: text,
json,
};
}
function deviceTokenHeaders(config, runtime) {
const token = runtime.issuedToken ?? config.token;
return token ? { "x-boss-device-token": token } : {};
}
async function postThreadContext(config, runtime, snapshot) {
const workerId = snapshot.workerId ?? config.workerId ?? `${config.deviceId}-worker`;
const response = await fetch(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/workers/${workerId}/thread-context`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({
nodeId: config.deviceId,
projectId: snapshot.projectId,
taskId: snapshot.taskId,
threadId: snapshot.threadId,
title: snapshot.title,
summary: snapshot.summary,
sourceKind: snapshot.sourceKind ?? "worker_estimator",
status: snapshot.status ?? "running",
contextBudgetRemainingPct: snapshot.contextBudgetRemainingPct,
contextBudgetLevel: snapshot.contextBudgetLevel,
compactionExpectedAt: snapshot.compactionExpectedAt,
mustFinishBeforeCompaction: snapshot.mustFinishBeforeCompaction,
estimatedRemainingTurns: snapshot.estimatedRemainingTurns ?? 0,
estimatedRemainingLargeMessages: snapshot.estimatedRemainingLargeMessages ?? 0,
lastCompactionAt: snapshot.lastCompactionAt,
compactionCount: snapshot.compactionCount ?? 0,
patchPending: snapshot.patchPending ?? false,
testsPending: snapshot.testsPending ?? false,
evidencePending: snapshot.evidencePending ?? false,
checklist: snapshot.checklist ?? [],
capturedAt: new Date().toISOString(),
}),
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
workerId,
threadId: snapshot.threadId,
};
}
function parseSkillDescription(content) {
const descriptionMatch = content.match(/description:\s*(.+)/);
if (descriptionMatch?.[1]) {
return descriptionMatch[1].trim().replace(/^["']|["']$/g, "");
}
const lines = content
.split("\n")
.map((line) => line.trim())
.filter(Boolean);
return lines.find((line) => !line.startsWith("---") && !line.startsWith("#")) ?? "未提供说明";
}
async function discoverSkills(config) {
const skillsDir = resolve(config.skillsDir ?? join(os.homedir(), ".codex/skills"));
const pending = [skillsDir];
const skills = [];
while (pending.length > 0) {
const currentDir = pending.pop();
if (!currentDir) continue;
let entries = [];
try {
entries = await readdir(currentDir, { withFileTypes: true });
} catch {
continue;
}
for (const entry of entries) {
if (entry.isDirectory()) {
pending.push(join(currentDir, entry.name));
continue;
}
if (!entry.isFile() || entry.name !== "SKILL.md") continue;
const skillPath = join(currentDir, entry.name);
try {
await access(skillPath);
const content = await readFile(skillPath, "utf8");
const skillName = currentDir.split("/").pop() ?? "unknown-skill";
const relativeCategory = currentDir
.replace(`${skillsDir}/`, "")
.split("/")
.slice(0, -1)
.join(" / ");
skills.push({
name: skillName,
description: parseSkillDescription(content),
path: skillPath,
invocation: `[$${skillName}](${skillPath})`,
category: relativeCategory || config.name,
});
} catch {
continue;
}
}
}
return skills.sort((a, b) => a.name.localeCompare(b.name));
}
async function postSkills(config, runtime, skills) {
const response = await fetch(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/devices/${config.deviceId}/skills`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({ skills }),
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
count: skills.length,
};
}
async function postAppLog(config, runtime, payload) {
try {
await fetch(`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/app-logs`, {
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({
deviceId: config.deviceId,
source: "local_agent",
...payload,
}),
});
} catch {
// Ignore log transport failures to avoid blocking the agent loop.
}
}
async function claimMasterAgentTask(config, runtime) {
const response = await fetch(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/master-agent/tasks/claim`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({ deviceId: config.deviceId }),
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
};
}
async function completeMasterAgentTask(config, runtime, payload) {
const response = await fetch(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/master-agent/tasks/${payload.taskId}/complete`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify(buildMasterAgentTaskCompletionRequestBody(config, payload)),
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
};
}
async function claimSkillLifecycleRequest(config, runtime) {
const response = await fetch(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/devices/${config.deviceId}/skill-requests/claim`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({ deviceId: config.deviceId }),
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
};
}
async function completeSkillLifecycleRequest(config, runtime, request, result) {
const response = await fetch(
`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/devices/${config.deviceId}/skill-requests/${request.requestId}/complete`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
...deviceTokenHeaders(config, runtime),
},
body: JSON.stringify({
status: result.status === "failed" ? "failed" : "completed",
resultSummary: result.resultSummary,
error: result.error,
}),
},
);
return {
ok: response.ok,
status: response.status,
body: await response.text(),
};
}
function parseDispatchExecutionCompletion(rawOutput) {
const trimmed = String(rawOutput || "").trim();
if (!trimmed) {
return {
rawThreadReply: "",
replyBody: undefined,
};
}
const fencedMatch = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i);
const jsonCandidate = fencedMatch?.[1]?.trim() ?? trimmed;
try {
const parsed = JSON.parse(jsonCandidate);
if (parsed && typeof parsed === "object") {
const rawThreadReply =
typeof parsed.rawThreadReply === "string" ? parsed.rawThreadReply.trim() : "";
const replyBody =
typeof parsed.replyBody === "string" ? parsed.replyBody.trim() : undefined;
if (rawThreadReply) {
return {
rawThreadReply,
replyBody: replyBody || undefined,
};
}
}
} catch {
// Fall back to treating the full output as the raw thread reply.
}
return {
rawThreadReply: trimmed,
replyBody: undefined,
};
}
function runShortCommand(command, args, options = {}) {
return new Promise((resolve) => {
const child = spawn(command, args, {
cwd: options.cwd || process.cwd(),
env: process.env,
});
let stdout = "";
let stderr = "";
const timeout = setTimeout(() => {
if (!child.killed) {
child.kill("SIGKILL");
}
}, options.timeoutMs || 1500);
child.stdout.on("data", (chunk) => {
stdout += String(chunk);
});
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
child.on("error", (error) => {
clearTimeout(timeout);
resolve({ ok: false, stdout, stderr: error.message });
});
child.on("close", (code) => {
clearTimeout(timeout);
resolve({
ok: code === 0,
stdout: stdout.trim(),
stderr: stderr.trim(),
});
});
});
}
function parseGitShortstat(shortstat) {
const text = String(shortstat || "");
const changedFiles = Number((text.match(/(\d+)\s+files?\s+changed/) || [])[1]);
const additions = Number((text.match(/(\d+)\s+insertions?\(\+\)/) || [])[1]);
const deletions = Number((text.match(/(\d+)\s+deletions?\(-\)/) || [])[1]);
return {
changedFiles: Number.isFinite(changedFiles) ? changedFiles : undefined,
additions: Number.isFinite(additions) ? additions : undefined,
deletions: Number.isFinite(deletions) ? deletions : undefined,
};
}
function collectArtifactsFromReply(text) {
const matches = new Set();
const source = String(text || "");
const pattern = /(?:[\w.-]+\/)*[\w.-]+\.(?:md|txt|ts|tsx|js|mjs|java|kt|json|png|jpe?g|webp|svg|apk|aab)\b/gi;
let match;
while ((match = pattern.exec(source)) && matches.size < 12) {
const label = match[0].split("/").filter(Boolean).pop();
if (label) {
matches.add(label);
}
}
return Array.from(matches).map((label) => ({
label,
kind: /\.(png|jpe?g|webp|svg)$/i.test(label) ? "image" : "file",
}));
}
async function collectLocalExecutionProgress(cwd, replyBody) {
const [diffShortstat, statusShort, ghVersion] = await Promise.all([
runShortCommand("git", ["diff", "--shortstat"], { cwd }),
runShortCommand("git", ["status", "--short"], { cwd }),
runShortCommand("gh", ["--version"], { cwd }),
]);
const parsedDiff = diffShortstat.ok ? parseGitShortstat(diffShortstat.stdout) : {};
const hasGitState = diffShortstat.ok || statusShort.ok;
return {
branch: hasGitState
? {
...parsedDiff,
gitStatus: statusShort.ok && statusShort.stdout ? "有未提交变更" : "工作区干净",
githubCliStatus: ghVersion.ok ? "available" : "unavailable",
}
: {
githubCliStatus: ghVersion.ok ? "available" : "unavailable",
},
artifacts: collectArtifactsFromReply(replyBody),
};
}
async function runMasterAgentTask(config, runtime, task) {
const outputFile = join(os.tmpdir(), `${task.taskId}.reply.txt`);
const stderrChunks = [];
const taskTimeoutMs = resolveMasterAgentTaskTimeoutMs(config, task);
runtime.activeMasterTask = {
taskId: task.taskId,
status: "running",
startedAt: new Date().toISOString(),
};
try {
let activeChild = null;
const executionResult = await (async () => {
if (canHandleBrowserControlTask(task)) {
const browserResult = await executeBrowserControlTask(task, config);
if (browserResult.status === "failed") {
throw new Error(browserResult.errorMessage || "BROWSER_CONTROL_FAILED");
}
return {
replyBody: browserResult.replyBody,
dispatchExecutionCompletion: {
targetUrl: browserResult.targetUrl,
},
};
}
if (canHandleComputerUseTask(task)) {
const computerUseResult = await executeComputerUseTask(task, config);
if (computerUseResult.status === "failed") {
throw new Error(computerUseResult.errorMessage || "COMPUTER_USE_FAILED");
}
if (computerUseResult.status === "needs_user_action") {
return {
waitingUserActionCompletion: buildComputerUseCompletionPayload(task, computerUseResult),
};
}
return {
replyBody: computerUseResult.replyBody,
dispatchExecutionCompletion: {
targetApp: computerUseResult.targetApp,
},
};
}
if (shouldUseOmxTeamTaskRunner(task)) {
const omxResult = await executeOmxTeamTask(getOmxTeamTaskRunnerConfig(process.env, config), task);
if (omxResult.status === "failed") {
throw new Error(omxResult.errorMessage || "OMX_EXECUTION_FAILED");
}
return {
replyBody: omxResult.replyBody ?? omxResult.rawThreadReply,
dispatchExecutionCompletion: {
rawThreadReply: omxResult.rawThreadReply,
replyBody: omxResult.replyBody,
},
};
}
const codexPreparation = await prepareCodexTaskExecution(config, task, outputFile);
if (!codexPreparation.ok) {
throw new Error(codexPreparation.error.message);
}
const codexExecution = codexPreparation.execution;
if (codexExecution.desktopMirror?.enabled) {
const mirrorResult = await appendBossUserMessageToCodexThreadRollout({
stateDbPath: config.codexStateDbPath,
sessionsDir: config.codexSessionsDir,
targetThreadRef: codexExecution.desktopMirror.targetThreadRef,
sourceMessageId: codexExecution.desktopMirror.sourceMessageId,
message: codexExecution.desktopMirror.sourceMessageBody,
sentAt: codexExecution.desktopMirror.sourceMessageSentAt,
});
try {
const refreshResult = await executeCodexDesktopRefreshBridge(
{
targetThreadRef: codexExecution.desktopMirror.targetThreadRef,
sourceMessageId: codexExecution.desktopMirror.sourceMessageId,
rolloutPath: mirrorResult.rolloutPath,
threadTouchStatus: mirrorResult.threadTouch?.status,
},
config,
);
if (refreshResult.status === "failed") {
await postAppLog(config, runtime, {
projectId: task.projectId,
level: "warn",
category: "local_agent.codex_desktop_refresh_failed",
message: "Codex 桌面刷新提示未完成,消息已写入线程记录。",
detail: refreshResult.detail,
mirrorToMaster: false,
});
}
} catch (error) {
await postAppLog(config, runtime, {
projectId: task.projectId,
level: "warn",
category: "local_agent.codex_desktop_refresh_failed",
message: "Codex 桌面刷新提示执行失败,消息已写入线程记录。",
detail: error instanceof Error ? error.message : String(error),
mirrorToMaster: false,
});
}
}
await runWithTaskTimeout(
{
timeoutMs: taskTimeoutMs,
label: `master task ${task.taskId}`,
onTimeout: async () => {
if (activeChild && !activeChild.killed) {
activeChild.kill("SIGKILL");
}
},
},
async () =>
await new Promise((resolveTask, rejectTask) => {
const child = spawn("codex", codexExecution.args, {
cwd: codexExecution.cwd,
env: process.env,
});
activeChild = child;
child.stderr.on("data", (chunk) => {
stderrChunks.push(String(chunk));
});
child.on("error", (error) => {
activeChild = null;
rejectTask(error);
});
child.on("close", (code) => {
activeChild = null;
if (code === 0) {
resolveTask();
return;
}
rejectTask(new Error(stderrChunks.join("").trim() || `codex exit code ${code}`));
});
}),
);
const replyBody = (await readFile(outputFile, "utf8")).trim();
const executionProgress = await collectLocalExecutionProgress(codexExecution.cwd, replyBody);
return {
replyBody,
executionProgress,
dispatchExecutionCompletion:
task.taskType === "dispatch_execution"
? parseDispatchExecutionCompletion(replyBody)
: null,
};
})();
if (executionResult.waitingUserActionCompletion) {
const completion = await completeMasterAgentTask(
config,
runtime,
executionResult.waitingUserActionCompletion,
);
if (!completion.ok) {
throw new Error(`DIALOG_GUARD_COMPLETION_FAILED:${completion.status}:${completion.body}`);
}
runtime.activeMasterTask = {
taskId: task.taskId,
status: "needs_user_action",
completedAt: new Date().toISOString(),
detail: completion.body,
};
await postAppLog(config, runtime, {
projectId: "master-agent",
level: "info",
category: "local_agent.desktop_dialog_guard_waiting_user_action",
message: `Master Codex Node 等待用户处理桌面弹窗:${task.taskId}`,
detail: executionResult.waitingUserActionCompletion.summary,
mirrorToMaster: false,
});
return;
}
const { replyBody, dispatchExecutionCompletion, executionProgress } = executionResult;
const completion = await completeMasterAgentTask(
config,
runtime,
buildRemoteExecutionCompletionPayload(task, {
status: "completed",
replyBody: dispatchExecutionCompletion?.replyBody ?? replyBody,
dispatchExecutionId: task.dispatchExecutionId,
targetProjectId: task.targetProjectId,
targetThreadId: task.targetThreadId,
targetUrl: dispatchExecutionCompletion?.targetUrl,
targetApp: dispatchExecutionCompletion?.targetApp,
rawThreadReply: dispatchExecutionCompletion?.rawThreadReply,
executionProgress,
}),
);
runtime.activeMasterTask = {
taskId: task.taskId,
status: completion.ok ? "completed" : "complete_failed",
completedAt: new Date().toISOString(),
detail: completion.body,
};
await postAppLog(config, runtime, {
projectId: "master-agent",
level: "info",
category: "local_agent.master_agent_task_completed",
message: `Master Codex Node 已完成主 Agent 任务 ${task.taskId}`,
detail: replyBody.slice(0, 280),
mirrorToMaster: false,
});
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
const transportDetail = sanitizeSensitiveTaskFailureDetailForTransport(detail);
const logDetail = sanitizeSensitiveTaskFailureDetailForLog(detail);
runtime.activeMasterTask = {
taskId: task.taskId,
status: "failed",
completedAt: new Date().toISOString(),
detail: logDetail ?? transportDetail ?? "MASTER_AGENT_TASK_FAILED",
};
await completeMasterAgentTask(
config,
runtime,
buildRemoteExecutionCompletionPayload(task, {
status: "failed",
errorMessage: transportDetail,
dispatchExecutionId: task.dispatchExecutionId,
targetProjectId: task.targetProjectId,
targetThreadId: task.targetThreadId,
}),
).catch(() => null);
await postAppLog(config, runtime, {
projectId: "master-agent",
level: "error",
category: "local_agent.master_agent_task_failed",
message: `Master Codex Node 执行主 Agent 任务失败:${task.taskId}`,
detail: logDetail,
mirrorToMaster: true,
});
} finally {
await rm(outputFile, { force: true }).catch(() => null);
runtime.masterTaskBusy = false;
}
}
async function pollMasterAgentTasks(config, runtime) {
if (config.masterAgentEnabled === false || runtime.masterTaskBusy) {
return;
}
try {
const claim = await claimMasterAgentTask(config, runtime);
if (!claim.ok) {
runtime.lastMasterTaskPoll = {
at: new Date().toISOString(),
ok: false,
status: claim.status,
body: claim.body,
};
return;
}
const parsed = JSON.parse(claim.body);
runtime.lastMasterTaskPoll = {
at: new Date().toISOString(),
ok: true,
status: claim.status,
body: claim.body,
};
if (!parsed.task) {
return;
}
runtime.masterTaskBusy = true;
await runMasterAgentTask(config, runtime, parsed.task);
} catch (error) {
runtime.lastMasterTaskPoll = {
at: new Date().toISOString(),
ok: false,
status: 0,
body: error instanceof Error ? error.message : String(error),
};
}
}
async function pollSkillLifecycleRequests(config, runtime) {
const runnerConfig = getSkillLifecycleRunnerConfig(process.env, config);
if (!runnerConfig.enabled || runtime.skillLifecycleBusy) {
return;
}
try {
const claim = await claimSkillLifecycleRequest(config, runtime);
runtime.lastSkillLifecyclePoll = {
at: new Date().toISOString(),
ok: claim.ok,
status: claim.status,
body: claim.body,
};
if (!claim.ok) {
return;
}
const parsed = JSON.parse(claim.body);
if (!parsed.request) {
return;
}
runtime.skillLifecycleBusy = true;
runtime.activeSkillLifecycleRequest = {
requestId: parsed.request.requestId,
action: parsed.request.action,
status: "running",
startedAt: new Date().toISOString(),
};
let result = await executeSkillLifecycleRequest(parsed.request, config, runtime);
if (result.status !== "failed") {
const skills = await discoverSkills(config);
runtime.lastSkills = skills;
const skillSyncResult = await postSkills(config, runtime, skills);
runtime.lastSkillSyncAt = new Date().toISOString();
runtime.lastSkillSyncOk = skillSyncResult.ok;
runtime.lastSkillSyncStatus = skillSyncResult.status;
runtime.lastSkillSyncBody = skillSyncResult.body;
if (!skillSyncResult.ok) {
result = {
status: "failed",
error: `SKILL_SYNC_FAILED:${skillSyncResult.status}:${skillSyncResult.body}`,
};
}
}
const completion = await completeSkillLifecycleRequest(config, runtime, parsed.request, result);
runtime.activeSkillLifecycleRequest = {
requestId: parsed.request.requestId,
action: parsed.request.action,
status: completion.ok ? result.status : "complete_failed",
completedAt: new Date().toISOString(),
detail: completion.body,
};
await postAppLog(config, runtime, {
level: result.status === "failed" ? "error" : "info",
category: result.status === "failed"
? "local_agent.skill_lifecycle_failed"
: "local_agent.skill_lifecycle_completed",
message: `Skill 远程治理任务${result.status === "failed" ? "失败" : "完成"}${parsed.request.action}`,
detail: result.resultSummary ?? result.error,
mirrorToMaster: result.status === "failed",
});
} catch (error) {
runtime.lastSkillLifecyclePoll = {
at: new Date().toISOString(),
ok: false,
status: 0,
body: error instanceof Error ? error.message : String(error),
};
} finally {
runtime.skillLifecycleBusy = false;
}
}
const configPath = process.argv[2];
if (!configPath) {
console.error("Usage: node local-agent/server.mjs <config.json>");
process.exit(1);
}
const config = await loadConfig(configPath);
const runtime = {
lastHeartbeatAt: null,
lastHeartbeatOk: false,
lastHeartbeatStatus: null,
lastHeartbeatBody: null,
lastSkillSyncAt: null,
lastSkillSyncOk: false,
lastSkillSyncStatus: null,
lastSkillSyncBody: null,
lastSkills: [],
issuedToken: config.token ?? null,
pairingCodeUsed: config.pairingCode ?? null,
lastThreadContextResults: [],
masterTaskBusy: false,
activeMasterTask: null,
lastMasterTaskPoll: null,
skillLifecycleBusy: false,
activeSkillLifecycleRequest: null,
lastSkillLifecyclePoll: null,
lastProjectDiscoveryAt: null,
lastProjectDiscoveryOk: false,
lastProjectDiscoverySummary: null,
};
async function performHeartbeat() {
try {
const heartbeatProjects = await resolveHeartbeatProjects(config, runtime);
const result = await postHeartbeat(config, runtime, heartbeatProjects);
runtime.lastHeartbeatAt = new Date().toISOString();
runtime.lastHeartbeatOk = result.ok;
runtime.lastHeartbeatStatus = result.status;
runtime.lastHeartbeatBody = result.body;
if (result.json?.token) {
runtime.issuedToken = result.json.token;
}
if (!result.ok) {
await postAppLog(config, runtime, {
level: "error",
category: "local_agent.heartbeat_failed",
message: "local-agent 心跳返回失败。",
detail: result.body,
mirrorToMaster: true,
});
}
const snapshots = Array.isArray(config.threadContexts) ? config.threadContexts : [];
runtime.lastThreadContextResults = [];
for (const snapshot of snapshots) {
const threadResult = await postThreadContext(config, runtime, snapshot);
runtime.lastThreadContextResults.push(threadResult);
if (!threadResult.ok) {
await postAppLog(config, runtime, {
projectId: snapshot.projectId,
level: "error",
category: "local_agent.thread_context_failed",
message: `线程预算上报失败:${snapshot.threadId}`,
detail: threadResult.body,
mirrorToMaster: true,
});
}
}
try {
const skills = await discoverSkills(config);
runtime.lastSkills = skills;
const skillSyncResult = await postSkills(config, runtime, skills);
runtime.lastSkillSyncAt = new Date().toISOString();
runtime.lastSkillSyncOk = skillSyncResult.ok;
runtime.lastSkillSyncStatus = skillSyncResult.status;
runtime.lastSkillSyncBody = skillSyncResult.body;
} catch (error) {
runtime.lastSkillSyncAt = new Date().toISOString();
runtime.lastSkillSyncOk = false;
runtime.lastSkillSyncStatus = 0;
runtime.lastSkillSyncBody = error instanceof Error ? error.message : String(error);
await postAppLog(config, runtime, {
level: "error",
category: "local_agent.skills_sync_failed",
message: "Skill 扫描或同步失败。",
detail: runtime.lastSkillSyncBody,
mirrorToMaster: true,
});
}
} catch (error) {
runtime.lastHeartbeatAt = new Date().toISOString();
runtime.lastHeartbeatOk = false;
runtime.lastHeartbeatStatus = 0;
runtime.lastHeartbeatBody = error instanceof Error ? error.message : String(error);
await postAppLog(config, runtime, {
level: "error",
category: "local_agent.heartbeat_exception",
message: "local-agent 心跳执行异常。",
detail: runtime.lastHeartbeatBody,
mirrorToMaster: true,
});
}
}
const heartbeat = createSerializedRunner(performHeartbeat);
const masterTaskPoll = createSerializedRunner(async () => {
await pollMasterAgentTasks(config, runtime);
});
const skillLifecyclePoll = createSerializedRunner(async () => {
await pollSkillLifecycleRequests(config, runtime);
});
const server = createServer(async (request, response) => {
const requestUrl = new URL(request.url || "/", `http://${config.bindHost || "127.0.0.1"}`);
if (requestUrl.pathname === "/" || requestUrl.pathname === "/boss-agent") {
const permissions = await detectLocalComputerPermissions();
const status = buildBossAgentStatus(config, runtime, { permissions });
response.writeHead(200, { "Content-Type": "text/html; charset=utf-8" });
response.end(await renderBossAgentHtmlWithQr(status));
return;
}
if (requestUrl.pathname === "/favicon.ico") {
response.writeHead(204);
response.end();
return;
}
if (requestUrl.pathname === "/api/v1/boss-agent/status") {
const permissions = await detectLocalComputerPermissions();
const status = buildBossAgentStatus(config, runtime, { permissions });
response.writeHead(200, { "Content-Type": "application/json" });
response.end(JSON.stringify({ ok: true, status }));
return;
}
if (requestUrl.pathname === "/health") {
response.writeHead(200, { "Content-Type": "application/json" });
response.end(
JSON.stringify({
ok: true,
service: "boss-local-agent",
runtime,
}),
);
return;
}
if (requestUrl.pathname === "/api/v1/device") {
response.writeHead(200, { "Content-Type": "application/json" });
response.end(JSON.stringify({ config, runtime }));
return;
}
if (requestUrl.pathname === "/api/v1/skills") {
response.writeHead(200, { "Content-Type": "application/json" });
response.end(
JSON.stringify({
ok: true,
deviceId: config.deviceId,
skills: runtime.lastSkills,
sync: {
at: runtime.lastSkillSyncAt,
ok: runtime.lastSkillSyncOk,
status: runtime.lastSkillSyncStatus,
body: runtime.lastSkillSyncBody,
},
}),
);
return;
}
if (requestUrl.pathname === "/api/v1/heartbeat" && request.method === "POST") {
await heartbeat();
response.writeHead(200, { "Content-Type": "application/json" });
response.end(JSON.stringify({ ok: runtime.lastHeartbeatOk, runtime }));
return;
}
response.writeHead(404, { "Content-Type": "application/json" });
response.end(JSON.stringify({ ok: false, message: "not_found" }));
});
server.listen(config.port, config.bindHost, () => {
console.log(
JSON.stringify({
ok: true,
service: "boss-local-agent",
bind: `${config.bindHost}:${config.port}`,
controlPlaneUrl: config.controlPlaneUrl,
workerId: config.workerId,
}),
);
});
void (async () => {
await heartbeat();
await masterTaskPoll();
await skillLifecyclePoll();
})();
setInterval(() => {
void heartbeat();
}, config.heartbeatIntervalMs ?? 15000);
setInterval(() => {
void masterTaskPoll();
}, config.masterAgentPollIntervalMs ?? 1000);
setInterval(() => {
void skillLifecyclePoll();
}, config.skillLifecyclePollIntervalMs ?? 5000);