Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions lib/db/sqlite/firo_cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ abstract class _FiroCache {
sparkSetCacheFile.path,
mode: OpenMode.readWrite,
);

// Migrations: safe to run on every startup (IF NOT EXISTS / IF NOT).
_setCacheDB[network]!.execute("""
CREATE UNIQUE INDEX IF NOT EXISTS idx_sparksetcoins_set_coin
ON SparkSetCoins(setId, coinId);
""");

// Add `complete` column to SparkSet for tracking whether a download
// finished. Existing rows default to 1 (complete) since the old
// all-or-nothing writer only saved on full completion.
try {
_setCacheDB[network]!.execute("""
ALTER TABLE SparkSet ADD COLUMN complete INTEGER NOT NULL DEFAULT 1;
""");
} catch (_) {
// Column already exists — safe to ignore.
}

_usedTagsCacheDB[network] = sqlite3.open(
sparkUsedTagsCacheFile.path,
mode: OpenMode.readWrite,
Expand Down Expand Up @@ -133,6 +151,7 @@ abstract class _FiroCache {
setHash TEXT NOT NULL,
groupId INTEGER NOT NULL,
size INTEGER NOT NULL,
complete INTEGER NOT NULL DEFAULT 0,
UNIQUE (blockHash, setHash, groupId)
);

Expand Down
102 changes: 84 additions & 18 deletions lib/db/sqlite/firo_cache_coordinator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -104,52 +104,117 @@ abstract class FiroCacheCoordinator {

progressUpdated?.call(prevSize, meta.size);

if (prevMeta?.blockHash == meta.blockHash) {
Logging.instance.d("prevMeta?.blockHash == meta.blockHash");
if (prevMeta?.blockHash == meta.blockHash &&
prevMeta!.size >= meta.size) {
Logging.instance.d(
"prevMeta matches meta blockHash and size >= meta.size, "
"already up to date",
);
return;
}

final numberOfCoinsToFetch = meta.size - prevSize;
// When resuming a partial download of the SAME block, we can skip
// already-saved coins because the index space hasn't shifted.
//
// When the block changes, we check the `complete` flag on the
// previous SparkSet to determine if the old download finished.
// - Complete: use the delta (meta.size - prevSize) from index 0.
// The newest coins in the new block are at the lowest indices.
// - Partial: indices have shifted due to the new block, so we
// can't reliably compute which coins are missing. Re-download
// the full set from index 0. INSERT OR IGNORE handles overlap.
final bool sameBlock = prevMeta?.blockHash == meta.blockHash;

final int numberOfCoinsToFetch;
final int indexOffset;

if (sameBlock) {
// Same block: resume from where we left off.
numberOfCoinsToFetch = meta.size - prevSize;
indexOffset = prevSize;
} else if (prevMeta != null && prevMeta.complete) {
// Different block, but previous download was complete.
// The delta coins are at indices 0..(meta.size - prevSize - 1).
numberOfCoinsToFetch = meta.size - prevSize;
indexOffset = 0;
} else {
// Different block and previous download was partial (or no
// previous data). Must re-download the full set.
numberOfCoinsToFetch = meta.size;
indexOffset = 0;
}

if (numberOfCoinsToFetch <= 0) {
// Edge case: reorg, stale cache, or already up to date.
return;
}

final fullSectorCount = numberOfCoinsToFetch ~/ sectorSize;
final remainder = numberOfCoinsToFetch % sectorSize;

final List<dynamic> coins = [];
int coinsSaved = 0;

for (int i = 0; i < fullSectorCount; i++) {
final start = (i * sectorSize);
final start = indexOffset + (i * sectorSize);
final data = await client.getSparkAnonymitySetBySector(
coinGroupId: groupId,
latestBlock: meta.blockHash,
startIndex: start,
endIndex: start + sectorSize,
);
progressUpdated?.call(start + sectorSize, numberOfCoinsToFetch);

coins.addAll(data);
final sectorCoins =
data
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
.toList();

coinsSaved += sectorCoins.length;

await _workers[network]!.runTask(
FCTask(
func: FCFuncName._insertSparkAnonSetCoinsIncremental,
data: (meta, sectorCoins, indexOffset + coinsSaved),
),
);

progressUpdated?.call(
indexOffset + (i + 1) * sectorSize,
meta.size,
);
}

if (remainder > 0) {
final remainderStart = indexOffset + numberOfCoinsToFetch - remainder;
final data = await client.getSparkAnonymitySetBySector(
coinGroupId: groupId,
latestBlock: meta.blockHash,
startIndex: numberOfCoinsToFetch - remainder,
endIndex: numberOfCoinsToFetch,
startIndex: remainderStart,
endIndex: indexOffset + numberOfCoinsToFetch,
);
progressUpdated?.call(numberOfCoinsToFetch, numberOfCoinsToFetch);

coins.addAll(data);
}
final sectorCoins =
data
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
.toList();

final result =
coins
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
.toList();
coinsSaved += sectorCoins.length;

await _workers[network]!.runTask(
FCTask(
func: FCFuncName._insertSparkAnonSetCoinsIncremental,
data: (meta, sectorCoins, indexOffset + coinsSaved),
),
);

progressUpdated?.call(meta.size, meta.size);
}

// Mark this SparkSet as complete so cross-block resume knows
// the download finished and can safely use the delta approach.
await _workers[network]!.runTask(
FCTask(
func: FCFuncName._updateSparkAnonSetCoinsWith,
data: (meta, result),
func: FCFuncName._markSparkAnonSetComplete,
data: meta,
),
);
});
Expand Down Expand Up @@ -268,6 +333,7 @@ abstract class FiroCacheCoordinator {
blockHash: result.first["blockHash"] as String,
setHash: result.first["setHash"] as String,
size: result.first["size"] as int,
complete: (result.first["complete"] as int) == 1,
);
}

Expand Down
2 changes: 1 addition & 1 deletion lib/db/sqlite/firo_cache_reader.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ abstract class _Reader {
required Database db,
}) async {
final query = """
SELECT ss.blockHash, ss.setHash, ss.size
SELECT ss.blockHash, ss.setHash, ss.size, ss.complete
FROM SparkSet ss
WHERE ss.groupId = $groupId
ORDER BY ss.size DESC
Expand Down
18 changes: 14 additions & 4 deletions lib/db/sqlite/firo_cache_worker.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
part of 'firo_cache.dart';

enum FCFuncName {
_updateSparkAnonSetCoinsWith,
_insertSparkAnonSetCoinsIncremental,
_markSparkAnonSetComplete,
_updateSparkUsedTagsWith,
}

Expand Down Expand Up @@ -93,13 +94,22 @@ class _FiroCacheWorker {
try {
final FCResult result;
switch (task.func) {
case FCFuncName._updateSparkAnonSetCoinsWith:
case FCFuncName._insertSparkAnonSetCoinsIncremental:
final data =
task.data as (SparkAnonymitySetMeta, List<RawSparkCoin>);
result = _updateSparkAnonSetCoinsWith(
task.data
as (SparkAnonymitySetMeta, List<RawSparkCoin>, int);
result = _insertSparkAnonSetCoinsIncremental(
setCacheDb,
data.$2,
data.$1,
data.$3,
);
break;

case FCFuncName._markSparkAnonSetComplete:
result = _markSparkAnonSetComplete(
setCacheDb,
task.data as SparkAnonymitySetMeta,
);
break;

Expand Down
94 changes: 65 additions & 29 deletions lib/db/sqlite/firo_cache_writer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,68 +45,89 @@ FCResult _updateSparkUsedTagsWith(Database db, List<List<dynamic>> tags) {
}

// ===========================================================================
// ================== write to spark anon set cache ==========================
// =========== incremental write to spark anon set cache ====================

/// update the sqlite cache
/// Persist a single sector's worth of coins to the cache, creating or
/// updating the SparkSet row as needed. Safe to call repeatedly — uses
/// INSERT OR IGNORE so duplicate coins (from crash-recovery reruns) are
/// silently skipped.
///
/// returns true if successful, otherwise false
FCResult _updateSparkAnonSetCoinsWith(
/// [cumulativeSize] should be prevSize + total coins saved so far (including
/// this batch). It is written to SparkSet.size so that on resume,
/// getLatestSetInfoForGroupId returns the correct partial progress.
FCResult _insertSparkAnonSetCoinsIncremental(
Database db,
final List<RawSparkCoin> coinsRaw,
SparkAnonymitySetMeta meta,
int cumulativeSize,
) {
if (coinsRaw.isEmpty) {
// no coins to actually insert
return FCResult(success: true);
}

final checkResult = db.select(
"""
SELECT *
FROM SparkSet
WHERE blockHash = ? AND setHash = ? AND groupId = ?;
""",
[meta.blockHash, meta.setHash, meta.coinGroupId],
);

if (checkResult.isNotEmpty) {
// already up to date
return FCResult(success: true);
}

final coins = coinsRaw.reversed;

db.execute("BEGIN;");
try {
// Create SparkSet row if it doesn't exist yet for this block state.
// complete = 0 marks this as an in-progress download.
db.execute(
"""
INSERT INTO SparkSet (blockHash, setHash, groupId, size)
VALUES (?, ?, ?, ?);
INSERT OR IGNORE INTO SparkSet (blockHash, setHash, groupId, size, complete)
VALUES (?, ?, ?, 0, 0);
""",
[meta.blockHash, meta.setHash, meta.coinGroupId],
);

// Get the SparkSet row's id (whether just created or already existing).
final setIdResult = db.select(
"""
SELECT id FROM SparkSet
WHERE blockHash = ? AND setHash = ? AND groupId = ?;
""",
[meta.blockHash, meta.setHash, meta.coinGroupId, meta.size],
[meta.blockHash, meta.setHash, meta.coinGroupId],
);
final setId = db.lastInsertRowId;
final setId = setIdResult.first["id"] as int;

for (final coin in coins) {
// INSERT OR IGNORE handles duplicates from crash-recovery reruns.
db.execute(
"""
INSERT INTO SparkCoin (serialized, txHash, context, groupId)
VALUES (?, ?, ?, ?);
""",
INSERT OR IGNORE INTO SparkCoin (serialized, txHash, context, groupId)
VALUES (?, ?, ?, ?);
""",
[coin.serialized, coin.txHash, coin.context, coin.groupId],
);
final coinId = db.lastInsertRowId;

// finally add the row id to the newly added set
// lastInsertRowId is 0 when INSERT OR IGNORE skips a duplicate,
// so we must SELECT explicitly.
final coinIdResult = db.select(
"""
SELECT id FROM SparkCoin
WHERE serialized = ? AND txHash = ? AND context = ? AND groupId = ?;
""",
[coin.serialized, coin.txHash, coin.context, coin.groupId],
);
final coinId = coinIdResult.first["id"] as int;

db.execute(
"""
INSERT INTO SparkSetCoins (setId, coinId)
INSERT OR IGNORE INTO SparkSetCoins (setId, coinId)
VALUES (?, ?);
""",
[setId, coinId],
);
}

// Update cumulative size to track partial progress.
db.execute(
"""
UPDATE SparkSet SET size = ?
WHERE id = ?;
""",
[cumulativeSize, setId],
);

db.execute("COMMIT;");

return FCResult(success: true);
Expand All @@ -115,3 +136,18 @@ FCResult _updateSparkAnonSetCoinsWith(
return FCResult(success: false, error: e);
}
}

/// Marks a SparkSet row as complete after all sectors have been downloaded.
///
/// Flips the `complete` flag (initialized to 0 by the incremental inserter)
/// so that cross-block resume logic can distinguish a finished download
/// (safe to fetch only the delta) from a partial one (full re-download).
///
/// Returns an [FCResult] with `success: true` on success, or
/// `success: false` carrying the caught error if the UPDATE fails —
/// matching the error-reporting convention of the other writer functions
/// in this file rather than letting the exception escape the worker task.
FCResult _markSparkAnonSetComplete(
  Database db,
  SparkAnonymitySetMeta meta,
) {
  try {
    db.execute(
      """
        UPDATE SparkSet SET complete = 1
        WHERE blockHash = ? AND setHash = ? AND groupId = ?;
      """,
      [meta.blockHash, meta.setHash, meta.coinGroupId],
    );
    return FCResult(success: true);
  } catch (e) {
    // Report failure via the result object, consistent with the sibling
    // writers (e.g. _insertSparkAnonSetCoinsIncremental) instead of
    // throwing across the worker boundary.
    return FCResult(success: false, error: e);
  }
}
5 changes: 4 additions & 1 deletion lib/models/electrumx_response/spark_models.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ class SparkAnonymitySetMeta {
final String blockHash;
final String setHash;
final int size;
final bool complete;

SparkAnonymitySetMeta({
required this.coinGroupId,
required this.blockHash,
required this.setHash,
required this.size,
this.complete = false,
});

@override
Expand All @@ -44,7 +46,8 @@ class SparkAnonymitySetMeta {
"coinGroupId: $coinGroupId, "
"blockHash: $blockHash, "
"setHash: $setHash, "
"size: $size"
"size: $size, "
"complete: $complete"
"}";
}
}
Expand Down
Loading