379 lines
12 KiB
JavaScript
379 lines
12 KiB
JavaScript
import { mkdir, readFile, rename, unlink, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import os from "node:os";
|
|
|
|
// Record budget applied by compaction. task.complete records are always kept,
// so the stored total can exceed this when there are many completions.
const MAX_OUTBOX_RECORDS = 500;
|
|
// Compaction keeps at most this many app.log records (the most recently created ones).
const MAX_APP_LOG_RECORDS = 120;
|
|
// HTTP response statuses that shouldRetry() treats as retryable.
const RETRYABLE_STATUS_CODES = new Set([408, 409, 425, 429, 500, 502, 503, 504]);
|
|
// Maps an outbox path to the tail of its promise chain; mutateOutboxRecords()
// chains onto it so read-modify-write cycles on one file run one at a time.
const outboxWriteQueues = new Map();
|
|
|
|
/**
 * Ranks an outbox record by kind; a lower number sorts first.
 *
 * @param {{ kind?: string } | null | undefined} record - Outbox record.
 * @returns {number} 0 for "task.complete", 10 for "task.progress",
 *   30 for "app.log", and 20 for any other (or missing) kind.
 */
function reliableOutboxPriority(record) {
  const kind = record?.kind;
  if (kind === "task.complete") {
    return 0;
  }
  if (kind === "task.progress") {
    return 10;
  }
  return kind === "app.log" ? 30 : 20;
}
|
|
|
|
/**
 * Converts a record's `createdAt` string to epoch milliseconds.
 *
 * @param {{ createdAt?: string } | null | undefined} record - Outbox record.
 * @returns {number} Parsed timestamp, or 0 when `createdAt` is missing or unparseable.
 */
function recordCreatedMs(record) {
  const timestamp = Date.parse(record?.createdAt || "");
  return Number.isNaN(timestamp) ? 0 : timestamp;
}
|
|
|
|
/**
 * Returns a record's body as an object.
 *
 * Object bodies are returned as-is, string bodies are JSON-decoded, and
 * everything else (missing body, other types, invalid JSON, JSON that decodes
 * to a non-object) yields an empty object.
 *
 * @param {{ body?: unknown } | null | undefined} record - Outbox record.
 * @returns {object} The body object, or `{}` when none can be obtained.
 */
function parseRecordBody(record) {
  const raw = record?.body;
  if (raw == null) {
    return {};
  }
  switch (typeof raw) {
    case "object":
      return raw;
    case "string":
      break;
    default:
      return {};
  }
  let decoded;
  try {
    decoded = JSON.parse(raw);
  } catch {
    return {};
  }
  return decoded !== null && typeof decoded === "object" ? decoded : {};
}
|
|
|
|
/**
 * Derives the key used to coalesce "task.progress" records of the same task.
 *
 * The trimmed `taskId` from the record body wins; otherwise the task segment
 * is taken from a URL shaped like `/master-agent/tasks/<id>/progress`.
 *
 * @param {object | null | undefined} record - Outbox record.
 * @returns {string} The task key, or "" when the record is not a
 *   "task.progress" record or no task can be identified.
 */
function taskProgressCoalescingKey(record) {
  if (record?.kind !== "task.progress") {
    return "";
  }
  const body = parseRecordBody(record);
  const taskId = typeof body.taskId === "string" ? body.taskId.trim() : "";
  if (taskId) {
    return taskId;
  }
  const url = typeof record.url === "string" ? record.url : "";
  const match = url.match(/\/master-agent\/tasks\/([^/]+)\/progress(?:\?|$)/);
  if (!match) {
    return "";
  }
  try {
    return decodeURIComponent(match[1]);
  } catch {
    // decodeURIComponent throws URIError on a malformed percent-escape. Fall
    // back to the raw segment so one bad URL cannot abort compaction.
    return match[1];
  }
}
|
|
|
|
/**
 * Derives the task key of a "task.complete" record.
 *
 * The trimmed `taskId` from the record body wins; otherwise the task segment
 * is taken from a URL shaped like `/master-agent/tasks/<id>/complete`.
 *
 * @param {object | null | undefined} record - Outbox record.
 * @returns {string} The task key, or "" when the record is not a
 *   "task.complete" record or no task can be identified.
 */
function taskCompletionCoalescingKey(record) {
  if (record?.kind !== "task.complete") {
    return "";
  }
  const body = parseRecordBody(record);
  const taskId = typeof body.taskId === "string" ? body.taskId.trim() : "";
  if (taskId) {
    return taskId;
  }
  const url = typeof record.url === "string" ? record.url : "";
  const match = url.match(/\/master-agent\/tasks\/([^/]+)\/complete(?:\?|$)/);
  if (!match) {
    return "";
  }
  try {
    return decodeURIComponent(match[1]);
  } catch {
    // decodeURIComponent throws URIError on a malformed percent-escape. Fall
    // back to the raw segment so one bad URL cannot abort compaction.
    return match[1];
  }
}
|
|
|
|
/**
 * Builds the key used to de-duplicate "app.log" records.
 *
 * The key joins the record URL with the trimmed `projectId`, `category` and
 * `message` from the body, separated by "|".
 *
 * @param {object | null | undefined} record - Outbox record.
 * @returns {string} The key, or "" when the record is not an "app.log" record
 *   or its body has neither a category nor a message.
 */
function appLogCoalescingKey(record) {
  if (record?.kind !== "app.log") {
    return "";
  }
  const trimmedString = (value) => (typeof value === "string" ? value.trim() : "");
  const body = parseRecordBody(record);
  const category = trimmedString(body.category);
  const message = trimmedString(body.message);
  if (category === "" && message === "") {
    return "";
  }
  const projectId = trimmedString(body.projectId);
  return `${record.url || ""}|${projectId}|${category}|${message}`;
}
|
|
|
|
/**
 * Returns a sorted copy of `records` in replay order: by priority first
 * (lower value first), then oldest `createdAt` first. The input is not mutated.
 *
 * @param {Iterable<object>} records - Outbox records.
 * @returns {object[]} New array in replay order.
 */
function orderReliableOutboxRecordsForReplay(records) {
  const compareForReplay = (a, b) =>
    reliableOutboxPriority(a) - reliableOutboxPriority(b) ||
    recordCreatedMs(a) - recordCreatedMs(b);
  const ordered = [...records];
  ordered.sort(compareForReplay);
  return ordered;
}
|
|
|
|
/**
 * Shrinks the outbox before it is persisted.
 *
 * Steps, in order:
 *  1. Drop falsy entries, entries without an `id`, and entries with status "sent".
 *  2. Drop "task.progress" records whose task also has a "task.complete"
 *     record; for the rest keep only the latest progress record per task.
 *  3. De-duplicate "app.log" records by coalescing key (latest wins) and keep
 *     at most MAX_APP_LOG_RECORDS of them, newest first.
 *  4. If the total still exceeds MAX_OUTBOX_RECORDS, keep every
 *     "task.complete" record and fill the remaining budget with other records
 *     by priority, newest first.
 *
 * @param {object[]} records - Records read from or about to be written to the outbox.
 * @returns {object[]} Compacted records sorted oldest first.
 */
function compactReliableOutboxRecords(records) {
  const oldestFirst = (a, b) => recordCreatedMs(a) - recordCreatedMs(b);
  const newestFirst = (a, b) => recordCreatedMs(b) - recordCreatedMs(a);
  const isAppLog = (record) => record.kind === "app.log";
  const isCompletion = (record) => record.kind === "task.complete";

  const pending = records.filter(
    (record) => record && record.id && record.status !== "sent",
  );

  // Tasks that already have a completion queued make their progress updates redundant.
  const completedTasks = new Set();
  for (const record of pending) {
    const completionKey = taskCompletionCoalescingKey(record);
    if (completionKey) {
      completedTasks.add(completionKey);
    }
  }

  const latestProgress = new Map();
  const passthrough = [];
  for (const record of pending) {
    const taskKey = taskProgressCoalescingKey(record);
    if (!taskKey) {
      // Not a coalescable progress record: handled by the later stages.
      passthrough.push(record);
      continue;
    }
    if (completedTasks.has(taskKey)) {
      continue;
    }
    const seen = latestProgress.get(taskKey);
    if (!seen || recordCreatedMs(record) >= recordCreatedMs(seen)) {
      latestProgress.set(taskKey, record);
    }
  }

  const latestAppLog = new Map();
  const unkeyedAppLogs = [];
  for (const record of passthrough) {
    if (!isAppLog(record)) {
      continue;
    }
    const logKey = appLogCoalescingKey(record);
    if (!logKey) {
      unkeyedAppLogs.push(record);
      continue;
    }
    const seen = latestAppLog.get(logKey);
    if (!seen || recordCreatedMs(record) >= recordCreatedMs(seen)) {
      latestAppLog.set(logKey, record);
    }
  }
  const keptAppLogs = [...latestAppLog.values(), ...unkeyedAppLogs]
    .sort(newestFirst)
    .slice(0, MAX_APP_LOG_RECORDS);

  const compacted = [
    ...passthrough.filter((record) => !isAppLog(record)),
    ...latestProgress.values(),
    ...keptAppLogs,
  ];
  if (compacted.length <= MAX_OUTBOX_RECORDS) {
    return compacted.sort(oldestFirst);
  }

  // Over budget: completions are never dropped; everything else competes for the rest.
  const completions = compacted.filter((record) => isCompletion(record));
  const budget = Math.max(0, MAX_OUTBOX_RECORDS - completions.length);
  const survivors = compacted
    .filter((record) => !isCompletion(record))
    .sort(
      (a, b) =>
        reliableOutboxPriority(a) - reliableOutboxPriority(b) || newestFirst(a, b),
    )
    .slice(0, budget);
  return [...completions, ...survivors].sort(oldestFirst);
}
|
|
|
|
/**
 * Coerces a timeout setting to whole milliseconds within [50, 60000].
 *
 * @param {unknown} value - Requested timeout; converted with `Number()`.
 * @param {number} [fallback=5000] - Returned unchanged when `value` is not a
 *   finite number greater than zero.
 * @returns {number} Rounded, clamped timeout, or `fallback`.
 */
function normalizeTimeoutMs(value, fallback = 5_000) {
  const requested = Number(value);
  const usable = Number.isFinite(requested) && requested > 0;
  if (!usable) {
    return fallback;
  }
  const rounded = Math.round(requested);
  return Math.min(60_000, Math.max(50, rounded));
}
|
|
|
|
/**
 * Coerces a duration budget to whole milliseconds within [50, 60000].
 *
 * @param {unknown} value - Requested budget; converted with `Number()`.
 * @returns {number} Rounded, clamped budget, or 0 when `value` is not a finite
 *   number greater than zero.
 */
function normalizeDurationBudgetMs(value) {
  const requested = Number(value);
  const usable = Number.isFinite(requested) && requested > 0;
  if (!usable) {
    return 0;
  }
  const rounded = Math.round(requested);
  return Math.min(60_000, Math.max(50, rounded));
}
|
|
|
|
/**
 * Returns the current time as an ISO 8601 string.
 *
 * @returns {string} Result of `Date.prototype.toISOString()` for the current instant.
 */
function nowIso() {
  const now = new Date();
  return now.toISOString();
}
|
|
|
|
/**
 * Builds an id of the form `<prefix>-<time>-<random>`, where both suffixes are
 * base-36 strings derived from `Date.now()` and `Math.random()`.
 *
 * @param {string} prefix - Leading id segment.
 * @returns {string} The generated id. Math.random() is not cryptographically secure.
 */
function randomId(prefix) {
  const timePart = Date.now().toString(36);
  const randomPart = Math.random().toString(36).slice(2, 10);
  return `${prefix}-${timePart}-${randomPart}`;
}
|
|
|
|
/**
 * Resolves the outbox file location for a configuration.
 *
 * @param {{ reliableOutboxPath?: unknown, deviceId?: string }} [config={}] - Agent configuration.
 * @returns {string} `config.reliableOutboxPath` as a string when it is truthy;
 *   otherwise `<home>/.boss-agent/<deviceId or "device">-outbox.json`.
 */
export function resolveReliableOutboxPath(config = {}) {
  const { reliableOutboxPath, deviceId } = config;
  if (reliableOutboxPath) {
    return String(reliableOutboxPath);
  }
  const fileName = `${deviceId || "device"}-outbox.json`;
  return join(os.homedir(), ".boss-agent", fileName);
}
|
|
|
|
/**
 * Loads the `records` array from the outbox file.
 *
 * Best-effort by design: any read or JSON-parse failure, or a file whose
 * `records` field is not an array, yields an empty list.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @returns {Promise<object[]>} The stored records, or `[]`.
 */
async function readOutboxRecords(outboxPath) {
  let contents;
  try {
    const text = await readFile(outboxPath, "utf8");
    contents = JSON.parse(text);
  } catch {
    return [];
  }
  const stored = contents?.records;
  return Array.isArray(stored) ? stored : [];
}
|
|
|
|
/**
 * Compacts `records` and persists them to the outbox file.
 *
 * The payload is written to a sibling temp file and then renamed over the
 * target. If writing or renaming fails, the temp file is removed (best effort)
 * so failed writes do not leave `*.tmp` files behind, and the original error
 * is rethrown.
 *
 * @param {string} outboxPath - Path of the outbox JSON file; its directory is
 *   created when missing.
 * @param {object[]} records - Records to compact and store.
 * @returns {Promise<void>}
 * @throws Whatever `mkdir`, `writeFile` or `rename` rejects with.
 */
async function writeOutboxRecords(outboxPath, records) {
  await mkdir(dirname(outboxPath), { recursive: true });
  const compacted = compactReliableOutboxRecords(records);
  const tmpPath = `${outboxPath}.${process.pid}.${Date.now()}.tmp`;
  const payload = JSON.stringify(
    { version: 1, updatedAt: nowIso(), records: compacted },
    null,
    2,
  );
  try {
    await writeFile(tmpPath, payload);
    await rename(tmpPath, outboxPath);
  } catch (error) {
    // Cleanup is best effort: the temp file may not exist, and the original
    // failure is the error the caller needs to see.
    await unlink(tmpPath).catch(() => {});
    throw error;
  }
}
|
|
|
|
/**
 * Runs a read-modify-write cycle on the outbox file, queued behind any earlier
 * cycle started through this function for the same path.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @param {(records: object[]) => ({ records: object[], value: unknown } |
 *   Promise<{ records: object[], value: unknown }>)} mutator - Receives the
 *   current records and returns the records to store plus a value for the caller.
 * @returns {Promise<unknown>} The `value` returned by `mutator`.
 * @throws Rejects when the mutator or the write fails; later queued cycles still run.
 */
async function mutateOutboxRecords(outboxPath, mutator) {
  async function applyMutation() {
    const current = await readOutboxRecords(outboxPath);
    const outcome = await mutator(current);
    await writeOutboxRecords(outboxPath, outcome.records);
    return outcome.value;
  }

  const tail = outboxWriteQueues.get(outboxPath) || Promise.resolve();
  // Run whether the previous cycle succeeded or failed.
  const settled = tail.then(applyMutation, applyMutation);
  // The stored tail never rejects, so one failure cannot poison the queue.
  outboxWriteQueues.set(
    outboxPath,
    settled.catch(() => null),
  );
  return await settled;
}
|
|
|
|
/**
 * Creates a pending outbox record from `input` and appends it to the outbox file.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @param {object} input - Request description: `kind`, `url`, `body`, and
 *   optional `id`, `method` (default "POST"), `headers` (default `{}`),
 *   `requestTimeoutMs`.
 * @returns {Promise<object>} The stored record (status "pending", attemptCount 0).
 */
export async function appendReliableOutboxRecord(outboxPath, input) {
  const { kind, url, body, requestTimeoutMs } = input;
  const record = {
    id: input.id || randomId(kind || "outbox"),
    kind,
    url,
    method: input.method || "POST",
    headers: input.headers || {},
    body,
    requestTimeoutMs,
    status: "pending",
    attemptCount: 0,
    createdAt: nowIso(),
    lastAttemptAt: undefined,
    lastError: undefined,
  };
  const appendRecord = async (records) => ({
    records: records.concat([record]),
    value: record,
  });
  return await mutateOutboxRecords(outboxPath, appendRecord);
}
|
|
|
|
/**
 * Removes the record with the given id from the outbox file.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @param {string} recordId - Id of the record to remove.
 * @returns {Promise<void>}
 */
export async function markReliableOutboxRecordSent(outboxPath, recordId) {
  const dropRecord = async (records) => {
    const remaining = records.filter((record) => record.id !== recordId);
    return { records: remaining, value: undefined };
  };
  await mutateOutboxRecords(outboxPath, dropRecord);
}
|
|
|
|
/**
 * Decides whether a failed send should be retried.
 *
 * @param {number | undefined} status - HTTP status, or a falsy value when no
 *   status is available.
 * @returns {boolean} true when `status` is falsy or listed in RETRYABLE_STATUS_CODES.
 */
function shouldRetry(status) {
  return !status || RETRYABLE_STATUS_CODES.has(status);
}
|
|
|
|
/**
 * Records a failed delivery attempt on the stored record with the given id:
 * increments `attemptCount`, stamps `lastAttemptAt`, and stores `detail`
 * (truncated to 240 characters) as `lastError`.
 *
 * @param {string} outboxPath - Path of the outbox JSON file.
 * @param {string} recordId - Id of the record that failed.
 * @param {unknown} detail - Failure description; "OUTBOX_SEND_FAILED" when falsy.
 * @returns {Promise<void>}
 */
async function updateReliableOutboxRecordFailure(outboxPath, recordId, detail) {
  const withFailure = (record) => ({
    ...record,
    attemptCount: Number(record.attemptCount || 0) + 1,
    lastAttemptAt: nowIso(),
    lastError: String(detail || "OUTBOX_SEND_FAILED").slice(0, 240),
  });
  const recordFailure = async (records) => {
    const updated = records.map((record) =>
      record.id === recordId ? withFailure(record) : record,
    );
    return { records: updated, value: undefined };
  };
  await mutateOutboxRecords(outboxPath, recordFailure);
}
|
|
|
|
/**
 * Sends one outbox record over HTTP with a timeout.
 *
 * @param {object} record - Record with `url` and optional `method`
 *   (default "POST"), `headers`, `body`, `requestTimeoutMs`.
 * @param {{ requestTimeoutMs?: number }} [options={}] - Overrides the record's
 *   timeout; the effective value is normalised by normalizeTimeoutMs().
 * @returns {Promise<{ ok: boolean, retryable: boolean, status: number, body: string }>}
 *   Response summary; `retryable` is true only for a failed response whose
 *   status shouldRetry() accepts.
 * @throws {Error} "RELIABLE_OUTBOX_SEND_TIMEOUT" (with the underlying error as
 *   `cause`) when the timeout fired; otherwise whatever `fetch` or reading the
 *   response rejected with.
 */
export async function sendReliableOutboxRecord(record, options = {}) {
  const timeoutMs = normalizeTimeoutMs(options.requestTimeoutMs ?? record.requestTimeoutMs);
  const controller = new AbortController();
  const timeout = setTimeout(() => {
    controller.abort(new Error("RELIABLE_OUTBOX_SEND_TIMEOUT"));
  }, timeoutMs);
  try {
    const method = record.method || "POST";
    const request = {
      method,
      headers: record.headers || {},
      signal: controller.signal,
    };
    // fetch() rejects with a TypeError when a GET or HEAD request carries a
    // body, which would leave such a record failing on every retry.
    const upperMethod = String(method).toUpperCase();
    if (upperMethod !== "GET" && upperMethod !== "HEAD") {
      request.body =
        typeof record.body === "string" ? record.body : JSON.stringify(record.body ?? {});
    }
    const response = await fetch(record.url, request);
    // Still inside the timeout window: the timer is cleared only in `finally`.
    const body = await response.text();
    return {
      ok: response.ok,
      retryable: !response.ok && shouldRetry(response.status),
      status: response.status,
      body,
    };
  } catch (error) {
    if (controller.signal.aborted) {
      throw new Error("RELIABLE_OUTBOX_SEND_TIMEOUT", { cause: error });
    }
    throw error;
  } finally {
    clearTimeout(timeout);
  }
}
|
|
|
|
/**
 * Sends a request, persisting it in the outbox first so it can be replayed.
 *
 * When `config.reliableOutboxEnabled` is exactly `false` the request is sent
 * directly and send errors propagate. Otherwise the record is appended to the
 * outbox and sent; it is removed on success or on a non-retryable response,
 * and kept with its failure details on a retryable response or a thrown error.
 *
 * @param {object} config - Agent configuration (`reliableOutboxEnabled`,
 *   `reliableOutboxRequestTimeoutMs`, plus the outbox path settings).
 * @param {object} recordInput - Request description accepted by
 *   appendReliableOutboxRecord().
 * @returns {Promise<{ ok: boolean, retryable: boolean, status: number, body: string }>}
 *   The send result; on the outbox path a thrown error is reported as
 *   `{ ok: false, retryable: true, status: 0, body: <message> }`.
 */
export async function postThroughReliableOutbox(config, recordInput) {
  const outboxDisabled = config.reliableOutboxEnabled === false;
  const requestTimeoutMs =
    recordInput.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs;

  if (outboxDisabled) {
    const directRecord = {
      ...recordInput,
      id: recordInput.id || randomId(recordInput.kind || "direct"),
      requestTimeoutMs,
    };
    return await sendReliableOutboxRecord(directRecord);
  }

  const outboxPath = resolveReliableOutboxPath(config);
  const record = await appendReliableOutboxRecord(outboxPath, {
    ...recordInput,
    requestTimeoutMs,
  });

  try {
    const result = await sendReliableOutboxRecord(record, {
      requestTimeoutMs: record.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs,
    });
    // Delivered, or rejected in a way retrying cannot fix: stop tracking it.
    const settled = result.ok || !result.retryable;
    if (settled) {
      await markReliableOutboxRecordSent(outboxPath, record.id);
    } else {
      await updateReliableOutboxRecordFailure(outboxPath, record.id, result.body);
    }
    return result;
  } catch (error) {
    const detail = error instanceof Error ? error.message : String(error);
    await updateReliableOutboxRecordFailure(outboxPath, record.id, detail);
    return {
      ok: false,
      retryable: true,
      status: 0,
      body: detail,
    };
  }
}
|
|
|
|
/**
 * Re-sends records still stored in the outbox, in replay order.
 *
 * @param {object} config - Agent configuration (`reliableOutboxEnabled`,
 *   `reliableOutboxReplayBudgetMs`, `reliableOutboxRequestTimeoutMs`, plus the
 *   outbox path settings).
 * @param {{ limit?: number, maxDurationMs?: number, requestTimeoutMs?: number }} [options={}]
 *   `limit` caps the number of records attempted (default 50, clamped to
 *   1..100; a non-numeric value falls back to 50). `maxDurationMs` is a time
 *   budget checked before each send; a normalised value of 0 disables it.
 * @returns {Promise<{ attempted: number, sent: number, retained: number, stoppedByBudget: boolean }>}
 *   `sent` counts records removed from the outbox, which includes responses
 *   judged non-retryable. `retained` is the number of records left in the file.
 *   The same shape is returned when the outbox is disabled.
 */
export async function replayReliableOutbox(config, options = {}) {
  if (config.reliableOutboxEnabled === false) {
    // Same shape as the normal path so callers can read stoppedByBudget unconditionally.
    return { attempted: 0, sent: 0, retained: 0, stoppedByBudget: false };
  }
  const outboxPath = resolveReliableOutboxPath(config);
  // Skip non-object entries: they cannot be sent and `record.requestTimeoutMs`
  // would throw on them.
  const records = (await readOutboxRecords(outboxPath)).filter(
    (record) => record && typeof record === "object" && record.status !== "sent",
  );
  // A non-numeric limit used to become NaN, and slice(0, NaN) replays nothing.
  const requestedLimit = Number(options.limit || 50);
  const limit = Number.isNaN(requestedLimit)
    ? 50
    : Math.max(1, Math.min(Math.trunc(requestedLimit), 100));
  const startedAt = Date.now();
  const maxDurationMs = normalizeDurationBudgetMs(
    options.maxDurationMs ?? config.reliableOutboxReplayBudgetMs,
  );
  const requestTimeoutMs = options.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs;
  let attempted = 0;
  let sent = 0;
  let stoppedByBudget = false;
  for (const record of orderReliableOutboxRecordsForReplay(records).slice(0, limit)) {
    if (maxDurationMs > 0 && Date.now() - startedAt >= maxDurationMs) {
      stoppedByBudget = true;
      break;
    }
    attempted += 1;
    try {
      const result = await sendReliableOutboxRecord(record, {
        requestTimeoutMs: record.requestTimeoutMs ?? requestTimeoutMs,
      });
      if (result.ok || !result.retryable) {
        await markReliableOutboxRecordSent(outboxPath, record.id);
        sent += 1;
      } else {
        await updateReliableOutboxRecordFailure(outboxPath, record.id, result.body);
      }
    } catch (error) {
      await updateReliableOutboxRecordFailure(
        outboxPath,
        record.id,
        error instanceof Error ? error.message : String(error),
      );
    }
  }
  const retained = (await readOutboxRecords(outboxPath)).length;
  return { attempted, sent, retained, stoppedByBudget };
}
|