From 23113edeb95e6fa44f970ea9b9cd60329e7e528d Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 19 Dec 2025 04:39:49 +0000 Subject: [PATCH 1/5] Add exhaustive debug logging to investigate move-in bug Adds console.log statements to trace the flow of data when rows move into scope in on-demand mode with subset filtering. Key areas logged: - SYNC_START: Collection sync initialization (mode, offset, table, where) - STREAM: All incoming message batches and individual message types - CHANGE: Every change message with operation, row ID, existing state, tags - MOVE-IN: Detection of rows receiving their first tags - MOVE-OUT: Move-out event processing and row deletion - TAGS: Tag addition/removal with before/after counts - LOAD_SUBSET: Snapshot requests and responses - STATE: Insert/update/delete operations with data merging details Critical warnings are logged when: - UPDATE received for non-existent row (potential partial data bug) - Move-in row has undefined values To use: Run the app and check console for [DEBUG] prefixed logs. --- packages/db/src/collection/state.ts | 14 +++++ .../electric-db-collection/src/electric.ts | 63 +++++++++++++++++-- 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index b76580c19..4261253ee 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -564,15 +564,28 @@ export class CollectionStateManager< // Update synced data switch (operation.type) { case `insert`: + console.log(`[DEBUG] STATE: INSERT key=${String(key)} value=${JSON.stringify(operation.value)}`) this.syncedData.set(key, operation.value) break case `update`: { + const existingData = this.syncedData.get(key) + const hasExisting = existingData !== undefined + + console.log(`[DEBUG] STATE: UPDATE key=${String(key)} mode=${rowUpdateMode} hasExisting=${hasExisting}`) + console.log(`[DEBUG] STATE: UPDATE existing=${JSON.stringify(existingData)}`) + console.log(`[DEBUG] STATE: UPDATE incoming=${JSON.stringify(operation.value)}`) + + if (!hasExisting) { + console.warn(`[DEBUG] STATE: ⚠️ UPDATE on non-existent row! 
key=${String(key)} - this may cause partial data!`) + } + if (rowUpdateMode === `partial`) { const updatedValue = Object.assign( {}, this.syncedData.get(key), operation.value, ) + console.log(`[DEBUG] STATE: UPDATE merged=${JSON.stringify(updatedValue)}`) this.syncedData.set(key, updatedValue) } else { this.syncedData.set(key, operation.value) @@ -580,6 +593,7 @@ export class CollectionStateManager< break } case `delete`: + console.log(`[DEBUG] STATE: DELETE key=${String(key)}`) this.syncedData.delete(key) break } diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 477e37641..8b288ee8c 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -366,16 +366,21 @@ function createLoadSubsetDedupe>({ } const loadSubset = async (opts: LoadSubsetOptions) => { + console.log(`[DEBUG] LOAD_SUBSET: syncMode=${syncMode} isBuffering=${isBufferingInitialSync()} opts=${JSON.stringify(opts)}`) + // In progressive mode, use fetchSnapshot during snapshot phase if (isBufferingInitialSync()) { // Progressive mode snapshot phase: fetch and apply immediately const snapshotParams = compileSQL(opts) + console.log(`[DEBUG] LOAD_SUBSET: Progressive mode - fetching snapshot`) try { const { data: rows } = await stream.fetchSnapshot(snapshotParams) + console.log(`[DEBUG] LOAD_SUBSET: Progressive mode - got ${rows.length} rows`) // Check again if we're still buffering - we might have received up-to-date // and completed the atomic swap while waiting for the snapshot if (!isBufferingInitialSync()) { + console.log(`[DEBUG] LOAD_SUBSET: Ignoring snapshot - sync completed while fetching`) debug( `${collectionId ? `[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching`, ) @@ -386,6 +391,7 @@ function createLoadSubsetDedupe>({ if (rows.length > 0) { begin() for (const row of rows) { + console.log(`[DEBUG] LOAD_SUBSET: Applying row from snapshot: ${JSON.stringify(row.value)}`) write({ type: `insert`, value: row.value, @@ -401,6 +407,7 @@ function createLoadSubsetDedupe>({ ) } } catch (error) { + console.log(`[DEBUG] LOAD_SUBSET: Error fetching snapshot: ${error}`) debug( `${collectionId ? `[${collectionId}] ` : ``}Error fetching snapshot: %o`, error, @@ -409,9 +416,11 @@ function createLoadSubsetDedupe>({ } } else if (syncMode === `progressive`) { // Progressive mode after full sync complete: no need to load more + console.log(`[DEBUG] LOAD_SUBSET: Progressive mode complete - no load needed`) return } else { // On-demand mode: use requestSnapshot + console.log(`[DEBUG] LOAD_SUBSET: On-demand mode - requesting snapshot`) // When cursor is provided, make two calls: // 1. whereCurrent (all ties, no limit) // 2. whereFrom (rows > cursor, with limit) @@ -992,6 +1001,9 @@ function createElectricSync>( removedTags: Array | undefined, rowId: RowId, ): Set => { + const hadTags = rowTagSets.has(rowId) + const previousTagCount = hadTags ? 
rowTagSets.get(rowId)!.size : 0 + // Initialize tag set for this row if it doesn't exist (needed for checking deletion) if (!rowTagSets.has(rowId)) { rowTagSets.set(rowId, new Set()) @@ -1008,6 +1020,8 @@ function createElectricSync>( removeTagsFromRow(removedTags, rowId, rowTagSet) } + console.log(`[DEBUG] TAGS: rowId=${String(rowId)} prevTags=${previousTagCount} newTags=${rowTagSet.size} added=${JSON.stringify(tags)} removed=${JSON.stringify(removedTags)}`) + return rowTagSet } @@ -1088,7 +1102,10 @@ function createElectricSync>( write: (message: ChangeMessageOrDeleteKeyMessage) => void, transactionStarted: boolean, ): boolean => { + console.log(`[DEBUG] MOVE-OUT: patterns=${JSON.stringify(patterns)}`) + if (tagLength === undefined) { + console.log(`[DEBUG] MOVE-OUT: no tag length set yet, ignoring`) debug( `${collectionId ? `[${collectionId}] ` : ``}Received move-out message but no tag length set yet, ignoring`, ) @@ -1101,9 +1118,13 @@ function createElectricSync>( for (const pattern of patterns) { // Find all rows that match this pattern const affectedRowIds = findRowsMatchingPattern(pattern, tagIndex) + console.log(`[DEBUG] MOVE-OUT: pattern=${JSON.stringify(pattern)} affectedRows=${Array.from(affectedRowIds).map(String).join(',')}`) for (const rowId of affectedRowIds) { - if (removeMatchingTagsFromRow(rowId, pattern)) { + const willDelete = removeMatchingTagsFromRow(rowId, pattern) + console.log(`[DEBUG] MOVE-OUT: rowId=${String(rowId)} willDelete=${willDelete}`) + + if (willDelete) { // Delete rows with empty tag sets if (!txStarted) { begin() @@ -1191,14 +1212,18 @@ function createElectricSync>( }) }) + const effectiveLog = syncMode === `on-demand` ? `changes_only` : undefined + const effectiveOffset = shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined) + + console.log(`[DEBUG] SYNC_START: collectionId=${collectionId} syncMode=${syncMode} log=${effectiveLog} offset=${effectiveOffset} table=${shapeOptions.params?.table} where=${shapeOptions.params?.where}`) + const stream = new ShapeStream({ ...shapeOptions, // In on-demand mode, we only want to sync changes, so we set the log to `changes_only` - log: syncMode === `on-demand` ? `changes_only` : undefined, + log: effectiveLog, // In on-demand mode, we only need the changes from the point of time the collection was created // so we default to `now` when there is no saved offset. - offset: - shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined), + offset: effectiveOffset, signal: abortController.signal, onError: (errorParams) => { // Just immediately mark ready if there's an error to avoid blocking @@ -1249,6 +1274,24 @@ function createElectricSync>( const rowId = collection.getKeyFromItem(changeMessage.value) const operation = changeMessage.headers.operation + // Check if this row already exists in the collection + const existingRow = collection.state.get(rowId as any) + const isNewRow = existingRow === undefined + const existingTags = rowTagSets.get(rowId) + const isFirstTags = !existingTags || existingTags.size === 0 + + // Log all change messages + console.log(`[DEBUG] CHANGE: op=${operation} rowId=${String(rowId)} isNew=${isNewRow} hasTags=${hasTags} tags=${JSON.stringify(tags)} value=${JSON.stringify(changeMessage.value)}`) + + // Warn on potential move-in issues + if (isNewRow && operation === `update`) { + console.warn(`[DEBUG] ⚠️ UPDATE on non-existent row! 
rowId=${String(rowId)} - may cause partial data`) + } + + if (tags && tags.length > 0 && isFirstTags) { + console.log(`[DEBUG] 🔄 MOVE-IN: rowId=${String(rowId)} getting first tags`) + } + if (operation === `delete`) { clearTagsForRow(rowId) } else if (hasTags) { @@ -1279,6 +1322,8 @@ function createElectricSync>( }) unsubscribeStream = stream.subscribe((messages: Array>) => { + console.log(`[DEBUG] STREAM: Received batch of ${messages.length} messages`) + // Track commit point type - up-to-date takes precedence as it also triggers progressive mode atomic swap let commitPoint: `up-to-date` | `subset-end` | null = null @@ -1289,6 +1334,16 @@ function createElectricSync>( batchCommitted.setState(() => false) for (const message of messages) { + // Log each message type + if (isChangeMessage(message)) { + console.log(`[DEBUG] STREAM MSG: change op=${message.headers.operation} key=${message.key}`) + } else if (isMoveOutMessage(message)) { + console.log(`[DEBUG] STREAM MSG: move-out patterns=${JSON.stringify(message.headers.patterns)}`) + } else if (isControlMessage(message)) { + console.log(`[DEBUG] STREAM MSG: control=${message.headers.control}`) + } else { + console.log(`[DEBUG] STREAM MSG: unknown type headers=${JSON.stringify(message.headers)}`) + } // Add message to current batch buffer (for race condition handling) if (isChangeMessage(message) || isMoveOutMessage(message)) { currentBatchMessages.setState((currentBuffer) => { From 56bbe109764587ffae57a250969f6a8022844d10 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 19 Dec 2025 04:46:08 +0000 Subject: [PATCH 2/5] Add more exhaustive debug logging for every code path Additional logging added to cover all code paths: - BUFFERING: When messages are buffered in progressive mode - TX: All transaction begin/commit points - SNAPSHOT_END/SUBSET_END: Control message handling - UP_TO_DATE: When up-to-date is received - MUST_REFETCH: When must-refetch triggers truncate - ATOMIC_SWAP: All steps of progressive mode atomic swap - BATCH_END: Summary at end of each message batch - READY: When collection is marked ready - LOAD_SUBSET: Cursor-based pagination requests - TAGS: All tag operations (add, remove, clear, match) - addTagsToRow with tag details - removeTagsFromRow with tag details - removeMatchingTagsFromRow with before/after counts - clearTagsForRow and clearTagTrackingState --- .../electric-db-collection/src/electric.ts | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 8b288ee8c..02fe553af 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -427,6 +427,7 @@ function createLoadSubsetDedupe>({ const { cursor, where, orderBy, limit } = opts if (cursor) { + console.log(`[DEBUG] LOAD_SUBSET: On-demand mode with cursor - making parallel requests`) // Make parallel requests for cursor-based pagination const promises: Array> = [] @@ -438,6 +439,7 @@ function createLoadSubsetDedupe>({ // No limit - get all ties } const whereCurrentParams = compileSQL(whereCurrentOpts) + console.log(`[DEBUG] LOAD_SUBSET: requesting whereCurrent snapshot params=${JSON.stringify(whereCurrentParams)}`) promises.push(stream.requestSnapshot(whereCurrentParams)) debug( @@ -452,6 +454,7 @@ function createLoadSubsetDedupe>({ limit, } const whereFromParams = compileSQL(whereFromOpts) + console.log(`[DEBUG] LOAD_SUBSET: requesting whereFrom snapshot params=${JSON.stringify(whereFromParams)} 
limit=${limit}`) promises.push(stream.requestSnapshot(whereFromParams)) debug( @@ -459,11 +462,15 @@ function createLoadSubsetDedupe>({ ) // Wait for both requests to complete + console.log(`[DEBUG] LOAD_SUBSET: waiting for both snapshot requests to complete`) await Promise.all(promises) + console.log(`[DEBUG] LOAD_SUBSET: both snapshot requests completed`) } else { // No cursor - standard single request const snapshotParams = compileSQL(opts) + console.log(`[DEBUG] LOAD_SUBSET: requesting single snapshot params=${JSON.stringify(snapshotParams)}`) await stream.requestSnapshot(snapshotParams) + console.log(`[DEBUG] LOAD_SUBSET: snapshot request completed`) } } } @@ -946,18 +953,21 @@ function createElectricSync>( rowId: RowId, rowTagSet: Set, ): void => { + console.log(`[DEBUG] TAGS: addTagsToRow rowId=${String(rowId)} adding ${tags.length} tags: ${JSON.stringify(tags)}`) for (const tag of tags) { const parsedTag = parseTag(tag) // Infer tag length from first tag if (tagLength === undefined) { tagLength = getTagLength(parsedTag) + console.log(`[DEBUG] TAGS: inferred tagLength=${tagLength} from first tag`) initializeTagIndex(tagLength) } // Validate tag length matches const currentTagLength = getTagLength(parsedTag) if (currentTagLength !== tagLength) { + console.warn(`[DEBUG] TAGS: ⚠️ tag length mismatch: expected ${tagLength}, got ${currentTagLength}, skipping tag ${tag}`) debug( `${collectionId ? `[${collectionId}] ` : ``}Tag length mismatch: expected ${tagLength}, got ${currentTagLength}`, ) @@ -978,9 +988,11 @@ function createElectricSync>( rowTagSet: Set, ): void => { if (tagLength === undefined) { + console.log(`[DEBUG] TAGS: removeTagsFromRow rowId=${String(rowId)} - no tagLength set, skipping`) return } + console.log(`[DEBUG] TAGS: removeTagsFromRow rowId=${String(rowId)} removing ${removedTags.length} tags: ${JSON.stringify(removedTags)}`) for (const tag of removedTags) { const parsedTag = parseTag(tag) rowTagSet.delete(tag) @@ -1029,6 +1041,7 @@ function createElectricSync>( * Clear all tag tracking state (used when truncating) */ const clearTagTrackingState = (): void => { + console.log(`[DEBUG] TAGS: clearing all tag tracking state (rowTagSets size=${rowTagSets.size})`) rowTagSets.clear() tagIndex.length = 0 tagLength = undefined @@ -1040,14 +1053,18 @@ function createElectricSync>( */ const clearTagsForRow = (rowId: RowId): void => { if (tagLength === undefined) { + console.log(`[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} - no tagLength set, skipping`) return } const rowTagSet = rowTagSets.get(rowId) if (!rowTagSet) { + console.log(`[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} - no tags to clear`) return } + console.log(`[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} clearing ${rowTagSet.size} tags`) + // Remove each tag from the index for (const tag of rowTagSet) { const parsedTag = parseTag(tag) @@ -1072,20 +1089,29 @@ function createElectricSync>( ): boolean => { const rowTagSet = rowTagSets.get(rowId) if (!rowTagSet) { + console.log(`[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} - no tag set found`) return false } + const originalSize = rowTagSet.size + let removedCount = 0 + // Find tags that match this pattern and remove them for (const tag of rowTagSet) { const parsedTag = parseTag(tag) if (tagMatchesPattern(parsedTag, pattern)) { + console.log(`[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} removing matching tag ${tag}`) rowTagSet.delete(tag) removeTagFromIndex(parsedTag, rowId, tagIndex, tagLength!) 
+ removedCount++ } } + console.log(`[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} removed ${removedCount}/${originalSize} tags, remaining=${rowTagSet.size}`) + // Check if row's tag set is now empty if (rowTagSet.size === 0) { + console.log(`[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} - tag set now empty, row will be deleted`) rowTagSets.delete(rowId) return true } @@ -1394,10 +1420,12 @@ function createElectricSync>( // In buffered initial sync of progressive mode, buffer messages instead of writing if (isBufferingInitialSync()) { + console.log(`[DEBUG] BUFFERING: message buffered (progressive mode initial sync) op=${message.headers.operation} key=${message.key}`) bufferedMessages.push(message) } else { // Normal processing: write changes immediately if (!transactionStarted) { + console.log(`[DEBUG] TX: begin transaction`) begin() transactionStarted = true } @@ -1408,22 +1436,29 @@ function createElectricSync>( // Track postgres snapshot metadata for resolving awaiting mutations // Skip during buffered initial sync (will be extracted during atomic swap) if (!isBufferingInitialSync()) { + console.log(`[DEBUG] SNAPSHOT_END: received snapshot-end message`) newSnapshots.push(parseSnapshotMessage(message)) + } else { + console.log(`[DEBUG] SNAPSHOT_END: buffering snapshot-end (progressive mode)`) } } else if (isUpToDateMessage(message)) { // up-to-date takes precedence - also triggers progressive mode atomic swap + console.log(`[DEBUG] UP_TO_DATE: received up-to-date message isBuffering=${isBufferingInitialSync()}`) commitPoint = `up-to-date` } else if (isSubsetEndMessage(message)) { // subset-end triggers commit but not progressive mode atomic swap + console.log(`[DEBUG] SUBSET_END: received subset-end message`) if (commitPoint !== `up-to-date`) { commitPoint = `subset-end` } } else if (isMoveOutMessage(message)) { // Handle move-out event: buffer if buffering, otherwise process immediately if (isBufferingInitialSync()) { + console.log(`[DEBUG] BUFFERING: move-out message buffered (progressive mode)`) bufferedMessages.push(message) } else { // Normal processing: process move-out immediately + console.log(`[DEBUG] MOVE-OUT: processing immediately`) transactionStarted = processMoveOutEvent( message.headers.patterns, begin, @@ -1432,19 +1467,23 @@ function createElectricSync>( ) } } else if (isMustRefetchMessage(message)) { + console.log(`[DEBUG] MUST_REFETCH: received must-refetch, will truncate collection`) debug( `${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`, ) // Start a transaction and truncate the collection if (!transactionStarted) { + console.log(`[DEBUG] TX: begin transaction for must-refetch`) begin() transactionStarted = true } + console.log(`[DEBUG] TRUNCATE: truncating collection`) truncate() // Clear tag tracking state + console.log(`[DEBUG] TRUNCATE: clearing tag tracking state`) clearTagTrackingState() // Reset the loadSubset deduplication state since we're starting fresh @@ -1458,25 +1497,33 @@ function createElectricSync>( } } + console.log(`[DEBUG] BATCH_END: commitPoint=${commitPoint} transactionStarted=${transactionStarted} isBuffering=${isBufferingInitialSync()}`) + if (commitPoint !== null) { // PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end) if (isBufferingInitialSync() && commitPoint === `up-to-date`) { + console.log(`[DEBUG] ATOMIC_SWAP: starting atomic swap with ${bufferedMessages.length} buffered messages`) debug( `${collectionId ? 
`[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`, ) // Start atomic swap transaction + console.log(`[DEBUG] TX: begin transaction for atomic swap`) begin() // Truncate to clear all snapshot data + console.log(`[DEBUG] ATOMIC_SWAP: truncating collection`) truncate() // Clear tag tracking state for atomic swap + console.log(`[DEBUG] ATOMIC_SWAP: clearing tag tracking state`) clearTagTrackingState() // Apply all buffered change messages and extract txids/snapshots + console.log(`[DEBUG] ATOMIC_SWAP: replaying ${bufferedMessages.length} buffered messages`) for (const bufferedMsg of bufferedMessages) { if (isChangeMessage(bufferedMsg)) { + console.log(`[DEBUG] ATOMIC_SWAP: replaying change message op=${bufferedMsg.headers.operation} key=${bufferedMsg.key}`) processChangeMessage(bufferedMsg) // Extract txids from buffered messages (will be committed to store after transaction) @@ -1486,9 +1533,11 @@ function createElectricSync>( ) } } else if (isSnapshotEndMessage(bufferedMsg)) { + console.log(`[DEBUG] ATOMIC_SWAP: processing buffered snapshot-end`) // Extract snapshots from buffered messages (will be committed to store after transaction) newSnapshots.push(parseSnapshotMessage(bufferedMsg)) } else if (isMoveOutMessage(bufferedMsg)) { + console.log(`[DEBUG] ATOMIC_SWAP: replaying buffered move-out`) // Process buffered move-out messages during atomic swap processMoveOutEvent( bufferedMsg.headers.patterns, @@ -1500,12 +1549,14 @@ function createElectricSync>( } // Commit the atomic swap + console.log(`[DEBUG] TX: commit atomic swap transaction`) commit() // Exit buffering phase by marking that we've received up-to-date // isBufferingInitialSync() will now return false bufferedMessages.length = 0 + console.log(`[DEBUG] ATOMIC_SWAP: complete, now in normal sync mode`) debug( `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Atomic swap complete, now in normal sync mode`, ) @@ -1513,14 +1564,19 @@ function createElectricSync>( // Normal mode or on-demand: commit transaction if one was started // Both up-to-date and subset-end trigger a commit if (transactionStarted) { + console.log(`[DEBUG] TX: commit transaction (${commitPoint})`) commit() transactionStarted = false + } else { + console.log(`[DEBUG] TX: no transaction to commit (${commitPoint})`) } } + console.log(`[DEBUG] READY: marking collection ready`) wrappedMarkReady(isBufferingInitialSync()) // Track that we've received the first up-to-date for progressive mode if (commitPoint === `up-to-date`) { + console.log(`[DEBUG] UP_TO_DATE: hasReceivedUpToDate = true`) hasReceivedUpToDate = true } From 94503648959b9118f5d442407bdd6ad3c67091a2 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 19 Dec 2025 04:47:55 +0000 Subject: [PATCH 3/5] Add final round of exhaustive debug logging Additional logging for remaining code paths: - CLEANUP: When collection cleanup is called - ERROR: When ShapeStream errors occur - STATE TRUNCATE: Before/after truncate with sizes - STATE COMMIT: Final commit summary with sizes - LOAD_SUBSET: More detail on cursor pagination - TAGS: All tag operation entry/exit points This covers all major code paths for debugging the move-in bug. 
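One way to consume this output when reproducing the bug (a sketch, not
part of this patch: it assumes a repro script or test where console.log
can be patched, and `captured` is a local helper rather than library
API) is to collect the [DEBUG] lines programmatically:

    // Collect [DEBUG] lines so move-in behaviour can be asserted
    // instead of eyeballed in a noisy console.
    const captured: Array<string> = []
    const originalLog = console.log
    console.log = (...args: Array<unknown>) => {
      const line = args.map((arg) => String(arg)).join(` `)
      if (line.startsWith(`[DEBUG]`)) captured.push(line)
      originalLog(...args)
    }

    // ...run the repro, then inspect e.g.
    // captured.filter((line) => line.includes(`MOVE-IN`))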
--- packages/db/src/collection/state.ts | 4 ++++ packages/electric-db-collection/src/electric.ts | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index 4261253ee..4c41c8bcb 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -498,6 +498,7 @@ export class CollectionStateManager< for (const transaction of committedSyncedTransactions) { // Handle truncate operations first if (transaction.truncate) { + console.log(`[DEBUG] STATE: TRUNCATE starting, current syncedData size=${this.syncedData.size}`) // TRUNCATE PHASE // 1) Emit a delete for every visible key (synced + optimistic) so downstream listeners/indexes // observe a clear-before-rebuild. We intentionally skip keys already in @@ -507,6 +508,7 @@ export class CollectionStateManager< ...this.syncedData.keys(), ...(truncateOptimisticSnapshot?.upserts.keys() || []), ]) + console.log(`[DEBUG] STATE: TRUNCATE clearing ${visibleKeys.size} visible keys`) for (const key of visibleKeys) { if (truncateOptimisticSnapshot?.deletes.has(key)) continue const previousValue = @@ -522,6 +524,7 @@ export class CollectionStateManager< this.syncedData.clear() this.syncedMetadata.clear() this.syncedKeys.clear() + console.log(`[DEBUG] STATE: TRUNCATE complete, syncedData size=${this.syncedData.size}`) // 3) Clear currentVisibleState for truncated keys to ensure subsequent operations // are compared against the post-truncate state (undefined) rather than pre-truncate state @@ -795,6 +798,7 @@ export class CollectionStateManager< // Update cached size after synced data changes this.size = this.calculateSize() + console.log(`[DEBUG] STATE: COMMIT complete - syncedData size=${this.syncedData.size}, total visible size=${this.size}, events emitted=${events.length}`) // Update indexes for all events before emitting if (events.length > 0) { diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 02fe553af..e71fe5d57 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1252,11 +1252,13 @@ function createElectricSync>( offset: effectiveOffset, signal: abortController.signal, onError: (errorParams) => { + console.log(`[DEBUG] ERROR: ShapeStream error occurred`, errorParams) // Just immediately mark ready if there's an error to avoid blocking // apps waiting for `.preload()` to finish. // Note that Electric sends a 409 error on a `must-refetch` message, but the // ShapeStream handled this and it will not reach this handler, therefor // this markReady will not be triggers by a `must-refetch`. 
+ console.log(`[DEBUG] ERROR: marking collection ready despite error`) markReady() if (shapeOptions.onError) { @@ -1621,12 +1623,14 @@ function createElectricSync>( return { loadSubset: loadSubsetDedupe?.loadSubset, cleanup: () => { + console.log(`[DEBUG] CLEANUP: cleaning up collection ${collectionId}`) // Unsubscribe from the stream unsubscribeStream() // Abort the abort controller to stop the stream abortController.abort() // Reset deduplication tracking so collection can load fresh data if restarted loadSubsetDedupe?.reset() + console.log(`[DEBUG] CLEANUP: complete`) }, } }, From 6197404094cba0ae036bee1367d69198beba2b08 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 19 Dec 2025 04:50:06 +0000 Subject: [PATCH 4/5] ci: apply automated fixes --- packages/db/src/collection/state.ts | 40 +++- .../electric-db-collection/src/electric.ts | 191 +++++++++++++----- 2 files changed, 173 insertions(+), 58 deletions(-) diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index 4c41c8bcb..f24dbbe43 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -498,7 +498,9 @@ export class CollectionStateManager< for (const transaction of committedSyncedTransactions) { // Handle truncate operations first if (transaction.truncate) { - console.log(`[DEBUG] STATE: TRUNCATE starting, current syncedData size=${this.syncedData.size}`) + console.log( + `[DEBUG] STATE: TRUNCATE starting, current syncedData size=${this.syncedData.size}`, + ) // TRUNCATE PHASE // 1) Emit a delete for every visible key (synced + optimistic) so downstream listeners/indexes // observe a clear-before-rebuild. We intentionally skip keys already in @@ -508,7 +510,9 @@ export class CollectionStateManager< ...this.syncedData.keys(), ...(truncateOptimisticSnapshot?.upserts.keys() || []), ]) - console.log(`[DEBUG] STATE: TRUNCATE clearing ${visibleKeys.size} visible keys`) + console.log( + `[DEBUG] STATE: TRUNCATE clearing ${visibleKeys.size} visible keys`, + ) for (const key of visibleKeys) { if (truncateOptimisticSnapshot?.deletes.has(key)) continue const previousValue = @@ -524,7 +528,9 @@ export class CollectionStateManager< this.syncedData.clear() this.syncedMetadata.clear() this.syncedKeys.clear() - console.log(`[DEBUG] STATE: TRUNCATE complete, syncedData size=${this.syncedData.size}`) + console.log( + `[DEBUG] STATE: TRUNCATE complete, syncedData size=${this.syncedData.size}`, + ) // 3) Clear currentVisibleState for truncated keys to ensure subsequent operations // are compared against the post-truncate state (undefined) rather than pre-truncate state @@ -567,19 +573,29 @@ export class CollectionStateManager< // Update synced data switch (operation.type) { case `insert`: - console.log(`[DEBUG] STATE: INSERT key=${String(key)} value=${JSON.stringify(operation.value)}`) + console.log( + `[DEBUG] STATE: INSERT key=${String(key)} value=${JSON.stringify(operation.value)}`, + ) this.syncedData.set(key, operation.value) break case `update`: { const existingData = this.syncedData.get(key) const hasExisting = existingData !== undefined - console.log(`[DEBUG] STATE: UPDATE key=${String(key)} mode=${rowUpdateMode} hasExisting=${hasExisting}`) - console.log(`[DEBUG] STATE: UPDATE existing=${JSON.stringify(existingData)}`) - console.log(`[DEBUG] STATE: UPDATE incoming=${JSON.stringify(operation.value)}`) + console.log( + `[DEBUG] STATE: UPDATE key=${String(key)} mode=${rowUpdateMode} hasExisting=${hasExisting}`, 
+ ) + console.log( + `[DEBUG] STATE: UPDATE existing=${JSON.stringify(existingData)}`, + ) + console.log( + `[DEBUG] STATE: UPDATE incoming=${JSON.stringify(operation.value)}`, + ) if (!hasExisting) { - console.warn(`[DEBUG] STATE: ⚠️ UPDATE on non-existent row! key=${String(key)} - this may cause partial data!`) + console.warn( + `[DEBUG] STATE: ⚠️ UPDATE on non-existent row! key=${String(key)} - this may cause partial data!`, + ) } if (rowUpdateMode === `partial`) { @@ -588,7 +604,9 @@ export class CollectionStateManager< this.syncedData.get(key), operation.value, ) - console.log(`[DEBUG] STATE: UPDATE merged=${JSON.stringify(updatedValue)}`) + console.log( + `[DEBUG] STATE: UPDATE merged=${JSON.stringify(updatedValue)}`, + ) this.syncedData.set(key, updatedValue) } else { this.syncedData.set(key, operation.value) @@ -798,7 +816,9 @@ export class CollectionStateManager< // Update cached size after synced data changes this.size = this.calculateSize() - console.log(`[DEBUG] STATE: COMMIT complete - syncedData size=${this.syncedData.size}, total visible size=${this.size}, events emitted=${events.length}`) + console.log( + `[DEBUG] STATE: COMMIT complete - syncedData size=${this.syncedData.size}, total visible size=${this.size}, events emitted=${events.length}`, + ) // Update indexes for all events before emitting if (events.length > 0) { diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index e71fe5d57..beac2ec0f 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -366,7 +366,9 @@ function createLoadSubsetDedupe>({ } const loadSubset = async (opts: LoadSubsetOptions) => { - console.log(`[DEBUG] LOAD_SUBSET: syncMode=${syncMode} isBuffering=${isBufferingInitialSync()} opts=${JSON.stringify(opts)}`) + console.log( + `[DEBUG] LOAD_SUBSET: syncMode=${syncMode} isBuffering=${isBufferingInitialSync()} opts=${JSON.stringify(opts)}`, + ) // In progressive mode, use fetchSnapshot during snapshot phase if (isBufferingInitialSync()) { @@ -375,12 +377,16 @@ function createLoadSubsetDedupe>({ console.log(`[DEBUG] LOAD_SUBSET: Progressive mode - fetching snapshot`) try { const { data: rows } = await stream.fetchSnapshot(snapshotParams) - console.log(`[DEBUG] LOAD_SUBSET: Progressive mode - got ${rows.length} rows`) + console.log( + `[DEBUG] LOAD_SUBSET: Progressive mode - got ${rows.length} rows`, + ) // Check again if we're still buffering - we might have received up-to-date // and completed the atomic swap while waiting for the snapshot if (!isBufferingInitialSync()) { - console.log(`[DEBUG] LOAD_SUBSET: Ignoring snapshot - sync completed while fetching`) + console.log( + `[DEBUG] LOAD_SUBSET: Ignoring snapshot - sync completed while fetching`, + ) debug( `${collectionId ? 
`[${collectionId}] ` : ``}Ignoring snapshot - sync completed while fetching`, ) @@ -391,7 +397,9 @@ function createLoadSubsetDedupe>({ if (rows.length > 0) { begin() for (const row of rows) { - console.log(`[DEBUG] LOAD_SUBSET: Applying row from snapshot: ${JSON.stringify(row.value)}`) + console.log( + `[DEBUG] LOAD_SUBSET: Applying row from snapshot: ${JSON.stringify(row.value)}`, + ) write({ type: `insert`, value: row.value, @@ -416,7 +424,9 @@ function createLoadSubsetDedupe>({ } } else if (syncMode === `progressive`) { // Progressive mode after full sync complete: no need to load more - console.log(`[DEBUG] LOAD_SUBSET: Progressive mode complete - no load needed`) + console.log( + `[DEBUG] LOAD_SUBSET: Progressive mode complete - no load needed`, + ) return } else { // On-demand mode: use requestSnapshot @@ -427,7 +437,9 @@ function createLoadSubsetDedupe>({ const { cursor, where, orderBy, limit } = opts if (cursor) { - console.log(`[DEBUG] LOAD_SUBSET: On-demand mode with cursor - making parallel requests`) + console.log( + `[DEBUG] LOAD_SUBSET: On-demand mode with cursor - making parallel requests`, + ) // Make parallel requests for cursor-based pagination const promises: Array> = [] @@ -439,7 +451,9 @@ function createLoadSubsetDedupe>({ // No limit - get all ties } const whereCurrentParams = compileSQL(whereCurrentOpts) - console.log(`[DEBUG] LOAD_SUBSET: requesting whereCurrent snapshot params=${JSON.stringify(whereCurrentParams)}`) + console.log( + `[DEBUG] LOAD_SUBSET: requesting whereCurrent snapshot params=${JSON.stringify(whereCurrentParams)}`, + ) promises.push(stream.requestSnapshot(whereCurrentParams)) debug( @@ -454,7 +468,9 @@ function createLoadSubsetDedupe>({ limit, } const whereFromParams = compileSQL(whereFromOpts) - console.log(`[DEBUG] LOAD_SUBSET: requesting whereFrom snapshot params=${JSON.stringify(whereFromParams)} limit=${limit}`) + console.log( + `[DEBUG] LOAD_SUBSET: requesting whereFrom snapshot params=${JSON.stringify(whereFromParams)} limit=${limit}`, + ) promises.push(stream.requestSnapshot(whereFromParams)) debug( @@ -462,13 +478,17 @@ function createLoadSubsetDedupe>({ ) // Wait for both requests to complete - console.log(`[DEBUG] LOAD_SUBSET: waiting for both snapshot requests to complete`) + console.log( + `[DEBUG] LOAD_SUBSET: waiting for both snapshot requests to complete`, + ) await Promise.all(promises) console.log(`[DEBUG] LOAD_SUBSET: both snapshot requests completed`) } else { // No cursor - standard single request const snapshotParams = compileSQL(opts) - console.log(`[DEBUG] LOAD_SUBSET: requesting single snapshot params=${JSON.stringify(snapshotParams)}`) + console.log( + `[DEBUG] LOAD_SUBSET: requesting single snapshot params=${JSON.stringify(snapshotParams)}`, + ) await stream.requestSnapshot(snapshotParams) console.log(`[DEBUG] LOAD_SUBSET: snapshot request completed`) } @@ -953,21 +973,27 @@ function createElectricSync>( rowId: RowId, rowTagSet: Set, ): void => { - console.log(`[DEBUG] TAGS: addTagsToRow rowId=${String(rowId)} adding ${tags.length} tags: ${JSON.stringify(tags)}`) + console.log( + `[DEBUG] TAGS: addTagsToRow rowId=${String(rowId)} adding ${tags.length} tags: ${JSON.stringify(tags)}`, + ) for (const tag of tags) { const parsedTag = parseTag(tag) // Infer tag length from first tag if (tagLength === undefined) { tagLength = getTagLength(parsedTag) - console.log(`[DEBUG] TAGS: inferred tagLength=${tagLength} from first tag`) + console.log( + `[DEBUG] TAGS: inferred tagLength=${tagLength} from first tag`, + ) 
initializeTagIndex(tagLength) } // Validate tag length matches const currentTagLength = getTagLength(parsedTag) if (currentTagLength !== tagLength) { - console.warn(`[DEBUG] TAGS: ⚠️ tag length mismatch: expected ${tagLength}, got ${currentTagLength}, skipping tag ${tag}`) + console.warn( + `[DEBUG] TAGS: ⚠️ tag length mismatch: expected ${tagLength}, got ${currentTagLength}, skipping tag ${tag}`, + ) debug( `${collectionId ? `[${collectionId}] ` : ``}Tag length mismatch: expected ${tagLength}, got ${currentTagLength}`, ) @@ -988,11 +1014,15 @@ function createElectricSync>( rowTagSet: Set, ): void => { if (tagLength === undefined) { - console.log(`[DEBUG] TAGS: removeTagsFromRow rowId=${String(rowId)} - no tagLength set, skipping`) + console.log( + `[DEBUG] TAGS: removeTagsFromRow rowId=${String(rowId)} - no tagLength set, skipping`, + ) return } - console.log(`[DEBUG] TAGS: removeTagsFromRow rowId=${String(rowId)} removing ${removedTags.length} tags: ${JSON.stringify(removedTags)}`) + console.log( + `[DEBUG] TAGS: removeTagsFromRow rowId=${String(rowId)} removing ${removedTags.length} tags: ${JSON.stringify(removedTags)}`, + ) for (const tag of removedTags) { const parsedTag = parseTag(tag) rowTagSet.delete(tag) @@ -1032,7 +1062,9 @@ function createElectricSync>( removeTagsFromRow(removedTags, rowId, rowTagSet) } - console.log(`[DEBUG] TAGS: rowId=${String(rowId)} prevTags=${previousTagCount} newTags=${rowTagSet.size} added=${JSON.stringify(tags)} removed=${JSON.stringify(removedTags)}`) + console.log( + `[DEBUG] TAGS: rowId=${String(rowId)} prevTags=${previousTagCount} newTags=${rowTagSet.size} added=${JSON.stringify(tags)} removed=${JSON.stringify(removedTags)}`, + ) return rowTagSet } @@ -1041,7 +1073,9 @@ function createElectricSync>( * Clear all tag tracking state (used when truncating) */ const clearTagTrackingState = (): void => { - console.log(`[DEBUG] TAGS: clearing all tag tracking state (rowTagSets size=${rowTagSets.size})`) + console.log( + `[DEBUG] TAGS: clearing all tag tracking state (rowTagSets size=${rowTagSets.size})`, + ) rowTagSets.clear() tagIndex.length = 0 tagLength = undefined @@ -1053,17 +1087,23 @@ function createElectricSync>( */ const clearTagsForRow = (rowId: RowId): void => { if (tagLength === undefined) { - console.log(`[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} - no tagLength set, skipping`) + console.log( + `[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} - no tagLength set, skipping`, + ) return } const rowTagSet = rowTagSets.get(rowId) if (!rowTagSet) { - console.log(`[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} - no tags to clear`) + console.log( + `[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} - no tags to clear`, + ) return } - console.log(`[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} clearing ${rowTagSet.size} tags`) + console.log( + `[DEBUG] TAGS: clearTagsForRow rowId=${String(rowId)} clearing ${rowTagSet.size} tags`, + ) // Remove each tag from the index for (const tag of rowTagSet) { @@ -1089,7 +1129,9 @@ function createElectricSync>( ): boolean => { const rowTagSet = rowTagSets.get(rowId) if (!rowTagSet) { - console.log(`[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} - no tag set found`) + console.log( + `[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} - no tag set found`, + ) return false } @@ -1100,18 +1142,24 @@ function createElectricSync>( for (const tag of rowTagSet) { const parsedTag = parseTag(tag) if (tagMatchesPattern(parsedTag, pattern)) { - console.log(`[DEBUG] 
TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} removing matching tag ${tag}`) + console.log( + `[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} removing matching tag ${tag}`, + ) rowTagSet.delete(tag) removeTagFromIndex(parsedTag, rowId, tagIndex, tagLength!) removedCount++ } } - console.log(`[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} removed ${removedCount}/${originalSize} tags, remaining=${rowTagSet.size}`) + console.log( + `[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} removed ${removedCount}/${originalSize} tags, remaining=${rowTagSet.size}`, + ) // Check if row's tag set is now empty if (rowTagSet.size === 0) { - console.log(`[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} - tag set now empty, row will be deleted`) + console.log( + `[DEBUG] TAGS: removeMatchingTagsFromRow rowId=${String(rowId)} - tag set now empty, row will be deleted`, + ) rowTagSets.delete(rowId) return true } @@ -1144,11 +1192,15 @@ function createElectricSync>( for (const pattern of patterns) { // Find all rows that match this pattern const affectedRowIds = findRowsMatchingPattern(pattern, tagIndex) - console.log(`[DEBUG] MOVE-OUT: pattern=${JSON.stringify(pattern)} affectedRows=${Array.from(affectedRowIds).map(String).join(',')}`) + console.log( + `[DEBUG] MOVE-OUT: pattern=${JSON.stringify(pattern)} affectedRows=${Array.from(affectedRowIds).map(String).join(',')}`, + ) for (const rowId of affectedRowIds) { const willDelete = removeMatchingTagsFromRow(rowId, pattern) - console.log(`[DEBUG] MOVE-OUT: rowId=${String(rowId)} willDelete=${willDelete}`) + console.log( + `[DEBUG] MOVE-OUT: rowId=${String(rowId)} willDelete=${willDelete}`, + ) if (willDelete) { // Delete rows with empty tag sets @@ -1239,9 +1291,12 @@ function createElectricSync>( }) const effectiveLog = syncMode === `on-demand` ? `changes_only` : undefined - const effectiveOffset = shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined) + const effectiveOffset = + shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined) - console.log(`[DEBUG] SYNC_START: collectionId=${collectionId} syncMode=${syncMode} log=${effectiveLog} offset=${effectiveOffset} table=${shapeOptions.params?.table} where=${shapeOptions.params?.where}`) + console.log( + `[DEBUG] SYNC_START: collectionId=${collectionId} syncMode=${syncMode} log=${effectiveLog} offset=${effectiveOffset} table=${shapeOptions.params?.table} where=${shapeOptions.params?.where}`, + ) const stream = new ShapeStream({ ...shapeOptions, @@ -1309,15 +1364,21 @@ function createElectricSync>( const isFirstTags = !existingTags || existingTags.size === 0 // Log all change messages - console.log(`[DEBUG] CHANGE: op=${operation} rowId=${String(rowId)} isNew=${isNewRow} hasTags=${hasTags} tags=${JSON.stringify(tags)} value=${JSON.stringify(changeMessage.value)}`) + console.log( + `[DEBUG] CHANGE: op=${operation} rowId=${String(rowId)} isNew=${isNewRow} hasTags=${hasTags} tags=${JSON.stringify(tags)} value=${JSON.stringify(changeMessage.value)}`, + ) // Warn on potential move-in issues if (isNewRow && operation === `update`) { - console.warn(`[DEBUG] ⚠️ UPDATE on non-existent row! rowId=${String(rowId)} - may cause partial data`) + console.warn( + `[DEBUG] ⚠️ UPDATE on non-existent row! 
rowId=${String(rowId)} - may cause partial data`, + ) } if (tags && tags.length > 0 && isFirstTags) { - console.log(`[DEBUG] 🔄 MOVE-IN: rowId=${String(rowId)} getting first tags`) + console.log( + `[DEBUG] 🔄 MOVE-IN: rowId=${String(rowId)} getting first tags`, + ) } if (operation === `delete`) { @@ -1350,7 +1411,9 @@ function createElectricSync>( }) unsubscribeStream = stream.subscribe((messages: Array>) => { - console.log(`[DEBUG] STREAM: Received batch of ${messages.length} messages`) + console.log( + `[DEBUG] STREAM: Received batch of ${messages.length} messages`, + ) // Track commit point type - up-to-date takes precedence as it also triggers progressive mode atomic swap let commitPoint: `up-to-date` | `subset-end` | null = null @@ -1364,13 +1427,21 @@ function createElectricSync>( for (const message of messages) { // Log each message type if (isChangeMessage(message)) { - console.log(`[DEBUG] STREAM MSG: change op=${message.headers.operation} key=${message.key}`) + console.log( + `[DEBUG] STREAM MSG: change op=${message.headers.operation} key=${message.key}`, + ) } else if (isMoveOutMessage(message)) { - console.log(`[DEBUG] STREAM MSG: move-out patterns=${JSON.stringify(message.headers.patterns)}`) + console.log( + `[DEBUG] STREAM MSG: move-out patterns=${JSON.stringify(message.headers.patterns)}`, + ) } else if (isControlMessage(message)) { - console.log(`[DEBUG] STREAM MSG: control=${message.headers.control}`) + console.log( + `[DEBUG] STREAM MSG: control=${message.headers.control}`, + ) } else { - console.log(`[DEBUG] STREAM MSG: unknown type headers=${JSON.stringify(message.headers)}`) + console.log( + `[DEBUG] STREAM MSG: unknown type headers=${JSON.stringify(message.headers)}`, + ) } // Add message to current batch buffer (for race condition handling) if (isChangeMessage(message) || isMoveOutMessage(message)) { @@ -1422,7 +1493,9 @@ function createElectricSync>( // In buffered initial sync of progressive mode, buffer messages instead of writing if (isBufferingInitialSync()) { - console.log(`[DEBUG] BUFFERING: message buffered (progressive mode initial sync) op=${message.headers.operation} key=${message.key}`) + console.log( + `[DEBUG] BUFFERING: message buffered (progressive mode initial sync) op=${message.headers.operation} key=${message.key}`, + ) bufferedMessages.push(message) } else { // Normal processing: write changes immediately @@ -1441,11 +1514,15 @@ function createElectricSync>( console.log(`[DEBUG] SNAPSHOT_END: received snapshot-end message`) newSnapshots.push(parseSnapshotMessage(message)) } else { - console.log(`[DEBUG] SNAPSHOT_END: buffering snapshot-end (progressive mode)`) + console.log( + `[DEBUG] SNAPSHOT_END: buffering snapshot-end (progressive mode)`, + ) } } else if (isUpToDateMessage(message)) { // up-to-date takes precedence - also triggers progressive mode atomic swap - console.log(`[DEBUG] UP_TO_DATE: received up-to-date message isBuffering=${isBufferingInitialSync()}`) + console.log( + `[DEBUG] UP_TO_DATE: received up-to-date message isBuffering=${isBufferingInitialSync()}`, + ) commitPoint = `up-to-date` } else if (isSubsetEndMessage(message)) { // subset-end triggers commit but not progressive mode atomic swap @@ -1456,7 +1533,9 @@ function createElectricSync>( } else if (isMoveOutMessage(message)) { // Handle move-out event: buffer if buffering, otherwise process immediately if (isBufferingInitialSync()) { - console.log(`[DEBUG] BUFFERING: move-out message buffered (progressive mode)`) + console.log( + `[DEBUG] BUFFERING: move-out message buffered 
(progressive mode)`, + ) bufferedMessages.push(message) } else { // Normal processing: process move-out immediately @@ -1469,7 +1548,9 @@ function createElectricSync>( ) } } else if (isMustRefetchMessage(message)) { - console.log(`[DEBUG] MUST_REFETCH: received must-refetch, will truncate collection`) + console.log( + `[DEBUG] MUST_REFETCH: received must-refetch, will truncate collection`, + ) debug( `${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`, ) @@ -1499,12 +1580,16 @@ function createElectricSync>( } } - console.log(`[DEBUG] BATCH_END: commitPoint=${commitPoint} transactionStarted=${transactionStarted} isBuffering=${isBufferingInitialSync()}`) + console.log( + `[DEBUG] BATCH_END: commitPoint=${commitPoint} transactionStarted=${transactionStarted} isBuffering=${isBufferingInitialSync()}`, + ) if (commitPoint !== null) { // PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end) if (isBufferingInitialSync() && commitPoint === `up-to-date`) { - console.log(`[DEBUG] ATOMIC_SWAP: starting atomic swap with ${bufferedMessages.length} buffered messages`) + console.log( + `[DEBUG] ATOMIC_SWAP: starting atomic swap with ${bufferedMessages.length} buffered messages`, + ) debug( `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`, ) @@ -1522,10 +1607,14 @@ function createElectricSync>( clearTagTrackingState() // Apply all buffered change messages and extract txids/snapshots - console.log(`[DEBUG] ATOMIC_SWAP: replaying ${bufferedMessages.length} buffered messages`) + console.log( + `[DEBUG] ATOMIC_SWAP: replaying ${bufferedMessages.length} buffered messages`, + ) for (const bufferedMsg of bufferedMessages) { if (isChangeMessage(bufferedMsg)) { - console.log(`[DEBUG] ATOMIC_SWAP: replaying change message op=${bufferedMsg.headers.operation} key=${bufferedMsg.key}`) + console.log( + `[DEBUG] ATOMIC_SWAP: replaying change message op=${bufferedMsg.headers.operation} key=${bufferedMsg.key}`, + ) processChangeMessage(bufferedMsg) // Extract txids from buffered messages (will be committed to store after transaction) @@ -1535,7 +1624,9 @@ function createElectricSync>( ) } } else if (isSnapshotEndMessage(bufferedMsg)) { - console.log(`[DEBUG] ATOMIC_SWAP: processing buffered snapshot-end`) + console.log( + `[DEBUG] ATOMIC_SWAP: processing buffered snapshot-end`, + ) // Extract snapshots from buffered messages (will be committed to store after transaction) newSnapshots.push(parseSnapshotMessage(bufferedMsg)) } else if (isMoveOutMessage(bufferedMsg)) { @@ -1558,7 +1649,9 @@ function createElectricSync>( // isBufferingInitialSync() will now return false bufferedMessages.length = 0 - console.log(`[DEBUG] ATOMIC_SWAP: complete, now in normal sync mode`) + console.log( + `[DEBUG] ATOMIC_SWAP: complete, now in normal sync mode`, + ) debug( `${collectionId ? 
`[${collectionId}] ` : ``}Progressive mode: Atomic swap complete, now in normal sync mode`, ) @@ -1570,7 +1663,9 @@ function createElectricSync>( commit() transactionStarted = false } else { - console.log(`[DEBUG] TX: no transaction to commit (${commitPoint})`) + console.log( + `[DEBUG] TX: no transaction to commit (${commitPoint})`, + ) } } console.log(`[DEBUG] READY: marking collection ready`) From d5bc77181826e3b3e7063055250049c0864dc7f8 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 19 Dec 2025 20:36:39 +0000 Subject: [PATCH 5/5] Fix TypeScript error: cast message to any in else branch --- packages/electric-db-collection/src/electric.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index beac2ec0f..e6943883c 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1440,7 +1440,7 @@ function createElectricSync>( ) } else { console.log( - `[DEBUG] STREAM MSG: unknown type headers=${JSON.stringify(message.headers)}`, + `[DEBUG] STREAM MSG: unknown type headers=${JSON.stringify((message as any).headers)}`, ) } // Add message to current batch buffer (for race condition handling)
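A note on the cast above: `as any` is the quickest way to satisfy the
compiler in the fall-through branch, where `message` has presumably been
narrowed past every known message variant. A narrower alternative (a
sketch, not what this patch does; it assumes every stream message
variant carries a `headers` object) casts through `unknown` to just the
field the log line needs:

    const { headers } = message as unknown as {
      headers: Record<string, unknown>
    }
    console.log(
      `[DEBUG] STREAM MSG: unknown type headers=${JSON.stringify(headers)}`,
    )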