// File: boss/local-agent/reliable-outbox.test.mjs
// Snapshot: 2026-06-08 12:22:50 +08:00 (331 lines, 12 KiB, JavaScript)

import test from "node:test";
import assert from "node:assert/strict";
import { createServer } from "node:http";
import { mkdtemp, readFile, rm } from "node:fs/promises";
import os from "node:os";
import { join } from "node:path";
import {
appendReliableOutboxRecord,
postThroughReliableOutbox,
replayReliableOutbox,
resolveReliableOutboxPath,
} from "./reliable-outbox.mjs";
/**
 * Starts a throwaway HTTP server on an ephemeral loopback port, runs `run`
 * against it, and always shuts the server down afterwards.
 *
 * @param {(request: import("node:http").IncomingMessage, response: import("node:http").ServerResponse) => void} handler
 *   Request listener passed to `createServer`.
 * @param {(baseUrl: string) => unknown} run
 *   Callback receiving the server's base URL (`http://127.0.0.1:<port>`).
 * @returns {Promise<unknown>} Whatever `run` resolves to; rejections from `run` propagate.
 */
async function withServer(handler, run) {
  const server = createServer(handler);
  await new Promise((resolve, reject) => {
    // Surface bind failures instead of leaving the listen promise pending forever.
    server.once("error", reject);
    server.listen(0, "127.0.0.1", () => {
      server.off("error", reject);
      resolve();
    });
  });
  const { port } = server.address();
  try {
    return await run(`http://127.0.0.1:${port}`);
  } finally {
    await new Promise((resolve) => {
      server.close(resolve);
      // Handlers in this suite may deliberately never respond; close() alone
      // waits for such connections to end. Drop them so teardown cannot hang.
      // Optional call keeps runtimes without closeAllConnections working.
      server.closeAllConnections?.();
    });
  }
}
/**
 * Loads the outbox file as UTF-8 JSON and returns its `records` property.
 *
 * @param {string} outboxPath Path of the JSON file to read.
 * @returns {Promise<unknown>} The value stored under `records` (undefined when absent).
 */
async function readRecords(outboxPath) {
  const raw = await readFile(outboxPath, "utf8");
  const { records } = JSON.parse(raw);
  return records;
}
// Happy path: the server answers 200, so the request is delivered exactly once
// and the outbox file ends up with an empty record list.
test("postThroughReliableOutbox removes a record after successful send", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-success-"));
  try {
    const config = { deviceId: "test-device", reliableOutboxPath: join(root, "outbox.json") };
    let received = 0;
    await withServer((request, response) => {
      received += 1;
      response.writeHead(200, { "Content-Type": "application/json" });
      response.end(JSON.stringify({ ok: true }));
    }, async (baseUrl) => {
      const result = await postThroughReliableOutbox(config, {
        kind: "task.progress",
        url: `${baseUrl}/progress`,
        headers: { "Content-Type": "application/json" },
        body: { ok: true },
      });
      assert.equal(result.ok, true);
      assert.equal(received, 1);
      assert.deepEqual(await readRecords(resolveReliableOutboxPath(config)), []);
    });
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});
// A server that never responds, combined with a 20 ms request timeout, must
// yield a retryable failure (status 0) in under a second and leave the record
// queued in the outbox file.
test("postThroughReliableOutbox times out stalled requests and keeps them retryable", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-timeout-"));
  try {
    const config = {
      deviceId: "test-device",
      reliableOutboxPath: join(root, "outbox.json"),
      reliableOutboxRequestTimeoutMs: 20,
    };
    await withServer(() => {
      // Intentionally leave the request open to simulate a stalled network write.
    }, async (baseUrl) => {
      const started = Date.now();
      const result = await postThroughReliableOutbox(config, {
        kind: "task.progress",
        url: `${baseUrl}/stall`,
        headers: { "Content-Type": "application/json" },
        body: { ok: true },
      });
      assert.equal(result.ok, false);
      assert.equal(result.retryable, true);
      assert.equal(result.status, 0);
      assert.match(result.body, /RELIABLE_OUTBOX_SEND_TIMEOUT|aborted/i);
      assert.ok(Date.now() - started < 1_000);
      assert.equal((await readRecords(resolveReliableOutboxPath(config))).length, 1);
    });
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});
// Three queued records all target a server that never answers. With a 30 ms
// per-request timeout and a 50 ms overall budget, replay must stop before
// attempting all three, send nothing, keep all three, and report the budget stop.
test("replayReliableOutbox respects a replay duration budget", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-budget-"));
  try {
    const config = {
      deviceId: "test-device",
      reliableOutboxPath: join(root, "outbox.json"),
    };
    await withServer(() => {
      // Keep every replay request pending; the replay budget must stop the loop.
    }, async (baseUrl) => {
      const outboxPath = resolveReliableOutboxPath(config);
      for (let index = 0; index < 3; index += 1) {
        await appendReliableOutboxRecord(outboxPath, {
          kind: "task.progress",
          url: `${baseUrl}/stall-${index}`,
          body: { index },
        });
      }
      const started = Date.now();
      const replay = await replayReliableOutbox(config, {
        limit: 3,
        requestTimeoutMs: 30,
        maxDurationMs: 50,
      });
      assert.ok(Date.now() - started < 1_000);
      assert.ok(replay.attempted >= 1);
      assert.ok(replay.attempted < 3);
      assert.equal(replay.sent, 0);
      assert.equal(replay.retained, 3);
      assert.equal(replay.stoppedByBudget, true);
    });
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});
// First send hits a 503 and must be reported as retryable with the record kept
// on disk; once the server starts answering 200, a replay delivers and removes it.
test("postThroughReliableOutbox retains retryable failures and replay clears them", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-retry-"));
  try {
    const config = { deviceId: "test-device", reliableOutboxPath: join(root, "outbox.json") };
    let fail = true;
    let received = 0;
    await withServer((request, response) => {
      received += 1;
      if (fail) {
        response.writeHead(503, { "Content-Type": "text/plain" });
        response.end("temporary failure");
        return;
      }
      response.writeHead(200, { "Content-Type": "application/json" });
      response.end(JSON.stringify({ ok: true }));
    }, async (baseUrl) => {
      const first = await postThroughReliableOutbox(config, {
        kind: "task.complete",
        url: `${baseUrl}/complete`,
        headers: { "Content-Type": "application/json" },
        body: { taskId: "task-1" },
      });
      assert.equal(first.ok, false);
      assert.equal(first.retryable, true);
      assert.equal((await readRecords(resolveReliableOutboxPath(config))).length, 1);
      // Flip the server to healthy before replaying the retained record.
      fail = false;
      const replay = await replayReliableOutbox(config);
      assert.equal(replay.attempted, 1);
      assert.equal(replay.sent, 1);
      assert.equal(replay.retained, 0);
      assert.equal(received, 2);
    });
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});
// Records are queued as progress, app log, completion. With a replay limit of 1,
// only the completion request may reach the server; the other two stay queued.
test("replayReliableOutbox prioritizes task completion over progress and app logs", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-priority-"));
  try {
    const config = {
      deviceId: "test-device",
      reliableOutboxPath: join(root, "outbox.json"),
    };
    const received = [];
    await withServer((request, response) => {
      received.push(request.url);
      response.writeHead(200, { "Content-Type": "application/json" });
      response.end(JSON.stringify({ ok: true }));
    }, async (baseUrl) => {
      const outboxPath = resolveReliableOutboxPath(config);
      await appendReliableOutboxRecord(outboxPath, {
        kind: "task.progress",
        url: `${baseUrl}/progress`,
        body: { taskId: "task-2" },
      });
      await appendReliableOutboxRecord(outboxPath, {
        kind: "app.log",
        url: `${baseUrl}/app-log`,
        body: { category: "noise" },
      });
      await appendReliableOutboxRecord(outboxPath, {
        kind: "task.complete",
        url: `${baseUrl}/complete`,
        body: { taskId: "task-1" },
      });
      const replay = await replayReliableOutbox(config, { limit: 1 });
      assert.equal(replay.attempted, 1);
      assert.deepEqual(received, ["/complete"]);
      const retained = await readRecords(outboxPath);
      assert.equal(retained.some((record) => record.kind === "task.complete"), false);
      assert.equal(retained.some((record) => record.kind === "task.progress"), true);
      assert.equal(retained.some((record) => record.kind === "app.log"), true);
    });
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});
// One completion record followed by 510 distinct app logs: the file is expected
// to hold 121 records afterwards, i.e. the completion plus 120 app logs.
test("reliable outbox compaction preserves pending task completion records before low priority logs", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-compact-priority-"));
  try {
    const config = {
      deviceId: "test-device",
      reliableOutboxPath: join(root, "outbox.json"),
    };
    const outboxPath = resolveReliableOutboxPath(config);
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.complete",
      url: "http://127.0.0.1/complete",
      body: { taskId: "task-1" },
    });
    for (let index = 0; index < 510; index += 1) {
      await appendReliableOutboxRecord(outboxPath, {
        kind: "app.log",
        url: `http://127.0.0.1/app-log-${index}`,
        body: { index },
      });
    }
    const records = await readRecords(outboxPath);
    assert.equal(records.length, 121);
    assert.equal(records.some((record) => record.kind === "task.complete"), true);
    assert.equal(records.filter((record) => record.kind === "app.log").length, 120);
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});
// Twelve progress records for the same task and URL must collapse into a single
// record carrying the body of the last append (index 11).
test("reliable outbox coalesces repeated task progress records for the same task", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-progress-coalesce-"));
  try {
    const config = {
      deviceId: "test-device",
      reliableOutboxPath: join(root, "outbox.json"),
    };
    const outboxPath = resolveReliableOutboxPath(config);
    for (let index = 0; index < 12; index += 1) {
      await appendReliableOutboxRecord(outboxPath, {
        kind: "task.progress",
        url: "http://127.0.0.1/api/v1/master-agent/tasks/task-1/progress",
        body: { taskId: "task-1", index },
      });
    }
    const records = await readRecords(outboxPath);
    assert.equal(records.length, 1);
    assert.equal(records[0].kind, "task.progress");
    assert.equal(records[0].body.index, 11);
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});
// A completion, 150 app logs, then a progress record: the completion and the
// progress record must both survive, while app logs are held to 120 entries
// that include the newest (index 149) and exclude the oldest (index 0).
test("reliable outbox caps noisy app logs while retaining completion and latest progress records", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-app-log-cap-"));
  try {
    const config = {
      deviceId: "test-device",
      reliableOutboxPath: join(root, "outbox.json"),
    };
    const outboxPath = resolveReliableOutboxPath(config);
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.complete",
      url: "http://127.0.0.1/complete",
      body: { taskId: "task-1" },
    });
    for (let index = 0; index < 150; index += 1) {
      await appendReliableOutboxRecord(outboxPath, {
        kind: "app.log",
        url: "http://127.0.0.1/app-log",
        body: { index },
      });
    }
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.progress",
      url: "http://127.0.0.1/api/v1/master-agent/tasks/task-2/progress",
      body: { taskId: "task-2", index: 2 },
    });
    const records = await readRecords(outboxPath);
    const appLogs = records.filter((record) => record.kind === "app.log");
    assert.equal(records.some((record) => record.kind === "task.complete"), true);
    assert.equal(records.some((record) => record.kind === "task.progress"), true);
    assert.equal(appLogs.length, 120);
    assert.equal(appLogs.some((record) => record.body.index === 149), true);
    assert.equal(appLogs.some((record) => record.body.index === 0), false);
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});
// A progress record followed by a completion for the same task must leave only
// the completion record in the outbox file.
test("reliable outbox drops stale progress once a completion for the same task is pending", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-progress-after-complete-"));
  try {
    const config = {
      deviceId: "test-device",
      reliableOutboxPath: join(root, "outbox.json"),
    };
    const outboxPath = resolveReliableOutboxPath(config);
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.progress",
      url: "http://127.0.0.1/api/v1/master-agent/tasks/task-1/progress",
      body: { taskId: "task-1", phase: "awaiting_reply" },
    });
    await appendReliableOutboxRecord(outboxPath, {
      kind: "task.complete",
      url: "http://127.0.0.1/api/v1/master-agent/tasks/task-1/complete",
      body: { taskId: "task-1", status: "completed" },
    });
    const records = await readRecords(outboxPath);
    assert.equal(records.length, 1);
    assert.equal(records[0].kind, "task.complete");
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});
// Twenty app logs sharing project, category and message but differing in
// `detail` must collapse into one record that keeps the last detail ("attempt-19").
test("reliable outbox coalesces duplicate app logs by category and message", async () => {
  const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-app-log-coalesce-"));
  try {
    const config = {
      deviceId: "test-device",
      reliableOutboxPath: join(root, "outbox.json"),
    };
    const outboxPath = resolveReliableOutboxPath(config);
    for (let index = 0; index < 20; index += 1) {
      await appendReliableOutboxRecord(outboxPath, {
        kind: "app.log",
        url: "http://127.0.0.1/app-log",
        body: {
          projectId: "project-1",
          category: "local_agent.codex_app_server_progress_failed",
          message: "Codex App Server 进度实时回写失败,完成回写仍会携带最终进度。",
          detail: `attempt-${index}`,
        },
      });
    }
    const records = await readRecords(outboxPath);
    assert.equal(records.length, 1);
    assert.equal(records[0].kind, "app.log");
    assert.equal(records[0].body.detail, "attempt-19");
  } finally {
    // Clean up the temp directory even when an assertion above throws.
    await rm(root, { recursive: true, force: true });
  }
});