Throttle realtime refresh bursts
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import { notFound } from "next/navigation";
|
||||
import { RealtimeRefresh } from "@/components/app-runtime";
|
||||
import { AppShell, ForwardComposer, PageNav, StatusBar } from "@/components/app-ui";
|
||||
import { requirePageSession } from "@/lib/boss-auth";
|
||||
import { getProject, readState } from "@/lib/boss-data";
|
||||
@@ -21,6 +22,10 @@ export default async function ForwardPage({
|
||||
|
||||
return (
|
||||
<AppShell bottomNav={false}>
|
||||
<RealtimeRefresh
|
||||
projectIds={[projectId, ...targets.map((target) => target.id)]}
|
||||
events={["conversation.updated", "project.messages.updated"]}
|
||||
/>
|
||||
<StatusBar />
|
||||
<PageNav title="转发到项目" backHref={`/conversations/${projectId}`} />
|
||||
<div className="space-y-3 px-[18px] pb-6">
|
||||
|
||||
@@ -4,6 +4,13 @@ import { useEffect, useRef, useState } from "react";
|
||||
import { usePathname, useRouter, useSearchParams } from "next/navigation";
|
||||
import clsx from "clsx";
|
||||
import type { BossEventName } from "@/lib/boss-events";
|
||||
import {
|
||||
cancelScheduledRefresh,
|
||||
createRefreshThrottleState,
|
||||
markScheduledRefreshExecuted,
|
||||
planThrottledRefresh,
|
||||
shouldRefreshRealtimeEvent,
|
||||
} from "@/lib/realtime-refresh";
|
||||
import type { SkillInventoryDeviceGroup } from "@/lib/boss-projections";
|
||||
import {
|
||||
clearNativeSessionSnapshot,
|
||||
@@ -263,63 +270,43 @@ export function RealtimeRefresh({
|
||||
conversationUpdatedNotes?: string[];
|
||||
}) {
|
||||
const router = useRouter();
|
||||
const throttleStateRef = useRef(createRefreshThrottleState());
|
||||
const pendingTimerRef = useRef<number | null>(null);
|
||||
|
||||
useEffect(() => {
|
||||
const source = new EventSource("/api/v1/events");
|
||||
const projectScopeIds = new Set(
|
||||
[projectId, ...(projectIds ?? [])]
|
||||
.filter((value): value is string => Boolean(value?.trim()))
|
||||
.map((value) => value.trim()),
|
||||
);
|
||||
const throttleState = throttleStateRef.current;
|
||||
const listeners = Array.from(new Set([
|
||||
"conversation.context_indicator.updated",
|
||||
"project.context_risk.updated",
|
||||
...events,
|
||||
]));
|
||||
const shouldRefresh = (event: Event) => {
|
||||
let payload: { projectId?: string; note?: string } | null = null;
|
||||
const eventData = "data" in event && typeof event.data === "string" ? event.data : "";
|
||||
const hasPayloadData = Boolean(eventData.trim());
|
||||
|
||||
if (hasPayloadData) {
|
||||
try {
|
||||
const parsed = JSON.parse(eventData) as { projectId?: string; note?: string };
|
||||
if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) {
|
||||
payload = parsed;
|
||||
}
|
||||
} catch {
|
||||
payload = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (projectScopeIds.size > 0) {
|
||||
if (!payload || typeof payload.projectId !== "string" || !payload.projectId.trim()) {
|
||||
return true;
|
||||
}
|
||||
if (!projectScopeIds.has(payload.projectId)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (event.type === "conversation.updated" && conversationUpdatedNotes?.length) {
|
||||
if (!payload || typeof payload.note !== "string" || !payload.note.trim()) {
|
||||
return false;
|
||||
}
|
||||
if ((payload.note) && !conversationUpdatedNotes.includes(payload.note)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
};
|
||||
const listenerMap = new Map<string, (event: Event) => void>();
|
||||
|
||||
for (const event of listeners) {
|
||||
const refresh = (nextEvent: Event) => {
|
||||
if (!shouldRefresh(nextEvent)) {
|
||||
const eventData = "data" in nextEvent ? nextEvent.data : undefined;
|
||||
if (!shouldRefreshRealtimeEvent({
|
||||
eventType: nextEvent.type,
|
||||
eventData,
|
||||
projectId,
|
||||
projectIds,
|
||||
conversationUpdatedNotes,
|
||||
})) {
|
||||
return;
|
||||
}
|
||||
router.refresh();
|
||||
const decision = planThrottledRefresh(throttleState, Date.now(), 400);
|
||||
if (decision.type === "refresh_now") {
|
||||
router.refresh();
|
||||
return;
|
||||
}
|
||||
if (decision.type === "schedule_refresh" && pendingTimerRef.current === null) {
|
||||
pendingTimerRef.current = window.setTimeout(() => {
|
||||
pendingTimerRef.current = null;
|
||||
markScheduledRefreshExecuted(throttleState, Date.now());
|
||||
router.refresh();
|
||||
}, decision.delayMs);
|
||||
}
|
||||
};
|
||||
listenerMap.set(event, refresh);
|
||||
source.addEventListener(event, refresh);
|
||||
@@ -332,6 +319,11 @@ export function RealtimeRefresh({
|
||||
source.removeEventListener(event, refresh);
|
||||
}
|
||||
}
|
||||
if (pendingTimerRef.current !== null) {
|
||||
window.clearTimeout(pendingTimerRef.current);
|
||||
pendingTimerRef.current = null;
|
||||
}
|
||||
cancelScheduledRefresh(throttleState);
|
||||
source.close();
|
||||
};
|
||||
}, [conversationUpdatedNotes, events, projectId, projectIds, router]);
|
||||
|
||||
102
src/lib/realtime-refresh.ts
Normal file
102
src/lib/realtime-refresh.ts
Normal file
@@ -0,0 +1,102 @@
|
||||
export interface RealtimeRefreshScope {
|
||||
projectId?: string;
|
||||
projectIds?: string[];
|
||||
conversationUpdatedNotes?: string[];
|
||||
}
|
||||
|
||||
export interface RefreshThrottleState {
|
||||
lastRefreshAt: number | null;
|
||||
pending: boolean;
|
||||
}
|
||||
|
||||
export type RefreshThrottleDecision =
|
||||
| { type: "refresh_now" }
|
||||
| { type: "schedule_refresh"; delayMs: number }
|
||||
| { type: "skip" };
|
||||
|
||||
interface RealtimeEventPayload {
|
||||
projectId?: string;
|
||||
note?: string;
|
||||
}
|
||||
|
||||
function parseRealtimeEventPayload(eventData: unknown): RealtimeEventPayload | null {
|
||||
if (typeof eventData !== "string" || !eventData.trim()) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(eventData) as RealtimeEventPayload;
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
return null;
|
||||
}
|
||||
return parsed;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export function shouldRefreshRealtimeEvent(input: {
|
||||
eventType: string;
|
||||
eventData?: unknown;
|
||||
} & RealtimeRefreshScope) {
|
||||
const payload = parseRealtimeEventPayload(input.eventData);
|
||||
const projectScopeIds = new Set(
|
||||
[input.projectId, ...(input.projectIds ?? [])]
|
||||
.filter((value): value is string => Boolean(value?.trim()))
|
||||
.map((value) => value.trim()),
|
||||
);
|
||||
|
||||
if (projectScopeIds.size > 0) {
|
||||
if (!payload || typeof payload.projectId !== "string" || !payload.projectId.trim()) {
|
||||
return true;
|
||||
}
|
||||
if (!projectScopeIds.has(payload.projectId)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (input.eventType === "conversation.updated" && input.conversationUpdatedNotes?.length) {
|
||||
if (!payload || typeof payload.note !== "string" || !payload.note.trim()) {
|
||||
return false;
|
||||
}
|
||||
if (!input.conversationUpdatedNotes.includes(payload.note)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
export function createRefreshThrottleState(): RefreshThrottleState {
|
||||
return {
|
||||
lastRefreshAt: null,
|
||||
pending: false,
|
||||
};
|
||||
}
|
||||
|
||||
export function planThrottledRefresh(
|
||||
state: RefreshThrottleState,
|
||||
now: number,
|
||||
throttleMs: number,
|
||||
): RefreshThrottleDecision {
|
||||
if (state.pending) {
|
||||
return { type: "skip" };
|
||||
}
|
||||
if (state.lastRefreshAt === null || now - state.lastRefreshAt >= throttleMs) {
|
||||
state.lastRefreshAt = now;
|
||||
return { type: "refresh_now" };
|
||||
}
|
||||
state.pending = true;
|
||||
return {
|
||||
type: "schedule_refresh",
|
||||
delayMs: Math.max(0, throttleMs - (now - state.lastRefreshAt)),
|
||||
};
|
||||
}
|
||||
|
||||
export function markScheduledRefreshExecuted(state: RefreshThrottleState, now: number) {
|
||||
state.lastRefreshAt = now;
|
||||
state.pending = false;
|
||||
}
|
||||
|
||||
export function cancelScheduledRefresh(state: RefreshThrottleState) {
|
||||
state.pending = false;
|
||||
}
|
||||
@@ -22,13 +22,13 @@ test("RealtimeRefresh supports project-scoped refresh filtering", async () => {
|
||||
);
|
||||
assert.match(
|
||||
source,
|
||||
/projectScopeIds\.has\(payload\.projectId\)/,
|
||||
"expected RealtimeRefresh to filter by matching projectId",
|
||||
/shouldRefreshRealtimeEvent\(\{/,
|
||||
"expected RealtimeRefresh to delegate scoped filtering to helper logic",
|
||||
);
|
||||
assert.match(
|
||||
source,
|
||||
/payload\.note\)\s*&&\s*!conversationUpdatedNotes\.includes\(payload\.note\)/,
|
||||
"expected RealtimeRefresh to ignore unmatched conversation.updated notes",
|
||||
/planThrottledRefresh\(throttleState, Date\.now\(\), 400\)/,
|
||||
"expected RealtimeRefresh to throttle burst refreshes on the client",
|
||||
);
|
||||
});
|
||||
|
||||
@@ -71,3 +71,15 @@ test("folder conversation page wires folder thread ids into realtime refresh", a
|
||||
assert.match(folderPage, /projectIds=\{folder\.threads\.map\(\(thread\) => thread\.projectId\)\}/, "expected folder page to scope refreshes to folder thread project ids");
|
||||
assert.match(folderPage, /"conversation\.updated"/, "expected folder page to listen to conversation updates");
|
||||
});
|
||||
|
||||
test("forward page wires realtime refresh for visible source and target projects", async () => {
|
||||
const forwardPage = await readWorkspaceFile("src/app/conversations/[projectId]/forward/page.tsx");
|
||||
|
||||
assert.match(forwardPage, /<RealtimeRefresh/, "expected forward page to render RealtimeRefresh");
|
||||
assert.match(
|
||||
forwardPage,
|
||||
/projectIds=\{\[projectId, \.\.\.targets\.map\(\(target\) => target\.id\)\]\}/,
|
||||
"expected forward page to scope refreshes to visible source and target project ids",
|
||||
);
|
||||
assert.match(forwardPage, /"conversation\.updated"/, "expected forward page to listen to conversation updates");
|
||||
});
|
||||
|
||||
68
tests/realtime-refresh-utils.test.ts
Normal file
68
tests/realtime-refresh-utils.test.ts
Normal file
@@ -0,0 +1,68 @@
|
||||
import test from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
import {
|
||||
createRefreshThrottleState,
|
||||
markScheduledRefreshExecuted,
|
||||
planThrottledRefresh,
|
||||
shouldRefreshRealtimeEvent,
|
||||
} from "../src/lib/realtime-refresh";
|
||||
|
||||
test("shouldRefreshRealtimeEvent filters scoped conversation updates by project and note", () => {
|
||||
assert.equal(
|
||||
shouldRefreshRealtimeEvent({
|
||||
eventType: "conversation.updated",
|
||||
eventData: JSON.stringify({ projectId: "project-a", note: "project_goals.updated" }),
|
||||
projectIds: ["project-a", "project-b"],
|
||||
conversationUpdatedNotes: ["project_goals.updated"],
|
||||
}),
|
||||
true,
|
||||
);
|
||||
|
||||
assert.equal(
|
||||
shouldRefreshRealtimeEvent({
|
||||
eventType: "conversation.updated",
|
||||
eventData: JSON.stringify({ projectId: "project-z", note: "project_goals.updated" }),
|
||||
projectIds: ["project-a", "project-b"],
|
||||
conversationUpdatedNotes: ["project_goals.updated"],
|
||||
}),
|
||||
false,
|
||||
);
|
||||
|
||||
assert.equal(
|
||||
shouldRefreshRealtimeEvent({
|
||||
eventType: "conversation.updated",
|
||||
eventData: JSON.stringify({ projectId: "project-a", note: "other.note" }),
|
||||
projectIds: ["project-a", "project-b"],
|
||||
conversationUpdatedNotes: ["project_goals.updated"],
|
||||
}),
|
||||
false,
|
||||
);
|
||||
});
|
||||
|
||||
test("planThrottledRefresh coalesces bursts into one delayed refresh", () => {
|
||||
const state = createRefreshThrottleState();
|
||||
|
||||
assert.deepEqual(planThrottledRefresh(state, 1_000, 400), {
|
||||
type: "refresh_now",
|
||||
});
|
||||
assert.equal(state.lastRefreshAt, 1_000);
|
||||
assert.equal(state.pending, false);
|
||||
|
||||
assert.deepEqual(planThrottledRefresh(state, 1_180, 400), {
|
||||
type: "schedule_refresh",
|
||||
delayMs: 220,
|
||||
});
|
||||
assert.equal(state.pending, true);
|
||||
|
||||
assert.deepEqual(planThrottledRefresh(state, 1_220, 400), {
|
||||
type: "skip",
|
||||
});
|
||||
|
||||
markScheduledRefreshExecuted(state, 1_400);
|
||||
assert.equal(state.pending, false);
|
||||
assert.equal(state.lastRefreshAt, 1_400);
|
||||
|
||||
assert.deepEqual(planThrottledRefresh(state, 1_900, 400), {
|
||||
type: "refresh_now",
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user