331 lines
12 KiB
JavaScript
331 lines
12 KiB
JavaScript
import test from "node:test";
|
|
import assert from "node:assert/strict";
|
|
import { createServer } from "node:http";
|
|
import { mkdtemp, readFile, rm } from "node:fs/promises";
|
|
import os from "node:os";
|
|
import { join } from "node:path";
|
|
import {
|
|
appendReliableOutboxRecord,
|
|
postThroughReliableOutbox,
|
|
replayReliableOutbox,
|
|
resolveReliableOutboxPath,
|
|
} from "./reliable-outbox.mjs";
|
|
|
|
async function withServer(handler, run) {
|
|
const server = createServer(handler);
|
|
await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve));
|
|
const address = server.address();
|
|
const baseUrl = `http://127.0.0.1:${address.port}`;
|
|
try {
|
|
return await run(baseUrl);
|
|
} finally {
|
|
await new Promise((resolve) => server.close(resolve));
|
|
}
|
|
}
|
|
|
|
async function readRecords(outboxPath) {
|
|
const parsed = JSON.parse(await readFile(outboxPath, "utf8"));
|
|
return parsed.records;
|
|
}
|
|
|
|
test("postThroughReliableOutbox removes a record after successful send", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-success-"));
|
|
const config = { deviceId: "test-device", reliableOutboxPath: join(root, "outbox.json") };
|
|
let received = 0;
|
|
await withServer((request, response) => {
|
|
received += 1;
|
|
response.writeHead(200, { "Content-Type": "application/json" });
|
|
response.end(JSON.stringify({ ok: true }));
|
|
}, async (baseUrl) => {
|
|
const result = await postThroughReliableOutbox(config, {
|
|
kind: "task.progress",
|
|
url: `${baseUrl}/progress`,
|
|
headers: { "Content-Type": "application/json" },
|
|
body: { ok: true },
|
|
});
|
|
assert.equal(result.ok, true);
|
|
assert.equal(received, 1);
|
|
assert.deepEqual(await readRecords(resolveReliableOutboxPath(config)), []);
|
|
});
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|
|
|
|
test("postThroughReliableOutbox times out stalled requests and keeps them retryable", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-timeout-"));
|
|
const config = {
|
|
deviceId: "test-device",
|
|
reliableOutboxPath: join(root, "outbox.json"),
|
|
reliableOutboxRequestTimeoutMs: 20,
|
|
};
|
|
await withServer(() => {
|
|
// Intentionally leave the request open to simulate a stalled network write.
|
|
}, async (baseUrl) => {
|
|
const started = Date.now();
|
|
const result = await postThroughReliableOutbox(config, {
|
|
kind: "task.progress",
|
|
url: `${baseUrl}/stall`,
|
|
headers: { "Content-Type": "application/json" },
|
|
body: { ok: true },
|
|
});
|
|
|
|
assert.equal(result.ok, false);
|
|
assert.equal(result.retryable, true);
|
|
assert.equal(result.status, 0);
|
|
assert.match(result.body, /RELIABLE_OUTBOX_SEND_TIMEOUT|aborted/i);
|
|
assert.ok(Date.now() - started < 1_000);
|
|
assert.equal((await readRecords(resolveReliableOutboxPath(config))).length, 1);
|
|
});
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|
|
|
|
test("replayReliableOutbox respects a replay duration budget", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-budget-"));
|
|
const config = {
|
|
deviceId: "test-device",
|
|
reliableOutboxPath: join(root, "outbox.json"),
|
|
};
|
|
await withServer(() => {
|
|
// Keep every replay request pending; the replay budget must stop the loop.
|
|
}, async (baseUrl) => {
|
|
const outboxPath = resolveReliableOutboxPath(config);
|
|
for (let index = 0; index < 3; index += 1) {
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "task.progress",
|
|
url: `${baseUrl}/stall-${index}`,
|
|
body: { index },
|
|
});
|
|
}
|
|
|
|
const started = Date.now();
|
|
const replay = await replayReliableOutbox(config, {
|
|
limit: 3,
|
|
requestTimeoutMs: 30,
|
|
maxDurationMs: 50,
|
|
});
|
|
|
|
assert.ok(Date.now() - started < 1_000);
|
|
assert.ok(replay.attempted >= 1);
|
|
assert.ok(replay.attempted < 3);
|
|
assert.equal(replay.sent, 0);
|
|
assert.equal(replay.retained, 3);
|
|
assert.equal(replay.stoppedByBudget, true);
|
|
});
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|
|
|
|
test("postThroughReliableOutbox retains retryable failures and replay clears them", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-retry-"));
|
|
const config = { deviceId: "test-device", reliableOutboxPath: join(root, "outbox.json") };
|
|
let fail = true;
|
|
let received = 0;
|
|
await withServer((request, response) => {
|
|
received += 1;
|
|
if (fail) {
|
|
response.writeHead(503, { "Content-Type": "text/plain" });
|
|
response.end("temporary failure");
|
|
return;
|
|
}
|
|
response.writeHead(200, { "Content-Type": "application/json" });
|
|
response.end(JSON.stringify({ ok: true }));
|
|
}, async (baseUrl) => {
|
|
const first = await postThroughReliableOutbox(config, {
|
|
kind: "task.complete",
|
|
url: `${baseUrl}/complete`,
|
|
headers: { "Content-Type": "application/json" },
|
|
body: { taskId: "task-1" },
|
|
});
|
|
assert.equal(first.ok, false);
|
|
assert.equal(first.retryable, true);
|
|
assert.equal((await readRecords(resolveReliableOutboxPath(config))).length, 1);
|
|
|
|
fail = false;
|
|
const replay = await replayReliableOutbox(config);
|
|
assert.equal(replay.attempted, 1);
|
|
assert.equal(replay.sent, 1);
|
|
assert.equal(replay.retained, 0);
|
|
assert.equal(received, 2);
|
|
});
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|
|
|
|
test("replayReliableOutbox prioritizes task completion over progress and app logs", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-priority-"));
|
|
const config = {
|
|
deviceId: "test-device",
|
|
reliableOutboxPath: join(root, "outbox.json"),
|
|
};
|
|
const received = [];
|
|
await withServer((request, response) => {
|
|
received.push(request.url);
|
|
response.writeHead(200, { "Content-Type": "application/json" });
|
|
response.end(JSON.stringify({ ok: true }));
|
|
}, async (baseUrl) => {
|
|
const outboxPath = resolveReliableOutboxPath(config);
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "task.progress",
|
|
url: `${baseUrl}/progress`,
|
|
body: { taskId: "task-2" },
|
|
});
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "app.log",
|
|
url: `${baseUrl}/app-log`,
|
|
body: { category: "noise" },
|
|
});
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "task.complete",
|
|
url: `${baseUrl}/complete`,
|
|
body: { taskId: "task-1" },
|
|
});
|
|
|
|
const replay = await replayReliableOutbox(config, { limit: 1 });
|
|
|
|
assert.equal(replay.attempted, 1);
|
|
assert.deepEqual(received, ["/complete"]);
|
|
const retained = await readRecords(outboxPath);
|
|
assert.equal(retained.some((record) => record.kind === "task.complete"), false);
|
|
assert.equal(retained.some((record) => record.kind === "task.progress"), true);
|
|
assert.equal(retained.some((record) => record.kind === "app.log"), true);
|
|
});
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|
|
|
|
test("reliable outbox compaction preserves pending task completion records before low priority logs", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-compact-priority-"));
|
|
const config = {
|
|
deviceId: "test-device",
|
|
reliableOutboxPath: join(root, "outbox.json"),
|
|
};
|
|
const outboxPath = resolveReliableOutboxPath(config);
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "task.complete",
|
|
url: "http://127.0.0.1/complete",
|
|
body: { taskId: "task-1" },
|
|
});
|
|
for (let index = 0; index < 510; index += 1) {
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "app.log",
|
|
url: `http://127.0.0.1/app-log-${index}`,
|
|
body: { index },
|
|
});
|
|
}
|
|
|
|
const records = await readRecords(outboxPath);
|
|
|
|
assert.equal(records.length, 121);
|
|
assert.equal(records.some((record) => record.kind === "task.complete"), true);
|
|
assert.equal(records.filter((record) => record.kind === "app.log").length, 120);
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|
|
|
|
test("reliable outbox coalesces repeated task progress records for the same task", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-progress-coalesce-"));
|
|
const config = {
|
|
deviceId: "test-device",
|
|
reliableOutboxPath: join(root, "outbox.json"),
|
|
};
|
|
const outboxPath = resolveReliableOutboxPath(config);
|
|
for (let index = 0; index < 12; index += 1) {
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "task.progress",
|
|
url: "http://127.0.0.1/api/v1/master-agent/tasks/task-1/progress",
|
|
body: { taskId: "task-1", index },
|
|
});
|
|
}
|
|
|
|
const records = await readRecords(outboxPath);
|
|
|
|
assert.equal(records.length, 1);
|
|
assert.equal(records[0].kind, "task.progress");
|
|
assert.equal(records[0].body.index, 11);
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|
|
|
|
test("reliable outbox caps noisy app logs while retaining completion and latest progress records", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-app-log-cap-"));
|
|
const config = {
|
|
deviceId: "test-device",
|
|
reliableOutboxPath: join(root, "outbox.json"),
|
|
};
|
|
const outboxPath = resolveReliableOutboxPath(config);
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "task.complete",
|
|
url: "http://127.0.0.1/complete",
|
|
body: { taskId: "task-1" },
|
|
});
|
|
for (let index = 0; index < 150; index += 1) {
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "app.log",
|
|
url: "http://127.0.0.1/app-log",
|
|
body: { index },
|
|
});
|
|
}
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "task.progress",
|
|
url: "http://127.0.0.1/api/v1/master-agent/tasks/task-2/progress",
|
|
body: { taskId: "task-2", index: 2 },
|
|
});
|
|
|
|
const records = await readRecords(outboxPath);
|
|
const appLogs = records.filter((record) => record.kind === "app.log");
|
|
|
|
assert.equal(records.some((record) => record.kind === "task.complete"), true);
|
|
assert.equal(records.some((record) => record.kind === "task.progress"), true);
|
|
assert.equal(appLogs.length, 120);
|
|
assert.equal(appLogs.some((record) => record.body.index === 149), true);
|
|
assert.equal(appLogs.some((record) => record.body.index === 0), false);
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|
|
|
|
test("reliable outbox drops stale progress once a completion for the same task is pending", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-progress-after-complete-"));
|
|
const config = {
|
|
deviceId: "test-device",
|
|
reliableOutboxPath: join(root, "outbox.json"),
|
|
};
|
|
const outboxPath = resolveReliableOutboxPath(config);
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "task.progress",
|
|
url: "http://127.0.0.1/api/v1/master-agent/tasks/task-1/progress",
|
|
body: { taskId: "task-1", phase: "awaiting_reply" },
|
|
});
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "task.complete",
|
|
url: "http://127.0.0.1/api/v1/master-agent/tasks/task-1/complete",
|
|
body: { taskId: "task-1", status: "completed" },
|
|
});
|
|
|
|
const records = await readRecords(outboxPath);
|
|
|
|
assert.equal(records.length, 1);
|
|
assert.equal(records[0].kind, "task.complete");
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|
|
|
|
test("reliable outbox coalesces duplicate app logs by category and message", async () => {
|
|
const root = await mkdtemp(join(os.tmpdir(), "boss-outbox-app-log-coalesce-"));
|
|
const config = {
|
|
deviceId: "test-device",
|
|
reliableOutboxPath: join(root, "outbox.json"),
|
|
};
|
|
const outboxPath = resolveReliableOutboxPath(config);
|
|
for (let index = 0; index < 20; index += 1) {
|
|
await appendReliableOutboxRecord(outboxPath, {
|
|
kind: "app.log",
|
|
url: "http://127.0.0.1/app-log",
|
|
body: {
|
|
projectId: "project-1",
|
|
category: "local_agent.codex_app_server_progress_failed",
|
|
message: "Codex App Server 进度实时回写失败,完成回写仍会携带最终进度。",
|
|
detail: `attempt-${index}`,
|
|
},
|
|
});
|
|
}
|
|
|
|
const records = await readRecords(outboxPath);
|
|
|
|
assert.equal(records.length, 1);
|
|
assert.equal(records[0].kind, "app.log");
|
|
assert.equal(records[0].body.detail, "attempt-19");
|
|
await rm(root, { recursive: true, force: true });
|
|
});
|