diff --git a/.changeset/add-offline-persistence.md b/.changeset/add-offline-persistence.md
new file mode 100644
index 000000000..5502acaaf
--- /dev/null
+++ b/.changeset/add-offline-persistence.md
@@ -0,0 +1,29 @@
+---
+"@tanstack/electric-db-collection": minor
+---
+
+feat: Add offline persistence adapter for Electric collections
+
+This adds a localStorage-based persistence layer that enables offline-first data access and faster initial loads.
+
+**What changed:**
+- New `persistence` configuration option in `electricCollectionOptions`
+- Collection data is automatically saved to localStorage as it syncs
+- Persisted data is restored on collection initialization
+- Shape handle and offset are persisted for resumable sync
+- In `on-demand` mode, collections are marked ready immediately after loading from persistence
+
+**Why:**
+Enables offline-first applications where users can see their data immediately on app launch, even before network sync completes. This improves perceived performance and allows the app to function without network connectivity.
+
+**How to use:**
+```typescript
+electricCollectionOptions({
+  id: 'my-collection',
+  syncMode: 'on-demand',
+  persistence: {
+    storageKey: 'my-collection-storage',
+  },
+  // ... other options
+})
+```
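A sketch of the end-to-end wiring, for reviewers: it assumes the usual `createCollection` entry point from `@tanstack/db` and a standard Electric shape config; the URL, table, and `Todo` row type are invented for illustration. Note that because the options cast `utils` down to the base type (see `electric.ts` below), the persistence utils need a cast or guard to be visible statically:

```typescript
import { createCollection } from '@tanstack/db'
import {
  electricCollectionOptions,
  type ElectricCollectionUtilsWithPersistence,
} from '@tanstack/electric-db-collection'

type Todo = { id: number; title: string; completed: boolean }

const todos = createCollection(
  electricCollectionOptions({
    id: `todos`,
    shapeOptions: {
      url: `https://example.com/v1/shape`, // invented endpoint
      params: { table: `todos` },
    },
    getKey: (row: Todo) => row.id,
    syncMode: `on-demand`,
    persistence: { storageKey: `todos-offline` },
  }),
)

// Opt into the wider utils type since persistence was configured above
const utils = todos.utils as ElectricCollectionUtilsWithPersistence
console.log(`snapshot bytes:`, await utils.getPersistenceSize())
await utils.clearPersistence() // drop the cached snapshot
```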
diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts
index 65d3b7cc0..1e98896a0 100644
--- a/packages/electric-db-collection/src/electric.ts
+++ b/packages/electric-db-collection/src/electric.ts
@@ -46,10 +46,13 @@ import type {
   ControlMessage,
   GetExtensions,
   Message,
+  Offset,
   PostgresSnapshot,
   Row,
   ShapeStreamOptions,
 } from '@electric-sql/client'
+import { createPersistence, type ElectricPersistenceConfig } from './persistence/createPersistence'
+import { validateJsonSerializable } from './persistence/persistenceAdapter'
 
 // Re-export for user convenience in custom match functions
 export { isChangeMessage, isControlMessage } from '@electric-sql/client'
@@ -114,8 +117,7 @@ type InferSchemaOutput<T> = T extends StandardSchemaV1
     : Record<string, unknown>
   : Record<string, unknown>
 
-/**
- * The mode of sync to use for the collection.
+/** The mode of sync to use for the collection.
  * @default `eager`
  * @description
  * - `eager`:
@@ -141,19 +143,25 @@ export interface ElectricCollectionConfig<
   T extends Row = Row,
   TSchema extends StandardSchemaV1 = never,
 > extends Omit<
-  BaseCollectionConfig<
-    T,
-    string | number,
-    TSchema,
-    ElectricCollectionUtils,
-    any
-  >,
-  `onInsert` | `onUpdate` | `onDelete` | `syncMode`
-> {
+    BaseCollectionConfig<
+      T,
+      string | number,
+      TSchema,
+      ElectricCollectionUtils,
+      any
+    >,
+    `onInsert` | `onUpdate` | `onDelete` | `syncMode`
+  > {
   /**
    * Configuration options for the ElectricSQL ShapeStream
    */
   shapeOptions: ShapeStreamOptions<GetExtensions<T>>
+
+  /**
+   * Optional persistence configuration. When provided, collection data is
+   * persisted to localStorage (or the supplied storage) and reloaded on startup.
+   */
+  persistence?: ElectricPersistenceConfig<T>
 
   syncMode?: ElectricSyncMode
 
   /**
@@ -467,6 +475,15 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
  */
 export type AwaitTxIdFn = (txId: Txid, timeout?: number) => Promise<boolean>
 
+/**
+ * Type for the clearPersistence utility function
+ */
+export type ClearPersistenceFn = () => Promise<void>
+
+/**
+ * Type for the getPersistenceSize utility function
+ */
+export type GetPersistenceSizeFn = () => Promise<number>
+
 /**
  * Type for the awaitMatch utility function
  */
@@ -478,13 +495,21 @@ export type AwaitMatchFn<T extends Row<unknown>> = (
 /**
  * Electric collection utilities type
  */
-export interface ElectricCollectionUtils<
-  T extends Row = Row,
-> extends UtilsRecord {
+export interface ElectricCollectionUtils<T extends Row = Row>
+  extends UtilsRecord {
   awaitTxId: AwaitTxIdFn
   awaitMatch: AwaitMatchFn<T>
 }
 
+/**
+ * Electric collection utilities type with persistence
+ */
+export interface ElectricCollectionUtilsWithPersistence<T extends Row = Row>
+  extends ElectricCollectionUtils<T> {
+  clearPersistence: ClearPersistenceFn
+  getPersistenceSize: GetPersistenceSizeFn
+}
+
 /**
  * Creates Electric collection options for use with a standard Collection
  *
@@ -528,6 +553,9 @@ export function electricCollectionOptions<T extends Row<unknown>>(
   schema?: any
 } {
   const seenTxids = new Store<Set<Txid>>(new Set([]))
+  const persistence =
+    config.persistence && createPersistence(config.persistence)
+
   const seenSnapshots = new Store<Array<PostgresSnapshot>>([])
   const internalSyncMode = config.syncMode ?? `eager`
   const finalSyncMode =
@@ -586,6 +614,7 @@ export function electricCollectionOptions<T extends Row<unknown>>(
   const sync = createElectricSync(config.shapeOptions, {
     seenTxids,
     seenSnapshots,
+    persistence: config.persistence,
     syncMode: internalSyncMode,
     pendingMatches,
     currentBatchMessages,
@@ -793,12 +822,21 @@ export function electricCollectionOptions<T extends Row<unknown>>(
           ElectricCollectionUtils
         >,
       ) => {
+        // Validate that every mutation in the transaction can be JSON
+        // serialized (only when persistence is enabled)
+        if (config.persistence)
+          params.transaction.mutations.forEach((m) =>
+            validateJsonSerializable(m.modified, `insert`)
+          )
         const handlerResult = await config.onInsert!(params)
         await processMatchingStrategy(handlerResult)
+
+        // Called outside the stream -> snapshot rows, keep the prior cursor
+        if (persistence) persistence.saveCollectionSnapshot(params.collection)
        return handlerResult
       }
     : undefined
 
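A quick illustration of what the serializability guard above does and does not catch; `validateJsonSerializable` is the helper added in `persistenceAdapter.ts` below, and the example values are invented:

```typescript
import { validateJsonSerializable } from './persistence/persistenceAdapter'

// Plain data passes:
validateJsonSerializable({ id: 1, title: `ok` }, `insert`)

// Circular structures are the main thing JSON.stringify rejects:
const circular: Record<string, unknown> = {}
circular.self = circular
try {
  validateJsonSerializable(circular, `insert`)
} catch (e) {
  // "Cannot serialize value for insert: Converting circular structure to JSON ..."
}

// Caveat: the check only proves stringify succeeds. A Date serializes fine
// but comes back as a plain ISO string after a reload from storage.
validateJsonSerializable({ createdAt: new Date() }, `insert`)
```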
+  // Wrap the mutation handlers so that, when persistence is enabled, mutations
+  // are validated as JSON-serializable and a snapshot is saved after each one
   const wrappedOnUpdate = config.onUpdate
     ? async (
         params: UpdateMutationFnParams<
           T,
           string | number,
           ElectricCollectionUtils
         >,
       ) => {
+        // Validate that every mutation in the transaction can be JSON
+        // serialized (only when persistence is enabled)
+        if (config.persistence) {
+          params.transaction.mutations.forEach((m) =>
+            validateJsonSerializable(m.modified, `update`)
+          )
+        }
         const handlerResult = await config.onUpdate!(params)
         await processMatchingStrategy(handlerResult)
+
+        if (persistence) {
+          persistence.saveCollectionSnapshot(params.collection)
+        }
+
         return handlerResult
       }
     : undefined
@@ -823,19 +872,44 @@ export function electricCollectionOptions<T extends Row<unknown>>(
       ) => {
         const handlerResult = await config.onDelete!(params)
         await processMatchingStrategy(handlerResult)
+
+        // Persist to storage if configured
+        if (persistence) {
+          // Save collection state to the storage adapter
+          persistence.saveCollectionSnapshot(params.collection)
+        }
+
         return handlerResult
       }
     : undefined
 
+  const clearPersistence: ClearPersistenceFn = async () => {
+    if (!persistence) {
+      throw new Error(`Persistence is not configured for this collection`)
+    }
+    persistence.clear()
+  }
+
+  const getPersistenceSize: GetPersistenceSizeFn = async () =>
+    persistence ? persistence.size() : 0
+
   // Extract standard Collection config properties
   const {
     shapeOptions: _shapeOptions,
+    persistence: _persistence,
     onInsert: _onInsert,
     onUpdate: _onUpdate,
     onDelete: _onDelete,
     ...restConfig
   } = config
 
+  // Build the utils object based on whether persistence is configured
+  const utils:
+    | ElectricCollectionUtils
+    | ElectricCollectionUtilsWithPersistence = persistence
+    ? { awaitTxId, awaitMatch, clearPersistence, getPersistenceSize }
+    : { awaitTxId, awaitMatch }
+
   return {
     ...restConfig,
     syncMode: finalSyncMode,
@@ -843,10 +917,7 @@ export function electricCollectionOptions<T extends Row<unknown>>(
     onInsert: wrappedOnInsert,
     onUpdate: wrappedOnUpdate,
     onDelete: wrappedOnDelete,
-    utils: {
-      awaitTxId,
-      awaitMatch,
-    },
+    utils: utils as ElectricCollectionUtils,
   }
 }
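Because of the downcast above, consumers that receive a collection generically cannot see the persistence utils without narrowing. A small sketch using only the types this PR exports (the guard itself is ad hoc, not part of the PR):

```typescript
import type {
  ElectricCollectionUtils,
  ElectricCollectionUtilsWithPersistence,
} from '@tanstack/electric-db-collection'

function hasPersistenceUtils(
  utils: ElectricCollectionUtils,
): utils is ElectricCollectionUtilsWithPersistence {
  // clearPersistence only exists when `persistence` was configured
  return `clearPersistence` in utils
}

async function resetOfflineCache(utils: ElectricCollectionUtils) {
  if (hasPersistenceUtils(utils)) {
    await utils.clearPersistence() // only safe behind the guard
  }
}
```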
@@ -876,6 +947,7 @@ function createElectricSync<T extends Row<unknown>>(
     removePendingMatches: (matchIds: Array<string>) => void
     resolveMatchedPendingMatches: () => void
     collectionId?: string
+    persistence?: ElectricPersistenceConfig<T>
     testHooks?: ElectricTestHooks
   },
 ): SyncConfig<T> {
@@ -889,8 +961,11 @@ function createElectricSync<T extends Row<unknown>>(
     removePendingMatches,
     resolveMatchedPendingMatches,
     collectionId,
+    persistence: persistenceConfig,
     testHooks,
   } = options
+  const persistence =
+    persistenceConfig && createPersistence(persistenceConfig)
 
   const MAX_BATCH_MESSAGES = 1000 // Safety limit for message buffer
 
   // Store for the relation schema information
@@ -1142,21 +1217,119 @@ function createElectricSync<T extends Row<unknown>>(
     sync: (params: Parameters<SyncConfig<T>[`sync`]>[0]) => {
       const { begin, write, commit, markReady, truncate, collection } = params
 
+      // Load from the persistence adapter if persistence is configured.
+      // Only the lightweight metadata (lastOffset, shapeHandle) is kept after
+      // loading, so the heavy collection data can be garbage collected.
+      let persistedMetadata:
+        | { lastOffset?: unknown; shapeHandle?: string }
+        | undefined
+
+      debug(`persistence: ${persistence ? `configured` : `not configured`}`)
+      if (persistence) {
+        debug(
+          `${collectionId ? `[${collectionId}] ` : ``}persistence: starting load from storage`
+        )
+        try {
+          const persistedData = persistence.read()
+          const hasPersistedData =
+            !!persistedData?.value &&
+            Object.keys(persistedData.value).length > 0
+
+          debug(
+            `${collectionId ? `[${collectionId}] ` : ``}persistence: hasPersistedData=%s, isReady=%s, syncMode=%s`,
+            hasPersistedData,
+            persistedData?.isReady ?? false,
+            syncMode
+          )
+
+          persistence.loadSnapshotInto(
+            begin,
+            (op) => write({ ...op, metadata: {} }),
+            commit
+          )
+
+          debug(
+            `${collectionId ? `[${collectionId}] ` : ``}persistence: after loadSnapshotInto, collection.state.size=%d`,
+            collection.state.size
+          )
+
+          // In on-demand mode, mark the collection as ready immediately after
+          // loading from persistence: on-demand works with partial/incremental
+          // data and doesn't require waiting for server sync to be usable.
+          // Also mark ready if the collection was marked ready when persisted.
+          const hasPersistedDataOnDemand =
+            syncMode === `on-demand` && hasPersistedData
+
+          if (hasPersistedDataOnDemand || persistedData?.isReady) {
+            debug(
+              `${collectionId ? `[${collectionId}] ` : ``}persistence: marking ready (hasPersistedDataOnDemand=%s, persistedData.isReady=%s)`,
+              hasPersistedDataOnDemand,
+              persistedData?.isReady ?? false
+            )
+            markReady()
+          } else {
+            debug(
+              `${collectionId ? `[${collectionId}] ` : ``}persistence: not marking ready yet, waiting for sync`
+            )
+          }
+
+          if (persistenceConfig?.onPersistenceLoaded)
+            persistenceConfig.onPersistenceLoaded()
+
+          // Extract only the lightweight metadata we need for stream
+          // configuration; this allows the heavy `value` data to be collected
+          persistedMetadata = {
+            lastOffset: persistedData?.lastOffset,
+            shapeHandle: persistedData?.shapeHandle,
+          }
+          debug(
+            `${collectionId ? `[${collectionId}] ` : ``}persistence: load complete, offset=%s, handle=%s`,
+            persistedMetadata.lastOffset ?? `none`,
+            persistedMetadata.shapeHandle ?? `none`
+          )
+        } catch (e) {
+          debug(
+            `${collectionId ? `[${collectionId}] ` : ``}persistence: load error %o`,
+            e
+          )
+          console.warn(`[ElectricPersistence] load error`, e)
+        }
+      } else {
+        debug(
+          `${collectionId ? `[${collectionId}] ` : ``}persistence: not configured, skipping load`
+        )
+      }
 
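For reference while reviewing the load path above, this is roughly what ends up under the storage key after a sync cycle. The shape follows `PersistedEnvelope` in `createPersistence.ts` below; every concrete value here is invented:

```typescript
// Illustrative contents of localStorage under `my-collection-storage`:
const envelope = {
  v: 1, // envelope version
  value: {
    '1': { id: 1, title: `Buy milk`, completed: false },
    '2': { id: 2, title: `Ship PR`, completed: true },
  },
  lastOffset: `26800584_4`, // Electric log offset to resume from
  shapeHandle: `my-shape-handle`, // identifies the server-side shape (invented)
  isReady: true, // the collection had been marked ready when persisted
}
localStorage.setItem(`my-collection-storage`, JSON.stringify(envelope))
```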
      // Wrap markReady to wait for test hook in progressive mode
       let progressiveReadyGate: Promise<void> | null = null
       const wrappedMarkReady = (isBuffering: boolean) => {
+        debug(
+          `${collectionId ? `[${collectionId}] ` : ``}wrappedMarkReady called: isBuffering=%s, syncMode=%s, hasTestHook=%s`,
+          isBuffering,
+          syncMode,
+          !!testHooks?.beforeMarkingReady
+        )
         // Only create gate if we're in buffering phase (first up-to-date)
         if (
           isBuffering &&
           syncMode === `progressive` &&
           testHooks?.beforeMarkingReady
         ) {
+          debug(
+            `${collectionId ? `[${collectionId}] ` : ``}wrappedMarkReady: waiting for test hook before marking ready`
+          )
           // Create a new gate promise for this sync cycle
           progressiveReadyGate = testHooks.beforeMarkingReady()
           progressiveReadyGate.then(() => {
+            debug(
+              `${collectionId ? `[${collectionId}] ` : ``}wrappedMarkReady: test hook resolved, marking ready`
+            )
             markReady()
           })
         } else {
+          debug(
+            `${collectionId ? `[${collectionId}] ` : ``}wrappedMarkReady: marking ready immediately`
+          )
           // No hook, not buffering, or already past first up-to-date
           markReady()
         }
@@ -1191,14 +1364,24 @@ function createElectricSync<T extends Row<unknown>>(
         })
       })
 
+      // Prefer an explicit offset, then the persisted cursor, then `now` for
+      // on-demand mode (which only needs changes from creation time onwards)
+      const computedOffset: Offset | undefined = (() => {
+        const offset = shapeOptions.offset
+        if (offset != null) return offset
+        const lastOffset = persistedMetadata?.lastOffset as Offset | undefined
+        if (lastOffset != null) return lastOffset
+        if (syncMode === `on-demand`) return `now`
+        return undefined
+      })()
+
+      const computedHandle: string | undefined =
+        shapeOptions.handle ?? persistedMetadata?.shapeHandle
+
       const stream = new ShapeStream({
         ...shapeOptions,
         // In on-demand mode, we only want to sync changes, so we set the log to `changes_only`
         log: syncMode === `on-demand` ? `changes_only` : undefined,
-        // In on-demand mode, we only need the changes from the point of time the collection was created
-        // so we default to `now` when there is no saved offset.
-        offset:
-          shapeOptions.offset ?? (syncMode === `on-demand` ? `now` : undefined),
+        offset: computedOffset,
+        handle: computedHandle,
         signal: abortController.signal,
         onError: (errorParams) => {
           // Just immediately mark ready if there's an error, to avoid blocking.
           // Note that Electric sends a 409 error on a `must-refetch` message, but the
           // ShapeStream handles this and it will not reach this handler, therefore
           // this markReady will not be triggered by a `must-refetch`.
+          debug(
+            `${collectionId ? `[${collectionId}] ` : ``}stream error, marking ready to unblock preload: %o`,
+            errorParams
+          )
           markReady()
 
           if (shapeOptions.onError) {
@@ -1389,6 +1576,9 @@
           truncate()
 
+          // Clear persistence storage on truncate
+          if (persistence) persistence.clear()
+
           // Clear tag tracking state
           clearTagTrackingState()
 
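Since a `must-refetch` truncate now also wipes the persisted snapshot, a tiny in-memory `StorageApi` (the `getItem`/`setItem`/`removeItem` shape defined in `persistenceAdapter.ts` below) is handy for asserting that behavior in tests. A minimal sketch:

```typescript
// In-memory StorageApi: lets a test inspect exactly what was persisted/cleared
const mem = new Map<string, string>()
const memoryStorage = {
  getItem: (k: string) => mem.get(k) ?? null,
  setItem: (k: string, v: string) => void mem.set(k, v),
  removeItem: (k: string) => void mem.delete(k),
}

// Pass it via: persistence: { storageKey: `todos-test`, storage: memoryStorage }
// After a simulated must-refetch, expect mem.has(`todos-test`) === false.
```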
@@ -1404,6 +1594,9 @@
       }
 
       if (commitPoint !== null) {
+        // Track whether we actually committed a transaction (needed for persistence)
+        let didCommit = false
+
         // PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end)
         if (isBufferingInitialSync() && commitPoint === `up-to-date`) {
           debug(
@@ -1446,6 +1639,7 @@
 
           // Commit the atomic swap
           commit()
+          didCommit = true
 
           // Exit buffering phase by marking that we've received up-to-date
           // isBufferingInitialSync() will now return false
@@ -1460,9 +1654,30 @@
           if (transactionStarted) {
             commit()
             transactionStarted = false
+            didCommit = true
           }
         }
-        wrappedMarkReady(isBufferingInitialSync())
+
+        // Persist after we've committed a transaction
+        // (didCommit implies there were changes worth persisting)
+        if (persistence && didCommit)
+          persistence.saveCollectionSnapshot(collection, stream)
+
+        // Clear the current batch buffer since we're now up-to-date
+        currentBatchMessages.setState(() => [])
+
+        if (
+          commitPoint === `up-to-date` ||
+          (commitPoint === `subset-end` && syncMode === `on-demand`)
+        ) {
+          // Mark the collection as ready now that sync is up to date
+          debug(
+            `${collectionId ? `[${collectionId}] ` : ``}sync: received %s, calling wrappedMarkReady`,
+            commitPoint
+          )
+          wrappedMarkReady(isBufferingInitialSync())
+
+          // Persist the isReady state after marking ready
+          if (persistence)
+            persistence.saveCollectionSnapshot(collection, stream)
+        }
 
         // Track that we've received the first up-to-date for progressive mode
         if (commitPoint === `up-to-date`) {
@@ -1470,7 +1685,7 @@
         }
 
         // Always commit txids when we receive up-to-date, regardless of transaction state
-        seenTxids.setState((currentTxids) => {
+        seenTxids.setState((currentTxids: Set<Txid>) => {
           const clonedSeen = new Set(currentTxids)
           if (newTxids.size > 0) {
             debug(
diff --git a/packages/electric-db-collection/src/index.ts b/packages/electric-db-collection/src/index.ts
index 31f615e56..5ee7ba411 100644
--- a/packages/electric-db-collection/src/index.ts
+++ b/packages/electric-db-collection/src/index.ts
@@ -4,6 +4,7 @@ export {
   isControlMessage,
   type ElectricCollectionConfig,
   type ElectricCollectionUtils,
+  type ElectricCollectionUtilsWithPersistence,
   type Txid,
   type AwaitTxIdFn,
 } from './electric'
diff --git a/packages/electric-db-collection/src/persistence/createPersistence.ts b/packages/electric-db-collection/src/persistence/createPersistence.ts
new file mode 100644
index 000000000..24f4d1aba
--- /dev/null
+++ b/packages/electric-db-collection/src/persistence/createPersistence.ts
@@ -0,0 +1,189 @@
+import DebugModule from 'debug'
+import type { StorageApi } from './persistenceAdapter'
+
+const debug = DebugModule.debug(`ts/db:electric:persistence`)
+
+/**
+ * Configuration interface for Electric collection persistence
+ * @template T - The type of items in the collection
+ */
+export interface ElectricPersistenceConfig<T = unknown> {
+  /**
+   * The key to use for storing the collection data in localStorage/sessionStorage
+   */
+  storageKey: string
+
+  /**
+   * Storage API to use (defaults to window.localStorage)
+   * Can be any object that implements the Storage interface (e.g., sessionStorage)
+   */
+  storage?: StorageApi
+
+  /**
+   * Optional pre-loaded cache data. When provided, initial reads will use this cache
+   * instead of calling storage.getItem(). This is useful for async storage adapters
+   * (like OPFS) that need to load data before the collection is created.
+   * The Map keys are storage keys and the values are the raw JSON strings.
+   */
+  cache?: Map<string, string>
+
+  /**
+   * Callback invoked after data has been loaded from the persistence adapter
+   * (e.g., to treat the collection as usable while offline).
+   */
+  onPersistenceLoaded?: () => void
+}
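Two usage sketches for this config surface. The first just swaps in `sessionStorage`; the second shows the intended pattern for an async backend like OPFS, pre-loading the raw JSON into `cache` before the collection is created (the OPFS calls are standard browser APIs, but this helper is an assumption, not part of the PR):

```typescript
// sessionStorage variant: any getItem/setItem/removeItem object works
const sessionConfig: ElectricPersistenceConfig = {
  storageKey: `todos-session`,
  storage: window.sessionStorage,
  onPersistenceLoaded: () => {
    console.log(`snapshot restored; UI can render before network sync finishes`)
  },
}

// Hypothetical OPFS pre-load: read the raw JSON up front, hand it over via `cache`
async function opfsPersistence(
  storageKey: string,
): Promise<ElectricPersistenceConfig> {
  const cache = new Map<string, string>()
  try {
    const root = await navigator.storage.getDirectory()
    const file = await (await root.getFileHandle(storageKey)).getFile()
    cache.set(storageKey, await file.text())
  } catch {
    // no snapshot persisted yet
  }
  return { storageKey, cache }
}
```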
+
+// Envelope we persist to storage
+type PersistedEnvelope<T> = {
+  v: 1
+  value: Record<string, T>
+  lastOffset?: string
+  shapeHandle?: string
+  isReady?: boolean
+}
+
+export function createPersistence<T = unknown>(
+  cfg: ElectricPersistenceConfig<T>
+) {
+  const key = cfg.storageKey
+  const storage =
+    cfg.storage || (typeof window !== `undefined` ? window.localStorage : null)
+  const cache = cfg.cache
+
+  const safeParse = (raw: string | null): PersistedEnvelope<T> | null => {
+    if (!raw) return null
+    try {
+      const parsed = JSON.parse(raw)
+      if (parsed && typeof parsed === `object` && parsed.v === 1) {
+        return parsed as PersistedEnvelope<T>
+      }
+      return null
+    } catch {
+      return null
+    }
+  }
+
+  const read = (): PersistedEnvelope<T> | null => {
+    // Try cache first if available (for async storage adapters like OPFS)
+    if (cache) {
+      const cachedRaw = cache.get(key)
+      if (cachedRaw) {
+        const parsed = safeParse(cachedRaw)
+        if (parsed) {
+          const itemCount = Object.keys(parsed.value).length
+          debug(
+            `[%s] read from cache: found %d items, offset=%s, handle=%s, isReady=%s`,
+            key,
+            itemCount,
+            parsed.lastOffset ?? `none`,
+            parsed.shapeHandle ?? `none`,
+            parsed.isReady ?? false,
+          )
+        }
+        return parsed
+      }
+      debug(`[%s] read: no data in cache`, key)
+    }
+
+    // Fall back to storage
+    if (!storage) {
+      debug(`[%s] read: no storage available`, key)
+      return null
+    }
+    const raw = storage.getItem(key)
+    const parsed = safeParse(raw)
+    if (parsed) {
+      const itemCount = Object.keys(parsed.value).length
+      debug(
+        `[%s] read: found %d items, offset=%s, handle=%s, isReady=%s`,
+        key,
+        itemCount,
+        parsed.lastOffset ?? `none`,
+        parsed.shapeHandle ?? `none`,
+        parsed.isReady ?? false,
+      )
+    } else {
+      debug(`[%s] read: no persisted data found`, key)
+    }
+    return parsed
+  }
+
+  const write = (next: PersistedEnvelope<T>) => {
+    if (!storage) return
+    storage.setItem(key, JSON.stringify(next))
+  }
+
+  const clear = () => {
+    if (!storage) return
+    storage.removeItem(key)
+  }
+
+  const size = (): number => {
+    if (!storage) return 0
+    const data = storage.getItem(key)
+    return data ? new Blob([data]).size : 0
+  }
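One reason `size()` matters: localStorage quotas are small (commonly on the order of 5 MB per origin, though this varies by browser), so large shapes can hit the limit. A sketch of a pre-flight check using only the functions defined in this file:

```typescript
const persistence = createPersistence({ storageKey: `todos-offline` })
const bytes = persistence.size() // serialized snapshot size, measured via Blob

if (bytes > 4 * 1024 * 1024) {
  // threshold is an arbitrary illustration, not a documented limit
  console.warn(`offline snapshot is close to typical localStorage quotas`, bytes)
}
```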
+  const saveCollectionSnapshot = (collection: any, stream?: any) => {
+    if (!storage) return
+    // 1) Snapshot the current collection state
+    const value: Record<string, T> = {}
+    for (const [k, v] of collection.state) value[String(k)] = v as T
+
+    // 2) Load the previous envelope (to preserve the cursor when no stream is present)
+    const prev = read() ?? ({ v: 1, value: {} } as PersistedEnvelope<T>)
+
+    // 3) Only advance the cursor if we're called from the stream
+    const lastOffset =
+      (stream?.lastOffset as string | undefined) ?? prev.lastOffset
+    const shapeHandle = stream?.shapeHandle ?? prev.shapeHandle
+
+    // 4) Capture isReady from the collection; coerce to a boolean so the
+    //    flag is always written explicitly (false rather than undefined)
+    const isReady = collection.isReady() || false
+    const next: PersistedEnvelope<T> = {
+      v: 1,
+      value,
+      lastOffset,
+      shapeHandle,
+      isReady,
+    }
+    write(next)
+  }
+
+  const loadSnapshotInto = (
+    begin: () => void,
+    writeOp: (op: { type: `insert`; value: T }) => void,
+    commit: () => void,
+  ) => {
+    debug(`[%s] loadSnapshotInto: starting`, key)
+    const env = read()
+    if (!env?.value) {
+      debug(`[%s] loadSnapshotInto: no envelope or value, skipping`, key)
+      return
+    }
+    const entries = Object.entries(env.value)
+    if (!entries.length) {
+      debug(`[%s] loadSnapshotInto: envelope empty, skipping`, key)
+      return
+    }
+    debug(
+      `[%s] loadSnapshotInto: loading %d entries into collection`,
+      key,
+      entries.length,
+    )
+    begin()
+    for (const [, row] of entries) {
+      writeOp({ type: `insert`, value: row })
+    }
+    commit()
+    debug(`[%s] loadSnapshotInto: completed`, key)
+  }
+
+  return {
+    read,
+    write,
+    clear,
+    size,
+    saveCollectionSnapshot,
+    loadSnapshotInto,
+  }
+}
diff --git a/packages/electric-db-collection/src/persistence/persistenceAdapter.ts b/packages/electric-db-collection/src/persistence/persistenceAdapter.ts
new file mode 100644
index 000000000..b93801a41
--- /dev/null
+++ b/packages/electric-db-collection/src/persistence/persistenceAdapter.ts
@@ -0,0 +1,127 @@
+import type { Row } from "@electric-sql/client"
+
+/**
+ * Storage API interface - the subset of DOM Storage that we need
+ * Matches the pattern used in the @tanstack/db local-storage implementation
+ */
+export type StorageApi = Pick<Storage, `getItem` | `setItem` | `removeItem`>
+
+/**
+ * Internal storage format that includes version tracking
+ * Matches the pattern used in the localStorage implementation
+ */
+export interface StoredItem<T> {
+  versionKey: string
+  data: T
+}
+
+/**
+ * Generate a UUID for version tracking
+ * @returns A unique identifier string for tracking data versions
+ */
+export function generateVersionKey(): string {
+  return crypto.randomUUID()
+}
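A sketch of why `versionKey` exists, assuming it serves the same role as in the `@tanstack/db` local-storage pattern referenced above: every write gets a fresh UUID, so a reader can detect that an entry was rewritten by comparing keys rather than deep-comparing `data` (the `Todo` type is invented):

```typescript
type Todo = { id: number; title: string }

const a: StoredItem<Todo> = {
  versionKey: generateVersionKey(),
  data: { id: 1, title: `x` },
}
const b: StoredItem<Todo> = {
  versionKey: generateVersionKey(), // new UUID on every write
  data: { id: 1, title: `x` },
}

const changed = a.versionKey !== b.versionKey // true, even though data is equal
```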
+/**
+ * Load data from storage and return it as a Map
+ * @param storageKey - The key used to store data in the storage API
+ * @param storage - The storage API to load from (localStorage, sessionStorage, etc.)
+ * @returns Map of stored items with version tracking, or an empty Map if loading fails
+ */
+export function loadFromStorage<T extends Row<unknown>>(
+  storageKey: string,
+  storage: StorageApi
+): Map<string, StoredItem<T>> {
+  try {
+    const rawData = storage.getItem(storageKey)
+    if (!rawData) {
+      return new Map()
+    }
+
+    const parsed = JSON.parse(rawData)
+    const dataMap = new Map<string, StoredItem<T>>()
+
+    // Handle object format where keys map to StoredItem values
+    if (
+      typeof parsed === `object` &&
+      parsed !== null &&
+      !Array.isArray(parsed)
+    ) {
+      Object.entries(parsed).forEach(([key, value]) => {
+        // Runtime check to ensure the value has the expected StoredItem structure
+        if (
+          value &&
+          typeof value === `object` &&
+          `versionKey` in value &&
+          `data` in value
+        ) {
+          const storedItem = value as StoredItem<T>
+          dataMap.set(key, storedItem)
+        } else {
+          console.warn(
+            `[ElectricPersistence] Invalid data format for key "${key}" in storage key "${storageKey}"`
+          )
+        }
+      })
+    } else {
+      console.warn(
+        `[ElectricPersistence] Invalid storage object format for key "${storageKey}"`
+      )
+    }
+
+    return dataMap
+  } catch (error) {
+    console.warn(
+      `[ElectricPersistence] Error loading data from storage key "${storageKey}":`,
+      error
+    )
+    return new Map()
+  }
+}
+
+/**
+ * Save data to storage
+ * @param storageKey - The key to use for storing data
+ * @param storage - The storage API to save to
+ * @param dataMap - Map of items with version tracking to save to storage
+ */
+export function saveToStorage<T extends Row<unknown>>(
+  storageKey: string,
+  storage: StorageApi,
+  dataMap: Map<string, StoredItem<T>>
+): void {
+  try {
+    // Convert the Map to object format for storage
+    const objectData: Record<string, StoredItem<T>> = {}
+    dataMap.forEach((storedItem, key) => {
+      objectData[String(key)] = storedItem
+    })
+    const serialized = JSON.stringify(objectData)
+    storage.setItem(storageKey, serialized)
+  } catch (error) {
+    console.error(
+      `[ElectricPersistence] Error saving data to storage key "${storageKey}":`,
+      error
+    )
+    throw error
+  }
+}
+
+/**
+ * Validates that a value can be JSON serialized
+ * @param value - The value to validate for JSON serialization
+ * @param operation - The operation type being performed (for error messages)
+ * @throws Error if the value cannot be JSON serialized
+ */
+export function validateJsonSerializable(value: any, operation: string): void {
+  try {
+    JSON.stringify(value)
+  } catch (error) {
+    throw new Error(
+      `Cannot serialize value for ${operation}: ${
+        error instanceof Error ? error.message : String(error)
+      }`
+    )
+  }
+}
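A round-trip sketch for the adapter helpers above, using only types and functions defined in this file (the `Todo` type is invented for illustration):

```typescript
type Todo = { id: number; title: string }

const items = new Map<string, StoredItem<Todo>>()
items.set(`1`, { versionKey: generateVersionKey(), data: { id: 1, title: `hi` } })

// Serialize the Map into the keyed-object format and back
saveToStorage(`todos-adapter`, window.localStorage, items)
const restored = loadFromStorage<Todo>(`todos-adapter`, window.localStorage)

console.log(restored.get(`1`)?.data.title) // -> hi
```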