feat: auto-sync bound codex threads into conversations

This commit is contained in:
kris
2026-03-30 13:01:37 +08:00
parent 9c15c30a41
commit 98dd0e3cd5
9 changed files with 755 additions and 82 deletions

View File

@@ -0,0 +1,236 @@
import os from "node:os";
import { basename, resolve } from "node:path";
import { readFileSync } from "node:fs";
import { readFile, readdir } from "node:fs/promises";
import { DatabaseSync } from "node:sqlite";
function toIsoFromUnixSeconds(value) {
if (!Number.isFinite(value) || value <= 0) return null;
return new Date(value * 1000).toISOString();
}
function sanitizeDisplayName(raw, fallback) {
const source = typeof raw === "string" ? raw : "";
const firstLine = source
.replace(/\u0000/g, "")
.split(/\r?\n/)
.map((line) => line.trim())
.find(Boolean);
if (!firstLine) return fallback;
const compact = firstLine.replace(/\s+/g, " ").trim();
if (!compact) return fallback;
return compact.length > 48 ? `${compact.slice(0, 45)}...` : compact;
}
function fallbackDisplayName(thread, folderName) {
const suffix = thread.id.replace(/-/g, "").slice(0, 6);
if (thread.agentNickname) {
return thread.agentNickname;
}
if (thread.agentRole) {
return `${thread.agentRole} · ${suffix}`;
}
return `${folderName} · ${suffix}`;
}
function loadThreadWorkspaceHints(globalStatePath) {
if (!globalStatePath) return new Map();
try {
const parsed = JSON.parse(requireText(globalStatePath));
return new Map(Object.entries(parsed["thread-workspace-root-hints"] ?? {}));
} catch {
return new Map();
}
}
function loadSessionNames(sessionIndexPath) {
if (!sessionIndexPath) return new Map();
const names = new Map();
try {
const raw = requireText(sessionIndexPath);
for (const line of raw.split(/\r?\n/)) {
if (!line.trim()) continue;
const parsed = JSON.parse(line);
if (!parsed?.id) continue;
const previous = names.get(parsed.id);
if (!previous || String(parsed.updated_at ?? "") >= String(previous.updatedAt ?? "")) {
names.set(parsed.id, {
threadName: parsed.thread_name,
updatedAt: parsed.updated_at,
});
}
}
} catch {
return names;
}
return names;
}
function loadRecentThreadActivity(logsDbPath) {
if (!logsDbPath) return new Map();
try {
const db = new DatabaseSync(logsDbPath, { readonly: true });
try {
const rows = db
.prepare("SELECT thread_id, MAX(ts) AS latest_ts FROM logs WHERE thread_id IS NOT NULL GROUP BY thread_id")
.all();
return new Map(
rows
.filter((row) => typeof row.thread_id === "string" && Number.isFinite(row.latest_ts))
.map((row) => [row.thread_id, Number(row.latest_ts)]),
);
} finally {
db.close();
}
} catch {
return new Map();
}
}
function loadThreadsFromStateDb(stateDbPath) {
if (!stateDbPath) return [];
try {
const db = new DatabaseSync(stateDbPath, { readonly: true });
try {
return db
.prepare(
"SELECT id, cwd, updated_at, archived, title, agent_nickname, agent_role FROM threads WHERE archived = 0 ORDER BY updated_at DESC",
)
.all()
.map((row) => ({
id: String(row.id),
cwd: String(row.cwd),
updatedAtSeconds: Number(row.updated_at),
archived: Boolean(row.archived),
title: String(row.title ?? ""),
agentNickname: typeof row.agent_nickname === "string" ? row.agent_nickname : "",
agentRole: typeof row.agent_role === "string" ? row.agent_role : "",
}));
} finally {
db.close();
}
} catch {
return [];
}
}
function parseSessionMeta(line) {
try {
const parsed = JSON.parse(line);
if (parsed?.type !== "session_meta" || !parsed?.payload?.id || !parsed?.payload?.cwd) {
return null;
}
return {
id: String(parsed.payload.id),
cwd: String(parsed.payload.cwd),
updatedAtSeconds: Math.floor(new Date(parsed.payload.timestamp ?? parsed.timestamp ?? Date.now()).getTime() / 1000),
archived: false,
title: "",
agentNickname: typeof parsed.payload.agent_nickname === "string" ? parsed.payload.agent_nickname : "",
agentRole: typeof parsed.payload.agent_role === "string" ? parsed.payload.agent_role : "",
};
} catch {
return null;
}
}
async function loadThreadsFromSessions(sessionsDir) {
if (!sessionsDir) return [];
const pending = [resolve(sessionsDir)];
const threads = [];
while (pending.length > 0) {
const dir = pending.pop();
if (!dir) continue;
let entries = [];
try {
entries = await readdir(dir, { withFileTypes: true });
} catch {
continue;
}
for (const entry of entries) {
const fullPath = `${dir}/${entry.name}`;
if (entry.isDirectory()) {
pending.push(fullPath);
continue;
}
if (!entry.isFile() || !entry.name.endsWith(".jsonl")) continue;
try {
const raw = await readFile(fullPath, "utf8");
const firstLine = raw.split(/\r?\n/, 1)[0];
const parsed = parseSessionMeta(firstLine);
if (parsed) threads.push(parsed);
} catch {
continue;
}
}
}
return threads;
}
function requireText(filePath) {
return readFileSync(resolve(filePath), "utf8");
}
export async function discoverCodexProjectCandidates(options = {}) {
const now = options.now instanceof Date ? options.now : new Date();
const lookbackHours = Number.isFinite(options.lookbackHours) ? Number(options.lookbackHours) : 24;
const cutoffSeconds = Math.floor(now.getTime() / 1000) - lookbackHours * 60 * 60;
const sessionNames = loadSessionNames(options.sessionIndexPath ?? resolve(os.homedir(), ".codex/session_index.jsonl"));
const workspaceHints = loadThreadWorkspaceHints(
options.globalStatePath ?? resolve(os.homedir(), ".codex/.codex-global-state.json"),
);
const latestLogByThread = loadRecentThreadActivity(
options.logsDbPath ?? resolve(os.homedir(), ".codex/logs_1.sqlite"),
);
let threads = loadThreadsFromStateDb(
options.stateDbPath ?? resolve(os.homedir(), ".codex/state_5.sqlite"),
);
if (threads.length === 0) {
threads = await loadThreadsFromSessions(
options.sessionsDir ?? resolve(os.homedir(), ".codex/sessions"),
);
}
const seenThreadIds = new Set();
const candidates = [];
for (const thread of threads) {
if (!thread?.id || seenThreadIds.has(thread.id)) continue;
const latestActivitySeconds = latestLogByThread.get(thread.id) ?? thread.updatedAtSeconds;
if (!Number.isFinite(latestActivitySeconds) || latestActivitySeconds < cutoffSeconds) {
continue;
}
seenThreadIds.add(thread.id);
const hintedPath = workspaceHints.get(thread.id);
const folderPath = resolve(hintedPath || thread.cwd || "");
const folderName = basename(folderPath);
if (!folderName) continue;
const sessionName = sessionNames.get(thread.id)?.threadName;
const displayName = sanitizeDisplayName(
sessionName,
sanitizeDisplayName(thread.title, fallbackDisplayName(thread, folderName)),
);
candidates.push({
folderName,
folderRef: folderPath,
threadId: thread.id,
threadDisplayName: displayName,
codexFolderRef: folderPath,
codexThreadRef: thread.id,
lastActiveAt: toIsoFromUnixSeconds(latestActivitySeconds) ?? now.toISOString(),
suggestedImport: true,
});
}
candidates.sort((a, b) => b.lastActiveAt.localeCompare(a.lastActiveAt));
const projects = [...new Set(candidates.map((candidate) => candidate.folderName))].sort((a, b) =>
a.localeCompare(b),
);
return {
projects,
projectCandidates: candidates,
};
}

View File

@@ -5,13 +5,64 @@ import { createServer } from "node:http";
import { access, readFile, readdir, rm } from "node:fs/promises";
import os from "node:os";
import { join, resolve } from "node:path";
import { discoverCodexProjectCandidates } from "./codex-session-discovery.mjs";
async function loadConfig(configPath) {
const raw = await readFile(resolve(configPath), "utf8");
return JSON.parse(raw);
}
async function postHeartbeat(config, runtime) {
async function resolveHeartbeatProjects(config, runtime) {
const staticProjects = Array.isArray(config.projects) ? config.projects : [];
const staticCandidates = Array.isArray(config.projectCandidates) ? config.projectCandidates : [];
if (config.codexSessionDiscoveryEnabled === false) {
return {
projects: staticProjects,
projectCandidates: staticCandidates,
};
}
try {
const discovered = await discoverCodexProjectCandidates({
stateDbPath: config.codexStateDbPath,
logsDbPath: config.codexLogsDbPath,
sessionIndexPath: config.codexSessionIndexPath,
globalStatePath: config.codexGlobalStatePath,
sessionsDir: config.codexSessionsDir,
lookbackHours: config.codexSessionLookbackHours,
});
const candidateMap = new Map();
for (const candidate of [...staticCandidates, ...discovered.projectCandidates]) {
candidateMap.set(candidate.codexThreadRef ?? candidate.threadId, candidate);
}
const mergedCandidates = [...candidateMap.values()];
const mergedProjects = [...new Set([...staticProjects, ...discovered.projects, ...mergedCandidates.map((candidate) => candidate.folderName)])];
runtime.lastProjectDiscoveryAt = new Date().toISOString();
runtime.lastProjectDiscoveryOk = true;
runtime.lastProjectDiscoverySummary = `${mergedCandidates.length} threads / ${mergedProjects.length} folders`;
return {
projects: mergedProjects,
projectCandidates: mergedCandidates,
};
} catch (error) {
runtime.lastProjectDiscoveryAt = new Date().toISOString();
runtime.lastProjectDiscoveryOk = false;
runtime.lastProjectDiscoverySummary = error instanceof Error ? error.message : String(error);
await postAppLog(config, runtime, {
level: "error",
category: "local_agent.codex_discovery_failed",
message: "Codex 线程扫描失败,已退回静态项目配置。",
detail: runtime.lastProjectDiscoverySummary,
mirrorToMaster: true,
});
return {
projects: staticProjects,
projectCandidates: staticCandidates,
};
}
}
async function postHeartbeat(config, runtime, heartbeatProjects) {
const response = await fetch(`${config.controlPlaneUrl.replace(/\/$/, "")}/api/device-heartbeat`, {
method: "POST",
headers: { "Content-Type": "application/json" },
@@ -25,8 +76,8 @@ async function postHeartbeat(config, runtime) {
status: config.status,
quota5h: config.quota5h,
quota7d: config.quota7d,
projects: config.projects,
projectCandidates: config.projectCandidates,
projects: heartbeatProjects.projects,
projectCandidates: heartbeatProjects.projectCandidates,
endpoint: config.endpoint,
}),
});
@@ -454,11 +505,15 @@ const runtime = {
masterTaskBusy: false,
activeMasterTask: null,
lastMasterTaskPoll: null,
lastProjectDiscoveryAt: null,
lastProjectDiscoveryOk: false,
lastProjectDiscoverySummary: null,
};
async function heartbeat() {
try {
const result = await postHeartbeat(config, runtime);
const heartbeatProjects = await resolveHeartbeatProjects(config, runtime);
const result = await postHeartbeat(config, runtime, heartbeatProjects);
runtime.lastHeartbeatAt = new Date().toISOString();
runtime.lastHeartbeatOk = result.ok;
runtime.lastHeartbeatStatus = result.status;