feat: harden boss local v1 runtime

This commit is contained in:
Codex
2026-03-23 13:17:11 +08:00
parent 515ce72d0d
commit 47c29c723a
13 changed files with 1281 additions and 178 deletions

View File

@@ -10,6 +10,7 @@ function run(command: string, args: string[]) {
env: {
...process.env,
PORT: process.env.PORT ?? "43210",
BOSS_DATA_FILE: process.env.BOSS_DATA_FILE ?? ".boss-data/demo-store.json",
},
});
child.on("exit", (code, signal) => {

View File

@@ -15,7 +15,9 @@ import { chooseAssignmentCandidates } from "./scheduler.js";
import { FileStore } from "./store.js";
import { createId, now } from "./utils.js";
const DATA_FILE = resolve(process.cwd(), ".boss-data", "store.json");
const DATA_FILE = process.env.BOSS_DATA_FILE
? resolve(process.cwd(), process.env.BOSS_DATA_FILE)
: resolve(process.cwd(), ".boss-data", "store.json");
function isActiveTask(task: Task): boolean {
return !["completed", "failed", "cancelled"].includes(task.status);
@@ -34,6 +36,32 @@ export class BossEngine {
return this.getState();
}
resetDemo(preserveWorkers = true): AppState {
this.commit((state) => {
state.sessions = [];
state.messages = [];
state.tasks = [];
state.approvals = [];
state.events = [];
if (!preserveWorkers) {
state.workers = [];
return;
}
const timestamp = now();
state.workers = state.workers.map((worker) => ({
...worker,
currentTaskId: null,
load: 0,
status: worker.status === "offline" ? "offline" : "idle",
updatedAt: timestamp,
}));
});
return this.getState();
}
createSession(title?: string): SessionDetails {
const timestamp = now();
const session: Session = {
@@ -134,6 +162,29 @@ export class BossEngine {
return this.getSession(sessionId);
}
restoreSession(sessionId: string): SessionDetails {
this.commit((state, addEvent) => {
const session = state.sessions.find((candidate) => candidate.id === sessionId);
if (!session) {
throw new Error(`Session not found: ${sessionId}`);
}
const timestamp = now();
session.status = "active";
session.updatedAt = timestamp;
addEvent({
sessionId,
taskId: null,
source: "system",
type: "session.restored",
payload: { sessionId },
});
});
return this.getSession(sessionId);
}
addMessage(sessionId: string, content: string, channel = "web"): SessionDetails {
const session = this.getSession(sessionId).session;
if (session.status === "archived") {
@@ -415,6 +466,10 @@ export class BossEngine {
if (task.status === "assigned") {
task.status = "running";
}
worker.status = "busy";
worker.currentTaskId = task.id;
worker.updatedAt = task.updatedAt;
worker.lastSeenAt = task.updatedAt;
addEvent({
sessionId: task.sessionId,
@@ -649,6 +704,7 @@ export class BossEngine {
const timestamp = now();
this.commit((state, addEvent) => {
const pausedTaskIds: string[] = [];
const mutableSession = state.sessions.find((candidate) => candidate.id === session.id);
if (!mutableSession) {
throw new Error(`Session not found: ${session.id}`);
@@ -676,6 +732,7 @@ export class BossEngine {
reason: "replan",
},
});
pausedTaskIds.push(task.id);
}
}
}
@@ -717,6 +774,7 @@ export class BossEngine {
payload: {
summary: result.summary,
taskIds: tasks.map((task) => task.id),
pausedTaskIds,
},
});
});

View File

@@ -71,6 +71,11 @@ app.post("/api/sessions/:sessionId/archive", async (request) => {
return engine.archiveSession(params.sessionId);
});
app.post("/api/sessions/:sessionId/restore", async (request) => {
const params = request.params as { sessionId: string };
return engine.restoreSession(params.sessionId);
});
app.get("/api/sessions/:sessionId", async (request) => {
const params = request.params as { sessionId: string };
return engine.getSession(params.sessionId);
@@ -210,9 +215,9 @@ app.post("/api/approvals/:approvalId/respond", async (request) => {
return engine.respondApproval(params.approvalId, body.approved, body.responder ?? "user");
});
app.post("/api/demo/reset", async () => {
engine.store.reset();
return ok();
app.post("/api/demo/reset", async (request) => {
const body = (request.body ?? {}) as { preserveWorkers?: boolean };
return engine.resetDemo(body.preserveWorkers ?? true);
});
app.post("/api/reconcile", async () => engine.reconcileNow());

View File

@@ -1,6 +1,31 @@
import { spawn, type ChildProcess } from "node:child_process";
import { createServer } from "node:net";
import { setTimeout as delay } from "node:timers/promises";
const baseUrl = process.env.BOSS_BASE_URL || "http://127.0.0.1:43210";
const explicitBaseUrl = process.env.BOSS_BASE_URL?.replace(/\/$/, "") ?? "";
const smokeWorkerMode = process.env.SMOKE_WORKER_MODE === "command" ? "command" : "simulate";
const smokeExecutorCommand = process.env.SMOKE_EXECUTOR_COMMAND ?? "";
const smokeWorkspace = process.env.SMOKE_WORKSPACE ?? process.cwd();
let smokePort = process.env.SMOKE_PORT || "";
let baseUrl = explicitBaseUrl;
const children: ChildProcess[] = [];
const workerSpecs = [
{
name: "smoke-win-a",
os: "windows",
capabilities: ["terminal", "browser"],
},
{
name: "smoke-win-b",
os: "windows",
capabilities: ["terminal", "test"],
},
{
name: "smoke-mac-a",
os: "macos",
capabilities: ["terminal", "browser", "test"],
},
] as const;
async function request(path: string, options: RequestInit = {}) {
const response = await fetch(`${baseUrl}${path}`, {
@@ -9,12 +34,132 @@ async function request(path: string, options: RequestInit = {}) {
});
if (!response.ok) {
throw new Error(`${response.status} ${response.statusText}`);
const text = await response.text();
throw new Error(`${response.status} ${response.statusText} ${text}`.trim());
}
return response.json();
}
function run(command: string, args: string[]) {
const child = spawn(command, args, {
stdio: "inherit",
shell: false,
env: {
...process.env,
PORT: smokePort,
BOSS_DATA_FILE: process.env.BOSS_DATA_FILE ?? ".boss-data/smoke-store.json",
},
});
children.push(child);
return child;
}
function portFromUrl(url: string) {
const parsed = new URL(url);
if (parsed.port) {
return parsed.port;
}
return parsed.protocol === "https:" ? "443" : "80";
}
async function chooseLocalPort() {
if (smokePort) {
return smokePort;
}
return new Promise<string>((resolve, reject) => {
const probe = createServer();
probe.on("error", reject);
probe.listen(0, "127.0.0.1", () => {
const address = probe.address();
if (!address || typeof address === "string") {
probe.close(() => reject(new Error("Unable to determine a free smoke port.")));
return;
}
const selectedPort = String(address.port);
probe.close((error) => {
if (error) {
reject(error);
return;
}
resolve(selectedPort);
});
});
});
}
async function isHealthy() {
try {
const health = await request("/api/health");
return health.status === "ok";
} catch {
return false;
}
}
async function waitForHealth(timeoutMs = 20_000) {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
if (await isHealthy()) {
return;
}
await delay(1_000);
}
throw new Error(`Server did not become healthy at ${baseUrl}`);
}
async function ensureEnvironment() {
if (smokeWorkerMode === "command" && !smokeExecutorCommand.trim()) {
throw new Error("SMOKE_WORKER_MODE=command requires SMOKE_EXECUTOR_COMMAND.");
}
if (explicitBaseUrl) {
smokePort = portFromUrl(baseUrl);
if (!(await isHealthy())) {
throw new Error(`Explicit BOSS_BASE_URL is not healthy: ${baseUrl}`);
}
} else {
smokePort = await chooseLocalPort();
baseUrl = `http://127.0.0.1:${smokePort}`;
run(process.execPath, ["./node_modules/tsx/dist/cli.mjs", "src/server.ts"]);
await waitForHealth();
}
for (const spec of workerSpecs) {
const args = [
"./node_modules/tsx/dist/cli.mjs",
"src/worker.ts",
"--name",
spec.name,
"--os",
spec.os,
];
for (const capability of spec.capabilities) {
args.push("--capability", capability);
}
if (smokeWorkerMode === "command") {
args.push("--mode", "command", "--workspace", smokeWorkspace, "--executor", smokeExecutorCommand);
}
args.push("--server", baseUrl);
run(process.execPath, args);
}
await delay(2_000);
}
function shutdown() {
for (const child of children) {
if (!child.killed) {
child.kill("SIGTERM");
}
}
}
async function waitForSessionSettled(sessionId: string, timeoutMs = 30_000) {
const start = Date.now();
@@ -42,43 +187,51 @@ async function waitForSessionSettled(sessionId: string, timeoutMs = 30_000) {
}
async function main() {
await request("/api/demo/reset", { method: "POST", body: "{}" });
const created = await request("/api/sessions", {
method: "POST",
body: JSON.stringify({ title: "Smoke Test" }),
});
const sessionId = created.session.id;
try {
await ensureEnvironment();
await request("/api/demo/reset", {
method: "POST",
body: JSON.stringify({ preserveWorkers: true }),
});
const created = await request("/api/sessions", {
method: "POST",
body: JSON.stringify({ title: "Smoke Test" }),
});
const sessionId = created.session.id;
await request(`/api/sessions/${sessionId}/messages`, {
method: "POST",
body: JSON.stringify({
content: "先调研登录失败根因,然后 delete 旧缓存目录并做验证。",
channel: "smoke",
}),
});
await request(`/api/sessions/${sessionId}/messages`, {
method: "POST",
body: JSON.stringify({
content: "先调研登录失败根因,然后 delete 旧缓存目录并做验证。",
channel: "smoke",
}),
});
const settled = await waitForSessionSettled(sessionId);
console.log(
JSON.stringify(
{
sessionId,
taskStatuses: settled.tasks.map((task: { title: string; status: string }) => ({
title: task.title,
status: task.status,
})),
approvalStatuses: settled.approvals.map((approval: { id: string; status: string }) => ({
id: approval.id,
status: approval.status,
})),
},
null,
2,
),
);
const settled = await waitForSessionSettled(sessionId);
console.log(
JSON.stringify(
{
baseUrl,
sessionId,
taskStatuses: settled.tasks.map((task: { title: string; status: string }) => ({
title: task.title,
status: task.status,
})),
approvalStatuses: settled.approvals.map((approval: { id: string; status: string }) => ({
id: approval.id,
status: approval.status,
})),
},
null,
2,
),
);
} finally {
shutdown();
}
}
main().catch((error) => {
console.error(error);
process.exit(1);
});

View File

@@ -1,3 +1,5 @@
import { spawn } from "node:child_process";
import { resolve } from "node:path";
import { setTimeout as delay } from "node:timers/promises";
interface Task {
@@ -7,12 +9,36 @@ interface Task {
kind: string;
}
function parseArgs(argv: string[]) {
const options = {
interface WorkerOptions {
name: string;
os: string;
capabilities: string[];
server: string;
mode: "simulate" | "command";
executorCommand: string;
workspace: string;
progressIntervalMs: number;
}
class HttpError extends Error {
constructor(
readonly status: number,
readonly responseBody: string,
) {
super(`Request failed: ${status} ${responseBody}`.trim());
}
}
function parseArgs(argv: string[]): WorkerOptions {
const options: WorkerOptions = {
name: "",
os: "linux",
capabilities: ["terminal"],
server: "http://127.0.0.1:43210",
mode: process.env.BOSS_WORKER_MODE === "command" ? "command" : "simulate",
executorCommand: process.env.BOSS_EXECUTOR_COMMAND ?? "",
workspace: resolve(process.env.BOSS_WORKSPACE ?? process.cwd()),
progressIntervalMs: Number(process.env.BOSS_PROGRESS_INTERVAL_MS ?? 4_000),
};
for (let index = 0; index < argv.length; index += 1) {
@@ -29,11 +55,27 @@ function parseArgs(argv: string[]) {
} else if (token === "--server") {
options.server = argv[index + 1] ?? options.server;
index += 1;
} else if (token === "--mode") {
const candidate = argv[index + 1];
options.mode = candidate === "command" ? "command" : "simulate";
index += 1;
} else if (token === "--executor") {
options.executorCommand = argv[index + 1] ?? "";
index += 1;
} else if (token === "--workspace") {
options.workspace = resolve(argv[index + 1] ?? options.workspace);
index += 1;
}
}
if (!options.name) {
throw new Error("Usage: npm run worker -- --name <worker-name> [--os windows|macos|linux]");
throw new Error(
"Usage: npm run worker -- --name <worker-name> [--os windows|macos|linux] [--mode simulate|command] [--executor \"cmd\"] [--workspace /path]",
);
}
if (options.mode === "command" && !options.executorCommand.trim()) {
throw new Error("Command mode requires --executor or BOSS_EXECUTOR_COMMAND.");
}
options.capabilities = Array.from(new Set(options.capabilities));
@@ -48,7 +90,8 @@ async function postJson(url: string, body: unknown) {
});
if (!response.ok) {
throw new Error(`Request failed: ${response.status} ${response.statusText}`);
const text = await response.text();
throw new HttpError(response.status, text);
}
return response.json();
@@ -57,14 +100,32 @@ async function postJson(url: string, body: unknown) {
async function getJson(url: string) {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Request failed: ${response.status} ${response.statusText}`);
const text = await response.text();
throw new HttpError(response.status, text);
}
return response.json();
}
async function registerWorker(options: WorkerOptions) {
const worker = (await postJson(`${options.server}/api/workers/register`, {
name: options.name,
os: options.os,
capabilities: options.capabilities,
})) as { id: string; name: string };
console.log(`worker ready: ${worker.name} (${worker.id})`);
return worker;
}
async function taskStillRunnable(server: string, taskId: string) {
const task = await getJson(`${server}/api/tasks/${taskId}`);
return ["assigned", "running"].includes(task.status);
try {
const task = (await getJson(`${server}/api/tasks/${taskId}`)) as { status: string };
return ["assigned", "running"].includes(task.status);
} catch (error) {
if (error instanceof HttpError && error.status === 404) {
return false;
}
throw error;
}
}
async function simulateTask(server: string, workerId: string, task: Task) {
@@ -93,6 +154,7 @@ async function simulateTask(server: string, workerId: string, task: Task) {
if (!(await taskStillRunnable(server, task.id))) {
return;
}
await delay(1_500);
await postJson(`${server}/api/tasks/${task.id}/progress`, {
workerId,
@@ -103,6 +165,7 @@ async function simulateTask(server: string, workerId: string, task: Task) {
if (!(await taskStillRunnable(server, task.id))) {
return;
}
await delay(1_000);
await postJson(`${server}/api/tasks/${task.id}/complete`, {
workerId,
@@ -110,35 +173,199 @@ async function simulateTask(server: string, workerId: string, task: Task) {
});
}
function appendOutput(lines: string[], chunk: Buffer | string, source: "stdout" | "stderr") {
for (const entry of String(chunk).split(/\r?\n/)) {
const trimmed = entry.trim();
if (!trimmed) {
continue;
}
lines.push(`${source}: ${trimmed}`);
if (lines.length > 30) {
lines.splice(0, lines.length - 30);
}
}
}
function summarizeOutput(lines: string[], fallback: string) {
if (lines.length === 0) {
return fallback;
}
return lines.slice(-3).join(" | ");
}
async function runCommandTask(options: WorkerOptions, workerId: string, task: Task) {
await postJson(`${options.server}/api/tasks/${task.id}/progress`, {
workerId,
progressPercent: 10,
summary: `${task.title}: 已启动外部执行器`,
currentStep: "boot_executor",
nextStep: "run_command",
});
const outputLines: string[] = [];
const child = spawn(options.executorCommand, [], {
cwd: options.workspace,
env: {
...process.env,
BOSS_SERVER_URL: options.server,
BOSS_WORKER_ID: workerId,
BOSS_WORKER_NAME: options.name,
BOSS_WORKER_OS: options.os,
BOSS_WORKER_CAPABILITIES: options.capabilities.join(","),
BOSS_WORKSPACE: options.workspace,
BOSS_TASK_ID: task.id,
BOSS_TASK_TITLE: task.title,
BOSS_TASK_DESCRIPTION: task.description,
BOSS_TASK_KIND: task.kind,
BOSS_TASK_JSON: JSON.stringify(task),
},
shell: true,
stdio: ["ignore", "pipe", "pipe"],
});
child.stdout?.on("data", (chunk) => {
appendOutput(outputLines, chunk, "stdout");
});
child.stderr?.on("data", (chunk) => {
appendOutput(outputLines, chunk, "stderr");
});
const exitState: {
done: boolean;
code: number | null;
signal: NodeJS.Signals | null;
error: Error | null;
} = {
done: false,
code: null,
signal: null,
error: null,
};
child.on("error", (error) => {
exitState.done = true;
exitState.error = error;
});
child.on("exit", (code, signal) => {
exitState.done = true;
exitState.code = code;
exitState.signal = signal;
});
let cancelled = false;
const startedAt = Date.now();
while (!exitState.done) {
await delay(options.progressIntervalMs);
if (exitState.done) {
break;
}
if (!(await taskStillRunnable(options.server, task.id))) {
cancelled = true;
child.kill("SIGTERM");
break;
}
const elapsed = Date.now() - startedAt;
const progressPercent = Math.min(90, 20 + Math.floor(elapsed / options.progressIntervalMs) * 10);
await postJson(`${options.server}/api/tasks/${task.id}/progress`, {
workerId,
progressPercent,
summary: summarizeOutput(outputLines, `${task.title}: 外部执行器运行中`),
currentStep: "run_command",
nextStep: "await_exit",
});
}
while (!exitState.done) {
await delay(100);
}
if (cancelled) {
return;
}
if (exitState.error) {
throw exitState.error;
}
if (exitState.signal && exitState.signal !== "SIGTERM") {
throw new Error(`Executor exited via signal ${exitState.signal}`);
}
if ((exitState.code ?? 1) !== 0) {
throw new Error(summarizeOutput(outputLines, `Executor exited with code ${exitState.code ?? "unknown"}`));
}
if (!(await taskStillRunnable(options.server, task.id))) {
return;
}
await postJson(`${options.server}/api/tasks/${task.id}/complete`, {
workerId,
summary: summarizeOutput(outputLines, `${task.title} 已通过外部执行器完成。`),
});
}
async function executeTask(options: WorkerOptions, workerId: string, task: Task) {
if (options.mode === "command") {
await runCommandTask(options, workerId, task);
return;
}
await simulateTask(options.server, workerId, task);
}
async function main() {
const options = parseArgs(process.argv.slice(2));
const worker = (await postJson(`${options.server}/api/workers/register`, {
name: options.name,
os: options.os,
capabilities: options.capabilities,
})) as { id: string; name: string };
console.log(`worker ready: ${worker.name} (${worker.id})`);
let worker = await registerWorker(options);
for (;;) {
await postJson(`${options.server}/api/workers/${worker.id}/heartbeat`, { load: 0 });
const response = (await postJson(`${options.server}/api/workers/${worker.id}/claim-next`, {})) as {
task: Task | null;
};
try {
await postJson(`${options.server}/api/workers/${worker.id}/heartbeat`, { load: 0 });
const response = (await postJson(`${options.server}/api/workers/${worker.id}/claim-next`, {})) as {
task: Task | null;
};
if (!response.task) {
await delay(2_500);
continue;
}
if (response.task) {
console.log(`claimed task: ${response.task.title}`);
try {
await simulateTask(options.server, worker.id, response.task);
await executeTask(options, worker.id, response.task);
} catch (error) {
if (error instanceof HttpError && [404, 409].includes(error.status)) {
console.warn(`task abandoned: ${response.task.title} (${error.status})`);
continue;
}
const message = error instanceof Error ? error.message : "unknown error";
await postJson(`${options.server}/api/tasks/${response.task.id}/fail`, {
workerId: worker.id,
errorMessage: message,
});
}
} else {
await delay(2_500);
} catch (error) {
if (error instanceof HttpError && error.status === 404) {
console.warn(`worker state lost, re-registering: ${options.name}`);
await delay(1_000);
worker = await registerWorker(options);
continue;
}
if (error instanceof HttpError && error.status === 409) {
await delay(1_500);
continue;
}
throw error;
}
}
}