diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..af7c582 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +node_modules +dist +.git +.boss-data + diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..88c40b2 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +FROM node:22-bookworm-slim AS deps +WORKDIR /app +COPY package.json package-lock.json ./ +RUN npm ci + +FROM deps AS build +COPY tsconfig.json ./ +COPY src ./src +COPY public ./public +COPY docs ./docs +COPY README.md ./ +RUN npm run build + +FROM node:22-bookworm-slim AS runtime +WORKDIR /app +ENV NODE_ENV=production +COPY --from=deps /app/node_modules ./node_modules +COPY --from=build /app/dist ./dist +COPY --from=build /app/public ./public +COPY package.json package-lock.json README.md ./ +EXPOSE 43210 +CMD ["node", "dist/server.js"] + diff --git a/README.md b/README.md index 713d66e..1fa87d7 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,8 @@ Boss 是一个面向多设备开发协作的 agent control plane。 - SSE 实时事件流 - Web 控制台 - `boss-worker` 模拟执行器 +- `npm run smoke` 自动跑端到端验证 +- `Dockerfile` + `compose.yaml` 支持容器启动 ## 当前推荐方向 @@ -45,7 +47,7 @@ Boss 是一个面向多设备开发协作的 agent control plane。 ```bash npm install -npm run dev +npm run demo ``` 浏览器打开: @@ -54,10 +56,50 @@ npm run dev http://127.0.0.1:43210 ``` -另开终端启动 worker: +如果你只想单独启动服务端: + +```bash +npm run dev +``` + +如果你要手工启动 worker: ```bash npm run worker -- --name win-a --os windows --capability terminal --capability browser npm run worker -- --name win-b --os windows --capability terminal --capability test npm run worker -- --name mac-a --os macos --capability terminal --capability test --capability browser ``` + +一键本地 demo: + +```bash +npm install +npm run demo +``` + +这会拉起: + +- Web/API 服务 +- 2 台 Windows 模拟 worker +- 1 台 Mac 模拟 worker + +自动 smoke test: + +```bash +npm run smoke +``` + +容器启动: + +```bash +docker compose up --build +``` + +## 当前 v1 能力 + +- 创建项目会话并持续对话 +- 自动生成任务树并调度到不同 worker +- worker 心跳、掉线回收、任务重排 +- 审批、暂停、恢复、取消、重排 +- SSE 实时事件流和 Web 控制台 +- 一键 demo 启动 diff --git a/compose.yaml b/compose.yaml new file mode 100644 index 0000000..ee2d894 --- /dev/null +++ b/compose.yaml @@ -0,0 +1,28 @@ +services: + boss: + build: . + ports: + - "43210:43210" + volumes: + - boss-data:/app/.boss-data + + worker-win-a: + build: . + depends_on: + - boss + command: ["node", "dist/worker.js", "--name", "win-a", "--os", "windows", "--capability", "terminal", "--capability", "browser", "--server", "http://boss:43210"] + + worker-win-b: + build: . + depends_on: + - boss + command: ["node", "dist/worker.js", "--name", "win-b", "--os", "windows", "--capability", "terminal", "--capability", "test", "--server", "http://boss:43210"] + + worker-mac-a: + build: . + depends_on: + - boss + command: ["node", "dist/worker.js", "--name", "mac-a", "--os", "macos", "--capability", "terminal", "--capability", "test", "--capability", "browser", "--server", "http://boss:43210"] + +volumes: + boss-data: diff --git a/package.json b/package.json index c50908a..4ba1c50 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,9 @@ "build": "tsc -p tsconfig.json", "start": "node dist/server.js", "check": "tsc --noEmit -p tsconfig.json", - "worker": "tsx src/worker.ts" + "worker": "tsx src/worker.ts", + "demo": "tsx src/demo.ts", + "smoke": "tsx src/smoke.ts" }, "dependencies": { "@fastify/static": "^8.2.0", @@ -20,4 +22,3 @@ "typescript": "^5.9.2" } } - diff --git a/public/app.js b/public/app.js index f57bf8d..35244d9 100644 --- a/public/app.js +++ b/public/app.js @@ -15,17 +15,23 @@ const elements = { taskList: document.querySelector("#task-list"), approvalList: document.querySelector("#approval-list"), eventList: document.querySelector("#event-list"), + planHint: document.querySelector("#plan-hint"), sessionTitleDisplay: document.querySelector("#session-title-display"), sessionSummary: document.querySelector("#session-summary"), createSessionForm: document.querySelector("#create-session-form"), sessionTitleInput: document.querySelector("#session-title"), + createWorkerForm: document.querySelector("#create-worker-form"), + workerName: document.querySelector("#worker-name"), + workerOs: document.querySelector("#worker-os"), + workerCapabilities: document.querySelector("#worker-capabilities"), messageForm: document.querySelector("#message-form"), messageInput: document.querySelector("#message-input"), resetDemo: document.querySelector("#reset-demo"), + archiveSession: document.querySelector("#archive-session"), }; function escapeHtml(input) { - return input + return String(input) .replaceAll("&", "&") .replaceAll("<", "<") .replaceAll(">", ">") @@ -39,11 +45,18 @@ async function request(url, options = {}) { ...options, }); - if (!response.ok) { - throw new Error(`${response.status} ${response.statusText}`); + let payload = {}; + try { + payload = await response.json(); + } catch { + payload = {}; } - return response.json(); + if (!response.ok) { + throw new Error(payload.error || `${response.status} ${response.statusText}`); + } + + return payload; } function selectedSession() { @@ -70,13 +83,20 @@ function eventsForSelectedSession() { } function renderSessions() { + if (state.sessions.length === 0) { + elements.sessionList.innerHTML = `

先创建一个项目会话。

`; + return; + } + elements.sessionList.innerHTML = state.sessions .map((session) => { const active = session.id === state.selectedSessionId ? "active" : ""; + const archived = session.status === "archived" ? "archived" : ""; return ` - `; }) @@ -92,6 +112,11 @@ function renderSessions() { } function renderWorkers() { + if (state.workers.length === 0) { + elements.workerList.innerHTML = `

还没有 worker。可以手动注册,或直接运行 \`npm run demo\`。

`; + return; + } + elements.workerList.innerHTML = state.workers .map( (worker) => ` @@ -101,6 +126,8 @@ function renderWorkers() { ${escapeHtml(worker.status)}
${escapeHtml(worker.os)}
+
current: ${escapeHtml(worker.currentTaskId || "idle")}
+
last seen: ${new Date(worker.lastSeenAt).toLocaleTimeString()}
${worker.capabilities.map((capability) => `${escapeHtml(capability)}`).join("")}
@@ -115,12 +142,21 @@ function renderSessionHeader() { if (!session) { elements.sessionTitleDisplay.textContent = "选择一个项目会话"; elements.sessionSummary.textContent = "创建会话后,在这里持续对话并观察任务状态。"; + elements.archiveSession.disabled = true; + elements.messageInput.disabled = true; + elements.messageInput.placeholder = "先创建或选择一个项目会话。"; return; } - elements.sessionTitleDisplay.textContent = session.title; + elements.sessionTitleDisplay.textContent = `${session.title} (${session.status})`; elements.sessionSummary.textContent = session.lastPlannerSummary || session.activeObjective || "等待用户输入。"; + elements.archiveSession.disabled = session.status === "archived"; + elements.messageInput.disabled = session.status === "archived"; + elements.messageInput.placeholder = + session.status === "archived" + ? "当前会话已归档,不能继续发送消息。" + : "输入需求。示例:先调研登录失败根因,不要急着改代码。"; } function renderMessages() { @@ -143,6 +179,18 @@ function renderMessages() { } function renderTasks() { + const latestPlan = [...state.events] + .reverse() + .find((event) => event.sessionId === state.selectedSessionId && event.type === "plan.created"); + if (latestPlan) { + const pausedCount = Array.isArray(latestPlan.payload.pausedTaskIds) + ? latestPlan.payload.pausedTaskIds.length + : 0; + elements.planHint.textContent = `最新计划创建了 ${latestPlan.payload.taskIds.length} 个任务,自动暂停旧任务 ${pausedCount} 个。`; + } else { + elements.planHint.textContent = ""; + } + const tasks = tasksForSelectedSession(); elements.taskList.innerHTML = tasks.length ? tasks @@ -157,9 +205,23 @@ function renderTasks() {
worker: ${escapeHtml(task.assignedWorkerId || "未分配")}
progress: ${task.progressPercent}%
summary: ${escapeHtml(task.summary || "暂无")}
+
next: ${escapeHtml(task.nextStep || "暂无")}
- - + ${ + ["queued", "assigned", "running"].includes(task.status) + ? `` + : "" + } + ${ + ["paused", "blocked"].includes(task.status) + ? `` + : "" + } + ${ + !["completed", "cancelled", "failed"].includes(task.status) + ? `` + : "" + }
`, @@ -173,6 +235,7 @@ function renderTasks() { const action = button.dataset.action; await request(`/api/tasks/${taskId}/${action}`, { method: "POST", body: "{}" }); await loadSession(state.selectedSessionId); + await loadBootstrap(); render(); }); }); @@ -209,6 +272,7 @@ function renderApprovals() { body: JSON.stringify({ approved, responder: "web-user" }), }); await loadSession(state.selectedSessionId); + await loadBootstrap(); render(); }); }); @@ -288,6 +352,25 @@ elements.createSessionForm.addEventListener("submit", async (event) => { render(); }); +elements.createWorkerForm.addEventListener("submit", async (event) => { + event.preventDefault(); + const name = elements.workerName.value.trim(); + if (!name) return; + const os = elements.workerOs.value; + const capabilities = elements.workerCapabilities.value + .split(",") + .map((item) => item.trim()) + .filter(Boolean); + await request("/api/workers/register", { + method: "POST", + body: JSON.stringify({ name, os, capabilities }), + }); + elements.workerName.value = ""; + elements.workerCapabilities.value = "terminal"; + await loadBootstrap(); + render(); +}); + elements.messageForm.addEventListener("submit", async (event) => { event.preventDefault(); if (!state.selectedSessionId) return; @@ -303,6 +386,17 @@ elements.messageForm.addEventListener("submit", async (event) => { render(); }); +elements.archiveSession.addEventListener("click", async () => { + if (!state.selectedSessionId) return; + await request(`/api/sessions/${state.selectedSessionId}/archive`, { + method: "POST", + body: "{}", + }); + await loadSession(state.selectedSessionId); + await loadBootstrap(); + render(); +}); + elements.resetDemo.addEventListener("click", async () => { await request("/api/demo/reset", { method: "POST", body: "{}" }); state.sessions = []; @@ -330,4 +424,3 @@ loadBootstrap().then(render).catch((error) => { console.error(error); elements.sessionSummary.textContent = error.message; }); - diff --git a/public/index.html b/public/index.html index 61a425f..678cfae 100644 --- a/public/index.html +++ b/public/index.html @@ -32,6 +32,16 @@

设备

+
+ + + + +
@@ -48,6 +58,7 @@

对话

+
@@ -64,6 +75,7 @@

任务树

+
@@ -89,4 +101,3 @@ - diff --git a/public/styles.css b/public/styles.css index d883df1..5ab52d6 100644 --- a/public/styles.css +++ b/public/styles.css @@ -25,7 +25,8 @@ body { button, input, -textarea { +textarea, +select { font: inherit; } @@ -38,6 +39,11 @@ button { cursor: pointer; } +button:disabled { + opacity: 0.55; + cursor: default; +} + button.ghost { background: transparent; color: var(--text); @@ -49,7 +55,8 @@ button.danger { } input, -textarea { +textarea, +select { width: 100%; border-radius: 12px; border: 1px solid var(--line); @@ -108,6 +115,10 @@ textarea { gap: 0.75rem; } +.stack.compact { + margin-bottom: 1rem; +} + .list, .timeline { display: grid; @@ -143,6 +154,10 @@ textarea { outline: 2px solid rgba(31, 111, 235, 0.2); } +.session-item.archived { + opacity: 0.7; +} + .message header, .event header { display: flex; @@ -171,7 +186,8 @@ textarea { .badge.failed, .badge.cancelled, -.badge.rejected { +.badge.rejected, +.badge.offline { color: var(--danger); } @@ -202,6 +218,14 @@ textarea { color: var(--muted); } +.hint { + margin-bottom: 0.75rem; + padding: 0.75rem 0.875rem; + border-radius: 14px; + background: rgba(31, 111, 235, 0.08); + color: var(--accent); +} + pre { margin: 0; white-space: pre-wrap; diff --git a/src/demo.ts b/src/demo.ts new file mode 100644 index 0000000..c0e5a85 --- /dev/null +++ b/src/demo.ts @@ -0,0 +1,129 @@ +import { spawn, type ChildProcess } from "node:child_process"; +import { setTimeout as delay } from "node:timers/promises"; + +const children: ChildProcess[] = []; + +function run(command: string, args: string[]) { + const child = spawn(command, args, { + stdio: "inherit", + shell: false, + env: { + ...process.env, + PORT: process.env.PORT ?? "43210", + }, + }); + child.on("exit", (code, signal) => { + console.log(`[demo] child exited`, { code, signal, command, args: args.join(" ") }); + if (code && code !== 0 && signal !== "SIGINT" && signal !== "SIGTERM") { + shutdown("SIGTERM"); + } + }); + children.push(child); + return child; +} + +async function waitForHealth(url: string) { + for (let attempt = 0; attempt < 30; attempt += 1) { + try { + const response = await fetch(`${url}/api/health`); + if (response.ok) { + return; + } + } catch { + // wait and retry + } + await delay(1_000); + } + + throw new Error(`Server did not become healthy: ${url}`); +} + +async function isHealthy(url: string) { + try { + const response = await fetch(`${url}/api/health`); + if (!response.ok) { + return false; + } + const payload = await response.json(); + return payload?.status === "ok"; + } catch { + return false; + } +} + +function shutdown(signal: NodeJS.Signals) { + for (const child of children) { + if (!child.killed) { + child.kill(signal); + } + } + process.exit(0); +} + +async function main() { + const serverUrl = `http://127.0.0.1:${process.env.PORT ?? "43210"}`; + const hasExisting = await isHealthy(serverUrl); + if (!hasExisting) { + run(process.execPath, ["./node_modules/tsx/dist/cli.mjs", "src/server.ts"]); + await waitForHealth(serverUrl); + } else { + console.log(`Boss server already running at ${serverUrl}, reusing it.`); + } + + run(process.execPath, [ + "./node_modules/tsx/dist/cli.mjs", + "src/worker.ts", + "--name", + "win-a", + "--os", + "windows", + "--capability", + "terminal", + "--capability", + "browser", + "--server", + serverUrl, + ]); + run(process.execPath, [ + "./node_modules/tsx/dist/cli.mjs", + "src/worker.ts", + "--name", + "win-b", + "--os", + "windows", + "--capability", + "terminal", + "--capability", + "test", + "--server", + serverUrl, + ]); + run(process.execPath, [ + "./node_modules/tsx/dist/cli.mjs", + "src/worker.ts", + "--name", + "mac-a", + "--os", + "macos", + "--capability", + "terminal", + "--capability", + "browser", + "--capability", + "test", + "--server", + serverUrl, + ]); + + console.log(`Boss demo running at ${serverUrl}`); + console.log("Press Ctrl+C to stop server and workers."); + await new Promise(() => {}); +} + +process.on("SIGINT", () => shutdown("SIGINT")); +process.on("SIGTERM", () => shutdown("SIGTERM")); + +main().catch((error) => { + console.error(error); + shutdown("SIGTERM"); +}); diff --git a/src/engine.ts b/src/engine.ts index d528709..b98367d 100644 --- a/src/engine.ts +++ b/src/engine.ts @@ -26,6 +26,7 @@ export class BossEngine { readonly events = new EventBroker(); getState(): AppState { + this.reconcileState(); return this.store.snapshot; } @@ -45,20 +46,17 @@ export class BossEngine { updatedAt: timestamp, }; - this.store.mutate((state) => { + this.commit((state, addEvent) => { state.sessions.unshift(session); - state.events.push( - this.makeEvent({ - sessionId: session.id, - taskId: null, - source: "system", - type: "session.created", - payload: { title: session.title }, - }), - ); + addEvent({ + sessionId: session.id, + taskId: null, + source: "system", + type: "session.created", + payload: { title: session.title }, + }); }); - this.publishLatestEvent(); return this.getSession(session.id); } @@ -71,18 +69,76 @@ export class BossEngine { return { session, - messages: state.messages.filter((message) => message.sessionId === sessionId), - tasks: state.tasks.filter((task) => task.sessionId === sessionId), - approvals: state.approvals.filter((approval) => approval.sessionId === sessionId), + messages: state.messages + .filter((message) => message.sessionId === sessionId) + .sort((left, right) => left.createdAt.localeCompare(right.createdAt)), + tasks: state.tasks + .filter((task) => task.sessionId === sessionId) + .sort((left, right) => left.createdAt.localeCompare(right.createdAt)), + approvals: state.approvals + .filter((approval) => approval.sessionId === sessionId) + .sort((left, right) => left.createdAt.localeCompare(right.createdAt)), }; } listSessions(): Session[] { - return this.getState().sessions; + return this.getState().sessions.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt)); + } + + getTask(taskId: string): Task { + const task = this.getState().tasks.find((candidate) => candidate.id === taskId); + if (!task) { + throw new Error(`Task not found: ${taskId}`); + } + return task; + } + + archiveSession(sessionId: string): SessionDetails { + this.commit((state, addEvent) => { + const session = state.sessions.find((candidate) => candidate.id === sessionId); + if (!session) { + throw new Error(`Session not found: ${sessionId}`); + } + + const timestamp = now(); + session.status = "archived"; + session.updatedAt = timestamp; + + for (const task of state.tasks.filter( + (candidate) => candidate.sessionId === sessionId && isActiveTask(candidate), + )) { + if (!["completed", "failed", "cancelled"].includes(task.status)) { + this.detachTaskFromWorker(state, task, timestamp); + task.status = "paused"; + task.summary = "会话已归档,任务暂停。"; + task.updatedAt = timestamp; + addEvent({ + sessionId, + taskId: task.id, + source: "system", + type: "task.paused", + payload: { reason: "session_archived" }, + }); + } + } + + addEvent({ + sessionId, + taskId: null, + source: "system", + type: "session.archived", + payload: { sessionId }, + }); + }); + + return this.getSession(sessionId); } addMessage(sessionId: string, content: string, channel = "web"): SessionDetails { const session = this.getSession(sessionId).session; + if (session.status === "archived") { + throw new Error(`Session ${sessionId} is archived`); + } const message: Message = { id: createId("msg"), sessionId, @@ -96,7 +152,7 @@ export class BossEngine { throw new Error("Message content is required."); } - this.store.mutate((state) => { + this.commit((state, addEvent) => { const mutableSession = state.sessions.find((candidate) => candidate.id === sessionId); if (!mutableSession) { throw new Error(`Session not found: ${sessionId}`); @@ -108,21 +164,18 @@ export class BossEngine { mutableSession.title = message.content.slice(0, 32); } state.messages.push(message); - state.events.push( - this.makeEvent({ - sessionId, - taskId: null, - source: "user", - type: "session.message.added", - payload: { - channel, - content: message.content, - }, - }), - ); + addEvent({ + sessionId, + taskId: null, + source: "user", + type: "session.message.added", + payload: { + channel, + content: message.content, + }, + }); }); - this.publishLatestEvent(); this.applyPlan(session, message.content); return this.getSession(sessionId); } @@ -156,25 +209,22 @@ export class BossEngine { updatedAt: timestamp, }; - this.store.mutate((state) => { + this.commit((state, addEvent) => { state.workers.push(worker); - state.events.push( - this.makeEvent({ - sessionId: null, - taskId: null, - source: "system", - type: "worker.registered", - payload: { - workerId: worker.id, - name: worker.name, - os: worker.os, - capabilities: worker.capabilities, - }, - }), - ); + addEvent({ + sessionId: null, + taskId: null, + source: "system", + type: "worker.registered", + payload: { + workerId: worker.id, + name: worker.name, + os: worker.os, + capabilities: worker.capabilities, + }, + }); }); - this.publishLatestEvent(); this.syncAssignments(); return worker; } @@ -201,9 +251,68 @@ export class BossEngine { return updated; } + getWorker(workerId: string): WorkerNode { + const worker = this.getState().workers.find((candidate) => candidate.id === workerId); + if (!worker) { + throw new Error(`Worker not found: ${workerId}`); + } + return worker; + } + + markWorkerOffline(workerId: string): WorkerNode { + let updated!: WorkerNode; + + this.commit((state, addEvent) => { + const worker = state.workers.find((candidate) => candidate.id === workerId); + if (!worker) { + throw new Error(`Worker not found: ${workerId}`); + } + + const timestamp = now(); + for (const task of state.tasks.filter( + (candidate) => + candidate.assignedWorkerId === worker.id && + ["assigned", "running"].includes(candidate.status), + )) { + this.requeueTaskState(state, task, timestamp, `${worker.name} 被手动下线`); + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source: "system", + type: "task.requeued", + payload: { + workerId: worker.id, + workerName: worker.name, + reason: "manual_worker_offline", + }, + }); + } + + worker.status = "offline"; + worker.currentTaskId = null; + worker.updatedAt = timestamp; + worker.lastSeenAt = timestamp; + addEvent({ + sessionId: null, + taskId: null, + source: "system", + type: "worker.offline", + payload: { + workerId: worker.id, + workerName: worker.name, + reason: "manual", + }, + }); + updated = { ...worker }; + }); + + this.syncAssignments(); + return updated; + } + heartbeat(workerId: string, load = 0): WorkerNode { let updated!: WorkerNode; - this.store.mutate((state) => { + this.commit((state, addEvent) => { const worker = state.workers.find((candidate) => candidate.id === workerId); if (!worker) { throw new Error(`Worker not found: ${workerId}`); @@ -216,24 +325,21 @@ export class BossEngine { worker.status = "idle"; } - state.events.push( - this.makeEvent({ - sessionId: null, - taskId: worker.currentTaskId, - source: "worker", - type: "worker.heartbeat", - payload: { - workerId: worker.id, - status: worker.status, - load: worker.load, - }, - }), - ); + addEvent({ + sessionId: null, + taskId: worker.currentTaskId, + source: "worker", + type: "worker.heartbeat", + payload: { + workerId: worker.id, + status: worker.status, + load: worker.load, + }, + }); updated = { ...worker }; }); - this.publishLatestEvent(); this.syncAssignments(); return updated; } @@ -241,7 +347,7 @@ export class BossEngine { claimNextTask(workerId: string): Task | null { let claimedTask: Task | null = null; - this.store.mutate((state) => { + this.commit((state, addEvent) => { const worker = state.workers.find((candidate) => candidate.id === workerId); if (!worker) { throw new Error(`Worker not found: ${workerId}`); @@ -264,24 +370,18 @@ export class BossEngine { worker.lastSeenAt = task.updatedAt; claimedTask = { ...task }; - state.events.push( - this.makeEvent({ - sessionId: task.sessionId, - taskId: task.id, - source: "worker", - type: "task.started", - payload: { - workerId, - title: task.title, - }, - }), - ); + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source: "worker", + type: "task.started", + payload: { + workerId, + title: task.title, + }, + }); }); - if (claimedTask) { - this.publishLatestEvent(); - } - return claimedTask; } @@ -296,11 +396,16 @@ export class BossEngine { }, ): Task { let updated!: Task; - this.store.mutate((state) => { + this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); + const worker = state.workers.find((candidate) => candidate.id === workerId); if (!task) { throw new Error(`Task not found: ${taskId}`); } + if (!worker) { + throw new Error(`Worker not found: ${workerId}`); + } + this.assertTaskOwnership(task, workerId, worker.currentTaskId); task.progressPercent = Math.max(0, Math.min(100, input.progressPercent)); task.summary = input.summary; @@ -311,32 +416,29 @@ export class BossEngine { task.status = "running"; } - state.events.push( - this.makeEvent({ - sessionId: task.sessionId, - taskId: task.id, - source: "worker", - type: "task.progress", - payload: { - workerId, - progressPercent: task.progressPercent, - summary: task.summary, - currentStep: task.currentStep, - nextStep: task.nextStep, - }, - }), - ); + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source: "worker", + type: "task.progress", + payload: { + workerId, + progressPercent: task.progressPercent, + summary: task.summary, + currentStep: task.currentStep, + nextStep: task.nextStep, + }, + }); updated = { ...task }; }); - this.publishLatestEvent(); return updated; } completeTask(taskId: string, workerId: string, summary: string): Task { let updated!: Task; - this.store.mutate((state) => { + this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); const worker = state.workers.find((candidate) => candidate.id === workerId); if (!task) { @@ -345,6 +447,7 @@ export class BossEngine { if (!worker) { throw new Error(`Worker not found: ${workerId}`); } + this.assertTaskOwnership(task, workerId, worker.currentTaskId); task.status = "completed"; task.progressPercent = 100; @@ -357,22 +460,19 @@ export class BossEngine { worker.updatedAt = task.updatedAt; worker.lastSeenAt = task.updatedAt; - state.events.push( - this.makeEvent({ - sessionId: task.sessionId, - taskId: task.id, - source: "worker", - type: "task.completed", - payload: { - workerId, - summary, - }, - }), - ); + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source: "worker", + type: "task.completed", + payload: { + workerId, + summary, + }, + }); updated = { ...task }; }); - this.publishLatestEvent(); this.syncAssignments(); return updated; } @@ -380,7 +480,7 @@ export class BossEngine { failTask(taskId: string, workerId: string, errorMessage: string): Task { let updated!: Task; - this.store.mutate((state) => { + this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); const worker = state.workers.find((candidate) => candidate.id === workerId); if (!task) { @@ -389,6 +489,7 @@ export class BossEngine { if (!worker) { throw new Error(`Worker not found: ${workerId}`); } + this.assertTaskOwnership(task, workerId, worker.currentTaskId); task.status = "failed"; task.summary = errorMessage; @@ -400,22 +501,19 @@ export class BossEngine { worker.updatedAt = task.updatedAt; worker.lastSeenAt = task.updatedAt; - state.events.push( - this.makeEvent({ - sessionId: task.sessionId, - taskId: task.id, - source: "worker", - type: "task.failed", - payload: { - workerId, - errorMessage, - }, - }), - ); + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source: "worker", + type: "task.failed", + payload: { + workerId, + errorMessage, + }, + }); updated = { ...task }; }); - this.publishLatestEvent(); this.syncAssignments(); return updated; } @@ -432,10 +530,70 @@ export class BossEngine { }); } + resumeTask(taskId: string): Task { + let updated!: Task; + + this.commit((state, addEvent) => { + const task = state.tasks.find((candidate) => candidate.id === taskId); + if (!task) { + throw new Error(`Task not found: ${taskId}`); + } + + if (task.status !== "paused") { + updated = { ...task }; + return; + } + + task.assignedWorkerId = null; + task.status = task.approvalStatus === "pending" ? "waiting_approval" : "queued"; + task.summary = "任务已恢复,等待重新调度。"; + task.updatedAt = now(); + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source: "system", + type: "task.resumed", + payload: { + summary: task.summary, + }, + }); + updated = { ...task }; + }); + + this.syncAssignments(); + return updated; + } + + requeueTask(taskId: string): Task { + let updated!: Task; + + this.commit((state, addEvent) => { + const task = state.tasks.find((candidate) => candidate.id === taskId); + if (!task) { + throw new Error(`Task not found: ${taskId}`); + } + + this.requeueTaskState(state, task, now(), "手动重排"); + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source: "system", + type: "task.requeued", + payload: { + summary: task.summary, + }, + }); + updated = { ...task }; + }); + + this.syncAssignments(); + return updated; + } + respondApproval(approvalId: string, approved: boolean, responder: string): ApprovalRequest { let updatedApproval!: ApprovalRequest; - this.store.mutate((state) => { + this.commit((state, addEvent) => { const approval = state.approvals.find((candidate) => candidate.id === approvalId); if (!approval) { throw new Error(`Approval not found: ${approvalId}`); @@ -455,26 +613,34 @@ export class BossEngine { task.status = approved ? "queued" : "cancelled"; task.summary = approved ? "审批已通过,重新进入队列。" : "审批被拒绝,任务已取消。"; - state.events.push( - this.makeEvent({ - sessionId: approval.sessionId, - taskId: approval.taskId, - source: "system", - type: approved ? "approval.approved" : "approval.rejected", - payload: { - approvalId, - responder, - }, - }), - ); + addEvent({ + sessionId: approval.sessionId, + taskId: approval.taskId, + source: "system", + type: approved ? "approval.approved" : "approval.rejected", + payload: { + approvalId, + responder, + }, + }); updatedApproval = { ...approval }; }); - this.publishLatestEvent(); this.syncAssignments(); return updatedApproval; } + listEvents(limit = 100): BossEvent[] { + const events = this.getState().events; + return events.slice(Math.max(0, events.length - limit)); + } + + reconcileNow(): AppState { + this.reconcileState(); + this.syncAssignments(); + return this.getState(); + } + private applyPlan(session: Session, content: string): void { const sessionDetails = this.getSession(session.id); const result = createPlan(sessionDetails.session, content, sessionDetails.tasks.filter(isActiveTask)); @@ -482,7 +648,7 @@ export class BossEngine { const plannerMessage = buildPlannerMessage(result.summary); const timestamp = now(); - this.store.mutate((state) => { + this.commit((state, addEvent) => { const mutableSession = state.sessions.find((candidate) => candidate.id === session.id); if (!mutableSession) { throw new Error(`Session not found: ${session.id}`); @@ -497,8 +663,19 @@ export class BossEngine { (candidate) => candidate.sessionId === session.id && isActiveTask(candidate), )) { if (["running", "assigned", "queued", "planning", "blocked"].includes(task.status)) { + this.detachTaskFromWorker(state, task, timestamp); task.status = "paused"; + task.summary = "检测到新需求,旧任务已暂停。"; task.updatedAt = timestamp; + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source: "manager", + type: "task.paused", + payload: { + reason: "replan", + }, + }); } } } @@ -532,21 +709,18 @@ export class BossEngine { }; state.messages.push(managerMessage); - state.events.push( - this.makeEvent({ - sessionId: session.id, - taskId: null, - source: "manager", - type: "plan.created", - payload: { - summary: result.summary, - taskIds: tasks.map((task) => task.id), - }, - }), - ); + addEvent({ + sessionId: session.id, + taskId: null, + source: "manager", + type: "plan.created", + payload: { + summary: result.summary, + taskIds: tasks.map((task) => task.id), + }, + }); }); - this.publishLatestEvent(); this.syncAssignments(); } @@ -558,7 +732,7 @@ export class BossEngine { payload: Record, ): Task { let updated!: Task; - this.store.mutate((state) => { + this.commit((state, addEvent) => { const task = state.tasks.find((candidate) => candidate.id === taskId); if (!task) { throw new Error(`Task not found: ${taskId}`); @@ -567,41 +741,29 @@ export class BossEngine { task.status = status; task.updatedAt = now(); task.summary = typeof payload.summary === "string" ? payload.summary : task.summary; + this.detachTaskFromWorker(state, task, task.updatedAt); - if (task.assignedWorkerId) { - const worker = state.workers.find((candidate) => candidate.id === task.assignedWorkerId); - if (worker && worker.currentTaskId === task.id) { - worker.currentTaskId = null; - worker.status = "idle"; - worker.updatedAt = task.updatedAt; - worker.lastSeenAt = task.updatedAt; - } - } - - state.events.push( - this.makeEvent({ - sessionId: task.sessionId, - taskId: task.id, - source, - type: eventType, - payload, - }), - ); + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source, + type: eventType, + payload, + }); updated = { ...task }; }); - this.publishLatestEvent(); this.syncAssignments(); return updated; } private syncAssignments(): void { - const candidates = chooseAssignmentCandidates(this.getState()); + const candidates = chooseAssignmentCandidates(this.store.snapshot); if (candidates.length === 0) { return; } - this.store.mutate((state) => { + this.commit((state, addEvent) => { for (const candidate of candidates) { const task = state.tasks.find((item) => item.id === candidate.taskId); const worker = state.workers.find((item) => item.id === candidate.workerId); @@ -619,29 +781,107 @@ export class BossEngine { worker.updatedAt = timestamp; worker.lastSeenAt = timestamp; - state.events.push( - this.makeEvent({ + addEvent({ + sessionId: task.sessionId, + taskId: task.id, + source: "system", + type: "task.assigned", + payload: { + workerId: worker.id, + workerName: worker.name, + }, + }); + } + }); + } + + private reconcileState(): void { + const staleAfterMs = 20_000; + const currentTime = Date.now(); + this.commit((state, addEvent) => { + for (const worker of state.workers) { + const age = currentTime - new Date(worker.lastSeenAt).getTime(); + if (age <= staleAfterMs || worker.status === "offline") { + continue; + } + + const timestamp = now(); + worker.status = "offline"; + worker.updatedAt = timestamp; + + for (const task of state.tasks.filter( + (candidate) => + candidate.assignedWorkerId === worker.id && + ["assigned", "running"].includes(candidate.status), + )) { + this.requeueTaskState(state, task, timestamp, `worker ${worker.name} 离线`); + addEvent({ sessionId: task.sessionId, taskId: task.id, source: "system", - type: "task.assigned", + type: "task.requeued", payload: { workerId: worker.id, workerName: worker.name, + reason: "heartbeat_timeout", }, - }), - ); + }); + } + + worker.currentTaskId = null; + addEvent({ + sessionId: null, + taskId: null, + source: "system", + type: "worker.offline", + payload: { + workerId: worker.id, + workerName: worker.name, + reason: "heartbeat_timeout", + }, + }); } }); - - this.publishLatestEvent(); } - private publishLatestEvent(): void { - const state = this.getState(); - const latestEvent = state.events[state.events.length - 1]; - if (latestEvent) { - this.events.publish(latestEvent); + private detachTaskFromWorker(state: AppState, task: Task, timestamp: string): void { + if (!task.assignedWorkerId) { + return; + } + + const worker = state.workers.find((candidate) => candidate.id === task.assignedWorkerId); + if (worker && worker.currentTaskId === task.id) { + worker.currentTaskId = null; + if (worker.status !== "offline") { + worker.status = "idle"; + } + worker.updatedAt = timestamp; + worker.lastSeenAt = timestamp; + } + + task.assignedWorkerId = null; + } + + private requeueTaskState(state: AppState, task: Task, timestamp: string, reason: string): void { + this.detachTaskFromWorker(state, task, timestamp); + task.status = task.approvalStatus === "pending" ? "waiting_approval" : "queued"; + task.summary = `${reason},任务已重新排队。`; + task.currentStep = "requeued"; + task.nextStep = "等待新 worker 认领"; + task.updatedAt = timestamp; + } + + private assertTaskOwnership(task: Task, workerId: string, workerCurrentTaskId: string | null): void { + if (task.assignedWorkerId !== workerId) { + throw new Error(`Task ${task.id} is not assigned to worker ${workerId}`); + } + + if (workerCurrentTaskId !== task.id) { + throw new Error(`Worker ${workerId} is not currently executing task ${task.id}`); + } + + if (!["assigned", "running"].includes(task.status)) { + throw new Error(`Task ${task.id} does not accept worker updates in status ${task.status}`); } } @@ -652,5 +892,26 @@ export class BossEngine { ...input, }; } -} + private commit( + mutator: ( + state: AppState, + addEvent: (input: Omit) => void, + ) => T, + ): T { + const published: BossEvent[] = []; + const result = this.store.mutate((state) => + mutator(state, (input) => { + const event = this.makeEvent(input); + state.events.push(event); + published.push(event); + }), + ); + + for (const event of published) { + this.events.publish(event); + } + + return result; + } +} diff --git a/src/server.ts b/src/server.ts index 819ce20..0026319 100644 --- a/src/server.ts +++ b/src/server.ts @@ -4,7 +4,37 @@ import fastifyStatic from "@fastify/static"; import { BossEngine } from "./engine.js"; const engine = new BossEngine(); -const app = Fastify({ logger: true }); +const app = Fastify({ logger: process.env.BOSS_DEBUG === "1" }); + +app.setErrorHandler((error, request, reply) => { + const message = + typeof error === "object" && error !== null && "message" in error + ? String(error.message) + : "Internal Server Error"; + const normalized = message.toLowerCase(); + + if (normalized.includes("not found")) { + return reply.status(404).send({ error: "Not Found", message }); + } + + if ( + normalized.includes("is not assigned to worker") || + normalized.includes("is not currently executing") || + normalized.includes("does not accept worker updates") + ) { + return reply.status(409).send({ error: "Conflict", message }); + } + + if ( + normalized.includes("required") || + normalized.includes("archived") + ) { + return reply.status(400).send({ error: "Bad Request", message }); + } + + request.log.error(error); + return reply.status(500).send({ error: "Internal Server Error", message }); +}); await app.register(fastifyStatic, { root: path.resolve(process.cwd(), "public"), @@ -23,6 +53,12 @@ app.get("/api/health", async () => ({ app.get("/api/bootstrap", async () => engine.bootstrap()); +app.get("/api/events", async (request) => { + const query = request.query as { limit?: string }; + const limit = Number(query.limit ?? 100); + return engine.listEvents(Number.isFinite(limit) ? limit : 100); +}); + app.get("/api/sessions", async () => engine.listSessions()); app.post("/api/sessions", async (request) => { @@ -30,11 +66,21 @@ app.post("/api/sessions", async (request) => { return engine.createSession(body.title); }); +app.post("/api/sessions/:sessionId/archive", async (request) => { + const params = request.params as { sessionId: string }; + return engine.archiveSession(params.sessionId); +}); + app.get("/api/sessions/:sessionId", async (request) => { const params = request.params as { sessionId: string }; return engine.getSession(params.sessionId); }); +app.get("/api/tasks/:taskId", async (request) => { + const params = request.params as { taskId: string }; + return engine.getTask(params.taskId); +}); + app.post("/api/sessions/:sessionId/messages", async (request) => { const params = request.params as { sessionId: string }; const body = (request.body ?? {}) as { content?: string; channel?: string }; @@ -64,6 +110,11 @@ app.get("/api/events/stream", async (_request, reply) => { app.get("/api/workers", async () => engine.getState().workers); +app.get("/api/workers/:workerId", async (request) => { + const params = request.params as { workerId: string }; + return engine.getWorker(params.workerId); +}); + app.post("/api/workers/register", async (request) => { const body = request.body as { name?: string; @@ -83,6 +134,11 @@ app.post("/api/workers/:workerId/heartbeat", async (request) => { return engine.heartbeat(params.workerId, body.load ?? 0); }); +app.post("/api/workers/:workerId/offline", async (request) => { + const params = request.params as { workerId: string }; + return engine.markWorkerOffline(params.workerId); +}); + app.post("/api/workers/:workerId/claim-next", async (request) => { const params = request.params as { workerId: string }; return { @@ -135,6 +191,16 @@ app.post("/api/tasks/:taskId/cancel", async (request) => { return engine.cancelTask(params.taskId); }); +app.post("/api/tasks/:taskId/resume", async (request) => { + const params = request.params as { taskId: string }; + return engine.resumeTask(params.taskId); +}); + +app.post("/api/tasks/:taskId/requeue", async (request) => { + const params = request.params as { taskId: string }; + return engine.requeueTask(params.taskId); +}); + app.post("/api/approvals/:approvalId/respond", async (request) => { const params = request.params as { approvalId: string }; const body = request.body as { @@ -149,5 +215,7 @@ app.post("/api/demo/reset", async () => { return ok(); }); +app.post("/api/reconcile", async () => engine.reconcileNow()); + const port = Number(process.env.PORT ?? 43210); await app.listen({ port, host: "0.0.0.0" }); diff --git a/src/smoke.ts b/src/smoke.ts new file mode 100644 index 0000000..4ba3031 --- /dev/null +++ b/src/smoke.ts @@ -0,0 +1,84 @@ +import { setTimeout as delay } from "node:timers/promises"; + +const baseUrl = process.env.BOSS_BASE_URL || "http://127.0.0.1:43210"; + +async function request(path: string, options: RequestInit = {}) { + const response = await fetch(`${baseUrl}${path}`, { + headers: { "Content-Type": "application/json" }, + ...options, + }); + + if (!response.ok) { + throw new Error(`${response.status} ${response.statusText}`); + } + + return response.json(); +} + +async function waitForSessionSettled(sessionId: string, timeoutMs = 30_000) { + const start = Date.now(); + + while (Date.now() - start < timeoutMs) { + const session = await request(`/api/sessions/${sessionId}`); + const pendingApproval = session.approvals.find((approval: { status: string }) => approval.status === "pending"); + if (pendingApproval) { + await request(`/api/approvals/${pendingApproval.id}/respond`, { + method: "POST", + body: JSON.stringify({ approved: true, responder: "smoke-script" }), + }); + } + + const unsettled = session.tasks.filter((task: { status: string }) => + ["planning", "queued", "assigned", "running", "waiting_approval"].includes(task.status), + ); + if (unsettled.length === 0) { + return session; + } + + await delay(1_000); + } + + throw new Error("Session did not settle in time."); +} + +async function main() { + await request("/api/demo/reset", { method: "POST", body: "{}" }); + const created = await request("/api/sessions", { + method: "POST", + body: JSON.stringify({ title: "Smoke Test" }), + }); + const sessionId = created.session.id; + + await request(`/api/sessions/${sessionId}/messages`, { + method: "POST", + body: JSON.stringify({ + content: "先调研登录失败根因,然后 delete 旧缓存目录并做验证。", + channel: "smoke", + }), + }); + + const settled = await waitForSessionSettled(sessionId); + console.log( + JSON.stringify( + { + sessionId, + taskStatuses: settled.tasks.map((task: { title: string; status: string }) => ({ + title: task.title, + status: task.status, + })), + approvalStatuses: settled.approvals.map((approval: { id: string; status: string }) => ({ + id: approval.id, + status: approval.status, + })), + }, + null, + 2, + ), + ); +} + +main().catch((error) => { + console.error(error); + process.exit(1); +}); + diff --git a/src/store.ts b/src/store.ts index 5919224..70122c7 100644 --- a/src/store.ts +++ b/src/store.ts @@ -1,4 +1,4 @@ -import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { copyFileSync, existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs"; import { dirname } from "node:path"; import type { AppState } from "./types.js"; @@ -15,8 +15,12 @@ function defaultState(): AppState { export class FileStore { private state: AppState; + private readonly backupFilePath: string; + private readonly tempFilePath: string; constructor(private readonly filePath: string) { + this.backupFilePath = `${filePath}.bak`; + this.tempFilePath = `${filePath}.tmp`; this.ensureDirectory(); this.state = this.load(); } @@ -42,20 +46,28 @@ export class FileStore { } private load(): AppState { - if (!existsSync(this.filePath)) { - return defaultState(); + for (const path of [this.filePath, this.backupFilePath]) { + if (!existsSync(path)) { + continue; + } + + try { + const raw = readFileSync(path, "utf8"); + return { ...defaultState(), ...(JSON.parse(raw) as AppState) }; + } catch { + // try backup/default + } } - try { - const raw = readFileSync(this.filePath, "utf8"); - return { ...defaultState(), ...(JSON.parse(raw) as AppState) }; - } catch { - return defaultState(); - } + return defaultState(); } private save(): void { - writeFileSync(this.filePath, `${JSON.stringify(this.state, null, 2)}\n`, "utf8"); + const content = `${JSON.stringify(this.state, null, 2)}\n`; + writeFileSync(this.tempFilePath, content, "utf8"); + if (existsSync(this.filePath)) { + copyFileSync(this.filePath, this.backupFilePath); + } + renameSync(this.tempFilePath, this.filePath); } } - diff --git a/src/worker.ts b/src/worker.ts index 88b5551..bf79557 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -54,6 +54,19 @@ async function postJson(url: string, body: unknown) { return response.json(); } +async function getJson(url: string) { + const response = await fetch(url); + if (!response.ok) { + throw new Error(`Request failed: ${response.status} ${response.statusText}`); + } + return response.json(); +} + +async function taskStillRunnable(server: string, taskId: string) { + const task = await getJson(`${server}/api/tasks/${taskId}`); + return ["assigned", "running"].includes(task.status); +} + async function simulateTask(server: string, workerId: string, task: Task) { const steps = [ { @@ -77,6 +90,9 @@ async function simulateTask(server: string, workerId: string, task: Task) { ]; for (const step of steps) { + if (!(await taskStillRunnable(server, task.id))) { + return; + } await delay(1_500); await postJson(`${server}/api/tasks/${task.id}/progress`, { workerId, @@ -84,6 +100,9 @@ async function simulateTask(server: string, workerId: string, task: Task) { }); } + if (!(await taskStillRunnable(server, task.id))) { + return; + } await delay(1_000); await postJson(`${server}/api/tasks/${task.id}/complete`, { workerId,