#!/usr/bin/env node
/**
 * Local agent process.
 *
 * Loads a JSON config file (path given as the first CLI argument), then:
 *  - periodically posts a device heartbeat, the configured thread-context
 *    snapshots and the locally discovered skills to the control plane;
 *  - periodically claims master-agent tasks and runs them through the `codex`
 *    CLI, reporting the outcome back;
 *  - serves a small HTTP API exposing the current runtime state.
 */
import { spawn } from "node:child_process";
import { createServer } from "node:http";
import { access, readFile, readdir, rm } from "node:fs/promises";
import os from "node:os";
import { basename, join, relative, resolve, sep } from "node:path";

/** Upper bound for a single control-plane request unless `config.requestTimeoutMs` is set. */
const DEFAULT_REQUEST_TIMEOUT_MS = 30000;
/** Heartbeat period used when `config.heartbeatIntervalMs` is not set. */
const DEFAULT_HEARTBEAT_INTERVAL_MS = 60000;
/** Task poll period used when `config.masterAgentPollIntervalMs` is not set. */
const DEFAULT_MASTER_AGENT_POLL_INTERVAL_MS = 3000;

/**
 * Read and parse the JSON config file.
 *
 * @param {string} configPath - Path to the config file; resolved against the cwd.
 * @returns {Promise<object>} The parsed config.
 * @throws If the file cannot be read or is not valid JSON.
 */
async function loadConfig(configPath) {
  const raw = await readFile(resolve(configPath), "utf8");
  return JSON.parse(raw);
}

/**
 * Build an absolute control-plane URL from a path.
 *
 * @param {object} config - Agent config; `controlPlaneUrl` is used as the base.
 * @param {string} path - Path starting with `/`.
 * @returns {string} The base (with one trailing slash stripped) followed by `path`.
 */
function controlPlaneUrl(config, path) {
  return `${config.controlPlaneUrl.replace(/\/$/, "")}${path}`;
}

/**
 * POST a JSON payload to the control plane.
 *
 * @param {object} config - Agent config.
 * @param {string} path - Control-plane path starting with `/`.
 * @param {object} payload - Value serialised as the JSON request body.
 * @param {object} [extraHeaders] - Headers added on top of `Content-Type`.
 * @returns {Promise<Response>} The fetch response.
 * @throws If the request fails or exceeds the request timeout.
 */
function postJson(config, path, payload, extraHeaders = {}) {
  return fetch(controlPlaneUrl(config, path), {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...extraHeaders,
    },
    body: JSON.stringify(payload),
    // Bound every request so a stalled control plane cannot wedge the
    // heartbeat or task-poll loops.
    signal: AbortSignal.timeout(config.requestTimeoutMs ?? DEFAULT_REQUEST_TIMEOUT_MS),
  });
}

/**
 * Send the device heartbeat.
 *
 * The token is carried in the body. The pairing code is only sent while no
 * token has been issued yet.
 *
 * @param {object} config - Agent config.
 * @param {object} runtime - Mutable runtime state; `issuedToken` is read.
 * @returns {Promise<{ok: boolean, status: number, body: string, json: any}>}
 *   `json` is the parsed body, or `null` when the body is not valid JSON.
 */
async function postHeartbeat(config, runtime) {
  const response = await postJson(config, "/api/device-heartbeat", {
    deviceId: config.deviceId,
    token: runtime.issuedToken ?? config.token,
    pairingCode: runtime.issuedToken ? undefined : config.pairingCode,
    name: config.name,
    avatar: config.avatar,
    account: config.account,
    status: config.status,
    quota5h: config.quota5h,
    quota7d: config.quota7d,
    projects: config.projects,
    endpoint: config.endpoint,
  });
  const text = await response.text();
  let json = null;
  try {
    json = JSON.parse(text);
  } catch {
    // Non-JSON bodies are still returned verbatim through `body`.
    json = null;
  }
  return {
    ok: response.ok,
    status: response.status,
    body: text,
    json,
  };
}

/**
 * Build the device-token header, if a token is known.
 *
 * @param {object} config - Agent config; `token` is the fallback.
 * @param {object} runtime - Runtime state; `issuedToken` takes precedence.
 * @returns {object} `{ "x-boss-device-token": token }` or `{}` when there is no token.
 */
function deviceTokenHeaders(config, runtime) {
  const token = runtime.issuedToken ?? config.token;
  return token ? { "x-boss-device-token": token } : {};
}

/**
 * Report one thread-context snapshot for a worker.
 *
 * @param {object} config - Agent config.
 * @param {object} runtime - Runtime state (used for the device token).
 * @param {object} snapshot - Snapshot to report; missing counters default to
 *   0, missing pending flags to `false`, and a missing checklist to `[]`.
 * @returns {Promise<{ok: boolean, status: number, body: string, workerId: string, threadId: any}>}
 */
async function postThreadContext(config, runtime, snapshot) {
  const workerId = snapshot.workerId ?? config.workerId ?? `${config.deviceId}-worker`;
  const response = await postJson(
    config,
    // Encode the id so characters such as `/` or `?` cannot alter the route.
    `/api/v1/workers/${encodeURIComponent(workerId)}/thread-context`,
    {
      nodeId: config.deviceId,
      projectId: snapshot.projectId,
      taskId: snapshot.taskId,
      threadId: snapshot.threadId,
      title: snapshot.title,
      summary: snapshot.summary,
      sourceKind: snapshot.sourceKind ?? "worker_estimator",
      status: snapshot.status ?? "running",
      contextBudgetRemainingPct: snapshot.contextBudgetRemainingPct,
      contextBudgetLevel: snapshot.contextBudgetLevel,
      compactionExpectedAt: snapshot.compactionExpectedAt,
      mustFinishBeforeCompaction: snapshot.mustFinishBeforeCompaction,
      estimatedRemainingTurns: snapshot.estimatedRemainingTurns ?? 0,
      estimatedRemainingLargeMessages: snapshot.estimatedRemainingLargeMessages ?? 0,
      lastCompactionAt: snapshot.lastCompactionAt,
      compactionCount: snapshot.compactionCount ?? 0,
      patchPending: snapshot.patchPending ?? false,
      testsPending: snapshot.testsPending ?? false,
      evidencePending: snapshot.evidencePending ?? false,
      checklist: snapshot.checklist ?? [],
      capturedAt: new Date().toISOString(),
    },
    deviceTokenHeaders(config, runtime),
  );
  return {
    ok: response.ok,
    status: response.status,
    body: await response.text(),
    workerId,
    threadId: snapshot.threadId,
  };
}

/**
 * Extract a human-readable description from the text of a SKILL.md file.
 *
 * Prefers the first `description:` entry (surrounding quotes stripped).
 * Otherwise returns the first non-empty line that starts with neither `---`
 * nor `#`, and finally a fixed placeholder.
 *
 * @param {string} content - Full text of the skill file.
 * @returns {string} The description.
 */
function parseSkillDescription(content) {
  const descriptionMatch = content.match(/description:\s*(.+)/);
  if (descriptionMatch?.[1]) {
    return descriptionMatch[1].trim().replace(/^["']|["']$/g, "");
  }
  const lines = content
    .split("\n")
    .map((line) => line.trim())
    .filter(Boolean);
  return lines.find((line) => !line.startsWith("---") && !line.startsWith("#")) ?? "未提供说明";
}

/**
 * Walk the skills directory and collect every `SKILL.md` found.
 *
 * The root is `config.skillsDir`, or `~/.codex/skills` when unset. Directories
 * that cannot be listed and skill files that cannot be read are skipped.
 *
 * @param {object} config - Agent config.
 * @returns {Promise<Array<{name: string, description: string, path: string, invocation: string, category: string}>>}
 *   Skills sorted by name.
 */
async function discoverSkills(config) {
  const skillsDir = resolve(config.skillsDir ?? join(os.homedir(), ".codex/skills"));
  const pending = [skillsDir];
  const skills = [];

  while (pending.length > 0) {
    const currentDir = pending.pop();
    if (!currentDir) continue;

    let entries = [];
    try {
      entries = await readdir(currentDir, { withFileTypes: true });
    } catch {
      continue;
    }

    for (const entry of entries) {
      if (entry.isDirectory()) {
        pending.push(join(currentDir, entry.name));
        continue;
      }
      if (!entry.isFile() || entry.name !== "SKILL.md") continue;

      const skillPath = join(currentDir, entry.name);
      try {
        await access(skillPath);
        const content = await readFile(skillPath, "utf8");
        const skillName = basename(currentDir) || "unknown-skill";
        // The category is made of the directories between the skills root and
        // the skill's own directory. `relative()` yields "" for a SKILL.md
        // placed directly in the root, so the fallback below applies instead
        // of leaking the absolute parent path.
        const relativeCategory = relative(skillsDir, currentDir)
          .split(sep)
          .slice(0, -1)
          .join(" / ");
        skills.push({
          name: skillName,
          description: parseSkillDescription(content),
          path: skillPath,
          invocation: `[$${skillName}](${skillPath})`,
          category: relativeCategory || config.name,
        });
      } catch {
        continue;
      }
    }
  }

  return skills.sort((a, b) => a.name.localeCompare(b.name));
}

/**
 * Upload the discovered skill list for this device.
 *
 * @param {object} config - Agent config.
 * @param {Array<object>} skills - Skills as returned by `discoverSkills`.
 * @param {object} [runtimeState] - Runtime state used for the device token;
 *   defaults to the module-level `runtime`.
 * @returns {Promise<{ok: boolean, status: number, body: string, count: number}>}
 */
async function postSkills(config, skills, runtimeState = runtime) {
  const response = await postJson(
    config,
    `/api/v1/devices/${encodeURIComponent(config.deviceId)}/skills`,
    { skills },
    deviceTokenHeaders(config, runtimeState),
  );
  return {
    ok: response.ok,
    status: response.status,
    body: await response.text(),
    count: skills.length,
  };
}

/**
 * Best-effort upload of one application log entry. Never throws.
 *
 * @param {object} config - Agent config.
 * @param {object} payload - Log fields, merged over `deviceId` and `source`.
 * @param {object} [runtimeState] - Runtime state used for the device token;
 *   defaults to the module-level `runtime`.
 * @returns {Promise<void>}
 */
async function postAppLog(config, payload, runtimeState = runtime) {
  try {
    const response = await postJson(
      config,
      "/api/v1/app-logs",
      {
        deviceId: config.deviceId,
        source: "local_agent",
        ...payload,
      },
      deviceTokenHeaders(config, runtimeState),
    );
    // The body is not needed; cancel it instead of leaving it unread.
    await response.body?.cancel();
  } catch {
    // Ignore log transport failures to avoid blocking the agent loop.
  }
}

/**
 * Ask the control plane for the next master-agent task for this device.
 *
 * @param {object} config - Agent config.
 * @param {object} runtime - Runtime state (used for the device token).
 * @returns {Promise<{ok: boolean, status: number, body: string}>}
 */
async function claimMasterAgentTask(config, runtime) {
  const response = await postJson(
    config,
    "/api/v1/master-agent/tasks/claim",
    { deviceId: config.deviceId },
    deviceTokenHeaders(config, runtime),
  );
  return {
    ok: response.ok,
    status: response.status,
    body: await response.text(),
  };
}

/**
 * Report the outcome of a master-agent task.
 *
 * @param {object} config - Agent config.
 * @param {object} runtime - Runtime state (used for the device token).
 * @param {{taskId: string, status: string, replyBody?: string, errorMessage?: string, requestId?: string}} payload
 * @returns {Promise<{ok: boolean, status: number, body: string}>}
 */
async function completeMasterAgentTask(config, runtime, payload) {
  const response = await postJson(
    config,
    `/api/v1/master-agent/tasks/${encodeURIComponent(payload.taskId)}/complete`,
    {
      deviceId: config.deviceId,
      status: payload.status,
      replyBody: payload.replyBody,
      errorMessage: payload.errorMessage,
      requestId: payload.requestId,
    },
    deviceTokenHeaders(config, runtime),
  );
  return {
    ok: response.ok,
    status: response.status,
    body: await response.text(),
  };
}

/**
 * Build the argument vector for the `codex` invocation.
 *
 * @param {object} config - Agent config (`masterAgentWorkdir`,
 *   `masterAgentSandbox` and `masterAgentModel` are read).
 * @param {string} outputFile - Path passed with `-o`.
 * @param {string} prompt - Prompt appended as the last argument.
 * @returns {string[]} Arguments for `spawn("codex", ...)`.
 */
function buildCodexArgs(config, outputFile, prompt) {
  const args = [
    "exec",
    "--ephemeral",
    "--skip-git-repo-check",
    "-C",
    config.masterAgentWorkdir || process.cwd(),
    "-s",
    config.masterAgentSandbox || "workspace-write",
    "-o",
    outputFile,
  ];
  if (config.masterAgentModel) {
    args.push("-m", config.masterAgentModel);
  }
  args.push(prompt);
  return args;
}

/**
 * Run one claimed task through `codex` and report the result.
 *
 * Never throws: failures are recorded in `runtime.activeMasterTask`, reported
 * to the control plane as a failed completion (best effort) and logged.
 * Always removes the temporary output file and clears
 * `runtime.masterTaskBusy` when done.
 *
 * @param {object} config - Agent config.
 * @param {object} runtime - Mutable runtime state.
 * @param {{taskId: string, executionPrompt: string}} task - Task from the claim response.
 * @returns {Promise<void>}
 */
async function runMasterAgentTask(config, runtime, task) {
  // The task id comes from the network; keep it from contributing path
  // separators to the temp file name.
  const safeTaskId = String(task.taskId).replace(/[^\w.-]/g, "_");
  const outputFile = join(os.tmpdir(), `${safeTaskId}.reply.txt`);
  const stderrChunks = [];

  runtime.activeMasterTask = {
    taskId: task.taskId,
    status: "running",
    startedAt: new Date().toISOString(),
  };

  try {
    await new Promise((resolveTask, rejectTask) => {
      const child = spawn("codex", buildCodexArgs(config, outputFile, task.executionPrompt), {
        cwd: config.masterAgentWorkdir || process.cwd(),
        env: process.env,
        // Only stderr is consumed. A piped-but-unread stdout can fill up and
        // stall the child, and a piped stdin that is never closed can leave
        // it waiting for input.
        stdio: ["ignore", "ignore", "pipe"],
      });

      child.stderr.on("data", (chunk) => {
        stderrChunks.push(String(chunk));
      });
      child.on("error", rejectTask);
      child.on("close", (code) => {
        if (code === 0) {
          resolveTask();
          return;
        }
        rejectTask(new Error(stderrChunks.join("").trim() || `codex exit code ${code}`));
      });
    });

    const replyBody = (await readFile(outputFile, "utf8")).trim();
    const completion = await completeMasterAgentTask(config, runtime, {
      taskId: task.taskId,
      status: "completed",
      replyBody,
    });
    runtime.activeMasterTask = {
      taskId: task.taskId,
      status: completion.ok ? "completed" : "complete_failed",
      completedAt: new Date().toISOString(),
      detail: completion.body,
    };
    await postAppLog(
      config,
      {
        projectId: "master-agent",
        level: "info",
        category: "local_agent.master_agent_task_completed",
        message: `Master Codex Node 已完成主 Agent 任务 ${task.taskId}。`,
        detail: replyBody.slice(0, 280),
        mirrorToMaster: false,
      },
      runtime,
    );
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    runtime.activeMasterTask = {
      taskId: task.taskId,
      status: "failed",
      completedAt: new Date().toISOString(),
      detail,
    };
    // Reporting the failure is itself best effort.
    await completeMasterAgentTask(config, runtime, {
      taskId: task.taskId,
      status: "failed",
      errorMessage: detail,
    }).catch(() => null);
    await postAppLog(
      config,
      {
        projectId: "master-agent",
        level: "error",
        category: "local_agent.master_agent_task_failed",
        message: `Master Codex Node 执行主 Agent 任务失败:${task.taskId}`,
        detail,
        mirrorToMaster: true,
      },
      runtime,
    );
  } finally {
    await rm(outputFile, { force: true }).catch(() => null);
    runtime.masterTaskBusy = false;
  }
}

/**
 * Claim and run at most one master-agent task.
 *
 * Does nothing when `config.masterAgentEnabled` is `false` or another poll is
 * still in progress. The outcome of the claim request is stored in
 * `runtime.lastMasterTaskPoll`. Never throws.
 *
 * @param {object} config - Agent config.
 * @param {object} runtime - Mutable runtime state.
 * @returns {Promise<void>}
 */
async function pollMasterAgentTasks(config, runtime) {
  if (config.masterAgentEnabled === false || runtime.masterTaskBusy) {
    return;
  }

  // Mark busy BEFORE the first await. Otherwise a timer tick that fires while
  // the claim request is in flight would pass the guard above and claim a
  // second task concurrently.
  runtime.masterTaskBusy = true;
  try {
    const claim = await claimMasterAgentTask(config, runtime);
    if (!claim.ok) {
      runtime.lastMasterTaskPoll = {
        at: new Date().toISOString(),
        ok: false,
        status: claim.status,
        body: claim.body,
      };
      return;
    }

    const parsed = JSON.parse(claim.body);
    runtime.lastMasterTaskPoll = {
      at: new Date().toISOString(),
      ok: true,
      status: claim.status,
      body: claim.body,
    };

    if (!parsed.task) {
      return;
    }

    await runMasterAgentTask(config, runtime, parsed.task);
  } catch (error) {
    runtime.lastMasterTaskPoll = {
      at: new Date().toISOString(),
      ok: false,
      status: 0,
      body: error instanceof Error ? error.message : String(error),
    };
  } finally {
    runtime.masterTaskBusy = false;
  }
}

const configPath = process.argv[2];
if (!configPath) {
  console.error("Usage: node local-agent/server.mjs <config.json>");
  process.exit(1);
}

const config = await loadConfig(configPath);

/** Mutable state shared by the periodic jobs and the HTTP endpoints. */
const runtime = {
  lastHeartbeatAt: null,
  lastHeartbeatOk: false,
  lastHeartbeatStatus: null,
  lastHeartbeatBody: null,
  lastSkillSyncAt: null,
  lastSkillSyncOk: false,
  lastSkillSyncStatus: null,
  lastSkillSyncBody: null,
  lastSkills: [],
  issuedToken: config.token ?? null,
  pairingCodeUsed: config.pairingCode ?? null,
  lastThreadContextResults: [],
  masterTaskBusy: false,
  activeMasterTask: null,
  lastMasterTaskPoll: null,
};

/**
 * Run one full reporting cycle: heartbeat, thread contexts, then skill sync.
 *
 * Results are recorded on `runtime`; failures are additionally sent as app
 * logs. A token returned by the heartbeat response replaces
 * `runtime.issuedToken`. Never throws.
 *
 * @returns {Promise<void>}
 */
async function heartbeat() {
  try {
    const result = await postHeartbeat(config, runtime);
    runtime.lastHeartbeatAt = new Date().toISOString();
    runtime.lastHeartbeatOk = result.ok;
    runtime.lastHeartbeatStatus = result.status;
    runtime.lastHeartbeatBody = result.body;
    if (result.json?.token) {
      runtime.issuedToken = result.json.token;
    }
    if (!result.ok) {
      await postAppLog(
        config,
        {
          level: "error",
          category: "local_agent.heartbeat_failed",
          message: "local-agent 心跳返回失败。",
          detail: result.body,
          mirrorToMaster: true,
        },
        runtime,
      );
    }

    const snapshots = Array.isArray(config.threadContexts) ? config.threadContexts : [];
    runtime.lastThreadContextResults = [];
    for (const snapshot of snapshots) {
      const threadResult = await postThreadContext(config, runtime, snapshot);
      runtime.lastThreadContextResults.push(threadResult);
      if (!threadResult.ok) {
        await postAppLog(
          config,
          {
            projectId: snapshot.projectId,
            level: "error",
            category: "local_agent.thread_context_failed",
            message: `线程预算上报失败:${snapshot.threadId}`,
            detail: threadResult.body,
            mirrorToMaster: true,
          },
          runtime,
        );
      }
    }

    // Skill sync has its own error handling so that a failure here is not
    // recorded as a heartbeat failure.
    try {
      const skills = await discoverSkills(config);
      runtime.lastSkills = skills;
      const skillSyncResult = await postSkills(config, skills, runtime);
      runtime.lastSkillSyncAt = new Date().toISOString();
      runtime.lastSkillSyncOk = skillSyncResult.ok;
      runtime.lastSkillSyncStatus = skillSyncResult.status;
      runtime.lastSkillSyncBody = skillSyncResult.body;
    } catch (error) {
      runtime.lastSkillSyncAt = new Date().toISOString();
      runtime.lastSkillSyncOk = false;
      runtime.lastSkillSyncStatus = 0;
      runtime.lastSkillSyncBody = error instanceof Error ? error.message : String(error);
      await postAppLog(
        config,
        {
          level: "error",
          category: "local_agent.skills_sync_failed",
          message: "Skill 扫描或同步失败。",
          detail: runtime.lastSkillSyncBody,
          mirrorToMaster: true,
        },
        runtime,
      );
    }
  } catch (error) {
    runtime.lastHeartbeatAt = new Date().toISOString();
    runtime.lastHeartbeatOk = false;
    runtime.lastHeartbeatStatus = 0;
    runtime.lastHeartbeatBody = error instanceof Error ? error.message : String(error);
    await postAppLog(
      config,
      {
        level: "error",
        category: "local_agent.heartbeat_exception",
        message: "local-agent 心跳执行异常。",
        detail: runtime.lastHeartbeatBody,
        mirrorToMaster: true,
      },
      runtime,
    );
  }
}

// Run both jobs once at startup, then on their timers. Neither job rejects,
// so the promises can be discarded safely.
await heartbeat();
await pollMasterAgentTasks(config, runtime);

setInterval(() => {
  void heartbeat();
}, config.heartbeatIntervalMs ?? DEFAULT_HEARTBEAT_INTERVAL_MS);

setInterval(() => {
  void pollMasterAgentTasks(config, runtime);
}, config.masterAgentPollIntervalMs ?? DEFAULT_MASTER_AGENT_POLL_INTERVAL_MS);

// NOTE(review): `/health`, `/api/v1/device` and `/api/v1/heartbeat` serialise
// `runtime` (and `/api/v1/device` also `config`), which contain the device
// token and pairing code. Confirm the bind address is not reachable by
// untrusted clients, or redact those fields.
const server = createServer(async (request, response) => {
  // Route on the path only so a query string does not turn a known route
  // into a 404.
  const pathname = (request.url ?? "").split("?")[0];

  if (pathname === "/health") {
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(
      JSON.stringify({
        ok: true,
        service: "boss-local-agent",
        runtime,
      }),
    );
    return;
  }

  if (pathname === "/api/v1/device") {
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(JSON.stringify({ config, runtime }));
    return;
  }

  if (pathname === "/api/v1/skills") {
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(
      JSON.stringify({
        ok: true,
        deviceId: config.deviceId,
        skills: runtime.lastSkills,
        sync: {
          at: runtime.lastSkillSyncAt,
          ok: runtime.lastSkillSyncOk,
          status: runtime.lastSkillSyncStatus,
          body: runtime.lastSkillSyncBody,
        },
      }),
    );
    return;
  }

  if (pathname === "/api/v1/heartbeat" && request.method === "POST") {
    await heartbeat();
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(JSON.stringify({ ok: runtime.lastHeartbeatOk, runtime }));
    return;
  }

  response.writeHead(404, { "Content-Type": "application/json" });
  response.end(JSON.stringify({ ok: false, message: "not_found" }));
});

server.listen(config.port, config.bindHost, () => {
  console.log(
    JSON.stringify({
      ok: true,
      service: "boss-local-agent",
      bind: `${config.bindHost}:${config.port}`,
      controlPlaneUrl: config.controlPlaneUrl,
      workerId: config.workerId,
    }),
  );
});