import { mkdir, readFile, rename, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import os from "node:os";

/** Upper bound on the number of records kept in the outbox file after compaction. */
const MAX_OUTBOX_RECORDS = 500;
/** Upper bound on the number of "app.log" records kept after compaction. */
const MAX_APP_LOG_RECORDS = 120;
/** Number of records replayed per call when the caller gives no usable limit. */
const DEFAULT_REPLAY_LIMIT = 50;
/** Hard ceiling on the number of records replayed per call. */
const MAX_REPLAY_LIMIT = 100;
/** HTTP statuses for which a failed send is kept in the outbox for a later retry. */
const RETRYABLE_STATUS_CODES = new Set([408, 409, 425, 429, 500, 502, 503, 504]);
/** Extracts the task id path segment from a task-progress URL. */
const TASK_PROGRESS_URL_PATTERN = /\/master-agent\/tasks\/([^/]+)\/progress(?:\?|$)/;
/** Extracts the task id path segment from a task-completion URL. */
const TASK_COMPLETE_URL_PATTERN = /\/master-agent\/tasks\/([^/]+)\/complete(?:\?|$)/;
/** Per-outbox-path promise chain used to serialize read-modify-write cycles. */
const outboxWriteQueues = new Map();

/**
 * Replay/retention priority of a record; lower numbers are more important.
 *
 * @param {{ kind?: string } | null | undefined} record
 * @returns {number} 0 for "task.complete", 10 for "task.progress", 30 for "app.log", 20 otherwise.
 */
function reliableOutboxPriority(record) {
  switch (record?.kind) {
    case "task.complete":
      return 0;
    case "task.progress":
      return 10;
    case "app.log":
      return 30;
    default:
      return 20;
  }
}

/**
 * Creation time of a record in epoch milliseconds.
 *
 * @param {{ createdAt?: string } | null | undefined} record
 * @returns {number} Parsed `createdAt`, or 0 when it is missing or unparseable.
 */
function recordCreatedMs(record) {
  const value = Date.parse(record?.createdAt || "");
  return Number.isFinite(value) ? value : 0;
}

/** Comparator ordering records from oldest to newest `createdAt`. */
function compareByCreatedAscending(left, right) {
  return recordCreatedMs(left) - recordCreatedMs(right);
}

/**
 * Returns the record body as an object, parsing it when it is a JSON string.
 *
 * @param {{ body?: unknown } | null | undefined} record
 * @returns {object} The body object, or `{}` when the body is absent, not an
 *   object/string, not valid JSON, or JSON that does not decode to an object.
 */
function parseRecordBody(record) {
  if (!record || record.body == null) {
    return {};
  }
  if (typeof record.body === "object") {
    return record.body;
  }
  if (typeof record.body !== "string") {
    return {};
  }
  try {
    const parsed = JSON.parse(record.body);
    return parsed && typeof parsed === "object" ? parsed : {};
  } catch {
    // An unparseable body simply contributes no fields.
    return {};
  }
}

/**
 * Percent-decodes a URL path segment without ever throwing.
 *
 * `decodeURIComponent` throws `URIError` on malformed escapes; compaction runs
 * on every outbox write, so a single odd URL must not make writes fail.
 *
 * @param {string} value
 * @returns {string} The decoded segment, or `value` unchanged when it is malformed.
 */
function safeDecodeUriComponent(value) {
  try {
    return decodeURIComponent(value);
  } catch {
    return value;
  }
}

/**
 * Shared implementation of the task coalescing keys: the trimmed `taskId` from
 * the body when present, otherwise the task id segment matched in the URL.
 *
 * @param {object | null | undefined} record
 * @param {string} kind Record kind this key applies to.
 * @param {RegExp} urlPattern Pattern whose first capture group is the task id.
 * @returns {string} The key, or "" when the record is of another kind or has no task id.
 */
function taskCoalescingKey(record, kind, urlPattern) {
  if (record?.kind !== kind) {
    return "";
  }
  const body = parseRecordBody(record);
  const taskId = typeof body.taskId === "string" ? body.taskId.trim() : "";
  if (taskId) {
    return taskId;
  }
  const url = typeof record.url === "string" ? record.url : "";
  const match = url.match(urlPattern);
  return match ? safeDecodeUriComponent(match[1]) : "";
}

/**
 * Coalescing key (task id) for a "task.progress" record.
 *
 * @param {object | null | undefined} record
 * @returns {string} Task id, or "" when not applicable.
 */
function taskProgressCoalescingKey(record) {
  return taskCoalescingKey(record, "task.progress", TASK_PROGRESS_URL_PATTERN);
}

/**
 * Coalescing key (task id) for a "task.complete" record.
 *
 * @param {object | null | undefined} record
 * @returns {string} Task id, or "" when not applicable.
 */
function taskCompletionCoalescingKey(record) {
  return taskCoalescingKey(record, "task.complete", TASK_COMPLETE_URL_PATTERN);
}

/**
 * Coalescing key for an "app.log" record, built from its URL and the trimmed
 * `projectId`, `category` and `message` body fields.
 *
 * @param {object | null | undefined} record
 * @returns {string} The key, or "" when the record is not an app log or has
 *   neither a category nor a message.
 */
function appLogCoalescingKey(record) {
  if (record?.kind !== "app.log") {
    return "";
  }
  const body = parseRecordBody(record);
  const category = typeof body.category === "string" ? body.category.trim() : "";
  const message = typeof body.message === "string" ? body.message.trim() : "";
  if (!category && !message) {
    return "";
  }
  const projectId = typeof body.projectId === "string" ? body.projectId.trim() : "";
  return [record.url || "", projectId, category, message].join("|");
}

/**
 * Returns a sorted copy of `records` for replay: by priority first, then oldest first.
 *
 * @param {object[]} records
 * @returns {object[]} New array; the input is not mutated.
 */
function orderReliableOutboxRecordsForReplay(records) {
  return [...records].sort((left, right) => {
    const priorityDiff = reliableOutboxPriority(left) - reliableOutboxPriority(right);
    if (priorityDiff !== 0) {
      return priorityDiff;
    }
    return compareByCreatedAscending(left, right);
  });
}

/**
 * Reduces a record list to what is worth persisting.
 *
 * - Drops falsy records, records without an `id`, and records with status "sent".
 * - Drops progress records whose task also has a pending completion record.
 * - Keeps only the newest progress record per task.
 * - Keeps only the newest app log per coalescing key, and at most
 *   MAX_APP_LOG_RECORDS app logs overall (newest first).
 * - If the result still exceeds MAX_OUTBOX_RECORDS, keeps every completion and
 *   fills the remaining budget by priority, newest first.
 *
 * @param {object[]} records
 * @returns {object[]} Compacted records ordered oldest to newest.
 */
function compactReliableOutboxRecords(records) {
  const active = records.filter((record) => record && record.id && record.status !== "sent");
  const completionTaskKeys = new Set(active.map(taskCompletionCoalescingKey).filter(Boolean));

  const progressByTask = new Map();
  const retained = [];
  for (const record of active) {
    const progressKey = taskProgressCoalescingKey(record);
    if (!progressKey) {
      // Not a keyed progress record: it is not subject to progress coalescing.
      retained.push(record);
      continue;
    }
    if (completionTaskKeys.has(progressKey)) {
      // A pending completion for the same task supersedes its progress updates.
      continue;
    }
    const previous = progressByTask.get(progressKey);
    if (!previous || recordCreatedMs(record) >= recordCreatedMs(previous)) {
      progressByTask.set(progressKey, record);
    }
  }

  const keyedAppLogs = new Map();
  const unkeyedAppLogs = [];
  for (const record of retained.filter((item) => item.kind === "app.log")) {
    const appLogKey = appLogCoalescingKey(record);
    if (!appLogKey) {
      unkeyedAppLogs.push(record);
      continue;
    }
    const previous = keyedAppLogs.get(appLogKey);
    if (!previous || recordCreatedMs(record) >= recordCreatedMs(previous)) {
      keyedAppLogs.set(appLogKey, record);
    }
  }
  const appLogs = [...keyedAppLogs.values(), ...unkeyedAppLogs]
    .sort((left, right) => recordCreatedMs(right) - recordCreatedMs(left))
    .slice(0, MAX_APP_LOG_RECORDS);

  const compacted = [
    ...retained.filter((record) => record.kind !== "app.log"),
    ...progressByTask.values(),
    ...appLogs,
  ];
  if (compacted.length <= MAX_OUTBOX_RECORDS) {
    return compacted.sort(compareByCreatedAscending);
  }

  // Over budget: completions are never dropped, everything else competes for
  // the remaining slots by priority and then recency.
  const taskCompletions = compacted.filter((record) => record.kind === "task.complete");
  const remainingBudget = Math.max(0, MAX_OUTBOX_RECORDS - taskCompletions.length);
  const otherRecords = compacted
    .filter((record) => record.kind !== "task.complete")
    .sort((left, right) => {
      const priorityDiff = reliableOutboxPriority(left) - reliableOutboxPriority(right);
      if (priorityDiff !== 0) {
        return priorityDiff;
      }
      return recordCreatedMs(right) - recordCreatedMs(left);
    })
    .slice(0, remainingBudget);
  return [...taskCompletions, ...otherRecords].sort(compareByCreatedAscending);
}

/**
 * Normalizes a request timeout to a rounded value within [50, 60000] ms.
 *
 * @param {unknown} value
 * @param {number} [fallback=5000] Returned when `value` is not a positive finite number.
 * @returns {number}
 */
function normalizeTimeoutMs(value, fallback = 5_000) {
  const numeric = Number(value);
  if (!Number.isFinite(numeric) || numeric <= 0) {
    return fallback;
  }
  return Math.max(50, Math.min(60_000, Math.round(numeric)));
}

/**
 * Normalizes a replay duration budget to a rounded value within [50, 60000] ms.
 *
 * @param {unknown} value
 * @returns {number} 0 when `value` is not a positive finite number; the replay
 *   loop treats 0 as "no budget".
 */
function normalizeDurationBudgetMs(value) {
  const numeric = Number(value);
  if (!Number.isFinite(numeric) || numeric <= 0) {
    return 0;
  }
  return Math.max(50, Math.min(60_000, Math.round(numeric)));
}

/**
 * Normalizes the per-call replay limit to the range [1, MAX_REPLAY_LIMIT].
 *
 * @param {unknown} value
 * @returns {number} DEFAULT_REPLAY_LIMIT when `value` is falsy or not numeric.
 */
function normalizeReplayLimit(value) {
  const numeric = Number(value || DEFAULT_REPLAY_LIMIT);
  if (Number.isNaN(numeric)) {
    // NaN would flow through Math.min/Math.max into slice(0, NaN), which is
    // empty, so a bad limit would silently replay nothing.
    return DEFAULT_REPLAY_LIMIT;
  }
  return Math.max(1, Math.min(numeric, MAX_REPLAY_LIMIT));
}

/** @returns {string} The current time as an ISO-8601 string. */
function nowIso() {
  return new Date().toISOString();
}

/**
 * Builds an id of the form `<prefix>-<base36 timestamp>-<base36 random suffix>`.
 * Uses `Math.random`, so it is not suitable for security-sensitive purposes.
 *
 * @param {string} prefix
 * @returns {string}
 */
function randomId(prefix) {
  return `${prefix}-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 10)}`;
}

/**
 * Resolves the outbox file path for a configuration.
 *
 * @param {{ reliableOutboxPath?: unknown, deviceId?: string }} [config]
 * @returns {string} `config.reliableOutboxPath` as a string when set, otherwise
 *   `<home>/.boss-agent/<deviceId or "device">-outbox.json`.
 */
export function resolveReliableOutboxPath(config = {}) {
  if (config.reliableOutboxPath) {
    return String(config.reliableOutboxPath);
  }
  return join(os.homedir(), ".boss-agent", `${config.deviceId || "device"}-outbox.json`);
}

/**
 * Reads the `records` array from the outbox file.
 *
 * @param {string} outboxPath
 * @returns {Promise<object[]>} The stored records, or `[]` when the file cannot
 *   be read, is not valid JSON, or has no `records` array.
 */
async function readOutboxRecords(outboxPath) {
  try {
    const parsed = JSON.parse(await readFile(outboxPath, "utf8"));
    return Array.isArray(parsed?.records) ? parsed.records : [];
  } catch {
    // Best-effort: every failure (including a missing file) reads as empty.
    // NOTE(review): a transient read error followed by a write replaces the
    // file contents; confirm this trade-off is intended.
    return [];
  }
}

/**
 * Compacts `records` and writes them to the outbox file, creating the parent
 * directory if needed. The data is written to a temporary sibling file first
 * and then renamed over the destination.
 *
 * @param {string} outboxPath
 * @param {object[]} records
 * @returns {Promise<void>}
 */
async function writeOutboxRecords(outboxPath, records) {
  await mkdir(dirname(outboxPath), { recursive: true });
  const compacted = compactReliableOutboxRecords(records);
  const tmpPath = `${outboxPath}.${process.pid}.${Date.now()}.tmp`;
  await writeFile(
    tmpPath,
    JSON.stringify({ version: 1, updatedAt: nowIso(), records: compacted }, null, 2),
  );
  await rename(tmpPath, outboxPath);
}

/**
 * Runs a read-modify-write cycle on the outbox file. Cycles for the same path
 * issued through this module instance are chained so they run one at a time.
 *
 * @param {string} outboxPath
 * @param {(records: object[]) => Promise<{ records: object[], value: unknown }>} mutator
 *   Receives the current records and returns the new list plus a result value.
 * @returns {Promise<unknown>} The `value` returned by `mutator`; rejects when
 *   the mutator or the write fails.
 */
async function mutateOutboxRecords(outboxPath, mutator) {
  const run = async () => {
    const records = await readOutboxRecords(outboxPath);
    const result = await mutator(records);
    await writeOutboxRecords(outboxPath, result.records);
    return result.value;
  };
  const previous = outboxWriteQueues.get(outboxPath) || Promise.resolve();
  const next = previous.then(run, run);
  // The queue stores a never-rejecting tail so one failed cycle does not
  // poison the cycles queued after it; the caller still sees the rejection.
  outboxWriteQueues.set(
    outboxPath,
    next.catch(() => null),
  );
  return await next;
}

/**
 * Appends a new pending record to the outbox.
 *
 * @param {string} outboxPath
 * @param {{ id?: string, kind?: string, url?: string, method?: string,
 *   headers?: object, body?: unknown, requestTimeoutMs?: number }} input
 * @returns {Promise<object>} The record as created (status "pending",
 *   attemptCount 0, generated id when `input.id` is not given).
 */
export async function appendReliableOutboxRecord(outboxPath, input) {
  const record = {
    id: input.id || randomId(input.kind || "outbox"),
    kind: input.kind,
    url: input.url,
    method: input.method || "POST",
    headers: input.headers || {},
    body: input.body,
    requestTimeoutMs: input.requestTimeoutMs,
    status: "pending",
    attemptCount: 0,
    createdAt: nowIso(),
    lastAttemptAt: undefined,
    lastError: undefined,
  };
  return await mutateOutboxRecords(outboxPath, async (records) => ({
    records: [...records, record],
    value: record,
  }));
}

/**
 * Removes the record with the given id from the outbox.
 *
 * @param {string} outboxPath
 * @param {string} recordId
 * @returns {Promise<void>}
 */
export async function markReliableOutboxRecordSent(outboxPath, recordId) {
  await mutateOutboxRecords(outboxPath, async (records) => ({
    records: records.filter((record) => record.id !== recordId),
    value: undefined,
  }));
}

/**
 * @param {number} status HTTP status code; a falsy status counts as retryable.
 * @returns {boolean} Whether a failed send with this status should be retried.
 */
function shouldRetry(status) {
  if (!status) {
    return true;
  }
  return RETRYABLE_STATUS_CODES.has(status);
}

/**
 * Records a failed attempt on a stored record: increments `attemptCount` and
 * sets `lastAttemptAt` and `lastError` (truncated to 240 characters).
 *
 * @param {string} outboxPath
 * @param {string} recordId
 * @param {unknown} detail Error text; "OUTBOX_SEND_FAILED" is used when falsy.
 * @returns {Promise<void>}
 */
async function updateReliableOutboxRecordFailure(outboxPath, recordId, detail) {
  await mutateOutboxRecords(outboxPath, async (records) => ({
    records: records.map((record) => {
      if (record.id !== recordId) {
        return record;
      }
      return {
        ...record,
        attemptCount: Number(record.attemptCount || 0) + 1,
        lastAttemptAt: nowIso(),
        lastError: String(detail || "OUTBOX_SEND_FAILED").slice(0, 240),
      };
    }),
    value: undefined,
  }));
}

/**
 * Sends one record with `fetch`, aborting after the normalized timeout.
 *
 * @param {{ url: string, method?: string, headers?: object, body?: unknown,
 *   requestTimeoutMs?: number }} record
 * @param {{ requestTimeoutMs?: number }} [options] Overrides the record's timeout.
 * @returns {Promise<{ ok: boolean, retryable: boolean, status: number, body: string }>}
 * @throws {Error} "RELIABLE_OUTBOX_SEND_TIMEOUT" when the request was aborted
 *   by the timeout; any other fetch/read error is rethrown unchanged.
 */
export async function sendReliableOutboxRecord(record, options = {}) {
  const timeoutMs = normalizeTimeoutMs(options.requestTimeoutMs ?? record.requestTimeoutMs);
  const controller = new AbortController();
  const timeout = setTimeout(() => {
    controller.abort(new Error("RELIABLE_OUTBOX_SEND_TIMEOUT"));
  }, timeoutMs);
  try {
    const response = await fetch(record.url, {
      method: record.method || "POST",
      headers: record.headers || {},
      body: typeof record.body === "string" ? record.body : JSON.stringify(record.body ?? {}),
      signal: controller.signal,
    });
    // Reading the body stays inside the try so the timeout covers it as well.
    const body = await response.text();
    return {
      ok: response.ok,
      retryable: !response.ok && shouldRetry(response.status),
      status: response.status,
      body,
    };
  } catch (error) {
    if (controller.signal.aborted) {
      throw new Error("RELIABLE_OUTBOX_SEND_TIMEOUT");
    }
    throw error;
  } finally {
    clearTimeout(timeout);
  }
}

/**
 * Posts a record, persisting it in the outbox first so it can be replayed later.
 *
 * When `config.reliableOutboxEnabled === false` the record is sent directly and
 * nothing is persisted (send errors propagate to the caller). Otherwise the
 * record is appended, sent, and then either removed (success or non-retryable
 * failure) or updated with failure details (retryable failure).
 *
 * @param {object} config Outbox configuration (path, enabled flag, default timeout).
 * @param {object} recordInput Same shape as the `appendReliableOutboxRecord` input.
 * @returns {Promise<{ ok: boolean, retryable: boolean, status: number, body: string }>}
 *   The send result; when an error is thrown inside the send/bookkeeping step,
 *   `{ ok: false, retryable: true, status: 0, body: <error message> }`.
 */
export async function postThroughReliableOutbox(config, recordInput) {
  if (config.reliableOutboxEnabled === false) {
    return await sendReliableOutboxRecord({
      ...recordInput,
      id: recordInput.id || randomId(recordInput.kind || "direct"),
      requestTimeoutMs: recordInput.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs,
    });
  }
  const outboxPath = resolveReliableOutboxPath(config);
  const record = await appendReliableOutboxRecord(outboxPath, {
    ...recordInput,
    requestTimeoutMs: recordInput.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs,
  });
  try {
    const result = await sendReliableOutboxRecord(record, {
      requestTimeoutMs: record.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs,
    });
    if (result.ok || !result.retryable) {
      await markReliableOutboxRecordSent(outboxPath, record.id);
    } else {
      await updateReliableOutboxRecordFailure(outboxPath, record.id, result.body);
    }
    return result;
  } catch (error) {
    // NOTE(review): this also catches failures of the bookkeeping calls above,
    // so a delivered record whose removal failed is reported as not ok.
    const message = error instanceof Error ? error.message : String(error);
    await updateReliableOutboxRecordFailure(outboxPath, record.id, message);
    return {
      ok: false,
      retryable: true,
      status: 0,
      body: message,
    };
  }
}

/**
 * Re-sends pending outbox records in priority order, one at a time.
 *
 * @param {object} config Outbox configuration (path, enabled flag, default
 *   timeout, `reliableOutboxReplayBudgetMs`).
 * @param {{ limit?: number, maxDurationMs?: number, requestTimeoutMs?: number }} [options]
 *   `limit` is clamped to [1, 100] and defaults to 50 (also when not numeric);
 *   `maxDurationMs` stops the loop before starting a send once the budget is spent.
 * @returns {Promise<{ attempted: number, sent: number, retained: number, stoppedByBudget?: boolean }>}
 *   `sent` counts records removed from the outbox (success or non-retryable
 *   failure); `retained` is the number of records left in the file. When the
 *   outbox is disabled the result is all zeros and has no `stoppedByBudget`.
 */
export async function replayReliableOutbox(config, options = {}) {
  if (config.reliableOutboxEnabled === false) {
    return { attempted: 0, sent: 0, retained: 0 };
  }
  const outboxPath = resolveReliableOutboxPath(config);
  const records = (await readOutboxRecords(outboxPath)).filter(
    (record) => record?.status !== "sent",
  );
  const limit = normalizeReplayLimit(options.limit);
  const startedAt = Date.now();
  const maxDurationMs = normalizeDurationBudgetMs(
    options.maxDurationMs ?? config.reliableOutboxReplayBudgetMs,
  );
  const requestTimeoutMs = options.requestTimeoutMs ?? config.reliableOutboxRequestTimeoutMs;

  let attempted = 0;
  let sent = 0;
  let stoppedByBudget = false;
  for (const record of orderReliableOutboxRecordsForReplay(records).slice(0, limit)) {
    if (maxDurationMs > 0 && Date.now() - startedAt >= maxDurationMs) {
      stoppedByBudget = true;
      break;
    }
    attempted += 1;
    try {
      const result = await sendReliableOutboxRecord(record, {
        requestTimeoutMs: record.requestTimeoutMs ?? requestTimeoutMs,
      });
      if (result.ok || !result.retryable) {
        await markReliableOutboxRecordSent(outboxPath, record.id);
        sent += 1;
      } else {
        await updateReliableOutboxRecordFailure(outboxPath, record.id, result.body);
      }
    } catch (error) {
      await updateReliableOutboxRecordFailure(
        outboxPath,
        record.id,
        error instanceof Error ? error.message : String(error),
      );
    }
  }
  const retained = (await readOutboxRecords(outboxPath)).length;
  return { attempted, sent, retained, stoppedByBudget };
}