import test from "node:test";
import assert from "node:assert/strict";
import fs from "node:fs/promises";
import { createServer } from "node:http";
import http from "node:http";
import os from "node:os";
import path from "node:path";
import { spawn } from "node:child_process";

import {
  buildCodexDesktopRefreshExecution,
  executeCodexDesktopRefreshBridge,
  getCodexDesktopRefreshBridgeConfig,
} from "../local-agent/codex-desktop-refresh-bridge.mjs";

const repoRoot = path.resolve(import.meta.dirname, "..");

/**
 * Runs scripts/codex-desktop-refresh-hint.mjs in dry-run mode, feeding it
 * `payload` as JSON on stdin.
 *
 * @param {object} payload - Refresh hint written to the child's stdin.
 * @returns {Promise<any>} The JSON value parsed from the last stdout line.
 *   Rejects when the child cannot be spawned, exits non-zero, or prints a
 *   last line that is not valid JSON.
 */
function runRefreshHintDryRun(payload) {
  return new Promise((resolve, reject) => {
    const child = spawn(process.execPath, [path.join(repoRoot, "scripts/codex-desktop-refresh-hint.mjs")], {
      cwd: repoRoot,
      env: {
        ...process.env,
        BOSS_CODEX_DESKTOP_REFRESH_DRY_RUN: "true",
      },
      stdio: ["pipe", "pipe", "pipe"],
    });
    let stdout = "";
    let stderr = "";
    child.stdout.setEncoding("utf8");
    child.stderr.setEncoding("utf8");
    child.stdout.on("data", (chunk) => {
      stdout += chunk;
    });
    child.stderr.on("data", (chunk) => {
      stderr += chunk;
    });
    child.on("error", reject);
    child.on("close", (code) => {
      if (code !== 0) {
        reject(new Error(stderr.trim() || `refresh hint dry run exited ${code}`));
        return;
      }
      // Parse inside try/catch: a throw from an event handler would surface as
      // an uncaught exception instead of rejecting this promise.
      try {
        resolve(JSON.parse(stdout.trim().split(/\r?\n/).at(-1)));
      } catch (error) {
        reject(new Error("refresh hint dry run printed non-JSON output", { cause: error }));
      }
    });
    // If the child exits before draining stdin the write fails (e.g. EPIPE).
    // Without a listener that is an unhandled "error" event; the "close"
    // handler above already reports the outcome via exit code and stderr.
    child.stdin.on("error", () => {});
    child.stdin.end(JSON.stringify(payload));
  });
}

/**
 * Spawns scripts/codex-desktop-refresh-bridge-daemon.mjs bound to 127.0.0.1
 * with port "0" and dry-run refresh enabled. stdout/stderr are piped so the
 * caller can read the daemon's output; the caller must kill the child.
 *
 * @returns {import("node:child_process").ChildProcess} The spawned daemon.
 */
function spawnRefreshBridgeDaemon() {
  return spawn(process.execPath, [path.join(repoRoot, "scripts/codex-desktop-refresh-bridge-daemon.mjs")], {
    cwd: repoRoot,
    env: {
      ...process.env,
      BOSS_CODEX_DESKTOP_BRIDGE_HOST: "127.0.0.1",
      BOSS_CODEX_DESKTOP_BRIDGE_PORT: "0",
      BOSS_CODEX_DESKTOP_REFRESH_DRY_RUN: "true",
    },
    stdio: ["ignore", "pipe", "pipe"],
  });
}

/**
 * POSTs a refresh hint as JSON to `/api/v1/codex-desktop/refresh` on `baseUrl`.
 *
 * @param {string} baseUrl - Origin of the bridge, e.g. `http://127.0.0.1:1234`.
 * @param {object} hint - Body serialized with JSON.stringify.
 * @returns {Promise<Response>} The fetch response.
 */
function postRefreshHint(baseUrl, hint) {
  return fetch(`${baseUrl}/api/v1/codex-desktop/refresh`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify(hint),
  });
}

/**
 * Promise wrapper around `server.close()`.
 *
 * @param {import("node:http").Server} server - Server to close.
 * @returns {Promise<void>} Resolves once closed; rejects with the close error.
 */
function closeServer(server) {
  return new Promise((resolve, reject) => {
    server.close((error) => {
      if (error) {
        reject(error);
        return;
      }
      resolve();
    });
  });
}

/**
 * Reads the whole request body as UTF-8 and parses it as JSON.
 *
 * @param {import("node:http").IncomingMessage} request - Incoming request.
 * @returns {Promise<any>} Parsed body; an empty body yields `{}`. Rejects on a
 *   stream error or invalid JSON.
 */
function readRequestJson(request) {
  return new Promise((resolve, reject) => {
    let raw = "";
    request.setEncoding("utf8");
    request.on("data", (chunk) => {
      raw += chunk;
    });
    request.on("end", () => {
      try {
        resolve(JSON.parse(raw || "{}"));
      } catch (error) {
        reject(error);
      }
    });
    request.on("error", reject);
  });
}

/**
 * Starts `server` listening on 127.0.0.1 with an OS-assigned port.
 *
 * @param {import("node:http").Server} server - Server to start.
 * @returns {Promise<import("node:net").AddressInfo>} The bound address.
 *   Rejects if the server emits "error" before it starts listening.
 */
function listen(server) {
  return new Promise((resolve, reject) => {
    // A bind failure is reported through "error", never through the listen
    // callback, so without this the promise would stay pending forever.
    server.once("error", reject);
    server.listen(0, "127.0.0.1", () => {
      server.off("error", reject);
      resolve(server.address());
    });
  });
}

/**
 * Waits for the first complete line on `stream` that parses as JSON.
 * Blank lines and lines that are not valid JSON are skipped.
 *
 * @param {import("node:stream").Readable} stream - Stream to read; its
 *   encoding is set to UTF-8.
 * @param {number} [timeoutMs=4000] - Maximum wait in milliseconds.
 * @returns {Promise<any>} The parsed JSON value. Rejects with
 *   `TIMED_OUT_WAITING_FOR_JSON_LINE` on timeout, or with the stream's error.
 */
function waitForJsonLine(stream, timeoutMs = 4000) {
  return new Promise((resolve, reject) => {
    let buffer = "";
    const timer = setTimeout(() => {
      cleanup();
      reject(new Error("TIMED_OUT_WAITING_FOR_JSON_LINE"));
    }, timeoutMs);

    function cleanup() {
      clearTimeout(timer);
      stream.off("data", handleData);
      stream.off("error", handleError);
    }

    function handleError(error) {
      cleanup();
      reject(error);
    }

    function handleData(chunk) {
      buffer += chunk;
      const lines = buffer.split(/\r?\n/);
      // The last element is an incomplete line (or ""); keep it for the next chunk.
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed) {
          continue;
        }
        let parsed;
        try {
          parsed = JSON.parse(trimmed);
        } catch {
          // Ignore non-JSON startup noise.
          continue;
        }
        // Detach only after a successful parse. Detaching first would remove
        // the listeners and the timer on the first noise line and leave the
        // promise pending forever.
        cleanup();
        resolve(parsed);
        return;
      }
    }

    stream.setEncoding("utf8");
    stream.on("data", handleData);
    stream.on("error", handleError);
  });
}

/**
 * Opens an SSE connection to `/api/v1/codex-desktop/events` on `baseUrl` and
 * collects the events it receives. Events without a name and events named
 * "ready" are not collected.
 *
 * @param {string} baseUrl - Origin of the bridge daemon.
 * @param {number} [timeoutMs=4000] - Timeout applied to `ready` and to each
 *   `nextEvent()` call.
 * @returns {{ready: Promise<void>, events: object[], nextEvent: () => Promise<object>, close: () => void}}
 *   `ready` resolves when a line starting with "event: ready" arrives;
 *   `events` is the queue of undelivered events; `nextEvent()` resolves with
 *   the next `{ event, id, data }` (data JSON-parsed) or rejects with
 *   `TIMED_OUT_WAITING_FOR_SSE_EVENT`; `close()` destroys the connection.
 */
function subscribeToRefreshEvents(baseUrl, timeoutMs = 4000) {
  const events = [];
  let request;
  let response;
  let buffer = "";
  let currentEvent = {};
  let resolveNext;
  let nextTimer;

  function cleanup() {
    clearTimeout(nextTimer);
    request?.destroy();
    response?.destroy();
  }

  // Called on the blank line that terminates an SSE event.
  function finishEvent() {
    if (!currentEvent.event || currentEvent.event === "ready") {
      currentEvent = {};
      return;
    }
    const event = {
      event: currentEvent.event,
      id: currentEvent.id,
      data: currentEvent.data ? JSON.parse(currentEvent.data) : {},
    };
    if (resolveNext) {
      // Hand the event straight to the pending nextEvent() caller.
      const resolve = resolveNext;
      resolveNext = undefined;
      clearTimeout(nextTimer);
      resolve(event);
    } else {
      events.push(event);
    }
    currentEvent = {};
  }

  function handleLine(line) {
    if (line === "") {
      finishEvent();
      return;
    }
    if (line.startsWith("event:")) {
      currentEvent.event = line.slice("event:".length).trim();
    } else if (line.startsWith("id:")) {
      currentEvent.id = line.slice("id:".length).trim();
    } else if (line.startsWith("data:")) {
      // Multiple data lines of one event are concatenated.
      currentEvent.data = `${currentEvent.data || ""}${line.slice("data:".length).trim()}`;
    }
  }

  const ready = new Promise((resolve, reject) => {
    const readyTimer = setTimeout(() => {
      cleanup();
      reject(new Error("TIMED_OUT_WAITING_FOR_SSE_READY"));
    }, timeoutMs);
    const url = new URL("/api/v1/codex-desktop/events", baseUrl);
    request = http.get(url, (incoming) => {
      response = incoming;
      incoming.setEncoding("utf8");
      incoming.on("data", (chunk) => {
        buffer += chunk;
        const lines = buffer.split(/\r?\n/);
        buffer = lines.pop() ?? "";
        for (const line of lines) {
          if (line.startsWith("event: ready")) {
            clearTimeout(readyTimer);
            resolve();
          }
          handleLine(line);
        }
      });
      incoming.on("error", (error) => {
        clearTimeout(readyTimer);
        reject(error);
      });
    });
    request.on("error", (error) => {
      clearTimeout(readyTimer);
      reject(error);
    });
  });

  return {
    ready,
    events,
    nextEvent() {
      if (events.length > 0) {
        return Promise.resolve(events.shift());
      }
      return new Promise((resolve, reject) => {
        resolveNext = resolve;
        nextTimer = setTimeout(() => {
          // Drop the stale resolver so an event that arrives after this
          // timeout is queued for the next caller instead of being handed to
          // an already-rejected promise and lost.
          if (resolveNext === resolve) {
            resolveNext = undefined;
          }
          reject(new Error("TIMED_OUT_WAITING_FOR_SSE_EVENT"));
        }, timeoutMs);
      });
    },
    close: cleanup,
  };
}

test("Codex desktop refresh bridge is skipped when disabled", async () => {
  const result = await executeCodexDesktopRefreshBridge(
    {
      targetThreadRef: "019d-thread-refresh",
      sourceMessageId: "msg-refresh",
      rolloutPath: "/tmp/rollout.jsonl",
    },
    {
      codexDesktopRefreshEnabled: false,
    },
  );

  assert.deepEqual(result, {
    status: "skipped",
    reason: "disabled",
  });
});

test("Codex desktop refresh bridge daemon exposes a local persistent refresh endpoint", async () => {
  const child = spawnRefreshBridgeDaemon();

  try {
    const ready = await waitForJsonLine(child.stdout);
    assert.equal(ready.status, "ready");
    assert.equal(ready.host, "127.0.0.1");
    assert.equal(typeof ready.port, "number");

    const response = await postRefreshHint(`http://${ready.host}:${ready.port}`, {
      kind: "codex_desktop_refresh_hint",
      targetThreadRef: "019d-thread-refresh",
      sourceMessageId: "msg-refresh-daemon",
      appName: "Codex",
      refreshMode: "deeplink-reload",
      message: "must not be reflected",
    });
    const result = await response.json();

    assert.equal(response.status, 200);
    assert.equal(result.status, "completed");
    assert.equal(result.targetThreadRef, "019d-thread-refresh");
    assert.equal(result.deepLink, "codex://threads/019d-thread-refresh");
    assert.doesNotMatch(result.detail, /must not be reflected/);
  } finally {
    child.kill("SIGTERM");
  }
});

test("Codex desktop refresh bridge daemon broadcasts safe realtime events over SSE", async () => {
  const child = spawnRefreshBridgeDaemon();
  let subscription;

  try {
    const ready = await waitForJsonLine(child.stdout);
    const baseUrl = `http://${ready.host}:${ready.port}`;
    subscription = subscribeToRefreshEvents(baseUrl);
    await subscription.ready;

    const response = await postRefreshHint(baseUrl, {
      kind: "codex_desktop_refresh_hint",
      targetThreadRef: "019d-thread-refresh",
      sourceMessageId: "msg-refresh-sse",
      appName: "Codex",
      refreshMode: "deeplink-reload",
      message: "must not be broadcast",
      executionPrompt: "must not be broadcast",
    });
    assert.equal(response.status, 200);

    const event = await subscription.nextEvent();
    assert.equal(event.event, "codex_desktop_refresh");
    assert.equal(event.data.targetThreadRef, "019d-thread-refresh");
    assert.equal(event.data.sourceMessageId, "msg-refresh-sse");
    assert.equal(event.data.status, "completed");
    assert.equal(event.data.deepLink, "codex://threads/019d-thread-refresh");
    assert.equal(Object.hasOwn(event.data, "message"), false);
    assert.equal(Object.hasOwn(event.data, "executionPrompt"), false);

    const recentResponse = await fetch(`${baseUrl}/api/v1/codex-desktop/events/recent`);
    const recent = await recentResponse.json();
    assert.equal(recent.ok, true);
    assert.equal(recent.events.at(-1).sourceMessageId, "msg-refresh-sse");
  } finally {
    subscription?.close();
    child.kill("SIGTERM");
  }
});

test("Codex desktop event consumer receives one safe refresh event from the local bridge", async () => {
  const daemon = spawnRefreshBridgeDaemon();
  let consumer;

  try {
    const ready = await waitForJsonLine(daemon.stdout);
    const baseUrl = `http://${ready.host}:${ready.port}`;
    consumer = spawn(process.execPath, [path.join(repoRoot, "scripts/codex-desktop-event-consumer.mjs")], {
      cwd: repoRoot,
      env: {
        ...process.env,
        BOSS_CODEX_DESKTOP_EVENTS_URL: `${baseUrl}/api/v1/codex-desktop/events`,
        BOSS_CODEX_DESKTOP_EVENTS_ONCE: "true",
      },
      stdio: ["ignore", "pipe", "pipe"],
    });

    // Fixed pause before posting; presumably gives the consumer time to
    // subscribe to the event stream first.
    await new Promise((resolve) => setTimeout(resolve, 200));

    const response = await postRefreshHint(baseUrl, {
      kind: "codex_desktop_refresh_hint",
      targetThreadRef: "019d-thread-consumer",
      sourceMessageId: "msg-refresh-consumer",
      appName: "Codex",
      refreshMode: "deeplink-reload",
      message: "must not reach consumer",
      executionPrompt: "must not reach consumer",
    });
    assert.equal(response.status, 200);

    const consumed = await waitForJsonLine(consumer.stdout);
    assert.equal(consumed.eventType, "codex_desktop_refresh");
    assert.equal(consumed.targetThreadRef, "019d-thread-consumer");
    assert.equal(consumed.sourceMessageId, "msg-refresh-consumer");
    assert.equal(consumed.deepLink, "codex://threads/019d-thread-consumer");
    assert.equal(Object.hasOwn(consumed, "message"), false);
    assert.equal(Object.hasOwn(consumed, "executionPrompt"), false);
  } finally {
    consumer?.kill("SIGTERM");
    daemon.kill("SIGTERM");
  }
});

test("Codex desktop refresh bridge can use a persistent local endpoint without a command", async () => {
  const receivedPayloads = [];
  const server = createServer(async (request, response) => {
    assert.equal(request.method, "POST");
    assert.equal(request.url, "/api/v1/codex-desktop/refresh");
    const payload = await readRequestJson(request);
    receivedPayloads.push(payload);
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(
      `${JSON.stringify({
        status: "completed",
        targetThreadRef: payload.targetThreadRef,
        appName: payload.appName,
        deepLink: `codex://threads/${payload.targetThreadRef}`,
        detail: "persistent bridge accepted refresh hint",
      })}\n`,
    );
  });
  const address = await listen(server);

  try {
    const config = getCodexDesktopRefreshBridgeConfig(
      {},
      {
        codexDesktopRefreshEnabled: true,
        codexDesktopRefreshEndpoint: `http://${address.address}:${address.port}/api/v1/codex-desktop/refresh`,
        codexDesktopRefreshTimeoutMs: 4000,
        codexDesktopRefreshAppName: "Codex",
        codexDesktopRefreshMode: "deeplink-reload",
      },
    );

    assert.equal(config.endpoint, `http://${address.address}:${address.port}/api/v1/codex-desktop/refresh`);

    const result = await executeCodexDesktopRefreshBridge(
      {
        targetThreadRef: "019d-thread-refresh",
        sourceMessageId: "msg-refresh-endpoint",
        rolloutPath: "/tmp/rollout.jsonl",
        threadTouchStatus: "updated",
      },
      config,
    );

    assert.deepEqual(result, {
      status: "completed",
      targetThreadRef: "019d-thread-refresh",
      appName: "Codex",
      deepLink: "codex://threads/019d-thread-refresh",
      detail: "persistent bridge accepted refresh hint",
    });
    assert.equal(receivedPayloads.length, 1);
    assert.equal(receivedPayloads[0].kind, "codex_desktop_refresh_hint");
    assert.equal(receivedPayloads[0].targetThreadRef, "019d-thread-refresh");
    assert.equal(Object.hasOwn(receivedPayloads[0], "message"), false);
    assert.equal(Object.hasOwn(receivedPayloads[0], "executionPrompt"), false);
  } finally {
    await closeServer(server);
  }
});

test("Codex desktop refresh bridge falls back to command when the local endpoint is unavailable", async () => {
  const config = getCodexDesktopRefreshBridgeConfig(
    {},
    {
      codexDesktopRefreshEnabled: true,
      codexDesktopRefreshEndpoint: "http://127.0.0.1:9/api/v1/codex-desktop/refresh",
      codexDesktopRefreshCommand: process.execPath,
      codexDesktopRefreshArgs: ["tests/fixtures/codex-desktop-refresh-runtime.mjs"],
      codexDesktopRefreshWorkdir: repoRoot,
      codexDesktopRefreshTimeoutMs: 4000,
      codexDesktopRefreshRetryCount: 0,
      codexDesktopRefreshRetryDelayMs: 1,
      codexDesktopRefreshAppName: "Codex",
      codexDesktopRefreshMode: "deeplink-reload",
    },
  );

  const result = await executeCodexDesktopRefreshBridge(
    {
      targetThreadRef: "019d-thread-refresh",
      sourceMessageId: "msg-refresh-fallback",
      rolloutPath: "/tmp/rollout.jsonl",
      threadTouchStatus: "updated",
    },
    config,
  );

  assert.deepEqual(result, {
    status: "completed",
    targetThreadRef: "019d-thread-refresh",
    appName: "Codex",
    detail: "refresh hint accepted: deeplink-reload",
  });
});

test("Codex desktop refresh bridge sends a safe refresh hint to the configured runtime", async () => {
  const config = getCodexDesktopRefreshBridgeConfig(
    {},
    {
      codexDesktopRefreshEnabled: true,
      codexDesktopRefreshCommand: process.execPath,
      codexDesktopRefreshArgs: ["tests/fixtures/codex-desktop-refresh-runtime.mjs"],
      codexDesktopRefreshWorkdir: repoRoot,
      codexDesktopRefreshTimeoutMs: 4000,
      codexDesktopRefreshAppName: "Codex",
      codexDesktopRefreshMode: "deeplink-reload",
    },
  );

  const execution = buildCodexDesktopRefreshExecution(config, {
    targetThreadRef: "019d-thread-refresh",
    sourceMessageId: "msg-refresh",
    rolloutPath: "/tmp/rollout.jsonl",
    threadTouchStatus: "updated",
  });

  assert.equal(execution.command, process.execPath);
  assert.deepEqual(execution.args, [path.join(repoRoot, "tests/fixtures/codex-desktop-refresh-runtime.mjs")]);
  assert.equal(execution.stdinPayload.kind, "codex_desktop_refresh_hint");
  assert.equal(execution.stdinPayload.targetThreadRef, "019d-thread-refresh");
  assert.equal(execution.stdinPayload.sourceMessageId, "msg-refresh");
  assert.equal(execution.stdinPayload.rolloutPath, "/tmp/rollout.jsonl");
  assert.equal(execution.stdinPayload.appName, "Codex");
  assert.equal(execution.stdinPayload.refreshMode, "deeplink-reload");
  assert.equal(Object.hasOwn(execution.stdinPayload, "message"), false);
  assert.equal(Object.hasOwn(execution.stdinPayload, "executionPrompt"), false);

  const result = await executeCodexDesktopRefreshBridge(
    {
      targetThreadRef: "019d-thread-refresh",
      sourceMessageId: "msg-refresh",
      rolloutPath: "/tmp/rollout.jsonl",
      threadTouchStatus: "updated",
    },
    config,
  );

  assert.deepEqual(result, {
    status: "completed",
    targetThreadRef: "019d-thread-refresh",
    appName: "Codex",
    detail: "refresh hint accepted: deeplink-reload",
  });
});

test("Codex desktop refresh bridge retries a failed runtime and reports attempt count", async () => {
  const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "boss-codex-refresh-"));
  const stateFile = path.join(tempDir, "flaky-state.json");
  // Remember the previous value so the finally block can restore it.
  const previousStateFile = process.env.BOSS_CODEX_REFRESH_FLAKY_STATE;
  process.env.BOSS_CODEX_REFRESH_FLAKY_STATE = stateFile;

  try {
    const config = getCodexDesktopRefreshBridgeConfig(
      {
        BOSS_CODEX_DESKTOP_REFRESH_RETRY_COUNT: "2",
        BOSS_CODEX_DESKTOP_REFRESH_RETRY_DELAY_MS: "1",
      },
      {
        codexDesktopRefreshEnabled: true,
        codexDesktopRefreshCommand: process.execPath,
        codexDesktopRefreshArgs: ["tests/fixtures/codex-desktop-refresh-flaky-runtime.mjs"],
        codexDesktopRefreshWorkdir: repoRoot,
        codexDesktopRefreshTimeoutMs: 4000,
        codexDesktopRefreshAppName: "Codex",
        codexDesktopRefreshMode: "deeplink-reload",
      },
    );

    assert.equal(config.retryCount, 2);
    assert.equal(config.retryDelayMs, 1);

    const result = await executeCodexDesktopRefreshBridge(
      {
        targetThreadRef: "019d-thread-refresh",
        sourceMessageId: "msg-refresh-flaky",
        rolloutPath: "/tmp/rollout.jsonl",
        threadTouchStatus: "updated",
      },
      config,
    );

    assert.deepEqual(result, {
      status: "completed",
      targetThreadRef: "019d-thread-refresh",
      appName: "Codex",
      deepLink: "codex://threads/019d-thread-refresh",
      detail: "flaky refresh accepted after 2 attempts",
      attemptCount: 2,
    });

    const state = JSON.parse(await fs.readFile(stateFile, "utf8"));
    assert.equal(state.count, 2);
  } finally {
    if (previousStateFile === undefined) {
      delete process.env.BOSS_CODEX_REFRESH_FLAKY_STATE;
    } else {
      process.env.BOSS_CODEX_REFRESH_FLAKY_STATE = previousStateFile;
    }
    await fs.rm(tempDir, { recursive: true, force: true });
  }
});

test("Codex desktop refresh hint can target a concrete desktop thread deeplink without sending message text", async () => {
  const result = await runRefreshHintDryRun({
    kind: "codex_desktop_refresh_hint",
    targetThreadRef: "019d-thread-refresh",
    sourceMessageId: "msg-refresh",
    appName: "Codex",
    refreshMode: "deeplink-reload",
    message: "this must not be interpreted by the desktop bridge",
  });

  assert.equal(result.status, "completed");
  assert.equal(result.targetThreadRef, "019d-thread-refresh");
  assert.equal(result.deepLink, "codex://threads/019d-thread-refresh");
  assert.match(result.detail, /would open codex:\/\/threads\/019d-thread-refresh/);
  assert.doesNotMatch(result.detail, /this must not be interpreted/);
});