Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,20 +283,23 @@ export default function ProjectSessionDetailPage({
const lastProcessedPromptRef = useRef<string>("");

useEffect(() => {
if (!session || !aguiSendMessage) return;
if (!session) return;

const initialPrompt = session?.spec?.initialPrompt;

// NOTE: Initial prompt execution handled by backend auto-trigger (StartSession handler)
// Backend waits for subscriber before executing, ensuring events are received
// This works for both UI and headless/API usage

// Track that we've seen this prompt (for workflow changes)
if (initialPrompt && lastProcessedPromptRef.current !== initialPrompt) {
lastProcessedPromptRef.current = initialPrompt;
}
// Only re-run when the initialPrompt value actually changes.
// Previous deps (phase, messages.length, status) caused this to re-evaluate
// on every streamed message, even though it only tracks a ref.
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [session?.spec?.initialPrompt, session?.status?.phase, aguiState.messages.length, aguiState.status]);
}, [session?.spec?.initialPrompt]);

// Workflow management hook
const workflowManagement = useWorkflowManagement({
Expand All @@ -306,22 +309,9 @@ export default function ProjectSessionDetailPage({
onWorkflowActivated: refetchSession,
});

// Poll session status when workflow is queued
useEffect(() => {
if (!workflowManagement.queuedWorkflow) return;

const phase = session?.status?.phase;

// If already running, we'll process workflow in the next effect
if (phase === "Running") return;

// Poll every 2 seconds to check if session is ready
const pollInterval = setInterval(() => {
refetchSession();
}, 2000);

return () => clearInterval(pollInterval);
}, [workflowManagement.queuedWorkflow, session?.status?.phase, refetchSession]);
// NOTE: No separate polling needed for queued workflows - useSession already polls
// at 500-1000ms during transitional states (Pending, Creating, Stopping).
// The previous setInterval(refetchSession, 2000) was redundant and slower.

// Process queued workflow when session becomes Running
useEffect(() => {
Expand All @@ -342,23 +332,9 @@ export default function ProjectSessionDetailPage({
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [session?.status?.phase, workflowManagement.queuedWorkflow]);

// Poll session status when messages are queued
useEffect(() => {
const queuedMessages = sessionQueue.messages.filter(m => !m.sentAt);
if (queuedMessages.length === 0) return;

const phase = session?.status?.phase;

// If already running, we'll process messages in the next effect
if (phase === "Running") return;

// Poll every 2 seconds to check if session is ready
const pollInterval = setInterval(() => {
refetchSession();
}, 2000);

return () => clearInterval(pollInterval);
}, [sessionQueue.messages, session?.status?.phase, refetchSession]);
// NOTE: No separate polling needed for queued messages - useSession already polls
// at 500-1000ms during transitional states (Pending, Creating, Stopping).
// The previous setInterval(refetchSession, 2000) was redundant and slower.

// Process queued messages when session becomes Running
useEffect(() => {
Expand Down
82 changes: 74 additions & 8 deletions components/frontend/src/hooks/use-agui-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
*/

import { useCallback, useEffect, useRef, useState } from 'react'
import type { PlatformEvent, PlatformMessage } from '@/types/agui'
import type { AGUIClientState, PlatformEvent, PlatformMessage } from '@/types/agui'
import { processAGUIEvent } from './agui/event-handlers'
import type { EventHandlerCallbacks } from './agui/event-handlers'
import { initialState } from './agui/types'
Expand Down Expand Up @@ -45,6 +45,19 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur
const reconnectAttemptsRef = useRef(0)
const mountedRef = useRef(false)

// Event batching: buffer SSE events and flush via requestAnimationFrame
// This prevents re-rendering on every token during streaming (~20-50 events/sec → 1 render/frame)
const eventBufferRef = useRef<PlatformEvent[]>([])
const rafIdRef = useRef<number | null>(null)

// Ref snapshot of state fields used by sendMessage (avoids recreating callback on every state change)
const stateSnapshotRef = useRef<{
threadId: string | null
runId: string | null
status: AGUIClientState['status']
}>({ threadId: null, runId: null, status: 'idle' })


// Exponential backoff config for reconnection
const MAX_RECONNECT_DELAY = 30000 // 30 seconds max
const BASE_RECONNECT_DELAY = 1000 // 1 second base
Expand All @@ -54,9 +67,23 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur
mountedRef.current = true
return () => {
mountedRef.current = false
// Cancel any pending rAF flush on unmount
if (rafIdRef.current !== null) {
cancelAnimationFrame(rafIdRef.current)
rafIdRef.current = null
}
}
}, [])

// Mirror the thread/run/status fields into a ref after each change so
// sendMessage can read the latest values without listing them as deps
// (keeps the sendMessage callback stable across state updates).
useEffect(() => {
  const { threadId, runId, status } = state
  stateSnapshotRef.current = { threadId, runId, status }
}, [state.threadId, state.runId, state.status])

// Process incoming AG-UI events
const processEvent = useCallback(
(event: PlatformEvent) => {
Expand All @@ -76,6 +103,34 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur
[onEvent, onMessage, onError, onTraceId],
)

// Mirror the latest processEvent into a ref so rAF-scheduled flushes
// always invoke the current handler instead of a stale closure.
const processEventRef = useRef(processEvent)
useEffect(() => { processEventRef.current = processEvent }, [processEvent])

// Drain the event buffer in one synchronous pass. React 18+ batches every
// setState issued inside the same synchronous callback, so N buffered
// events collapse into a single re-render instead of N.
const flushEventBuffer = useCallback(() => {
  // Clear the scheduled-frame handle first so a handler can re-arm a flush.
  rafIdRef.current = null
  if (!mountedRef.current) return

  // Swap the buffer out before processing so events arriving mid-flush
  // land in a fresh array and get picked up by the next frame.
  const pending = eventBufferRef.current
  if (pending.length === 0) return
  eventBufferRef.current = []

  pending.forEach((evt) => processEventRef.current(evt))
}, [])

// Arm a flush for the next animation frame, capping renders at ~60fps.
// A frame already pending means the new events will ride along with it.
const scheduleFlush = useCallback(() => {
  if (rafIdRef.current !== null) return
  rafIdRef.current = requestAnimationFrame(flushEventBuffer)
}, [flushEventBuffer])

// Connect to the AG-UI event stream
const connect = useCallback(
(runId?: string) => {
Expand Down Expand Up @@ -113,7 +168,9 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur
eventSource.onmessage = (e) => {
try {
const event = JSON.parse(e.data) as PlatformEvent
processEvent(event)
// Buffer events and flush via requestAnimationFrame for batched rendering
eventBufferRef.current.push(event)
scheduleFlush()
} catch (err) {
console.error('Failed to parse AG-UI event:', err)
}
Expand Down Expand Up @@ -164,11 +221,18 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur
}, delay)
}
},
[projectName, sessionName, processEvent, onConnected, onError, onDisconnected],
[projectName, sessionName, scheduleFlush, onConnected, onError, onDisconnected],
)

// Disconnect from the event stream
const disconnect = useCallback(() => {
// Cancel any pending rAF flush and clear buffered events
if (rafIdRef.current !== null) {
cancelAnimationFrame(rafIdRef.current)
rafIdRef.current = null
}
eventBufferRef.current = []

if (reconnectTimeoutRef.current) {
clearTimeout(reconnectTimeoutRef.current)
reconnectTimeoutRef.current = null
Expand Down Expand Up @@ -222,6 +286,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur

// Send a message to start/continue the conversation
// AG-UI server pattern: POST returns SSE stream directly
// Uses refs for threadId/runId/status so the callback is stable across state changes
const sendMessage = useCallback(
async (content: string) => {
// Send to backend via run endpoint - this returns an SSE stream
Expand All @@ -244,15 +309,16 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur
} as PlatformMessage],
}))


try {
const response = await fetch(runUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
threadId: state.threadId || sessionName,
parentRunId: state.runId,
threadId: stateSnapshotRef.current.threadId || sessionName,
parentRunId: stateSnapshotRef.current.runId,
messages: [userMessage],
}),
})
Expand All @@ -279,8 +345,8 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur
setIsRunActive(true)
}

// Ensure we're connected to the thread stream to receive events.
if (!eventSourceRef.current) {
// Ensure we're connected to the thread stream to receive events
if (stateSnapshotRef.current.status !== 'connected') {
connect()
}
} catch (error) {
Expand All @@ -293,7 +359,7 @@ export function useAGUIStream(options: UseAGUIStreamOptions): UseAGUIStreamRetur
throw error
}
},
[projectName, sessionName, state.threadId, state.runId, connect],
[projectName, sessionName, connect],
)

// Auto-connect on mount if enabled (client-side only)
Expand Down
Loading