Files
boss/local-agent/codex-thread-rollout-writer.mjs

280 lines
7.2 KiB
JavaScript

import os from "node:os";
import { appendFile, open, readdir } from "node:fs/promises";
import { DatabaseSync } from "node:sqlite";
import { resolve } from "node:path";
const MAX_ROLLOUT_TAIL_BYTES = 256 * 1024;
function trimToDefined(value) {
const trimmed = String(value ?? "").trim();
return trimmed ? trimmed : undefined;
}
function defaultCodexPath(relativePath) {
return resolve(os.homedir(), ".codex", relativePath);
}
function resolveThreadRolloutPath({ stateDbPath, targetThreadRef }) {
const resolvedStateDbPath = trimToDefined(stateDbPath || defaultCodexPath("state_5.sqlite"));
if (!resolvedStateDbPath) {
throw new Error("CODEX_STATE_DB_MISSING");
}
const db = new DatabaseSync(resolvedStateDbPath, { readonly: true });
try {
const row = db
.prepare("SELECT rollout_path, archived FROM threads WHERE id = ? LIMIT 1")
.get(targetThreadRef);
if (!row) {
throw new Error("CODEX_THREAD_NOT_FOUND");
}
if (row.archived) {
throw new Error("CODEX_THREAD_ARCHIVED");
}
const rolloutPath = trimToDefined(row.rollout_path);
if (!rolloutPath) {
throw new Error("CODEX_ROLLOUT_PATH_MISSING");
}
return resolve(rolloutPath);
} finally {
db.close();
}
}
function defaultCodexSessionsDir() {
return defaultCodexPath("sessions");
}
async function findRolloutPathInSessionsDir({ sessionsDir, targetThreadRef }) {
const root = trimToDefined(sessionsDir || defaultCodexSessionsDir());
if (!root) {
throw new Error("CODEX_SESSIONS_DIR_MISSING");
}
const stack = [resolve(root)];
const suffix = `-${targetThreadRef}.jsonl`;
while (stack.length > 0) {
const current = stack.pop();
if (!current) {
continue;
}
let entries = [];
try {
entries = await readdir(current, { withFileTypes: true });
} catch {
continue;
}
for (const entry of entries) {
const entryPath = resolve(current, entry.name);
if (entry.isDirectory()) {
stack.push(entryPath);
continue;
}
if (entry.isFile() && entry.name.endsWith(suffix)) {
return entryPath;
}
}
}
throw new Error("CODEX_ROLLOUT_PATH_FALLBACK_NOT_FOUND");
}
async function resolveThreadRolloutPathWithFallback({ stateDbPath, sessionsDir, targetThreadRef }) {
try {
return resolveThreadRolloutPath({ stateDbPath, targetThreadRef });
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (message === "CODEX_THREAD_ARCHIVED") {
throw error;
}
}
return findRolloutPathInSessionsDir({
sessionsDir,
targetThreadRef,
});
}
function resolveThreadTouchTimestamps(sentAt) {
const parsed = Date.parse(sentAt);
const updatedAtMs = Number.isFinite(parsed) ? parsed : Date.now();
return {
updatedAtMs,
updatedAt: Math.floor(updatedAtMs / 1000),
};
}
function touchThreadActivity({ stateDbPath, targetThreadRef, sentAt }) {
const resolvedStateDbPath = trimToDefined(stateDbPath || defaultCodexPath("state_5.sqlite"));
if (!resolvedStateDbPath) {
throw new Error("CODEX_STATE_DB_MISSING");
}
const { updatedAt, updatedAtMs } = resolveThreadTouchTimestamps(sentAt);
const db = new DatabaseSync(resolvedStateDbPath);
try {
try {
const result = db.prepare(
`
UPDATE threads
SET updated_at = ?,
updated_at_ms = ?,
has_user_event = 1
WHERE id = ?
`,
).run(updatedAt, updatedAtMs, targetThreadRef);
if (Number(result?.changes ?? 0) <= 0) {
return {
status: "skipped",
reason: "thread-not-found",
};
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!message.includes("updated_at_ms")) {
throw error;
}
const result = db.prepare(
`
UPDATE threads
SET updated_at = ?,
has_user_event = 1
WHERE id = ?
`,
).run(updatedAt, targetThreadRef);
if (Number(result?.changes ?? 0) <= 0) {
return {
status: "skipped",
reason: "thread-not-found",
};
}
}
return {
status: "updated",
updatedAt,
updatedAtMs,
};
} finally {
db.close();
}
}
async function readRolloutTail(rolloutPath) {
let handle;
try {
handle = await open(rolloutPath, "r");
const stats = await handle.stat();
const start = Math.max(0, stats.size - MAX_ROLLOUT_TAIL_BYTES);
const length = Math.max(0, stats.size - start);
if (length === 0) {
return "";
}
const buffer = Buffer.alloc(length);
await handle.read(buffer, 0, length, start);
let text = buffer.toString("utf8");
if (start > 0) {
const firstNewline = text.indexOf("\n");
text = firstNewline >= 0 ? text.slice(firstNewline + 1) : "";
}
return text;
} finally {
await handle?.close().catch(() => {});
}
}
async function rolloutAlreadyHasSourceMessage(rolloutPath, sourceMessageId) {
const tail = await readRolloutTail(rolloutPath);
if (!tail.trim()) {
return false;
}
for (const line of tail.split(/\r?\n/)) {
if (!line.trim()) {
continue;
}
try {
const parsed = JSON.parse(line);
if (parsed?.payload?.metadata?.bossSourceMessageId === sourceMessageId) {
return true;
}
} catch {
continue;
}
}
return false;
}
export async function appendBossUserMessageToCodexThreadRollout(params) {
const targetThreadRef = trimToDefined(params?.targetThreadRef);
const sourceMessageId = trimToDefined(params?.sourceMessageId);
const message = trimToDefined(params?.message);
const sentAt = trimToDefined(params?.sentAt) ?? new Date().toISOString();
if (!targetThreadRef) {
throw new Error("CODEX_THREAD_REF_REQUIRED");
}
if (!sourceMessageId) {
throw new Error("CODEX_SOURCE_MESSAGE_ID_REQUIRED");
}
if (!message) {
throw new Error("CODEX_SOURCE_MESSAGE_BODY_REQUIRED");
}
const rolloutPath = await resolveThreadRolloutPathWithFallback({
stateDbPath: params?.stateDbPath,
sessionsDir: params?.sessionsDir,
targetThreadRef,
});
if (await rolloutAlreadyHasSourceMessage(rolloutPath, sourceMessageId)) {
return {
status: "duplicate",
rolloutPath,
};
}
const responseItem = {
timestamp: sentAt,
type: "response_item",
payload: {
type: "message",
role: "user",
content: [
{
type: "input_text",
text: message,
},
],
},
};
const event = {
timestamp: sentAt,
type: "event_msg",
payload: {
type: "user_message",
message,
images: [],
local_images: [],
text_elements: [],
metadata: {
bossSourceMessageId: sourceMessageId,
bossMirroredFrom: "boss-app",
},
},
};
await appendFile(rolloutPath, `${JSON.stringify(responseItem)}\n${JSON.stringify(event)}\n`, "utf8");
let threadTouch = { status: "skipped" };
try {
threadTouch = touchThreadActivity({
stateDbPath: params?.stateDbPath,
targetThreadRef,
sentAt,
});
} catch {
threadTouch = { status: "skipped" };
}
return {
status: "written",
rolloutPath,
threadTouch,
};
}