feat: bootstrap boss control plane prototype

This commit is contained in:
Codex
2026-03-23 12:43:39 +08:00
commit 0ab83990b2
24 changed files with 5534 additions and 0 deletions

656
src/engine.ts Normal file
View File

@@ -0,0 +1,656 @@
import { resolve } from "node:path";
import type {
ApprovalRequest,
AppState,
BossEvent,
Message,
Session,
SessionDetails,
Task,
WorkerNode,
} from "./types.js";
import { EventBroker } from "./event-broker.js";
import { createPlan, buildPlannerMessage, materializeTasks } from "./planner.js";
import { chooseAssignmentCandidates } from "./scheduler.js";
import { FileStore } from "./store.js";
import { createId, now } from "./utils.js";
const DATA_FILE = resolve(process.cwd(), ".boss-data", "store.json");
function isActiveTask(task: Task): boolean {
return !["completed", "failed", "cancelled"].includes(task.status);
}
export class BossEngine {
readonly store = new FileStore(DATA_FILE);
readonly events = new EventBroker();
getState(): AppState {
return this.store.snapshot;
}
bootstrap(): AppState {
return this.getState();
}
createSession(title?: string): SessionDetails {
const timestamp = now();
const session: Session = {
id: createId("session"),
title: title?.trim() || "未命名项目",
status: "active",
activeObjective: "",
lastPlannerSummary: "",
createdAt: timestamp,
updatedAt: timestamp,
};
this.store.mutate((state) => {
state.sessions.unshift(session);
state.events.push(
this.makeEvent({
sessionId: session.id,
taskId: null,
source: "system",
type: "session.created",
payload: { title: session.title },
}),
);
});
this.publishLatestEvent();
return this.getSession(session.id);
}
getSession(sessionId: string): SessionDetails {
const state = this.getState();
const session = state.sessions.find((candidate) => candidate.id === sessionId);
if (!session) {
throw new Error(`Session not found: ${sessionId}`);
}
return {
session,
messages: state.messages.filter((message) => message.sessionId === sessionId),
tasks: state.tasks.filter((task) => task.sessionId === sessionId),
approvals: state.approvals.filter((approval) => approval.sessionId === sessionId),
};
}
listSessions(): Session[] {
return this.getState().sessions;
}
addMessage(sessionId: string, content: string, channel = "web"): SessionDetails {
const session = this.getSession(sessionId).session;
const message: Message = {
id: createId("msg"),
sessionId,
role: "user",
channel,
content: content.trim(),
createdAt: now(),
};
if (!message.content) {
throw new Error("Message content is required.");
}
this.store.mutate((state) => {
const mutableSession = state.sessions.find((candidate) => candidate.id === sessionId);
if (!mutableSession) {
throw new Error(`Session not found: ${sessionId}`);
}
mutableSession.activeObjective = message.content;
mutableSession.updatedAt = message.createdAt;
if (!mutableSession.title || mutableSession.title === "未命名项目") {
mutableSession.title = message.content.slice(0, 32);
}
state.messages.push(message);
state.events.push(
this.makeEvent({
sessionId,
taskId: null,
source: "user",
type: "session.message.added",
payload: {
channel,
content: message.content,
},
}),
);
});
this.publishLatestEvent();
this.applyPlan(session, message.content);
return this.getSession(sessionId);
}
registerWorker(input: {
name: string;
os: WorkerNode["os"];
capabilities: string[];
}): WorkerNode {
const timestamp = now();
const existing = this.getState().workers.find((worker) => worker.name === input.name);
if (existing) {
return this.updateWorker(existing.id, {
os: input.os,
capabilities: input.capabilities,
status: "idle",
load: 0,
});
}
const worker: WorkerNode = {
id: createId("worker"),
name: input.name,
os: input.os,
capabilities: Array.from(new Set(input.capabilities)),
status: "idle",
currentTaskId: null,
load: 0,
lastSeenAt: timestamp,
createdAt: timestamp,
updatedAt: timestamp,
};
this.store.mutate((state) => {
state.workers.push(worker);
state.events.push(
this.makeEvent({
sessionId: null,
taskId: null,
source: "system",
type: "worker.registered",
payload: {
workerId: worker.id,
name: worker.name,
os: worker.os,
capabilities: worker.capabilities,
},
}),
);
});
this.publishLatestEvent();
this.syncAssignments();
return worker;
}
updateWorker(
workerId: string,
input: Partial<Pick<WorkerNode, "os" | "capabilities" | "status" | "load">>,
): WorkerNode {
let updated!: WorkerNode;
this.store.mutate((state) => {
const worker = state.workers.find((candidate) => candidate.id === workerId);
if (!worker) {
throw new Error(`Worker not found: ${workerId}`);
}
if (input.os) worker.os = input.os;
if (input.capabilities) worker.capabilities = Array.from(new Set(input.capabilities));
if (input.status) worker.status = input.status;
if (typeof input.load === "number") worker.load = input.load;
worker.updatedAt = now();
worker.lastSeenAt = worker.updatedAt;
updated = { ...worker };
});
return updated;
}
heartbeat(workerId: string, load = 0): WorkerNode {
let updated!: WorkerNode;
this.store.mutate((state) => {
const worker = state.workers.find((candidate) => candidate.id === workerId);
if (!worker) {
throw new Error(`Worker not found: ${workerId}`);
}
worker.lastSeenAt = now();
worker.updatedAt = worker.lastSeenAt;
worker.load = load;
if (!worker.currentTaskId && worker.status !== "offline") {
worker.status = "idle";
}
state.events.push(
this.makeEvent({
sessionId: null,
taskId: worker.currentTaskId,
source: "worker",
type: "worker.heartbeat",
payload: {
workerId: worker.id,
status: worker.status,
load: worker.load,
},
}),
);
updated = { ...worker };
});
this.publishLatestEvent();
this.syncAssignments();
return updated;
}
claimNextTask(workerId: string): Task | null {
let claimedTask: Task | null = null;
this.store.mutate((state) => {
const worker = state.workers.find((candidate) => candidate.id === workerId);
if (!worker) {
throw new Error(`Worker not found: ${workerId}`);
}
const task = state.tasks.find(
(candidate) => candidate.assignedWorkerId === workerId && candidate.status === "assigned",
);
if (!task) {
return;
}
task.status = "running";
task.currentStep = "start";
task.summary = "任务已被 worker 接收。";
task.updatedAt = now();
worker.status = "busy";
worker.currentTaskId = task.id;
worker.updatedAt = task.updatedAt;
worker.lastSeenAt = task.updatedAt;
claimedTask = { ...task };
state.events.push(
this.makeEvent({
sessionId: task.sessionId,
taskId: task.id,
source: "worker",
type: "task.started",
payload: {
workerId,
title: task.title,
},
}),
);
});
if (claimedTask) {
this.publishLatestEvent();
}
return claimedTask;
}
reportProgress(
taskId: string,
workerId: string,
input: {
progressPercent: number;
summary: string;
currentStep: string;
nextStep: string;
},
): Task {
let updated!: Task;
this.store.mutate((state) => {
const task = state.tasks.find((candidate) => candidate.id === taskId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
task.progressPercent = Math.max(0, Math.min(100, input.progressPercent));
task.summary = input.summary;
task.currentStep = input.currentStep;
task.nextStep = input.nextStep;
task.updatedAt = now();
if (task.status === "assigned") {
task.status = "running";
}
state.events.push(
this.makeEvent({
sessionId: task.sessionId,
taskId: task.id,
source: "worker",
type: "task.progress",
payload: {
workerId,
progressPercent: task.progressPercent,
summary: task.summary,
currentStep: task.currentStep,
nextStep: task.nextStep,
},
}),
);
updated = { ...task };
});
this.publishLatestEvent();
return updated;
}
completeTask(taskId: string, workerId: string, summary: string): Task {
let updated!: Task;
this.store.mutate((state) => {
const task = state.tasks.find((candidate) => candidate.id === taskId);
const worker = state.workers.find((candidate) => candidate.id === workerId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
if (!worker) {
throw new Error(`Worker not found: ${workerId}`);
}
task.status = "completed";
task.progressPercent = 100;
task.summary = summary;
task.currentStep = "done";
task.nextStep = "";
task.updatedAt = now();
worker.status = "idle";
worker.currentTaskId = null;
worker.updatedAt = task.updatedAt;
worker.lastSeenAt = task.updatedAt;
state.events.push(
this.makeEvent({
sessionId: task.sessionId,
taskId: task.id,
source: "worker",
type: "task.completed",
payload: {
workerId,
summary,
},
}),
);
updated = { ...task };
});
this.publishLatestEvent();
this.syncAssignments();
return updated;
}
failTask(taskId: string, workerId: string, errorMessage: string): Task {
let updated!: Task;
this.store.mutate((state) => {
const task = state.tasks.find((candidate) => candidate.id === taskId);
const worker = state.workers.find((candidate) => candidate.id === workerId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
if (!worker) {
throw new Error(`Worker not found: ${workerId}`);
}
task.status = "failed";
task.summary = errorMessage;
task.currentStep = "failed";
task.nextStep = "";
task.updatedAt = now();
worker.status = "idle";
worker.currentTaskId = null;
worker.updatedAt = task.updatedAt;
worker.lastSeenAt = task.updatedAt;
state.events.push(
this.makeEvent({
sessionId: task.sessionId,
taskId: task.id,
source: "worker",
type: "task.failed",
payload: {
workerId,
errorMessage,
},
}),
);
updated = { ...task };
});
this.publishLatestEvent();
this.syncAssignments();
return updated;
}
pauseTask(taskId: string): Task {
return this.transitionTask(taskId, "paused", "system", "task.paused", {
summary: "任务已被暂停。",
});
}
cancelTask(taskId: string): Task {
return this.transitionTask(taskId, "cancelled", "system", "task.cancelled", {
summary: "任务已被取消。",
});
}
respondApproval(approvalId: string, approved: boolean, responder: string): ApprovalRequest {
let updatedApproval!: ApprovalRequest;
this.store.mutate((state) => {
const approval = state.approvals.find((candidate) => candidate.id === approvalId);
if (!approval) {
throw new Error(`Approval not found: ${approvalId}`);
}
const task = state.tasks.find((candidate) => candidate.id === approval.taskId);
if (!task) {
throw new Error(`Task not found for approval: ${approval.taskId}`);
}
const timestamp = now();
approval.status = approved ? "approved" : "rejected";
approval.responder = responder;
approval.updatedAt = timestamp;
task.approvalStatus = approval.status;
task.updatedAt = timestamp;
task.status = approved ? "queued" : "cancelled";
task.summary = approved ? "审批已通过,重新进入队列。" : "审批被拒绝,任务已取消。";
state.events.push(
this.makeEvent({
sessionId: approval.sessionId,
taskId: approval.taskId,
source: "system",
type: approved ? "approval.approved" : "approval.rejected",
payload: {
approvalId,
responder,
},
}),
);
updatedApproval = { ...approval };
});
this.publishLatestEvent();
this.syncAssignments();
return updatedApproval;
}
private applyPlan(session: Session, content: string): void {
const sessionDetails = this.getSession(session.id);
const result = createPlan(sessionDetails.session, content, sessionDetails.tasks.filter(isActiveTask));
const tasks = materializeTasks(session.id, result);
const plannerMessage = buildPlannerMessage(result.summary);
const timestamp = now();
this.store.mutate((state) => {
const mutableSession = state.sessions.find((candidate) => candidate.id === session.id);
if (!mutableSession) {
throw new Error(`Session not found: ${session.id}`);
}
mutableSession.activeObjective = content;
mutableSession.lastPlannerSummary = plannerMessage;
mutableSession.updatedAt = timestamp;
if (result.pauseExistingTasks) {
for (const task of state.tasks.filter(
(candidate) => candidate.sessionId === session.id && isActiveTask(candidate),
)) {
if (["running", "assigned", "queued", "planning", "blocked"].includes(task.status)) {
task.status = "paused";
task.updatedAt = timestamp;
}
}
}
state.tasks.push(...tasks);
for (const task of tasks) {
if (task.approvalStatus === "pending") {
state.approvals.push({
id: createId("approval"),
sessionId: session.id,
taskId: task.id,
kind: "dangerous_action",
summary: `任务 "${task.title}" 包含高风险关键词,需要审批后才能执行。`,
riskLevel: "high",
status: "pending",
requester: "manager",
responder: null,
createdAt: timestamp,
updatedAt: timestamp,
});
}
}
const managerMessage: Message = {
id: createId("msg"),
sessionId: session.id,
role: "manager",
channel: "system",
content: plannerMessage,
createdAt: timestamp,
};
state.messages.push(managerMessage);
state.events.push(
this.makeEvent({
sessionId: session.id,
taskId: null,
source: "manager",
type: "plan.created",
payload: {
summary: result.summary,
taskIds: tasks.map((task) => task.id),
},
}),
);
});
this.publishLatestEvent();
this.syncAssignments();
}
private transitionTask(
taskId: string,
status: Task["status"],
source: BossEvent["source"],
eventType: string,
payload: Record<string, unknown>,
): Task {
let updated!: Task;
this.store.mutate((state) => {
const task = state.tasks.find((candidate) => candidate.id === taskId);
if (!task) {
throw new Error(`Task not found: ${taskId}`);
}
task.status = status;
task.updatedAt = now();
task.summary = typeof payload.summary === "string" ? payload.summary : task.summary;
if (task.assignedWorkerId) {
const worker = state.workers.find((candidate) => candidate.id === task.assignedWorkerId);
if (worker && worker.currentTaskId === task.id) {
worker.currentTaskId = null;
worker.status = "idle";
worker.updatedAt = task.updatedAt;
worker.lastSeenAt = task.updatedAt;
}
}
state.events.push(
this.makeEvent({
sessionId: task.sessionId,
taskId: task.id,
source,
type: eventType,
payload,
}),
);
updated = { ...task };
});
this.publishLatestEvent();
this.syncAssignments();
return updated;
}
private syncAssignments(): void {
const candidates = chooseAssignmentCandidates(this.getState());
if (candidates.length === 0) {
return;
}
this.store.mutate((state) => {
for (const candidate of candidates) {
const task = state.tasks.find((item) => item.id === candidate.taskId);
const worker = state.workers.find((item) => item.id === candidate.workerId);
if (!task || !worker || task.status !== "queued" || worker.status !== "idle") {
continue;
}
const timestamp = now();
task.status = "assigned";
task.assignedWorkerId = worker.id;
task.updatedAt = timestamp;
task.summary = `已分配给 ${worker.name}`;
worker.status = "busy";
worker.currentTaskId = task.id;
worker.updatedAt = timestamp;
worker.lastSeenAt = timestamp;
state.events.push(
this.makeEvent({
sessionId: task.sessionId,
taskId: task.id,
source: "system",
type: "task.assigned",
payload: {
workerId: worker.id,
workerName: worker.name,
},
}),
);
}
});
this.publishLatestEvent();
}
private publishLatestEvent(): void {
const state = this.getState();
const latestEvent = state.events[state.events.length - 1];
if (latestEvent) {
this.events.publish(latestEvent);
}
}
private makeEvent(input: Omit<BossEvent, "id" | "timestamp">): BossEvent {
return {
id: createId("evt"),
timestamp: now(),
...input,
};
}
}

19
src/event-broker.ts Normal file
View File

@@ -0,0 +1,19 @@
import type { BossEvent } from "./types.js";
type Listener = (event: BossEvent) => void;
export class EventBroker {
private readonly listeners = new Set<Listener>();
subscribe(listener: Listener): () => void {
this.listeners.add(listener);
return () => this.listeners.delete(listener);
}
publish(event: BossEvent): void {
for (const listener of this.listeners) {
listener(event);
}
}
}

181
src/planner.ts Normal file
View File

@@ -0,0 +1,181 @@
import type { Session, Task } from "./types.js";
import { containsKeyword, createId, now } from "./utils.js";
interface DraftTask {
title: string;
description: string;
kind: string;
requiredOs: Task["requiredOs"];
requiredCapabilities: string[];
priority: Task["priority"];
dependencyIndexes: number[];
approvalStatus: Task["approvalStatus"];
}
export interface PlannerResult {
summary: string;
tasks: DraftTask[];
pauseExistingTasks: boolean;
}
function inferRequiredOs(content: string): Task["requiredOs"] {
if (containsKeyword(content, ["mac", "macos", "xcode", "ios", "swift"])) {
return "macos";
}
if (containsKeyword(content, ["windows", "win", "powershell", ".exe", "注册表"])) {
return "windows";
}
return "any";
}
function inferCapabilities(content: string): string[] {
const capabilities = ["terminal"];
if (containsKeyword(content, ["ui", "browser", "web", "playwright", "页面", "前端"])) {
capabilities.push("browser");
}
if (containsKeyword(content, ["test", "测试", "验证", "ci"])) {
capabilities.push("test");
}
return Array.from(new Set(capabilities));
}
function requiresApproval(content: string): boolean {
return containsKeyword(content, [
"rm -rf",
"delete",
"删除",
"force push",
"强推",
"drop database",
]);
}
export function createPlan(session: Session, content: string, activeTasks: Task[]): PlannerResult {
const baseOs = inferRequiredOs(content);
const baseCapabilities = inferCapabilities(content);
const approvalStatus = requiresApproval(content) ? "pending" : "not_required";
const pauseExistingTasks = activeTasks.some((task) =>
["planning", "queued", "assigned", "running", "paused", "blocked"].includes(task.status),
);
const replan = pauseExistingTasks;
if (containsKeyword(content, ["调研", "研究", "定位", "排查", "分析"])) {
return {
summary: replan
? `已根据新要求重排调研任务:${content}`
: `已生成调研型任务树:${content}`,
pauseExistingTasks: replan,
tasks: [
{
title: "并行收集线索与上下文",
description: `围绕目标开展初步调研:${content}`,
kind: "research",
requiredOs: "any",
requiredCapabilities: ["terminal"],
priority: "high",
dependencyIndexes: [],
approvalStatus,
},
{
title: "定位问题根因或约束",
description: `在合适设备上并行定位问题:${content}`,
kind: "investigation",
requiredOs: baseOs,
requiredCapabilities: baseCapabilities,
priority: "high",
dependencyIndexes: [],
approvalStatus,
},
{
title: "整理结论与执行建议",
description: "汇总两条并行调研结果,并输出下一步建议。",
kind: "summary",
requiredOs: "any",
requiredCapabilities: ["terminal"],
priority: "medium",
dependencyIndexes: [0, 1],
approvalStatus: "not_required",
},
],
};
}
return {
summary: replan
? `已根据最新需求重排执行计划:${content}`
: `已生成执行型任务树:${content}`,
pauseExistingTasks: replan,
tasks: [
{
title: "分析需求并准备执行环境",
description: `分析用户目标并确认执行边界:${content}`,
kind: "planning",
requiredOs: "any",
requiredCapabilities: ["terminal"],
priority: "high",
dependencyIndexes: [],
approvalStatus: "not_required",
},
{
title: "实现核心变更",
description: `在匹配设备上执行主任务:${content}`,
kind: "implementation",
requiredOs: baseOs,
requiredCapabilities: baseCapabilities,
priority: "high",
dependencyIndexes: [0],
approvalStatus,
},
{
title: "验证结果并整理回报",
description: "运行验证步骤,输出风险和下一步建议。",
kind: "validation",
requiredOs: "any",
requiredCapabilities: Array.from(new Set([...baseCapabilities, "test"])),
priority: "medium",
dependencyIndexes: [1],
approvalStatus: "not_required",
},
],
};
}
export function materializeTasks(sessionId: string, result: PlannerResult): Task[] {
const createdAt = now();
const placeholders = result.tasks.map((draft) => ({
id: createId("task"),
draft,
}));
return placeholders.map(({ id, draft }) => ({
id,
sessionId,
parentTaskId: null,
title: draft.title,
description: draft.description,
kind: draft.kind,
status: draft.approvalStatus === "pending" ? "waiting_approval" : "queued",
priority: draft.priority,
requiredOs: draft.requiredOs,
requiredCapabilities: draft.requiredCapabilities,
dependencyIds: draft.dependencyIndexes.map((index) => placeholders[index].id),
assignedWorkerId: null,
approvalStatus: draft.approvalStatus,
progressPercent: 0,
summary: "",
currentStep: "",
nextStep: "",
createdAt,
updatedAt: createdAt,
}));
}
export function buildPlannerMessage(summary: string): string {
return `${summary}。系统会继续调度可执行子任务,并在需要审批时暂停。`;
}

64
src/scheduler.ts Normal file
View File

@@ -0,0 +1,64 @@
import type { AppState, Task, WorkerNode } from "./types.js";
function dependenciesSatisfied(task: Task, state: AppState): boolean {
return task.dependencyIds.every((dependencyId) => {
const dependency = state.tasks.find((candidate) => candidate.id === dependencyId);
return dependency?.status === "completed";
});
}
function workerIsIdle(worker: WorkerNode): boolean {
return worker.status === "idle" && !worker.currentTaskId;
}
function scoreWorker(task: Task, worker: WorkerNode): number {
let score = 0;
if (task.requiredOs === "any" || task.requiredOs === worker.os) {
score += 10;
}
for (const capability of task.requiredCapabilities) {
if (worker.capabilities.includes(capability)) {
score += 4;
}
}
return score - worker.load;
}
export function chooseAssignmentCandidates(state: AppState): Array<{
taskId: string;
workerId: string;
}> {
const tasks = state.tasks
.filter((task) => task.status === "queued" && dependenciesSatisfied(task, state))
.sort((left, right) => left.createdAt.localeCompare(right.createdAt));
const availableWorkers = new Map(
state.workers.filter(workerIsIdle).map((worker) => [worker.id, worker]),
);
const assignments: Array<{ taskId: string; workerId: string }> = [];
for (const task of tasks) {
let bestWorker: WorkerNode | null = null;
let bestScore = Number.NEGATIVE_INFINITY;
for (const worker of availableWorkers.values()) {
const score = scoreWorker(task, worker);
if (score > bestScore) {
bestScore = score;
bestWorker = worker;
}
}
if (!bestWorker || bestScore < 0) {
continue;
}
assignments.push({ taskId: task.id, workerId: bestWorker.id });
availableWorkers.delete(bestWorker.id);
}
return assignments;
}

153
src/server.ts Normal file
View File

@@ -0,0 +1,153 @@
import path from "node:path";
import Fastify from "fastify";
import fastifyStatic from "@fastify/static";
import { BossEngine } from "./engine.js";
const engine = new BossEngine();
const app = Fastify({ logger: true });
await app.register(fastifyStatic, {
root: path.resolve(process.cwd(), "public"),
prefix: "/",
});
function ok() {
return { ok: true };
}
app.get("/api/health", async () => ({
status: "ok",
sessions: engine.getState().sessions.length,
workers: engine.getState().workers.length,
}));
app.get("/api/bootstrap", async () => engine.bootstrap());
app.get("/api/sessions", async () => engine.listSessions());
app.post("/api/sessions", async (request) => {
const body = (request.body ?? {}) as { title?: string };
return engine.createSession(body.title);
});
app.get("/api/sessions/:sessionId", async (request) => {
const params = request.params as { sessionId: string };
return engine.getSession(params.sessionId);
});
app.post("/api/sessions/:sessionId/messages", async (request) => {
const params = request.params as { sessionId: string };
const body = (request.body ?? {}) as { content?: string; channel?: string };
return engine.addMessage(params.sessionId, body.content ?? "", body.channel ?? "web");
});
app.get("/api/events/stream", async (_request, reply) => {
reply.raw.setHeader("Content-Type", "text/event-stream");
reply.raw.setHeader("Cache-Control", "no-cache");
reply.raw.setHeader("Connection", "keep-alive");
reply.raw.flushHeaders?.();
const unsubscribe = engine.events.subscribe((event) => {
reply.raw.write(`data: ${JSON.stringify(event)}\n\n`);
});
const interval = setInterval(() => {
reply.raw.write(": ping\n\n");
}, 15_000);
reply.raw.on("close", () => {
clearInterval(interval);
unsubscribe();
reply.raw.end();
});
});
app.get("/api/workers", async () => engine.getState().workers);
app.post("/api/workers/register", async (request) => {
const body = request.body as {
name?: string;
os?: "windows" | "macos" | "linux";
capabilities?: string[];
};
return engine.registerWorker({
name: body.name ?? "worker",
os: body.os ?? "linux",
capabilities: body.capabilities ?? ["terminal"],
});
});
app.post("/api/workers/:workerId/heartbeat", async (request) => {
const params = request.params as { workerId: string };
const body = (request.body ?? {}) as { load?: number };
return engine.heartbeat(params.workerId, body.load ?? 0);
});
app.post("/api/workers/:workerId/claim-next", async (request) => {
const params = request.params as { workerId: string };
return {
task: engine.claimNextTask(params.workerId),
};
});
app.post("/api/tasks/:taskId/progress", async (request) => {
const params = request.params as { taskId: string };
const body = request.body as {
workerId: string;
progressPercent: number;
summary: string;
currentStep: string;
nextStep: string;
};
return engine.reportProgress(params.taskId, body.workerId, {
progressPercent: body.progressPercent,
summary: body.summary,
currentStep: body.currentStep,
nextStep: body.nextStep,
});
});
app.post("/api/tasks/:taskId/complete", async (request) => {
const params = request.params as { taskId: string };
const body = request.body as {
workerId: string;
summary: string;
};
return engine.completeTask(params.taskId, body.workerId, body.summary);
});
app.post("/api/tasks/:taskId/fail", async (request) => {
const params = request.params as { taskId: string };
const body = request.body as {
workerId: string;
errorMessage: string;
};
return engine.failTask(params.taskId, body.workerId, body.errorMessage);
});
app.post("/api/tasks/:taskId/pause", async (request) => {
const params = request.params as { taskId: string };
return engine.pauseTask(params.taskId);
});
app.post("/api/tasks/:taskId/cancel", async (request) => {
const params = request.params as { taskId: string };
return engine.cancelTask(params.taskId);
});
app.post("/api/approvals/:approvalId/respond", async (request) => {
const params = request.params as { approvalId: string };
const body = request.body as {
approved: boolean;
responder?: string;
};
return engine.respondApproval(params.approvalId, body.approved, body.responder ?? "user");
});
app.post("/api/demo/reset", async () => {
engine.store.reset();
return ok();
});
const port = Number(process.env.PORT ?? 43210);
await app.listen({ port, host: "0.0.0.0" });

61
src/store.ts Normal file
View File

@@ -0,0 +1,61 @@
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { dirname } from "node:path";
import type { AppState } from "./types.js";
function defaultState(): AppState {
return {
sessions: [],
messages: [],
tasks: [],
workers: [],
approvals: [],
events: [],
};
}
export class FileStore {
private state: AppState;
constructor(private readonly filePath: string) {
this.ensureDirectory();
this.state = this.load();
}
get snapshot(): AppState {
return structuredClone(this.state);
}
mutate<T>(mutator: (state: AppState) => T): T {
const result = mutator(this.state);
this.save();
return result;
}
reset(): AppState {
this.state = defaultState();
this.save();
return this.snapshot;
}
private ensureDirectory(): void {
mkdirSync(dirname(this.filePath), { recursive: true });
}
private load(): AppState {
if (!existsSync(this.filePath)) {
return defaultState();
}
try {
const raw = readFileSync(this.filePath, "utf8");
return { ...defaultState(), ...(JSON.parse(raw) as AppState) };
} catch {
return defaultState();
}
}
private save(): void {
writeFileSync(this.filePath, `${JSON.stringify(this.state, null, 2)}\n`, "utf8");
}
}

110
src/types.ts Normal file
View File

@@ -0,0 +1,110 @@
export type SessionStatus = "active" | "archived";
export type TaskStatus =
| "planning"
| "queued"
| "assigned"
| "running"
| "blocked"
| "paused"
| "waiting_approval"
| "completed"
| "failed"
| "cancelled";
export type WorkerStatus = "idle" | "busy" | "offline";
export type ApprovalStatus = "pending" | "approved" | "rejected";
export type RiskLevel = "low" | "medium" | "high";
export interface Session {
id: string;
title: string;
status: SessionStatus;
activeObjective: string;
lastPlannerSummary: string;
createdAt: string;
updatedAt: string;
}
export interface Message {
id: string;
sessionId: string;
role: "user" | "manager" | "system";
channel: string;
content: string;
createdAt: string;
}
export interface Task {
id: string;
sessionId: string;
parentTaskId: string | null;
title: string;
description: string;
kind: string;
status: TaskStatus;
priority: "low" | "medium" | "high";
requiredOs: "any" | "windows" | "macos" | "linux";
requiredCapabilities: string[];
dependencyIds: string[];
assignedWorkerId: string | null;
approvalStatus: "not_required" | ApprovalStatus;
progressPercent: number;
summary: string;
currentStep: string;
nextStep: string;
createdAt: string;
updatedAt: string;
}
export interface WorkerNode {
id: string;
name: string;
os: "windows" | "macos" | "linux";
capabilities: string[];
status: WorkerStatus;
currentTaskId: string | null;
load: number;
lastSeenAt: string;
createdAt: string;
updatedAt: string;
}
export interface ApprovalRequest {
id: string;
sessionId: string;
taskId: string;
kind: string;
summary: string;
riskLevel: RiskLevel;
status: ApprovalStatus;
requester: string;
responder: string | null;
createdAt: string;
updatedAt: string;
}
export interface BossEvent {
id: string;
sessionId: string | null;
taskId: string | null;
source: "user" | "manager" | "system" | "worker";
type: string;
timestamp: string;
payload: Record<string, unknown>;
}
export interface AppState {
sessions: Session[];
messages: Message[];
tasks: Task[];
workers: WorkerNode[];
approvals: ApprovalRequest[];
events: BossEvent[];
}
export interface SessionDetails {
session: Session;
messages: Message[];
tasks: Task[];
approvals: ApprovalRequest[];
}

15
src/utils.ts Normal file
View File

@@ -0,0 +1,15 @@
import { randomUUID } from "node:crypto";
export function createId(prefix: string): string {
return `${prefix}_${randomUUID().replace(/-/g, "").slice(0, 12)}`;
}
export function now(): string {
return new Date().toISOString();
}
export function containsKeyword(input: string, keywords: string[]): boolean {
const normalized = input.toLowerCase();
return keywords.some((keyword) => normalized.includes(keyword.toLowerCase()));
}

130
src/worker.ts Normal file
View File

@@ -0,0 +1,130 @@
import { setTimeout as delay } from "node:timers/promises";
interface Task {
id: string;
title: string;
description: string;
kind: string;
}
function parseArgs(argv: string[]) {
const options = {
name: "",
os: "linux",
capabilities: ["terminal"],
server: "http://127.0.0.1:43210",
};
for (let index = 0; index < argv.length; index += 1) {
const token = argv[index];
if (token === "--name") {
options.name = argv[index + 1] ?? "";
index += 1;
} else if (token === "--os") {
options.os = argv[index + 1] ?? "linux";
index += 1;
} else if (token === "--capability") {
options.capabilities.push(argv[index + 1] ?? "terminal");
index += 1;
} else if (token === "--server") {
options.server = argv[index + 1] ?? options.server;
index += 1;
}
}
if (!options.name) {
throw new Error("Usage: npm run worker -- --name <worker-name> [--os windows|macos|linux]");
}
options.capabilities = Array.from(new Set(options.capabilities));
return options;
}
async function postJson(url: string, body: unknown) {
const response = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
if (!response.ok) {
throw new Error(`Request failed: ${response.status} ${response.statusText}`);
}
return response.json();
}
async function simulateTask(server: string, workerId: string, task: Task) {
const steps = [
{
progressPercent: 20,
summary: `${task.title}: 收集上下文和工作目录`,
currentStep: "prepare",
nextStep: "inspect",
},
{
progressPercent: 55,
summary: `${task.title}: 正在执行主要步骤`,
currentStep: "execute",
nextStep: "verify",
},
{
progressPercent: 85,
summary: `${task.title}: 正在整理结果`,
currentStep: "summarize",
nextStep: "complete",
},
];
for (const step of steps) {
await delay(1_500);
await postJson(`${server}/api/tasks/${task.id}/progress`, {
workerId,
...step,
});
}
await delay(1_000);
await postJson(`${server}/api/tasks/${task.id}/complete`, {
workerId,
summary: `${task.title} 已完成。`,
});
}
async function main() {
const options = parseArgs(process.argv.slice(2));
const worker = (await postJson(`${options.server}/api/workers/register`, {
name: options.name,
os: options.os,
capabilities: options.capabilities,
})) as { id: string; name: string };
console.log(`worker ready: ${worker.name} (${worker.id})`);
for (;;) {
await postJson(`${options.server}/api/workers/${worker.id}/heartbeat`, { load: 0 });
const response = (await postJson(`${options.server}/api/workers/${worker.id}/claim-next`, {})) as {
task: Task | null;
};
if (response.task) {
console.log(`claimed task: ${response.task.title}`);
try {
await simulateTask(options.server, worker.id, response.task);
} catch (error) {
const message = error instanceof Error ? error.message : "unknown error";
await postJson(`${options.server}/api/tasks/${response.task.id}/fail`, {
workerId: worker.id,
errorMessage: message,
});
}
} else {
await delay(2_500);
}
}
}
main().catch((error) => {
console.error(error);
process.exit(1);
});