diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts
index b76580c19..f24dbbe43 100644
--- a/packages/db/src/collection/state.ts
+++ b/packages/db/src/collection/state.ts
@@ -498,6 +498,9 @@ export class CollectionStateManager<
     for (const transaction of committedSyncedTransactions) {
       // Handle truncate operations first
       if (transaction.truncate) {
+        console.log(
+          `[DEBUG] STATE: TRUNCATE starting, current syncedData size=${this.syncedData.size}`,
+        )
         // TRUNCATE PHASE
         // 1) Emit a delete for every visible key (synced + optimistic) so downstream listeners/indexes
         //    observe a clear-before-rebuild. We intentionally skip keys already in
@@ -507,6 +510,9 @@ export class CollectionStateManager<
           ...this.syncedData.keys(),
          ...(truncateOptimisticSnapshot?.upserts.keys() || []),
        ])
+        console.log(
+          `[DEBUG] STATE: TRUNCATE clearing ${visibleKeys.size} visible keys`,
+        )
        for (const key of visibleKeys) {
          if (truncateOptimisticSnapshot?.deletes.has(key)) continue
          const previousValue =
@@ -522,6 +528,9 @@ export class CollectionStateManager<
        this.syncedData.clear()
        this.syncedMetadata.clear()
        this.syncedKeys.clear()
+        console.log(
+          `[DEBUG] STATE: TRUNCATE complete, syncedData size=${this.syncedData.size}`,
+        )
 
        // 3) Clear currentVisibleState for truncated keys to ensure subsequent operations
        //    are compared against the post-truncate state (undefined) rather than pre-truncate state
@@ -564,15 +573,40 @@ export class CollectionStateManager<
          // Update synced data
          switch (operation.type) {
            case `insert`:
+              console.log(
+                `[DEBUG] STATE: INSERT key=${String(key)} value=${JSON.stringify(operation.value)}`,
+              )
              this.syncedData.set(key, operation.value)
              break
            case `update`: {
+              const existingData = this.syncedData.get(key)
+              const hasExisting = existingData !== undefined
+
+              console.log(
+                `[DEBUG] STATE: UPDATE key=${String(key)} mode=${rowUpdateMode} hasExisting=${hasExisting}`,
+              )
+              console.log(
+                `[DEBUG] STATE: UPDATE existing=${JSON.stringify(existingData)}`,
+              )
+              console.log(
+                `[DEBUG] STATE: UPDATE incoming=${JSON.stringify(operation.value)}`,
+              )
+
+              if (!hasExisting) {
+                console.warn(
+                  `[DEBUG] STATE: ⚠️ UPDATE on non-existent row! key=${String(key)} - this may cause partial data!`,
+                )
+              }
+
              if (rowUpdateMode === `partial`) {
                const updatedValue = Object.assign(
                  {},
                  this.syncedData.get(key),
                  operation.value,
                )
+                console.log(
+                  `[DEBUG] STATE: UPDATE merged=${JSON.stringify(updatedValue)}`,
+                )
                this.syncedData.set(key, updatedValue)
              } else {
                this.syncedData.set(key, operation.value)
@@ -580,6 +614,7 @@ export class CollectionStateManager<
              break
            }
            case `delete`:
+              console.log(`[DEBUG] STATE: DELETE key=${String(key)}`)
              this.syncedData.delete(key)
              break
          }
@@ -781,6 +816,9 @@ export class CollectionStateManager<
      // Update cached size after synced data changes
      this.size = this.calculateSize()
 
+      console.log(
+        `[DEBUG] STATE: COMMIT complete - syncedData size=${this.syncedData.size}, total visible size=${this.size}, events emitted=${events.length}`,
+      )
      // Update indexes for all events before emitting
      if (events.length > 0) {
diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts
index 477e37641..e6943883c 100644
--- a/packages/electric-db-collection/src/electric.ts
+++ b/packages/electric-db-collection/src/electric.ts
@@ -366,16 +366,27 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
  }
 
  const loadSubset = async (opts: LoadSubsetOptions) => {
+    console.log(
+      `[DEBUG] LOAD_SUBSET: syncMode=${syncMode} isBuffering=${isBufferingInitialSync()} opts=${JSON.stringify(opts)}`,
+    )
+
    // In progressive mode, use fetchSnapshot during snapshot phase
    if (isBufferingInitialSync()) {
      // Progressive mode snapshot phase: fetch and apply immediately
      const snapshotParams = compileSQL(opts)
+      console.log(`[DEBUG] LOAD_SUBSET: Progressive mode - fetching snapshot`)
      try {
        const { data: rows } = await stream.fetchSnapshot(snapshotParams)
+        console.log(
+          `[DEBUG] LOAD_SUBSET: Progressive mode - got ${rows.length} rows`,
+        )
 
        // Check again if we're still buffering - we might have received up-to-date
        // and completed the atomic swap while waiting for the snapshot
        if (!isBufferingInitialSync()) {
+          console.log(
+            `[DEBUG] LOAD_SUBSET: Ignoring snapshot - sync completed while fetching`,
+          )
          debug(
            `${collectionId ? `[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching`,
          )
@@ -386,6 +397,9 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
        if (rows.length > 0) {
          begin()
          for (const row of rows) {
+            console.log(
+              `[DEBUG] LOAD_SUBSET: Applying row from snapshot: ${JSON.stringify(row.value)}`,
+            )
            write({
              type: `insert`,
              value: row.value,
@@ -401,6 +415,7 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
          )
        }
      } catch (error) {
+        console.log(`[DEBUG] LOAD_SUBSET: Error fetching snapshot: ${error}`)
        debug(
          `${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`,
          error,
@@ -409,15 +424,22 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
        )
      }
    } else if (syncMode === `progressive`) {
      // Progressive mode after full sync complete: no need to load more
+      console.log(
+        `[DEBUG] LOAD_SUBSET: Progressive mode complete - no load needed`,
+      )
      return
    } else {
      // On-demand mode: use requestSnapshot
+      console.log(`[DEBUG] LOAD_SUBSET: On-demand mode - requesting snapshot`)
+
      // When cursor is provided, make two calls:
      // 1. whereCurrent (all ties, no limit)
      // 2. whereFrom (rows > cursor, with limit)
 
      const { cursor, where, orderBy, limit } = opts
      if (cursor) {
+        console.log(
+          `[DEBUG] LOAD_SUBSET: On-demand mode with cursor - making parallel requests`,
+        )
        // Make parallel requests for cursor-based pagination
        const promises: Array<Promise<void>> = []
@@ -429,6 +451,9 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
          // No limit - get all ties
        }
        const whereCurrentParams = compileSQL(whereCurrentOpts)
+        console.log(
+          `[DEBUG] LOAD_SUBSET: requesting whereCurrent snapshot params=${JSON.stringify(whereCurrentParams)}`,
+        )
        promises.push(stream.requestSnapshot(whereCurrentParams))
 
        debug(
@@ -443,6 +468,9 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
          limit,
        }
        const whereFromParams = compileSQL(whereFromOpts)
+        console.log(
+          `[DEBUG] LOAD_SUBSET: requesting whereFrom snapshot params=${JSON.stringify(whereFromParams)} limit=${limit}`,
+        )
        promises.push(stream.requestSnapshot(whereFromParams))
 
        debug(
@@ -450,11 +478,19 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
        )
 
        // Wait for both requests to complete
+        console.log(
+          `[DEBUG] LOAD_SUBSET: waiting for both snapshot requests to complete`,
+        )
        await Promise.all(promises)
+        console.log(`[DEBUG] LOAD_SUBSET: both snapshot requests completed`)
      } else {
        // No cursor - standard single request
        const snapshotParams = compileSQL(opts)
+        console.log(
+          `[DEBUG] LOAD_SUBSET: requesting single snapshot params=${JSON.stringify(snapshotParams)}`,
+        )
        await stream.requestSnapshot(snapshotParams)
+        console.log(`[DEBUG] LOAD_SUBSET: snapshot request completed`)
      }
    }
  }
@@ -937,18 +973,27 @@ function createElectricSync<T extends Row<unknown>>(
    rowId: RowId,
    rowTagSet: Set<string>,
  ): void => {
+    console.log(
+      `[DEBUG] TAGS: addTagsToRow rowId=${String(rowId)} adding ${tags.length} tags: ${JSON.stringify(tags)}`,
+    )
    for (const tag of tags) {
      const parsedTag = parseTag(tag)
 
      // Infer tag length from first tag
      if (tagLength === undefined) {
        tagLength = getTagLength(parsedTag)
+        console.log(
+          `[DEBUG] TAGS: inferred tagLength=${tagLength} from first tag`,
+        )
        initializeTagIndex(tagLength)
      }
 
      // Validate tag length matches
      const currentTagLength = getTagLength(parsedTag)
      if (currentTagLength !== tagLength) {
+        console.warn(
+          `[DEBUG] TAGS: ⚠️ tag length mismatch: expected ${tagLength}, got ${currentTagLength}, skipping tag ${tag}`,
+        )
        debug(
          `${collectionId ? `[${collectionId}] ` : ``}Tag length mismatch: expected ${tagLength}, got ${currentTagLength}`,
        )
@@ -969,9 +1014,15 @@ function createElectricSync<T extends Row<unknown>>(
    rowTagSet: Set<string>,
  ): void => {
    if (tagLength === undefined) {
+      console.log(
+        `[DEBUG] TAGS: removeTagsFromRow rowId=${String(rowId)} - no tagLength set, skipping`,
+      )
      return
    }
 
+    console.log(
+      `[DEBUG] TAGS: removeTagsFromRow rowId=${String(rowId)} removing ${removedTags.length} tags: ${JSON.stringify(removedTags)}`,
+    )
    for (const tag of removedTags) {
      const parsedTag = parseTag(tag)
      rowTagSet.delete(tag)
@@ -992,6 +1043,9 @@ function createElectricSync<T extends Row<unknown>>(
    removedTags: Array<string> | undefined,
    rowId: RowId,
  ): Set<string> => {
+    const hadTags = rowTagSets.has(rowId)
+    const previousTagCount = hadTags ? rowTagSets.get(rowId)!.size : 0
+
    // Initialize tag set for this row if it doesn't exist (needed for checking deletion)
    if (!rowTagSets.has(rowId)) {
      rowTagSets.set(rowId, new Set())
@@ -1008,6 +1062,10 @@ function createElectricSync<T extends Row<unknown>>(
      removeTagsFromRow(removedTags, rowId, rowTagSet)
    }
 
+    console.log(
+      `[DEBUG] TAGS: rowId=${String(rowId)} prevTags=${previousTagCount} newTags=${rowTagSet.size} added=${JSON.stringify(tags)} removed=${JSON.stringify(removedTags)}`,
+    )
+
    return rowTagSet
  }
 
@@ -1015,6 +1073,9 @@ function createElectricSync<T extends Row<unknown>>(
   * Clear all tag tracking state (used when truncating)
   */
  const clearTagTrackingState = (): void => {
+    console.log(
+      `[DEBUG] TAGS: clearing all tag tracking state (rowTagSets size=${rowTagSets.size})`,
+    )
    rowTagSets.clear()
    tagIndex.length = 0
    tagLength = undefined
@@ -1026,14 +1087,24 @@ function createElectricSync<T extends Row<unknown>>(
   */
  const clearTagsForRow = (rowId: RowId): void => {
    if (tagLength === undefined) {
+      console.log(
+        `[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} - no tagLength set, skipping`,
+      )
      return
    }
 
    const rowTagSet = rowTagSets.get(rowId)
    if (!rowTagSet) {
+      console.log(
+        `[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} - no tags to clear`,
+      )
      return
    }
 
+    console.log(
+      `[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} clearing ${rowTagSet.size} tags`,
+    )
+
    // Remove each tag from the index
    for (const tag of rowTagSet) {
      const parsedTag = parseTag(tag)
@@ -1058,20 +1129,37 @@ function createElectricSync<T extends Row<unknown>>(
  ): boolean => {
    const rowTagSet = rowTagSets.get(rowId)
    if (!rowTagSet) {
+      console.log(
+        `[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} - no tag set found`,
+      )
      return false
    }
 
+    const originalSize = rowTagSet.size
+    let removedCount = 0
+
    // Find tags that match this pattern and remove them
    for (const tag of rowTagSet) {
      const parsedTag = parseTag(tag)
      if (tagMatchesPattern(parsedTag, pattern)) {
+        console.log(
+          `[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} removing matching tag ${tag}`,
+        )
        rowTagSet.delete(tag)
        removeTagFromIndex(parsedTag, rowId, tagIndex, tagLength!)
+        removedCount++
      }
    }
 
+    console.log(
+      `[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} removed ${removedCount}/${originalSize} tags, remaining=${rowTagSet.size}`,
+    )
+
    // Check if row's tag set is now empty
    if (rowTagSet.size === 0) {
+      console.log(
+        `[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} - tag set now empty, row will be deleted`,
+      )
      rowTagSets.delete(rowId)
      return true
    }
@@ -1088,7 +1176,10 @@ function createElectricSync<T extends Row<unknown>>(
    write: (message: ChangeMessageOrDeleteKeyMessage<T>) => void,
    transactionStarted: boolean,
  ): boolean => {
+    console.log(`[DEBUG] MOVE-OUT: patterns=${JSON.stringify(patterns)}`)
+
    if (tagLength === undefined) {
+      console.log(`[DEBUG] MOVE-OUT: no tag length set yet, ignoring`)
      debug(
        `${collectionId ? `[${collectionId}] ` : ``}Received move-out message but no tag length set yet, ignoring`,
      )
@@ -1101,9 +1192,17 @@ function createElectricSync<T extends Row<unknown>>(
    for (const pattern of patterns) {
      // Find all rows that match this pattern
      const affectedRowIds = findRowsMatchingPattern(pattern, tagIndex)
+      console.log(
+        `[DEBUG] MOVE-OUT: pattern=${JSON.stringify(pattern)} affectedRows=${Array.from(affectedRowIds).map(String).join(',')}`,
+      )
 
      for (const rowId of affectedRowIds) {
-        if (removeMatchingTagsFromRow(rowId, pattern)) {
+        const willDelete = removeMatchingTagsFromRow(rowId, pattern)
+        console.log(
+          `[DEBUG] MOVE-OUT: rowId=${String(rowId)} willDelete=${willDelete}`,
+        )
+
+        if (willDelete) {
          // Delete rows with empty tag sets
          if (!txStarted) {
            begin()
@@ -1191,21 +1290,30 @@ function createElectricSync<T extends Row<unknown>>(
      })
    })
 
+    const effectiveLog = syncMode === `on-demand` ? `changes_only` : undefined
+    const effectiveOffset =
+      shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined)
+
+    console.log(
+      `[DEBUG] SYNC_START: collectionId=${collectionId} syncMode=${syncMode} log=${effectiveLog} offset=${effectiveOffset} table=${shapeOptions.params?.table} where=${shapeOptions.params?.where}`,
+    )
+
    const stream = new ShapeStream({
      ...shapeOptions,
      // In on-demand mode, we only want to sync changes, so we set the log to `changes_only`
-      log: syncMode === `on-demand` ? `changes_only` : undefined,
+      log: effectiveLog,
      // In on-demand mode, we only need the changes from the point of time the collection was created
      // so we default to `now` when there is no saved offset.
-      offset:
-        shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined),
+      offset: effectiveOffset,
      signal: abortController.signal,
      onError: (errorParams) => {
+        console.log(`[DEBUG] ERROR: ShapeStream error occurred`, errorParams)
        // Just immediately mark ready if there's an error to avoid blocking
        // apps waiting for `.preload()` to finish.
        // Note that Electric sends a 409 error on a `must-refetch` message, but the
        // ShapeStream handled this and it will not reach this handler, therefor
        // this markReady will not be triggers by a `must-refetch`.
+        console.log(`[DEBUG] ERROR: marking collection ready despite error`)
        markReady()
 
        if (shapeOptions.onError) {
@@ -1249,6 +1357,30 @@ function createElectricSync<T extends Row<unknown>>(
        const rowId = collection.getKeyFromItem(changeMessage.value)
        const operation = changeMessage.headers.operation
 
+        // Check if this row already exists in the collection
+        const existingRow = collection.state.get(rowId as any)
+        const isNewRow = existingRow === undefined
+        const existingTags = rowTagSets.get(rowId)
+        const isFirstTags = !existingTags || existingTags.size === 0
+
+        // Log all change messages
+        console.log(
+          `[DEBUG] CHANGE: op=${operation} rowId=${String(rowId)} isNew=${isNewRow} hasTags=${hasTags} tags=${JSON.stringify(tags)} value=${JSON.stringify(changeMessage.value)}`,
+        )
+
+        // Warn on potential move-in issues
+        if (isNewRow && operation === `update`) {
+          console.warn(
+            `[DEBUG] ⚠️ UPDATE on non-existent row! rowId=${String(rowId)} - may cause partial data`,
+          )
+        }
+
+        if (tags && tags.length > 0 && isFirstTags) {
+          console.log(
+            `[DEBUG] 🔄 MOVE-IN: rowId=${String(rowId)} getting first tags`,
+          )
+        }
+
        if (operation === `delete`) {
          clearTagsForRow(rowId)
        } else if (hasTags) {
@@ -1279,6 +1411,10 @@ function createElectricSync<T extends Row<unknown>>(
    })
 
    unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
+      console.log(
+        `[DEBUG] STREAM: Received batch of ${messages.length} messages`,
+      )
+
      // Track commit point type - up-to-date takes precedence as it also triggers progressive mode atomic swap
      let commitPoint: `up-to-date` | `subset-end` | null = null
 
@@ -1289,6 +1425,24 @@ function createElectricSync<T extends Row<unknown>>(
      batchCommitted.setState(() => false)
 
      for (const message of messages) {
+        // Log each message type
+        if (isChangeMessage(message)) {
+          console.log(
+            `[DEBUG] STREAM MSG: change op=${message.headers.operation} key=${message.key}`,
+          )
+        } else if (isMoveOutMessage(message)) {
+          console.log(
+            `[DEBUG] STREAM MSG: move-out patterns=${JSON.stringify(message.headers.patterns)}`,
+          )
+        } else if (isControlMessage(message)) {
+          console.log(
+            `[DEBUG] STREAM MSG: control=${message.headers.control}`,
+          )
+        } else {
+          console.log(
+            `[DEBUG] STREAM MSG: unknown type headers=${JSON.stringify((message as any).headers)}`,
+          )
+        }
        // Add message to current batch buffer (for race condition handling)
        if (isChangeMessage(message) || isMoveOutMessage(message)) {
          currentBatchMessages.setState((currentBuffer) => {
@@ -1339,10 +1493,14 @@ function createElectricSync<T extends Row<unknown>>(
 
          // In buffered initial sync of progressive mode, buffer messages instead of writing
          if (isBufferingInitialSync()) {
+            console.log(
+              `[DEBUG] BUFFERING: message buffered (progressive mode initial sync) op=${message.headers.operation} key=${message.key}`,
+            )
            bufferedMessages.push(message)
          } else {
            // Normal processing: write changes immediately
            if (!transactionStarted) {
+              console.log(`[DEBUG] TX: begin transaction`)
              begin()
              transactionStarted = true
            }
@@ -1353,22 +1511,35 @@ function createElectricSync<T extends Row<unknown>>(
          // Track postgres snapshot metadata for resolving awaiting mutations
          // Skip during buffered initial sync (will be extracted during atomic swap)
          if (!isBufferingInitialSync()) {
+            console.log(`[DEBUG] SNAPSHOT_END: received snapshot-end message`)
            newSnapshots.push(parseSnapshotMessage(message))
+          } else {
+            console.log(
+              `[DEBUG] SNAPSHOT_END: buffering snapshot-end (progressive mode)`,
+            )
          }
        } else if (isUpToDateMessage(message)) {
          // up-to-date takes precedence - also triggers progressive mode atomic swap
+          console.log(
+            `[DEBUG] UP_TO_DATE: received up-to-date message isBuffering=${isBufferingInitialSync()}`,
+          )
          commitPoint = `up-to-date`
        } else if (isSubsetEndMessage(message)) {
          // subset-end triggers commit but not progressive mode atomic swap
+          console.log(`[DEBUG] SUBSET_END: received subset-end message`)
          if (commitPoint !== `up-to-date`) {
            commitPoint = `subset-end`
          }
        } else if (isMoveOutMessage(message)) {
          // Handle move-out event: buffer if buffering, otherwise process immediately
          if (isBufferingInitialSync()) {
+            console.log(
+              `[DEBUG] BUFFERING: move-out message buffered (progressive mode)`,
+            )
            bufferedMessages.push(message)
          } else {
            // Normal processing: process move-out immediately
+            console.log(`[DEBUG] MOVE-OUT: processing immediately`)
            transactionStarted = processMoveOutEvent(
              message.headers.patterns,
              begin,
@@ -1377,19 +1548,25 @@ function createElectricSync<T extends Row<unknown>>(
            )
          }
        } else if (isMustRefetchMessage(message)) {
+          console.log(
+            `[DEBUG] MUST_REFETCH: received must-refetch, will truncate collection`,
+          )
          debug(
            `${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`,
          )
 
          // Start a transaction and truncate the collection
          if (!transactionStarted) {
+            console.log(`[DEBUG] TX: begin transaction for must-refetch`)
            begin()
            transactionStarted = true
          }
 
+          console.log(`[DEBUG] TRUNCATE: truncating collection`)
          truncate()
 
          // Clear tag tracking state
+          console.log(`[DEBUG] TRUNCATE: clearing tag tracking state`)
          clearTagTrackingState()
 
          // Reset the loadSubset deduplication state since we're starting fresh
@@ -1403,25 +1580,41 @@ function createElectricSync<T extends Row<unknown>>(
        }
      }
 
+      console.log(
+        `[DEBUG] BATCH_END: commitPoint=${commitPoint} transactionStarted=${transactionStarted} isBuffering=${isBufferingInitialSync()}`,
+      )
+
      if (commitPoint !== null) {
        // PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end)
        if (isBufferingInitialSync() && commitPoint === `up-to-date`) {
+          console.log(
+            `[DEBUG] ATOMIC_SWAP: starting atomic swap with ${bufferedMessages.length} buffered messages`,
+          )
          debug(
            `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`,
          )
 
          // Start atomic swap transaction
+          console.log(`[DEBUG] TX: begin transaction for atomic swap`)
          begin()
 
          // Truncate to clear all snapshot data
+          console.log(`[DEBUG] ATOMIC_SWAP: truncating collection`)
          truncate()
 
          // Clear tag tracking state for atomic swap
+          console.log(`[DEBUG] ATOMIC_SWAP: clearing tag tracking state`)
          clearTagTrackingState()
 
          // Apply all buffered change messages and extract txids/snapshots
+          console.log(
+            `[DEBUG] ATOMIC_SWAP: replaying ${bufferedMessages.length} buffered messages`,
+          )
          for (const bufferedMsg of bufferedMessages) {
            if (isChangeMessage(bufferedMsg)) {
+              console.log(
+                `[DEBUG] ATOMIC_SWAP: replaying change message op=${bufferedMsg.headers.operation} key=${bufferedMsg.key}`,
+              )
              processChangeMessage(bufferedMsg)
 
              // Extract txids from buffered messages (will be committed to store after transaction)
@@ -1431,9 +1624,13 @@ function createElectricSync<T extends Row<unknown>>(
                )
              }
            } else if (isSnapshotEndMessage(bufferedMsg)) {
+              console.log(
+                `[DEBUG] ATOMIC_SWAP: processing buffered snapshot-end`,
+              )
              // Extract snapshots from buffered messages (will be committed to store after transaction)
              newSnapshots.push(parseSnapshotMessage(bufferedMsg))
            } else if (isMoveOutMessage(bufferedMsg)) {
+              console.log(`[DEBUG] ATOMIC_SWAP: replaying buffered move-out`)
              // Process buffered move-out messages during atomic swap
              processMoveOutEvent(
                bufferedMsg.headers.patterns,
@@ -1445,12 +1642,16 @@ function createElectricSync<T extends Row<unknown>>(
          }
 
          // Commit the atomic swap
+          console.log(`[DEBUG] TX: commit atomic swap transaction`)
          commit()
 
          // Exit buffering phase by marking that we've received up-to-date
          // isBufferingInitialSync() will now return false
          bufferedMessages.length = 0
 
+          console.log(
+            `[DEBUG] ATOMIC_SWAP: complete, now in normal sync mode`,
+          )
          debug(
            `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Atomic swap complete, now in normal sync mode`,
          )
@@ -1458,14 +1659,21 @@ function createElectricSync<T extends Row<unknown>>(
          // Normal mode or on-demand: commit transaction if one was started
          // Both up-to-date and subset-end trigger a commit
          if (transactionStarted) {
+            console.log(`[DEBUG] TX: commit transaction (${commitPoint})`)
            commit()
            transactionStarted = false
+          } else {
+            console.log(
+              `[DEBUG] TX: no transaction to commit (${commitPoint})`,
+            )
          }
        }
 
+      console.log(`[DEBUG] READY: marking collection ready`)
      wrappedMarkReady(isBufferingInitialSync())
 
      // Track that we've received the first up-to-date for progressive mode
      if (commitPoint === `up-to-date`) {
+        console.log(`[DEBUG] UP_TO_DATE: hasReceivedUpToDate = true`)
        hasReceivedUpToDate = true
      }
@@ -1510,12 +1718,14 @@ function createElectricSync<T extends Row<unknown>>(
    return {
      loadSubset: loadSubsetDedupe?.loadSubset,
      cleanup: () => {
+        console.log(`[DEBUG] CLEANUP: cleaning up collection ${collectionId}`)
        // Unsubscribe from the stream
        unsubscribeStream()
        // Abort the abort controller to stop the stream
        abortController.abort()
        // Reset deduplication tracking so collection can load fresh data if restarted
        loadSubsetDedupe?.reset()
+        console.log(`[DEBUG] CLEANUP: complete`)
      },
    }
  },