import { resolve } from "node:path"; import type { ApprovalRequest, AppState, BossEvent, DeviceBinding, ExecutorKind, Message, Session, SessionDetails, Task, WorkerNode, } from "./types.js"; import { EventBroker } from "./event-broker.js"; import { createPlan, buildPlannerMessage, materializeTasks } from "./planner.js"; import { chooseAssignmentCandidates } from "./scheduler.js"; import { FileStore } from "./store.js"; import { createId, now } from "./utils.js"; const DATA_FILE = process.env.BOSS_DATA_FILE ? resolve(process.cwd(), process.env.BOSS_DATA_FILE) : resolve(process.cwd(), ".boss-data", "store.json"); function isActiveTask(task: Task): boolean { return !["completed", "failed", "cancelled"].includes(task.status); } export class BossEngine { readonly store = new FileStore(DATA_FILE); readonly events = new EventBroker(); getState(): AppState { this.reconcileState(); return this.store.snapshot; } bootstrap(): AppState { return this.getState(); } resetDemo(preserveWorkers = true): AppState { this.commit((state) => { state.sessions = []; state.messages = []; state.tasks = []; state.approvals = []; state.events = []; if (!preserveWorkers) { state.workers = []; return; } const timestamp = now(); state.workers = state.workers.map((worker) => ({ ...worker, currentTaskId: null, load: 0, status: worker.status === "offline" ? "offline" : "idle", updatedAt: timestamp, })); }); return this.getState(); } createSession(title?: string): SessionDetails { const timestamp = now(); const session: Session = { id: createId("session"), title: title?.trim() || "未命名项目", status: "active", activeObjective: "", lastPlannerSummary: "", activeWorkerId: null, createdAt: timestamp, updatedAt: timestamp, }; this.commit((state, addEvent) => { state.sessions.unshift(session); addEvent({ sessionId: session.id, taskId: null, source: "system", type: "session.created", payload: { title: session.title }, }); }); return this.getSession(session.id); } getSession(sessionId: string): SessionDetails { const state = this.getState(); const session = state.sessions.find((candidate) => candidate.id === sessionId); if (!session) { throw new Error(`Session not found: ${sessionId}`); } return { session, messages: state.messages .filter((message) => message.sessionId === sessionId) .sort((left, right) => left.createdAt.localeCompare(right.createdAt)), tasks: state.tasks .filter((task) => task.sessionId === sessionId) .sort((left, right) => left.createdAt.localeCompare(right.createdAt)), approvals: state.approvals .filter((approval) => approval.sessionId === sessionId) .sort((left, right) => left.createdAt.localeCompare(right.createdAt)), }; } listSessions(): Session[] { return this.getState().sessions.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt)); } getTask(taskId: string): Task { const task = this.getState().tasks.find((candidate) => candidate.id === taskId); if (!task) { throw new Error(`Task not found: ${taskId}`); } return task; } archiveSession(sessionId: string): SessionDetails { this.commit((state, addEvent) => { const session = state.sessions.find((candidate) => candidate.id === sessionId); if (!session) { throw new Error(`Session not found: ${sessionId}`); } const timestamp = now(); session.status = "archived"; session.updatedAt = timestamp; for (const task of state.tasks.filter( (candidate) => candidate.sessionId === sessionId && isActiveTask(candidate), )) { if (!["completed", "failed", "cancelled"].includes(task.status)) { this.detachTaskFromWorker(state, task, timestamp); task.status = "paused"; task.summary = "会话已归档,任务暂停。"; task.updatedAt = timestamp; addEvent({ sessionId, taskId: task.id, source: "system", type: "task.paused", payload: { reason: "session_archived" }, }); } } addEvent({ sessionId, taskId: null, source: "system", type: "session.archived", payload: { sessionId }, }); }); return this.getSession(sessionId); } restoreSession(sessionId: string): SessionDetails { this.commit((state, addEvent) => { const session = state.sessions.find((candidate) => candidate.id === sessionId); if (!session) { throw new Error(`Session not found: ${sessionId}`); } const timestamp = now(); session.status = "active"; session.updatedAt = timestamp; addEvent({ sessionId, taskId: null, source: "system", type: "session.restored", payload: { sessionId }, }); }); return this.getSession(sessionId); } addMessage( sessionId: string, content: string, channel = "web", targetWorkerId: string | null = null, ): SessionDetails { const session = this.getSession(sessionId).session; if (session.status === "archived") { throw new Error(`Session ${sessionId} is archived`); } const targetWorker = targetWorkerId ? this.getWorker(targetWorkerId) : null; const message: Message = { id: createId("msg"), sessionId, role: "user", channel, content: content.trim(), createdAt: now(), }; if (!message.content) { throw new Error("Message content is required."); } this.commit((state, addEvent) => { const mutableSession = state.sessions.find((candidate) => candidate.id === sessionId); if (!mutableSession) { throw new Error(`Session not found: ${sessionId}`); } mutableSession.activeObjective = message.content; mutableSession.activeWorkerId = targetWorker?.id ?? mutableSession.activeWorkerId ?? null; mutableSession.updatedAt = message.createdAt; if (!mutableSession.title || mutableSession.title === "未命名项目") { mutableSession.title = message.content.slice(0, 32); } state.messages.push(message); addEvent({ sessionId, taskId: null, source: "user", type: "session.message.added", payload: { channel, content: message.content, targetWorkerId: targetWorker?.id ?? null, targetWorkerName: targetWorker?.name ?? null, }, }); }); this.applyPlan(session, message.content, targetWorker?.id ?? null); return this.getSession(sessionId); } registerWorker(input: { name: string; os: WorkerNode["os"]; capabilities: string[]; }): WorkerNode { const timestamp = now(); const existing = this.getState().workers.find((worker) => worker.name === input.name); if (existing) { const updated = this.updateWorker(existing.id, { os: input.os, capabilities: input.capabilities, status: "idle", load: 0, }); this.commit((state) => { const pendingBinding = state.deviceBindings.find( (binding) => binding.status === "pending" && binding.name === updated.name && binding.os === updated.os, ); if (!pendingBinding) { return; } pendingBinding.status = "claimed"; pendingBinding.claimedWorkerId = updated.id; pendingBinding.claimedAt = timestamp; pendingBinding.updatedAt = timestamp; }); return updated; } const worker: WorkerNode = { id: createId("worker"), name: input.name, os: input.os, capabilities: Array.from(new Set(input.capabilities)), status: "idle", currentTaskId: null, load: 0, lastSeenAt: timestamp, createdAt: timestamp, updatedAt: timestamp, }; this.commit((state, addEvent) => { state.workers.push(worker); const pendingBinding = state.deviceBindings.find( (binding) => binding.status === "pending" && binding.name === worker.name && binding.os === worker.os, ); if (pendingBinding) { pendingBinding.status = "claimed"; pendingBinding.claimedWorkerId = worker.id; pendingBinding.claimedAt = timestamp; pendingBinding.updatedAt = timestamp; } addEvent({ sessionId: null, taskId: null, source: "system", type: "worker.registered", payload: { workerId: worker.id, name: worker.name, os: worker.os, capabilities: worker.capabilities, }, }); }); this.syncAssignments(); return worker; } createDeviceBinding(input: { name: string; os: WorkerNode["os"]; capabilities: string[]; executor: ExecutorKind; workspaceHint?: string; }): DeviceBinding { const timestamp = now(); const binding: DeviceBinding = { id: createId("binding"), token: createId("bindtoken"), name: input.name.trim(), os: input.os, capabilities: Array.from(new Set(input.capabilities)).filter(Boolean), executor: input.executor, workspaceHint: input.workspaceHint?.trim() ?? "", status: "pending", claimedWorkerId: null, claimedAt: null, createdAt: timestamp, updatedAt: timestamp, }; if (!binding.name) { throw new Error("Binding name is required."); } if (binding.capabilities.length === 0) { binding.capabilities = ["terminal"]; } this.commit((state, addEvent) => { state.deviceBindings = state.deviceBindings.filter( (item) => !(item.status === "pending" && item.name === binding.name && item.os === binding.os), ); state.deviceBindings.unshift(binding); addEvent({ sessionId: null, taskId: null, source: "system", type: "device.binding.created", payload: { bindingId: binding.id, name: binding.name, os: binding.os, executor: binding.executor, }, }); }); return binding; } listDeviceBindings(): DeviceBinding[] { return this.getState().deviceBindings.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt)); } getDeviceBindingByToken(token: string): DeviceBinding { const binding = this.getState().deviceBindings.find((item) => item.token === token); if (!binding) { throw new Error(`Device binding not found: ${token}`); } return binding; } updateWorker( workerId: string, input: Partial>, ): WorkerNode { let updated!: WorkerNode; this.store.mutate((state) => { const worker = state.workers.find((candidate) => candidate.id === workerId); if (!worker) { throw new Error(`Worker not found: ${workerId}`); } if (input.os) worker.os = input.os; if (input.capabilities) worker.capabilities = Array.from(new Set(input.capabilities)); if (input.status) worker.status = input.status; if (typeof input.load === "number") worker.load = input.load; worker.updatedAt = now(); worker.lastSeenAt = worker.updatedAt; updated = { ...worker }; }); return updated; } getWorker(workerId: string): WorkerNode { const worker = this.getState().workers.find((candidate) => candidate.id === workerId); if (!worker) { throw new Error(`Worker not found: ${workerId}`); } return worker; } markWorkerOffline(workerId: string): WorkerNode { let updated!: WorkerNode; this.commit((state, addEvent) => { const worker = state.workers.find((candidate) => candidate.id === workerId); if (!worker) { throw new Error(`Worker not found: ${workerId}`); } const timestamp = now(); for (const task of state.tasks.filter( (candidate) => candidate.assignedWorkerId === worker.id && ["assigned", "running"].includes(candidate.status), )) { this.requeueTaskState(state, task, timestamp, `${worker.name} 被手动下线`); addEvent({ sessionId: task.sessionId, taskId: task.id, source: "system", type: "task.requeued", payload: { workerId: worker.id, workerName: worker.name, reason: "manual_worker_offline", }, }); } worker.status = "offline"; worker.currentTaskId = null; worker.updatedAt = timestamp; worker.lastSeenAt = timestamp; addEvent({ sessionId: null, taskId: null, source: "system", type: "worker.offline", payload: { workerId: worker.id, workerName: worker.name, reason: "manual", }, }); updated = { ...worker }; }); this.syncAssignments(); return updated; } heartbeat(workerId: string, load = 0): WorkerNode { let updated!: WorkerNode; this.commit((state, addEvent) => { const worker = state.workers.find((candidate) => candidate.id === workerId); if (!worker) { throw new Error(`Worker not found: ${workerId}`); } worker.lastSeenAt = now(); worker.updatedAt = worker.lastSeenAt; worker.load = load; if (!worker.currentTaskId && worker.status !== "offline") { worker.status = "idle"; } addEvent({ sessionId: null, taskId: worker.currentTaskId, source: "worker", type: "worker.heartbeat", payload: { workerId: worker.id, status: worker.status, load: worker.load, }, }); updated = { ...worker }; }); this.syncAssignments(); return updated; } claimNextTask(workerId: string): Task | null { let claimedTask: Task | null = null; this.commit((state, addEvent) => { const worker = state.workers.find((candidate) => candidate.id === workerId); if (!worker) { throw new Error(`Worker not found: ${workerId}`); } const task = state.tasks.find( (candidate) => candidate.assignedWorkerId === workerId && candidate.status === "assigned", ); if (!task) { return; } task.status = "running"; task.currentStep = "start"; task.summary = "任务已被 worker 接收。"; task.updatedAt = now(); worker.status = "busy"; worker.currentTaskId = task.id; worker.updatedAt = task.updatedAt; worker.lastSeenAt = task.updatedAt; claimedTask = { ...task }; addEvent({ sessionId: task.sessionId, taskId: task.id, source: "worker", type: "task.started", payload: { workerId, title: task.title, }, }); }); return claimedTask; } reportProgress( taskId: string, workerId: string, input: { progressPercent: number; summary: string; currentStep: string; nextStep: string; }, ): Task { let updated!: Task; this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); const worker = state.workers.find((candidate) => candidate.id === workerId); if (!task) { throw new Error(`Task not found: ${taskId}`); } if (!worker) { throw new Error(`Worker not found: ${workerId}`); } this.assertTaskOwnership(task, workerId, worker.currentTaskId); task.progressPercent = Math.max(0, Math.min(100, input.progressPercent)); task.summary = input.summary; task.currentStep = input.currentStep; task.nextStep = input.nextStep; task.updatedAt = now(); if (task.status === "assigned") { task.status = "running"; } worker.status = "busy"; worker.currentTaskId = task.id; worker.updatedAt = task.updatedAt; worker.lastSeenAt = task.updatedAt; addEvent({ sessionId: task.sessionId, taskId: task.id, source: "worker", type: "task.progress", payload: { workerId, progressPercent: task.progressPercent, summary: task.summary, currentStep: task.currentStep, nextStep: task.nextStep, }, }); updated = { ...task }; }); return updated; } completeTask(taskId: string, workerId: string, summary: string): Task { let updated!: Task; this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); const worker = state.workers.find((candidate) => candidate.id === workerId); if (!task) { throw new Error(`Task not found: ${taskId}`); } if (!worker) { throw new Error(`Worker not found: ${workerId}`); } this.assertTaskOwnership(task, workerId, worker.currentTaskId); task.status = "completed"; task.progressPercent = 100; task.summary = summary; task.currentStep = "done"; task.nextStep = ""; task.updatedAt = now(); worker.status = "idle"; worker.currentTaskId = null; worker.updatedAt = task.updatedAt; worker.lastSeenAt = task.updatedAt; addEvent({ sessionId: task.sessionId, taskId: task.id, source: "worker", type: "task.completed", payload: { workerId, summary, }, }); updated = { ...task }; }); this.syncAssignments(); return updated; } failTask(taskId: string, workerId: string, errorMessage: string): Task { let updated!: Task; this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); const worker = state.workers.find((candidate) => candidate.id === workerId); if (!task) { throw new Error(`Task not found: ${taskId}`); } if (!worker) { throw new Error(`Worker not found: ${workerId}`); } this.assertTaskOwnership(task, workerId, worker.currentTaskId); task.status = "failed"; task.summary = errorMessage; task.currentStep = "failed"; task.nextStep = ""; task.updatedAt = now(); worker.status = "idle"; worker.currentTaskId = null; worker.updatedAt = task.updatedAt; worker.lastSeenAt = task.updatedAt; addEvent({ sessionId: task.sessionId, taskId: task.id, source: "worker", type: "task.failed", payload: { workerId, errorMessage, }, }); updated = { ...task }; }); this.syncAssignments(); return updated; } pauseTask(taskId: string): Task { return this.transitionTask(taskId, "paused", "system", "task.paused", { summary: "任务已被暂停。", }); } cancelTask(taskId: string): Task { return this.transitionTask(taskId, "cancelled", "system", "task.cancelled", { summary: "任务已被取消。", }); } resumeTask(taskId: string): Task { let updated!: Task; this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); if (!task) { throw new Error(`Task not found: ${taskId}`); } if (task.status !== "paused") { updated = { ...task }; return; } task.assignedWorkerId = null; task.status = task.approvalStatus === "pending" ? "waiting_approval" : "queued"; task.summary = "任务已恢复,等待重新调度。"; task.updatedAt = now(); addEvent({ sessionId: task.sessionId, taskId: task.id, source: "system", type: "task.resumed", payload: { summary: task.summary, }, }); updated = { ...task }; }); this.syncAssignments(); return updated; } requeueTask(taskId: string): Task { let updated!: Task; this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); if (!task) { throw new Error(`Task not found: ${taskId}`); } this.requeueTaskState(state, task, now(), "手动重排"); addEvent({ sessionId: task.sessionId, taskId: task.id, source: "system", type: "task.requeued", payload: { summary: task.summary, }, }); updated = { ...task }; }); this.syncAssignments(); return updated; } respondApproval(approvalId: string, approved: boolean, responder: string): ApprovalRequest { let updatedApproval!: ApprovalRequest; this.commit((state, addEvent) => { const approval = state.approvals.find((candidate) => candidate.id === approvalId); if (!approval) { throw new Error(`Approval not found: ${approvalId}`); } const task = state.tasks.find((candidate) => candidate.id === approval.taskId); if (!task) { throw new Error(`Task not found for approval: ${approval.taskId}`); } const timestamp = now(); approval.status = approved ? "approved" : "rejected"; approval.responder = responder; approval.updatedAt = timestamp; task.approvalStatus = approval.status; task.updatedAt = timestamp; task.status = approved ? "queued" : "cancelled"; task.summary = approved ? "审批已通过,重新进入队列。" : "审批被拒绝,任务已取消。"; addEvent({ sessionId: approval.sessionId, taskId: approval.taskId, source: "system", type: approved ? "approval.approved" : "approval.rejected", payload: { approvalId, responder, }, }); updatedApproval = { ...approval }; }); this.syncAssignments(); return updatedApproval; } listEvents(limit = 100): BossEvent[] { const events = this.getState().events; return events.slice(Math.max(0, events.length - limit)); } reconcileNow(): AppState { this.reconcileState(); this.syncAssignments(); return this.getState(); } private applyPlan(session: Session, content: string, targetWorkerId: string | null): void { const sessionDetails = this.getSession(session.id); const targetWorker = targetWorkerId ? this.getWorker(targetWorkerId) : null; const result = createPlan( sessionDetails.session, content, sessionDetails.tasks.filter(isActiveTask), targetWorker, ); const tasks = materializeTasks(session.id, result); const plannerMessage = buildPlannerMessage(result.summary); const timestamp = now(); this.commit((state, addEvent) => { const pausedTaskIds: string[] = []; const mutableSession = state.sessions.find((candidate) => candidate.id === session.id); if (!mutableSession) { throw new Error(`Session not found: ${session.id}`); } mutableSession.activeObjective = content; mutableSession.activeWorkerId = targetWorker?.id ?? mutableSession.activeWorkerId ?? null; mutableSession.lastPlannerSummary = plannerMessage; mutableSession.updatedAt = timestamp; if (result.pauseExistingTasks) { for (const task of state.tasks.filter( (candidate) => candidate.sessionId === session.id && isActiveTask(candidate), )) { if (["running", "assigned", "queued", "planning", "blocked"].includes(task.status)) { this.detachTaskFromWorker(state, task, timestamp); task.status = "paused"; task.summary = "检测到新需求,旧任务已暂停。"; task.updatedAt = timestamp; addEvent({ sessionId: task.sessionId, taskId: task.id, source: "manager", type: "task.paused", payload: { reason: "replan", }, }); pausedTaskIds.push(task.id); } } } state.tasks.push(...tasks); for (const task of tasks) { if (task.approvalStatus === "pending") { state.approvals.push({ id: createId("approval"), sessionId: session.id, taskId: task.id, kind: "dangerous_action", summary: `任务 "${task.title}" 包含高风险关键词,需要审批后才能执行。`, riskLevel: "high", status: "pending", requester: "manager", responder: null, createdAt: timestamp, updatedAt: timestamp, }); } } const managerMessage: Message = { id: createId("msg"), sessionId: session.id, role: "manager", channel: "system", content: plannerMessage, createdAt: timestamp, }; state.messages.push(managerMessage); addEvent({ sessionId: session.id, taskId: null, source: "manager", type: "plan.created", payload: { summary: result.summary, taskIds: tasks.map((task) => task.id), pausedTaskIds, }, }); }); this.syncAssignments(); } private transitionTask( taskId: string, status: Task["status"], source: BossEvent["source"], eventType: string, payload: Record, ): Task { let updated!: Task; this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); if (!task) { throw new Error(`Task not found: ${taskId}`); } task.status = status; task.updatedAt = now(); task.summary = typeof payload.summary === "string" ? payload.summary : task.summary; this.detachTaskFromWorker(state, task, task.updatedAt); addEvent({ sessionId: task.sessionId, taskId: task.id, source, type: eventType, payload, }); updated = { ...task }; }); this.syncAssignments(); return updated; } private syncAssignments(): void { const candidates = chooseAssignmentCandidates(this.store.snapshot); if (candidates.length === 0) { return; } this.commit((state, addEvent) => { for (const candidate of candidates) { const task = state.tasks.find((item) => item.id === candidate.taskId); const worker = state.workers.find((item) => item.id === candidate.workerId); if (!task || !worker || task.status !== "queued" || worker.status !== "idle") { continue; } const timestamp = now(); task.status = "assigned"; task.assignedWorkerId = worker.id; task.updatedAt = timestamp; task.summary = `已分配给 ${worker.name}`; worker.status = "busy"; worker.currentTaskId = task.id; worker.updatedAt = timestamp; worker.lastSeenAt = timestamp; addEvent({ sessionId: task.sessionId, taskId: task.id, source: "system", type: "task.assigned", payload: { workerId: worker.id, workerName: worker.name, }, }); } }); } private reconcileState(): void { const staleAfterMs = 20_000; const currentTime = Date.now(); this.commit((state, addEvent) => { for (const worker of state.workers) { const age = currentTime - new Date(worker.lastSeenAt).getTime(); if (age <= staleAfterMs || worker.status === "offline") { continue; } const timestamp = now(); worker.status = "offline"; worker.updatedAt = timestamp; for (const task of state.tasks.filter( (candidate) => candidate.assignedWorkerId === worker.id && ["assigned", "running"].includes(candidate.status), )) { this.requeueTaskState(state, task, timestamp, `worker ${worker.name} 离线`); addEvent({ sessionId: task.sessionId, taskId: task.id, source: "system", type: "task.requeued", payload: { workerId: worker.id, workerName: worker.name, reason: "heartbeat_timeout", }, }); } worker.currentTaskId = null; addEvent({ sessionId: null, taskId: null, source: "system", type: "worker.offline", payload: { workerId: worker.id, workerName: worker.name, reason: "heartbeat_timeout", }, }); } }); } private detachTaskFromWorker(state: AppState, task: Task, timestamp: string): void { if (!task.assignedWorkerId) { return; } const worker = state.workers.find((candidate) => candidate.id === task.assignedWorkerId); if (worker && worker.currentTaskId === task.id) { worker.currentTaskId = null; if (worker.status !== "offline") { worker.status = "idle"; } worker.updatedAt = timestamp; worker.lastSeenAt = timestamp; } task.assignedWorkerId = null; } private requeueTaskState(state: AppState, task: Task, timestamp: string, reason: string): void { this.detachTaskFromWorker(state, task, timestamp); task.status = task.approvalStatus === "pending" ? "waiting_approval" : "queued"; task.summary = `${reason},任务已重新排队。`; task.currentStep = "requeued"; task.nextStep = "等待新 worker 认领"; task.updatedAt = timestamp; } private assertTaskOwnership(task: Task, workerId: string, workerCurrentTaskId: string | null): void { if (task.assignedWorkerId !== workerId) { throw new Error(`Task ${task.id} is not assigned to worker ${workerId}`); } if (workerCurrentTaskId !== task.id) { throw new Error(`Worker ${workerId} is not currently executing task ${task.id}`); } if (!["assigned", "running"].includes(task.status)) { throw new Error(`Task ${task.id} does not accept worker updates in status ${task.status}`); } } private makeEvent(input: Omit): BossEvent { return { id: createId("evt"), timestamp: now(), ...input, }; } private commit( mutator: ( state: AppState, addEvent: (input: Omit) => void, ) => T, ): T { const published: BossEvent[] = []; const result = this.store.mutate((state) => mutator(state, (input) => { const event = this.makeEvent(input); state.events.push(event); published.push(event); }), ); for (const event of published) { this.events.publish(event); } return result; } }