diff --git a/lib/db/sqlite/firo_cache.dart b/lib/db/sqlite/firo_cache.dart index 9a0d83f6a2..8b942dbbdf 100644 --- a/lib/db/sqlite/firo_cache.dart +++ b/lib/db/sqlite/firo_cache.dart @@ -84,6 +84,24 @@ abstract class _FiroCache { sparkSetCacheFile.path, mode: OpenMode.readWrite, ); + + // Migrations: safe to run on every startup (IF NOT EXISTS / IF NOT). + _setCacheDB[network]!.execute(""" + CREATE UNIQUE INDEX IF NOT EXISTS idx_sparksetcoins_set_coin + ON SparkSetCoins(setId, coinId); + """); + + // Add `complete` column to SparkSet for tracking whether a download + // finished. Existing rows default to 1 (complete) since the old + // all-or-nothing writer only saved on full completion. + try { + _setCacheDB[network]!.execute(""" + ALTER TABLE SparkSet ADD COLUMN complete INTEGER NOT NULL DEFAULT 1; + """); + } catch (_) { + // Column already exists — safe to ignore. + } + _usedTagsCacheDB[network] = sqlite3.open( sparkUsedTagsCacheFile.path, mode: OpenMode.readWrite, @@ -133,6 +151,7 @@ abstract class _FiroCache { setHash TEXT NOT NULL, groupId INTEGER NOT NULL, size INTEGER NOT NULL, + complete INTEGER NOT NULL DEFAULT 0, UNIQUE (blockHash, setHash, groupId) ); diff --git a/lib/db/sqlite/firo_cache_coordinator.dart b/lib/db/sqlite/firo_cache_coordinator.dart index 5ecd7534b9..138d4ee506 100644 --- a/lib/db/sqlite/firo_cache_coordinator.dart +++ b/lib/db/sqlite/firo_cache_coordinator.dart @@ -104,52 +104,117 @@ abstract class FiroCacheCoordinator { progressUpdated?.call(prevSize, meta.size); - if (prevMeta?.blockHash == meta.blockHash) { - Logging.instance.d("prevMeta?.blockHash == meta.blockHash"); + if (prevMeta?.blockHash == meta.blockHash && + prevMeta!.size >= meta.size) { + Logging.instance.d( + "prevMeta matches meta blockHash and size >= meta.size, " + "already up to date", + ); return; } - final numberOfCoinsToFetch = meta.size - prevSize; + // When resuming a partial download of the SAME block, we can skip + // already-saved coins because the index space hasn't shifted. + // + // When the block changes, we check the `complete` flag on the + // previous SparkSet to determine if the old download finished. + // - Complete: use the delta (meta.size - prevSize) from index 0. + // The newest coins in the new block are at the lowest indices. + // - Partial: indices have shifted due to the new block, so we + // can't reliably compute which coins are missing. Re-download + // the full set from index 0. INSERT OR IGNORE handles overlap. + final bool sameBlock = prevMeta?.blockHash == meta.blockHash; + + final int numberOfCoinsToFetch; + final int indexOffset; + + if (sameBlock) { + // Same block: resume from where we left off. + numberOfCoinsToFetch = meta.size - prevSize; + indexOffset = prevSize; + } else if (prevMeta != null && prevMeta.complete) { + // Different block, but previous download was complete. + // The delta coins are at indices 0..(meta.size - prevSize - 1). + numberOfCoinsToFetch = meta.size - prevSize; + indexOffset = 0; + } else { + // Different block and previous download was partial (or no + // previous data). Must re-download the full set. + numberOfCoinsToFetch = meta.size; + indexOffset = 0; + } + + if (numberOfCoinsToFetch <= 0) { + // Edge case: reorg, stale cache, or already up to date. + return; + } final fullSectorCount = numberOfCoinsToFetch ~/ sectorSize; final remainder = numberOfCoinsToFetch % sectorSize; - final List coins = []; + int coinsSaved = 0; for (int i = 0; i < fullSectorCount; i++) { - final start = (i * sectorSize); + final start = indexOffset + (i * sectorSize); final data = await client.getSparkAnonymitySetBySector( coinGroupId: groupId, latestBlock: meta.blockHash, startIndex: start, endIndex: start + sectorSize, ); - progressUpdated?.call(start + sectorSize, numberOfCoinsToFetch); - coins.addAll(data); + final sectorCoins = + data + .map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId)) + .toList(); + + coinsSaved += sectorCoins.length; + + await _workers[network]!.runTask( + FCTask( + func: FCFuncName._insertSparkAnonSetCoinsIncremental, + data: (meta, sectorCoins, indexOffset + coinsSaved), + ), + ); + + progressUpdated?.call( + indexOffset + (i + 1) * sectorSize, + meta.size, + ); } if (remainder > 0) { + final remainderStart = indexOffset + numberOfCoinsToFetch - remainder; final data = await client.getSparkAnonymitySetBySector( coinGroupId: groupId, latestBlock: meta.blockHash, - startIndex: numberOfCoinsToFetch - remainder, - endIndex: numberOfCoinsToFetch, + startIndex: remainderStart, + endIndex: indexOffset + numberOfCoinsToFetch, ); - progressUpdated?.call(numberOfCoinsToFetch, numberOfCoinsToFetch); - coins.addAll(data); - } + final sectorCoins = + data + .map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId)) + .toList(); - final result = - coins - .map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId)) - .toList(); + coinsSaved += sectorCoins.length; + + await _workers[network]!.runTask( + FCTask( + func: FCFuncName._insertSparkAnonSetCoinsIncremental, + data: (meta, sectorCoins, indexOffset + coinsSaved), + ), + ); + + progressUpdated?.call(meta.size, meta.size); + } + // Mark this SparkSet as complete so cross-block resume knows + // the download finished and can safely use the delta approach. await _workers[network]!.runTask( FCTask( - func: FCFuncName._updateSparkAnonSetCoinsWith, - data: (meta, result), + func: FCFuncName._markSparkAnonSetComplete, + data: meta, ), ); }); @@ -268,6 +333,7 @@ abstract class FiroCacheCoordinator { blockHash: result.first["blockHash"] as String, setHash: result.first["setHash"] as String, size: result.first["size"] as int, + complete: (result.first["complete"] as int) == 1, ); } diff --git a/lib/db/sqlite/firo_cache_reader.dart b/lib/db/sqlite/firo_cache_reader.dart index 67fea7764c..f31e744132 100644 --- a/lib/db/sqlite/firo_cache_reader.dart +++ b/lib/db/sqlite/firo_cache_reader.dart @@ -25,7 +25,7 @@ abstract class _Reader { required Database db, }) async { final query = """ - SELECT ss.blockHash, ss.setHash, ss.size + SELECT ss.blockHash, ss.setHash, ss.size, ss.complete FROM SparkSet ss WHERE ss.groupId = $groupId ORDER BY ss.size DESC diff --git a/lib/db/sqlite/firo_cache_worker.dart b/lib/db/sqlite/firo_cache_worker.dart index abaceb288b..67073b1181 100644 --- a/lib/db/sqlite/firo_cache_worker.dart +++ b/lib/db/sqlite/firo_cache_worker.dart @@ -1,7 +1,8 @@ part of 'firo_cache.dart'; enum FCFuncName { - _updateSparkAnonSetCoinsWith, + _insertSparkAnonSetCoinsIncremental, + _markSparkAnonSetComplete, _updateSparkUsedTagsWith, } @@ -93,13 +94,22 @@ class _FiroCacheWorker { try { final FCResult result; switch (task.func) { - case FCFuncName._updateSparkAnonSetCoinsWith: + case FCFuncName._insertSparkAnonSetCoinsIncremental: final data = - task.data as (SparkAnonymitySetMeta, List); - result = _updateSparkAnonSetCoinsWith( + task.data + as (SparkAnonymitySetMeta, List, int); + result = _insertSparkAnonSetCoinsIncremental( setCacheDb, data.$2, data.$1, + data.$3, + ); + break; + + case FCFuncName._markSparkAnonSetComplete: + result = _markSparkAnonSetComplete( + setCacheDb, + task.data as SparkAnonymitySetMeta, ); break; diff --git a/lib/db/sqlite/firo_cache_writer.dart b/lib/db/sqlite/firo_cache_writer.dart index fadc3eb91c..b97f5867d7 100644 --- a/lib/db/sqlite/firo_cache_writer.dart +++ b/lib/db/sqlite/firo_cache_writer.dart @@ -45,32 +45,23 @@ FCResult _updateSparkUsedTagsWith(Database db, List> tags) { } // =========================================================================== -// ================== write to spark anon set cache ========================== +// =========== incremental write to spark anon set cache ==================== -/// update the sqlite cache +/// Persist a single sector's worth of coins to the cache, creating or +/// updating the SparkSet row as needed. Safe to call repeatedly — uses +/// INSERT OR IGNORE so duplicate coins (from crash-recovery reruns) are +/// silently skipped. /// -/// returns true if successful, otherwise false -FCResult _updateSparkAnonSetCoinsWith( +/// [cumulativeSize] should be prevSize + total coins saved so far (including +/// this batch). It is written to SparkSet.size so that on resume, +/// getLatestSetInfoForGroupId returns the correct partial progress. +FCResult _insertSparkAnonSetCoinsIncremental( Database db, final List coinsRaw, SparkAnonymitySetMeta meta, + int cumulativeSize, ) { if (coinsRaw.isEmpty) { - // no coins to actually insert - return FCResult(success: true); - } - - final checkResult = db.select( - """ - SELECT * - FROM SparkSet - WHERE blockHash = ? AND setHash = ? AND groupId = ?; - """, - [meta.blockHash, meta.setHash, meta.coinGroupId], - ); - - if (checkResult.isNotEmpty) { - // already up to date return FCResult(success: true); } @@ -78,35 +69,65 @@ FCResult _updateSparkAnonSetCoinsWith( db.execute("BEGIN;"); try { + // Create SparkSet row if it doesn't exist yet for this block state. + // complete = 0 marks this as an in-progress download. db.execute( """ - INSERT INTO SparkSet (blockHash, setHash, groupId, size) - VALUES (?, ?, ?, ?); + INSERT OR IGNORE INTO SparkSet (blockHash, setHash, groupId, size, complete) + VALUES (?, ?, ?, 0, 0); + """, + [meta.blockHash, meta.setHash, meta.coinGroupId], + ); + + // Get the SparkSet row's id (whether just created or already existing). + final setIdResult = db.select( + """ + SELECT id FROM SparkSet + WHERE blockHash = ? AND setHash = ? AND groupId = ?; """, - [meta.blockHash, meta.setHash, meta.coinGroupId, meta.size], + [meta.blockHash, meta.setHash, meta.coinGroupId], ); - final setId = db.lastInsertRowId; + final setId = setIdResult.first["id"] as int; for (final coin in coins) { + // INSERT OR IGNORE handles duplicates from crash-recovery reruns. db.execute( """ - INSERT INTO SparkCoin (serialized, txHash, context, groupId) - VALUES (?, ?, ?, ?); - """, + INSERT OR IGNORE INTO SparkCoin (serialized, txHash, context, groupId) + VALUES (?, ?, ?, ?); + """, [coin.serialized, coin.txHash, coin.context, coin.groupId], ); - final coinId = db.lastInsertRowId; - // finally add the row id to the newly added set + // lastInsertRowId is 0 when INSERT OR IGNORE skips a duplicate, + // so we must SELECT explicitly. + final coinIdResult = db.select( + """ + SELECT id FROM SparkCoin + WHERE serialized = ? AND txHash = ? AND context = ? AND groupId = ?; + """, + [coin.serialized, coin.txHash, coin.context, coin.groupId], + ); + final coinId = coinIdResult.first["id"] as int; + db.execute( """ - INSERT INTO SparkSetCoins (setId, coinId) + INSERT OR IGNORE INTO SparkSetCoins (setId, coinId) VALUES (?, ?); """, [setId, coinId], ); } + // Update cumulative size to track partial progress. + db.execute( + """ + UPDATE SparkSet SET size = ? + WHERE id = ?; + """, + [cumulativeSize, setId], + ); + db.execute("COMMIT;"); return FCResult(success: true); @@ -115,3 +136,18 @@ FCResult _updateSparkAnonSetCoinsWith( return FCResult(success: false, error: e); } } + +/// Mark a SparkSet row as complete after all sectors have been downloaded. +FCResult _markSparkAnonSetComplete( + Database db, + SparkAnonymitySetMeta meta, +) { + db.execute( + """ + UPDATE SparkSet SET complete = 1 + WHERE blockHash = ? AND setHash = ? AND groupId = ?; + """, + [meta.blockHash, meta.setHash, meta.coinGroupId], + ); + return FCResult(success: true); +} diff --git a/lib/models/electrumx_response/spark_models.dart b/lib/models/electrumx_response/spark_models.dart index 22c6cf25fe..2bb69247ff 100644 --- a/lib/models/electrumx_response/spark_models.dart +++ b/lib/models/electrumx_response/spark_models.dart @@ -30,12 +30,14 @@ class SparkAnonymitySetMeta { final String blockHash; final String setHash; final int size; + final bool complete; SparkAnonymitySetMeta({ required this.coinGroupId, required this.blockHash, required this.setHash, required this.size, + this.complete = false, }); @override @@ -44,7 +46,8 @@ class SparkAnonymitySetMeta { "coinGroupId: $coinGroupId, " "blockHash: $blockHash, " "setHash: $setHash, " - "size: $size" + "size: $size, " + "complete: $complete" "}"; } } diff --git a/lib/wallets/wallet/wallet.dart b/lib/wallets/wallet/wallet.dart index 1aa40ef6a7..b70c852104 100644 --- a/lib/wallets/wallet/wallet.dart +++ b/lib/wallets/wallet/wallet.dart @@ -641,95 +641,105 @@ abstract class Wallet { ); } - // add some small buffer before making calls. - // this can probably be removed in the future but was added as a - // debugging feature - await Future.delayed(const Duration(milliseconds: 300)); - - // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. - final Set codesToCheck = {}; - if (this is PaynymInterface && !viewOnly) { - // isSegwit does not matter here at all - final myCode = await (this as PaynymInterface).getPaymentCode( - isSegwit: false, - ); + await Future(() async { + // add some small buffer before making calls. + // this can probably be removed in the future but was added as a + // debugging feature + await Future.delayed(const Duration(milliseconds: 300)); + + // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. + final Set codesToCheck = {}; + if (this is PaynymInterface && !viewOnly) { + // isSegwit does not matter here at all + final myCode = await (this as PaynymInterface).getPaymentCode( + isSegwit: false, + ); - final nym = await PaynymIsApi().nym(myCode.toString()); - if (nym.value != null) { - for (final follower in nym.value!.followers) { - codesToCheck.add(follower.code); - } - for (final following in nym.value!.following) { - codesToCheck.add(following.code); + final nym = await PaynymIsApi().nym(myCode.toString()); + if (nym.value != null) { + for (final follower in nym.value!.followers) { + codesToCheck.add(follower.code); + } + for (final following in nym.value!.following) { + codesToCheck.add(following.code); + } } } - } - _fireRefreshPercentChange(0); - await updateChainHeight(); + _fireRefreshPercentChange(0); + await updateChainHeight(); - if (this is BitcoinFrostWallet) { - await (this as BitcoinFrostWallet).lookAhead(); - } + if (this is BitcoinFrostWallet) { + await (this as BitcoinFrostWallet).lookAhead(); + } - _fireRefreshPercentChange(0.1); + _fireRefreshPercentChange(0.1); - // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. - if (this is MultiAddressInterface) { - if (info.otherData[WalletInfoKeys.reuseAddress] != true) { - await (this as MultiAddressInterface) - .checkReceivingAddressForTransactions(); + // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. + if (this is MultiAddressInterface) { + if (info.otherData[WalletInfoKeys.reuseAddress] != true) { + await (this as MultiAddressInterface) + .checkReceivingAddressForTransactions(); + } } - } - _fireRefreshPercentChange(0.2); + _fireRefreshPercentChange(0.2); - // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. - if (this is MultiAddressInterface) { - if (info.otherData[WalletInfoKeys.reuseAddress] != true) { - await (this as MultiAddressInterface) - .checkChangeAddressForTransactions(); + // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. + if (this is MultiAddressInterface) { + if (info.otherData[WalletInfoKeys.reuseAddress] != true) { + await (this as MultiAddressInterface) + .checkChangeAddressForTransactions(); + } + } + _fireRefreshPercentChange(0.3); + if (this is SparkInterface && !viewOnly) { + // this should be called before updateTransactions() + await (this as SparkInterface).refreshSparkData((0.3, 0.6)); } - } - _fireRefreshPercentChange(0.3); - if (this is SparkInterface && !viewOnly) { - // this should be called before updateTransactions() - await (this as SparkInterface).refreshSparkData((0.3, 0.6)); - } - if (this is NamecoinWallet) { - await updateUTXOs(); - _fireRefreshPercentChange(0.6); - await (this as NamecoinWallet).checkAutoRegisterNameNewOutputs(); - _fireRefreshPercentChange(0.70); - await updateTransactions(); - } else { - final fetchFuture = updateTransactions(); - _fireRefreshPercentChange(0.6); - final utxosRefreshFuture = updateUTXOs(); - _fireRefreshPercentChange(0.65); - await utxosRefreshFuture; - _fireRefreshPercentChange(0.70); - await fetchFuture; - } + if (this is NamecoinWallet) { + await updateUTXOs(); + _fireRefreshPercentChange(0.6); + await (this as NamecoinWallet).checkAutoRegisterNameNewOutputs(); + _fireRefreshPercentChange(0.70); + await updateTransactions(); + } else { + final fetchFuture = updateTransactions(); + _fireRefreshPercentChange(0.6); + final utxosRefreshFuture = updateUTXOs(); + await utxosRefreshFuture; + _fireRefreshPercentChange(0.65); + await fetchFuture; + _fireRefreshPercentChange(0.70); + } - // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. - if (!viewOnly && this is PaynymInterface && codesToCheck.isNotEmpty) { - await (this as PaynymInterface).checkForNotificationTransactionsTo( - codesToCheck, - ); - // check utxos again for notification outputs - await updateUTXOs(); - } - _fireRefreshPercentChange(0.80); + // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. + if (!viewOnly && this is PaynymInterface && codesToCheck.isNotEmpty) { + await (this as PaynymInterface).checkForNotificationTransactionsTo( + codesToCheck, + ); + // check utxos again for notification outputs + await updateUTXOs(); + } + _fireRefreshPercentChange(0.80); - // await getAllTxsToWatch(); + // await getAllTxsToWatch(); - _fireRefreshPercentChange(0.90); + _fireRefreshPercentChange(0.90); - await updateBalance(); + await updateBalance(); - _fireRefreshPercentChange(1.0); + _fireRefreshPercentChange(1.0); + }).timeout( + const Duration(minutes: 5), + onTimeout: () { + throw TimeoutException( + 'Wallet refresh timed out for $walletId', + const Duration(minutes: 5), + ); + }, + ); completer.complete(); } catch (error, strace) { diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart index 0dec8aab29..615628c751 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart @@ -1073,20 +1073,13 @@ mixin SparkInterface // missing groupIds to the list if sets to check and update final latestGroupId = await electrumXClient.getSparkLatestCoinId(); - final List groupIds = []; - if (latestGroupId > 1) { - for (int id = 1; id < latestGroupId; id++) { - final setExists = - await FiroCacheCoordinator.checkSetInfoForGroupIdExists( - id, - cryptoCurrency.network, - ); - if (!setExists) { - groupIds.add(id); - } - } - } - groupIds.add(latestGroupId); + // Process all groupIds every sync. The coordinator's early-return + // check (blockHash + size comparison) makes complete groups a no-op + // (just one meta RPC call). This ensures partially-downloaded groups + // are always completed. + final List groupIds = [ + for (int id = 1; id <= latestGroupId; id++) id, + ]; final steps = groupIds.length +