feat: sync device codex projects

This commit is contained in:
Codex
2026-03-25 03:42:36 +08:00
parent fd1e0735c7
commit ee9ade6bd3
9 changed files with 497 additions and 14 deletions

View File

@@ -10,6 +10,7 @@ data class AppStatePayload(
val tasks: List<TaskItem> = emptyList(),
val workers: List<WorkerNode> = emptyList(),
val deviceBindings: List<DeviceBinding> = emptyList(),
val deviceProjects: List<DeviceProject> = emptyList(),
val approvals: List<ApprovalRequest> = emptyList(),
val events: List<BossEvent> = emptyList(),
)
@@ -123,6 +124,23 @@ data class DeviceBinding(
val updatedAt: String,
)
@Serializable
data class DeviceProject(
val id: String,
val workerId: String,
val source: String,
val workspaceRoot: String,
val workspaceLabel: String,
val projectName: String,
val status: String,
val primaryThreadId: String? = null,
val primaryThreadTitle: String = "",
val recentThreadTitles: List<String> = emptyList(),
val recentThreadCount: Int = 0,
val pinnedThreadIds: List<String> = emptyList(),
val updatedAt: String,
)
@Serializable
data class DeviceBindingLaunchPayload(
val binding: DeviceBinding,

View File

@@ -74,6 +74,7 @@ import java.time.format.DateTimeFormatter
import site.hyzq.bossandroid.model.ApprovalRequest
import site.hyzq.bossandroid.model.BossEvent
import site.hyzq.bossandroid.model.DeviceBinding
import site.hyzq.bossandroid.model.DeviceProject
import site.hyzq.bossandroid.model.Message
import site.hyzq.bossandroid.model.Session
import site.hyzq.bossandroid.model.TaskItem
@@ -576,6 +577,7 @@ private fun OverviewTab(
onSelectSession: (String) -> Unit,
onSelectWorker: (String?) -> Unit,
) {
val selectedWorker = uiState.workers.firstOrNull { it.id == uiState.selectedWorkerId }
val messagesBySession = remember(uiState.messages) {
uiState.messages.groupBy { it.sessionId }
}
@@ -585,11 +587,44 @@ private fun OverviewTab(
val workerById = remember(uiState.workers) {
uiState.workers.associateBy { it.id }
}
val activeTasks = remember(uiState.tasks) {
uiState.tasks.filter { it.status in listOf("planning", "queued", "assigned", "running", "blocked", "paused", "waiting_approval") }
val selectedWorkerSessionIds = remember(uiState.selectedWorkerId, uiState.sessions, uiState.tasks) {
if (uiState.selectedWorkerId == null) {
uiState.sessions.map { it.id }.toSet()
} else {
mutableSetOf<String>().apply {
uiState.sessions
.filter { session -> session.activeWorkerId == uiState.selectedWorkerId }
.forEach { add(it.id) }
uiState.tasks
.filter {
it.assignedWorkerId == uiState.selectedWorkerId ||
it.preferredWorkerId == uiState.selectedWorkerId
}
.forEach { add(it.sessionId) }
}
}
}
val recentMessages = remember(uiState.messages) {
uiState.messages.sortedByDescending { it.createdAt }.take(12)
val visibleSessions = remember(uiState.sessions, selectedWorkerSessionIds) {
uiState.sessions.filter { it.id in selectedWorkerSessionIds }
}
val activeTasks = remember(uiState.tasks, uiState.selectedWorkerId) {
uiState.tasks.filter {
it.status in listOf("planning", "queued", "assigned", "running", "blocked", "paused", "waiting_approval") &&
(uiState.selectedWorkerId == null ||
it.assignedWorkerId == uiState.selectedWorkerId ||
it.preferredWorkerId == uiState.selectedWorkerId)
}
}
val recentMessages = remember(uiState.messages, selectedWorkerSessionIds) {
uiState.messages
.filter { it.sessionId in selectedWorkerSessionIds }
.sortedByDescending { it.createdAt }
.take(12)
}
val visibleDeviceProjects = remember(uiState.deviceProjects, uiState.selectedWorkerId) {
uiState.deviceProjects.filter { project ->
uiState.selectedWorkerId == null || project.workerId == uiState.selectedWorkerId
}
}
Column(verticalArrangement = Arrangement.spacedBy(16.dp)) {
@@ -613,14 +648,34 @@ private fun OverviewTab(
}
}
SectionHeading("所有项目")
if (uiState.sessions.isEmpty()) {
SectionHeading(if (selectedWorker != null) "${selectedWorker.name} 的 Codex 项目" else "设备 Codex 项目")
if (visibleDeviceProjects.isEmpty()) {
EmptyStateCard(
title = "还没有项目",
body = "你可以在手机上创建也可以在电脑入口创建。Boss 会自动把它们同步到这里。",
title = "当前还没有同步到 Codex 项目",
body = if (selectedWorker != null) {
"这台设备还没把本机 Codex 项目索引上报到 Boss等下一次心跳后会自动刷新。"
} else {
"选中一台设备后,这里会显示该设备当前打开或最近活跃的 Codex 项目。"
},
)
} else {
uiState.sessions.forEach { session ->
visibleDeviceProjects.forEach { project ->
DeviceProjectCard(project = project)
}
}
SectionHeading(if (selectedWorker != null) "该设备关联会话" else "所有项目")
if (visibleSessions.isEmpty()) {
EmptyStateCard(
title = if (selectedWorker != null) "这台设备还没有关联会话" else "还没有项目",
body = if (selectedWorker != null) {
"你可以先切到这台设备发起任务Boss 会把相关会话自动归到它下面。"
} else {
"你可以在手机上创建也可以在电脑入口创建。Boss 会自动把它们同步到这里。"
},
)
} else {
visibleSessions.forEach { session ->
val recentMessage = messagesBySession[session.id]?.maxByOrNull { it.createdAt }
val sessionTasks = tasksBySession[session.id].orEmpty()
val sessionWorkerNames = sessionTasks
@@ -1360,6 +1415,72 @@ private fun ProjectOverviewCard(
}
}
@Composable
private fun DeviceProjectCard(
project: DeviceProject,
) {
OutlinedCard(
modifier = Modifier.fillMaxWidth(),
colors = CardDefaults.outlinedCardColors(
containerColor = if (project.status == "active") {
MaterialTheme.colorScheme.primaryContainer.copy(alpha = 0.35f)
} else {
MaterialTheme.colorScheme.surface
},
),
) {
Column(
modifier = Modifier.padding(16.dp),
verticalArrangement = Arrangement.spacedBy(10.dp),
) {
Row(
modifier = Modifier.fillMaxWidth(),
horizontalArrangement = Arrangement.SpaceBetween,
verticalAlignment = Alignment.CenterVertically,
) {
Column(modifier = Modifier.weight(1f)) {
Text(project.projectName, style = MaterialTheme.typography.titleMedium, fontWeight = FontWeight.SemiBold)
Text(
project.workspaceRoot,
color = MaterialTheme.colorScheme.onSurfaceVariant,
maxLines = 1,
overflow = TextOverflow.Ellipsis,
)
}
StatusChip(
label = if (project.status == "active") "当前活跃" else "最近活跃",
tone = if (project.status == "active") "running" else "pending",
)
}
if (project.primaryThreadTitle.isNotBlank()) {
Text("主对话:${project.primaryThreadTitle}")
}
if (project.recentThreadTitles.isNotEmpty()) {
Text(
"最近对话:${project.recentThreadTitles.joinToString(" · ")}",
color = MaterialTheme.colorScheme.onSurfaceVariant,
maxLines = 2,
overflow = TextOverflow.Ellipsis,
)
}
Row(horizontalArrangement = Arrangement.spacedBy(10.dp)) {
AssistChip(onClick = {}, label = { Text("${project.recentThreadCount} 个对话") })
if (project.pinnedThreadIds.isNotEmpty()) {
AssistChip(onClick = {}, label = { Text("${project.pinnedThreadIds.size} 个置顶") })
}
}
Text(
"最近同步 ${formatRelative(project.updatedAt)}",
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant,
)
}
}
}
@Composable
private fun GlobalMessageCard(
message: Message,

View File

@@ -19,6 +19,7 @@ import site.hyzq.bossandroid.model.AppStatePayload
import site.hyzq.bossandroid.model.ApprovalRequest
import site.hyzq.bossandroid.model.BossEvent
import site.hyzq.bossandroid.model.DeviceBinding
import site.hyzq.bossandroid.model.DeviceProject
import site.hyzq.bossandroid.model.HealthPayload
import site.hyzq.bossandroid.model.Message
import site.hyzq.bossandroid.model.Session
@@ -63,6 +64,7 @@ data class BossUiState(
val approvals: List<ApprovalRequest> = emptyList(),
val workers: List<WorkerNode> = emptyList(),
val deviceBindings: List<DeviceBinding> = emptyList(),
val deviceProjects: List<DeviceProject> = emptyList(),
val events: List<BossEvent> = emptyList(),
val selectedSessionId: String? = null,
val selectedWorkerId: String? = null,
@@ -361,6 +363,10 @@ class BossViewModel(
val approvals = snapshot.approvals.sortedByDescending { it.updatedAt }
val workers = snapshot.workers.sortedBy { it.name.lowercase() }
val deviceBindings = snapshot.deviceBindings.sortedByDescending { it.updatedAt }
val deviceProjects = snapshot.deviceProjects.sortedWith(
compareByDescending<DeviceProject> { it.status == "active" }
.thenByDescending { it.updatedAt },
)
val events = snapshot.events.sortedByDescending { it.timestamp }
val selectedSessionId = preferredSessionId
@@ -383,6 +389,7 @@ class BossViewModel(
approvals = approvals,
workers = workers,
deviceBindings = deviceBindings,
deviceProjects = deviceProjects,
events = events,
health = HealthPayload(
status = "ok",

234
src/codex-project-sync.ts Normal file
View File

@@ -0,0 +1,234 @@
import { createHash } from "node:crypto";
import { existsSync, readFileSync } from "node:fs";
import { basename, join, resolve } from "node:path";
import { homedir } from "node:os";
import type { DeviceProject } from "./types.js";
import { now } from "./utils.js";
interface SessionIndexEntry {
id: string;
thread_name?: string;
updated_at?: string;
}
interface GlobalStatePayload {
["active-workspace-roots"]?: unknown;
["electron-saved-workspace-roots"]?: unknown;
["electron-workspace-root-labels"]?: unknown;
["pinned-thread-ids"]?: unknown;
["thread-workspace-root-hints"]?: unknown;
}
interface ProjectThreadSummary {
id: string;
title: string;
updatedAt: string;
}
interface ProjectDraft {
workspaceRoot: string;
workspaceLabel: string;
projectName: string;
active: boolean;
updatedAt: string;
threads: ProjectThreadSummary[];
pinnedThreadIds: Set<string>;
}
function resolveCodexHome(): string {
const configured = process.env.BOSS_CODEX_HOME?.trim();
if (configured) {
return resolve(configured);
}
return resolve(homedir(), ".codex");
}
function normalizeWorkspaceRoot(value: string): string {
const trimmed = value.trim();
if (!trimmed) {
return "";
}
return trimmed.replace(/[\\/]+$/, "");
}
function readJsonFile<T>(filePath: string): T | null {
if (!existsSync(filePath)) {
return null;
}
try {
return JSON.parse(readFileSync(filePath, "utf8")) as T;
} catch {
return null;
}
}
function readJsonLines<T>(filePath: string): T[] {
if (!existsSync(filePath)) {
return [];
}
const lines = readFileSync(filePath, "utf8")
.split(/\r?\n/)
.map((line) => line.trim())
.filter(Boolean);
const result: T[] = [];
for (const line of lines) {
try {
result.push(JSON.parse(line) as T);
} catch {
// ignore malformed lines
}
}
return result;
}
function asStringList(input: unknown): string[] {
if (!Array.isArray(input)) {
return [];
}
return input
.map((value) => (typeof value === "string" ? normalizeWorkspaceRoot(value) : ""))
.filter(Boolean);
}
function asStringRecord(input: unknown): Record<string, string> {
if (!input || typeof input !== "object" || Array.isArray(input)) {
return {};
}
const entries = Object.entries(input as Record<string, unknown>)
.map(([key, value]) => [key, typeof value === "string" ? normalizeWorkspaceRoot(value) : ""] as const)
.filter(([, value]) => Boolean(value));
return Object.fromEntries(entries);
}
function asWorkspaceLabels(input: unknown): Record<string, string> {
if (!input || typeof input !== "object" || Array.isArray(input)) {
return {};
}
const entries = Object.entries(input as Record<string, unknown>)
.map(([key, value]) => [normalizeWorkspaceRoot(key), typeof value === "string" ? value.trim() : ""] as const)
.filter(([key, value]) => Boolean(key && value));
return Object.fromEntries(entries);
}
function createProjectDraft(
workspaceRoot: string,
workspaceLabels: Record<string, string>,
activeRoots: Set<string>,
): ProjectDraft {
const workspaceLabel = workspaceLabels[workspaceRoot] || basename(workspaceRoot) || workspaceRoot;
return {
workspaceRoot,
workspaceLabel,
projectName: workspaceLabel,
active: activeRoots.has(workspaceRoot),
updatedAt: now(),
threads: [],
pinnedThreadIds: new Set<string>(),
};
}
function projectId(workerId: string, workspaceRoot: string): string {
const digest = createHash("sha1")
.update(`${workerId}:${workspaceRoot}`)
.digest("hex")
.slice(0, 12);
return `device_project_${digest}`;
}
export function scanLocalCodexProjects(workerId: string, workspaceHint?: string): DeviceProject[] {
const codexHome = resolveCodexHome();
const globalState = readJsonFile<GlobalStatePayload>(join(codexHome, ".codex-global-state.json")) ?? {};
const sessionIndex = readJsonLines<SessionIndexEntry>(join(codexHome, "session_index.jsonl"));
const activeRoots = new Set([
...asStringList(globalState["active-workspace-roots"]),
normalizeWorkspaceRoot(workspaceHint ?? ""),
].filter(Boolean));
const workspaceLabels = asWorkspaceLabels(globalState["electron-workspace-root-labels"]);
const threadWorkspaceHints = asStringRecord(globalState["thread-workspace-root-hints"]);
const pinnedThreadIds = new Set(
Array.isArray(globalState["pinned-thread-ids"])
? globalState["pinned-thread-ids"].filter((value): value is string => typeof value === "string" && value.trim().length > 0)
: [],
);
const drafts = new Map<string, ProjectDraft>();
for (const workspaceRoot of activeRoots) {
drafts.set(workspaceRoot, createProjectDraft(workspaceRoot, workspaceLabels, activeRoots));
}
const sortedSessions = sessionIndex
.filter((entry) => entry.id && threadWorkspaceHints[entry.id])
.sort((left, right) => (right.updated_at ?? "").localeCompare(left.updated_at ?? ""))
.slice(0, 300);
for (const entry of sortedSessions) {
const workspaceRoot = threadWorkspaceHints[entry.id];
if (!workspaceRoot) {
continue;
}
const updatedAt = entry.updated_at?.trim() || now();
const threadTitle = entry.thread_name?.trim() || "未命名对话";
const draft =
drafts.get(workspaceRoot) ??
createProjectDraft(workspaceRoot, workspaceLabels, activeRoots);
if (pinnedThreadIds.has(entry.id)) {
draft.pinnedThreadIds.add(entry.id);
}
draft.updatedAt = draft.updatedAt > updatedAt ? draft.updatedAt : updatedAt;
draft.active = draft.active || activeRoots.has(workspaceRoot);
draft.threads.push({
id: entry.id,
title: threadTitle,
updatedAt,
});
drafts.set(workspaceRoot, draft);
}
return Array.from(drafts.values())
.map((draft) => {
const threads = draft.threads
.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt));
const primaryThread = threads[0];
const recentThreadTitles = Array.from(
new Set(threads.map((thread) => thread.title).filter(Boolean)),
).slice(0, 3);
return {
id: projectId(workerId, draft.workspaceRoot),
workerId,
source: "codex",
workspaceRoot: draft.workspaceRoot,
workspaceLabel: draft.workspaceLabel,
projectName: draft.projectName,
status: draft.active ? "active" : "recent",
primaryThreadId: primaryThread?.id ?? null,
primaryThreadTitle: primaryThread?.title ?? "",
recentThreadTitles,
recentThreadCount: threads.length,
pinnedThreadIds: Array.from(draft.pinnedThreadIds),
updatedAt: primaryThread?.updatedAt ?? draft.updatedAt,
} satisfies DeviceProject;
})
.sort((left, right) => {
if (left.status !== right.status) {
return left.status === "active" ? -1 : 1;
}
return right.updatedAt.localeCompare(left.updatedAt);
});
}

View File

@@ -4,6 +4,7 @@ import type {
AppState,
BossEvent,
DeviceBinding,
DeviceProject,
ExecutorKind,
Message,
Session,
@@ -448,6 +449,7 @@ export class BossEngine {
worker.currentTaskId = null;
worker.updatedAt = timestamp;
worker.lastSeenAt = timestamp;
state.deviceProjects = state.deviceProjects.filter((project) => project.workerId !== worker.id);
addEvent({
sessionId: null,
taskId: null,
@@ -466,7 +468,7 @@ export class BossEngine {
return updated;
}
heartbeat(workerId: string, load = 0): WorkerNode {
heartbeat(workerId: string, load = 0, deviceProjects: DeviceProject[] = []): WorkerNode {
let updated!: WorkerNode;
this.commit((state, addEvent) => {
const worker = state.workers.find((candidate) => candidate.id === workerId);
@@ -496,10 +498,60 @@ export class BossEngine {
updated = { ...worker };
});
if (deviceProjects.length > 0 || this.getState().deviceProjects.some((project) => project.workerId === workerId)) {
this.syncWorkerProjects(workerId, deviceProjects);
}
this.syncAssignments();
return updated;
}
syncWorkerProjects(workerId: string, projects: DeviceProject[]): DeviceProject[] {
let updatedProjects: DeviceProject[] = [];
this.commit((state, addEvent) => {
const worker = state.workers.find((candidate) => candidate.id === workerId);
if (!worker) {
throw new Error(`Worker not found: ${workerId}`);
}
const normalizedProjects = projects
.map((project) => ({
...project,
workerId,
}))
.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt));
const existingProjects = state.deviceProjects
.filter((project) => project.workerId === workerId)
.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt));
const changed = JSON.stringify(existingProjects) !== JSON.stringify(normalizedProjects);
if (!changed) {
updatedProjects = normalizedProjects;
return;
}
state.deviceProjects = [
...state.deviceProjects.filter((project) => project.workerId !== workerId),
...normalizedProjects,
];
addEvent({
sessionId: null,
taskId: worker.currentTaskId,
source: "worker",
type: "worker.projects.synced",
payload: {
workerId,
projectCount: normalizedProjects.length,
projectNames: normalizedProjects.map((project) => project.projectName).slice(0, 8),
},
});
updatedProjects = normalizedProjects;
});
return updatedProjects;
}
claimNextTask(workerId: string): Task | null {
let claimedTask: Task | null = null;
@@ -999,6 +1051,7 @@ export class BossEngine {
}
worker.currentTaskId = null;
state.deviceProjects = state.deviceProjects.filter((project) => project.workerId !== worker.id);
addEvent({
sessionId: null,
taskId: null,

View File

@@ -2,7 +2,7 @@ import path from "node:path";
import Fastify from "fastify";
import fastifyStatic from "@fastify/static";
import { BossEngine } from "./engine.js";
import type { DeviceBinding } from "./types.js";
import type { DeviceBinding, DeviceProject } from "./types.js";
const engine = new BossEngine();
const app = Fastify({ logger: process.env.BOSS_DEBUG === "1" });
@@ -308,8 +308,8 @@ app.get(withBase("/api/device-bindings/:token/bootstrap.ps1"), async (request, r
app.post(withBase("/api/workers/:workerId/heartbeat"), async (request) => {
const params = request.params as { workerId: string };
const body = (request.body ?? {}) as { load?: number };
return engine.heartbeat(params.workerId, body.load ?? 0);
const body = (request.body ?? {}) as { load?: number; deviceProjects?: DeviceProject[] };
return engine.heartbeat(params.workerId, body.load ?? 0, body.deviceProjects ?? []);
});
app.post(withBase("/api/workers/:workerId/offline"), async (request) => {

View File

@@ -10,6 +10,7 @@ function defaultState(): AppState {
workers: [],
approvals: [],
deviceBindings: [],
deviceProjects: [],
events: [],
};
}

View File

@@ -15,6 +15,7 @@ export type ApprovalStatus = "pending" | "approved" | "rejected";
export type RiskLevel = "low" | "medium" | "high";
export type DeviceBindingStatus = "pending" | "claimed" | "expired";
export type ExecutorKind = "codex" | "claude";
export type DeviceProjectStatus = "active" | "recent";
export interface Session {
id: string;
@@ -101,6 +102,22 @@ export interface DeviceBinding {
updatedAt: string;
}
export interface DeviceProject {
id: string;
workerId: string;
source: "codex";
workspaceRoot: string;
workspaceLabel: string;
projectName: string;
status: DeviceProjectStatus;
primaryThreadId: string | null;
primaryThreadTitle: string;
recentThreadTitles: string[];
recentThreadCount: number;
pinnedThreadIds: string[];
updatedAt: string;
}
export interface BossEvent {
id: string;
sessionId: string | null;
@@ -118,6 +135,7 @@ export interface AppState {
workers: WorkerNode[];
approvals: ApprovalRequest[];
deviceBindings: DeviceBinding[];
deviceProjects: DeviceProject[];
events: BossEvent[];
}

View File

@@ -1,6 +1,8 @@
import { spawn } from "node:child_process";
import { resolve } from "node:path";
import { setTimeout as delay } from "node:timers/promises";
import { scanLocalCodexProjects } from "./codex-project-sync.js";
import type { DeviceProject } from "./types.js";
interface Task {
id: string;
@@ -20,6 +22,8 @@ interface WorkerOptions {
progressIntervalMs: number;
}
const PROJECT_SYNC_INTERVAL_MS = Number(process.env.BOSS_PROJECT_SYNC_INTERVAL_MS ?? 15_000);
class HttpError extends Error {
constructor(
readonly status: number,
@@ -116,6 +120,18 @@ async function registerWorker(options: WorkerOptions) {
return worker;
}
async function syncHeartbeat(
options: WorkerOptions,
workerId: string,
load: number,
deviceProjects: DeviceProject[],
) {
return postJson(`${options.server}/api/workers/${workerId}/heartbeat`, {
load,
deviceProjects,
});
}
async function taskStillRunnable(server: string, taskId: string) {
try {
const task = (await getJson(`${server}/api/tasks/${taskId}`)) as { status: string };
@@ -256,6 +272,8 @@ async function runCommandTask(options: WorkerOptions, workerId: string, task: Ta
let cancelled = false;
const startedAt = Date.now();
let cachedProjects: DeviceProject[] = [];
let lastProjectSyncAt = 0;
while (!exitState.done) {
await delay(options.progressIntervalMs);
@@ -270,6 +288,12 @@ async function runCommandTask(options: WorkerOptions, workerId: string, task: Ta
break;
}
if (Date.now() - lastProjectSyncAt >= PROJECT_SYNC_INTERVAL_MS) {
cachedProjects = scanLocalCodexProjects(workerId, options.workspace);
lastProjectSyncAt = Date.now();
await syncHeartbeat(options, workerId, 1, cachedProjects);
}
const elapsed = Date.now() - startedAt;
const progressPercent = Math.min(90, 20 + Math.floor(elapsed / options.progressIntervalMs) * 10);
await postJson(`${options.server}/api/tasks/${task.id}/progress`, {
@@ -323,10 +347,17 @@ async function executeTask(options: WorkerOptions, workerId: string, task: Task)
async function main() {
const options = parseArgs(process.argv.slice(2));
let worker = await registerWorker(options);
let cachedProjects: DeviceProject[] = [];
let lastProjectSyncAt = 0;
for (;;) {
try {
await postJson(`${options.server}/api/workers/${worker.id}/heartbeat`, { load: 0 });
if (Date.now() - lastProjectSyncAt >= PROJECT_SYNC_INTERVAL_MS) {
cachedProjects = scanLocalCodexProjects(worker.id, options.workspace);
lastProjectSyncAt = Date.now();
}
await syncHeartbeat(options, worker.id, 0, cachedProjects);
const response = (await postJson(`${options.server}/api/workers/${worker.id}/claim-next`, {})) as {
task: Task | null;
};