feat: ship usable local v1 with demo, workers, approvals, and docker support
This commit is contained in:
633
src/engine.ts
633
src/engine.ts
@@ -26,6 +26,7 @@ export class BossEngine {
|
||||
readonly events = new EventBroker();
|
||||
|
||||
getState(): AppState {
|
||||
this.reconcileState();
|
||||
return this.store.snapshot;
|
||||
}
|
||||
|
||||
@@ -45,20 +46,17 @@ export class BossEngine {
|
||||
updatedAt: timestamp,
|
||||
};
|
||||
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
state.sessions.unshift(session);
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: session.id,
|
||||
taskId: null,
|
||||
source: "system",
|
||||
type: "session.created",
|
||||
payload: { title: session.title },
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: session.id,
|
||||
taskId: null,
|
||||
source: "system",
|
||||
type: "session.created",
|
||||
payload: { title: session.title },
|
||||
});
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
return this.getSession(session.id);
|
||||
}
|
||||
|
||||
@@ -71,18 +69,76 @@ export class BossEngine {
|
||||
|
||||
return {
|
||||
session,
|
||||
messages: state.messages.filter((message) => message.sessionId === sessionId),
|
||||
tasks: state.tasks.filter((task) => task.sessionId === sessionId),
|
||||
approvals: state.approvals.filter((approval) => approval.sessionId === sessionId),
|
||||
messages: state.messages
|
||||
.filter((message) => message.sessionId === sessionId)
|
||||
.sort((left, right) => left.createdAt.localeCompare(right.createdAt)),
|
||||
tasks: state.tasks
|
||||
.filter((task) => task.sessionId === sessionId)
|
||||
.sort((left, right) => left.createdAt.localeCompare(right.createdAt)),
|
||||
approvals: state.approvals
|
||||
.filter((approval) => approval.sessionId === sessionId)
|
||||
.sort((left, right) => left.createdAt.localeCompare(right.createdAt)),
|
||||
};
|
||||
}
|
||||
|
||||
listSessions(): Session[] {
|
||||
return this.getState().sessions;
|
||||
return this.getState().sessions.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt));
|
||||
}
|
||||
|
||||
getTask(taskId: string): Task {
|
||||
const task = this.getState().tasks.find((candidate) => candidate.id === taskId);
|
||||
if (!task) {
|
||||
throw new Error(`Task not found: ${taskId}`);
|
||||
}
|
||||
return task;
|
||||
}
|
||||
|
||||
archiveSession(sessionId: string): SessionDetails {
|
||||
this.commit((state, addEvent) => {
|
||||
const session = state.sessions.find((candidate) => candidate.id === sessionId);
|
||||
if (!session) {
|
||||
throw new Error(`Session not found: ${sessionId}`);
|
||||
}
|
||||
|
||||
const timestamp = now();
|
||||
session.status = "archived";
|
||||
session.updatedAt = timestamp;
|
||||
|
||||
for (const task of state.tasks.filter(
|
||||
(candidate) => candidate.sessionId === sessionId && isActiveTask(candidate),
|
||||
)) {
|
||||
if (!["completed", "failed", "cancelled"].includes(task.status)) {
|
||||
this.detachTaskFromWorker(state, task, timestamp);
|
||||
task.status = "paused";
|
||||
task.summary = "会话已归档,任务暂停。";
|
||||
task.updatedAt = timestamp;
|
||||
addEvent({
|
||||
sessionId,
|
||||
taskId: task.id,
|
||||
source: "system",
|
||||
type: "task.paused",
|
||||
payload: { reason: "session_archived" },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
addEvent({
|
||||
sessionId,
|
||||
taskId: null,
|
||||
source: "system",
|
||||
type: "session.archived",
|
||||
payload: { sessionId },
|
||||
});
|
||||
});
|
||||
|
||||
return this.getSession(sessionId);
|
||||
}
|
||||
|
||||
addMessage(sessionId: string, content: string, channel = "web"): SessionDetails {
|
||||
const session = this.getSession(sessionId).session;
|
||||
if (session.status === "archived") {
|
||||
throw new Error(`Session ${sessionId} is archived`);
|
||||
}
|
||||
const message: Message = {
|
||||
id: createId("msg"),
|
||||
sessionId,
|
||||
@@ -96,7 +152,7 @@ export class BossEngine {
|
||||
throw new Error("Message content is required.");
|
||||
}
|
||||
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
const mutableSession = state.sessions.find((candidate) => candidate.id === sessionId);
|
||||
if (!mutableSession) {
|
||||
throw new Error(`Session not found: ${sessionId}`);
|
||||
@@ -108,21 +164,18 @@ export class BossEngine {
|
||||
mutableSession.title = message.content.slice(0, 32);
|
||||
}
|
||||
state.messages.push(message);
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId,
|
||||
taskId: null,
|
||||
source: "user",
|
||||
type: "session.message.added",
|
||||
payload: {
|
||||
channel,
|
||||
content: message.content,
|
||||
},
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId,
|
||||
taskId: null,
|
||||
source: "user",
|
||||
type: "session.message.added",
|
||||
payload: {
|
||||
channel,
|
||||
content: message.content,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
this.applyPlan(session, message.content);
|
||||
return this.getSession(sessionId);
|
||||
}
|
||||
@@ -156,25 +209,22 @@ export class BossEngine {
|
||||
updatedAt: timestamp,
|
||||
};
|
||||
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
state.workers.push(worker);
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: null,
|
||||
taskId: null,
|
||||
source: "system",
|
||||
type: "worker.registered",
|
||||
payload: {
|
||||
workerId: worker.id,
|
||||
name: worker.name,
|
||||
os: worker.os,
|
||||
capabilities: worker.capabilities,
|
||||
},
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: null,
|
||||
taskId: null,
|
||||
source: "system",
|
||||
type: "worker.registered",
|
||||
payload: {
|
||||
workerId: worker.id,
|
||||
name: worker.name,
|
||||
os: worker.os,
|
||||
capabilities: worker.capabilities,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
this.syncAssignments();
|
||||
return worker;
|
||||
}
|
||||
@@ -201,9 +251,68 @@ export class BossEngine {
|
||||
return updated;
|
||||
}
|
||||
|
||||
getWorker(workerId: string): WorkerNode {
|
||||
const worker = this.getState().workers.find((candidate) => candidate.id === workerId);
|
||||
if (!worker) {
|
||||
throw new Error(`Worker not found: ${workerId}`);
|
||||
}
|
||||
return worker;
|
||||
}
|
||||
|
||||
markWorkerOffline(workerId: string): WorkerNode {
|
||||
let updated!: WorkerNode;
|
||||
|
||||
this.commit((state, addEvent) => {
|
||||
const worker = state.workers.find((candidate) => candidate.id === workerId);
|
||||
if (!worker) {
|
||||
throw new Error(`Worker not found: ${workerId}`);
|
||||
}
|
||||
|
||||
const timestamp = now();
|
||||
for (const task of state.tasks.filter(
|
||||
(candidate) =>
|
||||
candidate.assignedWorkerId === worker.id &&
|
||||
["assigned", "running"].includes(candidate.status),
|
||||
)) {
|
||||
this.requeueTaskState(state, task, timestamp, `${worker.name} 被手动下线`);
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "system",
|
||||
type: "task.requeued",
|
||||
payload: {
|
||||
workerId: worker.id,
|
||||
workerName: worker.name,
|
||||
reason: "manual_worker_offline",
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
worker.status = "offline";
|
||||
worker.currentTaskId = null;
|
||||
worker.updatedAt = timestamp;
|
||||
worker.lastSeenAt = timestamp;
|
||||
addEvent({
|
||||
sessionId: null,
|
||||
taskId: null,
|
||||
source: "system",
|
||||
type: "worker.offline",
|
||||
payload: {
|
||||
workerId: worker.id,
|
||||
workerName: worker.name,
|
||||
reason: "manual",
|
||||
},
|
||||
});
|
||||
updated = { ...worker };
|
||||
});
|
||||
|
||||
this.syncAssignments();
|
||||
return updated;
|
||||
}
|
||||
|
||||
heartbeat(workerId: string, load = 0): WorkerNode {
|
||||
let updated!: WorkerNode;
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
const worker = state.workers.find((candidate) => candidate.id === workerId);
|
||||
if (!worker) {
|
||||
throw new Error(`Worker not found: ${workerId}`);
|
||||
@@ -216,24 +325,21 @@ export class BossEngine {
|
||||
worker.status = "idle";
|
||||
}
|
||||
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: null,
|
||||
taskId: worker.currentTaskId,
|
||||
source: "worker",
|
||||
type: "worker.heartbeat",
|
||||
payload: {
|
||||
workerId: worker.id,
|
||||
status: worker.status,
|
||||
load: worker.load,
|
||||
},
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: null,
|
||||
taskId: worker.currentTaskId,
|
||||
source: "worker",
|
||||
type: "worker.heartbeat",
|
||||
payload: {
|
||||
workerId: worker.id,
|
||||
status: worker.status,
|
||||
load: worker.load,
|
||||
},
|
||||
});
|
||||
|
||||
updated = { ...worker };
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
this.syncAssignments();
|
||||
return updated;
|
||||
}
|
||||
@@ -241,7 +347,7 @@ export class BossEngine {
|
||||
claimNextTask(workerId: string): Task | null {
|
||||
let claimedTask: Task | null = null;
|
||||
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
const worker = state.workers.find((candidate) => candidate.id === workerId);
|
||||
if (!worker) {
|
||||
throw new Error(`Worker not found: ${workerId}`);
|
||||
@@ -264,24 +370,18 @@ export class BossEngine {
|
||||
worker.lastSeenAt = task.updatedAt;
|
||||
|
||||
claimedTask = { ...task };
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "worker",
|
||||
type: "task.started",
|
||||
payload: {
|
||||
workerId,
|
||||
title: task.title,
|
||||
},
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "worker",
|
||||
type: "task.started",
|
||||
payload: {
|
||||
workerId,
|
||||
title: task.title,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
if (claimedTask) {
|
||||
this.publishLatestEvent();
|
||||
}
|
||||
|
||||
return claimedTask;
|
||||
}
|
||||
|
||||
@@ -296,11 +396,16 @@ export class BossEngine {
|
||||
},
|
||||
): Task {
|
||||
let updated!: Task;
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
const task = state.tasks.find((candidate) => candidate.id === taskId);
|
||||
const worker = state.workers.find((candidate) => candidate.id === workerId);
|
||||
if (!task) {
|
||||
throw new Error(`Task not found: ${taskId}`);
|
||||
}
|
||||
if (!worker) {
|
||||
throw new Error(`Worker not found: ${workerId}`);
|
||||
}
|
||||
this.assertTaskOwnership(task, workerId, worker.currentTaskId);
|
||||
|
||||
task.progressPercent = Math.max(0, Math.min(100, input.progressPercent));
|
||||
task.summary = input.summary;
|
||||
@@ -311,32 +416,29 @@ export class BossEngine {
|
||||
task.status = "running";
|
||||
}
|
||||
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "worker",
|
||||
type: "task.progress",
|
||||
payload: {
|
||||
workerId,
|
||||
progressPercent: task.progressPercent,
|
||||
summary: task.summary,
|
||||
currentStep: task.currentStep,
|
||||
nextStep: task.nextStep,
|
||||
},
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "worker",
|
||||
type: "task.progress",
|
||||
payload: {
|
||||
workerId,
|
||||
progressPercent: task.progressPercent,
|
||||
summary: task.summary,
|
||||
currentStep: task.currentStep,
|
||||
nextStep: task.nextStep,
|
||||
},
|
||||
});
|
||||
updated = { ...task };
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
return updated;
|
||||
}
|
||||
|
||||
completeTask(taskId: string, workerId: string, summary: string): Task {
|
||||
let updated!: Task;
|
||||
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
const task = state.tasks.find((candidate) => candidate.id === taskId);
|
||||
const worker = state.workers.find((candidate) => candidate.id === workerId);
|
||||
if (!task) {
|
||||
@@ -345,6 +447,7 @@ export class BossEngine {
|
||||
if (!worker) {
|
||||
throw new Error(`Worker not found: ${workerId}`);
|
||||
}
|
||||
this.assertTaskOwnership(task, workerId, worker.currentTaskId);
|
||||
|
||||
task.status = "completed";
|
||||
task.progressPercent = 100;
|
||||
@@ -357,22 +460,19 @@ export class BossEngine {
|
||||
worker.updatedAt = task.updatedAt;
|
||||
worker.lastSeenAt = task.updatedAt;
|
||||
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "worker",
|
||||
type: "task.completed",
|
||||
payload: {
|
||||
workerId,
|
||||
summary,
|
||||
},
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "worker",
|
||||
type: "task.completed",
|
||||
payload: {
|
||||
workerId,
|
||||
summary,
|
||||
},
|
||||
});
|
||||
updated = { ...task };
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
this.syncAssignments();
|
||||
return updated;
|
||||
}
|
||||
@@ -380,7 +480,7 @@ export class BossEngine {
|
||||
failTask(taskId: string, workerId: string, errorMessage: string): Task {
|
||||
let updated!: Task;
|
||||
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
const task = state.tasks.find((candidate) => candidate.id === taskId);
|
||||
const worker = state.workers.find((candidate) => candidate.id === workerId);
|
||||
if (!task) {
|
||||
@@ -389,6 +489,7 @@ export class BossEngine {
|
||||
if (!worker) {
|
||||
throw new Error(`Worker not found: ${workerId}`);
|
||||
}
|
||||
this.assertTaskOwnership(task, workerId, worker.currentTaskId);
|
||||
|
||||
task.status = "failed";
|
||||
task.summary = errorMessage;
|
||||
@@ -400,22 +501,19 @@ export class BossEngine {
|
||||
worker.updatedAt = task.updatedAt;
|
||||
worker.lastSeenAt = task.updatedAt;
|
||||
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "worker",
|
||||
type: "task.failed",
|
||||
payload: {
|
||||
workerId,
|
||||
errorMessage,
|
||||
},
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "worker",
|
||||
type: "task.failed",
|
||||
payload: {
|
||||
workerId,
|
||||
errorMessage,
|
||||
},
|
||||
});
|
||||
updated = { ...task };
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
this.syncAssignments();
|
||||
return updated;
|
||||
}
|
||||
@@ -432,10 +530,70 @@ export class BossEngine {
|
||||
});
|
||||
}
|
||||
|
||||
resumeTask(taskId: string): Task {
|
||||
let updated!: Task;
|
||||
|
||||
this.commit((state, addEvent) => {
|
||||
const task = state.tasks.find((candidate) => candidate.id === taskId);
|
||||
if (!task) {
|
||||
throw new Error(`Task not found: ${taskId}`);
|
||||
}
|
||||
|
||||
if (task.status !== "paused") {
|
||||
updated = { ...task };
|
||||
return;
|
||||
}
|
||||
|
||||
task.assignedWorkerId = null;
|
||||
task.status = task.approvalStatus === "pending" ? "waiting_approval" : "queued";
|
||||
task.summary = "任务已恢复,等待重新调度。";
|
||||
task.updatedAt = now();
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "system",
|
||||
type: "task.resumed",
|
||||
payload: {
|
||||
summary: task.summary,
|
||||
},
|
||||
});
|
||||
updated = { ...task };
|
||||
});
|
||||
|
||||
this.syncAssignments();
|
||||
return updated;
|
||||
}
|
||||
|
||||
requeueTask(taskId: string): Task {
|
||||
let updated!: Task;
|
||||
|
||||
this.commit((state, addEvent) => {
|
||||
const task = state.tasks.find((candidate) => candidate.id === taskId);
|
||||
if (!task) {
|
||||
throw new Error(`Task not found: ${taskId}`);
|
||||
}
|
||||
|
||||
this.requeueTaskState(state, task, now(), "手动重排");
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "system",
|
||||
type: "task.requeued",
|
||||
payload: {
|
||||
summary: task.summary,
|
||||
},
|
||||
});
|
||||
updated = { ...task };
|
||||
});
|
||||
|
||||
this.syncAssignments();
|
||||
return updated;
|
||||
}
|
||||
|
||||
respondApproval(approvalId: string, approved: boolean, responder: string): ApprovalRequest {
|
||||
let updatedApproval!: ApprovalRequest;
|
||||
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
const approval = state.approvals.find((candidate) => candidate.id === approvalId);
|
||||
if (!approval) {
|
||||
throw new Error(`Approval not found: ${approvalId}`);
|
||||
@@ -455,26 +613,34 @@ export class BossEngine {
|
||||
task.status = approved ? "queued" : "cancelled";
|
||||
task.summary = approved ? "审批已通过,重新进入队列。" : "审批被拒绝,任务已取消。";
|
||||
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: approval.sessionId,
|
||||
taskId: approval.taskId,
|
||||
source: "system",
|
||||
type: approved ? "approval.approved" : "approval.rejected",
|
||||
payload: {
|
||||
approvalId,
|
||||
responder,
|
||||
},
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: approval.sessionId,
|
||||
taskId: approval.taskId,
|
||||
source: "system",
|
||||
type: approved ? "approval.approved" : "approval.rejected",
|
||||
payload: {
|
||||
approvalId,
|
||||
responder,
|
||||
},
|
||||
});
|
||||
updatedApproval = { ...approval };
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
this.syncAssignments();
|
||||
return updatedApproval;
|
||||
}
|
||||
|
||||
listEvents(limit = 100): BossEvent[] {
|
||||
const events = this.getState().events;
|
||||
return events.slice(Math.max(0, events.length - limit));
|
||||
}
|
||||
|
||||
reconcileNow(): AppState {
|
||||
this.reconcileState();
|
||||
this.syncAssignments();
|
||||
return this.getState();
|
||||
}
|
||||
|
||||
private applyPlan(session: Session, content: string): void {
|
||||
const sessionDetails = this.getSession(session.id);
|
||||
const result = createPlan(sessionDetails.session, content, sessionDetails.tasks.filter(isActiveTask));
|
||||
@@ -482,7 +648,7 @@ export class BossEngine {
|
||||
const plannerMessage = buildPlannerMessage(result.summary);
|
||||
const timestamp = now();
|
||||
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
const mutableSession = state.sessions.find((candidate) => candidate.id === session.id);
|
||||
if (!mutableSession) {
|
||||
throw new Error(`Session not found: ${session.id}`);
|
||||
@@ -497,8 +663,19 @@ export class BossEngine {
|
||||
(candidate) => candidate.sessionId === session.id && isActiveTask(candidate),
|
||||
)) {
|
||||
if (["running", "assigned", "queued", "planning", "blocked"].includes(task.status)) {
|
||||
this.detachTaskFromWorker(state, task, timestamp);
|
||||
task.status = "paused";
|
||||
task.summary = "检测到新需求,旧任务已暂停。";
|
||||
task.updatedAt = timestamp;
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "manager",
|
||||
type: "task.paused",
|
||||
payload: {
|
||||
reason: "replan",
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -532,21 +709,18 @@ export class BossEngine {
|
||||
};
|
||||
state.messages.push(managerMessage);
|
||||
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: session.id,
|
||||
taskId: null,
|
||||
source: "manager",
|
||||
type: "plan.created",
|
||||
payload: {
|
||||
summary: result.summary,
|
||||
taskIds: tasks.map((task) => task.id),
|
||||
},
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: session.id,
|
||||
taskId: null,
|
||||
source: "manager",
|
||||
type: "plan.created",
|
||||
payload: {
|
||||
summary: result.summary,
|
||||
taskIds: tasks.map((task) => task.id),
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
this.syncAssignments();
|
||||
}
|
||||
|
||||
@@ -558,7 +732,7 @@ export class BossEngine {
|
||||
payload: Record<string, unknown>,
|
||||
): Task {
|
||||
let updated!: Task;
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
const task = state.tasks.find((candidate) => candidate.id === taskId);
|
||||
if (!task) {
|
||||
throw new Error(`Task not found: ${taskId}`);
|
||||
@@ -567,41 +741,29 @@ export class BossEngine {
|
||||
task.status = status;
|
||||
task.updatedAt = now();
|
||||
task.summary = typeof payload.summary === "string" ? payload.summary : task.summary;
|
||||
this.detachTaskFromWorker(state, task, task.updatedAt);
|
||||
|
||||
if (task.assignedWorkerId) {
|
||||
const worker = state.workers.find((candidate) => candidate.id === task.assignedWorkerId);
|
||||
if (worker && worker.currentTaskId === task.id) {
|
||||
worker.currentTaskId = null;
|
||||
worker.status = "idle";
|
||||
worker.updatedAt = task.updatedAt;
|
||||
worker.lastSeenAt = task.updatedAt;
|
||||
}
|
||||
}
|
||||
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source,
|
||||
type: eventType,
|
||||
payload,
|
||||
}),
|
||||
);
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source,
|
||||
type: eventType,
|
||||
payload,
|
||||
});
|
||||
updated = { ...task };
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
this.syncAssignments();
|
||||
return updated;
|
||||
}
|
||||
|
||||
private syncAssignments(): void {
|
||||
const candidates = chooseAssignmentCandidates(this.getState());
|
||||
const candidates = chooseAssignmentCandidates(this.store.snapshot);
|
||||
if (candidates.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.store.mutate((state) => {
|
||||
this.commit((state, addEvent) => {
|
||||
for (const candidate of candidates) {
|
||||
const task = state.tasks.find((item) => item.id === candidate.taskId);
|
||||
const worker = state.workers.find((item) => item.id === candidate.workerId);
|
||||
@@ -619,29 +781,107 @@ export class BossEngine {
|
||||
worker.updatedAt = timestamp;
|
||||
worker.lastSeenAt = timestamp;
|
||||
|
||||
state.events.push(
|
||||
this.makeEvent({
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "system",
|
||||
type: "task.assigned",
|
||||
payload: {
|
||||
workerId: worker.id,
|
||||
workerName: worker.name,
|
||||
},
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private reconcileState(): void {
|
||||
const staleAfterMs = 20_000;
|
||||
const currentTime = Date.now();
|
||||
this.commit((state, addEvent) => {
|
||||
for (const worker of state.workers) {
|
||||
const age = currentTime - new Date(worker.lastSeenAt).getTime();
|
||||
if (age <= staleAfterMs || worker.status === "offline") {
|
||||
continue;
|
||||
}
|
||||
|
||||
const timestamp = now();
|
||||
worker.status = "offline";
|
||||
worker.updatedAt = timestamp;
|
||||
|
||||
for (const task of state.tasks.filter(
|
||||
(candidate) =>
|
||||
candidate.assignedWorkerId === worker.id &&
|
||||
["assigned", "running"].includes(candidate.status),
|
||||
)) {
|
||||
this.requeueTaskState(state, task, timestamp, `worker ${worker.name} 离线`);
|
||||
addEvent({
|
||||
sessionId: task.sessionId,
|
||||
taskId: task.id,
|
||||
source: "system",
|
||||
type: "task.assigned",
|
||||
type: "task.requeued",
|
||||
payload: {
|
||||
workerId: worker.id,
|
||||
workerName: worker.name,
|
||||
reason: "heartbeat_timeout",
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
worker.currentTaskId = null;
|
||||
addEvent({
|
||||
sessionId: null,
|
||||
taskId: null,
|
||||
source: "system",
|
||||
type: "worker.offline",
|
||||
payload: {
|
||||
workerId: worker.id,
|
||||
workerName: worker.name,
|
||||
reason: "heartbeat_timeout",
|
||||
},
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
this.publishLatestEvent();
|
||||
}
|
||||
|
||||
private publishLatestEvent(): void {
|
||||
const state = this.getState();
|
||||
const latestEvent = state.events[state.events.length - 1];
|
||||
if (latestEvent) {
|
||||
this.events.publish(latestEvent);
|
||||
private detachTaskFromWorker(state: AppState, task: Task, timestamp: string): void {
|
||||
if (!task.assignedWorkerId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const worker = state.workers.find((candidate) => candidate.id === task.assignedWorkerId);
|
||||
if (worker && worker.currentTaskId === task.id) {
|
||||
worker.currentTaskId = null;
|
||||
if (worker.status !== "offline") {
|
||||
worker.status = "idle";
|
||||
}
|
||||
worker.updatedAt = timestamp;
|
||||
worker.lastSeenAt = timestamp;
|
||||
}
|
||||
|
||||
task.assignedWorkerId = null;
|
||||
}
|
||||
|
||||
private requeueTaskState(state: AppState, task: Task, timestamp: string, reason: string): void {
|
||||
this.detachTaskFromWorker(state, task, timestamp);
|
||||
task.status = task.approvalStatus === "pending" ? "waiting_approval" : "queued";
|
||||
task.summary = `${reason},任务已重新排队。`;
|
||||
task.currentStep = "requeued";
|
||||
task.nextStep = "等待新 worker 认领";
|
||||
task.updatedAt = timestamp;
|
||||
}
|
||||
|
||||
private assertTaskOwnership(task: Task, workerId: string, workerCurrentTaskId: string | null): void {
|
||||
if (task.assignedWorkerId !== workerId) {
|
||||
throw new Error(`Task ${task.id} is not assigned to worker ${workerId}`);
|
||||
}
|
||||
|
||||
if (workerCurrentTaskId !== task.id) {
|
||||
throw new Error(`Worker ${workerId} is not currently executing task ${task.id}`);
|
||||
}
|
||||
|
||||
if (!["assigned", "running"].includes(task.status)) {
|
||||
throw new Error(`Task ${task.id} does not accept worker updates in status ${task.status}`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -652,5 +892,26 @@ export class BossEngine {
|
||||
...input,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private commit<T>(
|
||||
mutator: (
|
||||
state: AppState,
|
||||
addEvent: (input: Omit<BossEvent, "id" | "timestamp">) => void,
|
||||
) => T,
|
||||
): T {
|
||||
const published: BossEvent[] = [];
|
||||
const result = this.store.mutate((state) =>
|
||||
mutator(state, (input) => {
|
||||
const event = this.makeEvent(input);
|
||||
state.events.push(event);
|
||||
published.push(event);
|
||||
}),
|
||||
);
|
||||
|
||||
for (const event of published) {
|
||||
this.events.publish(event);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user