import test from "node:test";
import assert from "node:assert/strict";
import { createServer } from "node:http";
import { mkdtemp, readFile, rm } from "node:fs/promises";
import os from "node:os";
import { join } from "node:path";
import {
  appendReliableOutboxRecord,
  postThroughReliableOutbox,
  replayReliableOutbox,
  resolveReliableOutboxPath,
} from "./reliable-outbox.mjs";

/**
 * Starts an HTTP server on an ephemeral loopback port, runs `run` against it,
 * and always closes the server afterwards.
 *
 * @param {(request: import("node:http").IncomingMessage, response: import("node:http").ServerResponse) => void} handler
 *   Request handler passed to `createServer`.
 * @param {(baseUrl: string) => Promise<unknown>} run
 *   Callback receiving the server base URL (`http://127.0.0.1:<port>`).
 * @returns {Promise<unknown>} Whatever `run` resolves to.
 */
async function withServer(handler, run) {
  const server = createServer(handler);
  await new Promise((resolve, reject) => {
    // Surface bind failures as a rejected promise (a failing test) instead of
    // an unhandled 'error' event on the server.
    server.once("error", reject);
    server.listen(0, "127.0.0.1", () => {
      server.off("error", reject);
      resolve();
    });
  });
  const { port } = server.address();
  try {
    return await run(`http://127.0.0.1:${port}`);
  } finally {
    await new Promise((resolve) => server.close(resolve));
  }
}

/**
 * Creates a temporary directory, builds an outbox config pointing at
 * `<tmp>/outbox.json`, runs `run` with it, and removes the directory even when
 * `run` throws (e.g. on a failed assertion).
 *
 * @param {string} prefix Prefix handed to `mkdtemp` under the OS temp dir.
 * @param {(config: object) => Promise<unknown>} run Test body.
 * @param {object} [extraConfig] Additional config keys merged after the defaults.
 * @returns {Promise<unknown>} Whatever `run` resolves to.
 */
async function withTempOutbox(prefix, run, extraConfig = {}) {
  const root = await mkdtemp(join(os.tmpdir(), prefix));
  const config = {
    deviceId: "test-device",
    reliableOutboxPath: join(root, "outbox.json"),
    ...extraConfig,
  };
  try {
    return await run(config);
  } finally {
    await rm(root, { recursive: true, force: true });
  }
}

/**
 * Reads the outbox file and returns its `records` property.
 *
 * @param {string} outboxPath Path of the JSON outbox file.
 * @returns {Promise<unknown[]>} The parsed `records` value.
 */
async function readRecords(outboxPath) {
  const parsed = JSON.parse(await readFile(outboxPath, "utf8"));
  return parsed.records;
}

/** Replies `200 {"ok":true}` as JSON. */
function respondOk(response) {
  response.writeHead(200, { "Content-Type": "application/json" });
  response.end(JSON.stringify({ ok: true }));
}

/** Returns true when at least one record has the given `kind`. */
function hasKind(records, kind) {
  return records.some((record) => record.kind === kind);
}

test("postThroughReliableOutbox removes a record after successful send", async () => {
  await withTempOutbox("boss-outbox-success-", async (config) => {
    let received = 0;
    await withServer(
      (request, response) => {
        received += 1;
        respondOk(response);
      },
      async (baseUrl) => {
        const result = await postThroughReliableOutbox(config, {
          kind: "task.progress",
          url: `${baseUrl}/progress`,
          headers: { "Content-Type": "application/json" },
          body: { ok: true },
        });
        assert.equal(result.ok, true);
        assert.equal(received, 1);
        assert.deepEqual(await readRecords(resolveReliableOutboxPath(config)), []);
      },
    );
  });
});

test("postThroughReliableOutbox times out stalled requests and keeps them retryable", async () => {
  await withTempOutbox(
    "boss-outbox-timeout-",
    async (config) => {
      await withServer(
        () => {
          // Intentionally leave the request open to simulate a stalled network write.
        },
        async (baseUrl) => {
          const started = Date.now();
          const result = await postThroughReliableOutbox(config, {
            kind: "task.progress",
            url: `${baseUrl}/stall`,
            headers: { "Content-Type": "application/json" },
            body: { ok: true },
          });
          assert.equal(result.ok, false);
          assert.equal(result.retryable, true);
          assert.equal(result.status, 0);
          assert.match(result.body, /RELIABLE_OUTBOX_SEND_TIMEOUT|aborted/i);
          assert.ok(Date.now() - started < 1_000);
          assert.equal((await readRecords(resolveReliableOutboxPath(config))).length, 1);
        },
      );
    },
    { reliableOutboxRequestTimeoutMs: 20 },
  );
});

test("replayReliableOutbox respects a replay duration budget", async () => {
  await withTempOutbox("boss-outbox-budget-", async (config) => {
    await withServer(
      () => {
        // Keep every replay request pending; the replay budget must stop the loop.
      },
      async (baseUrl) => {
        const outboxPath = resolveReliableOutboxPath(config);
        for (let index = 0; index < 3; index += 1) {
          await appendReliableOutboxRecord(outboxPath, {
            kind: "task.progress",
            url: `${baseUrl}/stall-${index}`,
            body: { index },
          });
        }
        const started = Date.now();
        const replay = await replayReliableOutbox(config, {
          limit: 3,
          requestTimeoutMs: 30,
          maxDurationMs: 50,
        });
        assert.ok(Date.now() - started < 1_000);
        assert.ok(replay.attempted >= 1);
        assert.ok(replay.attempted < 3);
        assert.equal(replay.sent, 0);
        assert.equal(replay.retained, 3);
        assert.equal(replay.stoppedByBudget, true);
      },
    );
  });
});

test("postThroughReliableOutbox retains retryable failures and replay clears them", async () => {
  await withTempOutbox("boss-outbox-retry-", async (config) => {
    let fail = true;
    let received = 0;
    await withServer(
      (request, response) => {
        received += 1;
        if (fail) {
          response.writeHead(503, { "Content-Type": "text/plain" });
          response.end("temporary failure");
          return;
        }
        respondOk(response);
      },
      async (baseUrl) => {
        const first = await postThroughReliableOutbox(config, {
          kind: "task.complete",
          url: `${baseUrl}/complete`,
          headers: { "Content-Type": "application/json" },
          body: { taskId: "task-1" },
        });
        assert.equal(first.ok, false);
        assert.equal(first.retryable, true);
        assert.equal((await readRecords(resolveReliableOutboxPath(config))).length, 1);

        // Flip the server to success so the replay can deliver the retained record.
        fail = false;
        const replay = await replayReliableOutbox(config);
        assert.equal(replay.attempted, 1);
        assert.equal(replay.sent, 1);
        assert.equal(replay.retained, 0);
        assert.equal(received, 2);
      },
    );
  });
});

test("replayReliableOutbox prioritizes task completion over progress and app logs", async () => {
  await withTempOutbox("boss-outbox-priority-", async (config) => {
    const received = [];
    await withServer(
      (request, response) => {
        received.push(request.url);
        respondOk(response);
      },
      async (baseUrl) => {
        const outboxPath = resolveReliableOutboxPath(config);
        // The completion record is appended last on purpose: with limit 1 the
        // test expects it to be replayed first anyway.
        await appendReliableOutboxRecord(outboxPath, {
          kind: "task.progress",
          url: `${baseUrl}/progress`,
          body: { taskId: "task-2" },
        });
        await appendReliableOutboxRecord(outboxPath, {
          kind: "app.log",
          url: `${baseUrl}/app-log`,
          body: { category: "noise" },
        });
        await appendReliableOutboxRecord(outboxPath, {
          kind: "task.complete",
          url: `${baseUrl}/complete`,
          body: { taskId: "task-1" },
        });

        const replay = await replayReliableOutbox(config, { limit: 1 });
        assert.equal(replay.attempted, 1);
        assert.deepEqual(received, ["/complete"]);

        const retained = await readRecords(outboxPath);
        assert.equal(hasKind(retained, "task.complete"), false);
        assert.equal(hasKind(retained, "task.progress"), true);
        assert.equal(hasKind(retained, "app.log"), true);
      },
    );
  });
});

test("reliable outbox compaction preserves pending task completion records before low priority logs", async () => {
  await withTempOutbox("boss-outbox-compact-priority-", async (config) => {
    const outboxPath = resolveReliableOutboxPath(config);
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.complete",
      url: "http://127.0.0.1/complete",
      body: { taskId: "task-1" },
    });
    for (let index = 0; index < 510; index += 1) {
      await appendReliableOutboxRecord(outboxPath, {
        kind: "app.log",
        url: `http://127.0.0.1/app-log-${index}`,
        body: { index },
      });
    }

    // Expected outcome: the completion record plus 120 app logs survive.
    const records = await readRecords(outboxPath);
    assert.equal(records.length, 121);
    assert.equal(hasKind(records, "task.complete"), true);
    assert.equal(records.filter((record) => record.kind === "app.log").length, 120);
  });
});

test("reliable outbox coalesces repeated task progress records for the same task", async () => {
  await withTempOutbox("boss-outbox-progress-coalesce-", async (config) => {
    const outboxPath = resolveReliableOutboxPath(config);
    for (let index = 0; index < 12; index += 1) {
      await appendReliableOutboxRecord(outboxPath, {
        kind: "task.progress",
        url: "http://127.0.0.1/api/v1/master-agent/tasks/task-1/progress",
        body: { taskId: "task-1", index },
      });
    }

    // Only the most recently appended progress record is expected to remain.
    const records = await readRecords(outboxPath);
    assert.equal(records.length, 1);
    assert.equal(records[0].kind, "task.progress");
    assert.equal(records[0].body.index, 11);
  });
});

test("reliable outbox caps noisy app logs while retaining completion and latest progress records", async () => {
  await withTempOutbox("boss-outbox-app-log-cap-", async (config) => {
    const outboxPath = resolveReliableOutboxPath(config);
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.complete",
      url: "http://127.0.0.1/complete",
      body: { taskId: "task-1" },
    });
    for (let index = 0; index < 150; index += 1) {
      await appendReliableOutboxRecord(outboxPath, {
        kind: "app.log",
        url: "http://127.0.0.1/app-log",
        body: { index },
      });
    }
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.progress",
      url: "http://127.0.0.1/api/v1/master-agent/tasks/task-2/progress",
      body: { taskId: "task-2", index: 2 },
    });

    const records = await readRecords(outboxPath);
    const appLogs = records.filter((record) => record.kind === "app.log");
    assert.equal(hasKind(records, "task.complete"), true);
    assert.equal(hasKind(records, "task.progress"), true);
    assert.equal(appLogs.length, 120);
    // The newest log must be kept and the oldest one dropped.
    assert.equal(appLogs.some((record) => record.body.index === 149), true);
    assert.equal(appLogs.some((record) => record.body.index === 0), false);
  });
});

test("reliable outbox drops stale progress once a completion for the same task is pending", async () => {
  await withTempOutbox("boss-outbox-progress-after-complete-", async (config) => {
    const outboxPath = resolveReliableOutboxPath(config);
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.progress",
      url: "http://127.0.0.1/api/v1/master-agent/tasks/task-1/progress",
      body: { taskId: "task-1", phase: "awaiting_reply" },
    });
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.complete",
      url: "http://127.0.0.1/api/v1/master-agent/tasks/task-1/complete",
      body: { taskId: "task-1", status: "completed" },
    });

    const records = await readRecords(outboxPath);
    assert.equal(records.length, 1);
    assert.equal(records[0].kind, "task.complete");
  });
});

test("reliable outbox coalesces duplicate app logs by category and message", async () => {
  await withTempOutbox("boss-outbox-app-log-coalesce-", async (config) => {
    const outboxPath = resolveReliableOutboxPath(config);
    for (let index = 0; index < 20; index += 1) {
      await appendReliableOutboxRecord(outboxPath, {
        kind: "app.log",
        url: "http://127.0.0.1/app-log",
        body: {
          projectId: "project-1",
          category: "local_agent.codex_app_server_progress_failed",
          message: "Codex App Server 进度实时回写失败,完成回写仍会携带最终进度。",
          detail: `attempt-${index}`,
        },
      });
    }

    // Twenty logs that differ only in `detail` are expected to collapse into
    // the latest one.
    const records = await readRecords(outboxPath);
    assert.equal(records.length, 1);
    assert.equal(records[0].kind, "app.log");
    assert.equal(records[0].body.detail, "attempt-19");
  });
});