diff --git a/packages/durable-session-proxy/src/protocol.ts b/packages/durable-session-proxy/src/protocol.ts index 79f868a..4e3ca5b 100644 --- a/packages/durable-session-proxy/src/protocol.ts +++ b/packages/durable-session-proxy/src/protocol.ts @@ -13,7 +13,7 @@ import { DurableStream } from '@durable-streams/client' import { sessionStateSchema, createSessionDB, - createMessagesPipeline, + createMessagesCollection, createModelMessagesCollection, } from '@electric-sql/durable-session' import type { StreamChunk, AgentSpec, ProxySessionState, AIDBProtocolOptions } from './types' @@ -212,9 +212,8 @@ export class AIDBSessionProtocol { // After this, all historical messages are in the collections await sessionDB.preload() - // Create the messages pipeline - const { messages } = createMessagesPipeline({ - sessionId, + // Create the messages collection from chunks + const messages = createMessagesCollection({ chunksCollection: sessionDB.collections.chunks, }) @@ -391,7 +390,7 @@ export class AIDBSessionProtocol { actorId, role: 'user' as const, chunk: JSON.stringify({ - type: 'user-message', + type: 'whole-message', message, }), seq: 0, diff --git a/packages/durable-session/src/client.ts b/packages/durable-session/src/client.ts index 4693bfd..02c7307 100644 --- a/packages/durable-session/src/client.ts +++ b/packages/durable-session/src/client.ts @@ -4,14 +4,13 @@ * Provides TanStack AI-compatible API backed by Durable Streams * with real-time sync and multi-agent support. * - * All derived collections contain fully materialized objects. - * No helper functions needed to access data. + * All derived collections contain fully materialized MessageRow objects. + * Consumers filter message.parts to access specific part types (ToolCallPart, etc.). */ import { createCollection, createOptimisticAction } from '@tanstack/db' import type { Transaction } from '@tanstack/db' -import type { UIMessage, AnyClientTool } from '@tanstack/ai' -import type { ChunkRow } from './schema' +import type { UIMessage, AnyClientTool, ToolCallPart } from '@tanstack/ai' import type { DurableChatClientOptions, MessageRow, @@ -25,22 +24,20 @@ import type { ApprovalResponseInput, ActorType, } from './types' -import { createSessionDB, getChunkKey, type SessionDB } from './collection' +import { createSessionDB, type SessionDB } from './collection' import { - createCollectedMessagesCollection, createMessagesCollection, createToolCallsCollection, + createPendingApprovalsCollection, createToolResultsCollection, - createApprovalsCollection, createActiveGenerationsCollection, createSessionMetaCollectionOptions, createSessionStatsCollection, createPresenceCollection, createInitialSessionMeta, updateConnectionStatus, - updateSyncProgress, } from './collections' -import { extractTextContent } from './materialize' +import { extractTextContent, messageRowToUIMessage } from './materialize' /** * Unified input for all message optimistic actions. @@ -104,8 +101,7 @@ export class DurableChatClient< // Collections are typed via inference from createCollections() // Created synchronously in constructor - always available - private readonly _collections: ReturnType['collections'] - private readonly _collectedMessages: ReturnType['collectedMessages'] + private readonly _collections: ReturnType private _isConnected = false private _isPaused = false @@ -146,9 +142,7 @@ export class DurableChatClient< }) // Create all collections synchronously (always from _db.collections) - const { collections, collectedMessages } = this.createCollections() - this._collections = collections - this._collectedMessages = collectedMessages + this._collections = this.createCollections() // Initialize session metadata this._collections.sessionMeta.insert( @@ -168,8 +162,9 @@ export class DurableChatClient< /** * Create all derived collections from the chunks collection. * - * This implements the live query pipeline pattern: - * chunks → collectedMessages → messages (and other derived collections) + * Pipeline architecture: + * - chunks → (subquery) → messages (root materialized collection) + * - Derived collections filter messages via .fn.where() on parts * * CRITICAL: Materialization happens inside fn.select(). No imperative code * outside this pattern. @@ -179,50 +174,37 @@ export class DurableChatClient< // Note: rawPresence contains per-device records; we expose aggregated presence const { chunks, presence: rawPresence, agents } = this._db.collections - // Stage 1: Create collected messages (intermediate - groups by messageId) - const collectedMessages = createCollectedMessagesCollection({ - sessionId: this.sessionId, - chunksCollection: chunks, - }) - - // Stage 2: Create materialized messages collection + // Root materialized collection: chunks → messages + // Uses inline subquery for chunk aggregation const messages = createMessagesCollection({ - sessionId: this.sessionId, - collectedMessagesCollection: collectedMessages, + chunksCollection: chunks, }) - // Derive tool calls from collected messages + // Derived collections filter on message parts (lazy evaluation) const toolCalls = createToolCallsCollection({ - sessionId: this.sessionId, - collectedMessagesCollection: collectedMessages, + messagesCollection: messages, }) - // Derive tool results from collected messages - const toolResults = createToolResultsCollection({ - sessionId: this.sessionId, - collectedMessagesCollection: collectedMessages, + const pendingApprovals = createPendingApprovalsCollection({ + messagesCollection: messages, }) - // Derive approvals from collected messages - const approvals = createApprovalsCollection({ - sessionId: this.sessionId, - collectedMessagesCollection: collectedMessages, + const toolResults = createToolResultsCollection({ + messagesCollection: messages, }) - // Derive active generations from messages const activeGenerations = createActiveGenerationsCollection({ - sessionId: this.sessionId, messagesCollection: messages, }) - // Create session metadata collection (local state) + // Session metadata collection (local state) const sessionMeta = createCollection( createSessionMetaCollectionOptions({ sessionId: this.sessionId, }) ) - // Create session statistics collection (derived from chunks) + // Session statistics collection (aggregated from chunks) const sessionStats = createSessionStatsCollection({ sessionId: this.sessionId, chunksCollection: chunks, @@ -236,19 +218,16 @@ export class DurableChatClient< }) return { - collections: { - chunks, - presence, - agents, - messages, - toolCalls, - toolResults, - approvals, - activeGenerations, - sessionMeta, - sessionStats, - }, - collectedMessages, + chunks, + presence, + agents, + messages, + toolCalls, + pendingApprovals, + toolResults, + activeGenerations, + sessionMeta, + sessionStats, } } @@ -261,10 +240,7 @@ export class DurableChatClient< * Messages are accessed directly from the materialized collection. */ get messages(): UIMessage[] { - // Convert MessageRow to UIMessage - return [...this._collections.messages.values()].map((row) => - this.messageRowToUIMessage(row) - ) + return [...this._collections.messages.values()].map(messageRowToUIMessage) } /** @@ -390,42 +366,24 @@ export class DurableChatClient< * Create the unified optimistic action for all message types. * Handles user, assistant, and system messages with the same pattern. * - * IMPORTANT: We insert into the chunks collection (not the messages collection) - * because messages is a derived collection from a live query pipeline. Inserting - * directly into a derived collection causes TanStack DB reconciliation bugs where - * synced data becomes invisible while the optimistic mutation is pending. - * - * By inserting into the chunks collection with the user-message format, the - * optimistic row flows through the normal pipeline: chunks → collectedMessages → messages. + * Optimistic updates insert into the messages collection directly. + * This ensures the optimistic state propagates to all derived collections + * (toolCalls, pendingApprovals, toolResults, activeGenerations). */ private createMessageAction() { return createOptimisticAction({ onMutate: ({ content, messageId, role }) => { - // For optimistic inserts, we use seq=0 since user messages are single-chunk. - // The key format is `${messageId}:${seq}`. - const seq = 0 - const id = getChunkKey(messageId, seq) - const createdAt = new Date() - // Insert into chunks collection with user-message format. - // This flows through the live query pipeline: chunks → collectedMessages → messages - this._collections.chunks.insert({ - id, - messageId, - actorId: this.actorId, + // Insert into messages collection directly + // This propagates to all derived collections + this._collections.messages.insert({ + id: messageId, role, - chunk: JSON.stringify({ - type: 'user-message', - message: { - id: messageId, - role, - parts: [{ type: 'text' as const, content }], - createdAt: createdAt.toISOString(), - }, - }), - createdAt: createdAt.toISOString(), - seq, + parts: [{ type: 'text' as const, content }], + actorId: this.actorId, + isComplete: true, + createdAt, }) }, mutationFn: async ({ content, messageId, role, agent }) => { @@ -536,21 +494,28 @@ export class DurableChatClient< /** * Create the optimistic action for adding tool results. * - * Uses client-generated messageId for predictable tool result IDs, - * enabling proper optimistic updates. + * Inserts a new message with a ToolResultPart into the messages collection. + * Uses client-generated messageId for predictable IDs. */ private createAddToolResultAction() { return createOptimisticAction({ onMutate: ({ messageId, toolCallId, output, error }) => { - const resultId = `${messageId}:${toolCallId}` - this._collections.toolResults.insert({ - id: resultId, - toolCallId, - messageId, - output, - error: error ?? null, + const createdAt = new Date() + + // Insert a new message with tool-result part + this._collections.messages.insert({ + id: messageId, + role: 'assistant', + parts: [{ + type: 'tool-result' as const, + toolCallId, + content: typeof output === 'string' ? output : JSON.stringify(output), + state: error ? 'error' as const : 'complete' as const, + ...(error && { error }), + }], actorId: this.actorId, - createdAt: new Date(), + isComplete: true, + createdAt, }) }, mutationFn: async ({ messageId, toolCallId, output, error }) => { @@ -586,21 +551,27 @@ export class DurableChatClient< /** * Create the optimistic action for approval responses. * - * Note: We use optimistic updates for approvals since we're updating - * an existing row (not inserting). The approval ID is known client-side. - * The optimistic update provides instant feedback while the server - * processes the response. + * Finds the message containing the tool call with the approval and updates + * the approval.approved field. This propagates to pendingApprovals collection. */ private createApprovalResponseAction() { return createOptimisticAction({ onMutate: ({ id, approved }) => { - const approval = this._collections.approvals.get(id) - if (approval) { - this._collections.approvals.update(id, (draft) => { - draft.status = approved ? 'approved' : 'denied' - draft.respondedBy = this.actorId - draft.respondedAt = new Date() - }) + // Find the message containing this approval + for (const message of this._collections.messages.values()) { + for (const part of message.parts) { + if (part.type === 'tool-call' && part.approval?.id === id) { + // Update the message with the approval response + this._collections.messages.update(message.id, (draft) => { + for (const p of draft.parts) { + if (p.type === 'tool-call' && p.approval?.id === id) { + (p as ToolCallPart).approval!.approved = approved + } + } + }) + return + } + } } }, mutationFn: async ({ id, approved }) => { @@ -825,22 +796,6 @@ export class DurableChatClient< // Private Helpers // ═══════════════════════════════════════════════════════════════════════ - /** - * Convert MessageRow to UIMessage. - * - * Includes actorId for avatar display (agent ID for assistant messages, - * user ID for user messages). - */ - private messageRowToUIMessage(row: MessageRow): UIMessage & { actorId: string } { - return { - id: row.id, - role: row.role as 'user' | 'assistant', - parts: row.parts, - createdAt: row.createdAt, - actorId: row.actorId, - } - } - /** * Update session metadata. */ diff --git a/packages/durable-session/src/collections/active-generations.ts b/packages/durable-session/src/collections/active-generations.ts index 3bbdfba..83f52e7 100644 --- a/packages/durable-session/src/collections/active-generations.ts +++ b/packages/durable-session/src/collections/active-generations.ts @@ -19,8 +19,6 @@ import type { MessageRow, ActiveGenerationRow } from '../types' * Options for creating an active generations collection. */ export interface ActiveGenerationsCollectionOptions { - /** Session identifier */ - sessionId: string /** Messages collection to derive from */ messagesCollection: Collection } @@ -72,7 +70,6 @@ export function createActiveGenerationsCollection( // Filter messages for incomplete ones and transform to ActiveGenerationRow // Order by createdAt to ensure chronological ordering - // startSync: true ensures the collection starts syncing immediately. return createLiveQueryCollection({ query: (q) => q @@ -80,6 +77,5 @@ export function createActiveGenerationsCollection( .orderBy(({ message }) => message.createdAt, 'asc') .fn.where(({ message }) => !message.isComplete) .fn.select(({ message }) => messageToActiveGeneration(message)), - startSync: true, }) } diff --git a/packages/durable-session/src/collections/approvals.ts b/packages/durable-session/src/collections/approvals.ts deleted file mode 100644 index 29453f4..0000000 --- a/packages/durable-session/src/collections/approvals.ts +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Approvals collection - two-stage derived pipeline. - * - * Tracks approval requests and responses for tool calls that require user authorization. - * Derived from the collectedMessages intermediate collection. - * - * This follows the pattern: aggregate first → materialize second - */ - -import { createLiveQueryCollection } from '@tanstack/db' -import type { Collection } from '@tanstack/db' -import type { ApprovalRow } from '../types' -import { extractApprovals } from '../materialize' -import type { CollectedMessageRows } from './messages' -import type { ChunkRow } from '../schema' - -// ============================================================================ -// Approvals Collection -// ============================================================================ - -/** - * Options for creating an approvals collection. - */ -export interface ApprovalsCollectionOptions { - /** Session identifier */ - sessionId: string - /** Collected messages collection (intermediate from messages pipeline) */ - collectedMessagesCollection: Collection -} - -/** - * Creates the approvals collection from collected messages. - * - * Uses fn.select to extract approvals from each message's collected rows, - * then flattens them into individual ApprovalRow entries. - * - * Approval lifecycle: - * - pending: Approval requested, waiting for user response - * - approved: User approved the tool call - * - denied: User denied the tool call - * - * @example - * ```typescript - * const approvals = createApprovalsCollection({ - * sessionId: 'my-session', - * collectedMessagesCollection, - * }) - * - * // Access approvals directly - * for (const approval of approvals.values()) { - * console.log(approval.id, approval.status, approval.toolCallId) - * } - * - * // Or filter for pending approvals - * const pending = [...approvals.values()].filter(a => a.status === 'pending') - * ``` - */ -export function createApprovalsCollection( - options: ApprovalsCollectionOptions -): Collection { - const { collectedMessagesCollection } = options - - // Extract approvals from each message's collected rows - // fn.select can return an array which will be flattened - // Order by startedAt to ensure chronological message ordering - // startSync: true ensures the collection starts syncing immediately. - return createLiveQueryCollection({ - query: (q) => - q - .from({ collected: collectedMessagesCollection }) - .orderBy(({ collected }) => collected.startedAt, 'asc') - .fn.select(({ collected }) => extractApprovals(collected.rows)), - startSync: true, - }) -} diff --git a/packages/durable-session/src/collections/index.ts b/packages/durable-session/src/collections/index.ts index aef6b6a..ba30662 100644 --- a/packages/durable-session/src/collections/index.ts +++ b/packages/durable-session/src/collections/index.ts @@ -1,42 +1,23 @@ /** * Collection exports for @electric-sql/durable-session * - * All collections follow the two-stage pipeline pattern: - * 1. Aggregate stage: groupBy + collect to gather rows - * 2. Materialize stage: fn.select to transform into final types + * Pipeline architecture: + * - chunks → (subquery) → messages (root materialized collection) + * - Derived collections filter messages via .fn.where() on parts * - * Collections contain fully materialized objects - no helper functions needed. + * All derived collections return MessageRow[], preserving full message context. + * Consumers filter message.parts to access specific part types (ToolCallPart, etc.). */ -// Messages collection (two-stage pipeline) +// Messages collection (root) and derived collections export { - createCollectedMessagesCollection, createMessagesCollection, - createMessagesPipeline, - type CollectedMessageRows, - type CollectedMessagesCollectionOptions, - type MessagesCollectionOptions, - type MessagesPipelineOptions, - type MessagesPipelineResult, -} from './messages' - -// Tool calls collection (derived from collectedMessages) -export { createToolCallsCollection, - type ToolCallsCollectionOptions, -} from './tool-calls' - -// Tool results collection (derived from collectedMessages) -export { + createPendingApprovalsCollection, createToolResultsCollection, - type ToolResultsCollectionOptions, -} from './tool-results' - -// Approvals collection (derived from collectedMessages) -export { - createApprovalsCollection, - type ApprovalsCollectionOptions, -} from './approvals' + type MessagesCollectionOptions, + type DerivedMessagesCollectionOptions, +} from './messages' // Active generations collection (derived from messages) export { @@ -44,7 +25,7 @@ export { type ActiveGenerationsCollectionOptions, } from './active-generations' -// Session metadata collection (local state - not derived) +// Session metadata collection (local state) export { createSessionMetaCollectionOptions, createInitialSessionMeta, @@ -53,7 +34,7 @@ export { type SessionMetaCollectionOptions, } from './session-meta' -// Session statistics collection (two-stage pipeline) +// Session statistics collection (aggregated from chunks) export { createSessionStatsCollection, computeSessionStats, diff --git a/packages/durable-session/src/collections/messages.ts b/packages/durable-session/src/collections/messages.ts index 30af65e..96eecf0 100644 --- a/packages/durable-session/src/collections/messages.ts +++ b/packages/durable-session/src/collections/messages.ts @@ -1,10 +1,13 @@ /** - * Messages collection - two-stage derived pipeline. + * Messages collection - core live query pipeline. * - * Stage 1: collectedMessages - groups chunk rows by messageId using collect() - * Stage 2: messages - materializes MessageRow objects using fn.select() + * Architecture: + * - chunks → (subquery: groupBy + collect) → messages + * - Derived collections use .fn.where() to filter by message parts * - * This follows the pattern: aggregate first → materialize second + * The subquery inlines the chunk aggregation, eliminating the need for + * a separate intermediate collection. Derived collections are lazy - + * filtering overhead only incurred if the collection is accessed. * * CRITICAL: Materialization happens INSIDE fn.select(). No imperative code * outside this pattern. @@ -17,202 +20,205 @@ import { minStr, } from '@tanstack/db' import type { Collection } from '@tanstack/db' +import type { ToolCallPart } from '@tanstack/ai' import type { ChunkRow } from '../schema' import type { MessageRow } from '../types' import { materializeMessage } from '../materialize' // ============================================================================ -// Stage 1: Collected Messages (intermediate) +// Messages Collection (Root) // ============================================================================ /** - * Intermediate type - collected rows grouped by messageId. - * This is the output of Stage 1 and input to Stage 2. - */ -export interface CollectedMessageRows { - messageId: string - rows: ChunkRow[] - /** - * The earliest createdAt timestamp (ISO 8601 string) among collected rows. - * Used for ordering messages chronologically. - * ISO 8601 strings sort lexicographically correctly. - */ - startedAt: string | null | undefined - /** - * Number of rows in this group. - * Used as a discriminator to force change detection when new rows are added. - * Without this, TanStack DB might not detect that the rows array has changed - * since the messageId key and startedAt remain the same. - */ - rowCount: number -} - -/** - * Options for creating a collected messages collection. + * Options for creating a messages collection. */ -export interface CollectedMessagesCollectionOptions { - /** Session identifier */ - sessionId: string +export interface MessagesCollectionOptions { /** Chunks collection from stream-db */ chunksCollection: Collection } /** - * Creates the Stage 1 collection: groups chunk rows by messageId. + * Creates the messages collection with inline subquery for chunk aggregation. + * + * This is the root materialized collection in the live query pipeline. + * All derived collections (toolCalls, pendingApprovals, etc.) derive from this. + * + * The subquery groups chunks by messageId and collects them, then the outer + * query materializes each group into a MessageRow using TanStack AI's + * StreamProcessor. + * + * @example + * ```typescript + * const messages = createMessagesCollection({ + * chunksCollection: db.collections.chunks, + * }) + * + * // Access messages directly + * for (const message of messages.values()) { + * console.log(message.id, message.role, message.parts) + * } * - * This is an intermediate collection - consumers should use the - * messages collection (Stage 2) which materializes the rows. + * // Filter tool calls from message parts + * const toolCalls = message.parts.filter(p => p.type === 'tool-call') + * ``` */ -export function createCollectedMessagesCollection( - options: CollectedMessagesCollectionOptions -): Collection { +export function createMessagesCollection( + options: MessagesCollectionOptions +): Collection { const { chunksCollection } = options - // Use config object form with startSync: true to ensure the collection - // starts syncing immediately upon creation, providing demand for upstream data - const collection = createLiveQueryCollection({ - query: (q) => - q + return createLiveQueryCollection({ + query: (q) => { + // Subquery: group chunks by messageId and collect them + const collected = q .from({ chunk: chunksCollection }) .groupBy(({ chunk }) => chunk.messageId) .select(({ chunk }) => ({ messageId: chunk.messageId, rows: collect(chunk), - // Capture earliest timestamp for message ordering (ISO 8601 strings sort lexicographically) + // Capture earliest timestamp for ordering (ISO 8601 strings sort lexicographically) startedAt: minStr(chunk.createdAt), - // Count rows as a discriminator to force change detection when new rows are added. - // Without this, TanStack DB might not detect that the rows array has changed - // since the messageId key and startedAt remain the same. + // Count as discriminator to force change detection when rows are added rowCount: count(chunk), - })), - getKey: (row) => row.messageId, - startSync: true, - }) + })) - return collection + // Main query: materialize messages from collected chunks + return q + .from({ collected }) + .orderBy(({ collected }) => collected.startedAt, 'asc') + .fn.select(({ collected }) => materializeMessage(collected.rows)) + }, + getKey: (row) => row.id, + }) } // ============================================================================ -// Stage 2: Materialized Messages +// Derived Collections // ============================================================================ /** - * Options for creating a messages collection. + * Options for creating a derived collection from messages. */ -export interface MessagesCollectionOptions { - /** Session identifier */ - sessionId: string - /** Collected messages collection (Stage 1) */ - collectedMessagesCollection: Collection +export interface DerivedMessagesCollectionOptions { + /** Messages collection to derive from */ + messagesCollection: Collection } /** - * Creates the Stage 2 collection: materializes MessageRow from collected rows. + * Creates a collection of messages that contain tool calls. * - * This is the collection that consumers should use - it contains fully - * materialized MessageRow objects, not intermediate collected rows. + * Filters messages where at least one part has type 'tool-call'. + * The collection is lazy - filtering only runs when accessed. * * @example * ```typescript - * // First create the intermediate collection - * const collectedMessages = createCollectedMessagesCollection({ - * sessionId: 'my-session', - * chunksCollection: db.collections.chunks, + * const toolCalls = createToolCallsCollection({ + * messagesCollection: messages, * }) * - * // Then create the materialized messages collection - * const messages = createMessagesCollection({ - * sessionId: 'my-session', - * collectedMessagesCollection: collectedMessages, - * }) - * - * // Access messages directly - no helper functions needed - * for (const message of messages.values()) { - * console.log(message.id, message.role, message.parts) + * for (const message of toolCalls.values()) { + * for (const part of message.parts) { + * if (part.type === 'tool-call') { + * console.log(part.name, part.state, part.arguments) + * } + * } * } * ``` */ -export function createMessagesCollection( - options: MessagesCollectionOptions +export function createToolCallsCollection( + options: DerivedMessagesCollectionOptions ): Collection { - const { collectedMessagesCollection } = options + const { messagesCollection } = options - // Pass query function to createLiveQueryCollection to let it infer types. - // Use config object form to provide explicit getKey - this is required for - // optimistic mutations (insert/update/delete) on derived collections. - // startSync: true ensures the collection starts syncing immediately. - const collection = createLiveQueryCollection({ + return createLiveQueryCollection({ query: (q) => q - .from({ collected: collectedMessagesCollection }) - .orderBy(({ collected }) => collected.startedAt, 'asc') - .fn.select(({ collected }) => materializeMessage(collected.rows)), + .from({ message: messagesCollection }) + .fn.where(({ message }) => + message.parts.some((p): p is ToolCallPart => p.type === 'tool-call') + ) + .orderBy(({ message }) => message.createdAt, 'asc'), getKey: (row) => row.id, - startSync: true, }) - - return collection } -// ============================================================================ -// Combined Factory (convenience) -// ============================================================================ - /** - * Options for creating both stages of the messages pipeline. + * Creates a collection of messages that have pending approval requests. + * + * Filters messages where at least one tool call part has: + * - approval.needsApproval === true + * - approval.approved === undefined (not yet responded) + * + * @example + * ```typescript + * const pending = createPendingApprovalsCollection({ + * messagesCollection: messages, + * }) + * + * for (const message of pending.values()) { + * for (const part of message.parts) { + * if (part.type === 'tool-call' && part.approval?.needsApproval) { + * console.log(`Approval needed for ${part.name}: ${part.approval.id}`) + * } + * } + * } + * ``` */ -export interface MessagesPipelineOptions { - /** Session identifier */ - sessionId: string - /** Chunks collection from stream-db */ - chunksCollection: Collection -} +export function createPendingApprovalsCollection( + options: DerivedMessagesCollectionOptions +): Collection { + const { messagesCollection } = options -/** - * Result of creating the messages pipeline. - */ -export interface MessagesPipelineResult { - /** Stage 1: Collected messages (intermediate - usually not needed directly) */ - collectedMessages: Collection - /** Stage 2: Materialized messages (use this one) */ - messages: Collection + return createLiveQueryCollection({ + query: (q) => + q + .from({ message: messagesCollection }) + .fn.where(({ message }) => + message.parts.some( + (p): p is ToolCallPart => + p.type === 'tool-call' && + p.approval?.needsApproval === true && + p.approval.approved === undefined + ) + ) + .orderBy(({ message }) => message.createdAt, 'asc'), + getKey: (row) => row.id, + }) } /** - * Creates the complete messages pipeline (both stages). + * Creates a collection of messages that contain tool results. * - * This is a convenience function that creates both the intermediate - * collected messages collection and the final materialized messages collection. + * Filters messages where at least one part has type 'tool-result'. * * @example * ```typescript - * const { messages } = createMessagesPipeline({ - * sessionId: 'my-session', - * chunksCollection: db.collections.chunks, + * const results = createToolResultsCollection({ + * messagesCollection: messages, * }) * - * // Access messages directly - * const allMessages = messages.toArray() - * const singleMessage = messages.get('message-id') + * for (const message of results.values()) { + * for (const part of message.parts) { + * if (part.type === 'tool-result') { + * console.log(part.toolCallId, part.content) + * } + * } + * } * ``` */ -export function createMessagesPipeline( - options: MessagesPipelineOptions -): MessagesPipelineResult { - const { sessionId, chunksCollection } = options - - // Stage 1: Create collected messages collection - const collectedMessages = createCollectedMessagesCollection({ - sessionId, - chunksCollection, - }) +export function createToolResultsCollection( + options: DerivedMessagesCollectionOptions +): Collection { + const { messagesCollection } = options - // Stage 2: Create materialized messages collection - const messages = createMessagesCollection({ - sessionId, - collectedMessagesCollection: collectedMessages, + return createLiveQueryCollection({ + query: (q) => + q + .from({ message: messagesCollection }) + .fn.where(({ message }) => + message.parts.some((p) => p.type === 'tool-result') + ) + .orderBy(({ message }) => message.createdAt, 'asc'), + getKey: (row) => row.id, }) - - return { collectedMessages, messages } } - diff --git a/packages/durable-session/src/collections/session-stats.ts b/packages/durable-session/src/collections/session-stats.ts index 433f233..5becb32 100644 --- a/packages/durable-session/src/collections/session-stats.ts +++ b/packages/durable-session/src/collections/session-stats.ts @@ -1,24 +1,19 @@ /** - * Session statistics collection - two-stage derived pipeline. + * Session statistics collection - aggregated from chunks. * - * Computes aggregate statistics from the stream. Uses groupBy + collect - * to gather all rows, then fn.select to compute the stats. + * Computes aggregate statistics from the stream by: + * 1. Collecting all chunks for the session + * 2. Materializing messages and extracting stats from parts * - * This follows the pattern: aggregate first → materialize second + * Uses TanStack AI's MessagePart types for type-safe filtering. */ import { createLiveQueryCollection, collect } from '@tanstack/db' import type { Collection } from '@tanstack/db' +import type { ToolCallPart, ToolResultPart } from '@tanstack/ai' import type { ChunkRow } from '../schema' import type { SessionStatsRow, MessageRow } from '../types' -import { - groupRowsByMessage, - materializeMessage, - extractToolCalls, - extractApprovals, - detectActiveGenerations, - parseChunk, -} from '../materialize' +import { materializeMessage, parseChunk } from '../materialize' // ============================================================================ // Session Stats Collection @@ -67,7 +62,6 @@ export function createSessionStatsCollection( const { sessionId, chunksCollection } = options // Stage 1: Create intermediate collection with collected rows - // startSync: true ensures the collection starts syncing immediately. const collectedRows = createLiveQueryCollection({ query: (q) => q @@ -77,11 +71,9 @@ export function createSessionStatsCollection( sessionId, rows: collect(chunk), })), - startSync: true, }) // Stage 2: Compute stats from collected rows - // startSync: true ensures the collection starts syncing immediately. return createLiveQueryCollection({ query: (q) => q @@ -89,13 +81,32 @@ export function createSessionStatsCollection( .fn.select(({ collected }) => computeSessionStats(collected.sessionId, collected.rows) ), - startSync: true, }) } +/** + * Group chunk rows by messageId. + */ +function groupRowsByMessage(rows: ChunkRow[]): Map { + const grouped = new Map() + + for (const row of rows) { + const existing = grouped.get(row.messageId) + if (existing) { + existing.push(row) + } else { + grouped.set(row.messageId, [row]) + } + } + + return grouped +} + /** * Compute session statistics from chunk rows. * + * Materializes messages and counts parts by type to derive statistics. + * * @param sessionId - Session identifier * @param rows - All chunk rows * @returns Computed statistics @@ -121,31 +132,50 @@ export function computeSessionStats( } } - // Count message types + // Count message types and extract part counts let userMessageCount = 0 let assistantMessageCount = 0 + let toolCallCount = 0 + let pendingApprovalCount = 0 + let activeGenerationCount = 0 let firstMessageAt: Date | null = null let lastMessageAt: Date | null = null for (const msg of messages) { + // Count by role if (msg.role === 'user') { userMessageCount++ } else if (msg.role === 'assistant') { assistantMessageCount++ } + // Track timestamps if (!firstMessageAt || msg.createdAt < firstMessageAt) { firstMessageAt = msg.createdAt } if (!lastMessageAt || msg.createdAt > lastMessageAt) { lastMessageAt = msg.createdAt } - } - // Extract tool calls and approvals - const toolCalls = extractToolCalls(rows) - const approvals = extractApprovals(rows) - const activeGenerations = detectActiveGenerations(grouped) + // Count tool calls and pending approvals from parts + for (const part of msg.parts) { + if (part.type === 'tool-call') { + toolCallCount++ + const toolCallPart = part as ToolCallPart + if ( + toolCallPart.approval?.needsApproval === true && + toolCallPart.approval.approved === undefined + ) { + pendingApprovalCount++ + } + } + } + + // Count active generations (incomplete messages) + if (!msg.isComplete) { + activeGenerationCount++ + } + } // Extract token usage from chunks const { totalTokens, promptTokens, completionTokens } = extractTokenUsage(rows) @@ -155,12 +185,12 @@ export function computeSessionStats( messageCount: messages.length, userMessageCount, assistantMessageCount, - toolCallCount: toolCalls.length, - approvalCount: approvals.length, + toolCallCount, + approvalCount: pendingApprovalCount, totalTokens, promptTokens, completionTokens, - activeGenerationCount: activeGenerations.length, + activeGenerationCount, firstMessageAt, lastMessageAt, } diff --git a/packages/durable-session/src/collections/tool-calls.ts b/packages/durable-session/src/collections/tool-calls.ts deleted file mode 100644 index 4de7a15..0000000 --- a/packages/durable-session/src/collections/tool-calls.ts +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Tool calls collection - two-stage derived pipeline. - * - * Stage 1: collectedByMessage - groups stream rows by messageId using collect() - * Stage 2: toolCalls - extracts ToolCallRow objects using fn.select, one per tool call - * - * This follows the pattern: aggregate first → materialize second - * - * Note: Tool calls are extracted from the collectedMessages intermediate collection - * since tool call chunks are associated with messages. - */ - -import { createLiveQueryCollection } from '@tanstack/db' -import type { Collection } from '@tanstack/db' -import type { ToolCallRow } from '../types' -import { extractToolCalls } from '../materialize' -import type { CollectedMessageRows } from './messages' -import type { ChunkRow } from '../schema' - -// ============================================================================ -// Tool Calls Collection -// ============================================================================ - -/** - * Options for creating a tool calls collection. - */ -export interface ToolCallsCollectionOptions { - /** Session identifier */ - sessionId: string - /** Collected messages collection (intermediate from messages pipeline) */ - collectedMessagesCollection: Collection -} - -/** - * Creates the tool calls collection from collected messages. - * - * Uses fn.select to extract tool calls from each message's collected rows, - * then flattens them into individual ToolCallRow entries. - * - * @example - * ```typescript - * const toolCalls = createToolCallsCollection({ - * sessionId: 'my-session', - * collectedMessagesCollection, - * }) - * - * // Access tool calls directly - * for (const toolCall of toolCalls.values()) { - * console.log(toolCall.id, toolCall.name, toolCall.state) - * } - * ``` - */ -export function createToolCallsCollection( - options: ToolCallsCollectionOptions -): Collection { - const { collectedMessagesCollection } = options - - // Extract tool calls from each message's collected rows - // fn.select can return an array which will be flattened - // Order by startedAt to ensure chronological message ordering - // startSync: true ensures the collection starts syncing immediately. - return createLiveQueryCollection({ - query: (q) => - q - .from({ collected: collectedMessagesCollection }) - .orderBy(({ collected }) => collected.startedAt, 'asc') - .fn.select(({ collected }) => extractToolCalls(collected.rows)), - startSync: true, - }) -} diff --git a/packages/durable-session/src/collections/tool-results.ts b/packages/durable-session/src/collections/tool-results.ts deleted file mode 100644 index 11dacab..0000000 --- a/packages/durable-session/src/collections/tool-results.ts +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Tool results collection - two-stage derived pipeline. - * - * Extracts tool execution results from stream rows, derived from the - * collectedMessages intermediate collection. - * - * This follows the pattern: aggregate first → materialize second - */ - -import { createLiveQueryCollection } from '@tanstack/db' -import type { Collection } from '@tanstack/db' -import type { ToolResultRow } from '../types' -import { extractToolResults } from '../materialize' -import type { CollectedMessageRows } from './messages' -import type { ChunkRow } from '../schema' - -// ============================================================================ -// Tool Results Collection -// ============================================================================ - -/** - * Options for creating a tool results collection. - */ -export interface ToolResultsCollectionOptions { - /** Session identifier */ - sessionId: string - /** Collected messages collection (intermediate from messages pipeline) */ - collectedMessagesCollection: Collection -} - -/** - * Creates the tool results collection from collected messages. - * - * Uses fn.select to extract tool results from each message's collected rows, - * then flattens them into individual ToolResultRow entries. - * - * @example - * ```typescript - * const toolResults = createToolResultsCollection({ - * sessionId: 'my-session', - * collectedMessagesCollection, - * }) - * - * // Access tool results directly - * for (const result of toolResults.values()) { - * console.log(result.toolCallId, result.output) - * } - * ``` - */ -export function createToolResultsCollection( - options: ToolResultsCollectionOptions -): Collection { - const { collectedMessagesCollection } = options - - // Extract tool results from each message's collected rows - // fn.select can return an array which will be flattened - // Order by startedAt to ensure chronological message ordering - // startSync: true ensures the collection starts syncing immediately. - return createLiveQueryCollection({ - query: (q) => - q - .from({ collected: collectedMessagesCollection }) - .orderBy(({ collected }) => collected.startedAt, 'asc') - .fn.select(({ collected }) => extractToolResults(collected.rows)), - startSync: true, - }) -} diff --git a/packages/durable-session/src/index.ts b/packages/durable-session/src/index.ts index 4eb8692..dcfe6b6 100644 --- a/packages/durable-session/src/index.ts +++ b/packages/durable-session/src/index.ts @@ -10,8 +10,11 @@ * - Multi-agent support with webhook registration * - Reactive collections for custom UI needs * - * All derived collections contain fully materialized objects - no helper - * functions needed. Access data directly from collections. + * Architecture: + * - chunks → (subquery) → messages (root materialized collection) + * - Derived collections filter messages via .fn.where() on parts + * - All collections return MessageRow[], preserving full message context + * - Consumers filter message.parts to access specific part types * * @example * ```typescript @@ -28,14 +31,28 @@ * await client.sendMessage('Hello!') * console.log(client.messages) * - * // Access collections directly (no helper functions needed) + * // Access collections directly * for (const message of client.collections.messages.values()) { * console.log(message.id, message.role, message.parts) * } * - * // Filter with standard collection methods - * const pending = [...client.collections.approvals.values()] - * .filter(a => a.status === 'pending') + * // Filter tool calls from message parts + * for (const message of client.collections.toolCalls.values()) { + * for (const part of message.parts) { + * if (part.type === 'tool-call') { + * console.log(part.name, part.state, part.arguments) + * } + * } + * } + * + * // Check for pending approvals + * for (const message of client.collections.pendingApprovals.values()) { + * for (const part of message.parts) { + * if (part.type === 'tool-call' && part.approval?.needsApproval) { + * console.log(`Approval needed: ${part.name}`) + * } + * } + * } * ``` * * @packageDocumentation @@ -78,18 +95,16 @@ export type { MessageRole, MessageRow, + // Re-exported TanStack AI types for consumer convenience + MessagePart, + TextPart, + ToolCallPart, + ToolResultPart, + ThinkingPart, + // Active generation types ActiveGenerationRow, - // Tool types - ToolCallState, - ToolCallRow, - ToolResultRow, - - // Approval types - ApprovalStatus, - ApprovalRow, - // Session types ConnectionStatus, SessionMetaRow, @@ -127,28 +142,18 @@ export { type SessionDB, } from './collection' +// ============================================================================ +// Collection Factories +// ============================================================================ + export { - // Messages collection (two-stage pipeline) - createCollectedMessagesCollection, + // Messages collection (root) and derived collections createMessagesCollection, - createMessagesPipeline, - type CollectedMessageRows, - type CollectedMessagesCollectionOptions, - type MessagesCollectionOptions, - type MessagesPipelineOptions, - type MessagesPipelineResult, - - // Tool calls collection createToolCallsCollection, - type ToolCallsCollectionOptions, - - // Tool results collection + createPendingApprovalsCollection, createToolResultsCollection, - type ToolResultsCollectionOptions, - - // Approvals collection - createApprovalsCollection, - type ApprovalsCollectionOptions, + type MessagesCollectionOptions, + type DerivedMessagesCollectionOptions, // Active generations collection createActiveGenerationsCollection, @@ -184,13 +189,8 @@ export { export { materializeMessage, parseChunk, - extractToolCalls, - extractToolResults, - extractApprovals, - detectActiveGenerations, - groupRowsByMessage, - materializeAllMessages, extractTextContent, isUserMessage, isAssistantMessage, + messageRowToUIMessage, } from './materialize' diff --git a/packages/durable-session/src/materialize.ts b/packages/durable-session/src/materialize.ts index e24f373..0652900 100644 --- a/packages/durable-session/src/materialize.ts +++ b/packages/durable-session/src/materialize.ts @@ -2,64 +2,42 @@ * Message materialization from stream chunks. * * Handles two formats: - * 1. User messages: Single row with {type: 'user-message', message: UIMessage} + * 1. User messages: Single row with {type: 'whole-message', message: UIMessage} * 2. Assistant messages: Multiple rows with TanStack AI StreamChunks * - * Chunk processing for assistant messages is delegated to TanStack AI's StreamProcessor. + * Chunk processing is delegated to TanStack AI's StreamProcessor, which handles: + * - Text content accumulation + * - Tool call parsing (including arguments streaming) + * - Tool result handling + * - Approval request tracking + * + * The output is a MessageRow with parts using TanStack AI's MessagePart types. + * Derived collections filter on these parts rather than using separate extraction. */ import { StreamProcessor } from '@tanstack/ai' -import type { - StreamChunk, - ContentStreamChunk, - ToolCallStreamChunk, - ToolResultStreamChunk, - DoneStreamChunk, - ApprovalRequestedStreamChunk, -} from '@tanstack/ai' +import type { DoneStreamChunk, StreamChunk, UIMessage } from '@tanstack/ai' import type { ChunkRow } from './schema' import type { MessageRow, MessageRole, - ToolCallRow, - ToolCallState, - ToolResultRow, - ApprovalRow, - ApprovalStatus, - ActiveGenerationRow, - UserMessageChunk, + WholeMessageChunk, DurableStreamChunk, } from './types' // ============================================================================ -// Type Guards for StreamChunk +// Type Guards // ============================================================================ -function isContentChunk(chunk: StreamChunk): chunk is ContentStreamChunk { - return chunk.type === 'content' -} - -function isToolCallChunk(chunk: StreamChunk): chunk is ToolCallStreamChunk { - return chunk.type === 'tool_call' -} - -function isToolResultChunk(chunk: StreamChunk): chunk is ToolResultStreamChunk { - return chunk.type === 'tool_result' -} - function isDoneChunk(chunk: StreamChunk): chunk is DoneStreamChunk { return chunk.type === 'done' } -function isApprovalRequestedChunk(chunk: StreamChunk): chunk is ApprovalRequestedStreamChunk { - return chunk.type === 'approval-requested' -} - /** - * Type guard for UserMessageChunk. + * Type guard for WholeMessageChunk. */ -function isUserMessageChunk(chunk: DurableStreamChunk | null): chunk is UserMessageChunk { - return chunk !== null && chunk.type === 'user-message' +function isWholeMessageChunk(chunk: DurableStreamChunk | null): chunk is WholeMessageChunk { + return chunk !== null && chunk.type === 'whole-message' } // ============================================================================ @@ -81,12 +59,12 @@ export function parseChunk(chunkJson: string): DurableStreamChunk | null { } /** - * Materialize a user message from a single row. + * Materialize a whole message from a single row. * User messages are stored as complete UIMessage objects. */ -function materializeUserMessage( +function materializeWholeMessage( row: ChunkRow, - chunk: UserMessageChunk + chunk: WholeMessageChunk ): MessageRow { const { message } = chunk @@ -126,8 +104,8 @@ function materializeAssistantMessage(rows: ChunkRow[]): MessageRow { continue } - // Skip user-message chunks (shouldn't be in assistant messages, but guard) - if (isUserMessageChunk(chunk)) continue + // Skip whole-message chunks (shouldn't be in assistant messages, but guard) + if (isWholeMessageChunk(chunk)) continue // Process TanStack AI StreamChunk try { @@ -170,7 +148,7 @@ function materializeAssistantMessage(rows: ChunkRow[]): MessageRow { * Materialize a MessageRow from collected chunk rows. * * Handles two formats: - * 1. User messages: Single row with {type: 'user-message', message: UIMessage} + * 1. User messages: Single row with {type: 'whole-message', message: UIMessage} * 2. Assistant messages: Multiple rows with TanStack AI StreamChunks * * @param rows - Chunk rows for a single message @@ -189,306 +167,15 @@ export function materializeMessage(rows: ChunkRow[]): MessageRow { throw new Error('Failed to parse first chunk') } - // Check if this is a complete user message - if (isUserMessageChunk(firstChunk)) { - return materializeUserMessage(sorted[0], firstChunk) + // Check if this is a whole message + if (isWholeMessageChunk(firstChunk)) { + return materializeWholeMessage(sorted[0], firstChunk) } // Otherwise, process as streamed assistant message return materializeAssistantMessage(sorted) } -// ============================================================================ -// Tool Call Extraction -// ============================================================================ - -/** - * Extract tool calls from chunk rows. - * - * @param rows - Chunk rows to extract from - * @returns Array of tool call rows - */ -export function extractToolCalls(rows: ChunkRow[]): ToolCallRow[] { - const toolCallMap = new Map() - - for (const row of rows) { - const chunk = parseChunk(row.chunk) - if (!chunk) continue - - // Skip user-message chunks (not relevant for tool calls) - if (isUserMessageChunk(chunk)) continue - - // Handle tool_call chunks - const streamChunk = chunk as StreamChunk - if (isToolCallChunk(streamChunk)) { - const toolCallId = streamChunk.toolCall.id - if (!toolCallId) continue - - const existing = toolCallMap.get(toolCallId) - - if (existing) { - // Update existing tool call - accumulate arguments - existing.arguments += streamChunk.toolCall.function.arguments - if (streamChunk.toolCall.function.name) { - existing.name = streamChunk.toolCall.function.name - } - } else { - // Create new tool call - toolCallMap.set(toolCallId, { - id: toolCallId, - messageId: row.messageId, - name: streamChunk.toolCall.function.name ?? '', - arguments: streamChunk.toolCall.function.arguments ?? '', - input: null, - state: 'pending' as ToolCallState, - actorId: row.actorId, - createdAt: new Date(row.createdAt), - }) - } - } - - // Check for tool-input-available to mark as executing - if (streamChunk.type === 'tool-input-available') { - const toolInputChunk = streamChunk as { - toolCallId?: string - input?: unknown - } - if (toolInputChunk.toolCallId) { - const toolCall = toolCallMap.get(toolInputChunk.toolCallId) - if (toolCall) { - toolCall.input = toolInputChunk.input ?? null - toolCall.state = 'executing' - } - } - } - } - - // Try to parse arguments as input for any tool calls that don't have input yet - for (const toolCall of toolCallMap.values()) { - if (toolCall.input === null && toolCall.arguments) { - try { - toolCall.input = JSON.parse(toolCall.arguments) - } catch { - // Keep input as null if parsing fails - } - } - } - - return Array.from(toolCallMap.values()) -} - -// ============================================================================ -// Tool Result Extraction -// ============================================================================ - -/** - * Extract tool results from chunk rows. - * - * @param rows - Chunk rows to extract from - * @returns Array of tool result rows - */ -export function extractToolResults(rows: ChunkRow[]): ToolResultRow[] { - const results: ToolResultRow[] = [] - - for (const row of rows) { - const chunk = parseChunk(row.chunk) - if (!chunk) continue - - // Skip user-message chunks - if (isUserMessageChunk(chunk)) continue - - const streamChunk = chunk as StreamChunk - if (isToolResultChunk(streamChunk)) { - results.push({ - id: `${row.messageId}:${streamChunk.toolCallId}`, - toolCallId: streamChunk.toolCallId, - messageId: row.messageId, - output: streamChunk.content ?? null, - error: null, - actorId: row.actorId, - createdAt: new Date(row.createdAt), - }) - } - } - - return results -} - -// ============================================================================ -// Approval Extraction -// ============================================================================ - -/** - * Extract approvals from chunk rows. - * - * @param rows - Chunk rows to extract from - * @returns Array of approval rows - */ -export function extractApprovals(rows: ChunkRow[]): ApprovalRow[] { - const approvalMap = new Map() - - for (const row of rows) { - const chunk = parseChunk(row.chunk) - if (!chunk) continue - - // Skip user-message chunks - if (isUserMessageChunk(chunk)) continue - - const streamChunk = chunk as StreamChunk - - // Handle approval-requested chunks - if (isApprovalRequestedChunk(streamChunk)) { - const approvalId = streamChunk.approval.id - if (approvalId) { - approvalMap.set(approvalId, { - id: approvalId, - toolCallId: streamChunk.toolCallId ?? '', - messageId: row.messageId, - status: 'pending' as ApprovalStatus, - requestedBy: row.actorId, - requestedAt: new Date(row.createdAt), - respondedBy: null, - respondedAt: null, - }) - } - } - - // Note: approval responses would typically come through a separate mechanism - // (e.g., a POST to the proxy endpoint), not as stream chunks - } - - return Array.from(approvalMap.values()) -} - -// ============================================================================ -// Active Generation Detection -// ============================================================================ - -/** - * Check if message rows indicate a complete message. - * User messages are always complete. Assistant messages are complete if they have a 'done' chunk. - */ -function isMessageComplete(rows: ChunkRow[]): boolean { - if (rows.length === 0) return false - - const sorted = [...rows].sort((a, b) => a.seq - b.seq) - const firstChunk = parseChunk(sorted[0].chunk) - - // User messages are always complete - if (firstChunk && isUserMessageChunk(firstChunk)) { - return true - } - - // For assistant messages, check for done, stop, or error chunk - for (const row of rows) { - const chunk = parseChunk(row.chunk) - if (!chunk) continue - - if (isUserMessageChunk(chunk)) continue - - const streamChunk = chunk as StreamChunk - if (isDoneChunk(streamChunk)) { - return true - } - // Also check for stop/error chunks (stop is from our proxy, not in TanStack AI types) - const chunkType = (chunk as { type: string }).type - if (chunkType === 'stop' || chunkType === 'error') { - return true - } - // Also check legacy message-end for backward compatibility - if ((chunk as any).type === 'message-end') { - return true - } - } - - return false -} - -/** - * Detect active generations (incomplete messages) from rows. - * - * @param rowsByMessage - Map of messageId to rows - * @returns Array of active generation rows - */ -export function detectActiveGenerations( - rowsByMessage: Map -): ActiveGenerationRow[] { - const activeGenerations: ActiveGenerationRow[] = [] - - for (const [messageId, rows] of rowsByMessage) { - if (rows.length === 0) continue - - // Check if message is complete - if (isMessageComplete(rows)) continue - - // Sort by seq to find first and last - const sorted = [...rows].sort((a, b) => a.seq - b.seq) - const first = sorted[0] - const last = sorted[sorted.length - 1] - - activeGenerations.push({ - messageId, - actorId: first.actorId, - startedAt: new Date(first.createdAt), - lastChunkSeq: last.seq, - lastChunkAt: new Date(last.createdAt), - }) - } - - return activeGenerations -} - -// ============================================================================ -// Message Grouping -// ============================================================================ - -/** - * Group chunk rows by messageId. - * - * @param rows - Chunk rows to group - * @returns Map of messageId to rows - */ -export function groupRowsByMessage( - rows: ChunkRow[] -): Map { - const grouped = new Map() - - for (const row of rows) { - const existing = grouped.get(row.messageId) - if (existing) { - existing.push(row) - } else { - grouped.set(row.messageId, [row]) - } - } - - return grouped -} - -/** - * Materialize all messages from chunk rows. - * - * @param rows - All chunk rows - * @returns Array of materialized message rows, sorted by createdAt - */ -export function materializeAllMessages(rows: ChunkRow[]): MessageRow[] { - const grouped = groupRowsByMessage(rows) - const messages: MessageRow[] = [] - - for (const [, messageRows] of grouped) { - try { - messages.push(materializeMessage(messageRows)) - } catch (error) { - // Skip invalid messages but log for debugging - console.warn('Failed to materialize message:', error) - } - } - - // Sort by createdAt for chronological order - messages.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime()) - - return messages -} // ============================================================================ // Content Extraction Helpers @@ -526,3 +213,38 @@ export function isUserMessage(row: MessageRow): boolean { export function isAssistantMessage(row: MessageRow): boolean { return row.role === 'assistant' } + +// ============================================================================ +// UIMessage Conversion +// ============================================================================ + +/** + * Convert a MessageRow to a TanStack AI UIMessage. + * + * This is a pure transformation function that maps the internal MessageRow + * representation to the public UIMessage interface expected by TanStack AI. + * + * @param row - MessageRow from the messages collection + * @returns UIMessage compatible with TanStack AI + * + * @example + * ```typescript + * import { messageRowToUIMessage } from '@electric-sql/ai-db' + * import { useLiveQuery } from '@tanstack/react-db' + * + * function Chat({ messagesCollection }) { + * const { data: messageRows } = useLiveQuery(messagesCollection) + * const messages = messageRows?.map(messageRowToUIMessage) ?? [] + * // messages is now UIMessage[] compatible with TanStack AI + * } + * ``` + */ +export function messageRowToUIMessage(row: MessageRow): UIMessage & { actorId: string } { + return { + id: row.id, + role: row.role as 'user' | 'assistant', + parts: row.parts, + createdAt: row.createdAt, + actorId: row.actorId, + } +} diff --git a/packages/durable-session/src/schema.ts b/packages/durable-session/src/schema.ts index c824ae2..23a2840 100644 --- a/packages/durable-session/src/schema.ts +++ b/packages/durable-session/src/schema.ts @@ -54,7 +54,7 @@ export const chunkValueSchema = z.object({ actorId: z.string(), /** Message role - aligns with TanStack AI UIMessage.role */ role: z.enum(['user', 'assistant', 'system']), - /** JSON-encoded chunk content - could be UserMessageChunk or StreamChunk */ + /** JSON-encoded chunk content - could be WholeMessageChunk or StreamChunk */ chunk: z.string(), /** Sequence number within message - monotonically increasing per messageId */ seq: z.number(), diff --git a/packages/durable-session/src/types.ts b/packages/durable-session/src/types.ts index 651b685..20e1343 100644 --- a/packages/durable-session/src/types.ts +++ b/packages/durable-session/src/types.ts @@ -2,9 +2,13 @@ * Core type definitions for @electric-sql/durable-session * * Defines the stream protocol types, collection schemas, and API interfaces. + * + * Design principles: + * - Use TanStack AI types directly (MessagePart, ToolCallPart, etc.) + * - Single MessageRow type for all materialized messages + * - Derived collections filter on message parts, not custom row types */ -import { z } from 'zod' import type { StreamChunk, UIMessage, @@ -13,6 +17,7 @@ import type { } from '@tanstack/ai' import type { Collection } from '@tanstack/db' import type { LiveMode } from '@durable-streams/state' +import type { SessionDB } from './collection' // Re-export schema types export type { ChunkRow, ChunkValue, PresenceRow, AgentRow } from './schema' @@ -20,29 +25,38 @@ export type { ChunkRow, ChunkValue, PresenceRow, AgentRow } from './schema' // Re-export LiveMode from @durable-streams/state export type { LiveMode } +// Re-export TanStack AI message part types for consumer convenience +export type { + MessagePart, + TextPart, + ToolCallPart, + ToolResultPart, + ThinkingPart, +} from '@tanstack/ai' + // ============================================================================ // Stream Protocol Types // ============================================================================ /** - * Complete user message chunk - stored as single row in stream. + * Whole message chunk - stored as single row in stream. * Used for messages that are complete when written (user input, cached messages). * * This is different from TanStack AI's StreamChunk types, which are designed - * for streaming assistant responses. User messages are complete when sent, + * for streaming assistant responses. Whole messages are complete when sent, * so we store them as complete UIMessage objects. */ -export interface UserMessageChunk { - type: 'user-message' +export interface WholeMessageChunk { + type: 'whole-message' message: UIMessage } /** * Union of all chunk types we handle. - * - UserMessageChunk: Complete messages (user input) + * - WholeMessageChunk: Complete messages (user input) * - StreamChunk: TanStack AI streaming chunks (assistant responses) */ -export type DurableStreamChunk = UserMessageChunk | StreamChunk +export type DurableStreamChunk = WholeMessageChunk | StreamChunk /** * Actor types in the chat session. @@ -61,16 +75,34 @@ export type MessageRole = 'user' | 'assistant' | 'system' /** * Materialized message row from stream. * + * Extends TanStack AI's UIMessage with durable session metadata. * Messages are materialized from ChunkRow arrays via the live query pipeline. - * User messages (single chunk) and assistant messages (multiple streaming chunks) - * are both represented with this type. + * + * Message parts use TanStack AI's discriminated union types directly: + * - TextPart: { type: 'text', content: string } + * - ToolCallPart: { type: 'tool-call', id, name, arguments, state, approval?, output? } + * - ToolResultPart: { type: 'tool-result', toolCallId, content, state, error? } + * - ThinkingPart: { type: 'thinking', content: string } + * + * @example + * ```typescript + * // Filter for tool calls in a message + * const toolCalls = message.parts.filter( + * (p): p is ToolCallPart => p.type === 'tool-call' + * ) + * + * // Check for pending approvals + * const pendingApprovals = toolCalls.filter( + * tc => tc.approval?.needsApproval && tc.approval.approved === undefined + * ) + * ``` */ export interface MessageRow { /** Message identifier (same as messageId from chunks) */ id: string /** Message role */ role: MessageRole - /** Materialized message parts from parsed chunks */ + /** Message parts - uses TanStack AI's MessagePart type directly */ parts: MessagePart[] /** Actor identifier who wrote this message */ actorId: string @@ -100,92 +132,6 @@ export interface ActiveGenerationRow { lastChunkAt: Date } -// ============================================================================ -// Tool Call Types -// ============================================================================ - -/** - * Tool call state. - */ -export type ToolCallState = 'pending' | 'executing' | 'complete' - -/** - * Derived tool call row from stream. - */ -export interface ToolCallRow { - /** Tool call identifier from chunk */ - id: string - /** Message containing this tool call */ - messageId: string - /** Tool name */ - name: string - /** Accumulated JSON string arguments */ - arguments: string - /** Parsed input (when complete) */ - input: unknown | null - /** Tool call state */ - state: ToolCallState - /** Actor identifier */ - actorId: string - /** Tool call creation timestamp */ - createdAt: Date -} - -// ============================================================================ -// Tool Result Types -// ============================================================================ - -/** - * Derived tool result row from stream. - */ -export interface ToolResultRow { - /** Result identifier */ - id: string - /** Associated tool call identifier */ - toolCallId: string - /** Message containing this result */ - messageId: string - /** Tool output */ - output: unknown - /** Error message if failed */ - error: string | null - /** Actor identifier */ - actorId: string - /** Result creation timestamp */ - createdAt: Date -} - -// ============================================================================ -// Approval Types -// ============================================================================ - -/** - * Approval status. - */ -export type ApprovalStatus = 'pending' | 'approved' | 'denied' - -/** - * Derived approval row from stream. - */ -export interface ApprovalRow { - /** Approval identifier from chunk */ - id: string - /** Associated tool call identifier */ - toolCallId: string - /** Message containing this approval */ - messageId: string - /** Approval status */ - status: ApprovalStatus - /** Actor who requested approval */ - requestedBy: string - /** When approval was requested */ - requestedAt: Date - /** Actor who responded to approval */ - respondedBy: string | null - /** When approval was responded to */ - respondedAt: Date | null -} - // ============================================================================ // Session Metadata Types // ============================================================================ @@ -284,13 +230,28 @@ import type { ChunkRow, PresenceRow, AgentRow } from './schema' /** * All collections exposed by DurableChatClient. * - * All derived collections contain fully materialized objects - no helper - * functions needed to access the data. This is achieved through a two-stage - * pipeline: aggregate first (groupBy + collect), then materialize (fn.select). + * Architecture: + * - `chunks`, `presence`, `agents`: Synced from Durable Stream via stream-db + * - `messages`: Root materialized collection (groupBy + collect → materialize) + * - `toolCalls`, `pendingApprovals`, `toolResults`: Derived from messages via .fn.where() + * - `activeGenerations`: Derived from messages (incomplete messages) + * - `sessionMeta`, `sessionStats`: Local/aggregated state * * The `chunks` and `agents` collections are synced directly from the Durable * Stream via stream-db. The `presence` collection is aggregated from raw * per-device presence records. Other collections are derived from chunks. + * + * Pipeline: + * ``` + * chunks → (subquery) → messages + * ↓ + * .fn.where(parts filtering) + * ↓ + * toolCalls (lazy) + * pendingApprovals (lazy) + * toolResults (lazy) + * activeGenerations (lazy) + * ``` */ export interface DurableChatCollections { /** Root chunks collection synced from Durable Stream via stream-db */ @@ -299,16 +260,16 @@ export interface DurableChatCollections { presence: Collection /** Agents collection - registered webhook agents (from stream-db) */ agents: Collection - /** Materialized messages (keyed by messageId) */ + /** All materialized messages (keyed by messageId) */ messages: Collection - /** Active generations - messages currently being streamed (keyed by messageId) */ + /** Messages containing tool calls (keyed by messageId) */ + toolCalls: Collection + /** Messages with pending approval requests (keyed by messageId) */ + pendingApprovals: Collection + /** Messages containing tool results (keyed by messageId) */ + toolResults: Collection + /** Active generations - incomplete messages (keyed by messageId) */ activeGenerations: Collection - /** Materialized tool calls (keyed by toolCallId) */ - toolCalls: Collection - /** Materialized tool results (keyed by resultId) */ - toolResults: Collection - /** Materialized approvals (keyed by approvalId) */ - approvals: Collection /** Session metadata collection (local state) */ sessionMeta: Collection /** Session statistics (keyed by sessionId) */ @@ -372,7 +333,7 @@ export interface DurableChatClientOptions< * This allows tests to inject mock collections with controlled data. * @internal */ - sessionDB?: import('./collection').SessionDB + sessionDB?: SessionDB } // ============================================================================ diff --git a/packages/durable-session/tests/active-generations.test.ts b/packages/durable-session/tests/active-generations.test.ts index 733367d..5162e7b 100644 --- a/packages/durable-session/tests/active-generations.test.ts +++ b/packages/durable-session/tests/active-generations.test.ts @@ -13,10 +13,7 @@ import { flushPromises, TEST_MESSAGE_IDS, } from './fixtures/test-helpers' -import { - createCollectedMessagesCollection, - createMessagesCollection, -} from '../src/collections/messages' +import { createMessagesCollection } from '../src/collections/messages' import { createActiveGenerationsCollection } from '../src/collections/active-generations' import type { ChunkRow } from '../src/schema' import type { MessageRow, ActiveGenerationRow } from '../src/types' @@ -37,25 +34,19 @@ describe('active generations collection', () => { chunksCollection = mock.collection controller = mock.controller - // Create the pipeline: chunks -> collectedMessages -> messages -> activeGenerations - // Note: derived collections use startSync: true, so they start syncing immediately - const collectedMessagesCollection = createCollectedMessagesCollection({ - sessionId: 'test-session', - chunksCollection, - }) - + // Create the pipeline: chunks -> messages -> activeGenerations messagesCollection = createMessagesCollection({ - sessionId: 'test-session', - collectedMessagesCollection, + chunksCollection, }) activeGenerations = createActiveGenerationsCollection({ - sessionId: 'test-session', messagesCollection, }) - // Initialize chunks collection + // Initialize collections - preload creates demand for syncing chunksCollection.preload() + messagesCollection.preload() + activeGenerations.preload() controller.markReady() }) diff --git a/packages/durable-session/tests/client-integration.test.ts b/packages/durable-session/tests/client-integration.test.ts index 028b326..e62865b 100644 --- a/packages/durable-session/tests/client-integration.test.ts +++ b/packages/durable-session/tests/client-integration.test.ts @@ -146,9 +146,9 @@ describe('DurableChatClient', () => { expect(typeof client.collections.toolResults.size).toBe('number') }) - it('should expose approvals collection', () => { - expect(client.collections.approvals).toBeDefined() - expect(typeof client.collections.approvals.size).toBe('number') + it('should expose pendingApprovals collection', () => { + expect(client.collections.pendingApprovals).toBeDefined() + expect(typeof client.collections.pendingApprovals.size).toBe('number') }) it('should expose sessionMeta collection', () => { diff --git a/packages/durable-session/tests/fixtures/test-data.json b/packages/durable-session/tests/fixtures/test-data.json index b214cef..0543b4e 100644 --- a/packages/durable-session/tests/fixtures/test-data.json +++ b/packages/durable-session/tests/fixtures/test-data.json @@ -4,7 +4,7 @@ "messageId": "a845300a-45e0-461e-9e84-20451c100833", "actorId": "bf366f79-41d3-4588-b310-f0598713b4c8", "actorType": "user", - "chunk": "{\"type\":\"user-message\",\"message\":{\"id\":\"a845300a-45e0-461e-9e84-20451c100833\",\"role\":\"user\",\"parts\":[{\"type\":\"text\",\"content\":\"Hello\"}],\"createdAt\":\"2025-12-13T06:16:23.767Z\"}}", + "chunk": "{\"type\":\"whole-message\",\"message\":{\"id\":\"a845300a-45e0-461e-9e84-20451c100833\",\"role\":\"user\",\"parts\":[{\"type\":\"text\",\"content\":\"Hello\"}],\"createdAt\":\"2025-12-13T06:16:23.767Z\"}}", "createdAt": "2025-12-13T06:16:23.767Z", "seq": 0 }, @@ -121,7 +121,7 @@ "messageId": "f289b345-c4ab-45fd-8b29-7b39e25e3c5d", "actorId": "db8a74d0-6686-4c46-827a-96f30131bda0", "actorType": "user", - "chunk": "{\"type\":\"user-message\",\"message\":{\"id\":\"f289b345-c4ab-45fd-8b29-7b39e25e3c5d\",\"role\":\"user\",\"parts\":[{\"type\":\"text\",\"content\":\"I don't know\"}],\"createdAt\":\"2025-12-13T06:18:42.367Z\"}}", + "chunk": "{\"type\":\"whole-message\",\"message\":{\"id\":\"f289b345-c4ab-45fd-8b29-7b39e25e3c5d\",\"role\":\"user\",\"parts\":[{\"type\":\"text\",\"content\":\"I don't know\"}],\"createdAt\":\"2025-12-13T06:18:42.367Z\"}}", "createdAt": "2025-12-13T06:18:42.367Z", "seq": 0 }, diff --git a/packages/durable-session/tests/messages-collection.test.ts b/packages/durable-session/tests/messages-collection.test.ts index 7975727..640dc12 100644 --- a/packages/durable-session/tests/messages-collection.test.ts +++ b/packages/durable-session/tests/messages-collection.test.ts @@ -17,11 +17,7 @@ import { TEST_MESSAGE_IDS, EXPECTED_CONTENT, } from './fixtures/test-helpers' -import { - createCollectedMessagesCollection, - createMessagesCollection, - type CollectedMessageRows, -} from '../src/collections/messages' +import { createMessagesCollection } from '../src/collections/messages' import type { ChunkRow } from '../src/schema' import type { MessageRow } from '../src/types' import type { Collection } from '@tanstack/db' @@ -34,7 +30,6 @@ describe('messages collection', () => { // Collections to be set up in beforeEach let chunksCollection: Collection let controller: ReturnType['controller'] - let collectedMessagesCollection: Collection let messagesCollection: Collection beforeEach(() => { @@ -43,20 +38,14 @@ describe('messages collection', () => { chunksCollection = mock.collection controller = mock.controller - // Create the two-stage pipeline - // Note: derived collections use startSync: true, so they start syncing immediately - collectedMessagesCollection = createCollectedMessagesCollection({ - sessionId: 'test-session', - chunksCollection, - }) - + // Create messages collection with inline subquery messagesCollection = createMessagesCollection({ - sessionId: 'test-session', - collectedMessagesCollection, + chunksCollection, }) - // Initialize chunks collection + // Initialize collections - preload creates demand for syncing chunksCollection.preload() + messagesCollection.preload() controller.markReady() }) @@ -178,21 +167,6 @@ describe('messages collection', () => { expect((textPart as { type: 'text'; content: string })?.content).toBe(EXPECTED_CONTENT.ASSISTANT_1) }) - it('should track correct offsets for assistant message', async () => { - // Emit user message first - controller.emit(getMessageRows(testData, TEST_MESSAGE_IDS.USER_1)) - await flushPromises() - - // Emit assistant message - const assistantMessageId = TEST_MESSAGE_IDS.ASSISTANT_1 - const assistantRows = getMessageRows(testData, assistantMessageId) - controller.emit(assistantRows) - await flushPromises() - - const assistantMessage = messagesCollection.get(assistantMessageId) - expect(assistantMessage?.startOffset).toBe(assistantRows[0].offset) - expect(assistantMessage?.endOffset).toBe(assistantRows[assistantRows.length - 1].offset) - }) }) // ========================================================================== @@ -242,7 +216,7 @@ describe('messages collection', () => { // Get all messages and verify order const messages = [...messagesCollection.values()] - // Messages should be ordered by startOffset (chronological) + // Messages should be ordered chronologically by createdAt // The first message should be the first user message expect(messages[0].id).toBe(TEST_MESSAGE_IDS.USER_1) expect(messages[1].id).toBe(TEST_MESSAGE_IDS.ASSISTANT_1) @@ -291,43 +265,6 @@ describe('messages collection', () => { }) }) - // ========================================================================== - // Intermediate Collection Tests - // ========================================================================== - - describe('collectedMessages (intermediate stage)', () => { - it('should group rows by messageId', async () => { - // Emit all data - controller.emit(streamRowsToChunkRows(testData)) - await flushPromises() - - // Should have 4 groups (one per message) - expect(collectedMessagesCollection.size).toBe(4) - - // Check that user message has 1 row - const collected1 = collectedMessagesCollection.get(TEST_MESSAGE_IDS.USER_1) - expect(collected1?.rows).toHaveLength(1) - - // Check that first assistant message has all its chunks - const assistantRows = getMessageRows(testData, TEST_MESSAGE_IDS.ASSISTANT_1) - const collected2 = collectedMessagesCollection.get(TEST_MESSAGE_IDS.ASSISTANT_1) - expect(collected2?.rows).toHaveLength(assistantRows.length) - }) - - it('should have startedAt set to earliest timestamp', async () => { - controller.emit(streamRowsToChunkRows(testData)) - await flushPromises() - - const collected = collectedMessagesCollection.get(TEST_MESSAGE_IDS.ASSISTANT_1) - expect(collected?.startedAt).toBeDefined() - - // startedAt should be the createdAt of the first row - const rows = getMessageRows(testData, TEST_MESSAGE_IDS.ASSISTANT_1) - const sortedRows = [...rows].sort((a, b) => a.createdAt.localeCompare(b.createdAt)) - expect(collected?.startedAt).toBe(sortedRows[0].createdAt) - }) - }) - // ========================================================================== // Optimistic Insert Bug Reproduction // ========================================================================== @@ -365,12 +302,9 @@ describe('messages collection', () => { role: 'user' | 'assistant' }>({ onMutate: ({ messageId, content, role }) => { - const seq = (optimisticSeq++).toString().padStart(16, '0') - const optimisticOffset = `zzzzzzzzzzzzzzzz_${seq}` - const createdAt = new Date() - // Insert into chunks collection with user-message format + // Insert into chunks collection with whole-message format // This flows through the live query pipeline: chunks → collectedMessages → messages stream.insert({ id: `${messageId}:0`, // Primary key: messageId:seq @@ -378,7 +312,7 @@ describe('messages collection', () => { actorId: 'test-user', role, // Now using 'role' instead of 'actorType' chunk: JSON.stringify({ - type: 'user-message', + type: 'whole-message', message: { id: messageId, role, @@ -772,7 +706,6 @@ describe('messages collection', () => { await flushPromises() expect(messagesCollection.size).toBe(0) - expect(collectedMessagesCollection.size).toBe(0) }) it('should handle duplicate emissions gracefully', async () => { diff --git a/packages/react-durable-session/package.json b/packages/react-durable-session/package.json index 610a096..87e2942 100644 --- a/packages/react-durable-session/package.json +++ b/packages/react-durable-session/package.json @@ -31,10 +31,11 @@ "@testing-library/jest-dom": "^6.6.3", "@testing-library/react": "^16.3.0", "@types/node": "^24.10.0", - "@types/react": "^18.3.0", + "@types/react": "^19.0.0", + "@types/react-dom": "^19.0.0", "jsdom": "^26.1.0", - "react": "^18.3.0", - "react-dom": "^18.3.0", + "react": "^19.0.0", + "react-dom": "^19.0.0", "typescript": "^5.9.3", "vitest": "^2.1.8" }, diff --git a/packages/react-durable-session/src/index.ts b/packages/react-durable-session/src/index.ts index 1dd1bf0..e0e2eb3 100644 --- a/packages/react-durable-session/src/index.ts +++ b/packages/react-durable-session/src/index.ts @@ -58,11 +58,7 @@ export { type MessageRole, type MessageRow, type ActiveGenerationRow, - type ToolCallState, - type ToolCallRow, - type ToolResultRow, - type ApprovalStatus, - type ApprovalRow, + type RawPresenceRow, type PresenceRow, type AgentRow, type ConnectionStatus, @@ -77,6 +73,13 @@ export { type ForkOptions, type ForkResult, + // Re-exported TanStack AI types for consumer convenience + type MessagePart, + type TextPart, + type ToolCallPart, + type ToolResultPart, + type ThinkingPart, + // Materialization helpers extractTextContent, isUserMessage, diff --git a/packages/react-durable-session/src/use-durable-chat.ts b/packages/react-durable-session/src/use-durable-chat.ts index 45b3e6a..2ad57ef 100644 --- a/packages/react-durable-session/src/use-durable-chat.ts +++ b/packages/react-durable-session/src/use-durable-chat.ts @@ -2,20 +2,124 @@ * useDurableChat - React hook for durable chat. * * Provides TanStack AI-compatible API backed by Durable Streams - * with automatic React integration. */ - -import { useState, useEffect, useRef, useCallback } from 'react' -import { DurableChatClient } from '@electric-sql/durable-session' +import { + useState, + useEffect, + useRef, + useCallback, + useMemo, + useSyncExternalStore, +} from 'react' +import { DurableChatClient, messageRowToUIMessage } from '@electric-sql/durable-session' import type { DurableChatClientOptions } from '@electric-sql/durable-session' +import type { Collection } from '@tanstack/react-db' import type { UIMessage, AnyClientTool } from '@tanstack/ai' import type { UseDurableChatOptions, UseDurableChatReturn } from './types' +/** + * Extract the item type from a Collection. + * + * TanStack DB's Collection has 5 type parameters: + * `Collection` + * + * This helper extracts `T` (the item type) from any Collection variant. + */ +type CollectionItem = C extends Collection + ? T + : never + +/** + * SSR-safe hook for subscribing to TanStack DB collection data. + * This is a workaround to useLiveQuery not yet supporting SSR + * as per https://github.com/TanStack/db/pull/709 + */ +function useCollectionData>( + collection: C +): CollectionItem[] { + type T = CollectionItem + + // Track version to know when to create a new snapshot. + // Incremented by subscription callback when collection changes. + const versionRef = useRef(0) + + // Cache the last snapshot to maintain stable reference. + // useSyncExternalStore requires getSnapshot to return the same reference + // when data hasn't changed, otherwise it triggers infinite re-renders. + const snapshotRef = useRef<{ version: number; data: T[] }>({ + version: -1, // Force initial snapshot creation + data: [], + }) + + // Subscribe callback - increments version to signal data changed. + // Stored in ref to maintain stable reference for useSyncExternalStore. + const subscribeRef = useRef((onStoreChange: () => void): (() => void) => { + const subscription = collection.subscribeChanges(() => { + versionRef.current++ + onStoreChange() + }) + return () => subscription.unsubscribe() + }) + + // Update subscribe ref when collection changes + subscribeRef.current = (onStoreChange: () => void): (() => void) => { + const subscription = collection.subscribeChanges(() => { + versionRef.current++ + onStoreChange() + }) + return () => subscription.unsubscribe() + } + + // Snapshot callback - returns cached data unless version changed. + // Stored in ref to maintain stable reference for useSyncExternalStore. + const getSnapshotRef = useRef((): T[] => { + const currentVersion = versionRef.current + const cached = snapshotRef.current + + // Return cached snapshot if version hasn't changed + if (cached.version === currentVersion) { + return cached.data + } + + // Version changed - create new snapshot and cache it + const data = [...collection.values()] as T[] + snapshotRef.current = { version: currentVersion, data } + return data + }) + + // Update getSnapshot ref when collection changes + getSnapshotRef.current = (): T[] => { + const currentVersion = versionRef.current + const cached = snapshotRef.current + + if (cached.version === currentVersion) { + return cached.data + } + + const data = [...collection.values()] as T[] + snapshotRef.current = { version: currentVersion, data } + return data + } + + // Pass the same function for both getSnapshot and getServerSnapshot. + // This ensures server and client render the same initial state (empty array), + // preventing hydration mismatches while enabling proper SSR. + return useSyncExternalStore( + subscribeRef.current, + getSnapshotRef.current, + getSnapshotRef.current + ) +} + /** * React hook for durable chat with TanStack AI-compatible API. * + * Provides reactive data binding with automatic updates when underlying + * collection data changes. Supports SSR through proper `useSyncExternalStore` + * integration. + * * The client and collections are always available synchronously. - * Connection state is managed separately via connectionStatus. + * Connection state is managed separately via `connectionStatus`. * * @example Basic usage * ```typescript @@ -25,9 +129,6 @@ import type { UseDurableChatOptions, UseDurableChatReturn } from './types' * proxyUrl: 'http://localhost:4000', * }) * - * // collections is always defined - use directly with useLiveQuery - * const chunks = useLiveQuery(q => q.from({ row: collections.chunks }), [collections.chunks]) - * * return ( *
* {messages.map(m => )} @@ -36,21 +137,29 @@ import type { UseDurableChatOptions, UseDurableChatReturn } from './types' * ) * } * ``` + * + * @example Custom queries with useLiveQuery + * ```typescript + * import { useLiveQuery, eq } from '@tanstack/react-db' + * + * function Chat() { + * const { collections } = useDurableChat({ ... }) + * + * // Use collections with useLiveQuery for custom queries + * const pendingToolCalls = useLiveQuery(q => + * q.from({ tc: collections.toolCalls }) + * .where(({ tc }) => eq(tc.state, 'pending')) + * ) + * } + * ``` */ export function useDurableChat< TTools extends ReadonlyArray = AnyClientTool[], >(options: UseDurableChatOptions): UseDurableChatReturn { const { autoConnect = true, client: providedClient, ...clientOptions } = options - // Reactive state - must be declared first (before client creation uses them) - const [messages, setMessages] = useState(options.initialMessages ?? []) - const [isLoading, setIsLoading] = useState(false) - const [error, setError] = useState() - const [connectionStatus, setConnectionStatus] = useState< - 'disconnected' | 'connecting' | 'connected' | 'error' - >('disconnected') - // Error handler ref - allows client's onError to call setError + const [error, setError] = useState() const onErrorRef = useRef<(err: Error) => void>(() => {}) onErrorRef.current = (err) => { setError(err) @@ -58,7 +167,6 @@ export function useDurableChat< } // Create client synchronously - always available immediately - // Use ref to persist across renders and track when we need a new client const clientRef = useRef<{ client: DurableChatClient; key: string } | null>(null) const key = `${clientOptions.sessionId}:${clientOptions.proxyUrl}` @@ -66,14 +174,12 @@ export function useDurableChat< // The isDisposed check handles React Strict Mode: cleanup disposes the client, // so the next render must create a fresh one with a new AbortController. if (providedClient) { - // Use provided client (for testing) if (!clientRef.current || clientRef.current.client !== providedClient) { clientRef.current = { client: providedClient, key: 'provided' } } } else if (!clientRef.current || clientRef.current.key !== key || clientRef.current.client.isDisposed) { // Dispose old client if exists (may already be disposed, which is fine) clientRef.current?.client.dispose() - // Create new client synchronously clientRef.current = { client: new DurableChatClient({ ...clientOptions, @@ -85,56 +191,36 @@ export function useDurableChat< const client = clientRef.current.client - // Side effects: connect, subscribe, cleanup - useEffect(() => { - const unsubscribes: Array<{ unsubscribe: () => void }> = [] - - // Subscribe to collection changes - const collections = client.collections - unsubscribes.push( - collections.activeGenerations.subscribeChanges(() => { - setIsLoading(client.isLoading) - }), - collections.messages.subscribeChanges(() => { - setMessages(client.messages as UIMessage[]) - }), - collections.sessionMeta.subscribeChanges(() => { - setConnectionStatus(client.connectionStatus) - }) - ) + const messageRows = useCollectionData(client.collections.messages) + const activeGenerations = useCollectionData(client.collections.activeGenerations) + const sessionMetaRows = useCollectionData(client.collections.sessionMeta) - // Sync initial state - setMessages(client.messages as UIMessage[]) - setIsLoading(client.isLoading) - setConnectionStatus(client.connectionStatus) + const messages = useMemo( + // Transform MessageRow[] to UIMessage[] + () => messageRows.map(messageRowToUIMessage), + [messageRows] + ) - // Auto-connect if enabled and not already connected + const isLoading = activeGenerations.length > 0 + const connectionStatus = sessionMetaRows[0]?.connectionStatus ?? 'disconnected' + + useEffect(() => { if (autoConnect && client.connectionStatus === 'disconnected') { - setConnectionStatus('connecting') - client - .connect() - .then(() => { - setConnectionStatus('connected') - setMessages(client.messages as UIMessage[]) - }) - .catch((err) => { - setConnectionStatus('error') - setError(err) - }) + client.connect().catch((err) => { + setError(err instanceof Error ? err : new Error(String(err))) + }) } // Cleanup: unsubscribe and dispose (disposal is idempotent) return () => { - unsubscribes.forEach((u) => u.unsubscribe()) - // Only dispose if this is not a provided client if (!providedClient) { client.dispose() } } - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [client, autoConnect]) + }, [client, autoConnect, providedClient]) + + // Action Callbacks - // Callbacks - client is always defined, no null checks needed const sendMessage = useCallback(async (content: string) => { try { await client.sendMessage(content) @@ -171,7 +257,6 @@ export function useDurableChat< const clear = useCallback(() => { client.clear() - setMessages([]) }, [client]) const addToolResult = useCallback( @@ -207,13 +292,9 @@ export function useDurableChat< }, [client]) const connect = useCallback(async () => { - setConnectionStatus('connecting') try { await client.connect() - setConnectionStatus('connected') - setMessages(client.messages as UIMessage[]) } catch (err) { - setConnectionStatus('error') setError(err instanceof Error ? err : new Error(String(err))) throw err } @@ -221,7 +302,6 @@ export function useDurableChat< const disconnect = useCallback(() => { client.disconnect() - setConnectionStatus('disconnected') }, [client]) const pause = useCallback(() => { diff --git a/packages/react-durable-session/tests/setup.ts b/packages/react-durable-session/tests/setup.ts index dbdb75c..5f77552 100644 --- a/packages/react-durable-session/tests/setup.ts +++ b/packages/react-durable-session/tests/setup.ts @@ -5,3 +5,25 @@ */ import '@testing-library/jest-dom/vitest' + +/** + * Suppress React act() warnings from useSyncExternalStore subscriptions. + * + * The useDurableChat hook uses useSyncExternalStore to subscribe to TanStack DB + * collections. When external store subscription callbacks fire (e.g., when + * markReady() is called during connection), they trigger React state updates. + * These updates originate from outside React's normal update cycle, so React + * logs act() warnings even though the updates are handled correctly. + * + * This is a known limitation when testing hooks that use useSyncExternalStore + * with external data sources. The hook behavior is correct - these warnings + * are false positives. See: https://github.com/testing-library/react-testing-library/issues/1061 + */ +const originalError = console.error +console.error = (...args: unknown[]) => { + const message = typeof args[0] === 'string' ? args[0] : '' + if (message.includes('was not wrapped in act')) { + return + } + originalError.apply(console, args) +} diff --git a/packages/react-durable-session/tests/use-durable-chat.test.tsx b/packages/react-durable-session/tests/use-durable-chat.test.tsx index 394f4d3..6f2df13 100644 --- a/packages/react-durable-session/tests/use-durable-chat.test.tsx +++ b/packages/react-durable-session/tests/use-durable-chat.test.tsx @@ -16,7 +16,7 @@ import { DurableChatClient } from '@electric-sql/durable-session' import { useDurableChat } from '../src/use-durable-chat' import type { UIMessage } from '@tanstack/ai' -// Import test helpers from ai-db +// Import test helpers from durable-session import { createMockSessionDB, loadTestData, @@ -25,7 +25,7 @@ import { TEST_MESSAGE_IDS, EXPECTED_CONTENT, type MockSessionDBControllers, -} from '../../ai-db/tests/fixtures/test-helpers' +} from '../../durable-session/tests/fixtures/test-helpers' describe('useDurableChat integration', () => { const testData = loadTestData() @@ -35,27 +35,34 @@ describe('useDurableChat integration', () => { let controllers: MockSessionDBControllers let client: DurableChatClient - beforeEach(async () => { + beforeEach(() => { // Create mock session DB with controllers for all collections mockSessionDB = createMockSessionDB('test-session') controllers = mockSessionDB.controllers // Create real client with injected mock session DB + // NOTE: Don't connect here - let each test connect inside act() to avoid + // act() warnings from subscription callbacks firing during initial render client = new DurableChatClient({ sessionId: 'test-session', proxyUrl: 'http://localhost:4000', sessionDB: mockSessionDB.sessionDB, }) - - // Connect the client - this calls sessionDB.preload() which sets up collections - // After connect, the hook will detect connectionStatus === 'connected' and set up subscriptions - await client.connect() }) afterEach(() => { client.dispose() }) + /** + * Helper to wait for the client to be connected. + */ + async function waitForConnected(result: { current: { connectionStatus: string } }): Promise { + await waitFor(() => { + expect(result.current.connectionStatus).toBe('connected') + }) + } + describe('initial state', () => { it('should return empty messages array initially', async () => { const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) @@ -75,11 +82,10 @@ describe('useDurableChat integration', () => { expect(result.current.error).toBeUndefined() }) - it('should have connected connection status when client is pre-connected', () => { - // Client is pre-connected in beforeEach, hook detects this and sets up subscriptions - const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) + it('should have connected connection status after autoConnect', async () => { + const { result } = renderHook(() => useDurableChat({ client })) - expect(result.current.connectionStatus).toBe('connected') + await waitForConnected(result) }) }) @@ -121,7 +127,8 @@ describe('useDurableChat integration', () => { describe('message materialization via live query pipeline', () => { it('should materialize a user message from stream data', async () => { - const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) + const { result } = renderHook(() => useDurableChat({ client })) + await waitForConnected(result) // Emit user message (wrapped in act for React state updates) const userMessageRows = getMessageRows(testData, TEST_MESSAGE_IDS.USER_1) @@ -141,7 +148,8 @@ describe('useDurableChat integration', () => { }) it('should materialize user + assistant messages in correct order', async () => { - const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) + const { result } = renderHook(() => useDurableChat({ client })) + await waitForConnected(result) // Emit user message await act(async () => { @@ -169,7 +177,8 @@ describe('useDurableChat integration', () => { }) it('should handle full conversation flow', async () => { - const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) + const { result } = renderHook(() => useDurableChat({ client })) + await waitForConnected(result) // Emit all test data at once (simulates reconnect/catch-up) await act(async () => { @@ -201,7 +210,8 @@ describe('useDurableChat integration', () => { describe('streaming updates', () => { it('should update messages reactively as chunks stream in', async () => { - const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) + const { result } = renderHook(() => useDurableChat({ client })) + await waitForConnected(result) // Emit user message first await act(async () => { @@ -260,7 +270,8 @@ describe('useDurableChat integration', () => { * - ... and so on to "Hi there! How can I assist you today?" */ it('should update message TEXT CONTENT incrementally as chunks stream in', async () => { - const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) + const { result } = renderHook(() => useDurableChat({ client })) + await waitForConnected(result) // Helper to extract text content from a message const getTextContent = (message: UIMessage): string => { @@ -339,7 +350,8 @@ describe('useDurableChat integration', () => { * all chunks in rapid fire (no delays between) to verify the behavior. */ it('should handle rapid chunk arrival (all chunks emitted quickly)', async () => { - const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) + const { result } = renderHook(() => useDurableChat({ client })) + await waitForConnected(result) // Helper to extract text content from a message const getTextContent = (message: UIMessage): string => { @@ -395,7 +407,7 @@ describe('useDurableChat integration', () => { const observedContents: string[] = [] const { result } = renderHook(() => { - const hook = useDurableChat({ client, autoConnect: false }) + const hook = useDurableChat({ client }) // Track observed content on each render const assistantMsg = hook.messages.find(m => m.role === 'assistant') if (assistantMsg) { @@ -408,6 +420,8 @@ describe('useDurableChat integration', () => { return hook }) + await waitForConnected(result) + // Patch subscribeChanges to count fires const originalSubscribe = client.collections.messages.subscribeChanges.bind(client.collections.messages) client.collections.messages.subscribeChanges = (cb) => { @@ -451,7 +465,8 @@ describe('useDurableChat integration', () => { * get the complete message content. */ it('should handle all chunks in a single transaction (batch sync)', async () => { - const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) + const { result } = renderHook(() => useDurableChat({ client })) + await waitForConnected(result) // Helper to extract text content from a message const getTextContent = (message: UIMessage): string => { @@ -491,7 +506,8 @@ describe('useDurableChat integration', () => { describe('isLoading state', () => { it('should track isLoading based on active generations', async () => { - const { result } = renderHook(() => useDurableChat({ client, autoConnect: false })) + const { result } = renderHook(() => useDurableChat({ client })) + await waitForConnected(result) // Initially not loading expect(result.current.isLoading).toBe(false) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0f7ce79..95a3531 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1441,22 +1441,25 @@ importers: version: 6.9.1 '@testing-library/react': specifier: ^16.3.0 - version: 16.3.0(@testing-library/dom@10.4.1)(@types/react-dom@19.2.3(@types/react@18.3.27))(@types/react@18.3.27)(react-dom@18.3.1(react@18.3.1))(react@18.3.1) + version: 16.3.0(@testing-library/dom@10.4.1)(@types/react-dom@19.2.3(@types/react@19.2.7))(@types/react@19.2.7)(react-dom@19.2.1(react@19.2.1))(react@19.2.1) '@types/node': specifier: ^24.10.0 version: 24.10.1 '@types/react': - specifier: ^18.3.0 - version: 18.3.27 + specifier: ^19.0.0 + version: 19.2.7 + '@types/react-dom': + specifier: ^19.0.0 + version: 19.2.3(@types/react@19.2.7) jsdom: specifier: ^26.1.0 version: 26.1.0 react: - specifier: ^18.3.0 - version: 18.3.1 + specifier: ^19.0.0 + version: 19.2.1 react-dom: - specifier: ^18.3.0 - version: 18.3.1(react@18.3.1) + specifier: ^19.0.0 + version: 19.2.1(react@19.2.1) typescript: specifier: ^5.9.3 version: 5.9.3 @@ -4134,9 +4137,6 @@ packages: '@types/pg@8.15.6': resolution: {integrity: sha512-NoaMtzhxOrubeL/7UZuNTrejB4MPAJ0RpxZqXQf2qXuVlTPuG6Y8p4u9dKRaue4yjmC7ZhzVO2/Yyyn25znrPQ==} - '@types/prop-types@15.7.15': - resolution: {integrity: sha512-F6bEyamV9jKGAFBEmlQnesRPGOQqS2+Uwi0Em15xenOxHaf2hv6L8YCVn3rPdPJOiJfPiCnLIRyvwVaqMY3MIw==} - '@types/qs@6.14.0': resolution: {integrity: sha512-eOunJqu0K1923aExK6y8p6fsihYEn/BYuQ4g0CxAAgFc4b/ZLN4CrsRZ55srTdqoiLzU2B2evC+apEIxprEzkQ==} @@ -4148,9 +4148,6 @@ packages: peerDependencies: '@types/react': ^19.2.0 - '@types/react@18.3.27': - resolution: {integrity: sha512-cisd7gxkzjBKU2GgdYrTdtQx1SORymWyaAFhaxQPK9bYO9ot3Y5OikQRvY0VYQtvwjeQnizCINJAenh/V7MK2w==} - '@types/react@19.2.7': resolution: {integrity: sha512-MWtvHrGZLFttgeEj28VXHxpmwYbor/ATPYbBfSFZEIRK0ecCFLl2Qo55z52Hss+UV9CRN7trSeq1zbgx7YDWWg==} @@ -7695,11 +7692,6 @@ packages: react-base16-styling@0.6.0: resolution: {integrity: sha512-yvh/7CArceR/jNATXOKDlvTnPKPmGZz7zsenQ3jUwLzHkNUR0CvY3yGYJbWJ/nnxsL8Sgmt5cO3/SILVuPO6TQ==} - react-dom@18.3.1: - resolution: {integrity: sha512-5m4nQKp+rZRb09LNH59GM4BxTh9251/ylbKIbpe7TpGxfJ+9kv6BLkLBXIjjspbgbnIBNqlI23tRnTWT0snUIw==} - peerDependencies: - react: ^18.3.1 - react-dom@19.2.1: resolution: {integrity: sha512-ibrK8llX2a4eOskq1mXKu/TGZj9qzomO+sNfO98M6d9zIPOEhlBkMkBUBLd1vgS0gQsLDBzA+8jJBVXDnfHmJg==} peerDependencies: @@ -7736,10 +7728,6 @@ packages: peerDependencies: react: ^16.8.0 || ^17.0.0 || ^18.0.0 || ^19.0.0 - react@18.3.1: - resolution: {integrity: sha512-wS+hAgJShR0KhEvPJArfuPVN1+Hz1t0Y6n5jLrGQbkb4urgPE/0Rve+1kMB1v/oWgHgm4WIcV+i7F2pTVj+2iQ==} - engines: {node: '>=0.10.0'} - react@19.2.1: resolution: {integrity: sha512-DGrYcCWK7tvYMnWh79yrPHt+vdx9tY+1gPZa7nJQtO/p8bLTDaHp4dzwEhQB7pZ4Xe3ok4XKuEPrVuc+wlpkmw==} engines: {node: '>=0.10.0'} @@ -7981,9 +7969,6 @@ packages: resolution: {integrity: sha512-xAg7SOnEhrm5zI3puOOKyy1OMcMlIJZYNJY7xLBwSze0UjhPLnWfj2GF2EpT0jmzaJKIWKHLsaSSajf35bcYnA==} engines: {node: '>=v12.22.7'} - scheduler@0.23.2: - resolution: {integrity: sha512-UOShsPwz7NrMUqhR6t0hWjFduvOzbtv7toDH1/hIrfRNIDBnnBWd0CwJTGvTpngVlmwGCdP9/Zl/tVrDqcuYzQ==} - scheduler@0.27.0: resolution: {integrity: sha512-eNv+WrVbKu1f3vbYJT/xtiF5syA5HPIMtf9IgY/nKg0sWqzAUEvqY/xm7OcZc/qafLx/iO9FgOmeSAp4v5ti/Q==} @@ -11740,16 +11725,6 @@ snapshots: picocolors: 1.1.1 redent: 3.0.0 - '@testing-library/react@16.3.0(@testing-library/dom@10.4.1)(@types/react-dom@19.2.3(@types/react@18.3.27))(@types/react@18.3.27)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)': - dependencies: - '@babel/runtime': 7.28.4 - '@testing-library/dom': 10.4.1 - react: 18.3.1 - react-dom: 18.3.1(react@18.3.1) - optionalDependencies: - '@types/react': 18.3.27 - '@types/react-dom': 19.2.3(@types/react@18.3.27) - '@testing-library/react@16.3.0(@testing-library/dom@10.4.1)(@types/react-dom@19.2.3(@types/react@19.2.7))(@types/react@19.2.7)(react-dom@19.2.1(react@19.2.1))(react@19.2.1)': dependencies: '@babel/runtime': 7.28.4 @@ -11879,26 +11854,14 @@ snapshots: pg-protocol: 1.10.3 pg-types: 2.2.0 - '@types/prop-types@15.7.15': {} - '@types/qs@6.14.0': {} '@types/range-parser@1.2.7': {} - '@types/react-dom@19.2.3(@types/react@18.3.27)': - dependencies: - '@types/react': 18.3.27 - optional: true - '@types/react-dom@19.2.3(@types/react@19.2.7)': dependencies: '@types/react': 19.2.7 - '@types/react@18.3.27': - dependencies: - '@types/prop-types': 15.7.15 - csstype: 3.2.3 - '@types/react@19.2.7': dependencies: csstype: 3.2.3 @@ -16398,12 +16361,6 @@ snapshots: lodash.flow: 3.5.0 pure-color: 1.3.0 - react-dom@18.3.1(react@18.3.1): - dependencies: - loose-envify: 1.4.0 - react: 18.3.1 - scheduler: 0.23.2 - react-dom@19.2.1(react@19.2.1): dependencies: react: 19.2.1 @@ -16456,10 +16413,6 @@ snapshots: transitivePeerDependencies: - '@types/react' - react@18.3.1: - dependencies: - loose-envify: 1.4.0 - react@19.2.1: {} read-cache@1.0.0: @@ -16850,10 +16803,6 @@ snapshots: dependencies: xmlchars: 2.2.0 - scheduler@0.23.2: - dependencies: - loose-envify: 1.4.0 - scheduler@0.27.0: {} scule@1.3.0: {} diff --git a/vitest.workspace.ts b/vitest.workspace.ts index e86e8b5..290f779 100644 --- a/vitest.workspace.ts +++ b/vitest.workspace.ts @@ -33,11 +33,11 @@ export default defineWorkspace([ }, { test: { - name: 'ai-db', - root: './packages/ai-db', + name: 'tanstack-ai-transport', + root: './packages/tanstack-ai-transport', globals: true, - environment: 'node', - include: ['tests/**/*.test.ts'], + environment: 'jsdom', + include: ['test/**/*.test.ts'], coverage: { provider: 'v8', reporter: ['text', 'json', 'html'], @@ -48,11 +48,11 @@ export default defineWorkspace([ }, { test: { - name: 'tanstack-ai-transport', - root: './packages/tanstack-ai-transport', + name: 'durable-session', + root: './packages/durable-session', globals: true, - environment: 'jsdom', - include: ['test/**/*.test.ts'], + environment: 'node', + include: ['tests/**/*.test.ts'], coverage: { provider: 'v8', reporter: ['text', 'json', 'html'], @@ -63,8 +63,8 @@ export default defineWorkspace([ }, { test: { - name: 'react-ai-db', - root: './packages/react-ai-db', + name: 'react-durable-session', + root: './packages/react-durable-session', globals: true, environment: 'jsdom', include: ['tests/**/*.test.ts', 'tests/**/*.test.tsx'],