feat: narrow thread sync context and dedupe realtime refresh

This commit is contained in:
kris
2026-04-05 03:23:11 +08:00
parent da78e82a90
commit 5a53b60f13
9 changed files with 678 additions and 83 deletions

View File

@@ -7161,7 +7161,7 @@ export async function upsertDeviceHeartbeat(payload: {
sourceTaskId: `heartbeat-${candidate.candidateId}`,
});
}
if (shouldQueueProjectUnderstandingSync(matchingProject, candidate.lastActiveAt, state)) {
if (shouldQueueProjectUnderstandingSync(matchingProject, candidate.lastActiveAt, state, "heartbeat_activity")) {
projectUnderstandingSyncRequests.push({
projectId: matchingProject.id,
observedActivityAt: candidate.lastActiveAt,
@@ -7601,7 +7601,12 @@ function buildProjectUnderstandingTakeoverNotice(projectName: string, snapshot:
.join("\n");
}
function shouldQueueProjectUnderstandingSync(project: Project, observedActivityAt: string, state: BossState) {
function shouldQueueProjectUnderstandingSync(
project: Project,
observedActivityAt: string,
state: BossState,
reason: "heartbeat_activity" | "thread_reply" = "heartbeat_activity",
) {
if (!isDispatchableThreadProject(project)) {
return false;
}
@@ -7621,6 +7626,9 @@ function shouldQueueProjectUnderstandingSync(project: Project, observedActivityA
const hasThreadStatusDocument = state.threadStatusDocuments.some(
(item) => item.projectId === project.id && item.threadId === project.threadMeta.threadId,
);
if (reason === "thread_reply" && hasThreadStatusDocument) {
return false;
}
if (project.projectUnderstanding && hasThreadStatusDocument) {
const lastSyncedTs = Date.parse(
project.threadMeta.lastProjectUnderstandingSyncedAt ??
@@ -7673,7 +7681,7 @@ async function queueProjectUnderstandingSyncTask(input: {
}) {
const state = await readState();
const project = state.projects.find((item) => item.id === input.projectId);
if (!project || !shouldQueueProjectUnderstandingSync(project, input.observedActivityAt, state)) {
if (!project || !shouldQueueProjectUnderstandingSync(project, input.observedActivityAt, state, input.reason)) {
return null;
}
const requestedByAccount = state.user.account || project.deviceIds[0] || "17600003315";
@@ -8625,7 +8633,8 @@ export async function appendProjectMessage(payload: {
return {
message,
shouldQueueUnderstandingSync:
shouldTrackThreadProgress && shouldQueueProjectUnderstandingSync(project, message.sentAt, state),
shouldTrackThreadProgress &&
shouldQueueProjectUnderstandingSync(project, message.sentAt, state, "thread_reply"),
};
});
if (result.shouldQueueUnderstandingSync) {

View File

@@ -261,35 +261,10 @@ function buildRuntimeDigest(
.filter((update) => update.status === "available")
.map((update) => `${update.version} -> ${update.targetScope}`)
.join("\n");
const threadStatusDocuments = [...state.threadStatusDocuments]
.sort((left, right) => {
const updatedDelta = Date.parse(right.updatedAt) - Date.parse(left.updatedAt);
if (updatedDelta !== 0) {
return updatedDelta;
}
return right.documentId.localeCompare(left.documentId);
})
.slice(0, 6)
.map((document) => buildThreadStatusDocumentDigest(state, document));
const recentProgressEvents = [...state.threadProgressEvents]
.sort((left, right) => {
const createdDelta = Date.parse(right.createdAt) - Date.parse(left.createdAt);
if (createdDelta !== 0) {
return createdDelta;
}
return right.eventId.localeCompare(left.eventId);
})
.slice(0, 8)
.map((event) => buildThreadProgressEventDigest(state, event));
const deepPullThreadUnderstandings = state.projects
.filter((project) => project.id !== "master-agent" && project.projectUnderstanding)
.sort((left, right) =>
String(right.projectUnderstanding?.updatedAt ?? right.lastMessageAt).localeCompare(
String(left.projectUnderstanding?.updatedAt ?? left.lastMessageAt),
),
)
.slice(0, 3)
.map((project) => buildDeepPullThreadUnderstandingDigest(project));
const threadRuntimeSelection = selectThreadRuntimeDigestSelection(state, requestText);
const threadStatusDocuments = threadRuntimeSelection.threadStatusDocuments;
const recentProgressEvents = threadRuntimeSelection.recentProgressEvents;
const deepPullThreadUnderstandings = threadRuntimeSelection.deepPullThreadUnderstandings;
const authSummary = [
`登录会话策略:成功登录后默认保持 ${Math.round(AUTH_SESSION_TTL_MS / 24 / 60 / 60_000)} 天。`,
@@ -309,9 +284,13 @@ function buildRuntimeDigest(
"最近进展事件:",
recentProgressEvents.length > 0 ? recentProgressEvents.join("\n") : "无",
"",
"关键时刻深拉线程兜底:",
deepPullThreadUnderstandings.length > 0 ? deepPullThreadUnderstandings.join("\n") : "无",
"",
...(deepPullThreadUnderstandings.length > 0
? [
"关键时刻深拉线程兜底:",
deepPullThreadUnderstandings.join("\n"),
"",
]
: []),
"最近主 Agent 对话:",
recentMessages || "无",
"",
@@ -332,6 +311,111 @@ function buildRuntimeDigest(
].join("\n");
}
function selectThreadRuntimeDigestSelection(
state: Awaited<ReturnType<typeof readState>>,
requestText: string,
) {
const projectsWithRuntimeEvidence = state.projects
.filter((project) =>
state.threadStatusDocuments.some((document) => document.projectId === project.id) ||
state.threadProgressEvents.some((event) => event.projectId === project.id),
)
.sort((left, right) => compareProjectRuntimeDigestActivity(right, left));
const scoredProjects = state.projects
.map((project) => ({
project,
score: scoreMasterAgentDispatchCandidate(project, requestText),
}))
.sort((left, right) => {
if (right.score !== left.score) {
return right.score - left.score;
}
return compareProjectRuntimeDigestActivity(right.project, left.project);
});
const matchedProjects = scoredProjects.filter((item) => item.score > 0).map((item) => item.project);
const matchedNonMasterProjects = matchedProjects.filter((project) => project.id !== "master-agent");
const selectedProjects =
matchedNonMasterProjects.length > 0
? matchedNonMasterProjects
: matchedProjects.length > 0
? matchedProjects
: projectsWithRuntimeEvidence.slice(0, 3);
let selectedProjectIds = new Set(selectedProjects.map((project) => project.id));
let threadStatusDocuments = [...state.threadStatusDocuments]
.filter((document) => selectedProjectIds.has(document.projectId))
.sort((left, right) => {
const updatedDelta = Date.parse(right.updatedAt) - Date.parse(left.updatedAt);
if (updatedDelta !== 0) {
return updatedDelta;
}
return right.documentId.localeCompare(left.documentId);
});
let recentProgressEvents = [...state.threadProgressEvents]
.filter((event) => selectedProjectIds.has(event.projectId))
.sort((left, right) => {
const createdDelta = Date.parse(right.createdAt) - Date.parse(left.createdAt);
if (createdDelta !== 0) {
return createdDelta;
}
return right.eventId.localeCompare(left.eventId);
});
if (threadStatusDocuments.length === 0 && recentProgressEvents.length === 0 && projectsWithRuntimeEvidence.length > 0) {
selectedProjectIds = new Set(projectsWithRuntimeEvidence.slice(0, 3).map((project) => project.id));
threadStatusDocuments = [...state.threadStatusDocuments]
.filter((document) => selectedProjectIds.has(document.projectId))
.sort((left, right) => {
const updatedDelta = Date.parse(right.updatedAt) - Date.parse(left.updatedAt);
if (updatedDelta !== 0) {
return updatedDelta;
}
return right.documentId.localeCompare(left.documentId);
});
recentProgressEvents = [...state.threadProgressEvents]
.filter((event) => selectedProjectIds.has(event.projectId))
.sort((left, right) => {
const createdDelta = Date.parse(right.createdAt) - Date.parse(left.createdAt);
if (createdDelta !== 0) {
return createdDelta;
}
return right.eventId.localeCompare(left.eventId);
});
}
const deepPullThreadUnderstandings =
threadStatusDocuments.length === 0 && recentProgressEvents.length === 0 && projectsWithRuntimeEvidence.length === 0
? state.projects
.filter((project) => project.id !== "master-agent" && project.projectUnderstanding)
.sort((left, right) => compareProjectRuntimeDigestActivity(right, left))
.slice(0, 3)
.map((project) => buildDeepPullThreadUnderstandingDigest(project))
.filter((entry): entry is string => Boolean(entry))
: [];
return {
threadStatusDocuments: threadStatusDocuments.slice(0, 6).map((document) => buildThreadStatusDocumentDigest(state, document)),
recentProgressEvents: recentProgressEvents.slice(0, 8).map((event) => buildThreadProgressEventDigest(state, event)),
deepPullThreadUnderstandings,
};
}
function compareProjectRuntimeDigestActivity(left: Project, right: Project) {
return projectRuntimeDigestActivityValue(left) - projectRuntimeDigestActivityValue(right);
}
function projectRuntimeDigestActivityValue(project: Project) {
return Math.max(
Date.parse(project.updatedAt || ""),
Date.parse(project.lastMessageAt || ""),
Date.parse(project.threadMeta.updatedAt || ""),
Date.parse(project.threadMeta.lastObservedCodexActivityAt || ""),
Date.parse(project.projectUnderstanding?.updatedAt || ""),
);
}
function buildThreadStatusDocumentDigest(
state: Awaited<ReturnType<typeof readState>>,
document: Awaited<ReturnType<typeof readState>>["threadStatusDocuments"][number],