Skip to content

Commit b173d4a

Browse files
committed
Merge main into sharding-improvements
2 parents e15dff5 + 7788968 commit b173d4a

30 files changed

+1301
-119
lines changed

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,22 @@ The service is configured via environment variables:
172172
| `REDIS_FLUSH_INTERVAL` | Interval for flushing pending commitments to Redis | `50ms` |
173173
| `REDIS_MAX_BATCH_SIZE` | Maximum batch size before forcing flush | `2000` |
174174

175+
### BFT Configuration
176+
177+
| Variable | Description | Default |
178+
|-------------------------------------|-------------------------------------------------------------------------------------|----------------------------------|
179+
| `BFT_ENABLED` | Enables or disables the BFT client integration. | `true` |
180+
| `BFT_ADDRESS` | The libp2p multiaddress for the BFT client to listen on. | `/ip4/0.0.0.0/tcp/9000` |
181+
| `BFT_ANNOUNCE_ADDRESSES` | Comma-separated list of public callback multi-addresses to announce to other peers. | `""` |
182+
| `BFT_BOOTSTRAP_ADDRESSES` | Comma-separated list of bootstrap peer addresses. | `""` |
183+
| `BFT_BOOTSTRAP_CONNECT_RETRY` | Number of retries for connecting to bootstrap peers. | `3` |
184+
| `BFT_BOOTSTRAP_CONNECT_RETRY_DELAY` | Delay between bootstrap connection retries (in seconds). | `5` |
185+
| `BFT_HEARTBEAT_INTERVAL` | How often the BFT client checks for inactivity. | `1s` |
186+
| `BFT_INACTIVITY_TIMEOUT` | Duration of inactivity before the BFT client sends a new handshake. | `5s` |
187+
| `BFT_KEY_CONF_FILE` | Path to the BFT key configuration file. | `bft-config/keys.json` |
188+
| `BFT_SHARD_CONF_FILE` | Path to the aggregator shard configuration file. | `bft-config/shard-conf-7_0.json` |
189+
| `BFT_TRUST_BASE_FILES` | Comma-separated list of paths to trust base files. | `bft-config/trust-base.json` |
190+
175191
## API Endpoints
176192

177193
### JSON-RPC 2.0 Methods
@@ -432,6 +448,24 @@ Returns the health status and role of the service.
432448
}
433449
```
434450

451+
#### `PUT /api/v1/trustbases`
452+
Adds trust base to the trust base store. The request body must be a valid trust base in json format.
453+
454+
Example curl request
455+
```curl -X PUT -H 'Content-Type: application/json' -d @./test-nodes/trust-base-1.json http://localhost:3000/api/v1/trustbases```
456+
457+
**If trust base was stored successfully then status 200 with empty response body is returned:**
458+
```json
459+
{}
460+
```
461+
462+
**If trust base is invalid error then status 400 with error cause is returned:**
463+
```json
464+
{
465+
"error":"failed to store trust base: trust base already exists"
466+
}
467+
```
468+
435469
#### `GET /docs`
436470
Returns **executable** interactive HTML API documentation page with live testing capabilities (if `ENABLE_DOCS=true`).
437471

cmd/aggregator/main.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package main
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
78
"os/signal"
89
"syscall"
910
"time"
1011

12+
"github.com/unicitynetwork/bft-go-base/types"
13+
1114
"github.com/unicitynetwork/aggregator-go/internal/config"
1215
"github.com/unicitynetwork/aggregator-go/internal/gateway"
1316
"github.com/unicitynetwork/aggregator-go/internal/ha"
@@ -16,6 +19,7 @@ import (
1619
"github.com/unicitynetwork/aggregator-go/internal/round"
1720
"github.com/unicitynetwork/aggregator-go/internal/service"
1821
"github.com/unicitynetwork/aggregator-go/internal/storage"
22+
"github.com/unicitynetwork/aggregator-go/internal/storage/interfaces"
1923
)
2024

2125
// gracefulExit flushes async logger and exits with the given code
@@ -68,7 +72,7 @@ func main() {
6872
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
6973
defer stop()
7074

71-
commitmentQueue, storageInstance, err := storage.NewStorage(cfg, log)
75+
commitmentQueue, storageInstance, err := storage.NewStorage(ctx, cfg, log)
7276
if err != nil {
7377
log.WithComponent("main").Error("Failed to initialize storage", "error", err.Error())
7478
gracefulExit(asyncLogger, 1)
@@ -94,6 +98,25 @@ func main() {
9498

9599
log.WithComponent("main").Info("Database connection established")
96100

101+
// Store trust bases from config files
102+
trustBaseValidator := service.NewTrustBaseValidator(storageInstance.TrustBaseStorage())
103+
for _, tb := range cfg.BFT.TrustBases {
104+
if err := trustBaseValidator.Verify(ctx, &tb); err != nil {
105+
log.WithComponent("main").Error(fmt.Sprintf("Trust base verification failed"), "error", err.Error())
106+
gracefulExit(asyncLogger, 1)
107+
}
108+
if err := storageInstance.TrustBaseStorage().Store(ctx, &tb); err != nil {
109+
if errors.Is(err, interfaces.ErrTrustBaseAlreadyExists) {
110+
log.WithComponent("main").Warn(fmt.Sprintf("Trust base already exists, not overwriting it"), "epoch", tb.GetEpoch())
111+
} else {
112+
log.WithComponent("main").Error("Failed to store trust base", "epoch", tb.GetEpoch(), "error", err.Error())
113+
gracefulExit(asyncLogger, 1)
114+
}
115+
} else {
116+
log.WithComponent("main").Info("Stored trust base", "epoch", tb.GetEpoch())
117+
}
118+
}
119+
97120
if err := commitmentQueue.Initialize(ctx); err != nil {
98121
log.WithComponent("main").Error("Failed to initialize commitment queue", "error", err.Error())
99122
gracefulExit(asyncLogger, 1)
@@ -102,8 +125,22 @@ func main() {
102125
// Create the shared state tracker for block sync height
103126
stateTracker := state.NewSyncStateTracker()
104127

128+
// Load last committed unicity certificate (can be nil for genesis)
129+
var luc *types.UnicityCertificate
130+
lastBlock, err := storageInstance.BlockStorage().GetLatest(ctx)
131+
if err != nil {
132+
log.WithComponent("main").Error("Failed to load last stored block", "error", err.Error())
133+
gracefulExit(asyncLogger, 1)
134+
}
135+
if lastBlock != nil {
136+
if err := types.Cbor.Unmarshal(lastBlock.UnicityCertificate, &luc); err != nil {
137+
log.WithComponent("main").Error("Failed to decode unicity certificate", "error", err.Error())
138+
gracefulExit(asyncLogger, 1)
139+
}
140+
}
141+
105142
// Create round manager based on sharding mode
106-
roundManager, err := round.NewManager(ctx, cfg, log, commitmentQueue, storageInstance, stateTracker)
143+
roundManager, err := round.NewManager(ctx, cfg, log, commitmentQueue, storageInstance, stateTracker, luc)
107144
if err != nil {
108145
log.WithComponent("main").Error("Failed to create round manager", "error", err.Error())
109146
gracefulExit(asyncLogger, 1)

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ require (
1212
github.com/stretchr/testify v1.11.1
1313
github.com/testcontainers/testcontainers-go v0.39.0
1414
github.com/testcontainers/testcontainers-go/modules/mongodb v0.39.0
15-
github.com/unicitynetwork/bft-core v1.0.2-0.20250618101015-e1316ad51f4e
16-
github.com/unicitynetwork/bft-go-base v1.0.2
15+
github.com/unicitynetwork/bft-core v1.0.2-0.20251126094351-51daa89c8c43
16+
github.com/unicitynetwork/bft-go-base v1.0.3-0.20251125134602-9545226e4709
1717
go.mongodb.org/mongo-driver v1.17.4
1818
go.opentelemetry.io/otel/metric v1.38.0
1919
go.opentelemetry.io/otel/trace v1.38.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -559,10 +559,10 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
559559
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
560560
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
561561
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
562-
github.com/unicitynetwork/bft-core v1.0.2-0.20250618101015-e1316ad51f4e h1:C8emZcWmMPG9uEfLW3Jk1jt7IcDdGo6g7KMxer7w1Aw=
563-
github.com/unicitynetwork/bft-core v1.0.2-0.20250618101015-e1316ad51f4e/go.mod h1:oHcvP+Jp8N4SzedwzxWfq7SfOAzgV0si6WsQBNYVQIo=
564-
github.com/unicitynetwork/bft-go-base v1.0.2 h1:Z5stN59ZP3psY5RDH4VFDyg5HXDsePv27uqvP56mNGw=
565-
github.com/unicitynetwork/bft-go-base v1.0.2/go.mod h1:hBnOG52VRy/vpgIBUulTgk7PBTwODZ2xkVjCEu5yRcQ=
562+
github.com/unicitynetwork/bft-core v1.0.2-0.20251126094351-51daa89c8c43 h1:oWVHqRyI+QeldeziycQ9Qs0g3PiJN3YlNxYEBFUHxUQ=
563+
github.com/unicitynetwork/bft-core v1.0.2-0.20251126094351-51daa89c8c43/go.mod h1:9qPdOWF7Hr2k2owuT4L4OpndbGcdZSdymjfbxiyp3Is=
564+
github.com/unicitynetwork/bft-go-base v1.0.3-0.20251125134602-9545226e4709 h1:otZGyxGVw63HpZBIVyyKNoJorukL7xNXDhKoWxCqbtc=
565+
github.com/unicitynetwork/bft-go-base v1.0.3-0.20251125134602-9545226e4709/go.mod h1:hBnOG52VRy/vpgIBUulTgk7PBTwODZ2xkVjCEu5yRcQ=
566566
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
567567
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
568568
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=

internal/bft/client.go

Lines changed: 77 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package bft
33
import (
44
"bytes"
55
"context"
6+
"crypto"
67
"encoding/hex"
78
"errors"
89
"fmt"
@@ -11,11 +12,10 @@ import (
1112
"sync/atomic"
1213
"time"
1314

14-
"github.com/libp2p/go-libp2p/core/peer"
1515
"github.com/unicitynetwork/bft-core/network"
1616
"github.com/unicitynetwork/bft-core/network/protocol/certification"
1717
"github.com/unicitynetwork/bft-core/network/protocol/handshake"
18-
"github.com/unicitynetwork/bft-go-base/crypto"
18+
cryptobft "github.com/unicitynetwork/bft-go-base/crypto"
1919
"github.com/unicitynetwork/bft-go-base/types"
2020

2121
"github.com/unicitynetwork/aggregator-go/internal/config"
@@ -39,12 +39,11 @@ type (
3939
shardID types.ShardID
4040
logger *logger.Logger
4141

42-
// mutex for peer, network, signer, rootNodes
43-
mu sync.Mutex
44-
peer *network.Peer
45-
network *BftNetwork
46-
rootNodes peer.IDSlice
47-
signer crypto.Signer
42+
// mutex for peer, network, signer TODO: there are readers without mutex
43+
mu sync.Mutex
44+
peer *network.Peer
45+
network *BftNetwork
46+
signer cryptobft.Signer
4847

4948
// Latest UC this node has seen. Can be ahead of the committed UC during recovery.
5049
luc atomic.Pointer[types.UnicityCertificate]
@@ -58,6 +57,11 @@ type (
5857
ucProcessingMutex sync.Mutex
5958

6059
msgLoopCancelFn context.CancelFunc
60+
61+
// timestamp when last UC was received
62+
lastCertResponseTime atomic.Int64
63+
64+
trustBaseStore TrustBaseStore
6165
}
6266

6367
BFTClient interface {
@@ -71,19 +75,26 @@ type (
7175
StartNewRound(ctx context.Context, roundNumber *api.BigInt) error
7276
}
7377

78+
TrustBaseStore interface {
79+
GetByEpoch(ctx context.Context, epoch uint64) (types.RootTrustBase, error)
80+
}
81+
7482
status int
7583
)
7684

77-
func NewBFTClient(ctx context.Context, conf *config.BFTConfig, roundManager RoundManager, logger *logger.Logger) (*BFTClientImpl, error) {
85+
func NewBFTClient(conf *config.BFTConfig, roundManager RoundManager, trustBaseStore TrustBaseStore, luc *types.UnicityCertificate, logger *logger.Logger) (*BFTClientImpl, error) {
7886
logger.Info("Creating BFT Client")
7987
bftClient := &BFTClientImpl{
80-
logger: logger,
81-
partitionID: conf.ShardConf.PartitionID,
82-
shardID: conf.ShardConf.ShardID,
83-
roundManager: roundManager,
84-
conf: conf,
88+
logger: logger,
89+
partitionID: conf.ShardConf.PartitionID,
90+
shardID: conf.ShardConf.ShardID,
91+
roundManager: roundManager,
92+
trustBaseStore: trustBaseStore,
93+
conf: conf,
8594
}
8695
bftClient.status.Store(idle)
96+
bftClient.luc.Store(luc)
97+
bftClient.lastCertResponseTime.Store(time.Now().UnixMilli())
8798
return bftClient, nil
8899
}
89100

@@ -120,10 +131,6 @@ func (c *BFTClientImpl) Start(ctx context.Context) error {
120131
if err != nil {
121132
return err
122133
}
123-
rootNodes, err := c.conf.GetRootNodes()
124-
if err != nil {
125-
return fmt.Errorf("failed to get root nodes: %w", err)
126-
}
127134
signer, err := c.conf.KeyConf.Signer()
128135
if err != nil {
129136
return fmt.Errorf("failed to create signer: %w", err)
@@ -137,19 +144,20 @@ func (c *BFTClientImpl) Start(ctx context.Context) error {
137144
c.peer = self
138145
c.network = networkP2P
139146
c.signer = signer
140-
c.rootNodes = rootNodes
141147

142148
if err := c.peer.BootstrapConnect(ctx, c.logger.Logger); err != nil {
143149
return fmt.Errorf("failed to bootstrap peer: %w", err)
144150
}
145151

146-
if err := c.sendHandshake(ctx); err != nil {
147-
return fmt.Errorf("failed to send handshake: %w", err)
148-
}
149-
150152
msgLoopCtx, cancelFn := context.WithCancel(ctx)
151153
c.msgLoopCancelFn = cancelFn
152-
go c.loop(msgLoopCtx)
154+
go func() {
155+
if err := c.loop(msgLoopCtx); err != nil {
156+
c.logger.Error("BFT event loop thread exited with error", "error", err.Error())
157+
} else {
158+
c.logger.Info("BFT event loop thread finished")
159+
}
160+
}()
153161

154162
return nil
155163
}
@@ -171,16 +179,21 @@ func (c *BFTClientImpl) Stop() {
171179
c.peer = nil
172180
c.network = nil
173181
c.signer = nil
174-
c.rootNodes = nil
175182
}
176183
}
177184

178185
func (c *BFTClientImpl) sendHandshake(ctx context.Context) error {
179186
c.logger.WithContext(ctx).Debug("sending handshake to root chain")
187+
188+
// load trust base
189+
rootEpoch := c.luc.Load().GetRootEpoch()
190+
tb, err := c.trustBaseStore.GetByEpoch(ctx, rootEpoch)
191+
if err != nil {
192+
return fmt.Errorf("failed to load trust base for epoch %d: %w", rootEpoch, err)
193+
}
180194
// select some random root nodes
181-
rootIDs, err := randomNodeSelector(c.rootNodes, defaultHandshakeNodes)
195+
rootIDs, err := randomNodeSelector(tb, defaultHandshakeNodes)
182196
if err != nil {
183-
// error should only happen in case the root nodes are not initialized
184197
return fmt.Errorf("failed to select root nodes for handshake: %w", err)
185198
}
186199
if err = c.network.Send(ctx,
@@ -196,6 +209,13 @@ func (c *BFTClientImpl) sendHandshake(ctx context.Context) error {
196209
}
197210

198211
func (c *BFTClientImpl) loop(ctx context.Context) error {
212+
if err := c.sendHandshake(ctx); err != nil {
213+
return fmt.Errorf("failed to send initial handshake: %w", err)
214+
}
215+
216+
heartbeat := time.NewTicker(c.conf.HeartbeatInterval)
217+
defer heartbeat.Stop()
218+
199219
for {
200220
select {
201221
case <-ctx.Done():
@@ -206,6 +226,15 @@ func (c *BFTClientImpl) loop(ctx context.Context) error {
206226
}
207227
c.logger.WithContext(ctx).Debug("received message", "type", fmt.Sprintf("%T", m))
208228
c.handleMessage(ctx, m)
229+
case <-heartbeat.C:
230+
lastCertMillis := c.lastCertResponseTime.Load()
231+
lastCertTime := time.UnixMilli(lastCertMillis)
232+
if time.Since(lastCertTime) > c.conf.InactivityTimeout {
233+
c.logger.Warn("BFT client inactivity timeout exceeded, sending new handshake")
234+
if err := c.sendHandshake(ctx); err != nil {
235+
c.logger.Error("failed to send handshake on inactivity timeout", "error", err.Error())
236+
}
237+
}
209238
}
210239
}
211240
}
@@ -214,16 +243,30 @@ func (c *BFTClientImpl) handleMessage(ctx context.Context, msg any) {
214243
switch mt := msg.(type) {
215244
case *certification.CertificationResponse:
216245
c.logger.WithContext(ctx).Info("received CertificationResponse")
217-
c.handleCertificationResponse(ctx, mt)
246+
if err := c.handleCertificationResponse(ctx, mt); err != nil {
247+
c.logger.WithContext(ctx).Error("error processing CertificationResponse message", "error", err.Error())
248+
}
218249
default:
219250
c.logger.WithContext(ctx).Info("received unknown message")
220251
}
221252
}
222253

223254
func (c *BFTClientImpl) handleCertificationResponse(ctx context.Context, cr *certification.CertificationResponse) error {
255+
c.lastCertResponseTime.Store(time.Now().UnixMilli())
256+
224257
if err := cr.IsValid(); err != nil {
225258
return fmt.Errorf("invalid CertificationResponse: %w", err)
226259
}
260+
261+
// verify UC
262+
tb, err := c.trustBaseStore.GetByEpoch(ctx, cr.UC.GetRootEpoch())
263+
if err != nil {
264+
return fmt.Errorf("failed to load trust base for epoch %d: %w", cr.UC.GetRootEpoch(), err)
265+
}
266+
if err := cr.UC.Verify(tb, crypto.SHA256, c.partitionID, c.shardID, nil); err != nil {
267+
return fmt.Errorf("failed to verify UC: %w", err)
268+
}
269+
227270
c.logger.WithContext(ctx).Info(fmt.Sprintf("handleCertificationResponse: UC round %d, next round %d, next leader %s",
228271
cr.UC.GetRoundNumber(), cr.Technical.Round, cr.Technical.Leader))
229272

@@ -476,7 +519,12 @@ func (c *BFTClientImpl) sendCertificationRequest(ctx context.Context, rootHash s
476519
}
477520
c.logger.WithContext(ctx).Info(fmt.Sprintf("Round %d sending block certification request to root chain, IR hash %X",
478521
req.InputRecord.RoundNumber, req.InputRecord.Hash))
479-
rootIDs, err := rootNodesSelector(luc, c.rootNodes, defaultNofRootNodes)
522+
523+
tb, err := c.trustBaseStore.GetByEpoch(ctx, luc.GetRootEpoch())
524+
if err != nil {
525+
return fmt.Errorf("failed to load trust base for epoch %d: %w", luc.GetRootEpoch(), err)
526+
}
527+
rootIDs, err := rootNodesSelector(luc, tb, defaultNofRootNodes)
480528
if err != nil {
481529
return fmt.Errorf("selecting root nodes: %w", err)
482530
}

0 commit comments

Comments
 (0)