diff --git a/src/app/conversations/[projectId]/forward/page.tsx b/src/app/conversations/[projectId]/forward/page.tsx index 83915b4..2124774 100644 --- a/src/app/conversations/[projectId]/forward/page.tsx +++ b/src/app/conversations/[projectId]/forward/page.tsx @@ -1,4 +1,5 @@ import { notFound } from "next/navigation"; +import { RealtimeRefresh } from "@/components/app-runtime"; import { AppShell, ForwardComposer, PageNav, StatusBar } from "@/components/app-ui"; import { requirePageSession } from "@/lib/boss-auth"; import { getProject, readState } from "@/lib/boss-data"; @@ -21,6 +22,10 @@ export default async function ForwardPage({ return ( + target.id)]} + events={["conversation.updated", "project.messages.updated"]} + />
diff --git a/src/components/app-runtime.tsx b/src/components/app-runtime.tsx index 965a885..e666eb2 100644 --- a/src/components/app-runtime.tsx +++ b/src/components/app-runtime.tsx @@ -4,6 +4,13 @@ import { useEffect, useRef, useState } from "react"; import { usePathname, useRouter, useSearchParams } from "next/navigation"; import clsx from "clsx"; import type { BossEventName } from "@/lib/boss-events"; +import { + cancelScheduledRefresh, + createRefreshThrottleState, + markScheduledRefreshExecuted, + planThrottledRefresh, + shouldRefreshRealtimeEvent, +} from "@/lib/realtime-refresh"; import type { SkillInventoryDeviceGroup } from "@/lib/boss-projections"; import { clearNativeSessionSnapshot, @@ -263,63 +270,43 @@ export function RealtimeRefresh({ conversationUpdatedNotes?: string[]; }) { const router = useRouter(); + const throttleStateRef = useRef(createRefreshThrottleState()); + const pendingTimerRef = useRef(null); useEffect(() => { const source = new EventSource("/api/v1/events"); - const projectScopeIds = new Set( - [projectId, ...(projectIds ?? [])] - .filter((value): value is string => Boolean(value?.trim())) - .map((value) => value.trim()), - ); + const throttleState = throttleStateRef.current; const listeners = Array.from(new Set([ "conversation.context_indicator.updated", "project.context_risk.updated", ...events, ])); - const shouldRefresh = (event: Event) => { - let payload: { projectId?: string; note?: string } | null = null; - const eventData = "data" in event && typeof event.data === "string" ? event.data : ""; - const hasPayloadData = Boolean(eventData.trim()); - - if (hasPayloadData) { - try { - const parsed = JSON.parse(eventData) as { projectId?: string; note?: string }; - if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { - payload = parsed; - } - } catch { - payload = null; - } - } - - if (projectScopeIds.size > 0) { - if (!payload || typeof payload.projectId !== "string" || !payload.projectId.trim()) { - return true; - } - if (!projectScopeIds.has(payload.projectId)) { - return false; - } - } - - if (event.type === "conversation.updated" && conversationUpdatedNotes?.length) { - if (!payload || typeof payload.note !== "string" || !payload.note.trim()) { - return false; - } - if ((payload.note) && !conversationUpdatedNotes.includes(payload.note)) { - return false; - } - } - - return true; - }; const listenerMap = new Map void>(); for (const event of listeners) { const refresh = (nextEvent: Event) => { - if (!shouldRefresh(nextEvent)) { + const eventData = "data" in nextEvent ? nextEvent.data : undefined; + if (!shouldRefreshRealtimeEvent({ + eventType: nextEvent.type, + eventData, + projectId, + projectIds, + conversationUpdatedNotes, + })) { return; } - router.refresh(); + const decision = planThrottledRefresh(throttleState, Date.now(), 400); + if (decision.type === "refresh_now") { + router.refresh(); + return; + } + if (decision.type === "schedule_refresh" && pendingTimerRef.current === null) { + pendingTimerRef.current = window.setTimeout(() => { + pendingTimerRef.current = null; + markScheduledRefreshExecuted(throttleState, Date.now()); + router.refresh(); + }, decision.delayMs); + } }; listenerMap.set(event, refresh); source.addEventListener(event, refresh); @@ -332,6 +319,11 @@ export function RealtimeRefresh({ source.removeEventListener(event, refresh); } } + if (pendingTimerRef.current !== null) { + window.clearTimeout(pendingTimerRef.current); + pendingTimerRef.current = null; + } + cancelScheduledRefresh(throttleState); source.close(); }; }, [conversationUpdatedNotes, events, projectId, projectIds, router]); diff --git a/src/lib/realtime-refresh.ts b/src/lib/realtime-refresh.ts new file mode 100644 index 0000000..d620430 --- /dev/null +++ b/src/lib/realtime-refresh.ts @@ -0,0 +1,102 @@ +export interface RealtimeRefreshScope { + projectId?: string; + projectIds?: string[]; + conversationUpdatedNotes?: string[]; +} + +export interface RefreshThrottleState { + lastRefreshAt: number | null; + pending: boolean; +} + +export type RefreshThrottleDecision = + | { type: "refresh_now" } + | { type: "schedule_refresh"; delayMs: number } + | { type: "skip" }; + +interface RealtimeEventPayload { + projectId?: string; + note?: string; +} + +function parseRealtimeEventPayload(eventData: unknown): RealtimeEventPayload | null { + if (typeof eventData !== "string" || !eventData.trim()) { + return null; + } + try { + const parsed = JSON.parse(eventData) as RealtimeEventPayload; + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + return null; + } + return parsed; + } catch { + return null; + } +} + +export function shouldRefreshRealtimeEvent(input: { + eventType: string; + eventData?: unknown; +} & RealtimeRefreshScope) { + const payload = parseRealtimeEventPayload(input.eventData); + const projectScopeIds = new Set( + [input.projectId, ...(input.projectIds ?? [])] + .filter((value): value is string => Boolean(value?.trim())) + .map((value) => value.trim()), + ); + + if (projectScopeIds.size > 0) { + if (!payload || typeof payload.projectId !== "string" || !payload.projectId.trim()) { + return true; + } + if (!projectScopeIds.has(payload.projectId)) { + return false; + } + } + + if (input.eventType === "conversation.updated" && input.conversationUpdatedNotes?.length) { + if (!payload || typeof payload.note !== "string" || !payload.note.trim()) { + return false; + } + if (!input.conversationUpdatedNotes.includes(payload.note)) { + return false; + } + } + + return true; +} + +export function createRefreshThrottleState(): RefreshThrottleState { + return { + lastRefreshAt: null, + pending: false, + }; +} + +export function planThrottledRefresh( + state: RefreshThrottleState, + now: number, + throttleMs: number, +): RefreshThrottleDecision { + if (state.pending) { + return { type: "skip" }; + } + if (state.lastRefreshAt === null || now - state.lastRefreshAt >= throttleMs) { + state.lastRefreshAt = now; + return { type: "refresh_now" }; + } + state.pending = true; + return { + type: "schedule_refresh", + delayMs: Math.max(0, throttleMs - (now - state.lastRefreshAt)), + }; +} + +export function markScheduledRefreshExecuted(state: RefreshThrottleState, now: number) { + state.lastRefreshAt = now; + state.pending = false; +} + +export function cancelScheduledRefresh(state: RefreshThrottleState) { + state.pending = false; +} diff --git a/tests/project-scoped-realtime-refresh.test.ts b/tests/project-scoped-realtime-refresh.test.ts index 7e10b56..b024939 100644 --- a/tests/project-scoped-realtime-refresh.test.ts +++ b/tests/project-scoped-realtime-refresh.test.ts @@ -22,13 +22,13 @@ test("RealtimeRefresh supports project-scoped refresh filtering", async () => { ); assert.match( source, - /projectScopeIds\.has\(payload\.projectId\)/, - "expected RealtimeRefresh to filter by matching projectId", + /shouldRefreshRealtimeEvent\(\{/, + "expected RealtimeRefresh to delegate scoped filtering to helper logic", ); assert.match( source, - /payload\.note\)\s*&&\s*!conversationUpdatedNotes\.includes\(payload\.note\)/, - "expected RealtimeRefresh to ignore unmatched conversation.updated notes", + /planThrottledRefresh\(throttleState, Date\.now\(\), 400\)/, + "expected RealtimeRefresh to throttle burst refreshes on the client", ); }); @@ -71,3 +71,15 @@ test("folder conversation page wires folder thread ids into realtime refresh", a assert.match(folderPage, /projectIds=\{folder\.threads\.map\(\(thread\) => thread\.projectId\)\}/, "expected folder page to scope refreshes to folder thread project ids"); assert.match(folderPage, /"conversation\.updated"/, "expected folder page to listen to conversation updates"); }); + +test("forward page wires realtime refresh for visible source and target projects", async () => { + const forwardPage = await readWorkspaceFile("src/app/conversations/[projectId]/forward/page.tsx"); + + assert.match(forwardPage, / target\.id\)\]\}/, + "expected forward page to scope refreshes to visible source and target project ids", + ); + assert.match(forwardPage, /"conversation\.updated"/, "expected forward page to listen to conversation updates"); +}); diff --git a/tests/realtime-refresh-utils.test.ts b/tests/realtime-refresh-utils.test.ts new file mode 100644 index 0000000..4c28c5e --- /dev/null +++ b/tests/realtime-refresh-utils.test.ts @@ -0,0 +1,68 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { + createRefreshThrottleState, + markScheduledRefreshExecuted, + planThrottledRefresh, + shouldRefreshRealtimeEvent, +} from "../src/lib/realtime-refresh"; + +test("shouldRefreshRealtimeEvent filters scoped conversation updates by project and note", () => { + assert.equal( + shouldRefreshRealtimeEvent({ + eventType: "conversation.updated", + eventData: JSON.stringify({ projectId: "project-a", note: "project_goals.updated" }), + projectIds: ["project-a", "project-b"], + conversationUpdatedNotes: ["project_goals.updated"], + }), + true, + ); + + assert.equal( + shouldRefreshRealtimeEvent({ + eventType: "conversation.updated", + eventData: JSON.stringify({ projectId: "project-z", note: "project_goals.updated" }), + projectIds: ["project-a", "project-b"], + conversationUpdatedNotes: ["project_goals.updated"], + }), + false, + ); + + assert.equal( + shouldRefreshRealtimeEvent({ + eventType: "conversation.updated", + eventData: JSON.stringify({ projectId: "project-a", note: "other.note" }), + projectIds: ["project-a", "project-b"], + conversationUpdatedNotes: ["project_goals.updated"], + }), + false, + ); +}); + +test("planThrottledRefresh coalesces bursts into one delayed refresh", () => { + const state = createRefreshThrottleState(); + + assert.deepEqual(planThrottledRefresh(state, 1_000, 400), { + type: "refresh_now", + }); + assert.equal(state.lastRefreshAt, 1_000); + assert.equal(state.pending, false); + + assert.deepEqual(planThrottledRefresh(state, 1_180, 400), { + type: "schedule_refresh", + delayMs: 220, + }); + assert.equal(state.pending, true); + + assert.deepEqual(planThrottledRefresh(state, 1_220, 400), { + type: "skip", + }); + + markScheduledRefreshExecuted(state, 1_400); + assert.equal(state.pending, false); + assert.equal(state.lastRefreshAt, 1_400); + + assert.deepEqual(planThrottledRefresh(state, 1_900, 400), { + type: "refresh_now", + }); +});