feat: add master agent task recovery endpoint

This commit is contained in:
AI Bot
2026-06-06 19:05:42 +08:00
parent 755e30612c
commit 643da5b738
6 changed files with 945 additions and 168 deletions

View File

@@ -234,6 +234,94 @@ function riskAggregateValue(risks: Array<{ kind: string; title: string }>, match
return risks.filter(matcher).length;
}
function minutesSince(value?: string): number | null {
if (!value) return null;
const timestamp = Date.parse(value);
if (!Number.isFinite(timestamp)) return null;
return Math.max(0, Math.floor((Date.now() - timestamp) / 60_000));
}
function buildDataSafetySummary(backupStatus: BossStateBackupStatus) {
const ageMinutes = minutesSince(backupStatus.lastBackupAt);
const healthLabel =
backupStatus.status === "error"
? "备份异常"
: backupStatus.status === "empty" || !backupStatus.lastBackupAt
? "暂无备份"
: ageMinutes !== null && ageMinutes > 24 * 60
? "备份过期"
: "备份正常";
const nextAction =
healthLabel === "备份异常"
? "检查备份目录与状态文件写入权限"
: healthLabel === "暂无备份"
? "立即创建首个状态快照"
: healthLabel === "备份过期"
? "补创建状态快照并检查自动备份任务"
: "保持自动快照并定期演练恢复";
return {
mode: backupStatus.mode,
status: backupStatus.status,
restorePointCount: backupStatus.restorePointCount,
lastBackupAt: backupStatus.lastBackupAt ?? "",
ageMinutes,
healthLabel,
rpoLabel: "文件 MVP以最近成功快照为准",
rtoLabel: "文件 MVP人工恢复目标 30-60 分钟",
nextAction,
};
}
function canRecoverActiveTask(task: BossState["masterAgentTasks"][number]) {
if (task.recoverable) return true;
if (task.phase === "recoverable_failed") return true;
if (task.phase === "turn_started" || task.phase === "awaiting_reply" || task.phase === "completing") {
return false;
}
const maxAttempts = task.maxAttempts ?? 1;
return (task.attemptCount ?? 0) < maxAttempts;
}
function buildTaskRiskSummary(state: BossState) {
const activeStatuses = new Set(["queued", "running", "needs_user_action"]);
const activeTasks = state.masterAgentTasks.filter((task) => activeStatuses.has(task.status));
const rows = activeTasks
.map((task) => {
const activeAt = task.lastProgressAt ?? task.claimedAt ?? task.requestedAt;
const ageMinutes = minutesSince(activeAt);
const stale = ageMinutes !== null && ageMinutes > 10;
const recoverable = canRecoverActiveTask(task);
return {
taskId: task.taskId,
projectId: task.projectId,
deviceId: task.deviceId,
status: task.status,
phase: task.phase ?? task.status,
stale,
recoverable,
lastProgressAt: task.lastProgressAt ?? "",
summary: task.requestText || task.errorMessage || task.taskType,
};
})
.sort((left, right) => Number(right.stale) - Number(left.stale) || right.taskId.localeCompare(left.taskId))
.slice(0, 20);
return {
counts: {
active: activeTasks.length,
stale: activeTasks.filter((task) => {
const activeAt = task.lastProgressAt ?? task.claimedAt ?? task.requestedAt;
const ageMinutes = minutesSince(activeAt);
return ageMinutes !== null && ageMinutes > 10;
}).length,
recoverable: activeTasks.filter(canRecoverActiveTask).length,
needsUserAction: activeTasks.filter((task) => task.status === "needs_user_action" || task.phase === "needs_user_action").length,
},
rows,
};
}
function buildBackofficeInsights(state: BossState, options: { surface: BackofficeSurface; backupStatus: BossStateBackupStatus }) {
const overview = buildAdminOverview(state);
const devices = state.devices;
@@ -344,6 +432,8 @@ function buildBackofficeInsights(state: BossState, options: { surface: Backoffic
backupDir: options.backupStatus.backupDir,
detail: options.backupStatus.detail,
},
dataSafetySummary: buildDataSafetySummary(options.backupStatus),
taskRiskSummary: buildTaskRiskSummary(state),
capabilitySummary: {
guiReady,
cliReady,

View File

@@ -0,0 +1,94 @@
import { NextRequest } from "next/server";
import { jsonNoStore } from "@/lib/api-response";
import { requireRequestSession } from "@/lib/boss-auth";
import { requireCsrfSafeMutation } from "@/lib/boss-csrf";
import {
canRetryMasterAgentTaskSafely,
getMasterAgentTask,
retryRecoverableMasterAgentTask,
} from "@/lib/boss-data";
function stringValue(value: unknown) {
return typeof value === "string" ? value.trim() : "";
}
async function paramsTaskId(context: { params: Promise<{ taskId: string }> }) {
const params = await context.params;
return params.taskId;
}
function forbidden() {
return jsonNoStore({ ok: false, message: "FORBIDDEN" }, { status: 403 });
}
export async function GET(
request: NextRequest,
context: { params: Promise<{ taskId: string }> },
) {
const session = await requireRequestSession(request);
if (!session) {
return jsonNoStore({ ok: false, message: "UNAUTHORIZED" }, { status: 401 });
}
const taskId = await paramsTaskId(context);
const task = await getMasterAgentTask(taskId);
if (!task) {
return jsonNoStore({ ok: false, message: "MASTER_AGENT_TASK_NOT_FOUND" }, { status: 404 });
}
if (session.role !== "highest_admin" && task.requestedByAccount !== session.account) {
return forbidden();
}
const phase = task.phase ?? task.status;
const lastProgressAt = task.lastProgressAt ?? task.claimedAt ?? task.requestedAt;
const canRetry = canRetryMasterAgentTaskSafely(task);
return jsonNoStore({
ok: true,
recovery: {
taskId: task.taskId,
status: task.status,
phase,
canRetry,
safeNextAction: canRetry ? "retry" : task.status === "needs_user_action" ? "user_action" : "inspect",
diagnosis: `任务处于 ${phase},最后进度时间 ${lastProgressAt}`,
lastErrorCode: task.lastErrorCode,
lastProgressAt,
},
});
}
export async function POST(
request: NextRequest,
context: { params: Promise<{ taskId: string }> },
) {
const csrf = requireCsrfSafeMutation(request);
if (csrf) return csrf;
const session = await requireRequestSession(request);
if (!session) {
return jsonNoStore({ ok: false, message: "UNAUTHORIZED" }, { status: 401 });
}
if (session.role !== "highest_admin") {
return forbidden();
}
const body = (await request.json().catch(() => ({}))) as Record<string, unknown>;
const action = stringValue(body.action);
if (action !== "retry") {
return jsonNoStore({ ok: false, message: "TASK_RECOVERY_ACTION_INVALID" }, { status: 400 });
}
const taskId = await paramsTaskId(context);
try {
const task = await retryRecoverableMasterAgentTask({
taskId,
actorAccount: session.account,
reason: stringValue(body.reason) || "管理员从恢复面板重试任务",
});
return jsonNoStore({ ok: true, action, task });
} catch (error) {
const message = error instanceof Error ? error.message : "TASK_RECOVERY_FAILED";
const status = message === "MASTER_AGENT_TASK_NOT_FOUND" ? 404 : 400;
return jsonNoStore({ ok: false, message }, { status });
}
}

View File

@@ -274,6 +274,7 @@ export interface ExecutionProgressSnapshot {
projectId: string;
targetProjectId?: string;
taskType?: MasterAgentTaskType;
phase?: MasterAgentTaskPhase;
controlMode?: ExecutionProgressControlMode;
runtimeKind?: ComputerControlRuntimeKind;
controlPlatform?: ComputerControlPlatform;
@@ -308,6 +309,7 @@ export interface ExecutionProgressSnapshot {
export interface ExecutionProgressInput {
steps?: Array<Partial<ExecutionProgressStep> & { text?: string }>;
phase?: MasterAgentTaskPhase;
branch?: Partial<ExecutionProgressBranchDetails>;
artifacts?: Array<Partial<ExecutionProgressArtifact> & { label?: string }>;
agents?: Array<Partial<ExecutionProgressAgent> & { name?: string }>;
@@ -464,6 +466,19 @@ export type MasterAgentTaskStatus =
| "failed"
| "timed_out"
| "canceled";
export type MasterAgentTaskPhase =
| "queued"
| "claimed"
| "executor_starting"
| "turn_started"
| "awaiting_reply"
| "completing"
| "completed"
| "recoverable_failed"
| "terminal_failed"
| "timed_out"
| "canceled"
| "needs_user_action";
export type DialogGuardInterventionAction =
| "allow_once"
| "allow_for_device_dialog"
@@ -1211,6 +1226,12 @@ export interface PermissionAuditLog {
| "risk.notification.dispatched"
| "dialog_guard.intervention_required"
| "dialog_guard.intervention_resolved"
| "backup.snapshot_created"
| "backup.snapshot_verified"
| "backup.restore_previewed"
| "backup.restore_dry_run"
| "backup.snapshot_restored"
| "master_agent.task_retried"
| "task.authorized"
| "task.denied";
targetAccount?: string;
@@ -1390,10 +1411,15 @@ export interface MasterAgentTask {
confirmationScopeKey?: string;
externalReplyTarget?: ExternalReplyTarget;
status: MasterAgentTaskStatus;
phase?: MasterAgentTaskPhase;
requestedAt: string;
claimedAt?: string;
lastClaimedAt?: string;
leaseExpiresAt?: string;
lastProgressAt?: string;
lastErrorCode?: string;
recoverable?: boolean;
nextRetryAt?: string;
attemptCount?: number;
maxAttempts?: number;
completedAt?: string;
@@ -3302,6 +3328,12 @@ function normalizePermissionAuditLog(raw: Partial<PermissionAuditLog>): Permissi
raw.action === "risk.notification.dispatched" ||
raw.action === "dialog_guard.intervention_required" ||
raw.action === "dialog_guard.intervention_resolved" ||
raw.action === "backup.snapshot_created" ||
raw.action === "backup.snapshot_verified" ||
raw.action === "backup.restore_previewed" ||
raw.action === "backup.restore_dry_run" ||
raw.action === "backup.snapshot_restored" ||
raw.action === "master_agent.task_retried" ||
raw.action === "task.authorized" ||
raw.action === "task.denied"
? raw.action
@@ -4068,18 +4100,25 @@ function normalizeExecutionProgressSnapshot(raw: Partial<ExecutionProgressSnapsh
relayViaMasterAgent: false,
};
const controlMode = rawControlMode ?? (nativeRemoteControl ? "native_remote_control" : undefined);
const phase = normalizeMasterAgentTaskPhase(raw.phase, status);
return {
taskId: raw.taskId?.trim() || randomToken("mastertask"),
projectId: raw.projectId?.trim() || "",
targetProjectId: raw.targetProjectId?.trim() || undefined,
taskType: normalizedTaskType,
phase,
controlMode,
runtimeKind,
controlPlatform,
computerUseProvider,
title: raw.title?.trim() || (nativeRemoteControl ? "远程控制进度" : "进度"),
status,
steps: normalizeExecutionProgressSteps(taskShim, status, raw.steps),
status: phaseToExecutionProgressStatus(phase, status),
steps: normalizeExecutionProgressSteps(
taskShim,
phaseToExecutionProgressStatus(phase, status),
phase,
raw.steps,
),
branch: nativeRemoteControl ? undefined : normalizeExecutionProgressBranch(raw.branch),
artifacts: normalizeExecutionProgressArtifacts(raw.artifacts),
agents: nativeRemoteControl ? undefined : normalizeExecutionProgressAgents(raw.agents),
@@ -4725,7 +4764,19 @@ export function migrateBossState(raw: Partial<BossState> | undefined): BossState
switchedAt: item.switchedAt ?? nowIso(),
reason: item.reason ?? "未注明切换原因",
})),
masterAgentTasks: ensureArray(raw.masterAgentTasks, base.masterAgentTasks).map((task) => ({
masterAgentTasks: ensureArray(raw.masterAgentTasks, base.masterAgentTasks).map((task) => {
const status: MasterAgentTaskStatus =
task.status === "queued" ||
task.status === "running" ||
task.status === "needs_user_action" ||
task.status === "completed" ||
task.status === "failed" ||
task.status === "timed_out" ||
task.status === "canceled"
? task.status
: "queued";
const phase = normalizeMasterAgentTaskPhase(task.phase, status);
return {
taskId: task.taskId ?? randomToken("mastertask"),
projectId: task.projectId ?? "master-agent",
taskType: task.taskType ?? "conversation_reply",
@@ -4884,20 +4935,19 @@ export function migrateBossState(raw: Partial<BossState> | undefined): BossState
deliveryError: trimToDefined(task.externalReplyTarget.deliveryError),
}
: undefined,
status:
task.status === "queued" ||
task.status === "running" ||
task.status === "needs_user_action" ||
task.status === "completed" ||
task.status === "failed" ||
task.status === "timed_out" ||
task.status === "canceled"
? task.status
: "queued",
status,
phase,
requestedAt: task.requestedAt ?? nowIso(),
claimedAt: trimToDefined(task.claimedAt),
lastClaimedAt: trimToDefined(task.lastClaimedAt),
leaseExpiresAt: trimToDefined(task.leaseExpiresAt),
lastProgressAt: trimToDefined(task.lastProgressAt),
lastErrorCode: trimToDefined(task.lastErrorCode) ?? phaseErrorCode(phase),
recoverable:
typeof task.recoverable === "boolean"
? task.recoverable
: phase === "recoverable_failed",
nextRetryAt: trimToDefined(task.nextRetryAt),
attemptCount: normalizeOptionalNumber(task.attemptCount),
maxAttempts: normalizeOptionalNumber(task.maxAttempts),
completedAt: trimToDefined(task.completedAt),
@@ -4911,7 +4961,8 @@ export function migrateBossState(raw: Partial<BossState> | undefined): BossState
requestId: task.requestId,
targetUrl: trimToDefined(task.targetUrl),
targetApp: trimToDefined(task.targetApp),
})),
};
}),
dispatchPlans: ensureArray(raw.dispatchPlans, base.dispatchPlans).map((plan, index) =>
normalizeDispatchPlan(plan, base.dispatchPlans[index % Math.max(1, base.dispatchPlans.length)]),
),
@@ -6089,13 +6140,55 @@ function deriveExecutionProgressStepStatuses(status: ExecutionProgressStatus, co
});
}
function deriveExecutionProgressStepStatusesFromPhase(
phase: MasterAgentTaskPhase | undefined,
status: ExecutionProgressStatus,
count: number,
) {
if (!phase) {
return deriveExecutionProgressStepStatuses(status, count);
}
if (phase === "completed") {
return Array.from({ length: count }, () => "done" as const);
}
if (
phase === "recoverable_failed" ||
phase === "terminal_failed" ||
phase === "timed_out" ||
phase === "canceled"
) {
const failedIndex = Math.min(count - 1, count >= 5 ? 3 : 2);
return Array.from({ length: count }, (_, index): ExecutionProgressStepStatus => {
if (index < failedIndex) return "done";
if (index === failedIndex) return "failed";
return "pending";
});
}
const activeIndex =
phase === "queued"
? 0
: phase === "claimed" || phase === "executor_starting"
? Math.min(1, count - 1)
: phase === "turn_started" || phase === "awaiting_reply"
? Math.min(count >= 5 ? 3 : 2, count - 1)
: phase === "completing"
? count - 1
: Math.min(1, count - 1);
return Array.from({ length: count }, (_, index): ExecutionProgressStepStatus => {
if (index < activeIndex) return "done";
if (index === activeIndex) return "running";
return "pending";
});
}
function normalizeExecutionProgressSteps(
task: Pick<MasterAgentTask, "taskType" | "relayViaMasterAgent">,
status: ExecutionProgressStatus,
phase?: MasterAgentTaskPhase,
input?: ExecutionProgressInput["steps"],
) {
const defaultTexts = defaultExecutionProgressStepTexts(task);
const defaultStatuses = deriveExecutionProgressStepStatuses(status, defaultTexts.length);
const defaultStatuses = deriveExecutionProgressStepStatusesFromPhase(phase, status, defaultTexts.length);
const sourceSteps = input && input.length > 0 ? input : defaultTexts.map((text, index) => ({
id: `step-${index + 1}`,
text,
@@ -6159,8 +6252,12 @@ function buildExecutionProgressSnapshot(
if (!projectId || !shouldShowTaskExecutionProgress(task)) {
return null;
}
const normalizedStatus = normalizeExecutionProgressStatus(status);
const steps = normalizeExecutionProgressSteps(task, normalizedStatus, input?.steps);
const taskPhase = normalizeMasterAgentTaskPhase(input?.phase ?? task.phase, task.status);
const normalizedStatus = phaseToExecutionProgressStatus(
taskPhase,
normalizeExecutionProgressStatus(status),
);
const steps = normalizeExecutionProgressSteps(task, normalizedStatus, taskPhase, input?.steps);
const nativeRemoteControl =
task.taskType === "browser_control" ||
task.taskType === "desktop_control" ||
@@ -6171,6 +6268,7 @@ function buildExecutionProgressSnapshot(
projectId,
targetProjectId: task.targetProjectId,
taskType: task.taskType,
phase: taskPhase,
controlMode: nativeRemoteControl ? "native_remote_control" : "codex_thread",
runtimeKind: task.runtimeKind,
controlPlatform: task.controlPlatform,
@@ -9875,6 +9973,57 @@ function isCliWriteTask(task: MasterAgentTask) {
return true;
}
function canRunWriteTaskThroughGui(device: Device | undefined, task: MasterAgentTask) {
if (!device || device.status !== "online" || isDeviceRevoked(device)) {
return false;
}
const appServerHealth = resolveCodexAppServerHealth(device);
const capabilities = device.capabilities;
const hasGuiExecutionChannel = Boolean(
appServerHealth !== "unavailable" || capabilities?.gui?.connected,
);
if (!hasGuiExecutionChannel) {
return false;
}
return Boolean(
task.targetCodexThreadRef ||
task.targetThreadId ||
task.targetCodexFolderRef ||
task.targetProjectId,
);
}
export type CodexAppServerHealth = "available" | "degraded" | "unavailable";
export function resolveCodexAppServerHealth(device: Device | undefined): CodexAppServerHealth {
if (!device || device.status !== "online" || isDeviceRevoked(device)) {
return "unavailable";
}
const capability = device.capabilities?.codexAppServer;
if (!capability?.connected) {
return "unavailable";
}
const lastSeenAtMs = Date.parse(capability.lastSeenAt ?? device.lastSeenAt ?? "");
if (Number.isFinite(lastSeenAtMs) && Date.now() - lastSeenAtMs > 2 * 60 * 1000) {
return "degraded";
}
const metadata = capability.metadata;
const errors =
metadata && typeof metadata === "object" && Array.isArray((metadata as { errors?: unknown }).errors)
? ((metadata as { errors?: unknown[] }).errors ?? [])
: [];
const driftLevel =
metadata &&
typeof metadata === "object" &&
typeof (metadata as { protocolDriftSummary?: { driftLevel?: unknown } }).protocolDriftSummary?.driftLevel === "string"
? (metadata as { protocolDriftSummary?: { driftLevel?: string } }).protocolDriftSummary?.driftLevel
: undefined;
if (errors.length > 0 || driftLevel === "warning" || driftLevel === "error") {
return "degraded";
}
return "available";
}
function resolveProjectConflictScopeForTask(
state: BossState,
task: MasterAgentTask,
@@ -9904,6 +10053,7 @@ function resolveProjectConflictScopeForTask(
}
const STALE_RUNNING_CONVERSATION_REPLY_MS = 15 * 60 * 1000;
const STALE_QUEUED_CONVERSATION_REPLY_MS = 60 * 60 * 1000;
const MASTER_AGENT_TASK_DEFAULT_LEASE_MS = 30 * 60 * 1000;
const MASTER_AGENT_TASK_CONVERSATION_LEASE_MS = STALE_RUNNING_CONVERSATION_REPLY_MS;
const MASTER_AGENT_TASK_DEFAULT_MAX_ATTEMPTS = 3;
@@ -9916,6 +10066,76 @@ function isTerminalMasterAgentTaskStatus(status: MasterAgentTaskStatus) {
return status === "completed" || status === "failed" || status === "timed_out" || status === "canceled";
}
function isMasterAgentTaskPhase(value: unknown): value is MasterAgentTaskPhase {
return (
value === "queued" ||
value === "claimed" ||
value === "executor_starting" ||
value === "turn_started" ||
value === "awaiting_reply" ||
value === "completing" ||
value === "completed" ||
value === "recoverable_failed" ||
value === "terminal_failed" ||
value === "timed_out" ||
value === "canceled" ||
value === "needs_user_action"
);
}
function defaultMasterAgentTaskPhase(status: MasterAgentTaskStatus): MasterAgentTaskPhase {
switch (status) {
case "queued":
return "queued";
case "running":
return "claimed";
case "needs_user_action":
return "needs_user_action";
case "completed":
return "completed";
case "timed_out":
return "timed_out";
case "canceled":
return "canceled";
case "failed":
return "terminal_failed";
default:
return "queued";
}
}
function normalizeMasterAgentTaskPhase(
phase: unknown,
status: MasterAgentTaskStatus,
): MasterAgentTaskPhase {
return isMasterAgentTaskPhase(phase) ? phase : defaultMasterAgentTaskPhase(status);
}
function phaseToExecutionProgressStatus(
phase: MasterAgentTaskPhase,
fallback: ExecutionProgressStatus,
): ExecutionProgressStatus {
if (phase === "completed") return "completed";
if (
phase === "recoverable_failed" ||
phase === "terminal_failed" ||
phase === "timed_out" ||
phase === "canceled"
) {
return "failed";
}
if (phase === "queued") return "queued";
return fallback === "completed" || fallback === "failed" ? fallback : "running";
}
function phaseErrorCode(phase: MasterAgentTaskPhase) {
if (phase === "recoverable_failed") return "RECOVERABLE_TASK_FAILURE";
if (phase === "terminal_failed") return "TERMINAL_TASK_FAILURE";
if (phase === "timed_out") return "TASK_TIMED_OUT";
if (phase === "canceled") return "TASK_CANCELED";
return undefined;
}
function codexModelChannelAccountSummarySignedIn(device: Device) {
const metadata = device.capabilities?.codexAppServer?.metadata;
const accountSummary =
@@ -9931,7 +10151,7 @@ function hasUsableCodexModelChannelInState(device: Device | undefined) {
}
const capabilities = device.capabilities;
const hasCodexTransport = Boolean(
capabilities?.codexAppServer?.connected ||
resolveCodexAppServerHealth(device) !== "unavailable" ||
capabilities?.cli?.connected ||
capabilities?.gui?.connected,
);
@@ -10055,15 +10275,39 @@ function taskLeaseExpired(task: MasterAgentTask, nowMs = Date.now()) {
}
function taskCanRetryAfterExpiredLease(task: MasterAgentTask) {
if (
task.phase === "turn_started" ||
task.phase === "awaiting_reply" ||
task.phase === "completing"
) {
return false;
}
const maxAttempts = task.maxAttempts ?? defaultMasterAgentTaskMaxAttempts(task.taskType);
return (task.attemptCount ?? 0) < maxAttempts;
}
function requeueRecoverableMasterAgentTaskInState(task: MasterAgentTask, now: string, reason: string) {
task.status = "queued";
task.phase = "recoverable_failed";
task.leaseExpiresAt = undefined;
task.claimedAt = undefined;
task.lastClaimedAt = task.lastClaimedAt ?? now;
task.lastProgressAt = now;
task.lastErrorKind = "recoverable_lease_timeout";
task.lastErrorCode = "RECOVERABLE_LEASE_TIMEOUT";
task.recoverable = true;
task.nextRetryAt = now;
task.errorMessage = reason;
}
function expireMasterAgentTaskInState(task: MasterAgentTask, now: string, reason: string) {
task.status = "timed_out";
task.phase = "timed_out";
task.completedAt = now;
task.leaseExpiresAt = undefined;
task.lastErrorKind = "lease_timeout";
task.lastErrorCode = "TASK_TIMED_OUT";
task.recoverable = false;
task.errorMessage = reason;
}
@@ -10071,11 +10315,22 @@ function isStaleRunningConversationReplyTask(task: MasterAgentTask, nowMs = Date
if (task.taskType !== "conversation_reply" || task.status !== "running") {
return false;
}
const claimedAtMs = Date.parse(task.claimedAt ?? "");
if (!Number.isFinite(claimedAtMs)) {
const lastActivityMs = Date.parse(task.lastProgressAt ?? task.claimedAt ?? "");
if (!Number.isFinite(lastActivityMs)) {
return true;
}
return nowMs - claimedAtMs >= STALE_RUNNING_CONVERSATION_REPLY_MS;
return nowMs - lastActivityMs >= STALE_RUNNING_CONVERSATION_REPLY_MS;
}
function isStaleQueuedConversationReplyTask(task: MasterAgentTask, nowMs = Date.now()) {
if (task.taskType !== "conversation_reply" || task.status !== "queued") {
return false;
}
const requestedAtMs = Date.parse(task.requestedAt ?? "");
if (!Number.isFinite(requestedAtMs)) {
return false;
}
return nowMs - requestedAtMs >= STALE_QUEUED_CONVERSATION_REPLY_MS;
}
function buildControlTaskCompletionUserSummary(task: MasterAgentTask) {
@@ -10094,24 +10349,63 @@ function buildControlTaskCompletionUserSummary(task: MasterAgentTask) {
return lines.join("");
}
function hasRecentProjectReplyDuplicate(
project: Project | undefined,
input: { body?: string; senderLabel?: string; at?: string; windowMs?: number },
) {
const body = input.body?.trim();
const senderLabel = input.senderLabel?.trim();
if (!project || !body || !senderLabel) {
return false;
}
const atMs = Date.parse(input.at ?? "");
const referenceMs = Number.isFinite(atMs) ? atMs : Date.now();
const windowMs = input.windowMs ?? 5 * 60 * 1000;
return project.messages.some((message) => {
if (message.kind === "execution_progress") {
return false;
}
if (message.body.trim() !== body || message.senderLabel.trim() !== senderLabel) {
return false;
}
const sentAtMs = Date.parse(message.sentAt);
if (!Number.isFinite(sentAtMs)) {
return false;
}
return Math.abs(referenceMs - sentAtMs) <= windowMs;
});
}
async function sweepExpiredMasterAgentTasksForDevice(deviceId: string, nowMs = Date.now()) {
const expired: MasterAgentTask[] = [];
await mutateStateIfChanged(async (state) => {
let changed = false;
const now = new Date(nowMs).toISOString();
for (const task of state.masterAgentTasks) {
if (task.deviceId !== deviceId || !taskLeaseExpired(task, nowMs)) {
const staleQueued = isStaleQueuedConversationReplyTask(task, nowMs);
const expiredRunning = taskLeaseExpired(task, nowMs);
if (task.deviceId !== deviceId || (!expiredRunning && !staleQueued)) {
continue;
}
if (taskCanRetryAfterExpiredLease(task)) {
if (expiredRunning && taskCanRetryAfterExpiredLease(task)) {
requeueRecoverableMasterAgentTaskInState(
task,
now,
"Master Agent task lease expired before Codex turn started; queued for safe retry.",
);
upsertTaskExecutionProgressMessageInState(state, task, "failed", { phase: task.phase });
expired.push({ ...task });
changed = true;
continue;
}
expireMasterAgentTaskInState(
task,
now,
`Master Agent task timed out after ${task.attemptCount ?? 0} attempts.`,
staleQueued
? "Master Agent task expired before being claimed."
: `Master Agent task timed out after ${task.attemptCount ?? 0} attempts.`,
);
upsertTaskExecutionProgressMessageInState(state, task, "failed");
upsertTaskExecutionProgressMessageInState(state, task, "failed", { phase: task.phase });
expired.push({ ...task });
changed = true;
}
@@ -10154,7 +10448,8 @@ export async function claimNextMasterAgentTask(deviceId: string) {
if (!device || isDeviceRevoked(device)) {
return null;
}
if (device?.preferredExecutionMode === "gui" && isCliWriteTask(claimable)) {
const claimableViaGui = canRunWriteTaskThroughGui(device, claimable);
if (device?.preferredExecutionMode === "gui" && isCliWriteTask(claimable) && !claimableViaGui) {
return null;
}
@@ -10219,8 +10514,10 @@ export async function claimNextMasterAgentTask(deviceId: string) {
const previousClaimedAt = next.claimedAt;
const claimedAt = nowIso();
next.status = "running";
next.phase = "claimed";
next.lastClaimedAt = previousClaimedAt;
next.claimedAt = claimedAt;
next.lastProgressAt = claimedAt;
next.attemptCount = (next.attemptCount ?? 0) + 1;
next.maxAttempts = next.maxAttempts ?? defaultMasterAgentTaskMaxAttempts(next.taskType);
next.leaseExpiresAt = new Date(Date.now() + masterAgentTaskLeaseMs(next)).toISOString();
@@ -10229,6 +10526,9 @@ export async function claimNextMasterAgentTask(deviceId: string) {
next.canceledBy = undefined;
next.cancelReason = undefined;
next.lastErrorKind = undefined;
next.lastErrorCode = undefined;
next.recoverable = false;
next.nextRetryAt = undefined;
next.errorMessage = undefined;
if (next.taskType === "attachment_analysis" && next.attachmentId) {
const project = state.projects.find((item) => item.id === next.projectId);
@@ -10303,12 +10603,15 @@ export async function cancelMasterAgentTask(input: {
}
const now = nowIso();
task.status = "canceled";
task.phase = "canceled";
task.completedAt = now;
task.canceledAt = now;
task.canceledBy = input.actorAccount;
task.cancelReason = input.reason?.trim() || undefined;
task.leaseExpiresAt = undefined;
task.lastErrorKind = "user_canceled";
task.lastErrorCode = "TASK_CANCELED";
task.recoverable = false;
task.errorMessage = task.cancelReason ?? "Task canceled by user.";
if (task.dispatchExecutionId) {
const execution = state.dispatchExecutions.find(
@@ -10336,10 +10639,90 @@ export async function cancelMasterAgentTask(input: {
return result;
}
export function canRetryMasterAgentTaskSafely(task: MasterAgentTask) {
return (
task.recoverable === true &&
(
task.phase === "queued" ||
task.phase === "claimed" ||
task.phase === "executor_starting" ||
task.phase === "recoverable_failed"
) &&
task.status !== "completed" &&
task.status !== "canceled" &&
task.status !== "timed_out"
);
}
export async function retryRecoverableMasterAgentTask(input: {
taskId: string;
actorAccount: string;
reason?: string;
}) {
const result = await mutateState((state) => {
const task = state.masterAgentTasks.find((item) => item.taskId === input.taskId);
if (!task) {
throw new Error("MASTER_AGENT_TASK_NOT_FOUND");
}
if (!canRetryMasterAgentTaskSafely(task)) {
throw new Error("MASTER_AGENT_TASK_RETRY_UNSAFE");
}
const retriedAt = nowIso();
task.status = "queued";
task.phase = "queued";
task.claimedAt = undefined;
task.lastClaimedAt = undefined;
task.leaseExpiresAt = undefined;
task.lastProgressAt = retriedAt;
task.completedAt = undefined;
task.canceledAt = undefined;
task.canceledBy = undefined;
task.cancelReason = undefined;
task.lastErrorKind = undefined;
task.lastErrorCode = undefined;
task.errorMessage = undefined;
task.recoverable = false;
task.nextRetryAt = undefined;
upsertTaskExecutionProgressMessageInState(state, task, "queued", { phase: "queued" });
state.permissionAuditLogs.unshift(
normalizePermissionAuditLog({
auditId: randomToken("audit"),
actorAccount: input.actorAccount,
action: "master_agent.task_retried",
projectId: task.projectId,
deviceId: task.deviceId,
detail: input.reason?.trim() || `重试任务:${task.taskId}`,
requestId: task.taskId,
createdAt: retriedAt,
afterJson: {
taskId: task.taskId,
phase: task.phase,
status: task.status,
},
}),
);
state.permissionAuditLogs = state.permissionAuditLogs.slice(0, 500);
return { ...task };
});
publishBossEvent("master_agent.task.updated", {
taskId: result.taskId,
deviceId: result.deviceId,
status: result.status,
});
const progressProjectId = resolveTaskExecutionProgressProjectId(result);
if (progressProjectId) {
publishBossEvent("project.messages.updated", { projectId: progressProjectId });
publishBossEvent("conversation.updated", { projectId: progressProjectId });
}
return result;
}
export async function updateMasterAgentTaskProgress(payload: {
taskId: string;
deviceId: string;
status?: "queued" | "running";
phase?: MasterAgentTaskPhase;
executionProgress?: ExecutionProgressInput;
requestId?: string;
}) {
@@ -10363,8 +10746,30 @@ export async function updateMasterAgentTaskProgress(payload: {
task.maxAttempts = task.maxAttempts ?? defaultMasterAgentTaskMaxAttempts(task.taskType);
task.leaseExpiresAt = task.leaseExpiresAt ?? new Date(Date.now() + masterAgentTaskLeaseMs(task)).toISOString();
}
const phase = normalizeMasterAgentTaskPhase(
payload.phase ?? payload.executionProgress?.phase ?? task.phase,
task.status,
);
const progressedAt = nowIso();
task.phase = phase;
task.lastProgressAt = progressedAt;
if (task.status === "running") {
task.leaseExpiresAt = new Date(Date.now() + masterAgentTaskLeaseMs(task)).toISOString();
}
if (phase === "recoverable_failed" || phase === "terminal_failed") {
task.lastErrorCode = phaseErrorCode(phase);
task.recoverable = phase === "recoverable_failed";
}
task.requestId = payload.requestId?.trim() || task.requestId;
upsertTaskExecutionProgressMessageInState(state, task, progressStatus, payload.executionProgress);
upsertTaskExecutionProgressMessageInState(
state,
task,
phaseToExecutionProgressStatus(phase, progressStatus),
{
...payload.executionProgress,
phase,
},
);
return { ...task };
});
publishBossEvent("master_agent.task.updated", {
@@ -10462,7 +10867,9 @@ export async function completeMasterAgentTask(payload: {
createdAt: existing?.createdAt ?? createdAt,
});
task.status = "needs_user_action";
task.phase = "needs_user_action";
task.requestId = payload.requestId;
task.lastProgressAt = createdAt;
task.targetApp = payload.appName?.trim() || task.targetApp;
if (existing) {
Object.assign(existing, intervention);
@@ -10508,6 +10915,7 @@ export async function completeMasterAgentTask(payload: {
failedAccount.updatedAt = failedAt;
}
task.status = "queued";
task.phase = "queued";
task.deviceId = failover.deviceId;
task.accountId = failover.accountId;
task.accountLabel = failover.accountLabel;
@@ -10521,7 +10929,11 @@ export async function completeMasterAgentTask(payload: {
task.leaseExpiresAt = undefined;
task.claimedAt = undefined;
task.lastClaimedAt = undefined;
task.lastProgressAt = nowIso();
task.lastErrorKind = "model_channel_failover";
task.lastErrorCode = "MODEL_CHANNEL_FAILOVER";
task.recoverable = true;
task.nextRetryAt = undefined;
task.errorMessage = payload.errorMessage?.trim() || "MASTER_CODEX_NODE_EXEC_FAILED";
task.replyBody = undefined;
task.requestId = undefined;
@@ -10535,11 +10947,19 @@ export async function completeMasterAgentTask(payload: {
}
}
task.status = payload.status;
task.phase =
payload.status === "completed"
? "completed"
: "terminal_failed";
task.completedAt = nowIso();
task.leaseExpiresAt = undefined;
task.lastProgressAt = task.completedAt;
task.replyBody = payload.replyBody?.trim() || undefined;
task.errorMessage = payload.errorMessage?.trim() || undefined;
task.lastErrorKind = payload.status === "failed" ? "runtime_failed" : undefined;
task.lastErrorCode = payload.status === "failed" ? "RUNTIME_FAILED" : undefined;
task.recoverable = false;
task.nextRetryAt = undefined;
task.requestId = payload.requestId;
task.targetUrl = payload.targetUrl?.trim() || undefined;
task.targetApp = payload.targetApp?.trim() || undefined;
@@ -10797,7 +11217,12 @@ export async function completeMasterAgentTask(payload: {
kind: replyKind,
})
: null;
if (!convertedMirroredReply) {
const duplicateRecentReply = hasRecentProjectReplyDuplicate(threadProject, {
body: task.replyBody,
senderLabel: replySenderLabel,
at: task.completedAt,
});
if (!convertedMirroredReply && !duplicateRecentReply) {
pushProjectLedgerMessage(state, threadProject?.id ?? task.projectId, {
sender: replySender,
senderLabel: replySenderLabel,

View File

@@ -273,6 +273,27 @@ test.beforeEach(async () => {
createdAt: now,
},
];
state.masterAgentTasks = [
{
taskId: "task-stale",
projectId: "project-acme",
taskType: "conversation_reply",
requestMessageId: "message-stale-request",
requestText: "请继续处理 Acme 生产项目的等待回复。",
executionPrompt: "继续 Acme 生产项目的 conversation_reply并回写安全摘要。",
requestedBy: "开发同事",
requestedByAccount: "dev@acme.com",
deviceId: "win-1",
status: "running",
phase: "awaiting_reply",
requestedAt: "2026-04-30T08:00:00+08:00",
claimedAt: "2026-04-30T08:01:00+08:00",
lastProgressAt: "2026-04-30T08:01:00+08:00",
leaseExpiresAt: "2026-04-30T08:02:00+08:00",
attemptCount: 1,
maxAttempts: 2,
},
];
await data.writeState(state);
});
@@ -370,6 +391,13 @@ test("backoffice bff exposes yudao style management contract without secrets", a
["Boss API", "OTA", "Codex Provider", "Computer Use", "Skill Hub"],
);
assert.equal(payload.insights.riskAggregates.some((item: { label: string }) => item.label === "设备离线"), true);
assert.equal(payload.insights.dataSafetySummary.restorePointCount >= 0, true);
assert.match(payload.insights.dataSafetySummary.rpoLabel, /文件 MVP|企业标准/);
assert.equal(Array.isArray(payload.insights.taskRiskSummary.rows), true);
assert.equal(typeof payload.insights.taskRiskSummary.counts.stale, "number");
const staleTask = payload.insights.taskRiskSummary.rows.find((row: { taskId: string }) => row.taskId === "task-stale");
assert.equal(staleTask?.stale, true);
assert.equal(staleTask?.phase, "awaiting_reply");
assert.equal(payload.yudaoMapping.tenant, "adminCompanies");
assert.equal(payload.yudaoMapping.user, "authAccounts");
assert.equal(payload.yudaoMapping.role, "BOSS_PERMISSION_TEMPLATES");

View File

@@ -0,0 +1,122 @@
import test from "node:test";
import assert from "node:assert/strict";
import os from "node:os";
import path from "node:path";
import { mkdtemp, rm } from "node:fs/promises";
import { NextRequest } from "next/server";
import type { MasterAgentTask } from "../src/lib/boss-data";
let runtimeRoot = "";
let data: typeof import("../src/lib/boss-data.ts");
let authCookie = "";
let getRecovery: (typeof import("../src/app/api/v1/master-agent/tasks/[taskId]/recovery/route.ts"))["GET"];
let postRecovery: (typeof import("../src/app/api/v1/master-agent/tasks/[taskId]/recovery/route.ts"))["POST"];
let baseState: Awaited<ReturnType<typeof import("../src/lib/boss-data.ts")["readState"]>>;
async function setup() {
if (runtimeRoot) return;
runtimeRoot = await mkdtemp(path.join(os.tmpdir(), "boss-task-recovery-"));
process.env.BOSS_RUNTIME_ROOT = runtimeRoot;
process.env.BOSS_STATE_FILE = path.join(runtimeRoot, "boss-state.json");
const [dataModule, authModule, routeModule] = await Promise.all([
import("../src/lib/boss-data.ts"),
import("../src/lib/boss-auth.ts"),
import("../src/app/api/v1/master-agent/tasks/[taskId]/recovery/route.ts"),
]);
data = dataModule;
authCookie = authModule.AUTH_SESSION_COOKIE;
getRecovery = routeModule.GET;
postRecovery = routeModule.POST;
baseState = structuredClone(await data.readState());
}
test.after(async () => {
if (runtimeRoot) await rm(runtimeRoot, { recursive: true, force: true });
});
function task(overrides: Partial<MasterAgentTask>): MasterAgentTask {
return {
taskId: "task-recoverable",
projectId: "project-1",
taskType: "conversation_reply",
requestMessageId: "msg-1",
requestText: "继续执行",
executionPrompt: "继续执行",
requestedBy: "Boss",
requestedByAccount: "owner@boss.com",
deviceId: "mac-1",
status: "running",
phase: "executor_starting",
requestedAt: "2026-06-06T08:00:00.000Z",
lastProgressAt: "2026-06-06T08:01:00.000Z",
attemptCount: 1,
maxAttempts: 2,
recoverable: true,
lastErrorCode: "EXECUTOR_START_FAILED",
...overrides,
};
}
async function authedRequest(method = "GET", body?: unknown) {
const session = await data.createAuthSession({
account: "owner@boss.com",
role: "highest_admin",
displayName: "Owner",
loginMethod: "password",
});
return new NextRequest("http://127.0.0.1:3000/api/v1/master-agent/tasks/task-recoverable/recovery", {
method,
headers: {
"content-type": "application/json",
cookie: `${authCookie}=${session.sessionToken}`,
},
body: body ? JSON.stringify(body) : undefined,
});
}
test.beforeEach(async () => {
await setup();
const state = structuredClone(baseState);
state.authAccounts = [
{
id: "account-owner",
account: "owner@boss.com",
passwordHash: "secret",
displayName: "Owner",
role: "highest_admin",
createdAt: "2026-06-06T08:00:00.000Z",
updatedAt: "2026-06-06T08:00:00.000Z",
},
];
state.masterAgentTasks = [task({})];
await data.writeState(state);
});
test("task recovery GET returns safe diagnosis", async () => {
const response = await getRecovery(
await authedRequest(),
{ params: Promise.resolve({ taskId: "task-recoverable" }) },
);
assert.equal(response.status, 200);
const payload = await response.json();
assert.equal(payload.ok, true);
assert.equal(payload.recovery.taskId, "task-recoverable");
assert.equal(payload.recovery.canRetry, true);
assert.equal(payload.recovery.safeNextAction, "retry");
assert.equal(payload.recovery.diagnosis.includes("executor_starting"), true);
});
test("task recovery POST retry requeues only recoverable pre-turn task", async () => {
const response = await postRecovery(
await authedRequest("POST", { action: "retry", reason: "executor recovered" }),
{ params: Promise.resolve({ taskId: "task-recoverable" }) },
);
assert.equal(response.status, 200);
const payload = await response.json();
assert.equal(payload.ok, true);
assert.equal(payload.task.status, "queued");
assert.equal(payload.task.phase, "queued");
const state = await data.readState();
assert.equal(state.permissionAuditLogs.some((log) => log.action === "master_agent.task_retried"), true);
});

View File

@@ -3,6 +3,7 @@ import assert from "node:assert/strict";
import os from "node:os";
import path from "node:path";
import { mkdtemp, rm } from "node:fs/promises";
import type { Device, MasterAgentTask } from "../src/lib/boss-data";
let runtimeRoot = "";
let data: typeof import("../src/lib/boss-data");
@@ -19,164 +20,181 @@ test.after(async () => {
if (runtimeRoot) await rm(runtimeRoot, { recursive: true, force: true });
});
test.beforeEach(async () => {
await setup();
await rm(runtimeRoot, { recursive: true, force: true });
});
async function queueDesktopTask(taskId: string) {
return data.queueMasterAgentTask({
function makeQueuedTask(taskId: string, overrides: Partial<MasterAgentTask> = {}): MasterAgentTask {
return {
taskId,
projectId: "master-agent",
taskType: "desktop_control",
requestMessageId: `${taskId}-message`,
requestText: "打开 Chrome",
executionPrompt: "打开 Chrome",
requestedBy: "krisolo",
taskType: "conversation_reply",
requestMessageId: `${taskId}-request`,
requestText: "回复一句收到",
executionPrompt: "回复一句收到",
requestedBy: "Boss 测试",
requestedByAccount: "krisolo",
deviceId: "mac-studio",
runtimeKind: "computer-use-runtime",
controlPlatform: "macos",
});
status: "queued",
requestedAt: new Date().toISOString(),
...overrides,
};
}
test("claiming a desktop control task records attempt count and a server lease", async () => {
await queueDesktopTask("lease-task");
const claimed = await data.claimNextMasterAgentTask("mac-studio");
assert.equal(claimed?.taskId, "lease-task");
assert.equal(claimed?.status, "running");
assert.equal(claimed?.attemptCount, 1);
assert.ok(claimed?.leaseExpiresAt);
assert.ok(Date.parse(claimed.leaseExpiresAt) > Date.parse(claimed.claimedAt ?? ""));
});
test("an expired running desktop control task can be reclaimed with a new attempt", async () => {
await queueDesktopTask("reclaim-task");
const firstClaim = await data.claimNextMasterAgentTask("mac-studio");
assert.equal(firstClaim?.attemptCount, 1);
test("master agent task claim, progress, and complete maintain reliability phase", async () => {
await setup();
const state = await data.readState();
const task = state.masterAgentTasks.find((item) => item.taskId === "reclaim-task");
assert.ok(task);
task.claimedAt = "2000-01-01T00:00:00.000Z";
task.leaseExpiresAt = "2000-01-01T00:01:00.000Z";
await data.writeState(state);
const reclaimed = await data.claimNextMasterAgentTask("mac-studio");
assert.equal(reclaimed?.taskId, "reclaim-task");
assert.equal(reclaimed?.status, "running");
assert.equal(reclaimed?.attemptCount, 2);
assert.notEqual(reclaimed?.claimedAt, firstClaim?.claimedAt);
});
test("an expired running task is timed out after max attempts instead of being claimed forever", async () => {
await queueDesktopTask("timeout-task");
await data.claimNextMasterAgentTask("mac-studio");
const state = await data.readState();
const task = state.masterAgentTasks.find((item) => item.taskId === "timeout-task");
assert.ok(task);
task.status = "running";
task.attemptCount = 3;
task.maxAttempts = 3;
task.leaseExpiresAt = "2000-01-01T00:01:00.000Z";
state.masterAgentTasks.unshift(makeQueuedTask("task-phase-normal"));
await data.writeState(state);
const claimed = await data.claimNextMasterAgentTask("mac-studio");
assert.equal(claimed, null);
const nextTask = (await data.readState()).masterAgentTasks.find((item) => item.taskId === "timeout-task");
assert.equal(nextTask?.status, "timed_out");
assert.match(nextTask?.errorMessage ?? "", /timed out/i);
});
test("canceling a running task prevents late success completion from overwriting the terminal state", async () => {
await queueDesktopTask("cancel-task");
const claimed = await data.claimNextMasterAgentTask("mac-studio");
assert.equal(claimed?.taskId, "task-phase-normal");
assert.equal(claimed?.status, "running");
assert.equal(claimed?.phase, "claimed");
assert.ok(claimed?.lastProgressAt);
const canceled = await data.cancelMasterAgentTask({
taskId: "cancel-task",
actorAccount: "krisolo",
reason: "用户取消演示任务",
});
assert.equal(canceled.status, "canceled");
assert.ok(canceled.canceledAt);
const late = await data.completeMasterAgentTask({
taskId: "cancel-task",
deviceId: "mac-studio",
status: "completed",
replyBody: "迟到的成功结果",
});
assert.equal(late.status, "canceled");
assert.equal(late.replyBody, undefined);
const finalTask = (await data.readState()).masterAgentTasks.find((item) => item.taskId === "cancel-task");
assert.equal(finalTask?.status, "canceled");
assert.equal(finalTask?.replyBody, undefined);
});
test("streaming task progress updates mutate the progress card without completing the task", async () => {
await queueDesktopTask("live-progress-task");
const claimed = await data.claimNextMasterAgentTask("mac-studio");
assert.equal(claimed?.status, "running");
const updated = await data.updateMasterAgentTaskProgress({
taskId: "live-progress-task",
const progressed = await data.updateMasterAgentTaskProgress({
taskId: "task-phase-normal",
deviceId: "mac-studio",
status: "running",
executionProgress: {
steps: [
{ text: "读取 app-server 事件流", status: "done" },
{ text: "等待目标线程回复", status: "running" },
],
artifacts: [{ label: "codex_app_server_protocol.schemas.json", kind: "file" }],
},
phase: "awaiting_reply",
});
assert.equal(progressed.status, "running");
assert.equal(progressed.phase, "awaiting_reply");
assert.ok(progressed.leaseExpiresAt);
assert.equal(updated.status, "running");
assert.equal(updated.completedAt, undefined);
const state = await data.readState();
const progressMessage = state.projects
.find((project) => project.id === "master-agent")
?.messages.find((message) => message.executionProgress?.taskId === "live-progress-task");
assert.equal(progressMessage?.executionProgress?.status, "running");
assert.equal(progressMessage?.executionProgress?.steps[0]?.text, "读取 app-server 事件流");
assert.equal(progressMessage?.executionProgress?.steps[1]?.status, "running");
assert.equal(progressMessage?.executionProgress?.artifacts?.[0]?.label, "codex_app_server_protocol.schemas.json");
const completed = await data.completeMasterAgentTask({
taskId: "task-phase-normal",
deviceId: "mac-studio",
status: "completed",
replyBody: "收到。",
});
assert.equal(completed.status, "completed");
assert.equal(completed.phase, "completed");
assert.equal(completed.recoverable, false);
});
test("queued thread collaboration tasks retain source and target thread references", async () => {
const task = await data.queueMasterAgentTask({
taskId: "thread-collaboration-task",
projectId: "master-agent",
taskType: "conversation_reply",
requestMessageId: "msg-thread-collaboration",
requestText: "让源线程和目标线程对一下方案",
executionPrompt: "让源线程和目标线程对一下方案",
requestedBy: "krisolo",
requestedByAccount: "krisolo",
deviceId: "mac-studio",
intentCategory: "thread_collaboration",
sourceThreadId: "source-thread-id",
sourceThreadDisplayName: "源线程",
sourceCodexThreadRef: "019d-source-codex",
targetThreadId: "target-thread-id",
targetThreadDisplayName: "目标线程",
targetCodexThreadRef: "019d-target-codex",
});
assert.equal(task.intentCategory, "thread_collaboration");
assert.equal(task.sourceThreadId, "source-thread-id");
assert.equal(task.sourceThreadDisplayName, "源线程");
assert.equal(task.sourceCodexThreadRef, "019d-source-codex");
test("expired pre-turn task is safely requeued and claimed again", async () => {
await setup();
const state = await data.readState();
state.masterAgentTasks.unshift(
makeQueuedTask("task-phase-retry", {
status: "running",
phase: "executor_starting",
claimedAt: "2020-01-01T08:00:00.000Z",
lastProgressAt: "2020-01-01T08:00:00.000Z",
leaseExpiresAt: "2020-01-01T08:01:00.000Z",
attemptCount: 1,
maxAttempts: 2,
}),
);
await data.writeState(state);
const claimed = await data.claimNextMasterAgentTask("mac-studio");
assert.equal(claimed?.sourceCodexThreadRef, "019d-source-codex");
assert.equal(claimed?.targetCodexThreadRef, "019d-target-codex");
assert.equal(claimed?.taskId, "task-phase-retry");
assert.equal(claimed?.status, "running");
assert.equal(claimed?.phase, "claimed");
assert.equal(claimed?.attemptCount, 2);
assert.equal(claimed?.recoverable, false);
});
test("expired task after turn start is timed out instead of duplicated", async () => {
await setup();
const state = await data.readState();
state.masterAgentTasks.unshift(
makeQueuedTask("task-phase-no-duplicate", {
status: "running",
phase: "turn_started",
claimedAt: "2020-01-01T08:00:00.000Z",
lastProgressAt: "2020-01-01T08:00:00.000Z",
leaseExpiresAt: "2020-01-01T08:01:00.000Z",
attemptCount: 1,
maxAttempts: 2,
}),
);
await data.writeState(state);
const claimed = await data.claimNextMasterAgentTask("mac-studio");
assert.notEqual(claimed?.taskId, "task-phase-no-duplicate");
const nextState = await data.readState();
const task = nextState.masterAgentTasks.find((item) => item.taskId === "task-phase-no-duplicate");
assert.equal(task?.status, "timed_out");
assert.equal(task?.phase, "timed_out");
assert.equal(task?.recoverable, false);
});
test("codex app server health distinguishes available, degraded, and unavailable", async () => {
await setup();
assert.equal(data.resolveCodexAppServerHealth(undefined), "unavailable");
assert.equal(
data.resolveCodexAppServerHealth({
id: "device-offline",
name: "离线设备",
avatar: "D",
account: "krisolo",
source: "production",
status: "offline",
projects: [],
quota5h: 0,
quota7d: 0,
lastSeenAt: "2026-06-06T08:00:00.000Z",
endpoint: "",
token: "",
note: "",
capabilities: {
codexAppServer: {
connected: false,
lastSeenAt: "2026-06-06T08:00:00.000Z",
},
},
} satisfies Device),
"unavailable",
);
assert.equal(
data.resolveCodexAppServerHealth({
id: "device-degraded",
name: "降级设备",
avatar: "D",
account: "krisolo",
source: "production",
status: "online",
projects: [],
quota5h: 0,
quota7d: 0,
lastSeenAt: new Date().toISOString(),
endpoint: "",
token: "",
note: "",
capabilities: {
codexAppServer: {
connected: true,
lastSeenAt: new Date().toISOString(),
metadata: { errors: ["thread/turns/list:STDIN_CLOSED"] },
},
},
} satisfies Device),
"degraded",
);
assert.equal(
data.resolveCodexAppServerHealth({
id: "device-available",
name: "可用设备",
avatar: "D",
account: "krisolo",
source: "production",
status: "online",
projects: [],
quota5h: 0,
quota7d: 0,
lastSeenAt: new Date().toISOString(),
endpoint: "",
token: "",
note: "",
capabilities: {
codexAppServer: {
connected: true,
lastSeenAt: new Date().toISOString(),
metadata: {},
},
},
} satisfies Device),
"available",
);
});