280 lines
7.2 KiB
JavaScript
280 lines
7.2 KiB
JavaScript
import os from "node:os";
|
|
import { appendFile, open, readdir } from "node:fs/promises";
|
|
import { DatabaseSync } from "node:sqlite";
|
|
import { resolve } from "node:path";
|
|
|
|
const MAX_ROLLOUT_TAIL_BYTES = 256 * 1024;
|
|
|
|
function trimToDefined(value) {
|
|
const trimmed = String(value ?? "").trim();
|
|
return trimmed ? trimmed : undefined;
|
|
}
|
|
|
|
function defaultCodexPath(relativePath) {
|
|
return resolve(os.homedir(), ".codex", relativePath);
|
|
}
|
|
|
|
function resolveThreadRolloutPath({ stateDbPath, targetThreadRef }) {
|
|
const resolvedStateDbPath = trimToDefined(stateDbPath || defaultCodexPath("state_5.sqlite"));
|
|
if (!resolvedStateDbPath) {
|
|
throw new Error("CODEX_STATE_DB_MISSING");
|
|
}
|
|
|
|
const db = new DatabaseSync(resolvedStateDbPath, { readonly: true });
|
|
try {
|
|
const row = db
|
|
.prepare("SELECT rollout_path, archived FROM threads WHERE id = ? LIMIT 1")
|
|
.get(targetThreadRef);
|
|
if (!row) {
|
|
throw new Error("CODEX_THREAD_NOT_FOUND");
|
|
}
|
|
if (row.archived) {
|
|
throw new Error("CODEX_THREAD_ARCHIVED");
|
|
}
|
|
const rolloutPath = trimToDefined(row.rollout_path);
|
|
if (!rolloutPath) {
|
|
throw new Error("CODEX_ROLLOUT_PATH_MISSING");
|
|
}
|
|
return resolve(rolloutPath);
|
|
} finally {
|
|
db.close();
|
|
}
|
|
}
|
|
|
|
function defaultCodexSessionsDir() {
|
|
return defaultCodexPath("sessions");
|
|
}
|
|
|
|
async function findRolloutPathInSessionsDir({ sessionsDir, targetThreadRef }) {
|
|
const root = trimToDefined(sessionsDir || defaultCodexSessionsDir());
|
|
if (!root) {
|
|
throw new Error("CODEX_SESSIONS_DIR_MISSING");
|
|
}
|
|
|
|
const stack = [resolve(root)];
|
|
const suffix = `-${targetThreadRef}.jsonl`;
|
|
while (stack.length > 0) {
|
|
const current = stack.pop();
|
|
if (!current) {
|
|
continue;
|
|
}
|
|
let entries = [];
|
|
try {
|
|
entries = await readdir(current, { withFileTypes: true });
|
|
} catch {
|
|
continue;
|
|
}
|
|
for (const entry of entries) {
|
|
const entryPath = resolve(current, entry.name);
|
|
if (entry.isDirectory()) {
|
|
stack.push(entryPath);
|
|
continue;
|
|
}
|
|
if (entry.isFile() && entry.name.endsWith(suffix)) {
|
|
return entryPath;
|
|
}
|
|
}
|
|
}
|
|
|
|
throw new Error("CODEX_ROLLOUT_PATH_FALLBACK_NOT_FOUND");
|
|
}
|
|
|
|
async function resolveThreadRolloutPathWithFallback({ stateDbPath, sessionsDir, targetThreadRef }) {
|
|
try {
|
|
return resolveThreadRolloutPath({ stateDbPath, targetThreadRef });
|
|
} catch (error) {
|
|
const message = error instanceof Error ? error.message : String(error);
|
|
if (message === "CODEX_THREAD_ARCHIVED") {
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
return findRolloutPathInSessionsDir({
|
|
sessionsDir,
|
|
targetThreadRef,
|
|
});
|
|
}
|
|
|
|
function resolveThreadTouchTimestamps(sentAt) {
|
|
const parsed = Date.parse(sentAt);
|
|
const updatedAtMs = Number.isFinite(parsed) ? parsed : Date.now();
|
|
return {
|
|
updatedAtMs,
|
|
updatedAt: Math.floor(updatedAtMs / 1000),
|
|
};
|
|
}
|
|
|
|
function touchThreadActivity({ stateDbPath, targetThreadRef, sentAt }) {
|
|
const resolvedStateDbPath = trimToDefined(stateDbPath || defaultCodexPath("state_5.sqlite"));
|
|
if (!resolvedStateDbPath) {
|
|
throw new Error("CODEX_STATE_DB_MISSING");
|
|
}
|
|
|
|
const { updatedAt, updatedAtMs } = resolveThreadTouchTimestamps(sentAt);
|
|
const db = new DatabaseSync(resolvedStateDbPath);
|
|
try {
|
|
try {
|
|
const result = db.prepare(
|
|
`
|
|
UPDATE threads
|
|
SET updated_at = ?,
|
|
updated_at_ms = ?,
|
|
has_user_event = 1
|
|
WHERE id = ?
|
|
`,
|
|
).run(updatedAt, updatedAtMs, targetThreadRef);
|
|
if (Number(result?.changes ?? 0) <= 0) {
|
|
return {
|
|
status: "skipped",
|
|
reason: "thread-not-found",
|
|
};
|
|
}
|
|
} catch (error) {
|
|
const message = error instanceof Error ? error.message : String(error);
|
|
if (!message.includes("updated_at_ms")) {
|
|
throw error;
|
|
}
|
|
const result = db.prepare(
|
|
`
|
|
UPDATE threads
|
|
SET updated_at = ?,
|
|
has_user_event = 1
|
|
WHERE id = ?
|
|
`,
|
|
).run(updatedAt, targetThreadRef);
|
|
if (Number(result?.changes ?? 0) <= 0) {
|
|
return {
|
|
status: "skipped",
|
|
reason: "thread-not-found",
|
|
};
|
|
}
|
|
}
|
|
return {
|
|
status: "updated",
|
|
updatedAt,
|
|
updatedAtMs,
|
|
};
|
|
} finally {
|
|
db.close();
|
|
}
|
|
}
|
|
|
|
async function readRolloutTail(rolloutPath) {
|
|
let handle;
|
|
try {
|
|
handle = await open(rolloutPath, "r");
|
|
const stats = await handle.stat();
|
|
const start = Math.max(0, stats.size - MAX_ROLLOUT_TAIL_BYTES);
|
|
const length = Math.max(0, stats.size - start);
|
|
if (length === 0) {
|
|
return "";
|
|
}
|
|
const buffer = Buffer.alloc(length);
|
|
await handle.read(buffer, 0, length, start);
|
|
let text = buffer.toString("utf8");
|
|
if (start > 0) {
|
|
const firstNewline = text.indexOf("\n");
|
|
text = firstNewline >= 0 ? text.slice(firstNewline + 1) : "";
|
|
}
|
|
return text;
|
|
} finally {
|
|
await handle?.close().catch(() => {});
|
|
}
|
|
}
|
|
|
|
async function rolloutAlreadyHasSourceMessage(rolloutPath, sourceMessageId) {
|
|
const tail = await readRolloutTail(rolloutPath);
|
|
if (!tail.trim()) {
|
|
return false;
|
|
}
|
|
for (const line of tail.split(/\r?\n/)) {
|
|
if (!line.trim()) {
|
|
continue;
|
|
}
|
|
try {
|
|
const parsed = JSON.parse(line);
|
|
if (parsed?.payload?.metadata?.bossSourceMessageId === sourceMessageId) {
|
|
return true;
|
|
}
|
|
} catch {
|
|
continue;
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
export async function appendBossUserMessageToCodexThreadRollout(params) {
|
|
const targetThreadRef = trimToDefined(params?.targetThreadRef);
|
|
const sourceMessageId = trimToDefined(params?.sourceMessageId);
|
|
const message = trimToDefined(params?.message);
|
|
const sentAt = trimToDefined(params?.sentAt) ?? new Date().toISOString();
|
|
|
|
if (!targetThreadRef) {
|
|
throw new Error("CODEX_THREAD_REF_REQUIRED");
|
|
}
|
|
if (!sourceMessageId) {
|
|
throw new Error("CODEX_SOURCE_MESSAGE_ID_REQUIRED");
|
|
}
|
|
if (!message) {
|
|
throw new Error("CODEX_SOURCE_MESSAGE_BODY_REQUIRED");
|
|
}
|
|
|
|
const rolloutPath = await resolveThreadRolloutPathWithFallback({
|
|
stateDbPath: params?.stateDbPath,
|
|
sessionsDir: params?.sessionsDir,
|
|
targetThreadRef,
|
|
});
|
|
if (await rolloutAlreadyHasSourceMessage(rolloutPath, sourceMessageId)) {
|
|
return {
|
|
status: "duplicate",
|
|
rolloutPath,
|
|
};
|
|
}
|
|
|
|
const responseItem = {
|
|
timestamp: sentAt,
|
|
type: "response_item",
|
|
payload: {
|
|
type: "message",
|
|
role: "user",
|
|
content: [
|
|
{
|
|
type: "input_text",
|
|
text: message,
|
|
},
|
|
],
|
|
},
|
|
};
|
|
const event = {
|
|
timestamp: sentAt,
|
|
type: "event_msg",
|
|
payload: {
|
|
type: "user_message",
|
|
message,
|
|
images: [],
|
|
local_images: [],
|
|
text_elements: [],
|
|
metadata: {
|
|
bossSourceMessageId: sourceMessageId,
|
|
bossMirroredFrom: "boss-app",
|
|
},
|
|
},
|
|
};
|
|
await appendFile(rolloutPath, `${JSON.stringify(responseItem)}\n${JSON.stringify(event)}\n`, "utf8");
|
|
let threadTouch = { status: "skipped" };
|
|
try {
|
|
threadTouch = touchThreadActivity({
|
|
stateDbPath: params?.stateDbPath,
|
|
targetThreadRef,
|
|
sentAt,
|
|
});
|
|
} catch {
|
|
threadTouch = { status: "skipped" };
|
|
}
|
|
return {
|
|
status: "written",
|
|
rolloutPath,
|
|
threadTouch,
|
|
};
|
|
}
|