89 changes: 78 additions & 11 deletions cmd/aggregator/main.go
@@ -12,14 +12,17 @@ import (
"github.com/unicitynetwork/bft-go-base/types"

"github.com/unicitynetwork/aggregator-go/internal/config"
"github.com/unicitynetwork/aggregator-go/internal/events"
"github.com/unicitynetwork/aggregator-go/internal/gateway"
"github.com/unicitynetwork/aggregator-go/internal/ha"
"github.com/unicitynetwork/aggregator-go/internal/ha/state"
"github.com/unicitynetwork/aggregator-go/internal/logger"
"github.com/unicitynetwork/aggregator-go/internal/round"
"github.com/unicitynetwork/aggregator-go/internal/service"
"github.com/unicitynetwork/aggregator-go/internal/smt"
"github.com/unicitynetwork/aggregator-go/internal/storage"
"github.com/unicitynetwork/aggregator-go/internal/storage/interfaces"
"github.com/unicitynetwork/aggregator-go/pkg/api"
)

// gracefulExit flushes async logger and exits with the given code
@@ -125,6 +128,8 @@ func main() {
// Create the shared state tracker for block sync height
stateTracker := state.NewSyncStateTracker()

eventBus := events.NewEventBus(log)

// Load last committed unicity certificate (can be nil for genesis)
var luc *types.UnicityCertificate
lastBlock, err := storageInstance.BlockStorage().GetLatest(ctx)
@@ -139,8 +144,23 @@ func main() {
}
}

// Create SMT instance based on sharding mode
var smtInstance *smt.SparseMerkleTree
switch cfg.Sharding.Mode {
case config.ShardingModeStandalone:
smtInstance = smt.NewSparseMerkleTree(api.SHA256, 16+256)
case config.ShardingModeChild:
smtInstance = smt.NewChildSparseMerkleTree(api.SHA256, 16+256, cfg.Sharding.Child.ShardID)
case config.ShardingModeParent:
smtInstance = smt.NewParentSparseMerkleTree(api.SHA256, cfg.Sharding.ShardIDLength)
default:
log.WithComponent("main").Error("Unsupported sharding mode", "mode", cfg.Sharding.Mode)
gracefulExit(asyncLogger, 1)
}
threadSafeSmt := smt.NewThreadSafeSMT(smtInstance)

// Create round manager based on sharding mode
roundManager, err := round.NewManager(ctx, cfg, log, commitmentQueue, storageInstance, stateTracker, luc)
roundManager, err := round.NewManager(ctx, cfg, log, commitmentQueue, storageInstance, stateTracker, luc, eventBus, threadSafeSmt)
if err != nil {
log.WithComponent("main").Error("Failed to create round manager", "error", err.Error())
gracefulExit(asyncLogger, 1)
@@ -152,23 +172,30 @@ func main() {
gracefulExit(asyncLogger, 1)
}

// Initialize leader selector and HA Manager if enabled
// Initialize leader selector and block syncer if enabled
var ls leaderSelector
var haManager *ha.HAManager
var bs *ha.BlockSyncer
if cfg.HA.Enabled {
log.WithComponent("main").Info("High availability mode enabled")
ls = ha.NewLeaderElection(log, cfg.HA, storageInstance.LeadershipStorage())
ls = ha.NewLeaderElection(log, cfg.HA, storageInstance.LeadershipStorage(), eventBus)
ls.Start(ctx)

// Disable block syncing for parent aggregator mode
// Parent mode uses state-based SMT (current shard roots) rather than history-based (commitment leaves)
disableBlockSync := cfg.Sharding.Mode == config.ShardingModeParent
if disableBlockSync {
if cfg.Sharding.Mode == config.ShardingModeParent {
log.WithComponent("main").Info("Block syncing disabled for parent aggregator mode - SMT will be reconstructed on leadership transition")
} else {
log.WithComponent("main").Info("Starting block syncer")
bs = ha.NewBlockSyncer(log, ls, storageInstance, threadSafeSmt, cfg.Sharding.Child.ShardID, cfg.Processing.RoundDuration, stateTracker)
bs.Start(ctx)
}

haManager = ha.NewHAManager(log, roundManager, ls, storageInstance, roundManager.GetSMT(), cfg.Sharding.Child.ShardID, stateTracker, cfg.Processing.RoundDuration, disableBlockSync)
haManager.Start(ctx)
// In HA mode, listen for leadership changes to activate/deactivate the round manager
go func() {
if err := startLeaderChangedEventListener(ctx, log, cfg, roundManager, bs, eventBus); err != nil {
log.WithComponent("ha-listener").Error("Fatal error on leader changed event listener", "error", err.Error())
}
}()
} else {
log.WithComponent("main").Info("High availability mode is disabled, running as standalone leader")
// In non-HA mode, activate the round manager directly
@@ -212,9 +239,9 @@ func main() {
log.WithComponent("main").Error("Failed to stop server gracefully", "error", err.Error())
}

// Stop HA Manager if it was started
if haManager != nil {
haManager.Stop()
// Stop block syncer if it was started
if bs != nil {
bs.Stop()
}

// Stop leader selector if it was started
@@ -243,6 +270,46 @@ func main() {
}
}

func startLeaderChangedEventListener(ctx context.Context, log *logger.Logger, cfg *config.Config, roundManager round.Manager, bs *ha.BlockSyncer, eventBus *events.EventBus) error {
log.WithComponent("ha-listener").Info("Subscribing to TopicLeaderChanged")
leaderChangedCh := eventBus.Subscribe(events.TopicLeaderChanged)
defer func() {
if err := eventBus.Unsubscribe(events.TopicLeaderChanged, leaderChangedCh); err != nil {
log.WithComponent("ha-listener").Error("Failed to unsubscribe from TopicLeaderChanged", "error", err)
}
}()

for {
select {
case <-ctx.Done():
return nil
case e := <-leaderChangedCh:
evt := e.(*events.LeaderChangedEvent)
log.WithComponent("ha-listener").Info("Received LeaderChangedEvent", "isLeader", evt.IsLeader)
if evt.IsLeader {
// In child and standalone mode, we must sync SMT state before starting to produce blocks
// In parent mode, the Activate call handles SMT reconstruction, no block sync needed.
if cfg.Sharding.Mode != config.ShardingModeParent {
log.WithComponent("ha-listener").Info("Becoming leader, syncing to latest block...")
if err := bs.SyncToLatestBlock(ctx); err != nil {
log.WithComponent("ha-listener").Error("failed to sync to latest block on leadership change", "error", err)
continue
} else {
log.WithComponent("ha-listener").Info("Sync complete.")
}
}
if err := roundManager.Activate(ctx); err != nil {
log.WithComponent("ha-listener").Error("Failed to activate round manager", "error", err)
}
} else {
if err := roundManager.Deactivate(ctx); err != nil {
log.WithComponent("ha-listener").Error("Failed to deactivate round manager", "error", err)
}
}
}
}
}

type leaderSelector interface {
IsLeader(ctx context.Context) (bool, error)
Start(ctx context.Context)
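For context when reading the listener above: main.go subscribes to events.TopicLeaderChanged and type-asserts each event to *events.LeaderChangedEvent, but neither declaration appears in this diff. A minimal sketch of what they are assumed to look like, inferred only from how main.go uses them (the real declarations live in internal/events and may differ):

// Sketch of the assumed declarations consumed by startLeaderChangedEventListener.
// Only the IsLeader field is known to exist, since main.go reads evt.IsLeader;
// the constant value and everything else here is illustrative.
package events

// TopicLeaderChanged is the topic published on every leadership transition.
const TopicLeaderChanged Topic = "leader.changed"

// LeaderChangedEvent reports whether this node just gained or lost leadership.
type LeaderChangedEvent struct {
	IsLeader bool
}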
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -3,7 +3,7 @@ services:
platform: linux/amd64
user: "${USER_UID:-1001}:${USER_GID:-1001}"
# https://github.com/unicitynetwork/bft-core/pkgs/container/bft-core
image: ghcr.io/unicitynetwork/bft-core:49d48f8fd3686aff1066d98eff2512e5fc71713c
image: ghcr.io/unicitynetwork/bft-core:ff28a5da20d4eb3f664264a136303a4e79a88e4c

bft-root:
<<: *bft-base
@@ -182,7 +182,7 @@ services:
BFT_ENABLED: "true"
BFT_KEY_CONF_FILE: "/app/bft-config/aggregator/keys.json"
BFT_SHARD_CONF_FILE: "/app/bft-config/shard-conf-7_0.json"
BFT_TRUST_BASE_FILE: "/app/bft-config/trust-base.json"
BFT_TRUST_BASE_FILES: "/app/bft-config/trust-base.json"
# BFT_BOOTSTRAP_ADDRESSES will be set dynamically by the entrypoint script

entrypoint: ["/bin/sh", "-c"]
16 changes: 8 additions & 8 deletions ha-compose.yml
@@ -3,7 +3,7 @@ services:
platform: linux/amd64
user: "${USER_UID:-1001}:${USER_GID:-1001}"
# https://github.com/unicitynetwork/bft-core/pkgs/container/bft-core
image: ghcr.io/unicitynetwork/bft-core:49d48f8fd3686aff1066d98eff2512e5fc71713c
image: ghcr.io/unicitynetwork/bft-core:ff28a5da20d4eb3f664264a136303a4e79a88e4c
bft-root:
<<: *bft-base
volumes:
@@ -28,7 +28,7 @@ services:
ubft trust-base sign --home /genesis/root --trust-base /genesis/trust-base.json
fi
echo "Starting root node..." &&
ubft root-node run --home /genesis/root --address "/ip4/$(hostname -i)/tcp/8000" --trust-base /genesis/trust-base.json --rpc-server-address "$(hostname -i):8002" &&
ubft root-node run --home /genesis/root --address "/ip4/$(hostname -i)/tcp/8000" --trust-base /genesis/trust-base.json --rpc-server-address "$(hostname -i):8002" --log-level debug &&
ls -l /genesis/root
echo "Root node started successfully."

@@ -100,7 +100,7 @@ services:

mongo1:
image: mongo:7.0
container_name: mongo-1
container_name: mongo1
command: ["--replSet", "rs0", "--bind_ip_all", "--noauth"]
user: "${USER_UID:-1001}:${USER_GID:-1001}"
networks:
@@ -116,7 +116,7 @@ services:

mongo2:
image: mongo:7.0
container_name: mongo-2
container_name: mongo2
command: ["--replSet", "rs0", "--bind_ip_all", "--noauth"]
user: "${USER_UID:-1001}:${USER_GID:-1001}"
networks:
@@ -132,7 +132,7 @@ services:

mongo3:
image: mongo:7.0
container_name: mongo-3
container_name: mongo3
command: ["--replSet", "rs0", "--bind_ip_all", "--noauth"]
user: "${USER_UID:-1001}:${USER_GID:-1001}"
networks:
@@ -225,8 +225,8 @@ services:
# Database Configuration
MONGODB_URI: "mongodb://mongo1:27017,mongo2:27017,mongo3:27017/aggregator?replicaSet=rs0"
MONGODB_DATABASE: "aggregator"
MONGODB_CONNECT_TIMEOUT: "10s"
MONGODB_SERVER_SELECTION_TIMEOUT: "5s"
MONGODB_CONNECT_TIMEOUT: "30s"
MONGODB_SERVER_SELECTION_TIMEOUT: "30s"

# Redis Configuration
REDIS_HOST: "redis"
@@ -261,7 +261,7 @@ services:
BFT_ENABLED: "true"
BFT_KEY_CONF_FILE: "/app/bft-config/aggregator/keys.json"
BFT_SHARD_CONF_FILE: "/app/bft-config/shard-conf-7_0.json"
BFT_TRUST_BASE_FILE: "/app/bft-config/trust-base.json"
BFT_TRUST_BASE_FILES: "/app/bft-config/trust-base.json"
# BFT_BOOTSTRAP_ADDRESSES will be set dynamically by the entrypoint script
entrypoint: ["/bin/sh", "-c"]
command:
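The MongoDB changes above (a three-node rs0 replica set and connect/server-selection timeouts raised to 30s) correspond to driver-side client options. A rough sketch of how a client might be configured from these values, assuming the official mongo-go-driver v1 API; the aggregator's actual storage wiring is not shown in this diff:

package main

import (
	"context"
	"log"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readpref"
)

func main() {
	// Values mirroring ha-compose.yml: multi-host URI with replicaSet=rs0 and 30s timeouts.
	uri := "mongodb://mongo1:27017,mongo2:27017,mongo3:27017/aggregator?replicaSet=rs0"

	opts := options.Client().
		ApplyURI(uri).
		SetConnectTimeout(30 * time.Second).        // MONGODB_CONNECT_TIMEOUT
		SetServerSelectionTimeout(30 * time.Second) // MONGODB_SERVER_SELECTION_TIMEOUT

	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Second)
	defer cancel()

	client, err := mongo.Connect(ctx, opts)
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer func() { _ = client.Disconnect(context.Background()) }()

	// Ping the primary; with a replica set this waits for server selection,
	// which is why the longer 30s timeout matters during failover.
	if err := client.Ping(ctx, readpref.Primary()); err != nil {
		log.Fatalf("ping: %v", err)
	}
	log.Println("connected to rs0")
}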
26 changes: 24 additions & 2 deletions internal/bft/client.go
@@ -19,6 +19,7 @@ import (
"github.com/unicitynetwork/bft-go-base/types"

"github.com/unicitynetwork/aggregator-go/internal/config"
"github.com/unicitynetwork/aggregator-go/internal/events"
"github.com/unicitynetwork/aggregator-go/internal/logger"
"github.com/unicitynetwork/aggregator-go/internal/models"
"github.com/unicitynetwork/aggregator-go/pkg/api"
@@ -30,14 +31,15 @@ const (
normal
)

// BFTRootChainClient handles communication with the BFT root chain via P2P network
// BFTClientImpl handles communication with the BFT root chain via P2P network
type (
BFTClientImpl struct {
conf *config.BFTConfig
status atomic.Value
partitionID types.PartitionID
shardID types.ShardID
logger *logger.Logger
eventBus *events.EventBus

// mutex for peer, network, signer TODO: there are readers without mutex
mu sync.Mutex
@@ -82,7 +84,15 @@ type (
status int
)

func NewBFTClient(conf *config.BFTConfig, roundManager RoundManager, trustBaseStore TrustBaseStore, luc *types.UnicityCertificate, logger *logger.Logger) (*BFTClientImpl, error) {
func NewBFTClient(
ctx context.Context,
conf *config.BFTConfig,
roundManager RoundManager,
trustBaseStore TrustBaseStore,
luc *types.UnicityCertificate,
logger *logger.Logger,
eventBus *events.EventBus,
) (*BFTClientImpl, error) {
logger.Info("Creating BFT Client")
bftClient := &BFTClientImpl{
logger: logger,
@@ -91,6 +101,7 @@ func NewBFTClient(conf *config.BFTConfig, roundManager RoundManager, trustBaseSt
roundManager: roundManager,
trustBaseStore: trustBaseStore,
conf: conf,
eventBus: eventBus,
}
bftClient.status.Store(idle)
bftClient.luc.Store(luc)
@@ -121,6 +132,10 @@ func (c *BFTClientImpl) Start(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()

if c.status.Load().(status) != idle {
c.logger.WithContext(ctx).Warn("BFT Client is not idle, skipping start")
return nil
}
c.status.Store(initializing)

peerConf, err := c.conf.PeerConf()
@@ -152,6 +167,7 @@ func (c *BFTClientImpl) Start(ctx context.Context) error {
msgLoopCtx, cancelFn := context.WithCancel(ctx)
c.msgLoopCancelFn = cancelFn
go func() {
c.logger.WithContext(ctx).Info("BFT client event loop started")
if err := c.loop(msgLoopCtx); err != nil {
c.logger.Error("BFT event loop thread exited with error", "error", err.Error())
} else {
@@ -167,10 +183,16 @@ func (c *BFTClientImpl) Stop() {
c.mu.Lock()
defer c.mu.Unlock()

if c.status.Load().(status) == idle {
c.logger.Warn("BFT Client is already idle, skipping stop")
return
}

c.status.Store(idle)

if c.msgLoopCancelFn != nil {
c.msgLoopCancelFn()
c.msgLoopCancelFn = nil
}
if c.peer != nil {
if err := c.peer.Close(); err != nil {
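The Start/Stop guards added above make the BFT client safe to start and stop repeatedly as leadership moves between nodes. A stripped-down sketch of that idempotency pattern (atomic status plus a cancellable loop), using simplified names rather than the aggregator's actual types:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

type status int

const (
	idle status = iota
	running
)

type client struct {
	mu     sync.Mutex
	state  atomic.Value // holds status
	cancel context.CancelFunc
}

func newClient() *client {
	c := &client{}
	c.state.Store(idle)
	return c
}

// Start is a no-op if the client is already running.
func (c *client) Start(ctx context.Context) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state.Load().(status) != idle {
		return
	}
	loopCtx, cancel := context.WithCancel(ctx)
	c.cancel = cancel
	go func() { <-loopCtx.Done() }() // stand-in for the message loop
	c.state.Store(running)
}

// Stop is a no-op if the client is already idle.
func (c *client) Stop() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state.Load().(status) == idle {
		return
	}
	c.state.Store(idle)
	if c.cancel != nil {
		c.cancel()
		c.cancel = nil
	}
}

func main() {
	c := newClient()
	c.Start(context.Background())
	c.Stop()
	c.Stop() // safe: the second call is skipped
	fmt.Println("start/stop cycle completed")
}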
78 changes: 78 additions & 0 deletions internal/events/event_bus.go
@@ -0,0 +1,78 @@
package events

import (
"fmt"
"slices"
"sync"

"github.com/unicitynetwork/aggregator-go/internal/logger"
)

type (
Event interface{}

Topic string

EventBus struct {
logger *logger.Logger

mu sync.RWMutex
subscribers map[Topic][]chan Event
}
)

func NewEventBus(log *logger.Logger) *EventBus {
return &EventBus{
logger: log,
subscribers: make(map[Topic][]chan Event),
}
}

// Subscribe creates a channel, adds it to the subscribers list, and returns it to the caller.
func (bus *EventBus) Subscribe(topic Topic) <-chan Event {
bus.mu.Lock()
defer bus.mu.Unlock()

ch := make(chan Event, 1)
bus.subscribers[topic] = append(bus.subscribers[topic], ch)
return ch
}

// Publish sends the event to all subscribers.
// If the subscriber is busy then the event is dropped.
func (bus *EventBus) Publish(topic Topic, event Event) {
bus.mu.RLock()
defer bus.mu.RUnlock()

subscribers, found := bus.subscribers[topic]
if !found {
bus.logger.Warn("Event not published, no subscriber found", "topic", topic)
return
}
for _, sub := range subscribers {
select {
case sub <- event:
default:
bus.logger.Warn("Dropped event for a slow subscriber", "topic", topic, "event", event)
}
}
}

// Unsubscribe removes the subscriber from the subscribers list,
// returns error if the provided topic does not exist or the subscriber was not found.
func (bus *EventBus) Unsubscribe(topic Topic, sub <-chan Event) error {
bus.mu.Lock()
defer bus.mu.Unlock()

subs, found := bus.subscribers[topic]
if !found {
return fmt.Errorf("topic not found: %s", topic)
}
for i, s := range subs {
if s == sub {
bus.subscribers[topic] = slices.Delete(subs, i, i+1)
return nil
}
}
return fmt.Errorf("subscriber not found for topic: %s", topic)
}
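A small usage sketch of the EventBus added above (illustrative only, not part of the PR): because each subscriber channel is buffered with capacity 1, Publish never blocks, and a subscriber that has not drained its previous event simply loses the new one. The logger constructor below is a placeholder assumption.

package main

import (
	"fmt"

	"github.com/unicitynetwork/aggregator-go/internal/events"
	"github.com/unicitynetwork/aggregator-go/internal/logger"
)

func main() {
	log := logger.New() // assumed constructor; the real signature may differ
	bus := events.NewEventBus(log)

	// Subscribe before publishing: events on topics with no subscribers are dropped.
	ch := bus.Subscribe(events.TopicLeaderChanged)

	bus.Publish(events.TopicLeaderChanged, "became leader") // any value satisfies Event
	fmt.Println("received:", <-ch)                          // buffered channel, safe to read after Publish

	if err := bus.Unsubscribe(events.TopicLeaderChanged, ch); err != nil {
		fmt.Println("unsubscribe failed:", err)
	}
}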