feat: sync thread status events

This commit is contained in:
kris
2026-04-04 11:06:00 +08:00
parent 8e12fa1e8c
commit f69eebd82d
3 changed files with 585 additions and 61 deletions

View File

@@ -510,6 +510,52 @@ export interface ProjectUnderstandingSnapshot {
sourceKind: "device_import" | "thread_sync";
}
export type ThreadStatusSourceKind = "device_import" | "full_sync" | "incremental_sync";
export type ThreadProgressEventType =
| "phase_changed"
| "progress_updated"
| "blocker_added"
| "blocker_resolved"
| "next_step_changed"
| "architecture_updated"
| "handoff_ready";
export interface ThreadStatusDocument {
documentId: string;
projectId: string;
threadId: string;
threadDisplayName: string;
folderName: string;
deviceId: string;
projectGoal: string;
currentPhase: string;
currentProgress: string;
technicalArchitecture: string;
currentBlockers: string;
recommendedNextStep: string;
keyFiles: string[];
keyCommands: string[];
updatedAt: string;
sourceTaskId: string;
sourceKind: ThreadStatusSourceKind;
}
export interface ThreadProgressEvent {
eventId: string;
projectId: string;
threadId: string;
threadDisplayName: string;
deviceId: string;
eventType: ThreadProgressEventType;
summary: string;
phase?: string;
blockerDelta?: string;
nextStepDelta?: string;
createdAt: string;
sourceTaskId: string;
sourceMessageId?: string;
}
export interface VerificationCode {
id: string;
account: string;
@@ -931,6 +977,8 @@ export interface BossState {
dispatchExecutions: DispatchExecution[];
deviceImportDrafts: DeviceImportDraft[];
deviceImportResolutions: DeviceImportResolution[];
threadStatusDocuments: ThreadStatusDocument[];
threadProgressEvents: ThreadProgressEvent[];
otaUpdates: OtaUpdate[];
otaUpdateLogs: OtaUpdateLog[];
deviceSkills: DeviceSkill[];
@@ -1532,6 +1580,8 @@ const initialState: BossState = {
evidenceModes: ["serial_log"],
},
],
threadStatusDocuments: [],
threadProgressEvents: [],
};
const levelPriority: Record<ContextBudgetLevel, number> = {
@@ -2683,6 +2733,155 @@ function normalizeProjectUnderstanding(
};
}
function normalizeThreadStatusSourceKind(value?: ThreadStatusSourceKind): ThreadStatusSourceKind {
return value === "device_import" || value === "full_sync" || value === "incremental_sync"
? value
: "incremental_sync";
}
function normalizeThreadProgressEventType(value?: ThreadProgressEventType): ThreadProgressEventType {
return value === "phase_changed" ||
value === "progress_updated" ||
value === "blocker_added" ||
value === "blocker_resolved" ||
value === "next_step_changed" ||
value === "architecture_updated" ||
value === "handoff_ready"
? value
: "progress_updated";
}
function compareThreadStatusDocuments(a: ThreadStatusDocument, b: ThreadStatusDocument) {
const updatedDelta = messageTimeValue(b.updatedAt) - messageTimeValue(a.updatedAt);
if (updatedDelta !== 0) return updatedDelta;
return b.documentId.localeCompare(a.documentId);
}
function compareThreadProgressEvents(a: ThreadProgressEvent, b: ThreadProgressEvent) {
const createdDelta = messageTimeValue(b.createdAt) - messageTimeValue(a.createdAt);
if (createdDelta !== 0) return createdDelta;
return b.eventId.localeCompare(a.eventId);
}
function threadProgressEventKey(event: ThreadProgressEvent) {
return `${event.projectId}:${event.threadId}`;
}
function normalizeThreadStatusDocument(
raw: Partial<ThreadStatusDocument>,
fallback?: ThreadStatusDocument,
): ThreadStatusDocument {
const keyFiles = dedupeStrings(
ensureArray(raw.keyFiles as string[] | undefined, fallback?.keyFiles ?? []).map((item) =>
item.trim(),
),
);
const keyCommands = dedupeStrings(
ensureArray(raw.keyCommands as string[] | undefined, fallback?.keyCommands ?? []).map((item) =>
item.trim(),
),
);
return {
documentId: raw.documentId ?? fallback?.documentId ?? randomToken("thread-status"),
projectId: trimToDefined(raw.projectId ?? fallback?.projectId) ?? "",
threadId: trimToDefined(raw.threadId ?? fallback?.threadId) ?? "",
threadDisplayName: trimToDefined(raw.threadDisplayName ?? fallback?.threadDisplayName) ?? "",
folderName: trimToDefined(raw.folderName ?? fallback?.folderName) ?? "",
deviceId: trimToDefined(raw.deviceId ?? fallback?.deviceId) ?? "",
projectGoal: raw.projectGoal?.trim() ?? fallback?.projectGoal ?? "",
currentPhase: raw.currentPhase?.trim() ?? fallback?.currentPhase ?? "待整理",
currentProgress: raw.currentProgress?.trim() ?? fallback?.currentProgress ?? "",
technicalArchitecture: raw.technicalArchitecture?.trim() ?? fallback?.technicalArchitecture ?? "",
currentBlockers: raw.currentBlockers?.trim() ?? fallback?.currentBlockers ?? "",
recommendedNextStep: raw.recommendedNextStep?.trim() ?? fallback?.recommendedNextStep ?? "",
keyFiles,
keyCommands,
updatedAt: raw.updatedAt ?? fallback?.updatedAt ?? nowIso(),
sourceTaskId: trimToDefined(raw.sourceTaskId ?? fallback?.sourceTaskId) ?? randomToken("thread-status"),
sourceKind: normalizeThreadStatusSourceKind(raw.sourceKind ?? fallback?.sourceKind),
};
}
function normalizeThreadProgressEvent(
raw: Partial<ThreadProgressEvent>,
fallback?: ThreadProgressEvent,
): ThreadProgressEvent {
return {
eventId: raw.eventId ?? fallback?.eventId ?? randomToken("thread-event"),
projectId: trimToDefined(raw.projectId ?? fallback?.projectId) ?? "",
threadId: trimToDefined(raw.threadId ?? fallback?.threadId) ?? "",
threadDisplayName: trimToDefined(raw.threadDisplayName ?? fallback?.threadDisplayName) ?? "",
deviceId: trimToDefined(raw.deviceId ?? fallback?.deviceId) ?? "",
eventType: normalizeThreadProgressEventType(raw.eventType ?? fallback?.eventType),
summary: raw.summary?.trim() ?? fallback?.summary ?? "线程状态更新",
phase: trimToDefined(raw.phase ?? fallback?.phase),
blockerDelta: trimToDefined(raw.blockerDelta ?? fallback?.blockerDelta),
nextStepDelta: trimToDefined(raw.nextStepDelta ?? fallback?.nextStepDelta),
createdAt: raw.createdAt ?? fallback?.createdAt ?? nowIso(),
sourceTaskId: trimToDefined(raw.sourceTaskId ?? fallback?.sourceTaskId) ?? randomToken("thread-event"),
sourceMessageId: trimToDefined(raw.sourceMessageId ?? fallback?.sourceMessageId),
};
}
function buildHeartbeatProgressSummary(threadDisplayName: string) {
return `检测到线程有新活动:${threadDisplayName}`;
}
function summarizeThreadReplyBody(body: string) {
const normalized = body
.replace(/\s+/g, " ")
.trim();
if (!normalized) {
return "线程状态更新";
}
return normalized.length > 120 ? `${normalized.slice(0, 117)}...` : normalized;
}
function upsertThreadStatusDocumentInState(
state: BossState,
input: {
projectId: string;
threadId: string;
threadDisplayName: string;
folderName: string;
deviceId: string;
projectGoal: string;
currentPhase: string;
currentProgress: string;
technicalArchitecture: string;
currentBlockers: string;
recommendedNextStep: string;
keyFiles: string[];
keyCommands: string[];
updatedAt: string;
sourceTaskId: string;
sourceKind: ThreadStatusSourceKind;
},
) {
const existing = state.threadStatusDocuments.find(
(item) => item.projectId === input.projectId && item.threadId === input.threadId,
);
const document = normalizeThreadStatusDocument(input, existing);
if (existing) {
Object.assign(existing, document);
return existing;
}
state.threadStatusDocuments.unshift(document);
return document;
}
function appendThreadProgressEventInState(
state: BossState,
input: Omit<ThreadProgressEvent, "eventId">,
) {
const event = normalizeThreadProgressEvent({
eventId: randomToken("thread-event"),
...input,
});
state.threadProgressEvents.unshift(event);
return event;
}
function normalizeState(raw: Partial<BossState> | undefined): BossState {
const base = cloneInitialState();
if (!raw) return syncDerivedState(base);
@@ -2850,6 +3049,24 @@ function normalizeState(raw: Partial<BossState> | undefined): BossState {
base.deviceImportResolutions[index % Math.max(1, base.deviceImportResolutions.length)],
),
),
threadStatusDocuments: ensureArray(
raw.threadStatusDocuments as Partial<ThreadStatusDocument>[] | undefined,
base.threadStatusDocuments,
).map((document, index) =>
normalizeThreadStatusDocument(
document,
base.threadStatusDocuments[index % Math.max(1, base.threadStatusDocuments.length)],
),
),
threadProgressEvents: ensureArray(
raw.threadProgressEvents as Partial<ThreadProgressEvent>[] | undefined,
base.threadProgressEvents,
).map((event, index) =>
normalizeThreadProgressEvent(
event,
base.threadProgressEvents[index % Math.max(1, base.threadProgressEvents.length)],
),
),
otaUpdates: ensureArray(raw.otaUpdates, base.otaUpdates).map((update, index) => ({
...base.otaUpdates[index % base.otaUpdates.length],
...update,
@@ -3001,6 +3218,12 @@ function removeLegacyBossConsoleArtifacts(state: BossState) {
state.threadContextAlerts = state.threadContextAlerts.filter(
(item) => !isLegacyBossConsoleRef(item.projectId),
);
state.threadStatusDocuments = state.threadStatusDocuments.filter(
(item) => !isLegacyBossConsoleRef(item.projectId),
);
state.threadProgressEvents = state.threadProgressEvents.filter(
(item) => !isLegacyBossConsoleRef(item.projectId),
);
state.opsFaults = state.opsFaults.filter((item) => !isLegacyBossConsoleRef(item.projectId));
state.masterAgentTasks = state.masterAgentTasks.filter(
(task) =>
@@ -3448,6 +3671,37 @@ function syncDerivedState(input: BossState) {
state.deviceImportResolutions = state.deviceImportResolutions.filter(
(item) => visibleDeviceIds.has(item.deviceId) && visibleImportDraftIds.has(item.draftId),
);
const visibleProjectIds = new Set(state.projects.map((project) => project.id));
const threadStatusDocumentByThread = new Map<string, ThreadStatusDocument>();
const normalizedThreadStatusDocuments = state.threadStatusDocuments.map((document) =>
normalizeThreadStatusDocument(document),
);
for (const document of normalizedThreadStatusDocuments
.filter((item) => visibleProjectIds.has(item.projectId) && visibleDeviceIds.has(item.deviceId))
.sort(compareThreadStatusDocuments)) {
const key = `${document.projectId}:${document.threadId}`;
if (!threadStatusDocumentByThread.has(key)) {
threadStatusDocumentByThread.set(key, document);
}
}
state.threadStatusDocuments = [...threadStatusDocumentByThread.values()].slice(0, 80);
const progressEventCounts = new Map<string, number>();
const normalizedThreadProgressEvents = state.threadProgressEvents.map((event) =>
normalizeThreadProgressEvent(event),
);
state.threadProgressEvents = normalizedThreadProgressEvents
.filter((item) => visibleProjectIds.has(item.projectId) && visibleDeviceIds.has(item.deviceId))
.sort(compareThreadProgressEvents)
.filter((item) => {
const key = threadProgressEventKey(item);
const nextCount = (progressEventCounts.get(key) ?? 0) + 1;
if (nextCount > 20) {
return false;
}
progressEventCounts.set(key, nextCount);
return true;
})
.slice(0, 400);
state.deviceSkills = state.deviceSkills
.filter((skill) => visibleDeviceIds.has(skill.deviceId))
.sort((a, b) => b.updatedAt.localeCompare(a.updatedAt));
@@ -6883,6 +7137,16 @@ export async function upsertDeviceHeartbeat(payload: {
matchingProject.threadMeta.lastObservedCodexActivityAt,
candidate.lastActiveAt,
) ?? candidate.lastActiveAt;
appendThreadProgressEventInState(state, {
projectId: matchingProject.id,
threadId: matchingProject.threadMeta.threadId,
threadDisplayName: matchingProject.threadMeta.threadDisplayName,
deviceId: matchingProject.deviceIds[0] ?? payload.deviceId,
eventType: "progress_updated",
summary: buildHeartbeatProgressSummary(candidate.threadDisplayName),
createdAt: candidate.lastActiveAt,
sourceTaskId: `heartbeat-${candidate.candidateId}`,
});
if (shouldQueueProjectUnderstandingSync(matchingProject, candidate.lastActiveAt, state)) {
projectUnderstandingSyncRequests.push({
projectId: matchingProject.id,
@@ -7193,6 +7457,24 @@ function applyProjectUnderstandingSnapshotInState(
sourceKind: input.sourceKind,
};
project.projectUnderstanding = snapshot;
upsertThreadStatusDocumentInState(state, {
projectId: project.id,
threadId: project.threadMeta.threadId,
threadDisplayName: project.threadMeta.threadDisplayName,
folderName: project.threadMeta.folderName,
deviceId: project.deviceIds[0] ?? state.user.boundDeviceId ?? PRIMARY_CODEX_NODE_ID,
projectGoal: snapshot.projectGoal,
currentPhase: input.sourceKind === "device_import" ? "导入理解" : "全量理解",
currentProgress: snapshot.currentProgress,
technicalArchitecture: snapshot.technicalArchitecture,
currentBlockers: snapshot.currentBlockers,
recommendedNextStep: snapshot.recommendedNextStep,
keyFiles: [],
keyCommands: [],
updatedAt: snapshot.updatedAt,
sourceTaskId: snapshot.sourceTaskId,
sourceKind: input.sourceKind === "device_import" ? "device_import" : "full_sync",
});
project.threadMeta.lastProjectUnderstandingSyncedAt = snapshot.updatedAt;
project.threadMeta.lastObservedCodexActivityAt =
latestIsoTimestamp(project.threadMeta.lastObservedCodexActivityAt, snapshot.updatedAt) ?? snapshot.updatedAt;
@@ -7322,6 +7604,12 @@ function shouldQueueProjectUnderstandingSync(project: Project, observedActivityA
if (Number.isFinite(latestWatermark) && observedTs <= latestWatermark) {
return false;
}
const hasThreadStatusDocument = state.threadStatusDocuments.some(
(item) => item.projectId === project.id && item.threadId === project.threadMeta.threadId,
);
if (project.projectUnderstanding && hasThreadStatusDocument) {
return false;
}
return !state.masterAgentTasks.some(
(task) =>
task.taskType === "conversation_reply" &&
@@ -8282,12 +8570,33 @@ export async function appendProjectMessage(payload: {
project.lastMessageAt = message.sentAt;
project.preview = message.body;
const shouldTrackThreadProgress =
payload.sender !== "user" &&
isDispatchableThreadProject(project) &&
Boolean(project.threadMeta.codexThreadRef?.trim());
if (shouldTrackThreadProgress) {
project.threadMeta.lastObservedCodexActivityAt = latestIsoTimestamp(
project.threadMeta.lastObservedCodexActivityAt,
message.sentAt,
) ?? message.sentAt;
appendThreadProgressEventInState(state, {
projectId: project.id,
threadId: project.threadMeta.threadId,
threadDisplayName: project.threadMeta.threadDisplayName,
deviceId: project.deviceIds[0] ?? project.id,
eventType: "progress_updated",
summary: summarizeThreadReplyBody(message.body),
phase: project.projectUnderstanding ? "增量同步" : "线程回复",
createdAt: message.sentAt,
sourceTaskId: message.id,
sourceMessageId: message.id,
});
}
return {
message,
shouldQueueUnderstandingSync:
payload.sender !== "user" &&
isDispatchableThreadProject(project) &&
Boolean(project.threadMeta.codexThreadRef?.trim()),
shouldTrackThreadProgress && shouldQueueProjectUnderstandingSync(project, message.sentAt, state),
};
});
if (result.shouldQueueUnderstandingSync) {