import test from "node:test"; import assert from "node:assert/strict"; import os from "node:os"; import path from "node:path"; import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; import { DatabaseSync } from "node:sqlite"; import { appendBossUserMessageToCodexThreadRollout } from "../local-agent/codex-thread-rollout-writer.mjs"; let runtimeRoot = ""; async function ensureRuntimeRoot() { if (!runtimeRoot) { runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-codex-rollout-writer-")); } return runtimeRoot; } test.after(async () => { if (runtimeRoot) { await rm(runtimeRoot, { recursive: true, force: true }); } }); async function createThreadBinding({ threadId, rolloutPath }) { const root = await ensureRuntimeRoot(); const dbPath = path.join(root, `state-${Math.random().toString(16).slice(2)}.sqlite`); const db = new DatabaseSync(dbPath); db.exec(` CREATE TABLE threads ( id TEXT PRIMARY KEY, rollout_path TEXT NOT NULL, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, updated_at_ms INTEGER, source TEXT NOT NULL, model_provider TEXT NOT NULL, cwd TEXT NOT NULL, title TEXT NOT NULL, sandbox_policy TEXT NOT NULL, approval_mode TEXT NOT NULL, tokens_used INTEGER NOT NULL DEFAULT 0, has_user_event INTEGER NOT NULL DEFAULT 0, archived INTEGER NOT NULL DEFAULT 0, archived_at INTEGER, git_sha TEXT, git_branch TEXT, git_origin_url TEXT, cli_version TEXT NOT NULL DEFAULT '', first_user_message TEXT NOT NULL DEFAULT '', agent_nickname TEXT, agent_role TEXT, memory_mode TEXT NOT NULL DEFAULT 'enabled', model TEXT, reasoning_effort TEXT, agent_path TEXT ); `); db.prepare(` INSERT INTO threads ( id, rollout_path, created_at, updated_at, updated_at_ms, source, model_provider, cwd, title, sandbox_policy, approval_mode, tokens_used, has_user_event, archived, cli_version, first_user_message, agent_nickname, agent_role, memory_mode, model, reasoning_effort ) VALUES (?, ?, 1774845600, 1774845618, 1774845618000, 'desktop', 'openai', ?, ?, '{"type":"workspace-write"}', 'never', 0, 0, 0, '0.118.0', '', '', '', 'enabled', 'gpt-5.4', 'medium') `).run(threadId, rolloutPath, root, threadId); db.close(); return dbPath; } async function createThreadsDbWithoutBinding() { const root = await ensureRuntimeRoot(); const dbPath = path.join(root, `state-empty-${Math.random().toString(16).slice(2)}.sqlite`); const db = new DatabaseSync(dbPath); db.exec(` CREATE TABLE threads ( id TEXT PRIMARY KEY, rollout_path TEXT NOT NULL, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, updated_at_ms INTEGER, source TEXT NOT NULL, model_provider TEXT NOT NULL, cwd TEXT NOT NULL, title TEXT NOT NULL, sandbox_policy TEXT NOT NULL, approval_mode TEXT NOT NULL, tokens_used INTEGER NOT NULL DEFAULT 0, has_user_event INTEGER NOT NULL DEFAULT 0, archived INTEGER NOT NULL DEFAULT 0, archived_at INTEGER, git_sha TEXT, git_branch TEXT, git_origin_url TEXT, cli_version TEXT NOT NULL DEFAULT '', first_user_message TEXT NOT NULL DEFAULT '', agent_nickname TEXT, agent_role TEXT, memory_mode TEXT NOT NULL DEFAULT 'enabled', model TEXT, reasoning_effort TEXT, agent_path TEXT ); `); db.close(); return dbPath; } test("appendBossUserMessageToCodexThreadRollout writes one user_message event and dedupes by source message id", async () => { const root = await ensureRuntimeRoot(); const rolloutPath = path.join(root, "rollout-thread-real.jsonl"); await writeFile( rolloutPath, `${JSON.stringify({ timestamp: "2026-04-21T08:59:00.000Z", type: "session_meta", payload: { id: "019d-thread-real", cwd: "/Users/kris/code/boss", }, })}\n`, "utf8", ); const stateDbPath = await createThreadBinding({ threadId: "019d-thread-real", rolloutPath, }); const beforeTouchDb = new DatabaseSync(stateDbPath, { readonly: true }); const beforeTouchRow = beforeTouchDb .prepare("SELECT updated_at, updated_at_ms, has_user_event FROM threads WHERE id = ?") .get("019d-thread-real"); beforeTouchDb.close(); const first = await appendBossUserMessageToCodexThreadRollout({ stateDbPath, targetThreadRef: "019d-thread-real", sourceMessageId: "msg-1", message: "请继续推进", sentAt: "2026-04-21T09:00:00.000Z", }); const second = await appendBossUserMessageToCodexThreadRollout({ stateDbPath, targetThreadRef: "019d-thread-real", sourceMessageId: "msg-1", message: "请继续推进", sentAt: "2026-04-21T09:00:00.000Z", }); assert.equal(first.status, "written"); assert.equal(second.status, "duplicate"); const raw = await readFile(rolloutPath, "utf8"); const lines = raw.trim().split("\n").map((line) => JSON.parse(line)); const mirrored = lines.filter( (entry) => entry?.type === "event_msg" && entry?.payload?.type === "user_message" && entry?.payload?.metadata?.bossSourceMessageId === "msg-1", ); const mirroredResponseItems = lines.filter( (entry) => entry?.type === "response_item" && entry?.payload?.type === "message" && entry?.payload?.role === "user" && entry?.payload?.content?.[0]?.type === "input_text" && entry?.payload?.content?.[0]?.text === "请继续推进", ); assert.equal(mirrored.length, 1); assert.equal(mirroredResponseItems.length, 1); assert.equal(mirrored[0]?.payload?.message, "请继续推进"); assert.equal(mirrored[0]?.timestamp, "2026-04-21T09:00:00.000Z"); const afterTouchDb = new DatabaseSync(stateDbPath, { readonly: true }); const afterTouchRow = afterTouchDb .prepare("SELECT updated_at, updated_at_ms, has_user_event FROM threads WHERE id = ?") .get("019d-thread-real"); afterTouchDb.close(); assert.equal(afterTouchRow?.has_user_event, 1); assert.ok( Number(afterTouchRow?.updated_at) > Number(beforeTouchRow?.updated_at), "expected mirrored write to refresh updated_at", ); assert.ok( Number(afterTouchRow?.updated_at_ms) > Number(beforeTouchRow?.updated_at_ms), "expected mirrored write to refresh updated_at_ms", ); }); test("appendBossUserMessageToCodexThreadRollout falls back to sessions dir when the state db is unavailable", async () => { const root = await ensureRuntimeRoot(); const sessionsDir = path.join(root, "sessions"); const nestedSessionDir = path.join(sessionsDir, "2026", "04", "21"); await mkdir(nestedSessionDir, { recursive: true }); const threadId = "019d-session-only"; const rolloutPath = path.join( nestedSessionDir, `rollout-2026-04-21T20-33-36-${threadId}.jsonl`, ); await writeFile( rolloutPath, `${JSON.stringify({ timestamp: "2026-04-21T12:33:36.000Z", type: "session_meta", payload: { id: threadId, cwd: "/tmp/boss-codex-desktop-sync-smoke", }, })}\n`, "utf8", ); const result = await appendBossUserMessageToCodexThreadRollout({ stateDbPath: path.join(root, "missing-state.sqlite"), sessionsDir, targetThreadRef: threadId, sourceMessageId: "msg-session-only", message: "从 APP 发起的一条消息", sentAt: "2026-04-21T12:34:00.000Z", }); assert.equal(result.status, "written"); assert.equal(result.threadTouch.status, "skipped"); const raw = await readFile(rolloutPath, "utf8"); const lines = raw.trim().split("\n").map((line) => JSON.parse(line)); assert.ok( lines.some( (entry) => entry?.type === "event_msg" && entry?.payload?.type === "user_message" && entry?.payload?.metadata?.bossSourceMessageId === "msg-session-only", ), ); }); test("appendBossUserMessageToCodexThreadRollout skips thread touch when rollout is found in sessions but the db has no matching thread row", async () => { const root = await ensureRuntimeRoot(); const sessionsDir = path.join(root, "sessions-touch-skip"); const nestedSessionDir = path.join(sessionsDir, "2026", "04", "21"); await mkdir(nestedSessionDir, { recursive: true }); const threadId = "019d-session-without-thread-row"; const rolloutPath = path.join( nestedSessionDir, `rollout-2026-04-21T20-35-36-${threadId}.jsonl`, ); await writeFile( rolloutPath, `${JSON.stringify({ timestamp: "2026-04-21T12:35:36.000Z", type: "session_meta", payload: { id: threadId, cwd: "/tmp/boss-codex-desktop-sync-smoke", }, })}\n`, "utf8", ); const stateDbPath = await createThreadsDbWithoutBinding(); const result = await appendBossUserMessageToCodexThreadRollout({ stateDbPath, sessionsDir, targetThreadRef: threadId, sourceMessageId: "msg-touch-skipped", message: "这条消息应该只写 rollout,不误报 thread touch", sentAt: "2026-04-21T12:36:00.000Z", }); assert.equal(result.status, "written"); assert.deepEqual(result.threadTouch, { status: "skipped", reason: "thread-not-found", }); });