feat: add dispatch plan confirmation flow

This commit is contained in:
kris
2026-03-30 10:41:52 +08:00
parent 11724e9834
commit 3b2bf59b65
7 changed files with 755 additions and 112 deletions

View File

@@ -15,6 +15,7 @@ export type MessageSender = "master" | "device" | "user" | "ops" | "audit";
// single-message forwarding, bundle forwarding, and the legacy notice shape.
export type MessageKind =
| "text"
| "system_notice"
| "voice_intent"
| "image_intent"
| "video_intent"
@@ -3814,47 +3815,58 @@ export async function createDispatchPlan(input: {
targets: DispatchPlanTarget[];
}) {
return mutateState((state) => {
const groupProjectId = input.groupProjectId.trim();
const requestMessageId = input.requestMessageId.trim();
const requestedBy = input.requestedBy.trim();
const summary = input.summary?.trim() ?? "";
if (!groupProjectId) throw new Error("DISPATCH_PLAN_GROUP_PROJECT_REQUIRED");
if (!requestMessageId) throw new Error("DISPATCH_PLAN_REQUEST_MESSAGE_REQUIRED");
if (!requestedBy) throw new Error("DISPATCH_PLAN_REQUESTED_BY_REQUIRED");
const validatedTargets = normalizeDispatchPlanTargetsForCreate(state, input.targets);
const existing = state.dispatchPlans.find(
(plan) =>
plan.groupProjectId === groupProjectId &&
plan.requestMessageId === requestMessageId,
);
if (existing) {
const payloadMatches =
existing.requestedBy === requestedBy &&
existing.summary === summary &&
sameDispatchPlanTargets(existing.targets, validatedTargets);
if (!payloadMatches) {
throw new Error("DISPATCH_PLAN_RETRY_MISMATCH");
}
return existing;
}
const plan: DispatchPlan = {
planId: randomToken("dispatch-plan"),
groupProjectId,
requestMessageId,
requestedBy,
status: "pending_user_confirmation",
targets: validatedTargets,
summary,
createdAt: nowIso(),
};
state.dispatchPlans.unshift(plan);
return plan;
return upsertDispatchPlanInState(state, input);
});
}
function upsertDispatchPlanInState(
state: BossState,
input: {
groupProjectId: string;
requestMessageId: string;
requestedBy: string;
summary?: string;
targets: DispatchPlanTarget[];
},
) {
const groupProjectId = input.groupProjectId.trim();
const requestMessageId = input.requestMessageId.trim();
const requestedBy = input.requestedBy.trim();
const summary = input.summary?.trim() ?? "";
if (!groupProjectId) throw new Error("DISPATCH_PLAN_GROUP_PROJECT_REQUIRED");
if (!requestMessageId) throw new Error("DISPATCH_PLAN_REQUEST_MESSAGE_REQUIRED");
if (!requestedBy) throw new Error("DISPATCH_PLAN_REQUESTED_BY_REQUIRED");
const validatedTargets = normalizeDispatchPlanTargetsForCreate(state, input.targets);
const existing = state.dispatchPlans.find(
(plan) => plan.groupProjectId === groupProjectId && plan.requestMessageId === requestMessageId,
);
if (existing) {
const payloadMatches =
existing.requestedBy === requestedBy &&
existing.summary === summary &&
sameDispatchPlanTargets(existing.targets, validatedTargets);
if (!payloadMatches) {
throw new Error("DISPATCH_PLAN_RETRY_MISMATCH");
}
return existing;
}
const plan: DispatchPlan = {
planId: randomToken("dispatch-plan"),
groupProjectId,
requestMessageId,
requestedBy,
status: "pending_user_confirmation",
targets: validatedTargets,
summary,
createdAt: nowIso(),
};
state.dispatchPlans.unshift(plan);
return plan;
}
export async function listDispatchPlansByProject(groupProjectId: string) {
const state = await readState();
const normalizedGroupProjectId = groupProjectId.trim();
@@ -3863,42 +3875,53 @@ export async function listDispatchPlansByProject(groupProjectId: string) {
.sort((a, b) => b.createdAt.localeCompare(a.createdAt));
}
function applyDispatchPlanConfirmationInState(
state: BossState,
input: {
planId: string;
confirmedBy: string;
approvedTargetProjectIds: string[];
},
) {
const plan = state.dispatchPlans.find((item) => item.planId === input.planId);
if (!plan) throw new Error("DISPATCH_PLAN_NOT_FOUND");
if (plan.status === "rejected") throw new Error("DISPATCH_PLAN_REJECTED");
const confirmedBy = input.confirmedBy.trim();
if (!confirmedBy) throw new Error("DISPATCH_PLAN_CONFIRMED_BY_REQUIRED");
requireDispatchActorSession(state, confirmedBy);
const approvedTargetProjectIds = normalizeStringSet(input.approvedTargetProjectIds);
if (approvedTargetProjectIds.length === 0) {
throw new Error("DISPATCH_PLAN_APPROVED_TARGETS_REQUIRED");
}
const canonicalTargetProjectIds = normalizeStringSet(plan.targets.map((target) => target.projectId));
if (approvedTargetProjectIds.some((projectId) => !canonicalTargetProjectIds.includes(projectId))) {
throw new Error("DISPATCH_PLAN_APPROVED_TARGETS_INVALID");
}
if (plan.confirmedBy && plan.confirmedBy !== confirmedBy) {
throw new Error("DISPATCH_PLAN_CONFIRMED_BY_MISMATCH");
}
if (plan.confirmedTargetProjectIds?.length && !sameStringSet(plan.confirmedTargetProjectIds, approvedTargetProjectIds)) {
throw new Error("DISPATCH_PLAN_APPROVED_TARGETS_MISMATCH");
}
if (plan.status !== "dispatched") {
plan.status = "approved";
}
if (!plan.confirmedAt) {
plan.confirmedAt = nowIso();
}
plan.confirmedBy = confirmedBy;
plan.confirmedTargetProjectIds = approvedTargetProjectIds;
return plan;
}
export async function confirmDispatchPlan(input: {
planId: string;
confirmedBy: string;
approvedTargetProjectIds: string[];
}) {
return mutateState((state) => {
const plan = state.dispatchPlans.find((item) => item.planId === input.planId);
if (!plan) throw new Error("DISPATCH_PLAN_NOT_FOUND");
if (plan.status === "rejected") throw new Error("DISPATCH_PLAN_REJECTED");
const confirmedBy = input.confirmedBy.trim();
if (!confirmedBy) throw new Error("DISPATCH_PLAN_CONFIRMED_BY_REQUIRED");
requireDispatchActorSession(state, confirmedBy);
const approvedTargetProjectIds = normalizeStringSet(input.approvedTargetProjectIds);
if (approvedTargetProjectIds.length === 0) {
throw new Error("DISPATCH_PLAN_APPROVED_TARGETS_REQUIRED");
}
const canonicalTargetProjectIds = normalizeStringSet(plan.targets.map((target) => target.projectId));
if (approvedTargetProjectIds.some((projectId) => !canonicalTargetProjectIds.includes(projectId))) {
throw new Error("DISPATCH_PLAN_APPROVED_TARGETS_INVALID");
}
if (plan.confirmedBy && plan.confirmedBy !== confirmedBy) {
throw new Error("DISPATCH_PLAN_CONFIRMED_BY_MISMATCH");
}
if (plan.confirmedTargetProjectIds?.length && !sameStringSet(plan.confirmedTargetProjectIds, approvedTargetProjectIds)) {
throw new Error("DISPATCH_PLAN_APPROVED_TARGETS_MISMATCH");
}
if (plan.status !== "dispatched") {
plan.status = "approved";
}
if (!plan.confirmedAt) {
plan.confirmedAt = nowIso();
}
plan.confirmedBy = confirmedBy;
plan.confirmedTargetProjectIds = approvedTargetProjectIds;
return plan;
return applyDispatchPlanConfirmationInState(state, input);
});
}
@@ -3959,6 +3982,91 @@ export async function createDispatchExecutionsFromPlan(input: {
});
}
export async function confirmDispatchPlanAndCreateExecutions(input: {
groupProjectId: string;
planId: string;
confirmedBy: string;
approvedTargetProjectIds: string[];
}) {
const result = await mutateState((state) => {
const groupProjectId = input.groupProjectId.trim();
if (!groupProjectId) throw new Error("PROJECT_NOT_FOUND");
const groupProject = state.projects.find((item) => item.id === groupProjectId);
if (!groupProject) throw new Error("PROJECT_NOT_FOUND");
if (!groupProject.isGroup) throw new Error("PROJECT_NOT_GROUP_CHAT");
const plan = applyDispatchPlanConfirmationInState(state, {
planId: input.planId,
confirmedBy: input.confirmedBy,
approvedTargetProjectIds: input.approvedTargetProjectIds,
});
if (plan.groupProjectId !== groupProjectId) {
throw new Error("DISPATCH_PLAN_PROJECT_MISMATCH");
}
const canonicalTargetProjectIds = normalizeStringSet(plan.confirmedTargetProjectIds ?? []);
const existingExecutions = state.dispatchExecutions.filter((item) => item.planId === plan.planId);
let executions: DispatchExecution[];
let createdNotice: Message | null = null;
if (existingExecutions.length > 0) {
const existingTargetIds = normalizeStringSet(existingExecutions.map((execution) => execution.targetProjectId));
if (!sameStringSet(existingTargetIds, canonicalTargetProjectIds)) {
throw new Error("DISPATCH_EXECUTION_SET_MISMATCH");
}
if (plan.status !== "dispatched") {
plan.status = "dispatched";
}
executions = existingExecutions;
} else {
const targets = plan.targets.filter((target) =>
canonicalTargetProjectIds.includes(target.projectId),
);
if (targets.length === 0) {
throw new Error("DISPATCH_EXECUTION_TARGETS_REQUIRED");
}
const createdAt = nowIso();
executions = targets.map((target) => {
const execution: DispatchExecution = {
executionId: randomToken("dispatch-exec"),
planId: plan.planId,
groupProjectId: plan.groupProjectId,
targetProjectId: target.projectId,
targetThreadId: target.threadId,
deviceId: target.deviceId,
status: "queued",
createdAt,
};
state.dispatchExecutions.unshift(execution);
return execution;
});
plan.status = "dispatched";
const targetSummary = executions
.map((execution) => {
const project = state.projects.find((item) => item.id === execution.targetProjectId);
return `${project?.threadMeta.threadDisplayName ?? project?.name ?? execution.targetProjectId}`;
})
.join("、");
createdNotice = pushProjectLedgerMessage(state, groupProjectId, {
sender: "master",
senderLabel: "主 Agent",
body: `已确认下发到 ${executions.length} 个线程:${targetSummary}`,
kind: "system_notice",
});
}
return {
plan: { ...plan },
executions: executions.map((execution) => ({ ...execution })),
notice: createdNotice ? { ...createdNotice } : null,
};
});
publishBossEvent("project.messages.updated", { projectId: input.groupProjectId });
publishBossEvent("conversation.updated", { projectId: input.groupProjectId });
return result;
}
export async function completeDispatchExecution(payload: {
executionId: string;
completedByDeviceId: string;
@@ -4048,6 +4156,10 @@ export async function completeMasterAgentTask(payload: {
replyBody?: string;
errorMessage?: string;
requestId?: string;
dispatchPlan?: {
summary?: string;
targets: DispatchPlanTarget[];
};
}) {
const result = await mutateState((state) => {
const task = state.masterAgentTasks.find((item) => item.taskId === payload.taskId);
@@ -4084,6 +4196,7 @@ export async function completeMasterAgentTask(payload: {
}
let attachmentProjectId: string | undefined;
let createdDispatchPlan: DispatchPlan | undefined;
if (task.taskType === "attachment_analysis" && task.attachmentId) {
const project = state.projects.find((item) => item.id === task.projectId);
const match = project ? findProjectAttachment(project, task.attachmentId) : null;
@@ -4124,7 +4237,20 @@ export async function completeMasterAgentTask(payload: {
}
}
if (!attachmentProjectId && payload.status === "completed" && task.replyBody) {
if (task.taskType === "group_dispatch_plan") {
if (payload.status === "completed") {
if (!payload.dispatchPlan) {
throw new Error("MASTER_AGENT_GROUP_DISPATCH_PLAN_REQUIRED");
}
createdDispatchPlan = upsertDispatchPlanInState(state, {
groupProjectId: task.projectId,
requestMessageId: task.requestMessageId,
requestedBy: task.requestedByAccount,
summary: payload.dispatchPlan.summary,
targets: payload.dispatchPlan.targets,
});
}
} else if (!attachmentProjectId && payload.status === "completed" && task.replyBody) {
pushProjectLedgerMessage(state, task.projectId, {
sender: "master",
senderLabel: task.accountLabel ? `主 Agent · ${task.accountLabel}` : "主 Agent",
@@ -4140,7 +4266,10 @@ export async function completeMasterAgentTask(payload: {
});
}
return { ...task };
return {
...task,
dispatchPlan: createdDispatchPlan ? { ...createdDispatchPlan } : undefined,
};
});
publishBossEvent("master_agent.task.updated", {

View File

@@ -3,7 +3,7 @@ import {
AUTH_SESSION_TTL_MS,
aiProviderLabel,
appendProjectMessage,
createDispatchPlan,
completeMasterAgentTask,
getProjectAttachment,
getAttachmentStorageConfig,
getRuntimeAiAccountById,
@@ -14,6 +14,7 @@ import {
updateAttachmentAnalysisResult,
updateAiAccountHealth,
} from "@/lib/boss-data";
import type { DispatchPlanTarget, GroupConversationMember, Project } from "@/lib/boss-data";
import { canInlineAttachmentText, extractAttachmentTextExcerpt } from "@/lib/boss-attachments";
import { readAliyunOssObjectBuffer } from "@/lib/boss-storage-aliyun-oss";
import { readServerFileAttachmentBuffer } from "@/lib/boss-storage-server-file";
@@ -225,12 +226,159 @@ function summarizeDispatchRequest(requestText: string) {
return `${compact.slice(0, 33)}...`;
}
function collectGroupDispatchTargets(
project: Project,
requestText: string,
): DispatchPlanTarget[] {
const members: Array<
Pick<
GroupConversationMember,
"deviceId" | "projectId" | "threadId" | "threadDisplayName" | "folderName"
>
> =
project.groupMembers.length > 0
? project.groupMembers
: project.deviceIds.map((deviceId) => ({
projectId: project.id,
deviceId,
threadId: project.threadMeta.threadId,
threadDisplayName: project.threadMeta.threadDisplayName,
folderName: project.threadMeta.folderName,
}));
return members
.map((member) => ({
deviceId: member.deviceId,
projectId: member.projectId,
threadId: member.threadId,
threadDisplayName: member.threadDisplayName,
folderName: member.folderName,
reason: `群聊消息“${summarizeDispatchRequest(requestText)}”需要该线程补充状态或执行建议。`,
}))
.filter((target, index, array) => {
const signature = `${target.projectId}::${target.deviceId}::${target.threadId}`;
return (
array.findIndex((item) => `${item.projectId}::${item.deviceId}::${item.threadId}` === signature) ===
index
);
});
}
function summarizeGroupDispatchPlan(requestText: string, targets: DispatchPlanTarget[]) {
const targetLabels = targets.map((target) => target.threadDisplayName).filter(Boolean);
return `主 Agent 建议先按线程分发这条群聊消息:${summarizeDispatchRequest(requestText)}${targetLabels.length > 0 ? `。建议目标:${targetLabels.join("、")}` : ""}`;
}
function buildGroupDispatchPlanPrompt(project: Project, requestText: string) {
const memberDigest = (project.groupMembers.length > 0
? project.groupMembers
: project.deviceIds.map((deviceId) => ({
projectId: project.id,
deviceId,
threadId: project.threadMeta.threadId,
threadDisplayName: project.threadMeta.threadDisplayName,
folderName: project.threadMeta.folderName,
}))
)
.map(
(member) =>
`${member.projectId} / ${member.threadDisplayName} / ${member.folderName} / device=${member.deviceId}`,
)
.join("\n");
return [
"你正在处理 Boss 控制台的群聊分发建议任务。",
"目标不是直接回复用户,而是为这条群聊消息推荐后续需要分发到哪些线程。",
"当前服务端会优先使用已有群成员和线程映射做 recommendation workflow。",
`groupProjectId: ${project.id}`,
`groupProjectName: ${project.name}`,
`requestText: ${requestText}`,
"groupMembers:",
memberDigest || "无",
].join("\n");
}
type GroupDispatchRecommendationResult =
| {
ok: true;
taskId: string;
status: "completed";
dispatchPlan: NonNullable<
Awaited<ReturnType<typeof completeMasterAgentTask>>
>["dispatchPlan"] | null;
}
| {
ok: false;
taskId: string;
status: "failed";
dispatchPlan: null;
error: string;
};
async function resolveGroupDispatchPlanTask(taskId: string): Promise<GroupDispatchRecommendationResult> {
const task = await getMasterAgentTask(taskId);
if (!task) {
throw new Error("MASTER_AGENT_TASK_NOT_FOUND");
}
if (task.taskType !== "group_dispatch_plan") {
throw new Error("MASTER_AGENT_TASK_TYPE_INVALID");
}
try {
const state = await readState();
const project = state.projects.find((item) => item.id === task.projectId);
if (!project) {
throw new Error("PROJECT_NOT_FOUND");
}
if (!project.isGroup) {
throw new Error("PROJECT_NOT_GROUP_CHAT");
}
const targets = collectGroupDispatchTargets(project, task.requestText);
if (targets.length === 0) {
throw new Error("GROUP_DISPATCH_TARGETS_REQUIRED");
}
const completedTask = await completeMasterAgentTask({
taskId: task.taskId,
deviceId: task.deviceId,
status: "completed",
dispatchPlan: {
summary: summarizeGroupDispatchPlan(task.requestText, targets),
targets,
},
});
return {
ok: true as const,
taskId: task.taskId,
status: "completed" as const,
dispatchPlan: completedTask.dispatchPlan ?? null,
};
} catch (error) {
const message = error instanceof Error ? error.message : "GROUP_DISPATCH_PLAN_FAILED";
await completeMasterAgentTask({
taskId: task.taskId,
deviceId: task.deviceId,
status: "failed",
errorMessage: message,
});
return {
ok: false as const,
taskId: task.taskId,
status: "failed" as const,
dispatchPlan: null,
error: message,
};
}
}
export async function queueGroupDispatchPlan(params: {
groupProjectId: string;
requestMessageId: string;
requestText: string;
requestedBy: string;
}) {
}): Promise<GroupDispatchRecommendationResult> {
const state = await readState();
const project = state.projects.find((item) => item.id === params.groupProjectId);
if (!project) {
@@ -240,42 +388,18 @@ export async function queueGroupDispatchPlan(params: {
throw new Error("PROJECT_NOT_GROUP_CHAT");
}
const memberTargets = (project.groupMembers.length > 0
? project.groupMembers
: project.deviceIds.map((deviceId) => ({
projectId: project.id,
deviceId,
threadId: project.threadMeta.threadId,
threadDisplayName: project.threadMeta.threadDisplayName,
folderName: project.threadMeta.folderName,
})))
.map((member) => ({
deviceId: member.deviceId,
projectId: member.projectId,
threadId: member.threadId,
threadDisplayName: member.threadDisplayName,
folderName: member.folderName,
reason: `群聊消息“${summarizeDispatchRequest(params.requestText)}”需要该线程补充状态或执行建议。`,
}))
.filter((target, index, array) => {
const signature = `${target.projectId}::${target.deviceId}::${target.threadId}`;
return array.findIndex((item) => `${item.projectId}::${item.deviceId}::${item.threadId}` === signature) === index;
});
if (memberTargets.length === 0) {
throw new Error("GROUP_DISPATCH_TARGETS_REQUIRED");
}
const targetLabels = memberTargets.map((target) => target.threadDisplayName).filter(Boolean);
const summary = `主 Agent 建议先按线程分发这条群聊消息:${summarizeDispatchRequest(params.requestText)}${targetLabels.length > 0 ? `。建议目标:${targetLabels.join("、")}` : ""}`;
return createDispatchPlan({
groupProjectId: project.id,
const task = await queueMasterAgentTask({
projectId: project.id,
taskType: "group_dispatch_plan",
requestMessageId: params.requestMessageId,
requestText: params.requestText,
executionPrompt: buildGroupDispatchPlanPrompt(project, params.requestText),
requestedBy: params.requestedBy,
summary,
targets: memberTargets,
requestedByAccount: params.requestedBy,
deviceId: state.user.boundDeviceId || "mac-studio",
});
return resolveGroupDispatchPlanTask(task.taskId);
}
async function waitForMasterAgentTaskCompletion(taskId: string, timeoutMs = 55_000) {