Files
boss/local-agent/reliable-outbox.mjs
2026-06-08 12:22:50 +08:00

379 lines
12 KiB
JavaScript

import { mkdir, readFile, rename, rm, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import os from "node:os";
// Size the compacted outbox is trimmed towards. compactReliableOutboxRecords
// always keeps task.complete records and trims the other kinds to fit.
const MAX_OUTBOX_RECORDS = 500;
// Number of most recent app.log records kept by compaction after coalescing.
const MAX_APP_LOG_RECORDS = 120;
// HTTP statuses for which shouldRetry reports a failed send as retryable.
const RETRYABLE_STATUS_CODES = new Set([408, 409, 425, 429, 500, 502, 503, 504]);
// Outbox path -> tail of the promise chain used by mutateOutboxRecords to run
// read-modify-write operations on that file one at a time.
const outboxWriteQueues = new Map();
/**
 * Ranks a record by kind; lower numbers sort first.
 *
 * @param {object} record - Outbox record (may be null/undefined).
 * @returns {number} 0 for task.complete, 10 for task.progress, 30 for app.log, 20 for anything else.
 */
function reliableOutboxPriority(record) {
  const kind = record?.kind;
  if (kind === "task.complete") {
    return 0;
  }
  if (kind === "task.progress") {
    return 10;
  }
  if (kind === "app.log") {
    return 30;
  }
  return 20;
}
/**
 * Converts a record's `createdAt` into epoch milliseconds.
 *
 * @param {object} record - Outbox record (may be null/undefined).
 * @returns {number} Parsed timestamp, or 0 when `createdAt` is missing or unparseable.
 */
function recordCreatedMs(record) {
  const createdAt = record?.createdAt || "";
  const timestamp = Date.parse(createdAt);
  if (Number.isFinite(timestamp)) {
    return timestamp;
  }
  return 0;
}
/**
 * Returns a record's body as an object.
 *
 * Object bodies are returned as-is, string bodies are parsed as JSON, and
 * everything else (missing body, other types, invalid JSON, JSON that is not
 * an object) yields an empty object.
 *
 * @param {object} record - Outbox record (may be null/undefined).
 * @returns {object} The body object, or `{}`.
 */
function parseRecordBody(record) {
  const raw = record ? record.body : null;
  if (raw == null) {
    return {};
  }
  const rawType = typeof raw;
  if (rawType === "object") {
    return raw;
  }
  if (rawType === "string") {
    let decoded;
    try {
      decoded = JSON.parse(raw);
    } catch {
      return {};
    }
    if (decoded && typeof decoded === "object") {
      return decoded;
    }
  }
  return {};
}
/**
 * Derives the task id used to coalesce queued `task.progress` records.
 *
 * The id is `body.taskId` (trimmed) when that is a non-empty string, otherwise
 * the `<id>` segment of a `/master-agent/tasks/<id>/progress` record URL.
 *
 * @param {object} record - Outbox record (may be null/undefined).
 * @returns {string} The task id, or "" when the record is not a progress
 *   record or no id can be found.
 */
function taskProgressCoalescingKey(record) {
  if (record?.kind !== "task.progress") {
    return "";
  }
  const body = parseRecordBody(record);
  const taskId = typeof body.taskId === "string" ? body.taskId.trim() : "";
  if (taskId) {
    return taskId;
  }
  const url = typeof record.url === "string" ? record.url : "";
  const match = url.match(/\/master-agent\/tasks\/([^/]+)\/progress(?:\?|$)/);
  if (!match) {
    return "";
  }
  try {
    return decodeURIComponent(match[1]);
  } catch {
    // A malformed percent-escape makes decodeURIComponent throw URIError.
    // Fall back to the raw segment so a single bad URL cannot make outbox
    // compaction (and with it every outbox write) fail.
    return match[1];
  }
}
/**
 * Derives the task id carried by a queued `task.complete` record.
 *
 * The id is `body.taskId` (trimmed) when that is a non-empty string, otherwise
 * the `<id>` segment of a `/master-agent/tasks/<id>/complete` record URL.
 *
 * @param {object} record - Outbox record (may be null/undefined).
 * @returns {string} The task id, or "" when the record is not a completion
 *   record or no id can be found.
 */
function taskCompletionCoalescingKey(record) {
  if (record?.kind !== "task.complete") {
    return "";
  }
  const body = parseRecordBody(record);
  const taskId = typeof body.taskId === "string" ? body.taskId.trim() : "";
  if (taskId) {
    return taskId;
  }
  const url = typeof record.url === "string" ? record.url : "";
  const match = url.match(/\/master-agent\/tasks\/([^/]+)\/complete(?:\?|$)/);
  if (!match) {
    return "";
  }
  try {
    return decodeURIComponent(match[1]);
  } catch {
    // A malformed percent-escape makes decodeURIComponent throw URIError.
    // Fall back to the raw segment so a single bad URL cannot make outbox
    // compaction (and with it every outbox write) fail.
    return match[1];
  }
}
/**
 * Builds the key used to coalesce duplicate `app.log` records.
 *
 * Two log records share a key when their URL, `body.projectId`,
 * `body.category` and `body.message` (each trimmed) all match.
 *
 * @param {object} record - Outbox record (may be null/undefined).
 * @returns {string} A non-empty key, or "" when the record is not an app.log
 *   record or has neither a category nor a message.
 */
function appLogCoalescingKey(record) {
  if (record?.kind !== "app.log") {
    return "";
  }
  const body = parseRecordBody(record);
  const category = typeof body.category === "string" ? body.category.trim() : "";
  const message = typeof body.message === "string" ? body.message.trim() : "";
  if (!category && !message) {
    return "";
  }
  const projectId = typeof body.projectId === "string" ? body.projectId.trim() : "";
  // Serialize the parts as a JSON array rather than joining with a delimiter:
  // a plain "|" join maps ("a|b", "c") and ("a", "b|c") to the same key, which
  // would coalesce two different log messages into one.
  return JSON.stringify([String(record.url || ""), projectId, category, message]);
}
/**
 * Returns a sorted copy of the records in replay order: by kind priority
 * first (lowest number first), then oldest `createdAt` first.
 *
 * @param {Iterable<object>} records - Records to order; not modified.
 * @returns {object[]} New array in replay order.
 */
function orderReliableOutboxRecordsForReplay(records) {
  const ordered = [...records];
  ordered.sort((a, b) => {
    const byPriority = reliableOutboxPriority(a) - reliableOutboxPriority(b);
    return byPriority !== 0 ? byPriority : recordCreatedMs(a) - recordCreatedMs(b);
  });
  return ordered;
}
/**
 * Reduces a list of outbox records to what is worth keeping on disk.
 *
 * Steps, in order:
 *  1. Drop falsy entries, entries without an `id`, and entries whose status is "sent".
 *  2. For `task.progress` records that have a task key: drop them when a
 *     `task.complete` for the same task is queued, otherwise keep only the
 *     newest one per task.
 *  3. For `app.log` records: keep only the newest per coalescing key, then keep
 *     at most MAX_APP_LOG_RECORDS of the newest logs overall.
 *  4. If the result still exceeds MAX_OUTBOX_RECORDS, keep every
 *     `task.complete` and fill the remaining budget with the other records,
 *     preferring lower priority numbers and then newer records.
 *
 * @param {object[]} records - Records as read from / about to be written to the outbox.
 * @returns {object[]} Kept records, oldest `createdAt` first.
 */
function compactReliableOutboxRecords(records) {
  const oldestFirst = (a, b) => recordCreatedMs(a) - recordCreatedMs(b);
  const newestFirst = (a, b) => recordCreatedMs(b) - recordCreatedMs(a);
  // True when `candidate` should replace `current` as the record kept for a key.
  const supersedes = (candidate, current) =>
    !current || recordCreatedMs(candidate) >= recordCreatedMs(current);

  const pending = records.filter((entry) => entry && entry.id && entry.status !== "sent");

  const completedTasks = new Set();
  for (const entry of pending) {
    const completionKey = taskCompletionCoalescingKey(entry);
    if (completionKey) {
      completedTasks.add(completionKey);
    }
  }

  const latestProgress = new Map();
  const passthrough = [];
  const keyedLogs = new Map();
  const unkeyedLogs = [];
  for (const entry of pending) {
    const taskKey = taskProgressCoalescingKey(entry);
    if (taskKey) {
      // Progress for an already-completed task is obsolete.
      if (!completedTasks.has(taskKey) && supersedes(entry, latestProgress.get(taskKey))) {
        latestProgress.set(taskKey, entry);
      }
      continue;
    }
    if (entry.kind !== "app.log") {
      passthrough.push(entry);
      continue;
    }
    const logKey = appLogCoalescingKey(entry);
    if (!logKey) {
      unkeyedLogs.push(entry);
    } else if (supersedes(entry, keyedLogs.get(logKey))) {
      keyedLogs.set(logKey, entry);
    }
  }

  const recentLogs = [...keyedLogs.values(), ...unkeyedLogs]
    .sort(newestFirst)
    .slice(0, MAX_APP_LOG_RECORDS);

  const merged = [...passthrough, ...latestProgress.values(), ...recentLogs];
  if (merged.length <= MAX_OUTBOX_RECORDS) {
    return merged.sort(oldestFirst);
  }

  // Over budget: completions are never dropped, everything else competes for
  // whatever room is left.
  const completions = [];
  const others = [];
  for (const entry of merged) {
    if (entry.kind === "task.complete") {
      completions.push(entry);
    } else {
      others.push(entry);
    }
  }
  const budget = Math.max(0, MAX_OUTBOX_RECORDS - completions.length);
  others.sort((a, b) => {
    const byPriority = reliableOutboxPriority(a) - reliableOutboxPriority(b);
    return byPriority !== 0 ? byPriority : newestFirst(a, b);
  });
  return [...completions, ...others.slice(0, budget)].sort(oldestFirst);
}
/**
 * Normalizes a request timeout to whole milliseconds within [50, 60000].
 *
 * @param {*} value - Requested timeout; converted with `Number()`.
 * @param {number} [fallback=5000] - Returned unchanged when `value` is not a finite positive number.
 * @returns {number} The rounded, clamped timeout, or `fallback`.
 */
function normalizeTimeoutMs(value, fallback = 5_000) {
  const requested = Number(value);
  const usable = Number.isFinite(requested) && requested > 0;
  if (!usable) {
    return fallback;
  }
  const rounded = Math.round(requested);
  return Math.min(60_000, Math.max(50, rounded));
}
/**
 * Normalizes a duration budget to whole milliseconds within [50, 60000].
 *
 * @param {*} value - Requested budget; converted with `Number()`.
 * @returns {number} The rounded, clamped budget, or 0 when `value` is not a
 *   finite positive number.
 */
function normalizeDurationBudgetMs(value) {
  const requested = Number(value);
  const usable = Number.isFinite(requested) && requested > 0;
  if (!usable) {
    return 0;
  }
  const rounded = Math.round(requested);
  return Math.min(60_000, Math.max(50, rounded));
}
/**
 * @returns {string} The current time as an ISO 8601 string (`Date.prototype.toISOString`).
 */
function nowIso() {
  const current = new Date();
  return current.toISOString();
}
/**
 * Builds an id of the form `<prefix>-<time>-<random>`, where `<time>` is
 * `Date.now()` in base 36 and `<random>` is up to 8 base-36 characters taken
 * from `Math.random()`.
 *
 * @param {string} prefix - Leading part of the id.
 * @returns {string} The generated id.
 */
function randomId(prefix) {
  const timePart = Date.now().toString(36);
  const randomPart = Math.random().toString(36).slice(2, 10);
  return `${prefix}-${timePart}-${randomPart}`;
}
/**
 * Resolves the outbox file location for a configuration.
 *
 * @param {object} [config={}] - Agent configuration.
 * @param {*} [config.reliableOutboxPath] - Explicit path; used (as a string) when truthy.
 * @param {string} [config.deviceId] - Used in the default file name; "device" when falsy.
 * @returns {string} `reliableOutboxPath`, or `<home>/.boss-agent/<deviceId>-outbox.json`.
 */
export function resolveReliableOutboxPath(config = {}) {
  const explicitPath = config.reliableOutboxPath;
  if (explicitPath) {
    return String(explicitPath);
  }
  const fileName = `${config.deviceId || "device"}-outbox.json`;
  return join(os.homedir(), ".boss-agent", fileName);
}
/**
 * Loads the `records` array from the outbox file.
 *
 * Any failure to read or parse the file is treated as an empty outbox.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @returns {Promise<object[]>} The stored records, or `[]` when the file is
 *   unreadable, is not valid JSON, or has no `records` array.
 */
async function readOutboxRecords(outboxPath) {
  let parsed;
  try {
    const raw = await readFile(outboxPath, "utf8");
    parsed = JSON.parse(raw);
  } catch {
    return [];
  }
  const stored = parsed?.records;
  return Array.isArray(stored) ? stored : [];
}
/**
 * Compacts the records and writes them to the outbox file.
 *
 * The payload is written to a temporary sibling file first and then renamed
 * over the outbox path, so the outbox file itself is only ever replaced as a
 * whole. The parent directory is created if needed.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @param {object[]} records - Records to persist (compacted before writing).
 * @returns {Promise<void>}
 * @throws Rethrows any error from creating the directory, writing the
 *   temporary file, or renaming it into place.
 */
async function writeOutboxRecords(outboxPath, records) {
  await mkdir(dirname(outboxPath), { recursive: true });
  const compacted = compactReliableOutboxRecords(records);
  const tmpPath = `${outboxPath}.${process.pid}.${Date.now()}.tmp`;
  try {
    await writeFile(
      tmpPath,
      JSON.stringify({ version: 1, updatedAt: nowIso(), records: compacted }, null, 2),
    );
    await rename(tmpPath, outboxPath);
  } catch (error) {
    // Do not leave the temporary file behind when the write or rename fails.
    // Cleanup is best effort: the original error is the one worth reporting.
    await rm(tmpPath, { force: true }).catch(() => {});
    throw error;
  }
}
/**
 * Runs a read-modify-write cycle on an outbox file, queued behind any earlier
 * cycle started for the same path through this function.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @param {(records: object[]) => Promise<{records: object[], value: *}>} mutator
 *   Receives the current records and returns the records to write plus the
 *   value to hand back to the caller.
 * @returns {Promise<*>} The `value` returned by `mutator`.
 * @throws Rejects with whatever the read, the mutator, or the write throws;
 *   a failed cycle does not prevent later queued cycles from running.
 */
async function mutateOutboxRecords(outboxPath, mutator) {
  const applyMutation = async () => {
    const current = await readOutboxRecords(outboxPath);
    const outcome = await mutator(current);
    await writeOutboxRecords(outboxPath, outcome.records);
    return outcome.value;
  };
  const tail = outboxWriteQueues.get(outboxPath) || Promise.resolve();
  // Start after the previous cycle settles, whether it succeeded or failed.
  const pending = tail.then(applyMutation, applyMutation);
  // The stored tail never rejects, so one failure cannot poison the queue.
  outboxWriteQueues.set(outboxPath, pending.catch(() => null));
  return await pending;
}
/**
 * Creates a pending outbox record from `input` and appends it to the outbox file.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @param {object} input - Request description.
 * @param {string} [input.id] - Record id; generated from `kind` (or "outbox") when falsy.
 * @param {string} [input.kind] - Record kind.
 * @param {string} [input.url] - Request URL.
 * @param {string} [input.method] - HTTP method; "POST" when falsy.
 * @param {object} [input.headers] - Request headers; `{}` when falsy.
 * @param {*} [input.body] - Request body.
 * @param {number} [input.requestTimeoutMs] - Per-request timeout.
 * @returns {Promise<object>} The record that was appended.
 */
export async function appendReliableOutboxRecord(outboxPath, input) {
  const { kind, url, body, requestTimeoutMs } = input;
  const record = {
    id: input.id || randomId(kind || "outbox"),
    kind,
    url,
    method: input.method || "POST",
    headers: input.headers || {},
    body,
    requestTimeoutMs,
    status: "pending",
    attemptCount: 0,
    createdAt: nowIso(),
    lastAttemptAt: undefined,
    lastError: undefined,
  };
  const appendRecord = async (existing) => ({
    records: [...existing, record],
    value: record,
  });
  return await mutateOutboxRecords(outboxPath, appendRecord);
}
/**
 * Removes the record with the given id from the outbox file.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @param {string} recordId - Id of the record to remove.
 * @returns {Promise<void>}
 */
export async function markReliableOutboxRecordSent(outboxPath, recordId) {
  const dropRecord = async (records) => {
    const remaining = records.filter((entry) => entry.id !== recordId);
    return { records: remaining, value: undefined };
  };
  await mutateOutboxRecords(outboxPath, dropRecord);
}
/**
 * Decides whether a failed send with this HTTP status is worth retrying.
 *
 * @param {number} status - HTTP status code; a falsy status counts as retryable.
 * @returns {boolean} True when the status is falsy or listed in RETRYABLE_STATUS_CODES.
 */
function shouldRetry(status) {
  return !status || RETRYABLE_STATUS_CODES.has(status);
}
/**
 * Records a failed delivery attempt on the outbox record with the given id.
 *
 * The matching record gets `attemptCount` incremented, `lastAttemptAt` set to
 * the current time, and `lastError` set to the detail text (at most 240
 * characters, "OUTBOX_SEND_FAILED" when the detail is falsy). Other records
 * are left as they are.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @param {string} recordId - Id of the record that failed.
 * @param {*} detail - Failure description.
 * @returns {Promise<void>}
 */
async function updateReliableOutboxRecordFailure(outboxPath, recordId, detail) {
  const withFailure = (entry) => ({
    ...entry,
    attemptCount: Number(entry.attemptCount || 0) + 1,
    lastAttemptAt: nowIso(),
    lastError: String(detail || "OUTBOX_SEND_FAILED").slice(0, 240),
  });
  await mutateOutboxRecords(outboxPath, async (records) => {
    const updated = records.map((entry) => (entry.id === recordId ? withFailure(entry) : entry));
    return { records: updated, value: undefined };
  });
}
/**
 * Sends one outbox record with `fetch`, bounded by a timeout.
 *
 * String bodies are sent as-is; any other body is JSON-serialized (`{}` when
 * the body is null/undefined). GET and HEAD requests are sent without a body.
 *
 * @param {object} record - Record to send (`url`, `method`, `headers`, `body`, `requestTimeoutMs`).
 * @param {object} [options={}]
 * @param {number} [options.requestTimeoutMs] - Overrides `record.requestTimeoutMs`.
 * @returns {Promise<{ok: boolean, retryable: boolean, status: number, body: string}>}
 *   Outcome of the request; `retryable` is only true for non-OK responses
 *   whose status `shouldRetry` accepts.
 * @throws {Error} "RELIABLE_OUTBOX_SEND_TIMEOUT" when the request was aborted
 *   by the timeout; otherwise whatever `fetch` or reading the response throws.
 */
export async function sendReliableOutboxRecord(record, options = {}) {
  const timeoutMs = normalizeTimeoutMs(options.requestTimeoutMs ?? record.requestTimeoutMs);
  const method = record.method || "POST";
  const controller = new AbortController();
  const init = {
    method,
    headers: record.headers || {},
    signal: controller.signal,
  };
  // fetch() rejects GET/HEAD requests that carry a body, so attaching one
  // would make such a record fail on every attempt and never leave the outbox.
  if (!["GET", "HEAD"].includes(String(method).toUpperCase())) {
    init.body = typeof record.body === "string" ? record.body : JSON.stringify(record.body ?? {});
  }
  const timeout = setTimeout(() => {
    controller.abort(new Error("RELIABLE_OUTBOX_SEND_TIMEOUT"));
  }, timeoutMs);
  try {
    const response = await fetch(record.url, init);
    // Reading the body is still covered by the timeout (cleared in `finally`).
    const body = await response.text();
    return {
      ok: response.ok,
      retryable: !response.ok && shouldRetry(response.status),
      status: response.status,
      body,
    };
  } catch (error) {
    if (controller.signal.aborted) {
      // Normalize every abort to one stable message; keep the original as cause.
      throw new Error("RELIABLE_OUTBOX_SEND_TIMEOUT", { cause: error });
    }
    throw error;
  } finally {
    clearTimeout(timeout);
  }
}
/**
 * Sends a request, persisting it in the outbox first so it can be replayed
 * later if delivery fails.
 *
 * When `config.reliableOutboxEnabled === false` the request is sent directly
 * and nothing is persisted; in that mode transport errors are thrown by
 * `sendReliableOutboxRecord` rather than returned.
 *
 * Otherwise the record is appended to the outbox, sent once, and then either
 * removed (success or non-retryable response) or marked with the failure
 * (retryable response or transport error). Transport errors are returned as
 * `{ ok: false, retryable: true, status: 0, body: <message> }`.
 *
 * @param {object} config - Agent configuration (`reliableOutboxEnabled`,
 *   `reliableOutboxRequestTimeoutMs`, plus whatever `resolveReliableOutboxPath` reads).
 * @param {object} recordInput - Request description, as accepted by `appendReliableOutboxRecord`.
 * @returns {Promise<{ok: boolean, retryable: boolean, status: number, body: string, outboxError?: string}>}
 *   The send outcome. `outboxError` is present only when updating the outbox
 *   file after the send failed; the record then stays queued.
 * @throws Whatever `appendReliableOutboxRecord` throws when the record cannot be persisted.
 */
export async function postThroughReliableOutbox(config, recordInput) {
  const requestTimeoutMs = recordInput.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs;
  if (config.reliableOutboxEnabled === false) {
    return await sendReliableOutboxRecord({
      ...recordInput,
      id: recordInput.id || randomId(recordInput.kind || "direct"),
      requestTimeoutMs,
    });
  }
  const outboxPath = resolveReliableOutboxPath(config);
  const record = await appendReliableOutboxRecord(outboxPath, {
    ...recordInput,
    requestTimeoutMs,
  });

  // Step 1: the send itself. Only errors raised here are transport failures.
  let result;
  try {
    result = await sendReliableOutboxRecord(record, {
      requestTimeoutMs: record.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs,
    });
  } catch (error) {
    result = {
      ok: false,
      retryable: true,
      status: 0,
      body: error instanceof Error ? error.message : String(error),
    };
  }

  // Step 2: outbox bookkeeping. It is kept separate from step 1 so that a
  // failure to update the outbox file cannot turn a delivered request into a
  // reported transport failure, nor replace the send outcome with an exception.
  try {
    if (result.ok || !result.retryable) {
      await markReliableOutboxRecordSent(outboxPath, record.id);
    } else {
      await updateReliableOutboxRecordFailure(outboxPath, record.id, result.body);
    }
  } catch (error) {
    // The record is still in the outbox and will be picked up by a later
    // replay; report the bookkeeping problem alongside the real send outcome.
    return {
      ...result,
      outboxError: error instanceof Error ? error.message : String(error),
    };
  }
  return result;
}
/**
 * Re-sends queued outbox records, highest priority first.
 *
 * Each record that gets an OK or non-retryable response is removed from the
 * outbox and counted in `sent`; every other attempt is recorded as a failure
 * on the record. The loop stops early once the duration budget is used up.
 *
 * @param {object} config - Agent configuration (`reliableOutboxEnabled`,
 *   `reliableOutboxReplayBudgetMs`, `reliableOutboxRequestTimeoutMs`, plus
 *   whatever `resolveReliableOutboxPath` reads).
 * @param {object} [options={}]
 * @param {number} [options.limit=50] - Maximum records to attempt, clamped to
 *   [1, 100]; falsy or non-numeric values fall back to 50.
 * @param {number} [options.maxDurationMs] - Duration budget; overrides the config value.
 * @param {number} [options.requestTimeoutMs] - Default per-request timeout for
 *   records that do not carry their own.
 * @returns {Promise<{attempted: number, sent: number, retained: number, stoppedByBudget: boolean}>}
 *   Counters for this run; `retained` is the number of records left in the
 *   outbox file afterwards. All zero/false when the outbox is disabled.
 */
export async function replayReliableOutbox(config, options = {}) {
  if (config.reliableOutboxEnabled === false) {
    // Same shape as the regular return value so callers can read every field.
    return { attempted: 0, sent: 0, retained: 0, stoppedByBudget: false };
  }
  const outboxPath = resolveReliableOutboxPath(config);
  const records = (await readOutboxRecords(outboxPath)).filter(
    (record) => record?.status !== "sent",
  );
  // A non-numeric limit (e.g. "all") converts to NaN; without the fallback,
  // slice(0, NaN) would select nothing and the replay would silently do no work.
  const requestedLimit = Number(options.limit || 50);
  const limit = Math.max(1, Math.min(Number.isNaN(requestedLimit) ? 50 : requestedLimit, 100));
  const startedAt = Date.now();
  const maxDurationMs = normalizeDurationBudgetMs(
    options.maxDurationMs ?? config.reliableOutboxReplayBudgetMs,
  );
  const requestTimeoutMs = options.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs;
  let attempted = 0;
  let sent = 0;
  let stoppedByBudget = false;
  for (const record of orderReliableOutboxRecordsForReplay(records).slice(0, limit)) {
    // A budget of 0 means "no time limit".
    if (maxDurationMs > 0 && Date.now() - startedAt >= maxDurationMs) {
      stoppedByBudget = true;
      break;
    }
    attempted += 1;
    try {
      const result = await sendReliableOutboxRecord(record, {
        requestTimeoutMs: record.requestTimeoutMs ?? requestTimeoutMs,
      });
      if (result.ok || !result.retryable) {
        // Non-retryable responses are dropped too: retrying cannot help.
        await markReliableOutboxRecordSent(outboxPath, record.id);
        sent += 1;
      } else {
        await updateReliableOutboxRecordFailure(outboxPath, record.id, result.body);
      }
    } catch (error) {
      await updateReliableOutboxRecordFailure(
        outboxPath,
        record.id,
        error instanceof Error ? error.message : String(error),
      );
    }
  }
  const retained = (await readOutboxRecords(outboxPath)).length;
  return { attempted, sent, retained, stoppedByBudget };
}