Files
boss/tests/local-agent-codex-desktop-refresh-bridge.test.mjs

621 lines
20 KiB
JavaScript

import test from "node:test";
import assert from "node:assert/strict";
import fs from "node:fs/promises";
import { createServer } from "node:http";
import http from "node:http";
import os from "node:os";
import path from "node:path";
import { spawn } from "node:child_process";
import {
buildCodexDesktopRefreshExecution,
executeCodexDesktopRefreshBridge,
getCodexDesktopRefreshBridgeConfig,
} from "../local-agent/codex-desktop-refresh-bridge.mjs";
// Absolute path of the directory one level above this test file's directory;
// the spawned scripts and the fixture paths used below are resolved against it.
const repoRoot = path.resolve(import.meta.dirname, "..");
/**
 * Runs `scripts/codex-desktop-refresh-hint.mjs` in a child Node process with
 * `BOSS_CODEX_DESKTOP_REFRESH_DRY_RUN=true` and writes `payload` to its stdin.
 *
 * @param {object} payload - Value serialized with JSON.stringify and sent on the child's stdin.
 * @returns {Promise<any>} Resolves with the JSON value parsed from the last line
 *   of the child's stdout. Rejects when the process cannot be spawned, when it
 *   exits with a non-zero code (message is the trimmed stderr, or a generic
 *   exit message when stderr is empty), or when the last stdout line is not
 *   valid JSON.
 */
function runRefreshHintDryRun(payload) {
  return new Promise((resolve, reject) => {
    const scriptPath = path.join(repoRoot, "scripts/codex-desktop-refresh-hint.mjs");
    const child = spawn(process.execPath, [scriptPath], {
      cwd: repoRoot,
      env: {
        ...process.env,
        BOSS_CODEX_DESKTOP_REFRESH_DRY_RUN: "true",
      },
      stdio: ["pipe", "pipe", "pipe"],
    });
    let stdout = "";
    let stderr = "";
    child.stdout.setEncoding("utf8");
    child.stderr.setEncoding("utf8");
    child.stdout.on("data", (chunk) => {
      stdout += chunk;
    });
    child.stderr.on("data", (chunk) => {
      stderr += chunk;
    });
    child.on("error", reject);
    child.on("close", (code) => {
      if (code !== 0) {
        reject(new Error(stderr.trim() || `refresh hint dry run exited ${code}`));
        return;
      }
      // Only the final stdout line is treated as the result; earlier lines are ignored.
      const lastLine = stdout.trim().split(/\r?\n/).at(-1);
      try {
        resolve(JSON.parse(lastLine));
      } catch (error) {
        // A throw inside this event handler would escape as an uncaught
        // exception; reject instead so the awaiting test fails normally.
        reject(new Error(`refresh hint dry run printed non-JSON output: ${lastLine}`, { cause: error }));
      }
    });
    child.stdin.end(JSON.stringify(payload));
  });
}
/**
 * Promise adapter for `server.close(callback)`.
 *
 * @param {{ close: Function }} server - Object whose `close` accepts a completion callback.
 * @returns {Promise<void>} Resolves when the callback is invoked without an
 *   error; rejects with the error passed to the callback otherwise.
 */
function closeServer(server) {
  return new Promise((resolve, reject) => {
    const onClosed = (closeError) => {
      if (!closeError) {
        resolve();
        return;
      }
      reject(closeError);
    };
    server.close(onClosed);
  });
}
/**
 * Collects the full body of a readable request as UTF-8 text and parses it as JSON.
 *
 * @param {object} request - Stream-like object supporting `setEncoding` and `on`.
 * @returns {Promise<any>} Resolves with the parsed body, or `{}` when the body
 *   is empty. Rejects with the JSON parse error or with the stream's "error" event value.
 */
function readRequestJson(request) {
  return new Promise((resolve, reject) => {
    let body = "";

    const appendChunk = (chunk) => {
      body += chunk;
    };

    const parseBody = () => {
      try {
        // An empty body is treated as an empty JSON object.
        resolve(JSON.parse(body || "{}"));
      } catch (parseError) {
        reject(parseError);
      }
    };

    request.setEncoding("utf8");
    request.on("data", appendChunk);
    request.on("end", parseBody);
    request.on("error", reject);
  });
}
/**
 * Starts `server` listening on 127.0.0.1 with port 0 and reports the bound address.
 *
 * @param {object} server - Object exposing `listen(port, host, callback)` and `address()`.
 * @returns {Promise<any>} Resolves with `server.address()` once the listen callback fires.
 */
function listen(server) {
  return new Promise((resolve) => {
    const reportAddress = () => resolve(server.address());
    server.listen(0, "127.0.0.1", reportAddress);
  });
}
/**
 * Waits for the first line on `stream` that parses as JSON.
 *
 * Incoming chunks are buffered and split on newlines; blank lines and lines
 * that are not valid JSON are skipped, and a trailing partial line is kept in
 * the buffer until its newline arrives.
 *
 * @param {object} stream - Readable stream supporting `setEncoding`, `on` and `off`.
 * @param {number} [timeoutMs=4000] - Maximum time to wait before rejecting.
 * @returns {Promise<any>} Resolves with the parsed value of the first JSON
 *   line. Rejects with `TIMED_OUT_WAITING_FOR_JSON_LINE` on timeout, or with
 *   the stream's "error" event value.
 */
function waitForJsonLine(stream, timeoutMs = 4000) {
  return new Promise((resolve, reject) => {
    let buffer = "";
    const timer = setTimeout(() => {
      cleanup();
      reject(new Error("TIMED_OUT_WAITING_FOR_JSON_LINE"));
    }, timeoutMs);

    // Stops the timeout and detaches both listeners; called exactly when the promise settles.
    function cleanup() {
      clearTimeout(timer);
      stream.off("data", handleData);
      stream.off("error", handleError);
    }

    function handleError(error) {
      cleanup();
      reject(error);
    }

    function handleData(chunk) {
      buffer += chunk;
      const lines = buffer.split(/\r?\n/);
      // The last element is an incomplete line (or ""); keep it for the next chunk.
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed) {
          continue;
        }
        let parsed;
        try {
          parsed = JSON.parse(trimmed);
        } catch {
          // Ignore non-JSON startup noise and keep listening: tearing down
          // before a successful parse would leave the promise unsettled.
          continue;
        }
        cleanup();
        resolve(parsed);
        return;
      }
    }

    stream.setEncoding("utf8");
    stream.on("data", handleData);
    stream.on("error", handleError);
  });
}
/**
 * Opens a Server-Sent Events stream at `/api/v1/codex-desktop/events` under
 * `baseUrl` and exposes the received events to the caller.
 *
 * Each event is delivered as `{ event, id, data }`, where `data` is the
 * JSON-parsed `data:` payload (or `{}` when there is none). Events named
 * "ready" and blocks without an `event:` field are not delivered.
 *
 * @param {string} baseUrl - Base URL of the server to subscribe to.
 * @param {number} [timeoutMs=4000] - Timeout applied both to waiting for the
 *   initial "ready" event and to each `nextEvent()` call.
 * @returns {{ ready: Promise<void>, events: object[], nextEvent: () => Promise<object>, close: () => void }}
 *   `ready` resolves once an `event: ready` line is seen (rejects with
 *   `TIMED_OUT_WAITING_FOR_SSE_READY` or a request/response error);
 *   `events` is the queue of received events not yet handed out;
 *   `nextEvent()` resolves with the oldest queued event or the next one to
 *   arrive (rejects with `TIMED_OUT_WAITING_FOR_SSE_EVENT`);
 *   `close()` clears the pending event timer and destroys the request and response.
 */
function subscribeToRefreshEvents(baseUrl, timeoutMs = 4000) {
  const events = [];
  let request;
  let response;
  let buffer = "";
  let currentEvent = {};
  let resolveNext;
  let nextTimer;

  function cleanup() {
    clearTimeout(nextTimer);
    request?.destroy();
    response?.destroy();
  }

  // Called on the blank line that terminates an SSE block.
  function finishEvent() {
    if (!currentEvent.event || currentEvent.event === "ready") {
      currentEvent = {};
      return;
    }
    const event = {
      event: currentEvent.event,
      id: currentEvent.id,
      data: currentEvent.data ? JSON.parse(currentEvent.data) : {},
    };
    if (resolveNext) {
      // A nextEvent() caller is waiting: hand the event over directly.
      const resolve = resolveNext;
      resolveNext = undefined;
      clearTimeout(nextTimer);
      resolve(event);
    } else {
      events.push(event);
    }
    currentEvent = {};
  }

  function handleLine(line) {
    if (line === "") {
      finishEvent();
      return;
    }
    if (line.startsWith("event:")) {
      currentEvent.event = line.slice("event:".length).trim();
    } else if (line.startsWith("id:")) {
      currentEvent.id = line.slice("id:".length).trim();
    } else if (line.startsWith("data:")) {
      // Multiple data lines of one block are concatenated without a separator.
      currentEvent.data = `${currentEvent.data || ""}${line.slice("data:".length).trim()}`;
    }
  }

  const ready = new Promise((resolve, reject) => {
    const readyTimer = setTimeout(() => {
      cleanup();
      reject(new Error("TIMED_OUT_WAITING_FOR_SSE_READY"));
    }, timeoutMs);
    const url = new URL("/api/v1/codex-desktop/events", baseUrl);
    request = http.get(url, (incoming) => {
      response = incoming;
      incoming.setEncoding("utf8");
      incoming.on("data", (chunk) => {
        buffer += chunk;
        const lines = buffer.split(/\r?\n/);
        // Keep the trailing partial line for the next chunk.
        buffer = lines.pop() ?? "";
        for (const line of lines) {
          if (line.startsWith("event: ready")) {
            clearTimeout(readyTimer);
            resolve();
          }
          handleLine(line);
        }
      });
      incoming.on("error", (error) => {
        clearTimeout(readyTimer);
        reject(error);
      });
    });
    request.on("error", (error) => {
      clearTimeout(readyTimer);
      reject(error);
    });
  });

  return {
    ready,
    events,
    nextEvent() {
      if (events.length > 0) {
        return Promise.resolve(events.shift());
      }
      return new Promise((resolve, reject) => {
        resolveNext = resolve;
        nextTimer = setTimeout(() => {
          // Drop the stale resolver; otherwise the next event to arrive would
          // be handed to this already-rejected promise and silently lost.
          if (resolveNext === resolve) {
            resolveNext = undefined;
          }
          reject(new Error("TIMED_OUT_WAITING_FOR_SSE_EVENT"));
        }, timeoutMs);
      });
    },
    close: cleanup,
  };
}
// With `codexDesktopRefreshEnabled: false` the bridge is expected to return a
// skipped/disabled result.
test("Codex desktop refresh bridge is skipped when disabled", async () => {
  const refreshRequest = {
    targetThreadRef: "019d-thread-refresh",
    sourceMessageId: "msg-refresh",
    rolloutPath: "/tmp/rollout.jsonl",
  };
  const disabledConfig = { codexDesktopRefreshEnabled: false };

  const outcome = await executeCodexDesktopRefreshBridge(refreshRequest, disabledConfig);

  assert.deepEqual(outcome, { status: "skipped", reason: "disabled" });
});
// Starts the bridge daemon script in dry-run mode on an ephemeral port, posts
// one refresh hint to it, and checks the JSON reply. The `message` text in the
// request must not show up in the reply's `detail`.
test("Codex desktop refresh bridge daemon exposes a local persistent refresh endpoint", async () => {
  const daemonScript = path.join(repoRoot, "scripts/codex-desktop-refresh-bridge-daemon.mjs");
  const daemonEnv = {
    ...process.env,
    BOSS_CODEX_DESKTOP_BRIDGE_HOST: "127.0.0.1",
    BOSS_CODEX_DESKTOP_BRIDGE_PORT: "0",
    BOSS_CODEX_DESKTOP_REFRESH_DRY_RUN: "true",
  };
  const daemon = spawn(process.execPath, [daemonScript], {
    cwd: repoRoot,
    env: daemonEnv,
    stdio: ["ignore", "pipe", "pipe"],
  });

  try {
    // The first JSON line on stdout announces where the daemon is listening.
    const announced = await waitForJsonLine(daemon.stdout);
    assert.equal(announced.status, "ready");
    assert.equal(announced.host, "127.0.0.1");
    assert.equal(typeof announced.port, "number");

    const refreshHint = {
      kind: "codex_desktop_refresh_hint",
      targetThreadRef: "019d-thread-refresh",
      sourceMessageId: "msg-refresh-daemon",
      appName: "Codex",
      refreshMode: "deeplink-reload",
      message: "must not be reflected",
    };
    const refreshUrl = `http://${announced.host}:${announced.port}/api/v1/codex-desktop/refresh`;
    const reply = await fetch(refreshUrl, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(refreshHint),
    });
    const replyBody = await reply.json();

    assert.equal(reply.status, 200);
    assert.equal(replyBody.status, "completed");
    assert.equal(replyBody.targetThreadRef, "019d-thread-refresh");
    assert.equal(replyBody.deepLink, "codex://threads/019d-thread-refresh");
    assert.doesNotMatch(replyBody.detail, /must not be reflected/);
  } finally {
    daemon.kill("SIGTERM");
  }
});
// Subscribes to the daemon's SSE stream, posts a refresh hint, and checks that
// the broadcast event and the "recent events" listing carry the routing fields
// but neither `message` nor `executionPrompt`.
test("Codex desktop refresh bridge daemon broadcasts safe realtime events over SSE", async () => {
  const daemon = spawn(
    process.execPath,
    [path.join(repoRoot, "scripts/codex-desktop-refresh-bridge-daemon.mjs")],
    {
      cwd: repoRoot,
      env: {
        ...process.env,
        BOSS_CODEX_DESKTOP_BRIDGE_HOST: "127.0.0.1",
        BOSS_CODEX_DESKTOP_BRIDGE_PORT: "0",
        BOSS_CODEX_DESKTOP_REFRESH_DRY_RUN: "true",
      },
      stdio: ["ignore", "pipe", "pipe"],
    },
  );
  let sseSubscription;

  try {
    const announced = await waitForJsonLine(daemon.stdout);
    const bridgeUrl = `http://${announced.host}:${announced.port}`;

    // Subscribe before posting so the broadcast cannot be missed.
    sseSubscription = subscribeToRefreshEvents(bridgeUrl);
    await sseSubscription.ready;

    const refreshHint = {
      kind: "codex_desktop_refresh_hint",
      targetThreadRef: "019d-thread-refresh",
      sourceMessageId: "msg-refresh-sse",
      appName: "Codex",
      refreshMode: "deeplink-reload",
      message: "must not be broadcast",
      executionPrompt: "must not be broadcast",
    };
    const refreshResponse = await fetch(`${bridgeUrl}/api/v1/codex-desktop/refresh`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(refreshHint),
    });
    assert.equal(refreshResponse.status, 200);

    const broadcast = await sseSubscription.nextEvent();
    assert.equal(broadcast.event, "codex_desktop_refresh");
    assert.equal(broadcast.data.targetThreadRef, "019d-thread-refresh");
    assert.equal(broadcast.data.sourceMessageId, "msg-refresh-sse");
    assert.equal(broadcast.data.status, "completed");
    assert.equal(broadcast.data.deepLink, "codex://threads/019d-thread-refresh");
    for (const forbiddenKey of ["message", "executionPrompt"]) {
      assert.equal(Object.hasOwn(broadcast.data, forbiddenKey), false);
    }

    const recentResponse = await fetch(`${bridgeUrl}/api/v1/codex-desktop/events/recent`);
    const recentBody = await recentResponse.json();
    assert.equal(recentBody.ok, true);
    assert.equal(recentBody.events.at(-1).sourceMessageId, "msg-refresh-sse");
  } finally {
    sseSubscription?.close();
    daemon.kill("SIGTERM");
  }
});
// Runs the bridge daemon and the event-consumer script side by side, posts one
// refresh hint, and checks the first JSON line the consumer prints: routing
// fields present, `message` / `executionPrompt` absent.
test("Codex desktop event consumer receives one safe refresh event from the local bridge", async () => {
  const bridgeDaemon = spawn(
    process.execPath,
    [path.join(repoRoot, "scripts/codex-desktop-refresh-bridge-daemon.mjs")],
    {
      cwd: repoRoot,
      env: {
        ...process.env,
        BOSS_CODEX_DESKTOP_BRIDGE_HOST: "127.0.0.1",
        BOSS_CODEX_DESKTOP_BRIDGE_PORT: "0",
        BOSS_CODEX_DESKTOP_REFRESH_DRY_RUN: "true",
      },
      stdio: ["ignore", "pipe", "pipe"],
    },
  );
  let eventConsumer;

  try {
    const announced = await waitForJsonLine(bridgeDaemon.stdout);
    const bridgeUrl = `http://${announced.host}:${announced.port}`;

    eventConsumer = spawn(
      process.execPath,
      [path.join(repoRoot, "scripts/codex-desktop-event-consumer.mjs")],
      {
        cwd: repoRoot,
        env: {
          ...process.env,
          BOSS_CODEX_DESKTOP_EVENTS_URL: `${bridgeUrl}/api/v1/codex-desktop/events`,
          BOSS_CODEX_DESKTOP_EVENTS_ONCE: "true",
        },
        stdio: ["ignore", "pipe", "pipe"],
      },
    );

    // Fixed pause before posting — presumably to let the consumer connect to
    // the event stream first. NOTE(review): timing-based; confirm whether the
    // consumer offers an explicit readiness signal.
    await new Promise((resume) => setTimeout(resume, 200));

    const refreshHint = {
      kind: "codex_desktop_refresh_hint",
      targetThreadRef: "019d-thread-consumer",
      sourceMessageId: "msg-refresh-consumer",
      appName: "Codex",
      refreshMode: "deeplink-reload",
      message: "must not reach consumer",
      executionPrompt: "must not reach consumer",
    };
    const refreshResponse = await fetch(`${bridgeUrl}/api/v1/codex-desktop/refresh`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(refreshHint),
    });
    assert.equal(refreshResponse.status, 200);

    const consumedEvent = await waitForJsonLine(eventConsumer.stdout);
    assert.equal(consumedEvent.eventType, "codex_desktop_refresh");
    assert.equal(consumedEvent.targetThreadRef, "019d-thread-consumer");
    assert.equal(consumedEvent.sourceMessageId, "msg-refresh-consumer");
    assert.equal(consumedEvent.deepLink, "codex://threads/019d-thread-consumer");
    for (const forbiddenKey of ["message", "executionPrompt"]) {
      assert.equal(Object.hasOwn(consumedEvent, forbiddenKey), false);
    }
  } finally {
    eventConsumer?.kill("SIGTERM");
    bridgeDaemon.kill("SIGTERM");
  }
});
// Points the bridge at an in-process HTTP server (no refresh command is
// configured) and checks both the result returned to the caller and the
// payload the server received.
test("Codex desktop refresh bridge can use a persistent local endpoint without a command", async () => {
  const receivedPayloads = [];

  const stubBridge = createServer(async (request, response) => {
    assert.equal(request.method, "POST");
    assert.equal(request.url, "/api/v1/codex-desktop/refresh");

    const hint = await readRequestJson(request);
    receivedPayloads.push(hint);

    const replyBody = {
      status: "completed",
      targetThreadRef: hint.targetThreadRef,
      appName: hint.appName,
      deepLink: `codex://threads/${hint.targetThreadRef}`,
      detail: "persistent bridge accepted refresh hint",
    };
    response.writeHead(200, { "Content-Type": "application/json" });
    response.end(`${JSON.stringify(replyBody)}\n`);
  });
  const bound = await listen(stubBridge);

  try {
    const endpointUrl = `http://${bound.address}:${bound.port}/api/v1/codex-desktop/refresh`;
    const config = getCodexDesktopRefreshBridgeConfig(
      {},
      {
        codexDesktopRefreshEnabled: true,
        codexDesktopRefreshEndpoint: endpointUrl,
        codexDesktopRefreshTimeoutMs: 4000,
        codexDesktopRefreshAppName: "Codex",
        codexDesktopRefreshMode: "deeplink-reload",
      },
    );
    assert.equal(config.endpoint, endpointUrl);

    const outcome = await executeCodexDesktopRefreshBridge(
      {
        targetThreadRef: "019d-thread-refresh",
        sourceMessageId: "msg-refresh-endpoint",
        rolloutPath: "/tmp/rollout.jsonl",
        threadTouchStatus: "updated",
      },
      config,
    );
    assert.deepEqual(outcome, {
      status: "completed",
      targetThreadRef: "019d-thread-refresh",
      appName: "Codex",
      deepLink: "codex://threads/019d-thread-refresh",
      detail: "persistent bridge accepted refresh hint",
    });

    // Exactly one request, carrying the hint but no message text.
    assert.equal(receivedPayloads.length, 1);
    const [onlyPayload] = receivedPayloads;
    assert.equal(onlyPayload.kind, "codex_desktop_refresh_hint");
    assert.equal(onlyPayload.targetThreadRef, "019d-thread-refresh");
    assert.equal(Object.hasOwn(onlyPayload, "message"), false);
    assert.equal(Object.hasOwn(onlyPayload, "executionPrompt"), false);
  } finally {
    await closeServer(stubBridge);
  }
});
// Configures both an endpoint and a command. The endpoint targets
// 127.0.0.1:9, which is assumed to have no listener, so the expected result is
// the one produced by the fixture runtime command (no `deepLink` field).
test("Codex desktop refresh bridge falls back to command when the local endpoint is unavailable", async () => {
  const bridgeSettings = {
    codexDesktopRefreshEnabled: true,
    codexDesktopRefreshEndpoint: "http://127.0.0.1:9/api/v1/codex-desktop/refresh",
    codexDesktopRefreshCommand: process.execPath,
    codexDesktopRefreshArgs: ["tests/fixtures/codex-desktop-refresh-runtime.mjs"],
    codexDesktopRefreshWorkdir: repoRoot,
    codexDesktopRefreshTimeoutMs: 4000,
    codexDesktopRefreshRetryCount: 0,
    codexDesktopRefreshRetryDelayMs: 1,
    codexDesktopRefreshAppName: "Codex",
    codexDesktopRefreshMode: "deeplink-reload",
  };
  const config = getCodexDesktopRefreshBridgeConfig({}, bridgeSettings);

  const refreshRequest = {
    targetThreadRef: "019d-thread-refresh",
    sourceMessageId: "msg-refresh-fallback",
    rolloutPath: "/tmp/rollout.jsonl",
    threadTouchStatus: "updated",
  };
  const outcome = await executeCodexDesktopRefreshBridge(refreshRequest, config);

  assert.deepEqual(outcome, {
    status: "completed",
    targetThreadRef: "019d-thread-refresh",
    appName: "Codex",
    detail: "refresh hint accepted: deeplink-reload",
  });
});
// Checks the execution plan built for a command-based bridge (command, resolved
// args, stdin payload without message text) and then runs the bridge against
// the fixture runtime.
test("Codex desktop refresh bridge sends a safe refresh hint to the configured runtime", async () => {
  // Fresh object per call so neither bridge function sees an input the other touched.
  const makeRefreshRequest = () => ({
    targetThreadRef: "019d-thread-refresh",
    sourceMessageId: "msg-refresh",
    rolloutPath: "/tmp/rollout.jsonl",
    threadTouchStatus: "updated",
  });

  const config = getCodexDesktopRefreshBridgeConfig(
    {},
    {
      codexDesktopRefreshEnabled: true,
      codexDesktopRefreshCommand: process.execPath,
      codexDesktopRefreshArgs: ["tests/fixtures/codex-desktop-refresh-runtime.mjs"],
      codexDesktopRefreshWorkdir: repoRoot,
      codexDesktopRefreshTimeoutMs: 4000,
      codexDesktopRefreshAppName: "Codex",
      codexDesktopRefreshMode: "deeplink-reload",
    },
  );

  const plan = buildCodexDesktopRefreshExecution(config, makeRefreshRequest());
  assert.equal(plan.command, process.execPath);
  assert.deepEqual(plan.args, [path.join(repoRoot, "tests/fixtures/codex-desktop-refresh-runtime.mjs")]);

  const expectedPayloadFields = [
    ["kind", "codex_desktop_refresh_hint"],
    ["targetThreadRef", "019d-thread-refresh"],
    ["sourceMessageId", "msg-refresh"],
    ["rolloutPath", "/tmp/rollout.jsonl"],
    ["appName", "Codex"],
    ["refreshMode", "deeplink-reload"],
  ];
  for (const [field, expectedValue] of expectedPayloadFields) {
    assert.equal(plan.stdinPayload[field], expectedValue);
  }
  assert.equal(Object.hasOwn(plan.stdinPayload, "message"), false);
  assert.equal(Object.hasOwn(plan.stdinPayload, "executionPrompt"), false);

  const outcome = await executeCodexDesktopRefreshBridge(makeRefreshRequest(), config);
  assert.deepEqual(outcome, {
    status: "completed",
    targetThreadRef: "019d-thread-refresh",
    appName: "Codex",
    detail: "refresh hint accepted: deeplink-reload",
  });
});
// Runs the bridge against the "flaky" fixture runtime with a retry count of 2
// and expects success on the second attempt. The fixture is pointed at a
// temporary state file through BOSS_CODEX_REFRESH_FLAKY_STATE; the test reads
// `count` back from that file afterwards. The environment variable and the
// temporary directory are restored/removed even when an assertion fails.
test("Codex desktop refresh bridge retries a failed runtime and reports attempt count", async () => {
  const scratchDir = await fs.mkdtemp(path.join(os.tmpdir(), "boss-codex-refresh-"));
  const flakyStatePath = path.join(scratchDir, "flaky-state.json");
  const savedFlakyState = process.env.BOSS_CODEX_REFRESH_FLAKY_STATE;
  process.env.BOSS_CODEX_REFRESH_FLAKY_STATE = flakyStatePath;

  try {
    const retryOverrides = {
      BOSS_CODEX_DESKTOP_REFRESH_RETRY_COUNT: "2",
      BOSS_CODEX_DESKTOP_REFRESH_RETRY_DELAY_MS: "1",
    };
    const bridgeSettings = {
      codexDesktopRefreshEnabled: true,
      codexDesktopRefreshCommand: process.execPath,
      codexDesktopRefreshArgs: ["tests/fixtures/codex-desktop-refresh-flaky-runtime.mjs"],
      codexDesktopRefreshWorkdir: repoRoot,
      codexDesktopRefreshTimeoutMs: 4000,
      codexDesktopRefreshAppName: "Codex",
      codexDesktopRefreshMode: "deeplink-reload",
    };
    const config = getCodexDesktopRefreshBridgeConfig(retryOverrides, bridgeSettings);
    assert.equal(config.retryCount, 2);
    assert.equal(config.retryDelayMs, 1);

    const outcome = await executeCodexDesktopRefreshBridge(
      {
        targetThreadRef: "019d-thread-refresh",
        sourceMessageId: "msg-refresh-flaky",
        rolloutPath: "/tmp/rollout.jsonl",
        threadTouchStatus: "updated",
      },
      config,
    );
    assert.deepEqual(outcome, {
      status: "completed",
      targetThreadRef: "019d-thread-refresh",
      appName: "Codex",
      deepLink: "codex://threads/019d-thread-refresh",
      detail: "flaky refresh accepted after 2 attempts",
      attemptCount: 2,
    });

    const persistedState = JSON.parse(await fs.readFile(flakyStatePath, "utf8"));
    assert.equal(persistedState.count, 2);
  } finally {
    if (savedFlakyState !== undefined) {
      process.env.BOSS_CODEX_REFRESH_FLAKY_STATE = savedFlakyState;
    } else {
      delete process.env.BOSS_CODEX_REFRESH_FLAKY_STATE;
    }
    await fs.rm(scratchDir, { recursive: true, force: true });
  }
});
// Feeds a hint that includes message text to the refresh-hint script in dry-run
// mode and checks that the reported deeplink targets the thread while the
// message text does not appear in `detail`.
test("Codex desktop refresh hint can target a concrete desktop thread deeplink without sending message text", async () => {
  const hintWithMessage = {
    kind: "codex_desktop_refresh_hint",
    targetThreadRef: "019d-thread-refresh",
    sourceMessageId: "msg-refresh",
    appName: "Codex",
    refreshMode: "deeplink-reload",
    message: "this must not be interpreted by the desktop bridge",
  };

  const dryRun = await runRefreshHintDryRun(hintWithMessage);

  assert.equal(dryRun.status, "completed");
  assert.equal(dryRun.targetThreadRef, "019d-thread-refresh");
  assert.equal(dryRun.deepLink, "codex://threads/019d-thread-refresh");
  assert.match(dryRun.detail, /would open codex:\/\/threads\/019d-thread-refresh/);
  assert.doesNotMatch(dryRun.detail, /this must not be interpreted/);
});