Skip to content

Commit a541c74

Browse files
committed
blockservice: add WithProvider option
This allows to recreate the behavior of advertising added blocks the bitswap server used to do.
1 parent a001f98 commit a541c74

File tree

5 files changed

+96
-6
lines changed

5 files changed

+96
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:
1616

1717
### Added
1818

19+
- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do.
20+
1921
### Changed
2022

2123
### Removed

blockservice/blockservice.go

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/ipfs/boxo/blockstore"
1515
"github.com/ipfs/boxo/exchange"
16+
"github.com/ipfs/boxo/provider"
1617
"github.com/ipfs/boxo/verifcid"
1718
blocks "github.com/ipfs/go-block-format"
1819
"github.com/ipfs/go-cid"
@@ -71,10 +72,19 @@ type BoundedBlockService interface {
7172
Allowlist() verifcid.Allowlist
7273
}
7374

75+
// ProvidingBlockService is a [Blockservice] which announces new cids to a [provider.Provider].
76+
type ProvidingBlockService interface {
77+
BlockService
78+
79+
// Provider can return [nil] if there is no provider used.
80+
Provider() provider.Provider
81+
}
82+
7483
type blockService struct {
7584
allowlist verifcid.Allowlist
7685
blockstore blockstore.Blockstore
7786
exchange exchange.Interface
87+
provider provider.Provider
7888
// If checkFirst is true then first check that a block doesn't
7989
// already exist to avoid republishing the block on the exchange.
8090
checkFirst bool
@@ -97,6 +107,13 @@ func WithAllowlist(allowlist verifcid.Allowlist) Option {
97107
}
98108
}
99109

110+
// WithProvider allows to advertise anything that is added through the blockservice.
111+
func WithProvider(prov provider.Provider) Option {
112+
return func(bs *blockService) {
113+
bs.provider = prov
114+
}
115+
}
116+
100117
// New creates a BlockService with given datastore instance.
101118
func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService {
102119
if exchange == nil {
@@ -139,6 +156,10 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
139156
return s.allowlist
140157
}
141158

159+
func (s *blockService) Provider() provider.Provider {
160+
return s.provider
161+
}
162+
142163
// NewSession creates a new session that allows for
143164
// controlled exchange of wantlists to decrease the bandwidth overhead.
144165
// If the current exchange is a SessionExchange, a new exchange
@@ -149,6 +170,12 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
149170
if bbs, ok := bs.(BoundedBlockService); ok {
150171
allowlist = bbs.Allowlist()
151172
}
173+
174+
var prov provider.Provider
175+
if bprov, ok := bs.(ProvidingBlockService); ok {
176+
prov = bprov.Provider()
177+
}
178+
152179
exch := bs.Exchange()
153180
if sessEx, ok := exch.(exchange.SessionExchange); ok {
154181
return &Session{
@@ -158,6 +185,7 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
158185
sessEx: sessEx,
159186
bs: bs.Blockstore(),
160187
notifier: exch,
188+
provider: prov,
161189
}
162190
}
163191
return &Session{
@@ -166,6 +194,7 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
166194
sessCtx: ctx,
167195
bs: bs.Blockstore(),
168196
notifier: exch,
197+
provider: prov,
169198
}
170199
}
171200

@@ -196,6 +225,11 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
196225
logger.Errorf("NotifyNewBlocks: %s", err.Error())
197226
}
198227
}
228+
if s.provider != nil {
229+
if err := s.provider.Provide(o.Cid()); err != nil {
230+
logger.Errorf("Provide: %s", err.Error())
231+
}
232+
}
199233

200234
return nil
201235
}
@@ -242,6 +276,14 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
242276
logger.Errorf("NotifyNewBlocks: %s", err.Error())
243277
}
244278
}
279+
if s.provider != nil {
280+
for _, o := range toput {
281+
if err := s.provider.Provide(o.Cid()); err != nil {
282+
logger.Errorf("Provide: %s", err.Error())
283+
}
284+
}
285+
}
286+
245287
return nil
246288
}
247289

@@ -256,14 +298,14 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e
256298
f = s.getExchange
257299
}
258300

259-
return getBlock(ctx, c, s.blockstore, s.allowlist, f)
301+
return getBlock(ctx, c, s.blockstore, s.allowlist, f, s.provider)
260302
}
261303

262304
func (s *blockService) getExchange() notifiableFetcher {
263305
return s.exchange
264306
}
265307

266-
func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) (blocks.Block, error) {
308+
func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher, prov provider.Provider) (blocks.Block, error) {
267309
err := verifcid.ValidateCid(allowlist, c) // hash security
268310
if err != nil {
269311
return nil, err
@@ -293,6 +335,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlis
293335
if err != nil {
294336
return nil, err
295337
}
338+
if prov != nil {
339+
err = prov.Provide(blk.Cid())
340+
if err != nil {
341+
return nil, err
342+
}
343+
}
296344
logger.Debugf("BlockService.BlockFetched %s", c)
297345
return blk, nil
298346
}
@@ -313,10 +361,10 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block
313361
f = s.getExchange
314362
}
315363

316-
return getBlocks(ctx, ks, s.blockstore, s.allowlist, f)
364+
return getBlocks(ctx, ks, s.blockstore, s.allowlist, f, s.provider)
317365
}
318366

319-
func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) <-chan blocks.Block {
367+
func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher, prov provider.Provider) <-chan blocks.Block {
320368
out := make(chan blocks.Block)
321369

322370
go func() {
@@ -398,6 +446,14 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
398446
}
399447
cache[0] = nil // early gc
400448

449+
if prov != nil {
450+
err = prov.Provide(b.Cid())
451+
if err != nil {
452+
logger.Errorf("could not tell the provider about new blocks: %s", err)
453+
return
454+
}
455+
}
456+
401457
select {
402458
case out <- b:
403459
case <-ctx.Done():
@@ -437,6 +493,7 @@ type Session struct {
437493
sessEx exchange.SessionExchange
438494
sessCtx context.Context
439495
notifier notifier
496+
provider provider.Provider
440497
lk sync.Mutex
441498
}
442499

@@ -480,15 +537,15 @@ func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error)
480537
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
481538
defer span.End()
482539

483-
return getBlock(ctx, c, s.bs, s.allowlist, s.getFetcherFactory())
540+
return getBlock(ctx, c, s.bs, s.allowlist, s.getFetcherFactory(), s.provider)
484541
}
485542

486543
// GetBlocks gets blocks in the context of a request session
487544
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
488545
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
489546
defer span.End()
490547

491-
return getBlocks(ctx, ks, s.bs, s.allowlist, s.getFetcherFactory())
548+
return getBlocks(ctx, ks, s.bs, s.allowlist, s.getFetcherFactory(), s.provider)
492549
}
493550

494551
var _ BlockGetter = (*Session)(nil)

blockservice/blockservice_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,3 +288,31 @@ func TestAllowlist(t *testing.T) {
288288
check(blockservice.GetBlock)
289289
check(NewSession(ctx, blockservice).GetBlock)
290290
}
291+
292+
type mockProvider []cid.Cid
293+
294+
func (p *mockProvider) Provide(c cid.Cid) error {
295+
*p = append(*p, c)
296+
return nil
297+
}
298+
299+
func TestProviding(t *testing.T) {
300+
t.Parallel()
301+
a := assert.New(t)
302+
303+
bgen := butil.NewBlockGenerator()
304+
blocks := bgen.Blocks(3)
305+
306+
prov := mockProvider{}
307+
blockservice := New(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), nil, WithProvider(&prov))
308+
var added []cid.Cid
309+
310+
a.NoError(blockservice.AddBlock(context.Background(), blocks[0]))
311+
added = append(added, blocks[0].Cid())
312+
313+
a.NoError(blockservice.AddBlocks(context.Background(), blocks[1:]))
314+
added = append(added, blocks[1].Cid())
315+
added = append(added, blocks[2].Cid())
316+
317+
a.ElementsMatch(added, []cid.Cid(prov))
318+
}

examples/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ require (
5959
github.com/huin/goupnp v1.3.0 // indirect
6060
github.com/ipfs/bbloom v0.0.4 // indirect
6161
github.com/ipfs/go-bitfield v1.1.0 // indirect
62+
github.com/ipfs/go-cidutil v0.1.0 // indirect
6263
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
6364
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
6465
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect

examples/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,8 @@ github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUP
251251
github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
252252
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
253253
github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk=
254+
github.com/ipfs/go-cidutil v0.1.0 h1:RW5hO7Vcf16dplUU60Hs0AKDkQAVPVplr7lk97CFL+Q=
255+
github.com/ipfs/go-cidutil v0.1.0/go.mod h1:e7OEVBMIv9JaOxt9zaGEmAoSlXW9jdFZ5lP/0PwcfpA=
254256
github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk=
255257
github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8=
256258
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=

0 commit comments

Comments
 (0)