import test from "node:test";
import assert from "node:assert/strict";
import { spawn } from "node:child_process";
import { createServer } from "node:http";
import { chmod, mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { DatabaseSync } from "node:sqlite";
import os from "node:os";
import path from "node:path";
import { fileURLToPath } from "node:url";

// Parent directory of the directory containing this file; used as cwd for spawned processes.
const repoRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "..");

/**
 * Starts `server` listening on an OS-assigned port.
 *
 * @param {import("node:http").Server} server - Server that is not yet listening.
 * @param {string} [host="127.0.0.1"] - Interface to bind to.
 * @returns {Promise<number>} The port the server ended up bound to.
 */
function listen(server, host = "127.0.0.1") {
  return new Promise((resolve, reject) => {
    server.once("error", reject);
    server.listen(0, host, () => {
      // Listening succeeded, so a later "error" must no longer target this promise.
      server.off("error", reject);
      resolve(server.address().port);
    });
  });
}

/**
 * Reads the whole request body and parses it as JSON.
 *
 * @param {import("node:http").IncomingMessage} request - Incoming request stream.
 * @returns {Promise<any>} Parsed body, or `{}` when the body is empty. Rejects on
 *   stream errors and on invalid JSON.
 */
function readJsonBody(request) {
  return new Promise((resolve, reject) => {
    let raw = "";
    request.setEncoding("utf8");
    request.on("data", (chunk) => {
      raw += chunk;
    });
    request.on("end", () => {
      try {
        resolve(raw ? JSON.parse(raw) : {});
      } catch (error) {
        reject(error);
      }
    });
    request.on("error", reject);
  });
}

/**
 * Polls `predicate` every 50 ms until it returns a truthy value.
 *
 * @param {() => (boolean | Promise<boolean>)} predicate - Condition to wait for.
 *   An exception thrown by the predicate propagates to the caller immediately.
 * @param {number} [timeoutMs=8000] - Maximum time to keep polling.
 * @returns {Promise<void>}
 * @throws {Error} "waitFor timeout" when the deadline passes first.
 */
async function waitFor(predicate, timeoutMs = 8000) {
  const started = Date.now();
  while (Date.now() - started < timeoutMs) {
    if (await predicate()) return;
    await new Promise((resolve) => setTimeout(resolve, 50));
  }
  throw new Error("waitFor timeout");
}

/**
 * Creates `state_5.sqlite` inside `runtimeRoot` with a `threads` table holding
 * a single row built from `thread`.
 *
 * @param {string} runtimeRoot - Directory that receives the database file.
 * @param {{ id: string, cwd: string, title: string }} thread - Row values.
 * @returns {Promise<string>} Absolute path of the created database.
 */
async function createCodexStateDb(runtimeRoot, thread) {
  const dbPath = path.join(runtimeRoot, "state_5.sqlite");
  const db = new DatabaseSync(dbPath);
  try {
    db.exec(`
      CREATE TABLE threads (
        id TEXT PRIMARY KEY,
        rollout_path TEXT NOT NULL,
        created_at INTEGER NOT NULL,
        updated_at INTEGER NOT NULL,
        source TEXT NOT NULL,
        model_provider TEXT NOT NULL,
        cwd TEXT NOT NULL,
        title TEXT NOT NULL,
        sandbox_policy TEXT NOT NULL,
        approval_mode TEXT NOT NULL,
        tokens_used INTEGER NOT NULL DEFAULT 0,
        has_user_event INTEGER NOT NULL DEFAULT 0,
        archived INTEGER NOT NULL DEFAULT 0
      );
    `);
    db.prepare(`
      INSERT INTO threads (
        id, rollout_path, created_at, updated_at, source, model_provider,
        cwd, title, sandbox_policy, approval_mode, tokens_used, has_user_event, archived
      ) VALUES
      (?, ?, 1774845600, 1774845618, 'desktop', 'openai', ?, ?, '{"type":"workspace-write"}', 'never', 0, 1, 0)
    `).run(thread.id, path.join(runtimeRoot, `${thread.id}.jsonl`), thread.cwd, thread.title);
  } finally {
    // Release the file handle even when the schema or insert statement fails.
    db.close();
  }
  return dbPath;
}

/**
 * Writes a JSON response with the given status code.
 *
 * @param {import("node:http").ServerResponse} response - Response to finish.
 * @param {number} statusCode - HTTP status code.
 * @param {unknown} payload - Value serialized with `JSON.stringify`.
 */
function sendJson(response, statusCode, payload) {
  response.writeHead(statusCode, { "content-type": "application/json" });
  response.end(JSON.stringify(payload));
}

/**
 * Reports whether a child process has already terminated.
 *
 * @param {import("node:child_process").ChildProcess} child - Spawned process.
 * @returns {boolean} True once an exit code or terminating signal was recorded.
 */
function hasExited(child) {
  return child.exitCode !== null || child.signalCode !== null;
}

/**
 * Sends SIGTERM to `child` and waits for its "close" event.
 *
 * Resolves immediately when the process is already gone: in that case "close"
 * may have fired before anyone listened, and waiting for it would hang forever.
 *
 * @param {import("node:child_process").ChildProcess} child - Spawned process.
 * @returns {Promise<void>}
 */
function stopChild(child) {
  if (hasExited(child)) return Promise.resolve();
  return new Promise((resolve) => {
    // Register the listener before signalling so the event cannot be missed.
    child.once("close", () => resolve());
    child.kill("SIGTERM");
  });
}

test("local-agent uses Codex App Server for conversation replies when feature flag is enabled", async () => {
  const runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-local-agent-app-server-flow-"));
  const skillsDir = path.join(runtimeRoot, "skills");
  const projectDir = path.join(runtimeRoot, "project");
  const fakeBinDir = path.join(runtimeRoot, "bin");
  await mkdir(skillsDir, { recursive: true });
  await mkdir(projectDir, { recursive: true });
  await mkdir(fakeBinDir, { recursive: true });

  const stateDbPath = await createCodexStateDb(runtimeRoot, {
    id: "019d-app-server-thread",
    cwd: projectDir,
    title: "App Server Thread",
  });

  // Fake `codex` CLI placed first on PATH. If it is ever invoked with `-o <file>`
  // it writes a sentinel, which the assertions below use to detect a CLI fallback.
  // The shebang must sit on its own line; otherwise the rest of the script is
  // swallowed by it and the sentinel is never written.
  const fakeCodexPath = path.join(fakeBinDir, "codex");
  await writeFile(
    fakeCodexPath,
    `#!/usr/bin/env node
const fs = require("node:fs");
const outputIndex = process.argv.indexOf("-o");
if (outputIndex >= 0) fs.writeFileSync(process.argv[outputIndex + 1], "CLI_FALLBACK_USED");
process.exit(0);
`,
    "utf8",
  );
  await chmod(fakeCodexPath, 0o755);

  const taskId = "conversation-app-server-task";
  const taskBaseUrl = `/api/v1/master-agent/tasks/${taskId}`;
  const completeBodies = [];
  const progressBodies = [];
  let claimCount = 0;

  // Minimal stand-in for the control plane: hands out one task, then records
  // the progress and completion payloads posted for it.
  const controlPlane = createServer(async (request, response) => {
    const url = request.url || "";
    const isPost = request.method === "POST";
    try {
      if (isPost && url === "/api/v1/master-agent/tasks/claim") {
        claimCount += 1;
        await readJsonBody(request);
        sendJson(response, 200, {
          ok: true,
          // Only the first claim receives the task; later polls get nothing.
          task:
            claimCount === 1
              ? {
                  taskId,
                  taskType: "conversation_reply",
                  projectId: "app-server-project",
                  requestMessageId: "msg-app-server",
                  requestText: "继续开发",
                  executionPrompt: "用 app server 回复",
                  requestedByAccount: "krisolo",
                  targetCodexThreadRef: "019d-app-server-thread",
                  targetCodexFolderRef: projectDir,
                  requestedAt: "2026-05-16T10:00:00.000Z",
                }
              : null,
        });
        return;
      }
      if (isPost && url === `${taskBaseUrl}/progress`) {
        progressBodies.push(await readJsonBody(request));
        sendJson(response, 200, { ok: true });
        return;
      }
      if (isPost && url === `${taskBaseUrl}/complete`) {
        completeBodies.push(await readJsonBody(request));
        sendJson(response, 200, { ok: true });
        return;
      }
      if (isPost && url === "/api/device-heartbeat") {
        sendJson(response, 200, { ok: true, token: "server-token" });
        return;
      }
      if (isPost && url === "/api/v1/app-logs") {
        sendJson(response, 200, { ok: true });
        return;
      }
      if (isPost && url === "/api/v1/devices/mac-studio/skills") {
        sendJson(response, 200, { ok: true });
        return;
      }
      sendJson(response, 404, { ok: false, message: "not_found", url });
    } catch (error) {
      // A malformed body must not surface as an unhandled rejection in the test process.
      if (response.headersSent) {
        response.end();
      } else {
        sendJson(response, 500, { ok: false, message: String(error?.message ?? error) });
      }
    }
  });
  const controlPort = await listen(controlPlane);

  // Reserve a free port for the agent by binding a throwaway server and closing it.
  const agentServer = createServer();
  const agentPort = await listen(agentServer);
  await new Promise((resolve) => agentServer.close(resolve));

  const configPath = path.join(runtimeRoot, "local-agent-config.json");
  await writeFile(
    configPath,
    JSON.stringify({
      port: agentPort,
      bindHost: "127.0.0.1",
      controlPlaneUrl: `http://127.0.0.1:${controlPort}`,
      deviceId: "mac-studio",
      token: "local-token",
      name: "Mac Studio",
      account: "krisolo",
      status: "online",
      codexSessionDiscoveryEnabled: false,
      codexStateDbPath: stateDbPath,
      skillsDir,
      masterAgentEnabled: true,
      masterAgentPollIntervalMs: 60_000,
      heartbeatIntervalMs: 60_000,
      skillLifecycleEnabled: false,
      masterAgentWorkdir: projectDir,
      masterAgentModel: "gpt-5.4",
      codexAppServerEnabled: true,
      codexAppServerCommand: process.execPath,
      codexAppServerArgs: ["tests/fixtures/codex-app-server-runtime.mjs"],
      codexAppServerWorkdir: repoRoot,
      codexAppServerTimeoutMs: 5000,
    }),
    "utf8",
  );

  const child = spawn(process.execPath, ["local-agent/server.mjs", configPath], {
    cwd: repoRoot,
    env: {
      ...process.env,
      BOSS_CODEX_APP_SERVER_FIXTURE_EMIT_PROGRESS: "1",
      PATH: `${fakeBinDir}${path.delimiter}${process.env.PATH || ""}`,
    },
    stdio: ["ignore", "pipe", "pipe"],
  });

  // Keep both pipes drained so the agent can never block on a full pipe buffer,
  // and retain stderr so a failure explains itself.
  let childStderr = "";
  child.stderr.setEncoding("utf8");
  child.stderr.on("data", (chunk) => {
    childStderr += chunk;
  });
  child.stdout.resume();

  try {
    try {
      await waitFor(() => {
        if (completeBodies.length > 0) return true;
        if (hasExited(child)) {
          // No point polling until the deadline once the agent is gone.
          throw new Error(
            `local-agent exited early (code=${child.exitCode}, signal=${child.signalCode})`,
          );
        }
        return false;
      });
    } catch (error) {
      throw new Error(
        `local-agent never completed the task: ${error.message}\n--- local-agent stderr ---\n${childStderr}`,
        { cause: error },
      );
    }

    const body = completeBodies.at(0);
    assert.equal(body.status, "completed");
    assert.equal(body.replyBody, "APP_SERVER_REPLY:用 app server 回复");
    assert.notEqual(body.replyBody, "CLI_FALLBACK_USED");

    assert.ok(progressBodies.length >= 1, "expected local-agent to stream app-server progress before completion");
    assert.equal(progressBodies[0].status, "running");
    assert.equal(progressBodies[0].executionProgress.steps[0].text, "读取 Codex 官方 app-server 协议");
    assert.ok(
      progressBodies.some((progress) => progress.executionProgress?.branch?.additions === 181),
      "expected one streamed progress update to include diff stats",
    );

    assert.equal(body.executionProgress.steps[0].text, "读取 Codex 官方 app-server 协议");
    assert.equal(body.executionProgress.branch.additions, 181);
    assert.equal(body.executionProgress.artifacts[0].label, "codex-app-server-protocol-0.135.0.json");
  } finally {
    await stopChild(child);
    // Drop any keep-alive sockets so the stub server does not outlive the test.
    controlPlane.closeAllConnections();
    controlPlane.close();
    await rm(runtimeRoot, { recursive: true, force: true });
  }
});