#!/usr/bin/env node
import { spawn } from "node:child_process";
import { createServer } from "node:http";
import { access, readFile, readdir, rm } from "node:fs/promises";
import os from "node:os";
import { basename, join, relative, resolve, sep } from "node:path";

import { discoverCodexProjectCandidatesInWorker } from "./codex-session-discovery.mjs";
import { buildCodexTaskExecution } from "./codex-task-runner.mjs";

/**
 * Read and parse the JSON configuration file.
 *
 * @param {string} configPath - Path to the config file (resolved against the cwd).
 * @returns {Promise<object>} The parsed configuration.
 * @throws If the file cannot be read or is not valid JSON.
 */
async function loadConfig(configPath) {
  const raw = await readFile(resolve(configPath), "utf8");
  return JSON.parse(raw);
}

/**
 * Build an absolute control-plane URL, tolerating one trailing slash on the
 * configured base URL.
 *
 * @param {object} config - Agent configuration (reads `controlPlaneUrl`).
 * @param {string} path - Path beginning with "/".
 * @returns {string} The joined URL.
 */
function apiUrl(config, path) {
  return `${config.controlPlaneUrl.replace(/\/$/, "")}${path}`;
}

/**
 * Work out the projects and project candidates to report in a heartbeat.
 *
 * Static entries from the config are merged with candidates discovered by
 * `discoverCodexProjectCandidatesInWorker`, unless discovery is disabled via
 * `config.codexSessionDiscoveryEnabled === false`. When discovery throws, the
 * failure is recorded on `runtime`, logged through `postAppLog`, and the
 * static configuration alone is returned.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state; `lastProjectDiscovery*` fields are updated.
 * @returns {Promise<{projects: Array, projectCandidates: Array}>}
 */
async function resolveHeartbeatProjects(config, runtime) {
  const staticProjects = Array.isArray(config.projects) ? config.projects : [];
  const staticCandidates = Array.isArray(config.projectCandidates)
    ? config.projectCandidates
    : [];

  if (config.codexSessionDiscoveryEnabled === false) {
    return {
      projects: staticProjects,
      projectCandidates: staticCandidates,
    };
  }

  try {
    const discovered = await discoverCodexProjectCandidatesInWorker({
      stateDbPath: config.codexStateDbPath,
      logsDbPath: config.codexLogsDbPath,
      sessionIndexPath: config.codexSessionIndexPath,
      globalStatePath: config.codexGlobalStatePath,
      sessionsDir: config.codexSessionsDir,
      lookbackHours: config.codexSessionLookbackHours,
    });

    // Later entries win, so a discovered candidate replaces a static one that
    // shares the same thread key.
    const candidateMap = new Map();
    for (const candidate of [...staticCandidates, ...discovered.projectCandidates]) {
      candidateMap.set(candidate.codexThreadRef ?? candidate.threadId, candidate);
    }
    const mergedCandidates = [...candidateMap.values()];

    // Candidates without a folder name would otherwise add `undefined` to the
    // set, which JSON.stringify turns into a `null` project entry.
    const candidateFolders = mergedCandidates
      .map((candidate) => candidate.folderName)
      .filter((folderName) => folderName != null);
    const mergedProjects = [
      ...new Set([...staticProjects, ...discovered.projects, ...candidateFolders]),
    ];

    runtime.lastProjectDiscoveryAt = new Date().toISOString();
    runtime.lastProjectDiscoveryOk = true;
    runtime.lastProjectDiscoverySummary = `${mergedCandidates.length} threads / ${mergedProjects.length} folders`;

    return {
      projects: mergedProjects,
      projectCandidates: mergedCandidates,
    };
  } catch (error) {
    runtime.lastProjectDiscoveryAt = new Date().toISOString();
    runtime.lastProjectDiscoveryOk = false;
    runtime.lastProjectDiscoverySummary = error instanceof Error ? error.message : String(error);
    await postAppLog(config, runtime, {
      level: "error",
      category: "local_agent.codex_discovery_failed",
      message: "Codex 线程扫描失败,已退回静态项目配置。",
      detail: runtime.lastProjectDiscoverySummary,
      mirrorToMaster: true,
    });
    return {
      projects: staticProjects,
      projectCandidates: staticCandidates,
    };
  }
}

/**
 * POST the device heartbeat to the control plane.
 *
 * The pairing code is only sent while `runtime.issuedToken` is not set.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state (reads `issuedToken`).
 * @param {{projects: Array, projectCandidates: Array}} heartbeatProjects
 * @returns {Promise<{ok: boolean, status: number, body: string, json: any}>}
 *   `json` is the parsed body, or `null` when the body is not valid JSON.
 */
async function postHeartbeat(config, runtime, heartbeatProjects) {
  const response = await fetch(apiUrl(config, "/api/device-heartbeat"), {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      deviceId: config.deviceId,
      token: runtime.issuedToken ?? config.token,
      pairingCode: runtime.issuedToken ? undefined : config.pairingCode,
      name: config.name,
      avatar: config.avatar,
      account: config.account,
      status: config.status,
      quota5h: config.quota5h,
      quota7d: config.quota7d,
      projects: heartbeatProjects.projects,
      projectCandidates: heartbeatProjects.projectCandidates,
      endpoint: config.endpoint,
    }),
  });

  const text = await response.text();
  let json = null;
  try {
    json = JSON.parse(text);
  } catch {
    json = null;
  }

  return {
    ok: response.ok,
    status: response.status,
    body: text,
    json,
  };
}

/**
 * Build the device-token header, preferring the runtime token over the
 * configured one.
 *
 * @param {object} config - Agent configuration (reads `token`).
 * @param {object} runtime - Runtime state (reads `issuedToken`).
 * @returns {object} `{ "x-boss-device-token": token }`, or `{}` when no token is available.
 */
function deviceTokenHeaders(config, runtime) {
  const token = runtime.issuedToken ?? config.token;
  return token ? { "x-boss-device-token": token } : {};
}

/**
 * POST one thread-context snapshot for a worker.
 *
 * The worker id falls back from `snapshot.workerId` to `config.workerId` to
 * `${config.deviceId}-worker`.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state (used for the auth header).
 * @param {object} snapshot - Thread-context entry to report.
 * @returns {Promise<{ok: boolean, status: number, body: string, workerId: string, threadId: any}>}
 */
async function postThreadContext(config, runtime, snapshot) {
  const workerId = snapshot.workerId ?? config.workerId ?? `${config.deviceId}-worker`;
  const response = await fetch(apiUrl(config, `/api/v1/workers/${workerId}/thread-context`), {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify({
      nodeId: config.deviceId,
      projectId: snapshot.projectId,
      taskId: snapshot.taskId,
      threadId: snapshot.threadId,
      title: snapshot.title,
      summary: snapshot.summary,
      sourceKind: snapshot.sourceKind ?? "worker_estimator",
      status: snapshot.status ?? "running",
      contextBudgetRemainingPct: snapshot.contextBudgetRemainingPct,
      contextBudgetLevel: snapshot.contextBudgetLevel,
      compactionExpectedAt: snapshot.compactionExpectedAt,
      mustFinishBeforeCompaction: snapshot.mustFinishBeforeCompaction,
      estimatedRemainingTurns: snapshot.estimatedRemainingTurns ?? 0,
      estimatedRemainingLargeMessages: snapshot.estimatedRemainingLargeMessages ?? 0,
      lastCompactionAt: snapshot.lastCompactionAt,
      compactionCount: snapshot.compactionCount ?? 0,
      patchPending: snapshot.patchPending ?? false,
      testsPending: snapshot.testsPending ?? false,
      evidencePending: snapshot.evidencePending ?? false,
      checklist: snapshot.checklist ?? [],
      capturedAt: new Date().toISOString(),
    }),
  });

  return {
    ok: response.ok,
    status: response.status,
    body: await response.text(),
    workerId,
    threadId: snapshot.threadId,
  };
}

/**
 * Extract a one-line description from the text of a SKILL.md file.
 *
 * Uses the first `description:` value when present (surrounding quotes
 * stripped); otherwise the first non-empty line that does not start with
 * "---" or "#".
 *
 * @param {string} content - File contents.
 * @returns {string} The description, or "未提供说明" when none is found.
 */
function parseSkillDescription(content) {
  const descriptionMatch = content.match(/description:\s*(.+)/);
  if (descriptionMatch?.[1]) {
    return descriptionMatch[1].trim().replace(/^["']|["']$/g, "");
  }

  const lines = content
    .split("\n")
    .map((line) => line.trim())
    .filter(Boolean);
  return lines.find((line) => !line.startsWith("---") && !line.startsWith("#")) ?? "未提供说明";
}

/**
 * Recursively scan the skills directory for SKILL.md files.
 *
 * The directory defaults to `~/.codex/skills`. Unreadable directories and
 * files are skipped. A skill's name is the name of the directory holding its
 * SKILL.md; its category is the path between the skills root and that
 * directory, falling back to `config.name` when there is none.
 *
 * @param {object} config - Agent configuration (reads `skillsDir`, `name`).
 * @returns {Promise<Array<{name: string, description: string, path: string, invocation: string, category: any}>>}
 *   Skills sorted by name.
 */
async function discoverSkills(config) {
  const skillsDir = resolve(config.skillsDir ?? join(os.homedir(), ".codex/skills"));
  const pending = [skillsDir];
  const skills = [];

  while (pending.length > 0) {
    const currentDir = pending.pop();
    if (!currentDir) continue;

    let entries = [];
    try {
      entries = await readdir(currentDir, { withFileTypes: true });
    } catch {
      continue;
    }

    for (const entry of entries) {
      if (entry.isDirectory()) {
        pending.push(join(currentDir, entry.name));
        continue;
      }
      if (!entry.isFile() || entry.name !== "SKILL.md") continue;

      const skillPath = join(currentDir, entry.name);
      try {
        await access(skillPath);
        const content = await readFile(skillPath, "utf8");
        const skillName = basename(currentDir) || "unknown-skill";
        // path.relative yields "" for a SKILL.md sitting directly in the
        // skills root, so the category correctly falls back to config.name
        // instead of leaking the absolute parent path; splitting on path.sep
        // keeps this working with Windows separators.
        const relativeCategory = relative(skillsDir, currentDir)
          .split(sep)
          .slice(0, -1)
          .join(" / ");
        skills.push({
          name: skillName,
          description: parseSkillDescription(content),
          path: skillPath,
          invocation: `[$${skillName}](${skillPath})`,
          category: relativeCategory || config.name,
        });
      } catch {
        continue;
      }
    }
  }

  return skills.sort((a, b) => a.name.localeCompare(b.name));
}

/**
 * POST the discovered skills for this device to the control plane.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state (used for the auth header).
 * @param {Array} skills - Skill records to upload.
 * @returns {Promise<{ok: boolean, status: number, body: string, count: number}>}
 */
async function postSkills(config, runtime, skills) {
  const response = await fetch(apiUrl(config, `/api/v1/devices/${config.deviceId}/skills`), {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify({ skills }),
  });

  return {
    ok: response.ok,
    status: response.status,
    body: await response.text(),
    count: skills.length,
  };
}

/**
 * Best-effort upload of an application log entry. Never throws.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state (used for the auth header).
 * @param {object} payload - Log fields merged over `deviceId` and `source`.
 * @returns {Promise<void>}
 */
async function postAppLog(config, runtime, payload) {
  try {
    await fetch(apiUrl(config, "/api/v1/app-logs"), {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        ...deviceTokenHeaders(config, runtime),
      },
      body: JSON.stringify({
        deviceId: config.deviceId,
        source: "local_agent",
        ...payload,
      }),
    });
  } catch {
    // Ignore log transport failures to avoid blocking the agent loop.
  }
}

/**
 * Ask the control plane for the next master-agent task for this device.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state (used for the auth header).
 * @returns {Promise<{ok: boolean, status: number, body: string}>}
 */
async function claimMasterAgentTask(config, runtime) {
  const response = await fetch(apiUrl(config, "/api/v1/master-agent/tasks/claim"), {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      ...deviceTokenHeaders(config, runtime),
    },
    body: JSON.stringify({ deviceId: config.deviceId }),
  });

  return {
    ok: response.ok,
    status: response.status,
    body: await response.text(),
  };
}

/**
 * Report the outcome of a master-agent task to the control plane.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Runtime state (used for the auth header).
 * @param {object} payload - Must carry `taskId`; the remaining fields are forwarded in the body.
 * @returns {Promise<{ok: boolean, status: number, body: string}>}
 */
async function completeMasterAgentTask(config, runtime, payload) {
  const response = await fetch(
    apiUrl(config, `/api/v1/master-agent/tasks/${payload.taskId}/complete`),
    {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        ...deviceTokenHeaders(config, runtime),
      },
      body: JSON.stringify({
        deviceId: config.deviceId,
        status: payload.status,
        replyBody: payload.replyBody,
        errorMessage: payload.errorMessage,
        requestId: payload.requestId,
        dispatchExecutionId: payload.dispatchExecutionId,
        targetProjectId: payload.targetProjectId,
        targetThreadId: payload.targetThreadId,
        rawThreadReply: payload.rawThreadReply,
      }),
    },
  );

  return {
    ok: response.ok,
    status: response.status,
    body: await response.text(),
  };
}

/**
 * Interpret the output of a dispatch-execution task.
 *
 * Accepts bare JSON or JSON inside a ``` fence. When the JSON is an object
 * with a non-empty string `rawThreadReply`, that value and the optional
 * `replyBody` are returned; otherwise the whole trimmed output is treated as
 * the raw thread reply.
 *
 * @param {unknown} rawOutput - Text produced by the task.
 * @returns {{rawThreadReply: string, replyBody: string | undefined}}
 */
function parseDispatchExecutionCompletion(rawOutput) {
  const trimmed = String(rawOutput || "").trim();
  if (!trimmed) {
    return {
      rawThreadReply: "",
      replyBody: undefined,
    };
  }

  const fencedMatch = trimmed.match(/```(?:json)?\s*([\s\S]*?)```/i);
  const jsonCandidate = fencedMatch?.[1]?.trim() ?? trimmed;

  try {
    const parsed = JSON.parse(jsonCandidate);
    if (parsed && typeof parsed === "object") {
      const rawThreadReply =
        typeof parsed.rawThreadReply === "string" ? parsed.rawThreadReply.trim() : "";
      const replyBody = typeof parsed.replyBody === "string" ? parsed.replyBody.trim() : undefined;
      if (rawThreadReply) {
        return {
          rawThreadReply,
          replyBody: replyBody || undefined,
        };
      }
    }
  } catch {
    // Fall back to treating the full output as the raw thread reply.
  }

  return {
    rawThreadReply: trimmed,
    replyBody: undefined,
  };
}

/**
 * Run one claimed master-agent task through the `codex` CLI and report the
 * result.
 *
 * The reply is read from a temporary output file, which is removed
 * afterwards. Failures are reported to the control plane and the app log
 * rather than thrown. `runtime.masterTaskBusy` is cleared on exit.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state; `activeMasterTask` and `masterTaskBusy` are updated.
 * @param {object} task - Claimed task (reads `taskId`, `taskType`, `dispatchExecutionId`, `targetProjectId`, `targetThreadId`).
 * @returns {Promise<void>}
 */
async function runMasterAgentTask(config, runtime, task) {
  const outputFile = join(os.tmpdir(), `${task.taskId}.reply.txt`);
  const stderrChunks = [];
  runtime.activeMasterTask = {
    taskId: task.taskId,
    status: "running",
    startedAt: new Date().toISOString(),
  };

  try {
    const codexExecution = buildCodexTaskExecution(config, task, outputFile);
    await new Promise((resolveTask, rejectTask) => {
      const child = spawn("codex", codexExecution.args, {
        cwd: codexExecution.cwd,
        env: process.env,
      });

      // stdout is piped by default; drain it so the child cannot stall once
      // the pipe buffer fills up. The reply itself is read from outputFile.
      child.stdout?.resume();
      child.stderr.on("data", (chunk) => {
        stderrChunks.push(String(chunk));
      });
      child.on("error", rejectTask);
      child.on("close", (code) => {
        if (code === 0) {
          resolveTask();
          return;
        }
        rejectTask(new Error(stderrChunks.join("").trim() || `codex exit code ${code}`));
      });
    });

    const replyBody = (await readFile(outputFile, "utf8")).trim();
    const dispatchExecutionCompletion =
      task.taskType === "dispatch_execution" ? parseDispatchExecutionCompletion(replyBody) : null;
    const completion = await completeMasterAgentTask(config, runtime, {
      taskId: task.taskId,
      status: "completed",
      replyBody: dispatchExecutionCompletion?.replyBody ?? replyBody,
      dispatchExecutionId: task.dispatchExecutionId,
      targetProjectId: task.targetProjectId,
      targetThreadId: task.targetThreadId,
      rawThreadReply: dispatchExecutionCompletion?.rawThreadReply,
    });

    runtime.activeMasterTask = {
      taskId: task.taskId,
      status: completion.ok ? "completed" : "complete_failed",
      completedAt: new Date().toISOString(),
      detail: completion.body,
    };
    await postAppLog(config, runtime, {
      projectId: "master-agent",
      level: "info",
      category: "local_agent.master_agent_task_completed",
      message: `Master Codex Node 已完成主 Agent 任务 ${task.taskId}。`,
      detail: replyBody.slice(0, 280),
      mirrorToMaster: false,
    });
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    runtime.activeMasterTask = {
      taskId: task.taskId,
      status: "failed",
      completedAt: new Date().toISOString(),
      detail,
    };
    // Reporting the failure is itself best-effort.
    await completeMasterAgentTask(config, runtime, {
      taskId: task.taskId,
      status: "failed",
      errorMessage: detail,
      dispatchExecutionId: task.dispatchExecutionId,
      targetProjectId: task.targetProjectId,
      targetThreadId: task.targetThreadId,
    }).catch(() => null);
    await postAppLog(config, runtime, {
      projectId: "master-agent",
      level: "error",
      category: "local_agent.master_agent_task_failed",
      message: `Master Codex Node 执行主 Agent 任务失败:${task.taskId}`,
      detail,
      mirrorToMaster: true,
    });
  } finally {
    await rm(outputFile, { force: true }).catch(() => null);
    runtime.masterTaskBusy = false;
  }
}

/**
 * Claim and run at most one master-agent task.
 *
 * Does nothing when the feature is disabled (`config.masterAgentEnabled ===
 * false`) or another poll or task is already in flight. The outcome of the
 * claim request is recorded in `runtime.lastMasterTaskPoll`.
 *
 * @param {object} config - Agent configuration.
 * @param {object} runtime - Mutable runtime state.
 * @returns {Promise<void>}
 */
async function pollMasterAgentTasks(config, runtime) {
  if (config.masterAgentEnabled === false || runtime.masterTaskBusy) {
    return;
  }

  // Reserve the slot before the first await: the poll timer may fire again
  // while the claim request is still in flight, and a second concurrent claim
  // would start a second task.
  runtime.masterTaskBusy = true;
  try {
    const claim = await claimMasterAgentTask(config, runtime);
    if (!claim.ok) {
      runtime.lastMasterTaskPoll = {
        at: new Date().toISOString(),
        ok: false,
        status: claim.status,
        body: claim.body,
      };
      return;
    }

    const parsed = JSON.parse(claim.body);
    runtime.lastMasterTaskPoll = {
      at: new Date().toISOString(),
      ok: true,
      status: claim.status,
      body: claim.body,
    };
    if (!parsed?.task) {
      return;
    }

    await runMasterAgentTask(config, runtime, parsed.task);
  } catch (error) {
    runtime.lastMasterTaskPoll = {
      at: new Date().toISOString(),
      ok: false,
      status: 0,
      body: error instanceof Error ? error.message : String(error),
    };
  } finally {
    runtime.masterTaskBusy = false;
  }
}

const configPath = process.argv[2];
if (!configPath) {
  console.error("Usage: node local-agent/server.mjs <config-path>");
  process.exit(1);
}

const config = await loadConfig(configPath);

// Mutable state shared by the periodic jobs and exposed over the HTTP endpoints.
const runtime = {
  lastHeartbeatAt: null,
  lastHeartbeatOk: false,
  lastHeartbeatStatus: null,
  lastHeartbeatBody: null,
  lastSkillSyncAt: null,
  lastSkillSyncOk: false,
  lastSkillSyncStatus: null,
  lastSkillSyncBody: null,
  lastSkills: [],
  issuedToken: config.token ?? null,
  pairingCodeUsed: config.pairingCode ?? null,
  lastThreadContextResults: [],
  masterTaskBusy: false,
  activeMasterTask: null,
  lastMasterTaskPoll: null,
  lastProjectDiscoveryAt: null,
  lastProjectDiscoveryOk: false,
  lastProjectDiscoverySummary: null,
};

/**
 * Run one heartbeat cycle: send the device heartbeat, upload the configured
 * thread contexts, then scan and sync skills.
 *
 * Results are recorded on `runtime`; failures are logged through
 * `postAppLog` instead of being thrown.
 *
 * @returns {Promise<void>}
 */
async function heartbeat() {
  try {
    const heartbeatProjects = await resolveHeartbeatProjects(config, runtime);
    const result = await postHeartbeat(config, runtime, heartbeatProjects);
    runtime.lastHeartbeatAt = new Date().toISOString();
    runtime.lastHeartbeatOk = result.ok;
    runtime.lastHeartbeatStatus = result.status;
    runtime.lastHeartbeatBody = result.body;
    // Adopt a token returned by the control plane for subsequent requests.
    if (result.json?.token) {
      runtime.issuedToken = result.json.token;
    }
    if (!result.ok) {
      await postAppLog(config, runtime, {
        level: "error",
        category: "local_agent.heartbeat_failed",
        message: "local-agent 心跳返回失败。",
        detail: result.body,
        mirrorToMaster: true,
      });
    }

    const snapshots = Array.isArray(config.threadContexts) ? config.threadContexts : [];
    runtime.lastThreadContextResults = [];
    for (const snapshot of snapshots) {
      const threadResult = await postThreadContext(config, runtime, snapshot);
      runtime.lastThreadContextResults.push(threadResult);
      if (!threadResult.ok) {
        await postAppLog(config, runtime, {
          projectId: snapshot.projectId,
          level: "error",
          category: "local_agent.thread_context_failed",
          message: `线程预算上报失败:${snapshot.threadId}`,
          detail: threadResult.body,
          mirrorToMaster: true,
        });
      }
    }

    // Skill sync has its own error handling so a failure here does not mark
    // the heartbeat itself as failed.
    try {
      const skills = await discoverSkills(config);
      runtime.lastSkills = skills;
      const skillSyncResult = await postSkills(config, runtime, skills);
      runtime.lastSkillSyncAt = new Date().toISOString();
      runtime.lastSkillSyncOk = skillSyncResult.ok;
      runtime.lastSkillSyncStatus = skillSyncResult.status;
      runtime.lastSkillSyncBody = skillSyncResult.body;
    } catch (error) {
      runtime.lastSkillSyncAt = new Date().toISOString();
      runtime.lastSkillSyncOk = false;
      runtime.lastSkillSyncStatus = 0;
      runtime.lastSkillSyncBody = error instanceof Error ? error.message : String(error);
      await postAppLog(config, runtime, {
        level: "error",
        category: "local_agent.skills_sync_failed",
        message: "Skill 扫描或同步失败。",
        detail: runtime.lastSkillSyncBody,
        mirrorToMaster: true,
      });
    }
  } catch (error) {
    runtime.lastHeartbeatAt = new Date().toISOString();
    runtime.lastHeartbeatOk = false;
    runtime.lastHeartbeatStatus = 0;
    runtime.lastHeartbeatBody = error instanceof Error ? error.message : String(error);
    await postAppLog(config, runtime, {
      level: "error",
      category: "local_agent.heartbeat_exception",
      message: "local-agent 心跳执行异常。",
      detail: runtime.lastHeartbeatBody,
      mirrorToMaster: true,
    });
  }
}

// Local HTTP endpoints for inspecting the agent and triggering a heartbeat.
// NOTE(review): /health and /api/v1/device return `runtime` and `config`
// verbatim, which includes the device token and pairing code — confirm that
// the bind host keeps these endpoints private, or redact those fields.
const server = createServer(async (request, response) => {
  if (request.url === "/health") {
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(
      JSON.stringify({
        ok: true,
        service: "boss-local-agent",
        runtime,
      }),
    );
    return;
  }

  if (request.url === "/api/v1/device") {
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(JSON.stringify({ config, runtime }));
    return;
  }

  if (request.url === "/api/v1/skills") {
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(
      JSON.stringify({
        ok: true,
        deviceId: config.deviceId,
        skills: runtime.lastSkills,
        sync: {
          at: runtime.lastSkillSyncAt,
          ok: runtime.lastSkillSyncOk,
          status: runtime.lastSkillSyncStatus,
          body: runtime.lastSkillSyncBody,
        },
      }),
    );
    return;
  }

  if (request.url === "/api/v1/heartbeat" && request.method === "POST") {
    await heartbeat();
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(JSON.stringify({ ok: runtime.lastHeartbeatOk, runtime }));
    return;
  }

  response.writeHead(404, { "Content-Type": "application/json" });
  response.end(JSON.stringify({ ok: false, message: "not_found" }));
});

server.listen(config.port, config.bindHost, () => {
  console.log(
    JSON.stringify({
      ok: true,
      service: "boss-local-agent",
      bind: `${config.bindHost}:${config.port}`,
      controlPlaneUrl: config.controlPlaneUrl,
      workerId: config.workerId,
    }),
  );
});

// Run one heartbeat and one task poll immediately, then keep both on timers.
void (async () => {
  await heartbeat();
  await pollMasterAgentTasks(config, runtime);
})();

setInterval(() => {
  void heartbeat();
}, config.heartbeatIntervalMs ?? 60000);

setInterval(() => {
  void pollMasterAgentTasks(config, runtime);
}, config.masterAgentPollIntervalMs ?? 3000);