From 3d4c962c287bdbed905b2094dda51cdfd3fd65c3 Mon Sep 17 00:00:00 2001 From: Igor Barakaiev Date: Tue, 23 Dec 2025 13:59:22 -0800 Subject: [PATCH 1/2] fix(electric-db-collection): prevent orphan transactions after must-refetch in progressive mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a `must-refetch` message is received in progressive mode, it starts a transaction with `begin()` and calls `truncate()`. This resets `hasReceivedUpToDate` to `false`, causing `isBufferingInitialSync()` to return `true`. The bug: subsequent messages after must-refetch were being buffered instead of written to the existing transaction. When `up-to-date` was received, the atomic swap code would create a NEW transaction, leaving the first transaction (from must-refetch) uncommitted forever. This "orphan transaction" caused the collection to become corrupted with undefined values. The fix: Add `&& !transactionStarted` checks to 5 places so that when a transaction is already started (from must-refetch), messages are written directly to it instead of being buffered for atomic swap. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .changeset/fix-progressive-must-refetch.md | 9 +++++++ .../electric-db-collection/src/electric.ts | 27 +++++++++++++++---- 2 files changed, 31 insertions(+), 5 deletions(-) create mode 100644 .changeset/fix-progressive-must-refetch.md diff --git a/.changeset/fix-progressive-must-refetch.md b/.changeset/fix-progressive-must-refetch.md new file mode 100644 index 000000000..1c279ff69 --- /dev/null +++ b/.changeset/fix-progressive-must-refetch.md @@ -0,0 +1,9 @@ +--- +'@tanstack/electric-db-collection': patch +--- + +Fix orphan transactions after `must-refetch` in progressive sync mode + +When a `must-refetch` message was received in progressive mode, it started a transaction with `truncate()` but reset `hasReceivedUpToDate`, causing subsequent messages to be buffered instead of written to the existing transaction. On `up-to-date`, the atomic swap code would create a new transaction, leaving the first one uncommitted forever. This caused collections to become corrupted with undefined values. + +The fix ensures that when a transaction is already started (e.g., from must-refetch), messages are written directly to it instead of being buffered for atomic swap. diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 65d3b7cc0..0755c520a 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1303,7 +1303,12 @@ function createElectricSync>( // Check for txids in the message and add them to our store // Skip during buffered initial sync in progressive mode (txids will be extracted during atomic swap) - if (hasTxids(message) && !isBufferingInitialSync()) { + // EXCEPTION: If a transaction is already started (e.g., from must-refetch), track txids + // to avoid losing them when messages are written to the existing transaction. + if ( + hasTxids(message) && + (!isBufferingInitialSync() || transactionStarted) + ) { message.headers.txids?.forEach((txid) => newTxids.add(txid)) } @@ -1338,7 +1343,9 @@ function createElectricSync>( } // In buffered initial sync of progressive mode, buffer messages instead of writing - if (isBufferingInitialSync()) { + // EXCEPTION: If a transaction is already started (e.g., from must-refetch), write + // directly to it instead of buffering. This prevents orphan transactions. + if (isBufferingInitialSync() && !transactionStarted) { bufferedMessages.push(message) } else { // Normal processing: write changes immediately @@ -1352,7 +1359,9 @@ function createElectricSync>( } else if (isSnapshotEndMessage(message)) { // Track postgres snapshot metadata for resolving awaiting mutations // Skip during buffered initial sync (will be extracted during atomic swap) - if (!isBufferingInitialSync()) { + // EXCEPTION: If a transaction is already started (e.g., from must-refetch), track snapshots + // to avoid losing them when messages are written to the existing transaction. + if (!isBufferingInitialSync() || transactionStarted) { newSnapshots.push(parseSnapshotMessage(message)) } } else if (isUpToDateMessage(message)) { @@ -1365,7 +1374,9 @@ function createElectricSync>( } } else if (isMoveOutMessage(message)) { // Handle move-out event: buffer if buffering, otherwise process immediately - if (isBufferingInitialSync()) { + // EXCEPTION: If a transaction is already started (e.g., from must-refetch), process + // immediately to avoid orphan transactions. + if (isBufferingInitialSync() && !transactionStarted) { bufferedMessages.push(message) } else { // Normal processing: process move-out immediately @@ -1405,7 +1416,13 @@ function createElectricSync>( if (commitPoint !== null) { // PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end) - if (isBufferingInitialSync() && commitPoint === `up-to-date`) { + // EXCEPTION: Skip atomic swap if a transaction is already started (e.g., from must-refetch). + // In that case, do a normal commit to properly close the existing transaction. + if ( + isBufferingInitialSync() && + commitPoint === `up-to-date` && + !transactionStarted + ) { debug( `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`, ) From bce10d34ae6ef2e6370459ae562c70d2a80a2d79 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 29 Dec 2025 15:18:46 +0000 Subject: [PATCH 2/2] add tests --- .../tests/electric.test.ts | 356 ++++++++++++++++++ 1 file changed, 356 insertions(+) diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 3ac3ccf7a..b506b9101 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -3186,6 +3186,362 @@ describe(`Electric Integration`, () => { expect(testCollection.has(1)).toBe(true) expect(testCollection.status).toBe(`ready`) }) + + it(`should handle must-refetch in progressive mode without orphan transactions`, () => { + vi.clearAllMocks() + + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ metadata: {}, data: [] }) + + const config = { + id: `progressive-must-refetch-orphan-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Phase 1: Complete the initial sync in progressive mode + testSubscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { operation: `insert` }, + }, + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + // After atomic swap, data should be visible + expect(testCollection.status).toBe(`ready`) + expect(testCollection.has(1)).toBe(true) + expect(testCollection.has(2)).toBe(true) + expect(testCollection.size).toBe(2) + + // No pending uncommitted synced transactions after initial sync + expect(testCollection._state.pendingSyncedTransactions.length).toBe(0) + + // Phase 2: Receive must-refetch + // This resets hasReceivedUpToDate to false but starts a transaction + testSubscriber([ + { + headers: { control: `must-refetch` }, + }, + ]) + + // Old data should still be visible (transaction not committed yet) + expect(testCollection.status).toBe(`ready`) + expect(testCollection.size).toBe(2) + + // There should be exactly 1 uncommitted pending transaction from must-refetch + expect(testCollection._state.pendingSyncedTransactions.length).toBe(1) + expect( + testCollection._state.pendingSyncedTransactions[0]?.committed, + ).toBe(false) + + // Phase 3: Send new data after must-refetch (in separate batch) + // Without the fix, these would be buffered and cause orphan transaction + testSubscriber([ + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { operation: `insert` }, + }, + { + key: `4`, + value: { id: 4, name: `User 4` }, + headers: { operation: `insert` }, + }, + ]) + + // Data still not committed (no up-to-date yet) + expect(testCollection.size).toBe(2) + + // Still 1 pending transaction (with the fix, data is written to it, not buffered) + expect(testCollection._state.pendingSyncedTransactions.length).toBe(1) + + // Phase 4: Send up-to-date (in separate batch) + // Without the fix: atomic swap would try to start a new transaction, + // leaving the must-refetch transaction uncommitted (orphan) + // With the fix: normal commit happens on the existing transaction + testSubscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + // After the fix: old data truncated, new data committed + expect(testCollection.status).toBe(`ready`) + expect(testCollection.has(1)).toBe(false) // Truncated by must-refetch + expect(testCollection.has(2)).toBe(false) // Truncated by must-refetch + expect(testCollection.has(3)).toBe(true) // New data after must-refetch + expect(testCollection.has(4)).toBe(true) // New data after must-refetch + expect(testCollection.size).toBe(2) + + // CRITICAL: No orphan uncommitted transactions should remain + // Without the fix, there would be 1 uncommitted transaction from must-refetch + expect(testCollection._state.pendingSyncedTransactions.length).toBe(0) + + // Verify data is correct (not undefined from orphan transaction) + expect(testCollection.get(3)).toEqual({ id: 3, name: `User 3` }) + expect(testCollection.get(4)).toEqual({ id: 4, name: `User 4` }) + }) + + it(`should handle must-refetch in progressive mode with txid tracking`, () => { + vi.clearAllMocks() + + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ metadata: {}, data: [] }) + + const config = { + id: `progressive-must-refetch-txid-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Complete initial sync + testSubscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { operation: `insert`, txids: [100] }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.status).toBe(`ready`) + + // Must-refetch + testSubscriber([ + { + headers: { control: `must-refetch` }, + }, + ]) + + // Send data with txids after must-refetch + // Without the fix, txids would not be tracked because isBufferingInitialSync() returns true + testSubscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { operation: `insert`, txids: [200] }, + }, + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { operation: `insert`, txids: [201] }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.status).toBe(`ready`) + expect(testCollection.size).toBe(2) + expect(testCollection.has(2)).toBe(true) + expect(testCollection.has(3)).toBe(true) + }) + + it(`should handle must-refetch in progressive mode with snapshot-end metadata`, () => { + vi.clearAllMocks() + + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ metadata: {}, data: [] }) + + const config = { + id: `progressive-must-refetch-snapshot-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Complete initial sync + testSubscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `100`, + xmax: `110`, + xip_list: [], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.status).toBe(`ready`) + + // Must-refetch + testSubscriber([ + { + headers: { control: `must-refetch` }, + }, + ]) + + // Send data with snapshot-end after must-refetch + // Without the fix, snapshot-end metadata would not be tracked + testSubscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { operation: `insert` }, + }, + { + headers: { + control: `snapshot-end`, + xmin: `200`, + xmax: `210`, + xip_list: [], + }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.status).toBe(`ready`) + expect(testCollection.size).toBe(1) + expect(testCollection.has(2)).toBe(true) + expect(testCollection.get(2)).toEqual({ id: 2, name: `User 2` }) + }) + + it(`should handle multiple batches after must-refetch in progressive mode`, () => { + vi.clearAllMocks() + + let testSubscriber!: (messages: Array>) => void + mockSubscribe.mockImplementation((callback) => { + testSubscriber = callback + return () => {} + }) + mockRequestSnapshot.mockResolvedValue(undefined) + mockFetchSnapshot.mockResolvedValue({ metadata: {}, data: [] }) + + const config = { + id: `progressive-must-refetch-batches-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `progressive` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Complete initial sync + testSubscriber([ + { + key: `1`, + value: { id: 1, name: `User 1` }, + headers: { operation: `insert` }, + }, + { + headers: { control: `up-to-date` }, + }, + ]) + + expect(testCollection.status).toBe(`ready`) + + // Must-refetch + testSubscriber([ + { + headers: { control: `must-refetch` }, + }, + ]) + + // First batch of data after must-refetch + testSubscriber([ + { + key: `2`, + value: { id: 2, name: `User 2` }, + headers: { operation: `insert` }, + }, + ]) + + // Second batch of data after must-refetch + testSubscriber([ + { + key: `3`, + value: { id: 3, name: `User 3` }, + headers: { operation: `insert` }, + }, + ]) + + // Third batch of data after must-refetch + testSubscriber([ + { + key: `4`, + value: { id: 4, name: `User 4` }, + headers: { operation: `insert` }, + }, + ]) + + // Still waiting for up-to-date + expect(testCollection.size).toBe(1) // Only old data visible + + // Final up-to-date + testSubscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + // All new data should be committed + expect(testCollection.status).toBe(`ready`) + expect(testCollection.has(1)).toBe(false) // Truncated + expect(testCollection.has(2)).toBe(true) + expect(testCollection.has(3)).toBe(true) + expect(testCollection.has(4)).toBe(true) + expect(testCollection.size).toBe(3) + }) }) describe(`syncMode configuration - GC and resync`, () => {