From 98dd0e3cd5ecd13351702a87f5898f7fcc106035 Mon Sep 17 00:00:00 2001 From: kris Date: Mon, 30 Mar 2026 13:01:37 +0800 Subject: [PATCH] feat: auto-sync bound codex threads into conversations --- README.md | 2 + .../api_and_service_inventory_cn.md | 6 + .../current_runtime_and_deploy_status_cn.md | 7 +- ...velopment_subtasks_and_delivery_plan_cn.md | 17 ++ local-agent/codex-session-discovery.mjs | 236 +++++++++++++++++ local-agent/server.mjs | 63 ++++- src/lib/boss-data.ts | 243 ++++++++++++------ tests/device-import-draft.test.ts | 67 +++++ tests/local-agent-codex-discovery.test.mjs | 196 ++++++++++++++ 9 files changed, 755 insertions(+), 82 deletions(-) create mode 100644 local-agent/codex-session-discovery.mjs create mode 100644 tests/local-agent-codex-discovery.test.mjs diff --git a/README.md b/README.md index 79811d0..ac20789 100644 --- a/README.md +++ b/README.md @@ -277,6 +277,8 @@ npm run aab:release - 我的页新增 `AI 账号` 入口,`/me/ai-accounts` 会展示主 GPT / 备用 GPT / API 容灾,并明确主链路优先走已登录 `ChatGPT Plus / Codex` 的 `Master Codex Node` - API 容灾当前不走服务器预置 Key,而是由用户在 APP 的 `我的 > AI 账号` 中自行配置 `OpenAI API` 账号 - 设备页当前只展示已接入生产链路的设备,历史演示脏数据已经从正式设备视图、运维视图和审计视图中剔除 +- 本机 `local-agent` 现在会直接从 `~/.codex/state_5.sqlite / logs_1.sqlite / session_index.jsonl / .codex-global-state.json` 动态发现真实 Codex 线程,并在 heartbeat 里上报 `projectCandidates` +- 对已经绑定的生产设备,服务端现在会在 heartbeat 时自动选中建议导入项、生成导入决议并直接应用;因此会话页会自动出现这台设备当前真实运行的 Codex 线程窗口 - 登录页当前已临时切到免验证模式,点击“登录”会直接进入会话首页 - 认证现在已经有最小会话链路:登录后会写入 `boss_session` Cookie,默认保持 30 天,`会话 / 设备 / 我的 / 线程` 页面以及主要 `/api/v1/*` 接口都要求有效会话 - 新增 `GET /api/auth/session`、`POST /api/auth/logout` 与 `POST /api/auth/restore` diff --git a/docs/architecture/api_and_service_inventory_cn.md b/docs/architecture/api_and_service_inventory_cn.md index 4e02615..245a920 100644 --- a/docs/architecture/api_and_service_inventory_cn.md +++ b/docs/architecture/api_and_service_inventory_cn.md @@ -574,6 +574,9 @@ - 重复 apply 同一份 resolution 不会再重复创建线程会话 - 当前保护: - 仅 `highest_admin` 或设备所属账号可写 +- 当前补充: + - 已绑定的生产设备如果在 heartbeat 中携带真实 `projectCandidates[]`,服务端会自动完成 `select + review + apply` + - 新设备仍保持人工勾选导入流程,不会被自动跳过 #### `GET /api/v1/devices/[deviceId]/skills` @@ -721,6 +724,9 @@ - 用途:手动触发一次心跳 - 当前行为:除了设备心跳,还会顺带触发 `thread-context` 上报和 Skill 同步 +- 当前补充: + - `local-agent` 会优先从 `~/.codex/state_5.sqlite / logs_1.sqlite / session_index.jsonl / .codex-global-state.json` 动态发现真实 Codex 线程,并把结果填进 `projects[] + projectCandidates[]` + - 对已绑定的生产设备,服务端会在 heartbeat 时自动应用建议导入项;对新设备则继续走 `deviceImportDraft` 的人工勾选与应用流程 ### 4.5 主 Agent 轮询任务 diff --git a/docs/architecture/current_runtime_and_deploy_status_cn.md b/docs/architecture/current_runtime_and_deploy_status_cn.md index a8519cd..228a859 100644 --- a/docs/architecture/current_runtime_and_deploy_status_cn.md +++ b/docs/architecture/current_runtime_and_deploy_status_cn.md @@ -223,8 +223,11 @@ cd /Users/kris/code/boss - APP 实时日志当前已能同步到主 Agent 会话,但还没有单独的日志检索、分页和告警升级规则 - Skill 清单当前按设备同步和展示已经可用,但还没有“安装 / 卸载 Skill”这种远程管理能力 - 服务器侧主 Agent 实时回复依赖被绑定设备的 `local-agent` 在线并能执行 `codex exec`;如果设备离线,只能保留任务或走 API 容灾账号 -- 设备导入主链的后端状态机已经跑通;下一阶段重点从“纯后端打通”切到“Web / Android 把候选勾选、决议预览和应用导入完整接到前台” -- 设备导入的后端主链已经打通,但 Web / Android 仍未把“候选项目勾选与导入应用”完整接进前台页面;当前主要可通过 API 和后续 UI 接线验证 +- 设备导入主链的后端状态机已经跑通,并且已经分成两条: + - 新接入设备继续走 `import draft -> 勾选 -> review -> apply` + - 已绑定的生产设备如果 heartbeat 带上真实 `projectCandidates[]`,服务端会自动选中建议项、生成导入决议并直接应用,让会话页自动出现当前运行中的 Codex 线程 +- 本机 `mac-studio` 当前已经验证可通过 `local-agent` 直接从 `~/.codex/state_5.sqlite / logs_1.sqlite / session_index.jsonl / .codex-global-state.json` 扫描真实 Codex 线程,并通过 heartbeat 自动导入到会话列表 +- Web / Android 仍未把“新设备候选项目勾选与导入应用”完整接进前台页面;当前新设备主要通过 API 验证,已绑定生产设备则已能自动同步到会话页 - API 容灾当前由用户在 APP 的 `我的 > AI 账号` 页面自行配置 `OpenAI API` 账号,不再依赖服务器预置 Key - 原生 Android 的二级深层页虽然仍保留 `ProjectForwardActivity / ThreadDetailActivity / OpsCenterActivity` 等能力,但它们已经退出主 UI 正面;后续如再加入口,需继续遵守“一级微信式,复杂能力下沉”的规则 - Android 本地 Gradle 验证当前必须串行执行;如果并发跑 `testDebugUnitTest / compileDebugJavaWithJavac / assembleDebug`,会导致中间产物互踩并出现假失败 diff --git a/docs/architecture/development_subtasks_and_delivery_plan_cn.md b/docs/architecture/development_subtasks_and_delivery_plan_cn.md index 857b5bf..36ecc97 100644 --- a/docs/architecture/development_subtasks_and_delivery_plan_cn.md +++ b/docs/architecture/development_subtasks_and_delivery_plan_cn.md @@ -813,3 +813,20 @@ 接下来不应该再继续扩架构想法了。 最合理的做法是:以这份子任务清单为边界,把数据库、API、状态机、设备预部署、前端字段、验收脚本逐项冻结,然后直接进入开发。 + +--- + +## 13. 2026-03-30 当前推进记录 + +已完成: + +- 群聊主链已经具备 `主 Agent 推荐下发 -> 用户确认 -> dispatchExecution -> local-agent 认领 -> 线程原始结果回群 -> 主 Agent 汇总` 的后端闭环 +- 新设备导入主链已经具备 `heartbeat -> import draft -> select -> review -> apply` 的后端闭环,并补了 owner/admin 鉴权与幂等保护 +- 已绑定的生产设备当前新增自动同步链路:如果 heartbeat 携带真实 `projectCandidates[]`,服务端会自动完成建议项选择、导入决议和应用,把真实 Codex 线程直接落成会话窗口 +- 本机 `mac-studio` 当前已经验证可通过 `local-agent` 直接从 `~/.codex/state_5.sqlite / logs_1.sqlite / session_index.jsonl / .codex-global-state.json` 扫描真实 Codex 线程,并通过 heartbeat 自动导入到远端会话列表 + +当前仍待前台接线: + +- Web / Android 的“新设备候选项目勾选、决议预览、应用导入”页面 +- 群聊 `development / approval_required` 审批闸口的前台确认页 +- 真机逐页把新增业务流接入现有微信式 UI diff --git a/local-agent/codex-session-discovery.mjs b/local-agent/codex-session-discovery.mjs new file mode 100644 index 0000000..68363a2 --- /dev/null +++ b/local-agent/codex-session-discovery.mjs @@ -0,0 +1,236 @@ +import os from "node:os"; +import { basename, resolve } from "node:path"; +import { readFileSync } from "node:fs"; +import { readFile, readdir } from "node:fs/promises"; +import { DatabaseSync } from "node:sqlite"; + +function toIsoFromUnixSeconds(value) { + if (!Number.isFinite(value) || value <= 0) return null; + return new Date(value * 1000).toISOString(); +} + +function sanitizeDisplayName(raw, fallback) { + const source = typeof raw === "string" ? raw : ""; + const firstLine = source + .replace(/\u0000/g, "") + .split(/\r?\n/) + .map((line) => line.trim()) + .find(Boolean); + if (!firstLine) return fallback; + const compact = firstLine.replace(/\s+/g, " ").trim(); + if (!compact) return fallback; + return compact.length > 48 ? `${compact.slice(0, 45)}...` : compact; +} + +function fallbackDisplayName(thread, folderName) { + const suffix = thread.id.replace(/-/g, "").slice(0, 6); + if (thread.agentNickname) { + return thread.agentNickname; + } + if (thread.agentRole) { + return `${thread.agentRole} · ${suffix}`; + } + return `${folderName} · ${suffix}`; +} + +function loadThreadWorkspaceHints(globalStatePath) { + if (!globalStatePath) return new Map(); + try { + const parsed = JSON.parse(requireText(globalStatePath)); + return new Map(Object.entries(parsed["thread-workspace-root-hints"] ?? {})); + } catch { + return new Map(); + } +} + +function loadSessionNames(sessionIndexPath) { + if (!sessionIndexPath) return new Map(); + const names = new Map(); + try { + const raw = requireText(sessionIndexPath); + for (const line of raw.split(/\r?\n/)) { + if (!line.trim()) continue; + const parsed = JSON.parse(line); + if (!parsed?.id) continue; + const previous = names.get(parsed.id); + if (!previous || String(parsed.updated_at ?? "") >= String(previous.updatedAt ?? "")) { + names.set(parsed.id, { + threadName: parsed.thread_name, + updatedAt: parsed.updated_at, + }); + } + } + } catch { + return names; + } + return names; +} + +function loadRecentThreadActivity(logsDbPath) { + if (!logsDbPath) return new Map(); + try { + const db = new DatabaseSync(logsDbPath, { readonly: true }); + try { + const rows = db + .prepare("SELECT thread_id, MAX(ts) AS latest_ts FROM logs WHERE thread_id IS NOT NULL GROUP BY thread_id") + .all(); + return new Map( + rows + .filter((row) => typeof row.thread_id === "string" && Number.isFinite(row.latest_ts)) + .map((row) => [row.thread_id, Number(row.latest_ts)]), + ); + } finally { + db.close(); + } + } catch { + return new Map(); + } +} + +function loadThreadsFromStateDb(stateDbPath) { + if (!stateDbPath) return []; + try { + const db = new DatabaseSync(stateDbPath, { readonly: true }); + try { + return db + .prepare( + "SELECT id, cwd, updated_at, archived, title, agent_nickname, agent_role FROM threads WHERE archived = 0 ORDER BY updated_at DESC", + ) + .all() + .map((row) => ({ + id: String(row.id), + cwd: String(row.cwd), + updatedAtSeconds: Number(row.updated_at), + archived: Boolean(row.archived), + title: String(row.title ?? ""), + agentNickname: typeof row.agent_nickname === "string" ? row.agent_nickname : "", + agentRole: typeof row.agent_role === "string" ? row.agent_role : "", + })); + } finally { + db.close(); + } + } catch { + return []; + } +} + +function parseSessionMeta(line) { + try { + const parsed = JSON.parse(line); + if (parsed?.type !== "session_meta" || !parsed?.payload?.id || !parsed?.payload?.cwd) { + return null; + } + return { + id: String(parsed.payload.id), + cwd: String(parsed.payload.cwd), + updatedAtSeconds: Math.floor(new Date(parsed.payload.timestamp ?? parsed.timestamp ?? Date.now()).getTime() / 1000), + archived: false, + title: "", + agentNickname: typeof parsed.payload.agent_nickname === "string" ? parsed.payload.agent_nickname : "", + agentRole: typeof parsed.payload.agent_role === "string" ? parsed.payload.agent_role : "", + }; + } catch { + return null; + } +} + +async function loadThreadsFromSessions(sessionsDir) { + if (!sessionsDir) return []; + const pending = [resolve(sessionsDir)]; + const threads = []; + while (pending.length > 0) { + const dir = pending.pop(); + if (!dir) continue; + let entries = []; + try { + entries = await readdir(dir, { withFileTypes: true }); + } catch { + continue; + } + for (const entry of entries) { + const fullPath = `${dir}/${entry.name}`; + if (entry.isDirectory()) { + pending.push(fullPath); + continue; + } + if (!entry.isFile() || !entry.name.endsWith(".jsonl")) continue; + try { + const raw = await readFile(fullPath, "utf8"); + const firstLine = raw.split(/\r?\n/, 1)[0]; + const parsed = parseSessionMeta(firstLine); + if (parsed) threads.push(parsed); + } catch { + continue; + } + } + } + return threads; +} + +function requireText(filePath) { + return readFileSync(resolve(filePath), "utf8"); +} + +export async function discoverCodexProjectCandidates(options = {}) { + const now = options.now instanceof Date ? options.now : new Date(); + const lookbackHours = Number.isFinite(options.lookbackHours) ? Number(options.lookbackHours) : 24; + const cutoffSeconds = Math.floor(now.getTime() / 1000) - lookbackHours * 60 * 60; + const sessionNames = loadSessionNames(options.sessionIndexPath ?? resolve(os.homedir(), ".codex/session_index.jsonl")); + const workspaceHints = loadThreadWorkspaceHints( + options.globalStatePath ?? resolve(os.homedir(), ".codex/.codex-global-state.json"), + ); + const latestLogByThread = loadRecentThreadActivity( + options.logsDbPath ?? resolve(os.homedir(), ".codex/logs_1.sqlite"), + ); + + let threads = loadThreadsFromStateDb( + options.stateDbPath ?? resolve(os.homedir(), ".codex/state_5.sqlite"), + ); + if (threads.length === 0) { + threads = await loadThreadsFromSessions( + options.sessionsDir ?? resolve(os.homedir(), ".codex/sessions"), + ); + } + + const seenThreadIds = new Set(); + const candidates = []; + for (const thread of threads) { + if (!thread?.id || seenThreadIds.has(thread.id)) continue; + const latestActivitySeconds = latestLogByThread.get(thread.id) ?? thread.updatedAtSeconds; + if (!Number.isFinite(latestActivitySeconds) || latestActivitySeconds < cutoffSeconds) { + continue; + } + + seenThreadIds.add(thread.id); + const hintedPath = workspaceHints.get(thread.id); + const folderPath = resolve(hintedPath || thread.cwd || ""); + const folderName = basename(folderPath); + if (!folderName) continue; + + const sessionName = sessionNames.get(thread.id)?.threadName; + const displayName = sanitizeDisplayName( + sessionName, + sanitizeDisplayName(thread.title, fallbackDisplayName(thread, folderName)), + ); + + candidates.push({ + folderName, + folderRef: folderPath, + threadId: thread.id, + threadDisplayName: displayName, + codexFolderRef: folderPath, + codexThreadRef: thread.id, + lastActiveAt: toIsoFromUnixSeconds(latestActivitySeconds) ?? now.toISOString(), + suggestedImport: true, + }); + } + + candidates.sort((a, b) => b.lastActiveAt.localeCompare(a.lastActiveAt)); + const projects = [...new Set(candidates.map((candidate) => candidate.folderName))].sort((a, b) => + a.localeCompare(b), + ); + return { + projects, + projectCandidates: candidates, + }; +} diff --git a/local-agent/server.mjs b/local-agent/server.mjs index 739c08e..2eece2f 100755 --- a/local-agent/server.mjs +++ b/local-agent/server.mjs @@ -5,13 +5,64 @@ import { createServer } from "node:http"; import { access, readFile, readdir, rm } from "node:fs/promises"; import os from "node:os"; import { join, resolve } from "node:path"; +import { discoverCodexProjectCandidates } from "./codex-session-discovery.mjs"; async function loadConfig(configPath) { const raw = await readFile(resolve(configPath), "utf8"); return JSON.parse(raw); } -async function postHeartbeat(config, runtime) { +async function resolveHeartbeatProjects(config, runtime) { + const staticProjects = Array.isArray(config.projects) ? config.projects : []; + const staticCandidates = Array.isArray(config.projectCandidates) ? config.projectCandidates : []; + if (config.codexSessionDiscoveryEnabled === false) { + return { + projects: staticProjects, + projectCandidates: staticCandidates, + }; + } + + try { + const discovered = await discoverCodexProjectCandidates({ + stateDbPath: config.codexStateDbPath, + logsDbPath: config.codexLogsDbPath, + sessionIndexPath: config.codexSessionIndexPath, + globalStatePath: config.codexGlobalStatePath, + sessionsDir: config.codexSessionsDir, + lookbackHours: config.codexSessionLookbackHours, + }); + const candidateMap = new Map(); + for (const candidate of [...staticCandidates, ...discovered.projectCandidates]) { + candidateMap.set(candidate.codexThreadRef ?? candidate.threadId, candidate); + } + const mergedCandidates = [...candidateMap.values()]; + const mergedProjects = [...new Set([...staticProjects, ...discovered.projects, ...mergedCandidates.map((candidate) => candidate.folderName)])]; + runtime.lastProjectDiscoveryAt = new Date().toISOString(); + runtime.lastProjectDiscoveryOk = true; + runtime.lastProjectDiscoverySummary = `${mergedCandidates.length} threads / ${mergedProjects.length} folders`; + return { + projects: mergedProjects, + projectCandidates: mergedCandidates, + }; + } catch (error) { + runtime.lastProjectDiscoveryAt = new Date().toISOString(); + runtime.lastProjectDiscoveryOk = false; + runtime.lastProjectDiscoverySummary = error instanceof Error ? error.message : String(error); + await postAppLog(config, runtime, { + level: "error", + category: "local_agent.codex_discovery_failed", + message: "Codex 线程扫描失败,已退回静态项目配置。", + detail: runtime.lastProjectDiscoverySummary, + mirrorToMaster: true, + }); + return { + projects: staticProjects, + projectCandidates: staticCandidates, + }; + } +} + +async function postHeartbeat(config, runtime, heartbeatProjects) { const response = await fetch(`${config.controlPlaneUrl.replace(/\/$/, "")}/api/device-heartbeat`, { method: "POST", headers: { "Content-Type": "application/json" }, @@ -25,8 +76,8 @@ async function postHeartbeat(config, runtime) { status: config.status, quota5h: config.quota5h, quota7d: config.quota7d, - projects: config.projects, - projectCandidates: config.projectCandidates, + projects: heartbeatProjects.projects, + projectCandidates: heartbeatProjects.projectCandidates, endpoint: config.endpoint, }), }); @@ -454,11 +505,15 @@ const runtime = { masterTaskBusy: false, activeMasterTask: null, lastMasterTaskPoll: null, + lastProjectDiscoveryAt: null, + lastProjectDiscoveryOk: false, + lastProjectDiscoverySummary: null, }; async function heartbeat() { try { - const result = await postHeartbeat(config, runtime); + const heartbeatProjects = await resolveHeartbeatProjects(config, runtime); + const result = await postHeartbeat(config, runtime, heartbeatProjects); runtime.lastHeartbeatAt = new Date().toISOString(); runtime.lastHeartbeatOk = result.ok; runtime.lastHeartbeatStatus = result.status; diff --git a/src/lib/boss-data.ts b/src/lib/boss-data.ts index c390fd1..cad12f6 100644 --- a/src/lib/boss-data.ts +++ b/src/lib/boss-data.ts @@ -5314,6 +5314,7 @@ export async function upsertDeviceHeartbeat(payload: { }>; }) { const result = await mutateState((state) => { + const existingDevice = state.devices.find((item) => item.id === payload.deviceId) ?? null; const claimedEnrollment = claimEnrollment( state, payload.deviceId, @@ -5336,7 +5337,7 @@ export async function upsertDeviceHeartbeat(payload: { ); const shouldAutoImportLegacyProjects = normalizedCandidates.length === 0; - let device = state.devices.find((item) => item.id === payload.deviceId); + let device = existingDevice; if (!device) { device = { id: payload.deviceId, @@ -5409,12 +5410,56 @@ export async function upsertDeviceHeartbeat(payload: { } } } - const draft = upsertDeviceImportDraftFromHeartbeat(state, { + let draft = upsertDeviceImportDraftFromHeartbeat(state, { deviceId: payload.deviceId, enrollmentId: claimedEnrollment?.enrollmentId, candidates: normalizedCandidates, }); + if ( + draft && + shouldAutoSyncHeartbeatCandidates({ + wasExistingDevice: Boolean(existingDevice), + device, + claimedEnrollment, + draft, + }) + ) { + const autoSyncDraft = draft; + const selectedCandidateIds = resolveAutoSyncCandidateIds(autoSyncDraft); + if (selectedCandidateIds.length > 0) { + autoSyncDraft.selectedCandidateIds = selectedCandidateIds; + autoSyncDraft.status = "pending_resolution"; + autoSyncDraft.updatedAt = nowIso(); + autoSyncDraft.reviewedAt = undefined; + autoSyncDraft.reviewedBy = undefined; + autoSyncDraft.resolutionId = undefined; + state.deviceImportResolutions = state.deviceImportResolutions.filter( + (item) => item.draftId !== autoSyncDraft.draftId, + ); + + const selectedCandidates = autoSyncDraft.candidates.filter((candidate) => + autoSyncDraft.selectedCandidateIds.includes(candidate.candidateId), + ); + const items = selectedCandidates.map((candidate) => + resolveDeviceImportAction(state, payload.deviceId, candidate), + ); + upsertDeviceImportResolutionInState(state, { + deviceId: payload.deviceId, + reviewedBy: "system:auto_sync", + summary: summarizeDeviceImportResolution(device.name, items), + items, + draftId: autoSyncDraft.draftId, + }); + const applied = applyDeviceImportResolutionInState(state, { + deviceId: payload.deviceId, + appliedBy: "system:auto_sync", + draftId: autoSyncDraft.draftId, + }); + draft = applied.draft; + } + } + return { device, token: claimedEnrollment?.token ?? device.token, @@ -5486,6 +5531,35 @@ function summarizeDeviceImportResolution( return `${deviceName} 导入建议:新建 ${createCount} 个会话,关联 ${attachCount} 个现有会话${skipCount > 0 ? `,跳过 ${skipCount} 项` : ""}。`; } +function resolveAutoSyncCandidateIds(draft: DeviceImportDraft) { + const suggestedCandidateIds = draft.candidates + .filter((candidate) => candidate.suggestedImport !== false) + .map((candidate) => candidate.candidateId); + return dedupeStrings( + suggestedCandidateIds.length > 0 + ? suggestedCandidateIds + : draft.candidates.map((candidate) => candidate.candidateId), + ); +} + +function shouldAutoSyncHeartbeatCandidates(input: { + wasExistingDevice: boolean; + device: Device; + claimedEnrollment: DeviceEnrollment | null; + draft: DeviceImportDraft | null; +}) { + if (!input.wasExistingDevice) return false; + if (input.device.source !== "production") return false; + if (!input.draft || input.draft.candidates.length === 0) return false; + if ( + input.claimedEnrollment?.enrollmentId && + input.draft.enrollmentId === input.claimedEnrollment.enrollmentId + ) { + return false; + } + return true; +} + export async function getLatestDeviceImportDraft(deviceId: string) { const state = await readState(); const draft = state.deviceImportDrafts.find((item) => item.deviceId === deviceId) ?? null; @@ -5750,84 +5824,101 @@ function buildImportedThreadProject(device: Device, candidate: DeviceImportCandi }); } +function applyDeviceImportResolutionInState( + state: BossState, + input: { + deviceId: string; + appliedBy: string; + draftId?: string; + }, +) { + const draft = + state.deviceImportDrafts.find( + (item) => item.draftId === input.draftId || item.deviceId === input.deviceId, + ) ?? null; + if (!draft || !draft.resolutionId) throw new Error("DEVICE_IMPORT_RESOLUTION_NOT_FOUND"); + const resolution = state.deviceImportResolutions.find( + (item) => item.resolutionId === draft.resolutionId, + ); + if (!resolution) throw new Error("DEVICE_IMPORT_RESOLUTION_NOT_FOUND"); + const device = state.devices.find((item) => item.id === input.deviceId); + if (!device) throw new Error("DEVICE_NOT_FOUND"); + + const importedProjects: Project[] = []; + for (const item of resolution.items) { + const candidate = draft.candidates.find((entry) => entry.candidateId === item.candidateId); + if (!candidate || item.action === "skip") { + continue; + } + + let targetProject = item.targetProjectId + ? state.projects.find((project) => project.id === item.targetProjectId) + : undefined; + if (item.action === "create_thread_conversation" && !targetProject) { + const draftProject = buildImportedThreadProject(device, candidate); + targetProject = + state.projects.find((project) => project.id === draftProject.id) ?? + state.projects.find( + (project) => + !project.isGroup && + project.deviceIds.includes(device.id) && + ((candidate.codexThreadRef && + project.threadMeta.codexThreadRef === candidate.codexThreadRef) || + project.threadMeta.threadId === candidate.threadId), + ); + if (!targetProject) { + targetProject = draftProject; + state.projects.unshift(targetProject); + } + } else if (item.action === "attach_existing" && !targetProject) { + continue; + } + + if (!targetProject) continue; + if (!targetProject.deviceIds.includes(device.id)) { + targetProject.deviceIds.push(device.id); + } + targetProject.threadMeta.threadDisplayName = candidate.threadDisplayName; + targetProject.threadMeta.folderName = candidate.folderName; + targetProject.threadMeta.threadId = candidate.threadId; + targetProject.threadMeta.codexFolderRef = candidate.codexFolderRef ?? candidate.folderRef; + targetProject.threadMeta.codexThreadRef = candidate.codexThreadRef; + targetProject.threadMeta.updatedAt = candidate.lastActiveAt; + targetProject.preview = `已导入 ${candidate.threadDisplayName}`; + targetProject.updatedAt = nowIso(); + targetProject.lastMessageAt = targetProject.updatedAt; + importedProjects.push({ ...targetProject }); + } + + device.projects = dedupeStrings( + draft.candidates + .filter((candidate) => draft.selectedCandidateIds.includes(candidate.candidateId)) + .map((candidate) => candidate.folderName), + ); + + resolution.status = "applied"; + resolution.appliedAt = nowIso(); + resolution.appliedBy = input.appliedBy; + draft.status = "applied"; + draft.updatedAt = nowIso(); + + return { + draft: { ...draft }, + resolution: { ...resolution }, + importedProjects, + }; +} + export async function applyDeviceImportResolution(input: { deviceId: string; appliedBy: string; }) { - const result = await mutateState((state) => { - const draft = state.deviceImportDrafts.find((item) => item.deviceId === input.deviceId); - if (!draft || !draft.resolutionId) throw new Error("DEVICE_IMPORT_RESOLUTION_NOT_FOUND"); - const resolution = state.deviceImportResolutions.find( - (item) => item.resolutionId === draft.resolutionId, - ); - if (!resolution) throw new Error("DEVICE_IMPORT_RESOLUTION_NOT_FOUND"); - const device = state.devices.find((item) => item.id === input.deviceId); - if (!device) throw new Error("DEVICE_NOT_FOUND"); - - const importedProjects: Project[] = []; - for (const item of resolution.items) { - const candidate = draft.candidates.find((entry) => entry.candidateId === item.candidateId); - if (!candidate || item.action === "skip") { - continue; - } - - let targetProject = item.targetProjectId - ? state.projects.find((project) => project.id === item.targetProjectId) - : undefined; - if (item.action === "create_thread_conversation" && !targetProject) { - const draftProject = buildImportedThreadProject(device, candidate); - targetProject = - state.projects.find((project) => project.id === draftProject.id) ?? - state.projects.find( - (project) => - !project.isGroup && - project.deviceIds.includes(device.id) && - ((candidate.codexThreadRef && - project.threadMeta.codexThreadRef === candidate.codexThreadRef) || - project.threadMeta.threadId === candidate.threadId), - ); - if (!targetProject) { - targetProject = draftProject; - state.projects.unshift(targetProject); - } - } else if (item.action === "attach_existing" && !targetProject) { - continue; - } - - if (!targetProject) continue; - if (!targetProject.deviceIds.includes(device.id)) { - targetProject.deviceIds.push(device.id); - } - targetProject.threadMeta.threadDisplayName = candidate.threadDisplayName; - targetProject.threadMeta.folderName = candidate.folderName; - targetProject.threadMeta.threadId = candidate.threadId; - targetProject.threadMeta.codexFolderRef = candidate.codexFolderRef ?? candidate.folderRef; - targetProject.threadMeta.codexThreadRef = candidate.codexThreadRef; - targetProject.threadMeta.updatedAt = candidate.lastActiveAt; - targetProject.preview = `已导入 ${candidate.threadDisplayName}`; - targetProject.updatedAt = nowIso(); - targetProject.lastMessageAt = targetProject.updatedAt; - importedProjects.push({ ...targetProject }); - } - - device.projects = dedupeStrings( - draft.candidates - .filter((candidate) => draft.selectedCandidateIds.includes(candidate.candidateId)) - .map((candidate) => candidate.folderName), - ); - - resolution.status = "applied"; - resolution.appliedAt = nowIso(); - resolution.appliedBy = input.appliedBy; - draft.status = "applied"; - draft.updatedAt = nowIso(); - - return { - draft: { ...draft }, - resolution: { ...resolution }, - importedProjects, - }; - }); + const result = await mutateState((state) => + applyDeviceImportResolutionInState(state, { + deviceId: input.deviceId, + appliedBy: input.appliedBy, + }), + ); publishBossEvent("devices.updated", { deviceId: input.deviceId }); publishBossEvent("conversation.updated", { deviceId: input.deviceId }); diff --git a/tests/device-import-draft.test.ts b/tests/device-import-draft.test.ts index 36a79ea..b27ca37 100644 --- a/tests/device-import-draft.test.ts +++ b/tests/device-import-draft.test.ts @@ -485,3 +485,70 @@ test("device import routes reject unrelated logged-in members", async () => { }); assert.equal(getResponse.status, 403); }); + +test("existing bound production devices auto-sync suggested candidates into conversations on heartbeat", async () => { + await setup(); + + const heartbeatResponse = await deviceHeartbeatRoute( + new NextRequest("http://127.0.0.1:3000/api/device-heartbeat", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + deviceId: "mac-studio", + token: "boss-mac-studio-token", + name: "Mac Studio", + avatar: "M", + account: "17600003315", + status: "online", + quota5h: 68, + quota7d: 81, + projects: ["Boss 移动控制台", "硬件审计协作"], + endpoint: "mac://kris.local", + projectCandidates: [ + { + folderName: "yuandi", + folderRef: "/Users/kris/code/yuandi", + threadId: "session-yuandi-1", + threadDisplayName: "Epicurus", + codexFolderRef: "/Users/kris/code/yuandi", + codexThreadRef: "session-yuandi-1", + lastActiveAt: "2026-03-30T12:42:56+08:00", + suggestedImport: true, + }, + { + folderName: "wenshenapp", + folderRef: "/Users/kris/code/wenshenapp", + threadId: "session-wenshenapp-1", + threadDisplayName: "wenshenapp · ea5f", + codexFolderRef: "/Users/kris/code/wenshenapp", + codexThreadRef: "session-wenshenapp-1", + lastActiveAt: "2026-03-30T12:34:51+08:00", + suggestedImport: true, + }, + ], + }), + }), + ); + assert.equal(heartbeatResponse.status, 200); + + const payload = (await heartbeatResponse.json()) as { + importDraft?: { status: string; selectedCandidateIds: string[] } | null; + }; + assert.equal(payload.importDraft?.status, "applied"); + assert.equal(payload.importDraft?.selectedCandidateIds.length, 2); + + const nextState = await readState(); + const yuandiProject = nextState.projects.find( + (project) => project.threadMeta.codexThreadRef === "session-yuandi-1", + ); + const wenshenProject = nextState.projects.find( + (project) => project.threadMeta.codexThreadRef === "session-wenshenapp-1", + ); + assert.ok(yuandiProject, "expected a discovered yuandi session to become a real chat window"); + assert.ok(wenshenProject, "expected a discovered wenshenapp session to become a real chat window"); + assert.equal(yuandiProject?.threadMeta.folderName, "yuandi"); + assert.equal(wenshenProject?.threadMeta.folderName, "wenshenapp"); + + const device = nextState.devices.find((item) => item.id === "mac-studio"); + assert.deepEqual(device?.projects, ["yuandi", "wenshenapp"]); +}); diff --git a/tests/local-agent-codex-discovery.test.mjs b/tests/local-agent-codex-discovery.test.mjs new file mode 100644 index 0000000..4f07d5b --- /dev/null +++ b/tests/local-agent-codex-discovery.test.mjs @@ -0,0 +1,196 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import os from "node:os"; +import path from "node:path"; +import { mkdtemp, mkdir, rm, writeFile } from "node:fs/promises"; +import { DatabaseSync } from "node:sqlite"; + +let runtimeRoot = ""; +let discoverCodexProjectCandidates; + +async function setup() { + if (runtimeRoot) return; + runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-local-agent-discovery-")); + ({ discoverCodexProjectCandidates } = await import("../local-agent/codex-session-discovery.mjs")); +} + +test.after(async () => { + if (runtimeRoot) { + await rm(runtimeRoot, { recursive: true, force: true }); + } +}); + +test("discoverCodexProjectCandidates prefers Codex sqlite indexes and session names over raw rollout fallback", async () => { + await setup(); + + const codexRoot = path.join(runtimeRoot, ".codex"); + const now = new Date("2026-03-30T12:45:00+08:00"); + await mkdir(codexRoot, { recursive: true }); + const stateDbPath = path.join(codexRoot, "state_5.sqlite"); + const logsDbPath = path.join(codexRoot, "logs_1.sqlite"); + const sessionIndexPath = path.join(codexRoot, "session_index.jsonl"); + const globalStatePath = path.join(codexRoot, ".codex-global-state.json"); + + const stateDb = new DatabaseSync(stateDbPath); + stateDb.exec(` + CREATE TABLE threads ( + id TEXT PRIMARY KEY, + rollout_path TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + source TEXT NOT NULL, + model_provider TEXT NOT NULL, + cwd TEXT NOT NULL, + title TEXT NOT NULL, + sandbox_policy TEXT NOT NULL, + approval_mode TEXT NOT NULL, + tokens_used INTEGER NOT NULL DEFAULT 0, + has_user_event INTEGER NOT NULL DEFAULT 0, + archived INTEGER NOT NULL DEFAULT 0, + archived_at INTEGER, + git_sha TEXT, + git_branch TEXT, + git_origin_url TEXT, + cli_version TEXT NOT NULL DEFAULT '', + first_user_message TEXT NOT NULL DEFAULT '', + agent_nickname TEXT, + agent_role TEXT, + memory_mode TEXT NOT NULL DEFAULT 'enabled', + model TEXT, + reasoning_effort TEXT, + agent_path TEXT + ); + `); + const insertThread = stateDb.prepare(` + INSERT INTO threads ( + id, rollout_path, created_at, updated_at, source, model_provider, cwd, title, + sandbox_policy, approval_mode, tokens_used, has_user_event, archived, + cli_version, first_user_message, agent_nickname, agent_role, memory_mode, model, reasoning_effort + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 0, 1, ?, '0.118.0', '', ?, ?, 'enabled', 'gpt-5.4', 'medium') + `); + insertThread.run( + "019d3bossmain", + path.join(codexRoot, "sessions/2026/03/30/rollout-boss-main.jsonl"), + 1774845600, + 1774845618, + "desktop", + "openai", + "/Users/kris/code/boss", + "Boss 主线程", + "workspace-write", + "never", + 0, + null, + null, + ); + insertThread.run( + "019d3yuandiexplorer", + path.join(codexRoot, "sessions/2026/03/30/rollout-yuandi-explorer.jsonl"), + 1774845760, + 1774845776, + "desktop", + "openai", + "/Users/kris/.codex/worktrees/tmp123/yuandi", + "Yuandi 子线程", + "workspace-write", + "never", + 0, + "Epicurus", + "explorer", + ); + insertThread.run( + "019d3oldsession", + path.join(codexRoot, "sessions/2026/03/27/rollout-too-old.jsonl"), + 1774584000, + 1774584000, + "desktop", + "openai", + "/Users/kris/code/old-project", + "Old Session", + "workspace-write", + "never", + 0, + null, + null, + ); + stateDb.close(); + + const logsDb = new DatabaseSync(logsDbPath); + logsDb.exec(` + CREATE TABLE logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts INTEGER NOT NULL, + ts_nanos INTEGER NOT NULL, + level TEXT NOT NULL, + target TEXT NOT NULL, + feedback_log_body TEXT, + module_path TEXT, + file TEXT, + line INTEGER, + thread_id TEXT, + process_uuid TEXT, + estimated_bytes INTEGER NOT NULL DEFAULT 0 + ); + `); + const insertLog = logsDb.prepare(` + INSERT INTO logs (ts, ts_nanos, level, target, thread_id, estimated_bytes) + VALUES (?, 0, 'info', 'codex', ?, 0) + `); + insertLog.run(1774845618, "019d3bossmain"); + insertLog.run(1774845776, "019d3yuandiexplorer"); + logsDb.close(); + + await writeFile( + sessionIndexPath, + [ + JSON.stringify({ + id: "019d3bossmain", + thread_name: "Boss 主线程", + updated_at: "2026-03-30T04:40:18.000000Z", + }), + JSON.stringify({ + id: "019d3yuandiexplorer", + thread_name: "Epicurus", + updated_at: "2026-03-30T04:42:56.000000Z", + }), + ].join("\n") + "\n", + "utf8", + ); + await writeFile( + globalStatePath, + JSON.stringify( + { + "thread-workspace-root-hints": { + "019d3yuandiexplorer": "/Users/kris/code/yuandi", + }, + }, + null, + 2, + ), + "utf8", + ); + + const discovered = await discoverCodexProjectCandidates({ + stateDbPath, + logsDbPath, + sessionIndexPath, + globalStatePath, + lookbackHours: 24, + now, + }); + + assert.deepEqual(discovered.projects, ["boss", "yuandi"]); + assert.equal(discovered.projectCandidates.length, 2); + + const bossSession = discovered.projectCandidates.find((item) => item.threadId === "019d3bossmain"); + const yuandiSession = discovered.projectCandidates.find((item) => item.threadId === "019d3yuandiexplorer"); + assert.ok(bossSession); + assert.ok(yuandiSession); + assert.equal(bossSession?.folderName, "boss"); + assert.equal(bossSession?.codexFolderRef, "/Users/kris/code/boss"); + assert.equal(bossSession?.codexThreadRef, "019d3bossmain"); + assert.equal(bossSession?.threadDisplayName, "Boss 主线程"); + assert.equal(yuandiSession?.folderName, "yuandi"); + assert.equal(yuandiSession?.threadDisplayName, "Epicurus"); + assert.equal(yuandiSession?.codexFolderRef, "/Users/kris/code/yuandi"); +});