Skip to content

Commit 7788968

Browse files
authored
Merge pull request #86 from unicitynetwork/dynamic-trust-base
Dynamic trust base
2 parents bf59d2d + 255398d commit 7788968

31 files changed

+1307
-124
lines changed

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,22 @@ The service is configured via environment variables:
160160
| `BATCH_LIMIT` | Maximum number of commitments to process per batch | `1000` |
161161
| `ROUND_DURATION` | Duration between block creation rounds | `1s` |
162162

163+
### BFT Configuration
164+
165+
| Variable | Description | Default |
166+
|-------------------------------------|-------------------------------------------------------------------------------------|----------------------------------|
167+
| `BFT_ENABLED` | Enables or disables the BFT client integration. | `true` |
168+
| `BFT_ADDRESS` | The libp2p multiaddress for the BFT client to listen on. | `/ip4/0.0.0.0/tcp/9000` |
169+
| `BFT_ANNOUNCE_ADDRESSES` | Comma-separated list of public callback multi-addresses to announce to other peers. | `""` |
170+
| `BFT_BOOTSTRAP_ADDRESSES` | Comma-separated list of bootstrap peer addresses. | `""` |
171+
| `BFT_BOOTSTRAP_CONNECT_RETRY` | Number of retries for connecting to bootstrap peers. | `3` |
172+
| `BFT_BOOTSTRAP_CONNECT_RETRY_DELAY` | Delay between bootstrap connection retries (in seconds). | `5` |
173+
| `BFT_HEARTBEAT_INTERVAL` | How often the BFT client checks for inactivity. | `1s` |
174+
| `BFT_INACTIVITY_TIMEOUT` | Duration of inactivity before the BFT client sends a new handshake. | `5s` |
175+
| `BFT_KEY_CONF_FILE` | Path to the BFT key configuration file. | `bft-config/keys.json` |
176+
| `BFT_SHARD_CONF_FILE` | Path to the aggregator shard configuration file. | `bft-config/shard-conf-7_0.json` |
177+
| `BFT_TRUST_BASE_FILES` | Comma-separated list of paths to trust base files. | `bft-config/trust-base.json` |
178+
163179
## API Endpoints
164180

165181
### JSON-RPC 2.0 Methods
@@ -420,6 +436,24 @@ Returns the health status and role of the service.
420436
}
421437
```
422438

439+
#### `PUT /api/v1/trustbases`
440+
Adds trust base to the trust base store. The request body must be a valid trust base in json format.
441+
442+
Example curl request
443+
```curl -X PUT -H 'Content-Type: application/json' -d @./test-nodes/trust-base-1.json http://localhost:3000/api/v1/trustbases```
444+
445+
**If trust base was stored successfully then status 200 with empty response body is returned:**
446+
```json
447+
{}
448+
```
449+
450+
**If trust base is invalid error then status 400 with error cause is returned:**
451+
```json
452+
{
453+
"error":"failed to store trust base: trust base already exists"
454+
}
455+
```
456+
423457
#### `GET /docs`
424458
Returns **executable** interactive HTML API documentation page with live testing capabilities (if `ENABLE_DOCS=true`).
425459

cmd/aggregator/main.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package main
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
78
"os/signal"
89
"syscall"
910
"time"
1011

12+
"github.com/unicitynetwork/bft-go-base/types"
13+
1114
"github.com/unicitynetwork/aggregator-go/internal/config"
1215
"github.com/unicitynetwork/aggregator-go/internal/gateway"
1316
"github.com/unicitynetwork/aggregator-go/internal/ha"
@@ -16,6 +19,7 @@ import (
1619
"github.com/unicitynetwork/aggregator-go/internal/round"
1720
"github.com/unicitynetwork/aggregator-go/internal/service"
1821
"github.com/unicitynetwork/aggregator-go/internal/storage"
22+
"github.com/unicitynetwork/aggregator-go/internal/storage/interfaces"
1923
)
2024

2125
// gracefulExit flushes async logger and exits with the given code
@@ -68,7 +72,7 @@ func main() {
6872
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
6973
defer stop()
7074

71-
commitmentQueue, storageInstance, err := storage.NewStorage(cfg, log)
75+
commitmentQueue, storageInstance, err := storage.NewStorage(ctx, cfg, log)
7276
if err != nil {
7377
log.WithComponent("main").Error("Failed to initialize storage", "error", err.Error())
7478
gracefulExit(asyncLogger, 1)
@@ -94,6 +98,25 @@ func main() {
9498

9599
log.WithComponent("main").Info("Database connection established")
96100

101+
// Store trust bases from config files
102+
trustBaseValidator := service.NewTrustBaseValidator(storageInstance.TrustBaseStorage())
103+
for _, tb := range cfg.BFT.TrustBases {
104+
if err := trustBaseValidator.Verify(ctx, &tb); err != nil {
105+
log.WithComponent("main").Error(fmt.Sprintf("Trust base verification failed"), "error", err.Error())
106+
gracefulExit(asyncLogger, 1)
107+
}
108+
if err := storageInstance.TrustBaseStorage().Store(ctx, &tb); err != nil {
109+
if errors.Is(err, interfaces.ErrTrustBaseAlreadyExists) {
110+
log.WithComponent("main").Warn(fmt.Sprintf("Trust base already exists, not overwriting it"), "epoch", tb.GetEpoch())
111+
} else {
112+
log.WithComponent("main").Error("Failed to store trust base", "epoch", tb.GetEpoch(), "error", err.Error())
113+
gracefulExit(asyncLogger, 1)
114+
}
115+
} else {
116+
log.WithComponent("main").Info("Stored trust base", "epoch", tb.GetEpoch())
117+
}
118+
}
119+
97120
if err := commitmentQueue.Initialize(ctx); err != nil {
98121
log.WithComponent("main").Error("Failed to initialize commitment queue", "error", err.Error())
99122
gracefulExit(asyncLogger, 1)
@@ -102,8 +125,22 @@ func main() {
102125
// Create the shared state tracker for block sync height
103126
stateTracker := state.NewSyncStateTracker()
104127

128+
// Load last committed unicity certificate (can be nil for genesis)
129+
var luc *types.UnicityCertificate
130+
lastBlock, err := storageInstance.BlockStorage().GetLatest(ctx)
131+
if err != nil {
132+
log.WithComponent("main").Error("Failed to load last stored block", "error", err.Error())
133+
gracefulExit(asyncLogger, 1)
134+
}
135+
if lastBlock != nil {
136+
if err := types.Cbor.Unmarshal(lastBlock.UnicityCertificate, &luc); err != nil {
137+
log.WithComponent("main").Error("Failed to decode unicity certificate", "error", err.Error())
138+
gracefulExit(asyncLogger, 1)
139+
}
140+
}
141+
105142
// Create round manager based on sharding mode
106-
roundManager, err := round.NewManager(ctx, cfg, log, commitmentQueue, storageInstance, stateTracker)
143+
roundManager, err := round.NewManager(ctx, cfg, log, commitmentQueue, storageInstance, stateTracker, luc)
107144
if err != nil {
108145
log.WithComponent("main").Error("Failed to create round manager", "error", err.Error())
109146
gracefulExit(asyncLogger, 1)

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ require (
1212
github.com/stretchr/testify v1.11.1
1313
github.com/testcontainers/testcontainers-go v0.39.0
1414
github.com/testcontainers/testcontainers-go/modules/mongodb v0.39.0
15-
github.com/unicitynetwork/bft-core v1.0.2-0.20250618101015-e1316ad51f4e
16-
github.com/unicitynetwork/bft-go-base v1.0.2
15+
github.com/unicitynetwork/bft-core v1.0.2-0.20251126094351-51daa89c8c43
16+
github.com/unicitynetwork/bft-go-base v1.0.3-0.20251125134602-9545226e4709
1717
go.mongodb.org/mongo-driver v1.17.4
1818
go.opentelemetry.io/otel/metric v1.38.0
1919
go.opentelemetry.io/otel/trace v1.38.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -559,10 +559,10 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
559559
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
560560
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
561561
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
562-
github.com/unicitynetwork/bft-core v1.0.2-0.20250618101015-e1316ad51f4e h1:C8emZcWmMPG9uEfLW3Jk1jt7IcDdGo6g7KMxer7w1Aw=
563-
github.com/unicitynetwork/bft-core v1.0.2-0.20250618101015-e1316ad51f4e/go.mod h1:oHcvP+Jp8N4SzedwzxWfq7SfOAzgV0si6WsQBNYVQIo=
564-
github.com/unicitynetwork/bft-go-base v1.0.2 h1:Z5stN59ZP3psY5RDH4VFDyg5HXDsePv27uqvP56mNGw=
565-
github.com/unicitynetwork/bft-go-base v1.0.2/go.mod h1:hBnOG52VRy/vpgIBUulTgk7PBTwODZ2xkVjCEu5yRcQ=
562+
github.com/unicitynetwork/bft-core v1.0.2-0.20251126094351-51daa89c8c43 h1:oWVHqRyI+QeldeziycQ9Qs0g3PiJN3YlNxYEBFUHxUQ=
563+
github.com/unicitynetwork/bft-core v1.0.2-0.20251126094351-51daa89c8c43/go.mod h1:9qPdOWF7Hr2k2owuT4L4OpndbGcdZSdymjfbxiyp3Is=
564+
github.com/unicitynetwork/bft-go-base v1.0.3-0.20251125134602-9545226e4709 h1:otZGyxGVw63HpZBIVyyKNoJorukL7xNXDhKoWxCqbtc=
565+
github.com/unicitynetwork/bft-go-base v1.0.3-0.20251125134602-9545226e4709/go.mod h1:hBnOG52VRy/vpgIBUulTgk7PBTwODZ2xkVjCEu5yRcQ=
566566
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
567567
github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
568568
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=

internal/bft/client.go

Lines changed: 78 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@ package bft
33
import (
44
"bytes"
55
"context"
6+
"crypto"
67
"encoding/hex"
78
"errors"
89
"fmt"
910
"math/big"
1011
"sync"
1112
"sync/atomic"
13+
"time"
1214

13-
"github.com/libp2p/go-libp2p/core/peer"
1415
"github.com/unicitynetwork/bft-core/network"
1516
"github.com/unicitynetwork/bft-core/network/protocol/certification"
1617
"github.com/unicitynetwork/bft-core/network/protocol/handshake"
17-
"github.com/unicitynetwork/bft-go-base/crypto"
18+
cryptobft "github.com/unicitynetwork/bft-go-base/crypto"
1819
"github.com/unicitynetwork/bft-go-base/types"
1920

2021
"github.com/unicitynetwork/aggregator-go/internal/config"
@@ -38,12 +39,11 @@ type (
3839
shardID types.ShardID
3940
logger *logger.Logger
4041

41-
// mutex for peer, network, signer, rootNodes
42-
mu sync.Mutex
43-
peer *network.Peer
44-
network *BftNetwork
45-
rootNodes peer.IDSlice
46-
signer crypto.Signer
42+
// mutex for peer, network, signer TODO: there are readers without mutex
43+
mu sync.Mutex
44+
peer *network.Peer
45+
network *BftNetwork
46+
signer cryptobft.Signer
4747

4848
// Latest UC this node has seen. Can be ahead of the committed UC during recovery.
4949
luc atomic.Pointer[types.UnicityCertificate]
@@ -57,6 +57,11 @@ type (
5757
ucProcessingMutex sync.Mutex
5858

5959
msgLoopCancelFn context.CancelFunc
60+
61+
// timestamp when last UC was received
62+
lastCertResponseTime atomic.Int64
63+
64+
trustBaseStore TrustBaseStore
6065
}
6166

6267
BFTClient interface {
@@ -70,19 +75,26 @@ type (
7075
StartNewRound(ctx context.Context, roundNumber *api.BigInt) error
7176
}
7277

78+
TrustBaseStore interface {
79+
GetByEpoch(ctx context.Context, epoch uint64) (types.RootTrustBase, error)
80+
}
81+
7382
status int
7483
)
7584

76-
func NewBFTClient(ctx context.Context, conf *config.BFTConfig, roundManager RoundManager, logger *logger.Logger) (*BFTClientImpl, error) {
85+
func NewBFTClient(conf *config.BFTConfig, roundManager RoundManager, trustBaseStore TrustBaseStore, luc *types.UnicityCertificate, logger *logger.Logger) (*BFTClientImpl, error) {
7786
logger.Info("Creating BFT Client")
7887
bftClient := &BFTClientImpl{
79-
logger: logger,
80-
partitionID: conf.ShardConf.PartitionID,
81-
shardID: conf.ShardConf.ShardID,
82-
roundManager: roundManager,
83-
conf: conf,
88+
logger: logger,
89+
partitionID: conf.ShardConf.PartitionID,
90+
shardID: conf.ShardConf.ShardID,
91+
roundManager: roundManager,
92+
trustBaseStore: trustBaseStore,
93+
conf: conf,
8494
}
8595
bftClient.status.Store(idle)
96+
bftClient.luc.Store(luc)
97+
bftClient.lastCertResponseTime.Store(time.Now().UnixMilli())
8698
return bftClient, nil
8799
}
88100

@@ -119,10 +131,6 @@ func (c *BFTClientImpl) Start(ctx context.Context) error {
119131
if err != nil {
120132
return err
121133
}
122-
rootNodes, err := c.conf.GetRootNodes()
123-
if err != nil {
124-
return fmt.Errorf("failed to get root nodes: %w", err)
125-
}
126134
signer, err := c.conf.KeyConf.Signer()
127135
if err != nil {
128136
return fmt.Errorf("failed to create signer: %w", err)
@@ -136,19 +144,20 @@ func (c *BFTClientImpl) Start(ctx context.Context) error {
136144
c.peer = self
137145
c.network = networkP2P
138146
c.signer = signer
139-
c.rootNodes = rootNodes
140147

141148
if err := c.peer.BootstrapConnect(ctx, c.logger.Logger); err != nil {
142149
return fmt.Errorf("failed to bootstrap peer: %w", err)
143150
}
144151

145-
if err := c.sendHandshake(ctx); err != nil {
146-
return fmt.Errorf("failed to send handshake: %w", err)
147-
}
148-
149152
msgLoopCtx, cancelFn := context.WithCancel(ctx)
150153
c.msgLoopCancelFn = cancelFn
151-
go c.loop(msgLoopCtx)
154+
go func() {
155+
if err := c.loop(msgLoopCtx); err != nil {
156+
c.logger.Error("BFT event loop thread exited with error", "error", err.Error())
157+
} else {
158+
c.logger.Info("BFT event loop thread finished")
159+
}
160+
}()
152161

153162
return nil
154163
}
@@ -170,16 +179,21 @@ func (c *BFTClientImpl) Stop() {
170179
c.peer = nil
171180
c.network = nil
172181
c.signer = nil
173-
c.rootNodes = nil
174182
}
175183
}
176184

177185
func (c *BFTClientImpl) sendHandshake(ctx context.Context) error {
178186
c.logger.WithContext(ctx).Debug("sending handshake to root chain")
187+
188+
// load trust base
189+
rootEpoch := c.luc.Load().GetRootEpoch()
190+
tb, err := c.trustBaseStore.GetByEpoch(ctx, rootEpoch)
191+
if err != nil {
192+
return fmt.Errorf("failed to load trust base for epoch %d: %w", rootEpoch, err)
193+
}
179194
// select some random root nodes
180-
rootIDs, err := randomNodeSelector(c.rootNodes, defaultHandshakeNodes)
195+
rootIDs, err := randomNodeSelector(tb, defaultHandshakeNodes)
181196
if err != nil {
182-
// error should only happen in case the root nodes are not initialized
183197
return fmt.Errorf("failed to select root nodes for handshake: %w", err)
184198
}
185199
if err = c.network.Send(ctx,
@@ -195,6 +209,13 @@ func (c *BFTClientImpl) sendHandshake(ctx context.Context) error {
195209
}
196210

197211
func (c *BFTClientImpl) loop(ctx context.Context) error {
212+
if err := c.sendHandshake(ctx); err != nil {
213+
return fmt.Errorf("failed to send initial handshake: %w", err)
214+
}
215+
216+
heartbeat := time.NewTicker(c.conf.HeartbeatInterval)
217+
defer heartbeat.Stop()
218+
198219
for {
199220
select {
200221
case <-ctx.Done():
@@ -205,6 +226,15 @@ func (c *BFTClientImpl) loop(ctx context.Context) error {
205226
}
206227
c.logger.WithContext(ctx).Debug("received message", "type", fmt.Sprintf("%T", m))
207228
c.handleMessage(ctx, m)
229+
case <-heartbeat.C:
230+
lastCertMillis := c.lastCertResponseTime.Load()
231+
lastCertTime := time.UnixMilli(lastCertMillis)
232+
if time.Since(lastCertTime) > c.conf.InactivityTimeout {
233+
c.logger.Warn("BFT client inactivity timeout exceeded, sending new handshake")
234+
if err := c.sendHandshake(ctx); err != nil {
235+
c.logger.Error("failed to send handshake on inactivity timeout", "error", err.Error())
236+
}
237+
}
208238
}
209239
}
210240
}
@@ -213,16 +243,30 @@ func (c *BFTClientImpl) handleMessage(ctx context.Context, msg any) {
213243
switch mt := msg.(type) {
214244
case *certification.CertificationResponse:
215245
c.logger.WithContext(ctx).Info("received CertificationResponse")
216-
c.handleCertificationResponse(ctx, mt)
246+
if err := c.handleCertificationResponse(ctx, mt); err != nil {
247+
c.logger.WithContext(ctx).Error("error processing CertificationResponse message", "error", err.Error())
248+
}
217249
default:
218250
c.logger.WithContext(ctx).Info("received unknown message")
219251
}
220252
}
221253

222254
func (c *BFTClientImpl) handleCertificationResponse(ctx context.Context, cr *certification.CertificationResponse) error {
255+
c.lastCertResponseTime.Store(time.Now().UnixMilli())
256+
223257
if err := cr.IsValid(); err != nil {
224258
return fmt.Errorf("invalid CertificationResponse: %w", err)
225259
}
260+
261+
// verify UC
262+
tb, err := c.trustBaseStore.GetByEpoch(ctx, cr.UC.GetRootEpoch())
263+
if err != nil {
264+
return fmt.Errorf("failed to load trust base for epoch %d: %w", cr.UC.GetRootEpoch(), err)
265+
}
266+
if err := cr.UC.Verify(tb, crypto.SHA256, c.partitionID, c.shardID, nil); err != nil {
267+
return fmt.Errorf("failed to verify UC: %w", err)
268+
}
269+
226270
c.logger.WithContext(ctx).Info(fmt.Sprintf("handleCertificationResponse: UC round %d, next round %d, next leader %s",
227271
cr.UC.GetRoundNumber(), cr.Technical.Round, cr.Technical.Leader))
228272

@@ -472,7 +516,12 @@ func (c *BFTClientImpl) sendCertificationRequest(ctx context.Context, rootHash s
472516
}
473517
c.logger.WithContext(ctx).Info(fmt.Sprintf("Round %d sending block certification request to root chain, IR hash %X",
474518
req.InputRecord.RoundNumber, req.InputRecord.Hash))
475-
rootIDs, err := rootNodesSelector(luc, c.rootNodes, defaultNofRootNodes)
519+
520+
tb, err := c.trustBaseStore.GetByEpoch(ctx, luc.GetRootEpoch())
521+
if err != nil {
522+
return fmt.Errorf("failed to load trust base for epoch %d: %w", luc.GetRootEpoch(), err)
523+
}
524+
rootIDs, err := rootNodesSelector(luc, tb, defaultNofRootNodes)
476525
if err != nil {
477526
return fmt.Errorf("selecting root nodes: %w", err)
478527
}

0 commit comments

Comments
 (0)