import { createHash } from "node:crypto"; import os from "node:os"; import { basename, dirname, resolve } from "node:path"; import { readFileSync } from "node:fs"; import { open, readFile, readdir } from "node:fs/promises"; import { DatabaseSync } from "node:sqlite"; import { Worker, isMainThread, parentPort, workerData } from "node:worker_threads"; const MAX_ROLLOUT_TAIL_BYTES = 768 * 1024; const MAX_RECENT_ASSISTANT_MESSAGES = 6; const ASSISTANT_DUPLICATE_TURN_WINDOW_MS = 2_000; const LEAKED_TITLE_PREFIXES = [ "你当前接手的项目根目录是", "你现在接手的项目根目录是", "你现在以目标线程身份直接回复用户", "你正在向主 Agent 同步当前项目状态", "只回复对用户真正有用的内容", "只输出 JSON", ]; const LEAKED_TITLE_CONTAINS = [ "不要发送内部字段", "不要自称主 Agent", "不要解释系统如何分发", "不要输出 JSON", "项目名称:", "线程名称:", "文件夹:", "同步原因:", "当前消息:", "用户当前消息:", ]; function toIsoFromUnixSeconds(value) { if (!Number.isFinite(value) || value <= 0) return null; return new Date(value * 1000).toISOString(); } function normalizeDisplayName(raw) { const source = typeof raw === "string" ? raw : ""; const firstLine = source .replace(/\u0000/g, "") .split(/\r?\n/) .map((line) => line.trim()) .find(Boolean); if (!firstLine) return ""; return firstLine.replace(/\s+/g, " ").trim(); } function trimWorkspacePrefix(value) { const normalized = normalizeDisplayName(value).replaceAll("\\", "/"); if (!normalized) { return ""; } return normalized .replace(/^\/Users\/[^/]+\/code\//i, "") .replace(/^\/home\/[^/]+\/code\//i, "") .replace(/^[A-Za-z]:\/Users\/[^/]+\/code\//i, ""); } function stripTrailingDisplayNameNoise(value) { return value.replace(/['"}\]]{2,}$/g, "").trimEnd(); } function looksLikeLeakedDisplayName(value) { const normalized = normalizeDisplayName(value); if (!normalized) { return false; } return ( LEAKED_TITLE_PREFIXES.some((marker) => normalized.startsWith(marker)) || LEAKED_TITLE_CONTAINS.some((marker) => normalized.includes(marker)) ); } function extractWorkspaceProjectName(value) { const normalized = normalizeDisplayName(value).replaceAll("\\", "/"); if (!normalized) { return ""; } const patterns = [ /\/Users\/[^/]+\/code\/([^/\s"'`,。;!?]+)/i, /\/home\/[^/]+\/code\/([^/\s"'`,。;!?]+)/i, /[A-Za-z]:\/Users\/[^/]+\/code\/([^/\s"'`,。;!?]+)/i, ]; for (const pattern of patterns) { const match = normalized.match(pattern); if (match?.[1]) { return match[1].split("/")[0]?.trim() ?? ""; } } return ""; } function pickDisplayNameFallback(candidates) { for (const candidate of candidates) { const extracted = extractWorkspaceProjectName(candidate); if (extracted && !looksLikeLeakedDisplayName(extracted)) { return extracted; } const normalized = stripTrailingDisplayNameNoise(trimWorkspacePrefix(candidate)); if (normalized && !looksLikeLeakedDisplayName(normalized)) { return normalized; } } return ""; } function sanitizeDisplayName(raw, fallback, options = {}) { const compact = stripTrailingDisplayNameNoise(trimWorkspacePrefix(raw)); if (compact && !looksLikeLeakedDisplayName(raw) && !looksLikeLeakedDisplayName(compact)) { return compact.length > 48 ? `${compact.slice(0, 45)}...` : compact; } const extractedProject = extractWorkspaceProjectName(raw); if (extractedProject && !looksLikeLeakedDisplayName(extractedProject)) { return extractedProject; } const safeFallback = pickDisplayNameFallback([ options.folderName, options.folderPath, fallback, ]); if (safeFallback) { return safeFallback; } return fallback; } function fallbackDisplayName(thread, folderName) { const suffix = thread.id.replace(/-/g, "").slice(0, 6); if (thread.agentNickname) { return thread.agentNickname; } if (thread.agentRole) { return `${thread.agentRole} · ${suffix}`; } return `${folderName} · ${suffix}`; } function trimToDefined(value) { const trimmed = typeof value === "string" ? value.trim() : ""; return trimmed ? trimmed : null; } function parseSandboxPolicyType(value) { const raw = trimToDefined(value); if (!raw) return null; try { const parsed = JSON.parse(raw); return trimToDefined(parsed?.type) ?? raw; } catch { return raw; } } function isReadOnlySandboxPolicy(value) { return parseSandboxPolicyType(value) === "read-only"; } function isPrimaryWorkspaceThread(thread) { return !trimToDefined(thread.agentRole) && !trimToDefined(thread.agentNickname); } function loadThreadWorkspaceHints(globalStatePath) { if (!globalStatePath) return new Map(); try { const parsed = JSON.parse(requireText(globalStatePath)); return new Map(Object.entries(parsed["thread-workspace-root-hints"] ?? {})); } catch { return new Map(); } } function loadSessionNames(sessionIndexPath) { if (!sessionIndexPath) return new Map(); const names = new Map(); try { const raw = requireText(sessionIndexPath); for (const line of raw.split(/\r?\n/)) { if (!line.trim()) continue; const parsed = JSON.parse(line); if (!parsed?.id) continue; const previous = names.get(parsed.id); if (!previous || String(parsed.updated_at ?? "") >= String(previous.updatedAt ?? "")) { names.set(parsed.id, { threadName: parsed.thread_name, updatedAt: parsed.updated_at, }); } } } catch { return names; } return names; } function loadRecentThreadActivity(logsDbPath) { if (!logsDbPath) return new Map(); try { const db = new DatabaseSync(logsDbPath, { readonly: true }); try { const rows = db .prepare("SELECT thread_id, MAX(ts) AS latest_ts FROM logs WHERE thread_id IS NOT NULL GROUP BY thread_id") .all(); return new Map( rows .filter((row) => typeof row.thread_id === "string" && Number.isFinite(row.latest_ts)) .map((row) => [row.thread_id, Number(row.latest_ts)]), ); } finally { db.close(); } } catch { return new Map(); } } function loadThreadsFromStateDb(stateDbPath) { if (!stateDbPath) return []; try { const db = new DatabaseSync(stateDbPath, { readonly: true }); try { return db .prepare( "SELECT id, cwd, updated_at, archived, title, sandbox_policy, agent_nickname, agent_role, rollout_path, source FROM threads WHERE archived = 0 ORDER BY updated_at DESC", ) .all() .map((row) => ({ id: String(row.id), cwd: String(row.cwd), updatedAtSeconds: Number(row.updated_at), archived: Boolean(row.archived), title: String(row.title ?? ""), sandboxPolicy: typeof row.sandbox_policy === "string" ? row.sandbox_policy : "", agentNickname: typeof row.agent_nickname === "string" ? row.agent_nickname : "", agentRole: typeof row.agent_role === "string" ? row.agent_role : "", rolloutPath: typeof row.rollout_path === "string" ? row.rollout_path : "", source: typeof row.source === "string" ? row.source : "", })); } finally { db.close(); } } catch { return []; } } function parseSessionMeta(line) { try { const parsed = JSON.parse(line); if (parsed?.type !== "session_meta" || !parsed?.payload?.id || !parsed?.payload?.cwd) { return null; } return { id: String(parsed.payload.id), cwd: String(parsed.payload.cwd), updatedAtSeconds: Math.floor(new Date(parsed.payload.timestamp ?? parsed.timestamp ?? Date.now()).getTime() / 1000), archived: false, title: "", agentNickname: typeof parsed.payload.agent_nickname === "string" ? parsed.payload.agent_nickname : "", agentRole: typeof parsed.payload.agent_role === "string" ? parsed.payload.agent_role : "", rolloutPath: "", source: "sessions", }; } catch { return null; } } function parseRolloutFilenameTimestampSeconds(fileName) { const match = fileName.match( /^rollout-(\d{4}-\d{2}-\d{2})T(\d{2})-(\d{2})-(\d{2})-/, ); if (!match) { return null; } const [, datePart, hour, minute, second] = match; const parsed = new Date(`${datePart}T${hour}:${minute}:${second}`); if (Number.isNaN(parsed.getTime())) { return null; } return Math.floor(parsed.getTime() / 1000); } function shouldReadSessionFile(fileName, cutoffSeconds) { if (!Number.isFinite(cutoffSeconds)) { return true; } const filenameSeconds = parseRolloutFilenameTimestampSeconds(fileName); if (filenameSeconds === null) { return true; } return filenameSeconds >= cutoffSeconds; } async function loadThreadsFromSessions(sessionsDir, options = {}) { if (!sessionsDir) return []; const pending = [resolve(sessionsDir)]; const threads = []; const cutoffSeconds = Number(options.cutoffSeconds); while (pending.length > 0) { const dir = pending.pop(); if (!dir) continue; let entries = []; try { entries = await readdir(dir, { withFileTypes: true }); } catch { continue; } for (const entry of entries) { const fullPath = `${dir}/${entry.name}`; if (entry.isDirectory()) { pending.push(fullPath); continue; } if (!entry.isFile() || !entry.name.endsWith(".jsonl")) continue; if (!shouldReadSessionFile(entry.name, cutoffSeconds)) continue; try { const raw = await readFile(fullPath, "utf8"); const firstLine = raw.split(/\r?\n/, 1)[0]; const parsed = parseSessionMeta(firstLine); if (parsed) threads.push({ ...parsed, rolloutPath: fullPath }); } catch { continue; } } } return threads; } function normalizeEventTimestamp(value) { if (typeof value !== "string" || !value.trim()) return null; const parsed = new Date(value); if (Number.isNaN(parsed.getTime())) return null; return parsed.toISOString(); } function buildAssistantMessageId(threadId, sentAt, body) { const digest = createHash("sha1").update(body).digest("hex").slice(0, 12); return `codex-thread:${threadId}:${sentAt}:${digest}`; } function assistantMessageTimeValue(value) { const parsed = Date.parse(value); return Number.isNaN(parsed) ? 0 : parsed; } function assistantPhasePriority(value) { const phase = normalizeAssistantMessagePhase(value); if (!phase) return 0; if (phase === "final_answer" || phase === "final" || phase === "answer") return 3; if (phase === "commentary" || phase === "process" || phase === "thinking") return 1; return 2; } function normalizeAssistantMessagePhase(value) { const phase = trimToDefined(value); return phase || undefined; } async function readRolloutTail(rolloutPath) { if (!rolloutPath) return ""; let handle; try { handle = await open(resolve(rolloutPath), "r"); const stats = await handle.stat(); const start = Math.max(0, stats.size - MAX_ROLLOUT_TAIL_BYTES); const length = Math.max(0, stats.size - start); if (length === 0) { return ""; } const buffer = Buffer.alloc(length); await handle.read(buffer, 0, length, start); let text = buffer.toString("utf8"); if (start > 0) { const firstNewline = text.indexOf("\n"); text = firstNewline >= 0 ? text.slice(firstNewline + 1) : ""; } return text; } catch { return ""; } finally { await handle?.close().catch(() => {}); } } function parseRecentAssistantMessage(line, threadId) { if (!line.trim()) return null; try { const parsed = JSON.parse(line); let body = ""; let phase; if (parsed?.type === "event_msg" && parsed?.payload?.type === "agent_message") { body = typeof parsed.payload.message === "string" ? parsed.payload.message.trim() : ""; phase = normalizeAssistantMessagePhase(parsed.payload.phase); } else if ( parsed?.type === "response_item" && parsed?.payload?.type === "message" && parsed?.payload?.role === "assistant" ) { const content = Array.isArray(parsed.payload.content) ? parsed.payload.content : []; body = content .map((item) => (typeof item?.text === "string" ? item.text.trim() : "")) .filter(Boolean) .join("\n") .trim(); phase = normalizeAssistantMessagePhase(parsed.payload.phase); } else { return null; } const sentAt = normalizeEventTimestamp(parsed.timestamp ?? parsed.payload.timestamp); if (!body || !sentAt) { return null; } const message = { messageId: buildAssistantMessageId(threadId, sentAt, body), body, sentAt, }; return phase ? { ...message, phase } : message; } catch { return null; } } function mergeRecentAssistantMessage(existing, incoming) { if (!existing) return incoming; const existingPriority = assistantPhasePriority(existing.phase); const incomingPriority = assistantPhasePriority(incoming.phase); if (!incoming.phase || existingPriority >= incomingPriority) return existing; return { ...existing, phase: incoming.phase, }; } function isDuplicateAssistantTurn(existing, incoming) { if (existing.body !== incoming.body) return false; const existingTime = assistantMessageTimeValue(existing.sentAt); const incomingTime = assistantMessageTimeValue(incoming.sentAt); if (!existingTime || !incomingTime) return false; return Math.abs(existingTime - incomingTime) <= ASSISTANT_DUPLICATE_TURN_WINDOW_MS; } function isGuiCodexThreadSource(value) { const source = trimToDefined(value); if (!source) return false; if (source === "cli" || source === "exec" || source === "sessions") { return false; } return true; } async function loadRecentAssistantMessages(rolloutPath, threadId) { const tail = await readRolloutTail(rolloutPath); if (!tail) return []; const messagesById = new Map(); for (const line of tail.split(/\r?\n/)) { const message = parseRecentAssistantMessage(line, threadId); if (!message) continue; const duplicateEntry = [...messagesById.entries()].find(([, existing]) => isDuplicateAssistantTurn(existing, message), ); if (duplicateEntry) { const [duplicateId, existing] = duplicateEntry; messagesById.set(duplicateId, mergeRecentAssistantMessage(existing, message)); continue; } messagesById.set(message.messageId, mergeRecentAssistantMessage(messagesById.get(message.messageId), message)); } return [...messagesById.values()] .sort((left, right) => left.sentAt.localeCompare(right.sentAt)) .slice(-MAX_RECENT_ASSISTANT_MESSAGES); } function requireText(filePath) { return readFileSync(resolve(filePath), "utf8"); } function resolveDefaultSessionsDir(options = {}) { if (options.sessionsDir) { return options.sessionsDir; } if (options.stateDbPath) { return resolve(dirname(resolve(options.stateDbPath)), "sessions"); } return resolve(os.homedir(), ".codex/sessions"); } export async function discoverCodexProjectCandidates(options = {}) { const now = options.now instanceof Date ? options.now : new Date(); const lookbackHours = Number.isFinite(options.lookbackHours) ? Number(options.lookbackHours) : 24; const cutoffSeconds = Math.floor(now.getTime() / 1000) - lookbackHours * 60 * 60; const sessionNames = loadSessionNames(options.sessionIndexPath ?? resolve(os.homedir(), ".codex/session_index.jsonl")); const workspaceHints = loadThreadWorkspaceHints( options.globalStatePath ?? resolve(os.homedir(), ".codex/.codex-global-state.json"), ); const latestLogByThread = loadRecentThreadActivity( options.logsDbPath ?? resolve(os.homedir(), ".codex/logs_1.sqlite"), ); const stateDbThreads = loadThreadsFromStateDb( options.stateDbPath ?? resolve(os.homedir(), ".codex/state_5.sqlite"), ); const sessionThreads = await loadThreadsFromSessions(resolveDefaultSessionsDir(options), { cutoffSeconds, }); const threads = [...stateDbThreads, ...sessionThreads]; const seenThreadIds = new Set(); const groupedCandidates = new Map(); let guiConnected = false; for (const thread of threads) { if (!thread?.id || seenThreadIds.has(thread.id)) continue; if (isReadOnlySandboxPolicy(thread.sandboxPolicy)) { continue; } const latestActivitySeconds = latestLogByThread.get(thread.id) ?? thread.updatedAtSeconds; if (!Number.isFinite(latestActivitySeconds) || latestActivitySeconds < cutoffSeconds) { continue; } seenThreadIds.add(thread.id); guiConnected = guiConnected || isGuiCodexThreadSource(thread.source); const hintedPath = workspaceHints.get(thread.id); const folderPath = resolve(hintedPath || thread.cwd || ""); const folderName = basename(folderPath); if (!folderName) continue; const sessionName = sessionNames.get(thread.id)?.threadName; const displayName = sanitizeDisplayName( sessionName, sanitizeDisplayName(thread.title, fallbackDisplayName(thread, folderName), { folderName, folderPath, }), { folderName, folderPath, }, ); const recentAssistantMessages = await loadRecentAssistantMessages(thread.rolloutPath, thread.id); const candidate = { folderName, folderRef: folderPath, threadId: thread.id, threadDisplayName: displayName, codexFolderRef: folderPath, codexThreadRef: thread.id, lastActiveAt: toIsoFromUnixSeconds(latestActivitySeconds) ?? now.toISOString(), suggestedImport: true, ...(recentAssistantMessages.length > 0 ? { recentAssistantMessages } : {}), }; const folderKey = folderPath || folderName; const bucket = groupedCandidates.get(folderKey) ?? []; bucket.push({ candidate, latestActivitySeconds, primary: isPrimaryWorkspaceThread(thread), }); groupedCandidates.set(folderKey, bucket); } const candidates = []; for (const entries of groupedCandidates.values()) { entries.sort((left, right) => right.latestActivitySeconds - left.latestActivitySeconds); const primaryEntries = entries.filter((entry) => entry.primary); const chosenEntries = primaryEntries.length > 0 ? primaryEntries : entries.slice(0, 1); candidates.push(...chosenEntries.map((entry) => entry.candidate)); } candidates.sort((a, b) => b.lastActiveAt.localeCompare(a.lastActiveAt)); const projects = [...new Set(candidates.map((candidate) => candidate.folderName))].sort((a, b) => a.localeCompare(b), ); return { projects, projectCandidates: candidates, guiConnected, }; } export async function discoverCodexProjectCandidatesInWorker(options = {}) { return await new Promise((resolvePromise, rejectPromise) => { const worker = new Worker(new URL(import.meta.url), { workerData: { kind: "boss_codex_discovery", options, }, }); const timeoutMs = Number(options.timeoutMs); const effectiveTimeoutMs = Number.isFinite(timeoutMs) && timeoutMs > 0 ? timeoutMs : 0; let settled = false; let timeout; const cleanup = () => { if (timeout) { clearTimeout(timeout); } }; const resolveOnce = (value) => { if (settled) { return; } settled = true; cleanup(); resolvePromise(value); }; const rejectOnce = (error) => { if (settled) { return; } settled = true; cleanup(); rejectPromise(error); }; if (effectiveTimeoutMs > 0) { timeout = setTimeout(() => { rejectOnce(new Error("DISCOVERY_WORKER_TIMEOUT")); worker.terminate().catch(() => null); }, effectiveTimeoutMs); } worker.once("message", (payload) => { if (payload?.ok) { resolveOnce(payload.result); return; } rejectOnce(new Error(payload?.error ?? "DISCOVERY_WORKER_FAILED")); }); worker.once("error", rejectOnce); worker.once("exit", (code) => { if (code === 0) { cleanup(); return; } rejectOnce(new Error(`DISCOVERY_WORKER_EXIT_${code}`)); }); }); } if (!isMainThread && workerData?.kind === "boss_codex_discovery") { try { const result = await discoverCodexProjectCandidates(workerData.options ?? {}); parentPort?.postMessage({ ok: true, result }); } catch (error) { parentPort?.postMessage({ ok: false, error: error instanceof Error ? error.message : String(error), }); } }