diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx index b0b7ad68f..5c142d4cb 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/page.tsx @@ -283,20 +283,23 @@ export default function ProjectSessionDetailPage({ const lastProcessedPromptRef = useRef(""); useEffect(() => { - if (!session || !aguiSendMessage) return; - + if (!session) return; + const initialPrompt = session?.spec?.initialPrompt; - + // NOTE: Initial prompt execution handled by backend auto-trigger (StartSession handler) // Backend waits for subscriber before executing, ensuring events are received // This works for both UI and headless/API usage - + // Track that we've seen this prompt (for workflow changes) if (initialPrompt && lastProcessedPromptRef.current !== initialPrompt) { lastProcessedPromptRef.current = initialPrompt; } + // Only re-run when the initialPrompt value actually changes. + // Previous deps (phase, messages.length, status) caused this to re-evaluate + // on every streamed message, even though it only tracks a ref. // eslint-disable-next-line react-hooks/exhaustive-deps - }, [session?.spec?.initialPrompt, session?.status?.phase, aguiState.messages.length, aguiState.status]); + }, [session?.spec?.initialPrompt]); // Workflow management hook const workflowManagement = useWorkflowManagement({ @@ -306,22 +309,9 @@ export default function ProjectSessionDetailPage({ onWorkflowActivated: refetchSession, }); - // Poll session status when workflow is queued - useEffect(() => { - if (!workflowManagement.queuedWorkflow) return; - - const phase = session?.status?.phase; - - // If already running, we'll process workflow in the next effect - if (phase === "Running") return; - - // Poll every 2 seconds to check if session is ready - const pollInterval = setInterval(() => { - refetchSession(); - }, 2000); - - return () => clearInterval(pollInterval); - }, [workflowManagement.queuedWorkflow, session?.status?.phase, refetchSession]); + // NOTE: No separate polling needed for queued workflows - useSession already polls + // at 500-1000ms during transitional states (Pending, Creating, Stopping). + // The previous setInterval(refetchSession, 2000) was redundant and slower. // Process queued workflow when session becomes Running useEffect(() => { @@ -342,23 +332,9 @@ export default function ProjectSessionDetailPage({ // eslint-disable-next-line react-hooks/exhaustive-deps }, [session?.status?.phase, workflowManagement.queuedWorkflow]); - // Poll session status when messages are queued - useEffect(() => { - const queuedMessages = sessionQueue.messages.filter(m => !m.sentAt); - if (queuedMessages.length === 0) return; - - const phase = session?.status?.phase; - - // If already running, we'll process messages in the next effect - if (phase === "Running") return; - - // Poll every 2 seconds to check if session is ready - const pollInterval = setInterval(() => { - refetchSession(); - }, 2000); - - return () => clearInterval(pollInterval); - }, [sessionQueue.messages, session?.status?.phase, refetchSession]); + // NOTE: No separate polling needed for queued messages - useSession already polls + // at 500-1000ms during transitional states (Pending, Creating, Stopping). + // The previous setInterval(refetchSession, 2000) was redundant and slower. // Process queued messages when session becomes Running useEffect(() => { diff --git a/components/frontend/src/hooks/use-agui-stream.ts b/components/frontend/src/hooks/use-agui-stream.ts index 0cef4a021..a98d10fa7 100644 --- a/components/frontend/src/hooks/use-agui-stream.ts +++ b/components/frontend/src/hooks/use-agui-stream.ts @@ -11,7 +11,7 @@ */ import { useCallback, useEffect, useRef, useState } from 'react' -import type { PlatformEvent, PlatformMessage } from '@/types/agui' +import type { AGUIClientState, PlatformEvent, PlatformMessage } from '@/types/agui' import { processAGUIEvent } from './agui/event-handlers' import type { EventHandlerCallbacks } from './agui/event-handlers' import { initialState } from './agui/types' @@ -45,6 +45,19 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur const reconnectAttemptsRef = useRef(0) const mountedRef = useRef(false) + // Event batching: buffer SSE events and flush via requestAnimationFrame + // This prevents re-rendering on every token during streaming (~20-50 events/sec → 1 render/frame) + const eventBufferRef = useRef([]) + const rafIdRef = useRef(null) + + // Ref snapshot of state fields used by sendMessage (avoids recreating callback on every state change) + const stateSnapshotRef = useRef<{ + threadId: string | null + runId: string | null + status: AGUIClientState['status'] + }>({ threadId: null, runId: null, status: 'idle' }) + + // Exponential backoff config for reconnection const MAX_RECONNECT_DELAY = 30000 // 30 seconds max const BASE_RECONNECT_DELAY = 1000 // 1 second base @@ -54,9 +67,23 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur mountedRef.current = true return () => { mountedRef.current = false + // Cancel any pending rAF flush on unmount + if (rafIdRef.current !== null) { + cancelAnimationFrame(rafIdRef.current) + rafIdRef.current = null + } } }, []) + // Keep sendMessage snapshot in sync with state + useEffect(() => { + stateSnapshotRef.current = { + threadId: state.threadId, + runId: state.runId, + status: state.status, + } + }, [state.threadId, state.runId, state.status]) + // Process incoming AG-UI events const processEvent = useCallback( (event: PlatformEvent) => { @@ -76,6 +103,34 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur [onEvent, onMessage, onError, onTraceId], ) + // Keep a ref to processEvent for use inside rAF callbacks (avoids stale closures) + const processEventRef = useRef(processEvent) + useEffect(() => { + processEventRef.current = processEvent + }, [processEvent]) + + // Flush all buffered events in a single synchronous pass. + // React 18+ batches all setState calls within the same synchronous callback, + // so N events → N setState calls → 1 re-render (instead of N re-renders). + const flushEventBuffer = useCallback(() => { + rafIdRef.current = null + if (!mountedRef.current) return + const events = eventBufferRef.current + if (events.length === 0) return + eventBufferRef.current = [] + + for (const event of events) { + processEventRef.current(event) + } + }, []) + + // Schedule a flush on the next animation frame (~60fps max) + const scheduleFlush = useCallback(() => { + if (rafIdRef.current === null) { + rafIdRef.current = requestAnimationFrame(flushEventBuffer) + } + }, [flushEventBuffer]) + // Connect to the AG-UI event stream const connect = useCallback( (runId?: string) => { @@ -113,7 +168,9 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur eventSource.onmessage = (e) => { try { const event = JSON.parse(e.data) as PlatformEvent - processEvent(event) + // Buffer events and flush via requestAnimationFrame for batched rendering + eventBufferRef.current.push(event) + scheduleFlush() } catch (err) { console.error('Failed to parse AG-UI event:', err) } @@ -164,11 +221,18 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur }, delay) } }, - [projectName, sessionName, processEvent, onConnected, onError, onDisconnected], + [projectName, sessionName, scheduleFlush, onConnected, onError, onDisconnected], ) // Disconnect from the event stream const disconnect = useCallback(() => { + // Cancel any pending rAF flush and clear buffered events + if (rafIdRef.current !== null) { + cancelAnimationFrame(rafIdRef.current) + rafIdRef.current = null + } + eventBufferRef.current = [] + if (reconnectTimeoutRef.current) { clearTimeout(reconnectTimeoutRef.current) reconnectTimeoutRef.current = null @@ -222,6 +286,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur // Send a message to start/continue the conversation // AG-UI server pattern: POST returns SSE stream directly + // Uses refs for threadId/runId/status so the callback is stable across state changes const sendMessage = useCallback( async (content: string) => { // Send to backend via run endpoint - this returns an SSE stream @@ -244,6 +309,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur } as PlatformMessage], })) + try { const response = await fetch(runUrl, { method: 'POST', @@ -251,8 +317,8 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur 'Content-Type': 'application/json', }, body: JSON.stringify({ - threadId: state.threadId || sessionName, - parentRunId: state.runId, + threadId: stateSnapshotRef.current.threadId || sessionName, + parentRunId: stateSnapshotRef.current.runId, messages: [userMessage], }), }) @@ -279,8 +345,8 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur setIsRunActive(true) } - // Ensure we're connected to the thread stream to receive events. - if (!eventSourceRef.current) { + // Ensure we're connected to the thread stream to receive events + if (stateSnapshotRef.current.status !== 'connected') { connect() } } catch (error) { @@ -293,7 +359,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur throw error } }, - [projectName, sessionName, state.threadId, state.runId, connect], + [projectName, sessionName, connect], ) // Auto-connect on mount if enabled (client-side only)