Files
boss/local-agent/server.mjs

694 lines
22 KiB
JavaScript
Executable File

#!/usr/bin/env node
import { spawn } from "node:child_process";
import { createServer } from "node:http";
import { access, readFile, readdir, rm } from "node:fs/promises";
import os from "node:os";
import { basename, join, relative, resolve, sep } from "node:path";
import { discoverCodexProjectCandidatesInWorker } from "./codex-session-discovery.mjs";
import { prepareCodexTaskExecution } from "./codex-task-runner.mjs";
import {
  executeOmxTeamTask,
  getOmxTeamTaskRunnerConfig,
  shouldUseOmxTeamTaskRunner,
} from "./omx-team-task-runner.mjs";
import { createSerializedRunner } from "./serialized-runner.mjs";
/**
 * Reads the agent configuration file and parses it as JSON.
 *
 * @param {string} configPath - Path to the JSON config file (resolved to an absolute path first).
 * @returns {Promise<any>} The parsed configuration value.
 * @throws Rejects when the file cannot be read or its contents are not valid JSON.
 */
async function loadConfig(configPath) {
  const absolutePath = resolve(configPath);
  const contents = await readFile(absolutePath, { encoding: "utf8" });
  return JSON.parse(contents);
}
/**
 * Builds the project list and thread candidates reported with a heartbeat.
 *
 * Static entries from the config are merged with candidates returned by
 * `discoverCodexProjectCandidatesInWorker`. When discovery is disabled
 * (`config.codexSessionDiscoveryEnabled === false`) or fails, only the static
 * configuration is returned.
 *
 * Side effects: updates `runtime.lastProjectDiscoveryAt`,
 * `runtime.lastProjectDiscoveryOk` and `runtime.lastProjectDiscoverySummary`
 * whenever discovery is attempted, and posts an app log on failure.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state.
 * @returns {Promise<{projects: Array, projectCandidates: Array}>}
 */
async function resolveHeartbeatProjects(config, runtime) {
  const staticProjects = Array.isArray(config.projects) ? config.projects : [];
  const staticCandidates = Array.isArray(config.projectCandidates) ? config.projectCandidates : [];
  const staticOnly = () => ({
    projects: staticProjects,
    projectCandidates: staticCandidates,
  });

  if (config.codexSessionDiscoveryEnabled === false) {
    return staticOnly();
  }

  try {
    const discovered = await discoverCodexProjectCandidatesInWorker({
      stateDbPath: config.codexStateDbPath,
      logsDbPath: config.codexLogsDbPath,
      sessionIndexPath: config.codexSessionIndexPath,
      globalStatePath: config.codexGlobalStatePath,
      sessionsDir: config.codexSessionsDir,
      lookbackHours: config.codexSessionLookbackHours,
    });

    // Later entries win for the same thread key, so discovered candidates
    // override static ones. A candidate with no thread identifier gets a
    // unique Symbol key: sharing the `undefined` key would make every such
    // candidate overwrite the previous one.
    const candidateMap = new Map();
    for (const candidate of [...staticCandidates, ...discovered.projectCandidates]) {
      const threadKey =
        candidate.codexThreadRef ?? candidate.threadId ?? Symbol("keyless-candidate");
      candidateMap.set(threadKey, candidate);
    }
    const mergedCandidates = [...candidateMap.values()];

    // Skip candidates without a usable folder name; an `undefined` entry would
    // be serialized as `null` inside the heartbeat's project list.
    const candidateFolders = mergedCandidates
      .map((candidate) => candidate.folderName)
      .filter((folderName) => typeof folderName === "string" && folderName !== "");
    const mergedProjects = [
      ...new Set([...staticProjects, ...discovered.projects, ...candidateFolders]),
    ];

    runtime.lastProjectDiscoveryAt = new Date().toISOString();
    runtime.lastProjectDiscoveryOk = true;
    runtime.lastProjectDiscoverySummary = `${mergedCandidates.length} threads / ${mergedProjects.length} folders`;
    return {
      projects: mergedProjects,
      projectCandidates: mergedCandidates,
    };
  } catch (error) {
    runtime.lastProjectDiscoveryAt = new Date().toISOString();
    runtime.lastProjectDiscoveryOk = false;
    runtime.lastProjectDiscoverySummary = error instanceof Error ? error.message : String(error);
    await postAppLog(config, runtime, {
      level: "error",
      category: "local_agent.codex_discovery_failed",
      message: "Codex 线程扫描失败,已退回静态项目配置。",
      detail: runtime.lastProjectDiscoverySummary,
      mirrorToMaster: true,
    });
    return staticOnly();
  }
}
/**
 * Sends the device heartbeat to the control plane.
 *
 * The token already issued at runtime is preferred over the configured one;
 * the pairing code is only included while no token has been issued.
 *
 * @param {object} config - Agent configuration (control plane URL, device identity, quotas, ...).
 * @param {object} runtime - Runtime state; `issuedToken` is read.
 * @param {{projects: Array, projectCandidates: Array}} heartbeatProjects - Projects to report.
 * @returns {Promise<{ok: boolean, status: number, body: string, json: any}>}
 *   `json` is the parsed response body, or `null` when the body is not valid JSON.
 */
async function postHeartbeat(config, runtime, heartbeatProjects) {
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const heartbeatPayload = {
    deviceId: config.deviceId,
    token: runtime.issuedToken ?? config.token,
    pairingCode: runtime.issuedToken ? undefined : config.pairingCode,
    name: config.name,
    avatar: config.avatar,
    account: config.account,
    status: config.status,
    quota5h: config.quota5h,
    quota7d: config.quota7d,
    projects: heartbeatProjects.projects,
    projectCandidates: heartbeatProjects.projectCandidates,
    endpoint: config.endpoint,
  };

  const response = await fetch(`${baseUrl}/api/device-heartbeat`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(heartbeatPayload),
  });
  const bodyText = await response.text();

  // The body is kept verbatim; JSON decoding is best-effort only.
  let decoded;
  try {
    decoded = JSON.parse(bodyText);
  } catch {
    decoded = null;
  }

  return {
    ok: response.ok,
    status: response.status,
    body: bodyText,
    json: decoded,
  };
}
/**
 * Builds the device-token header for control-plane requests.
 *
 * @param {object} config - Agent configuration; `token` is the fallback.
 * @param {object} runtime - Runtime state; `issuedToken` takes precedence when not nullish.
 * @returns {object} `{ "x-boss-device-token": token }`, or an empty object when the token is falsy.
 */
function deviceTokenHeaders(config, runtime) {
  const activeToken = runtime.issuedToken ?? config.token;
  if (!activeToken) {
    return {};
  }
  return { "x-boss-device-token": activeToken };
}
/**
 * Reports one thread-context snapshot to the control plane.
 *
 * The worker id comes from the snapshot, then the config, and finally falls
 * back to `<deviceId>-worker`. Several snapshot fields get defaults when they
 * are nullish; `capturedAt` is always the time of this call.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state (used for the device-token header).
 * @param {object} snapshot - Thread-context snapshot to report.
 * @returns {Promise<{ok: boolean, status: number, body: string, workerId: string, threadId: any}>}
 */
async function postThreadContext(config, runtime, snapshot) {
  const workerId = snapshot.workerId ?? config.workerId ?? `${config.deviceId}-worker`;
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const endpoint = `${baseUrl}/api/v1/workers/${workerId}/thread-context`;

  const headers = {
    "Content-Type": "application/json",
    ...deviceTokenHeaders(config, runtime),
  };
  const contextPayload = {
    nodeId: config.deviceId,
    projectId: snapshot.projectId,
    taskId: snapshot.taskId,
    threadId: snapshot.threadId,
    title: snapshot.title,
    summary: snapshot.summary,
    sourceKind: snapshot.sourceKind ?? "worker_estimator",
    status: snapshot.status ?? "running",
    contextBudgetRemainingPct: snapshot.contextBudgetRemainingPct,
    contextBudgetLevel: snapshot.contextBudgetLevel,
    compactionExpectedAt: snapshot.compactionExpectedAt,
    mustFinishBeforeCompaction: snapshot.mustFinishBeforeCompaction,
    estimatedRemainingTurns: snapshot.estimatedRemainingTurns ?? 0,
    estimatedRemainingLargeMessages: snapshot.estimatedRemainingLargeMessages ?? 0,
    lastCompactionAt: snapshot.lastCompactionAt,
    compactionCount: snapshot.compactionCount ?? 0,
    patchPending: snapshot.patchPending ?? false,
    testsPending: snapshot.testsPending ?? false,
    evidencePending: snapshot.evidencePending ?? false,
    checklist: snapshot.checklist ?? [],
    capturedAt: new Date().toISOString(),
  };

  const response = await fetch(endpoint, {
    method: "POST",
    headers,
    body: JSON.stringify(contextPayload),
  });
  const responseText = await response.text();

  return {
    ok: response.ok,
    status: response.status,
    body: responseText,
    workerId,
    threadId: snapshot.threadId,
  };
}
/**
 * Extracts a human-readable description from the text of a SKILL.md file.
 *
 * Looks at the first line containing `description:`:
 *   - an inline value is returned trimmed, with one surrounding quote
 *     character stripped from each end;
 *   - a YAML block-scalar indicator (`|`, `>`, optionally followed by `+`/`-`)
 *     is resolved by joining the indented lines that follow with single spaces.
 * The value is read from that line only, so an empty `description:` no longer
 * picks up whatever happens to be on the next line.
 *
 * When no usable description is found, the first non-empty line that does not
 * start with `---` or `#` is returned, and finally a fixed placeholder.
 *
 * @param {string} content - Raw file contents.
 * @returns {string} The description text.
 */
function parseSkillDescription(content) {
  const descriptionKey = "description:";
  const rawLines = content.split("\n");
  const keyLineIndex = rawLines.findIndex((line) => line.includes(descriptionKey));

  if (keyLineIndex !== -1) {
    const keyLine = rawLines[keyLineIndex];
    const inlineValue = keyLine
      .slice(keyLine.indexOf(descriptionKey) + descriptionKey.length)
      .trim();

    if (/^[|>][+-]?$/.test(inlineValue)) {
      // Block scalar: collect the indented continuation lines.
      const blockLines = [];
      for (const line of rawLines.slice(keyLineIndex + 1)) {
        const text = line.trim();
        if (text !== "" && !/^[ \t]/.test(line)) {
          break; // First non-indented line ends the block.
        }
        if (text !== "") {
          blockLines.push(text);
        }
      }
      if (blockLines.length > 0) {
        return blockLines.join(" ");
      }
    } else if (inlineValue !== "") {
      return inlineValue.replace(/^["']|["']$/g, "");
    }
  }

  // Fallback: first content line, ignoring the (empty) description key line.
  const firstContentLine = rawLines
    .filter((_, index) => index !== keyLineIndex)
    .map((line) => line.trim())
    .find((line) => line !== "" && !line.startsWith("---") && !line.startsWith("#"));
  return firstContentLine ?? "未提供说明";
}
/**
 * Recursively scans the skills directory for `SKILL.md` files.
 *
 * The directory defaults to `~/.codex/skills` when `config.skillsDir` is not
 * set. Directories that cannot be listed and files that cannot be read or
 * parsed are skipped. Each skill is named after the directory that contains
 * its `SKILL.md`; its category is the path between the skills root and that
 * directory (segments joined with " / "), or `config.name` when there is none
 * — which includes a `SKILL.md` placed directly in the skills root.
 *
 * @param {object} config - Agent configuration (`skillsDir`, `name`).
 * @returns {Promise<Array<{name: string, description: string, path: string, invocation: string, category: any}>>}
 *   Skills sorted by name.
 */
async function discoverSkills(config) {
  const skillsDir = resolve(config.skillsDir ?? join(os.homedir(), ".codex/skills"));
  const pending = [skillsDir];
  const skills = [];

  while (pending.length > 0) {
    const currentDir = pending.pop();
    let entries;
    try {
      entries = await readdir(currentDir, { withFileTypes: true });
    } catch {
      continue; // Missing or unreadable directory: nothing to report from it.
    }

    for (const entry of entries) {
      if (entry.isDirectory()) {
        pending.push(join(currentDir, entry.name));
        continue;
      }
      if (!entry.isFile() || entry.name !== "SKILL.md") {
        continue;
      }

      const skillPath = join(currentDir, entry.name);
      try {
        // readFile already fails for a missing/unreadable file, so no
        // separate access() probe is needed.
        const content = await readFile(skillPath, "utf8");
        const skillName = basename(currentDir) || "unknown-skill";
        // Use path.relative/sep rather than string surgery on "/" so the
        // result is correct for the skills root itself and on Windows.
        const relativeCategory = relative(skillsDir, currentDir)
          .split(sep)
          .slice(0, -1)
          .join(" / ");
        skills.push({
          name: skillName,
          description: parseSkillDescription(content),
          path: skillPath,
          invocation: `[$${skillName}](${skillPath})`,
          category: relativeCategory || config.name,
        });
      } catch {
        continue; // Best-effort scan: one bad skill must not abort the rest.
      }
    }
  }

  return skills.sort((a, b) => a.name.localeCompare(b.name));
}
/**
 * Uploads the discovered skill list for this device to the control plane.
 *
 * @param {object} config - Agent configuration (`controlPlaneUrl`, `deviceId`).
 * @param {object} runtime - Runtime state (used for the device-token header).
 * @param {Array} skills - Skills to publish.
 * @returns {Promise<{ok: boolean, status: number, body: string, count: number}>}
 *   Response summary plus the number of skills sent.
 */
async function postSkills(config, runtime, skills) {
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const requestInit = {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify({ skills }),
  };

  const response = await fetch(`${baseUrl}/api/v1/devices/${config.deviceId}/skills`, requestInit);
  const responseText = await response.text();

  return {
    ok: response.ok,
    status: response.status,
    body: responseText,
    count: skills.length,
  };
}
/**
 * Sends one log entry to the control plane, best-effort.
 *
 * `deviceId` and `source: "local_agent"` are added first, so fields in
 * `payload` with the same names take precedence. Any failure (network error,
 * bad URL, ...) is swallowed so that logging can never break the agent loop.
 *
 * @param {object} config - Agent configuration (`controlPlaneUrl`, `deviceId`).
 * @param {object} runtime - Runtime state (used for the device-token header).
 * @param {object} payload - Log fields (level, category, message, detail, ...).
 * @returns {Promise<void>}
 */
async function postAppLog(config, runtime, payload) {
  try {
    const response = await fetch(`${config.controlPlaneUrl.replace(/\/$/, "")}/api/v1/app-logs`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        ...deviceTokenHeaders(config, runtime),
      },
      body: JSON.stringify({
        deviceId: config.deviceId,
        source: "local_agent",
        ...payload,
      }),
    });
    // The response is not needed, but an unread body keeps the underlying
    // connection from being reused until it is garbage collected; cancel it.
    await response.body?.cancel();
  } catch {
    // Ignore log transport failures to avoid blocking the agent loop.
  }
}
/**
 * Asks the control plane for the next master-agent task for this device.
 *
 * @param {object} config - Agent configuration (`controlPlaneUrl`, `deviceId`).
 * @param {object} runtime - Runtime state (used for the device-token header).
 * @returns {Promise<{ok: boolean, status: number, body: string}>}
 *   Raw response summary; the body is returned as text and not parsed here.
 */
async function claimMasterAgentTask(config, runtime) {
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const requestInit = {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify({ deviceId: config.deviceId }),
  };

  const response = await fetch(`${baseUrl}/api/v1/master-agent/tasks/claim`, requestInit);
  const responseText = await response.text();

  return {
    ok: response.ok,
    status: response.status,
    body: responseText,
  };
}
/**
 * Reports the outcome of a master-agent task to the control plane.
 *
 * @param {object} config - Agent configuration (`controlPlaneUrl`, `deviceId`).
 * @param {object} runtime - Runtime state (used for the device-token header).
 * @param {object} payload - Completion details; `taskId` selects the endpoint,
 *   the remaining listed fields are forwarded in the request body.
 * @returns {Promise<{ok: boolean, status: number, body: string}>} Raw response summary.
 */
async function completeMasterAgentTask(config, runtime, payload) {
  const baseUrl = config.controlPlaneUrl.replace(/\/$/, "");
  const endpoint = `${baseUrl}/api/v1/master-agent/tasks/${payload.taskId}/complete`;

  const headers = {
    "Content-Type": "application/json",
    ...deviceTokenHeaders(config, runtime),
  };
  const completionBody = {
    deviceId: config.deviceId,
    status: payload.status,
    replyBody: payload.replyBody,
    errorMessage: payload.errorMessage,
    requestId: payload.requestId,
    dispatchExecutionId: payload.dispatchExecutionId,
    targetProjectId: payload.targetProjectId,
    targetThreadId: payload.targetThreadId,
    rawThreadReply: payload.rawThreadReply,
  };

  const response = await fetch(endpoint, {
    method: "POST",
    headers,
    body: JSON.stringify(completionBody),
  });
  const responseText = await response.text();

  return {
    ok: response.ok,
    status: response.status,
    body: responseText,
  };
}
/**
 * Interprets the raw output of a dispatch-execution run.
 *
 * The output (or the contents of its first ``` fenced block, if any) is parsed
 * as JSON. When that yields an object with a non-blank string
 * `rawThreadReply`, the trimmed `rawThreadReply` and `replyBody` are returned.
 * In every other case the whole trimmed output is used as the raw thread reply.
 *
 * @param {*} rawOutput - Text produced by the run; falsy values count as empty.
 * @returns {{rawThreadReply: string, replyBody: (string|undefined)}}
 */
function parseDispatchExecutionCompletion(rawOutput) {
  const text = String(rawOutput || "").trim();
  // Used for empty output, unparsable output and JSON without a usable reply.
  const wholeOutput = { rawThreadReply: text, replyBody: undefined };
  if (text === "") {
    return wholeOutput;
  }

  const fence = /```(?:json)?\s*([\s\S]*?)```/i.exec(text);
  const jsonSource = fence === null ? text : fence[1].trim();

  let decoded;
  try {
    decoded = JSON.parse(jsonSource);
  } catch {
    return wholeOutput;
  }
  if (!decoded || typeof decoded !== "object") {
    return wholeOutput;
  }

  const threadReply =
    typeof decoded.rawThreadReply === "string" ? decoded.rawThreadReply.trim() : "";
  if (!threadReply) {
    return wholeOutput;
  }

  const summary = typeof decoded.replyBody === "string" ? decoded.replyBody.trim() : "";
  return {
    rawThreadReply: threadReply,
    replyBody: summary || undefined,
  };
}
/**
 * Normalizes a task outcome into the payload sent when completing a task.
 *
 * `status` becomes "failed" only when it is exactly "failed", otherwise
 * "completed". `requestId` is passed through unchanged. Every other field is
 * kept only when it is a string that is non-empty after trimming; anything
 * else becomes `undefined`.
 *
 * @param {object} task - The claimed task; `taskId` is copied.
 * @param {object} payload - Outcome fields to normalize.
 * @returns {object} The normalized completion payload.
 */
function buildRemoteExecutionCompletionPayload(task, payload) {
  const trimmedOrUndefined = (value) => {
    if (typeof value !== "string") {
      return undefined;
    }
    return value.trim() || undefined;
  };

  const {
    status,
    requestId,
    replyBody,
    errorMessage,
    dispatchExecutionId,
    targetProjectId,
    targetThreadId,
    rawThreadReply,
  } = payload;

  return {
    taskId: task.taskId,
    status: status === "failed" ? "failed" : "completed",
    requestId,
    replyBody: trimmedOrUndefined(replyBody),
    errorMessage: trimmedOrUndefined(errorMessage),
    dispatchExecutionId: trimmedOrUndefined(dispatchExecutionId),
    targetProjectId: trimmedOrUndefined(targetProjectId),
    targetThreadId: trimmedOrUndefined(targetThreadId),
    rawThreadReply: trimmedOrUndefined(rawThreadReply),
  };
}
/**
 * Executes one claimed master-agent task and reports its outcome.
 *
 * Depending on `shouldUseOmxTeamTaskRunner(task)` the task is run through the
 * OMX team runner or by spawning the `codex` CLI, whose reply is read from a
 * temporary output file. The result is sent with `completeMasterAgentTask`
 * and logged with `postAppLog`. Any failure is reported as a "failed"
 * completion instead of being thrown.
 *
 * Side effects: updates `runtime.activeMasterTask`, always resets
 * `runtime.masterTaskBusy` to `false` and removes the temporary output file.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state.
 * @param {object} task - Task returned by the claim endpoint.
 * @returns {Promise<void>}
 */
async function runMasterAgentTask(config, runtime, task) {
  // The task id comes from the control plane; keep it from introducing path
  // separators into the temp-file name.
  const safeTaskId = String(task.taskId).replace(/[^\w.-]/g, "_");
  const outputFile = join(os.tmpdir(), `${safeTaskId}.reply.txt`);
  const stderrChunks = [];
  runtime.activeMasterTask = {
    taskId: task.taskId,
    status: "running",
    startedAt: new Date().toISOString(),
  };
  try {
    let replyBody;
    let dispatchExecutionCompletion = null;
    if (shouldUseOmxTeamTaskRunner(task)) {
      const omxResult = await executeOmxTeamTask(getOmxTeamTaskRunnerConfig(process.env, config), task);
      if (omxResult.status === "failed") {
        throw new Error(omxResult.errorMessage || "OMX_EXECUTION_FAILED");
      }
      replyBody = omxResult.replyBody ?? omxResult.rawThreadReply;
      dispatchExecutionCompletion = {
        rawThreadReply: omxResult.rawThreadReply,
        replyBody: omxResult.replyBody,
      };
    } else {
      const codexPreparation = await prepareCodexTaskExecution(config, task, outputFile);
      if (!codexPreparation.ok) {
        throw new Error(codexPreparation.error.message);
      }
      const codexExecution = codexPreparation.execution;
      await new Promise((resolveTask, rejectTask) => {
        const child = spawn("codex", codexExecution.args, {
          cwd: codexExecution.cwd,
          env: process.env,
        });
        // stdout is piped by default but never used here; drain it so a
        // chatty child cannot block on a full pipe and hang the task.
        child.stdout?.resume();
        child.stderr.on("data", (chunk) => {
          stderrChunks.push(String(chunk));
        });
        child.on("error", rejectTask);
        child.on("close", (code) => {
          if (code === 0) {
            resolveTask();
            return;
          }
          rejectTask(new Error(stderrChunks.join("").trim() || `codex exit code ${code}`));
        });
      });
      replyBody = (await readFile(outputFile, "utf8")).trim();
      dispatchExecutionCompletion =
        task.taskType === "dispatch_execution"
          ? parseDispatchExecutionCompletion(replyBody)
          : null;
    }
    const completion = await completeMasterAgentTask(
      config,
      runtime,
      buildRemoteExecutionCompletionPayload(task, {
        status: "completed",
        replyBody: dispatchExecutionCompletion?.replyBody ?? replyBody,
        dispatchExecutionId: task.dispatchExecutionId,
        targetProjectId: task.targetProjectId,
        targetThreadId: task.targetThreadId,
        rawThreadReply: dispatchExecutionCompletion?.rawThreadReply,
      }),
    );
    runtime.activeMasterTask = {
      taskId: task.taskId,
      status: completion.ok ? "completed" : "complete_failed",
      completedAt: new Date().toISOString(),
      detail: completion.body,
    };
    await postAppLog(config, runtime, {
      projectId: "master-agent",
      level: "info",
      category: "local_agent.master_agent_task_completed",
      message: `Master Codex Node 已完成主 Agent 任务 ${task.taskId}`,
      // The OMX runner may return no reply text at all. Calling .slice on
      // undefined here would throw after the task was already reported as
      // completed and make the catch below report it again as failed.
      detail: String(replyBody ?? "").slice(0, 280),
      mirrorToMaster: false,
    });
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    runtime.activeMasterTask = {
      taskId: task.taskId,
      status: "failed",
      completedAt: new Date().toISOString(),
      detail,
    };
    // Reporting the failure is itself best-effort.
    await completeMasterAgentTask(
      config,
      runtime,
      buildRemoteExecutionCompletionPayload(task, {
        status: "failed",
        errorMessage: detail,
        dispatchExecutionId: task.dispatchExecutionId,
        targetProjectId: task.targetProjectId,
        targetThreadId: task.targetThreadId,
      }),
    ).catch(() => null);
    await postAppLog(config, runtime, {
      projectId: "master-agent",
      level: "error",
      category: "local_agent.master_agent_task_failed",
      message: `Master Codex Node 执行主 Agent 任务失败:${task.taskId}`,
      detail,
      mirrorToMaster: true,
    });
  } finally {
    await rm(outputFile, { force: true }).catch(() => null);
    runtime.masterTaskBusy = false;
  }
}
/**
 * Claims at most one master-agent task and runs it.
 *
 * Does nothing when `config.masterAgentEnabled === false` or while
 * `runtime.masterTaskBusy` is set. The busy flag is raised before the claim
 * request is sent, so polls that fire while a claim is still in flight (the
 * poll timer can be shorter than a slow request) cannot claim a second task.
 * The flag is always cleared again when this poll finishes.
 *
 * The outcome of the poll is recorded in `runtime.lastMasterTaskPoll`; errors
 * are recorded there with status 0 rather than thrown.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state.
 * @returns {Promise<void>}
 */
async function pollMasterAgentTasks(config, runtime) {
  if (config.masterAgentEnabled === false || runtime.masterTaskBusy) {
    return;
  }
  // Reserve the slot synchronously, before the first await.
  runtime.masterTaskBusy = true;
  try {
    const claim = await claimMasterAgentTask(config, runtime);
    if (!claim.ok) {
      runtime.lastMasterTaskPoll = {
        at: new Date().toISOString(),
        ok: false,
        status: claim.status,
        body: claim.body,
      };
      return;
    }
    const parsed = JSON.parse(claim.body);
    runtime.lastMasterTaskPoll = {
      at: new Date().toISOString(),
      ok: true,
      status: claim.status,
      body: claim.body,
    };
    if (!parsed.task) {
      return;
    }
    await runMasterAgentTask(config, runtime, parsed.task);
  } catch (error) {
    runtime.lastMasterTaskPoll = {
      at: new Date().toISOString(),
      ok: false,
      status: 0,
      body: error instanceof Error ? error.message : String(error),
    };
  } finally {
    // Also covers "no task", a rejected claim and unexpected errors, so the
    // agent can never get stuck in the busy state.
    runtime.masterTaskBusy = false;
  }
}
// Entry point: the config file path is the first CLI argument after the script.
const [, , configPath] = process.argv;
if (!configPath) {
  console.error("Usage: node local-agent/server.mjs <config.json>");
  process.exit(1);
}

const config = await loadConfig(configPath);

// Mutable state shared by the heartbeat loop, the task poller and the HTTP
// endpoints below.
const runtime = {
  // Result of the most recent heartbeat.
  lastHeartbeatAt: null,
  lastHeartbeatOk: false,
  lastHeartbeatStatus: null,
  lastHeartbeatBody: null,

  // Result of the most recent skill scan/upload.
  lastSkillSyncAt: null,
  lastSkillSyncOk: false,
  lastSkillSyncStatus: null,
  lastSkillSyncBody: null,
  lastSkills: [],

  // Credentials: starts from the configured token and is replaced when a
  // heartbeat response carries a new one.
  issuedToken: config.token ?? null,
  pairingCodeUsed: config.pairingCode ?? null,

  lastThreadContextResults: [],

  // Master-agent task polling and execution.
  masterTaskBusy: false,
  activeMasterTask: null,
  lastMasterTaskPoll: null,

  // Result of the most recent Codex project discovery.
  lastProjectDiscoveryAt: null,
  lastProjectDiscoveryOk: false,
  lastProjectDiscoverySummary: null,
};
/**
 * Runs one full heartbeat cycle against the control plane.
 *
 * Steps, in order: resolve the projects to report, post the heartbeat (storing
 * a newly issued token if the response has one), post every configured
 * thread-context snapshot, then scan and upload skills. Results are recorded
 * on `runtime`; failures are sent to the control plane via `postAppLog`.
 *
 * A failure while scanning/uploading skills only affects the skill-sync
 * fields. Any other exception marks the heartbeat itself as failed with
 * status 0.
 *
 * @returns {Promise<void>}
 */
async function performHeartbeat() {
  const describeError = (error) => (error instanceof Error ? error.message : String(error));

  try {
    const projectsToReport = await resolveHeartbeatProjects(config, runtime);
    const heartbeatResult = await postHeartbeat(config, runtime, projectsToReport);
    Object.assign(runtime, {
      lastHeartbeatAt: new Date().toISOString(),
      lastHeartbeatOk: heartbeatResult.ok,
      lastHeartbeatStatus: heartbeatResult.status,
      lastHeartbeatBody: heartbeatResult.body,
    });

    const newToken = heartbeatResult.json?.token;
    if (newToken) {
      runtime.issuedToken = newToken;
    }

    if (!heartbeatResult.ok) {
      await postAppLog(config, runtime, {
        level: "error",
        category: "local_agent.heartbeat_failed",
        message: "local-agent 心跳返回失败。",
        detail: heartbeatResult.body,
        mirrorToMaster: true,
      });
    }

    // Thread contexts are posted one at a time, in configuration order.
    const configuredSnapshots = Array.isArray(config.threadContexts) ? config.threadContexts : [];
    runtime.lastThreadContextResults = [];
    for (const snapshot of configuredSnapshots) {
      const contextResult = await postThreadContext(config, runtime, snapshot);
      runtime.lastThreadContextResults.push(contextResult);
      if (contextResult.ok) {
        continue;
      }
      await postAppLog(config, runtime, {
        projectId: snapshot.projectId,
        level: "error",
        category: "local_agent.thread_context_failed",
        message: `线程预算上报失败:${snapshot.threadId}`,
        detail: contextResult.body,
        mirrorToMaster: true,
      });
    }

    try {
      const discoveredSkills = await discoverSkills(config);
      runtime.lastSkills = discoveredSkills;
      const syncResult = await postSkills(config, runtime, discoveredSkills);
      Object.assign(runtime, {
        lastSkillSyncAt: new Date().toISOString(),
        lastSkillSyncOk: syncResult.ok,
        lastSkillSyncStatus: syncResult.status,
        lastSkillSyncBody: syncResult.body,
      });
    } catch (skillError) {
      Object.assign(runtime, {
        lastSkillSyncAt: new Date().toISOString(),
        lastSkillSyncOk: false,
        lastSkillSyncStatus: 0,
        lastSkillSyncBody: describeError(skillError),
      });
      await postAppLog(config, runtime, {
        level: "error",
        category: "local_agent.skills_sync_failed",
        message: "Skill 扫描或同步失败。",
        detail: runtime.lastSkillSyncBody,
        mirrorToMaster: true,
      });
    }
  } catch (heartbeatError) {
    Object.assign(runtime, {
      lastHeartbeatAt: new Date().toISOString(),
      lastHeartbeatOk: false,
      lastHeartbeatStatus: 0,
      lastHeartbeatBody: describeError(heartbeatError),
    });
    await postAppLog(config, runtime, {
      level: "error",
      category: "local_agent.heartbeat_exception",
      message: "local-agent 心跳执行异常。",
      detail: runtime.lastHeartbeatBody,
      mirrorToMaster: true,
    });
  }
}
// Heartbeat entry point shared by the timer and the HTTP endpoint, wrapped by
// createSerializedRunner.
const heartbeat = createSerializedRunner(performHeartbeat);

/**
 * Local HTTP API of the agent. All responses are JSON.
 *
 *   /health             service status plus the runtime state
 *   /api/v1/device      full config and runtime state
 *   /api/v1/skills      last discovered skills and skill-sync status
 *   /api/v1/heartbeat   (POST) runs a heartbeat and reports its result
 *
 * Routing uses the path only, so a query string does not turn a known route
 * into a 404. Any exception while handling a request is answered with a 500
 * instead of leaving the request hanging with an unhandled rejection.
 *
 * NOTE(review): no authentication is performed in this handler, and
 * /health and /api/v1/device serialize `runtime` (and `config`) as-is, which
 * includes the token / pairing-code fields. Confirm the bind address keeps
 * this endpoint private, or redact those fields.
 */
const server = createServer(async (request, response) => {
  const sendJson = (statusCode, payload) => {
    // Serialize first so a serialization error cannot leave a half-written response.
    const body = JSON.stringify(payload);
    response.writeHead(statusCode, { "Content-Type": "application/json" });
    response.end(body);
  };

  try {
    const pathname = (request.url ?? "/").split("?", 1)[0];

    if (pathname === "/health") {
      sendJson(200, {
        ok: true,
        service: "boss-local-agent",
        runtime,
      });
      return;
    }
    if (pathname === "/api/v1/device") {
      sendJson(200, { config, runtime });
      return;
    }
    if (pathname === "/api/v1/skills") {
      sendJson(200, {
        ok: true,
        deviceId: config.deviceId,
        skills: runtime.lastSkills,
        sync: {
          at: runtime.lastSkillSyncAt,
          ok: runtime.lastSkillSyncOk,
          status: runtime.lastSkillSyncStatus,
          body: runtime.lastSkillSyncBody,
        },
      });
      return;
    }
    if (pathname === "/api/v1/heartbeat" && request.method === "POST") {
      await heartbeat();
      sendJson(200, { ok: runtime.lastHeartbeatOk, runtime });
      return;
    }
    sendJson(404, { ok: false, message: "not_found" });
  } catch {
    if (response.headersSent) {
      // Too late for a proper status; just make sure the response is closed.
      response.end();
      return;
    }
    sendJson(500, { ok: false, message: "internal_error" });
  }
});
// Start the HTTP API and print a one-line JSON banner once it is listening.
server.listen(config.port, config.bindHost, () => {
  const banner = {
    ok: true,
    service: "boss-local-agent",
    bind: `${config.bindHost}:${config.port}`,
    controlPlaneUrl: config.controlPlaneUrl,
    workerId: config.workerId,
  };
  console.log(JSON.stringify(banner));
});

// Initial cycle: one heartbeat first, then the first task poll.
void (async () => {
  await heartbeat();
  await pollMasterAgentTasks(config, runtime);
})();

// Recurring work; intervals fall back to 60 s (heartbeat) and 3 s (task poll)
// when not configured.
setInterval(() => void heartbeat(), config.heartbeatIntervalMs ?? 60000);
setInterval(
  () => void pollMasterAgentTasks(config, runtime),
  config.masterAgentPollIntervalMs ?? 3000,
);