-
Notifications
You must be signed in to change notification settings - Fork 2
Feat/overlay support #94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
912cb3d
78d1eea
a95d06a
708ef6d
e3e81e9
bb5ff9a
2231438
eb667c7
def4dc5
8de7164
fd2d4ef
4c26b81
a9eaf77
3f8b5da
ccc9526
f41e19f
a1516e3
e67c815
615fe33
a3ef5f0
c3a9a38
b357f85
28f771c
aa81487
deb61d3
ddbb8cf
7bae0d3
14bfc22
146d7c4
826cb9e
ed79c49
3b68b6b
4df5672
46ab602
3ef34ac
3e5a0d9
a452e29
7285c01
312300f
7dc8f07
d02805c
9b46793
07f9985
2ad06a5
244f637
e80e068
569b9ef
d813542
948dc81
e739431
aa719bd
e22da22
a88fa6a
7c5427c
6a9cf60
220243a
237cfe1
6ee349d
3518bd3
121dbc7
c8bc042
a7eeafa
12a2080
5a971e5
1761e83
a1f7c77
b376760
6b19152
53bea50
95cd5a8
df268d4
5510f8c
f688d2a
a849b92
ff38a92
bac5368
34dc1df
edab387
85fcbec
5c025ab
cfdd639
4ae36ed
523a63d
61581ed
3798ed5
c5eacb5
25bc6d9
0d0bf86
5e1e422
62172ee
43ed736
7dc802e
d322da1
fdb6bcf
ead1301
47773f2
0390129
34f54ae
ff5c0fd
c0c931e
746e488
fbac395
8b43740
0734058
7e778d1
c124057
db82e4d
9584873
cd4337f
7bb2166
9aa220f
acf87c6
9bc6b0b
ac20cd5
42554f8
a19f288
0dee08f
78cfda7
243e18e
d49ecfa
088e39a
4360adf
648356b
58140fd
95aa72e
048a345
e9892d9
3c37e39
bddf8d3
7b32d08
4eec4c1
001d320
551aa91
5f7ff39
324fb01
87d0129
9d53e4d
1cfa464
bf6b9b8
7034496
258ccb3
1350159
d2dac51
080af2f
b6e5800
9c425bd
eebdf97
f7745ff
ead2080
5b541b6
409ba23
593cc70
faa3721
44ddedf
a3a756f
8ea4437
bc0ad02
6c82ec9
2b1eb69
1d02ba2
5ef0176
8836dfb
db2b848
ed75e2d
c8dc6f4
7758afe
1471da2
ea54aad
025f3ba
587d56b
cf93215
766a426
abf71db
186f27b
3aa9a9a
3ce6d92
5d35901
f7c9050
9005c7b
89f3d0f
c52bb6a
fd91f7d
00aa9b8
54c1283
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ env: | |
| jobs: | ||
| build: | ||
| strategy: | ||
| fail-fast: false | ||
| matrix: | ||
| include: ${{ fromJson(inputs.matrix) }} | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,3 +44,9 @@ docker/prometheus-data | |
| data/ | ||
| nimbledeps | ||
| logs/ | ||
| .grepai/ | ||
| *.log | ||
| CLAUDE.md | ||
| AGENTS.md | ||
| .claude | ||
| .opencode | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ import std/sets | |
| import std/options | ||
| import std/algorithm | ||
| import std/sugar | ||
| import std/tables | ||
|
|
||
| import pkg/chronos | ||
| import pkg/libp2p/[cid, switch, multihash, multicodec] | ||
|
|
@@ -396,10 +397,17 @@ proc validateBlockDelivery(self: BlockExcEngine, bd: BlockDelivery): ?!void = | |
|
|
||
| proc blocksDeliveryHandler*( | ||
| self: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery] | ||
| ) {.async: (raises: []).} = | ||
| ) {.async: (raises: [CancelledError]).} = | ||
| # TODO: this should not be here, the engine should just resolve the future, | ||
| # and let the caller do the validation and storing | ||
| trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address)) | ||
|
|
||
| var validatedBlocksDelivery: seq[BlockDelivery] | ||
| # Validate all deliveries and separate leaf vs non-leaf | ||
| var | ||
| validatedBlocksDelivery: seq[BlockDelivery] | ||
| leafByTree: Table[Cid, seq[(Block, Natural, ArchivistProof)]] | ||
| nonLeafDeliveries: seq[BlockDelivery] | ||
|
|
||
| for bd in blocksDelivery: | ||
| logScope: | ||
| peer = peer | ||
|
|
@@ -410,36 +418,54 @@ proc blocksDeliveryHandler*( | |
| warn "Block validation failed", msg = err.msg | ||
| continue | ||
|
|
||
| if err =? (await self.localStore.putBlock(bd.blk)).errorOption: | ||
| error "Unable to store block", err = err.msg | ||
| continue | ||
|
|
||
| if bd.address.leaf: | ||
| without proof =? bd.proof: | ||
| warn "Proof expected for a leaf block delivery" | ||
| continue | ||
| if err =? ( | ||
| await self.localStore.putCidAndProof( | ||
| bd.address.treeCid, bd.address.index, bd.blk.cid, proof | ||
| ) | ||
| ).errorOption: | ||
| warn "Unable to store proof and cid for a block" | ||
| continue | ||
|
|
||
| leafByTree.withValue(bd.address.treeCid, slot): | ||
| slot[].add((bd.blk, bd.address.index, proof)) | ||
| do: | ||
| leafByTree[bd.address.treeCid] = @[(bd.blk, bd.address.index, proof)] | ||
| else: | ||
| nonLeafDeliveries.add(bd) | ||
| except CatchableError as exc: | ||
| warn "Error handling block delivery", error = exc.msg | ||
| continue | ||
|
|
||
| validatedBlocksDelivery.add(bd) | ||
|
|
||
| # Batch write leaf blocks grouped by treeCid | ||
| for treeCid, items in leafByTree: | ||
| if err =? (await self.localStore.putBlocks(treeCid, items)).errorOption: | ||
| error "Unable to store leaf blocks", treeCid, err = err.msg | ||
| # Remove failed leaves from validatedBlocksDelivery | ||
| validatedBlocksDelivery.keepItIf( | ||
| not (it.address.leaf and it.address.treeCid == treeCid) | ||
| ) | ||
|
|
||
| # Write non-leaf blocks sequentially - this should only be manifests after #94 | ||
| for bd in nonLeafDeliveries: | ||
| without isManifest =? bd.blk.cid.isManifest, err: | ||
| error "Received a non-leaf block that isn't a manifest!", err = err.msg | ||
| validatedBlocksDelivery.keepItIf(it.address.cid != bd.address.cid) | ||
| continue | ||
|
|
||
| # TODO: The putBlock here should be replace by something like - | ||
| # storeManifestBlock(...) | ||
| if err =? (await self.localStore.putBlock(bd.blk)).errorOption: | ||
| error "Unable to store block", err = err.msg | ||
| validatedBlocksDelivery.keepItIf(it.address.cid != bd.address.cid) | ||
|
|
||
| archivist_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) | ||
|
|
||
| let peerCtx = self.peers.get(peer) | ||
| if peerCtx != nil: | ||
| if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption: | ||
| if err =? catchAsync(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption: | ||
| warn "Error paying for blocks", err = err.msg | ||
| return | ||
|
|
||
| if err =? catch(await self.resolveBlocks(validatedBlocksDelivery)).errorOption: | ||
| if err =? catchAsync(await self.resolveBlocks(validatedBlocksDelivery)).errorOption: | ||
| warn "Error resolving blocks", err = err.msg | ||
| return | ||
|
|
||
|
|
@@ -470,7 +496,11 @@ proc wantListHandler*( | |
| let | ||
| have = | ||
| try: | ||
| await e.address in self.localStore | ||
| if e.address.leaf: | ||
| (await self.localStore.hasBlock(e.address.treeCid, e.address.index)) |? | ||
| false | ||
| else: | ||
| (await self.localStore.hasBlock(e.address.cid)) |? false | ||
|
Comment on lines
+499
to
+503
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be nice to create a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I intentionally removed the BlockAddress api from the BlockStore abstraction, because there was way too much noice. I also thought that I'd go the other way around, and use BlockAddress exclusively, alas I found it to be very clunky compared to the |
||
| except CatchableError: | ||
| # TODO: should not be necessary once we have proper exception tracking on the BlockStore interface | ||
| false | ||
|
|
@@ -612,13 +642,13 @@ proc taskHandler*( | |
| proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} = | ||
| if e.address.leaf: | ||
| (await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( | ||
| (blkAndProof: (Block, ArchivistProof)) => | ||
| (blkAndProof: (Natural, Block, ArchivistProof)) => | ||
| BlockDelivery( | ||
| address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some | ||
| address: e.address, blk: blkAndProof[1], proof: blkAndProof[2].some | ||
| ) | ||
| ) | ||
| else: | ||
| (await self.localStore.getBlock(e.address)).map( | ||
| (await self.localStore.getBlock(e.address.cid)).map( | ||
| (blk: Block) => | ||
| BlockDelivery(address: e.address, blk: blk, proof: ArchivistProof.none) | ||
| ) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably a left-over from testing? I would not disable fail-fast in general, because we have too few github runners to use them on jobs in PRs that are failing anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I'll clean it up before merge, but I need this to be able to see what breaks on which platform.