diff --git a/graft/coreth/plugin/evm/atomic/sync/syncer.go b/graft/coreth/plugin/evm/atomic/sync/syncer.go
index 17099e545838..f56c353b4748 100644
--- a/graft/coreth/plugin/evm/atomic/sync/syncer.go
+++ b/graft/coreth/plugin/evm/atomic/sync/syncer.go
@@ -146,6 +146,14 @@ func (s *Syncer) Sync(ctx context.Context) error {
 	return s.syncer.Sync(ctx)
 }
 
+func (*Syncer) UpdateTarget(_ message.Syncable) error {
+	return nil
+}
+
+func (*Syncer) Finalize(_ context.Context) error {
+	return nil
+}
+
 // addZeroes returns the big-endian representation of `height`, prefixed with [common.HashLength] zeroes.
 func addZeroes(height uint64) []byte {
 	// Key format is [height(8 bytes)][blockchainID(32 bytes)]. Start should be the
diff --git a/graft/coreth/plugin/evm/vmsync/block_queue.go b/graft/coreth/plugin/evm/vmsync/block_queue.go
new file mode 100644
index 000000000000..9a8fe86beaa3
--- /dev/null
+++ b/graft/coreth/plugin/evm/vmsync/block_queue.go
@@ -0,0 +1,95 @@
+// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package vmsync
+
+import "sync"
+
+// BlockOperationType represents the type of operation to perform on a block.
+type BlockOperationType int
+
+const (
+	OpAccept BlockOperationType = iota
+	OpReject
+	OpVerify
+)
+
+// String returns the string representation of the block operation.
+func (op BlockOperationType) String() string {
+	switch op {
+	case OpAccept:
+		return "accept"
+	case OpReject:
+		return "reject"
+	case OpVerify:
+		return "verify"
+	default:
+		return "unknown"
+	}
+}
+
+// blockOperation represents a queued block operation.
+type blockOperation struct {
+	block     EthBlockWrapper
+	operation BlockOperationType
+}
+
+// blockQueue buffers block operations (accept/reject/verify) that arrive while
+// the coordinator is in a state that accepts them. Operations are processed in
+// FIFO order. Stale entries are pruned on UpdateSyncTarget (via removeBelowHeight),
+// and the buffer is snapshotted at finalization via dequeueBatch. Enqueue is
+// always allowed; dequeueBatch only captures the currently buffered operations
+// and clears them, so new enqueues after the snapshot are not part of that batch.
+type blockQueue struct {
+	mu sync.Mutex
+	// buffered operations accumulated before finalization
+	items []blockOperation
+}
+
+// newBlockQueue creates a new empty queue.
+func newBlockQueue() *blockQueue {
+	return &blockQueue{}
+}
+
+// enqueue appends a block operation to the buffer. Returns true if the operation
+// was queued, false if the block is nil.
+func (q *blockQueue) enqueue(b EthBlockWrapper, op BlockOperationType) bool {
+	if b == nil {
+		return false
+	}
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	q.items = append(q.items, blockOperation{
+		block:     b,
+		operation: op,
+	})
+	return true
+}
+
+// dequeueBatch returns the current buffered operations and clears the buffer. New
+// arrivals after the snapshot are not included and remain buffered for later.
+func (q *blockQueue) dequeueBatch() []blockOperation {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	out := q.items
+	q.items = nil
+	return out
+}
+
+// removeBelowHeight removes all queued blocks with height <= targetHeight.
+// This is called after UpdateSyncTarget to remove blocks that will never be executed
+// because the sync target has advanced past them.
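+// Operations whose wrapper returns a nil eth block are dropped as well.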
+func (q *blockQueue) removeBelowHeight(targetHeight uint64) { + q.mu.Lock() + defer q.mu.Unlock() + + filtered := q.items[:0] + for _, op := range q.items { + ethBlock := op.block.GetEthBlock() + if ethBlock != nil && ethBlock.NumberU64() > targetHeight { + filtered = append(filtered, op) + } + } + q.items = filtered +} diff --git a/graft/coreth/plugin/evm/vmsync/block_queue_test.go b/graft/coreth/plugin/evm/vmsync/block_queue_test.go new file mode 100644 index 000000000000..f361575af315 --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/block_queue_test.go @@ -0,0 +1,76 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBlockQueue_EnqueueAndDequeue(t *testing.T) { + q := newBlockQueue() + + // Nil block should be rejected. + require.False(t, q.enqueue(nil, OpAccept)) + + // Enqueue blocks. + for i := uint64(100); i < 105; i++ { + require.True(t, q.enqueue(newMockBlock(i), OpAccept)) + } + + // Dequeue returns all in FIFO order and clears queue. + batch := q.dequeueBatch() + require.Len(t, batch, 5) + for i, op := range batch { + require.Equal(t, uint64(100+i), op.block.GetEthBlock().NumberU64()) + } + + // Queue is now empty. + require.Empty(t, q.dequeueBatch()) +} + +func TestBlockQueue_RemoveBelowHeight(t *testing.T) { + q := newBlockQueue() + + // Enqueue blocks at heights 100-110. + for i := uint64(100); i <= 110; i++ { + q.enqueue(newMockBlock(i), OpAccept) + } + + // Remove blocks at or below height 105. + q.removeBelowHeight(105) + + // Only blocks > 105 should remain (106, 107, 108, 109, 110). + batch := q.dequeueBatch() + require.Len(t, batch, 5) + require.Equal(t, uint64(106), batch[0].block.GetEthBlock().NumberU64()) +} + +func TestBlockQueue_ConcurrentAccess(t *testing.T) { + t.Parallel() + + q := newBlockQueue() + const numGoroutines = 10 + const numOps = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for g := 0; g < numGoroutines; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < numOps; i++ { + q.enqueue(newMockBlock(uint64(id*numOps+i)), OpAccept) + } + }(g) + } + + wg.Wait() + + // All operations should have been enqueued. + batch := q.dequeueBatch() + require.Len(t, batch, numGoroutines*numOps) +} diff --git a/graft/coreth/plugin/evm/vmsync/client.go b/graft/coreth/plugin/evm/vmsync/client.go index c2e77a7c18b8..e567e5d259ba 100644 --- a/graft/coreth/plugin/evm/vmsync/client.go +++ b/graft/coreth/plugin/evm/vmsync/client.go @@ -5,10 +5,10 @@ package vmsync import ( "context" - "errors" "fmt" "sync" + "github.com/ava-labs/libevm/core/types" "github.com/ava-labs/libevm/ethdb" "github.com/ava-labs/libevm/log" @@ -31,23 +31,38 @@ import ( // The last 256 block hashes are necessary to support the BLOCKHASH opcode. const BlocksToFetch = 256 -var ( - errSkipSync = errors.New("skip sync") - stateSyncSummaryKey = []byte("stateSyncSummary") -) +var stateSyncSummaryKey = []byte("stateSyncSummary") + +// SyncStrategy defines how state sync is executed. +// Implementations handle syncer orchestration and block processing during sync. +type SyncStrategy interface { + // Start begins sync and blocks until completion or error. + Start(ctx context.Context, summary message.Syncable) error + + // OnBlockAccepted handles a block accepted during sync. + OnBlockAccepted(EthBlockWrapper) (bool, error) + + // OnBlockRejected handles a block rejected during sync. 
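+	// The boolean result reports whether the block was enqueued for deferred processing.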
+	OnBlockRejected(EthBlockWrapper) (bool, error)
+
+	// OnBlockVerified handles a block verified during sync.
+	OnBlockVerified(EthBlockWrapper) (bool, error)
+}
 
 // BlockAcceptor provides a mechanism to update the last accepted block ID during state synchronization.
-// This interface is used by the state sync process to ensure the blockchain state
-// is properly updated when new blocks are synchronized from the network.
 type BlockAcceptor interface {
 	PutLastAcceptedID(ids.ID) error
 }
 
-// SyncStrategy defines how state sync is executed.
-// Implementations handle the sync lifecycle differently based on sync mode.
-type SyncStrategy interface {
-	// Start begins the sync process and blocks until completion or error.
-	Start(ctx context.Context, summary message.Syncable) error
+// EthBlockWrapper can be implemented by a concrete block wrapper type to
+// return *types.Block, which is needed to update chain pointers at the
+// end of the sync operation. It also provides Accept/Reject/Verify operations
+// for deferred processing during dynamic state sync.
+type EthBlockWrapper interface {
+	GetEthBlock() *types.Block
+	Accept(context.Context) error
+	Reject(context.Context) error
+	Verify(context.Context) error
 }
 
 type ClientConfig struct {
@@ -74,6 +89,11 @@ type ClientConfig struct {
 	RequestSize uint16 // number of key/value pairs to ask peers for per request
 	Enabled     bool
 	SkipResume  bool
+	// DynamicStateSyncEnabled toggles dynamic vs static state sync orchestration.
+	DynamicStateSyncEnabled bool
+
+	// PivotInterval advances the sync target every N blocks.
+	PivotInterval uint64
 }
 
 type client struct {
@@ -82,12 +102,12 @@ type client struct {
 	cancel  context.CancelFunc
 	wg      sync.WaitGroup
 	err     error
+	stateSyncOnce sync.Once    // ensures sync completion is signaled to the engine exactly once
+	strategy      SyncStrategy // strategy manages sync execution (static or dynamic)
 }
 
 func NewClient(config *ClientConfig) Client {
-	return &client{
-		config: config,
-	}
+	return &client{config: config}
 }
 
 type Client interface {
@@ -100,6 +120,15 @@ type Client interface {
 	ClearOngoingSummary() error
 	Shutdown() error
 	Error() error
+	// OnEngineAccept should be called by the engine when a block is accepted.
+	// Returns true if the block was enqueued for deferred processing, false otherwise.
+	OnEngineAccept(EthBlockWrapper) (bool, error)
+	// OnEngineReject should be called by the engine when a block is rejected.
+	// Returns true if the block was enqueued for deferred processing, false otherwise.
+	OnEngineReject(EthBlockWrapper) (bool, error)
+	// OnEngineVerify should be called by the engine when a block is verified.
+	// Returns true if the block was enqueued for deferred processing, false otherwise.
+	OnEngineVerify(EthBlockWrapper) (bool, error)
 }
 
 // StateSyncEnabled returns [client.enabled], which is set in the chain's config file.
@@ -145,51 +174,65 @@ func (c *client) ParseStateSummary(_ context.Context, summaryBytes []byte) (bloc
 	return c.config.Parser.Parse(summaryBytes, c.acceptSyncSummary)
 }
 
-// acceptSyncSummary returns true if sync will be performed and launches the state sync process
-// in a goroutine.
-func (c *client) acceptSyncSummary(summary message.Syncable) (block.StateSyncMode, error) {
-	if err := c.prepareForSync(summary); err != nil {
-		if errors.Is(err, errSkipSync) {
-			return block.StateSyncSkipped, nil
-		}
-		return block.StateSyncSkipped, err
+// OnEngineAccept delegates to the strategy if active.
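+// A nil strategy means no sync is in progress, so the engine proceeds with immediate execution.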
+func (c *client) OnEngineAccept(b EthBlockWrapper) (bool, error) {
+	if c.strategy == nil {
+		return false, nil
 	}
+	return c.strategy.OnBlockAccepted(b)
+}
 
-	registry, err := c.newSyncerRegistry(summary)
-	if err != nil {
-		return block.StateSyncSkipped, fmt.Errorf("failed to create syncer registry: %w", err)
+// OnEngineReject delegates to the strategy if active.
+func (c *client) OnEngineReject(b EthBlockWrapper) (bool, error) {
+	if c.strategy == nil {
+		return false, nil
 	}
+	return c.strategy.OnBlockRejected(b)
+}
 
-	finalizer := newFinalizer(
-		c.config.Chain,
-		c.config.State,
-		c.config.Acceptor,
-		c.config.VerDB,
-		c.config.MetadataDB,
-		c.config.Extender,
-		c.config.LastAcceptedHeight,
-	)
+// OnEngineVerify delegates to the strategy if active.
+func (c *client) OnEngineVerify(b EthBlockWrapper) (bool, error) {
+	if c.strategy == nil {
+		return false, nil
+	}
+	return c.strategy.OnBlockVerified(b)
+}
 
-	strategy := newStaticStrategy(registry, finalizer)
+func (c *client) Shutdown() error {
+	c.signalDone(context.Canceled)
+	c.wg.Wait()
+	return nil
+}
 
-	return c.startAsync(strategy), nil
+// Error returns a non-nil error if one occurred during the sync.
+func (c *client) Error() error {
+	return c.err
 }
 
-// prepareForSync handles resume check and snapshot wipe before sync starts.
-func (c *client) prepareForSync(summary message.Syncable) error {
+// acceptSyncSummary returns the state sync mode that will be used and, when sync
+// proceeds, launches the state sync process in a goroutine.
+func (c *client) acceptSyncSummary(proposedSummary message.Syncable) (block.StateSyncMode, error) {
+	// If dynamic sync is already running, treat new summaries as target updates.
+	if ds, ok := c.strategy.(*dynamicStrategy); ok && ds.CurrentState() == StateRunning {
+		if err := ds.UpdateSyncTarget(proposedSummary); err != nil {
+			return block.StateSyncSkipped, err
+		}
+		return block.StateSyncDynamic, nil
+	}
+
 	isResume := c.resumableSummary != nil &&
-		summary.GetBlockHash() == c.resumableSummary.GetBlockHash()
+		proposedSummary.GetBlockHash() == c.resumableSummary.GetBlockHash()
 	if !isResume {
 		// Skip syncing if the blockchain is not significantly ahead of local state,
 		// since bootstrapping would be faster.
-		// (Also ensures we don't sync to a height prior to local state.)
-		if c.config.LastAcceptedHeight+c.config.MinBlocks > summary.Height() {
+		if c.config.LastAcceptedHeight+c.config.MinBlocks > proposedSummary.Height() {
 			log.Info(
 				"last accepted too close to most recent syncable block, skipping state sync",
 				"lastAccepted", c.config.LastAcceptedHeight,
-				"syncableHeight", summary.Height(),
+				"syncableHeight", proposedSummary.Height(),
 			)
-			return errSkipSync
+			return block.StateSyncSkipped, nil
 		}
 
 		// Wipe the snapshot completely if we are not resuming from an existing sync, so that we do not
@@ -205,54 +247,70 @@ func (c *client) prepareForSync(summary message.Syncable) error {
 		snapshot.ResetSnapshotGeneration(c.config.ChainDB)
 	}
 
-	// Update the current state sync summary key in the database
-	// Note: this must be performed after WipeSnapshot finishes so that we do not start a state sync
-	// session from a partially wiped snapshot.
-	if err := c.config.MetadataDB.Put(stateSyncSummaryKey, summary.Bytes()); err != nil {
-		return fmt.Errorf("failed to write state sync summary key to disk: %w", err)
+	// Update the current state sync summary key in the database.
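+	// Note: this must be performed after WipeSnapshot finishes so that we do not
+	// start a state sync session from a partially wiped snapshot.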
+	if err := c.config.MetadataDB.Put(stateSyncSummaryKey, proposedSummary.Bytes()); err != nil {
+		return block.StateSyncSkipped, fmt.Errorf("failed to write state sync summary key to disk: %w", err)
 	}
 
 	if err := c.config.VerDB.Commit(); err != nil {
-		return fmt.Errorf("failed to commit db: %w", err)
+		return block.StateSyncSkipped, fmt.Errorf("failed to commit db: %w", err)
 	}
 
-	return nil
-}
-
-// startAsync launches the sync strategy in a background goroutine.
-func (c *client) startAsync(strategy SyncStrategy) block.StateSyncMode {
+	log.Info("Starting state sync", "summary", proposedSummary.GetBlockHash().Hex(), "height", proposedSummary.Height())
 	ctx, cancel := context.WithCancel(context.Background())
 	c.cancel = cancel
 
+	registry, err := c.newSyncerRegistry(proposedSummary)
+	if err != nil {
+		return block.StateSyncSkipped, err
+	}
+
+	finalizer := newFinalizer(
+		c.config.Chain,
+		c.config.State,
+		c.config.Acceptor,
+		c.config.VerDB,
+		c.config.MetadataDB,
+		c.config.Extender,
+		c.config.LastAcceptedHeight,
+	)
+
+	var (
+		strategy SyncStrategy
+		mode     block.StateSyncMode
+	)
+	if c.config.DynamicStateSyncEnabled {
+		strategy = newDynamicStrategy(registry, finalizer, c.config.PivotInterval)
+		mode = block.StateSyncDynamic
+	} else {
+		strategy = newStaticStrategy(registry, finalizer)
+		mode = block.StateSyncStatic
+	}
+
+	c.strategy = strategy
 	c.wg.Add(1)
 	go func() {
 		defer c.wg.Done()
-		defer cancel()
-
-		if err := strategy.Start(ctx, c.resumableSummary); err != nil {
-			c.err = err
-		}
-		// notify engine regardless of whether err == nil,
-		// this error will be propagated to the engine when it calls
-		// vm.SetState(snow.Bootstrapping)
-		log.Info("state sync completed, notifying engine", "err", c.err)
-		close(c.config.StateSyncDone)
+		err := strategy.Start(ctx, proposedSummary)
+		c.signalDone(err)
 	}()
 
-	log.Info("state sync started", "mode", block.StateSyncStatic)
-	return block.StateSyncStatic
+	log.Info("state sync started", "mode", mode.String(), "summary", proposedSummary.GetBlockHash().Hex(), "height", proposedSummary.Height())
+	return mode, nil
 }
 
-func (c *client) Shutdown() error {
-	if c.cancel != nil {
-		c.cancel()
-	}
-	c.wg.Wait() // wait for the background goroutine to exit
-	return nil
+// signalDone sets the terminal error exactly once and signals completion to the engine.
+func (c *client) signalDone(err error) {
+	c.stateSyncOnce.Do(func() {
+		c.err = err
+		if c.cancel != nil {
+			c.cancel()
+		}
+		close(c.config.StateSyncDone)
+	})
 }
 
-// Error returns a non-nil error if one occurred during the sync.
-func (c *client) Error() error { return c.err }
-
 // newSyncerRegistry creates a registry with all required syncers for the given summary.
 func (c *client) newSyncerRegistry(summary message.Syncable) (*SyncerRegistry, error) {
 	registry := NewSyncerRegistry()
diff --git a/graft/coreth/plugin/evm/vmsync/coordinator.go b/graft/coreth/plugin/evm/vmsync/coordinator.go
new file mode 100644
index 000000000000..7196c278db83
--- /dev/null
+++ b/graft/coreth/plugin/evm/vmsync/coordinator.go
@@ -0,0 +1,248 @@
+// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package vmsync
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	"github.com/ava-labs/libevm/libevm/options"
+
+	"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message"
+)
+
+// State represents the lifecycle phases of dynamic state sync orchestration.
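+// Transitions flow Idle -> Initializing -> Running -> Finalizing -> ExecutingBatch
+// -> Completed; Aborted is entered from any active state on failure or cancellation.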
+type State int
+
+const (
+	StateIdle State = iota
+	StateInitializing
+	StateRunning
+	StateFinalizing
+	StateExecutingBatch
+	StateCompleted
+	StateAborted
+)
+
+var (
+	errInvalidTargetType    = errors.New("invalid target type")
+	errInvalidState         = errors.New("invalid coordinator state")
+	errBatchCancelled       = errors.New("batch execution cancelled")
+	errBatchOperationFailed = errors.New("batch operation failed")
+)
+
+// Callbacks allows the coordinator to delegate VM-specific work back to the client.
+type Callbacks struct {
+	// FinalizeVM performs the same actions as finishSync/commitVMMarkers in the client.
+	// The context is used for cancellation checks during finalization.
+	FinalizeVM func(ctx context.Context, target message.Syncable) error
+	// OnDone is called when the coordinator finishes (successfully or with error).
+	OnDone func(err error)
+}
+
+// Coordinator orchestrates dynamic state sync across multiple syncers.
+type Coordinator struct {
+	// state is managed atomically to allow cheap concurrent checks/updates.
+	state atomic.Int32
+	// target stores the current target [message.Syncable] when [Coordinator.UpdateSyncTarget] is called.
+	target         atomic.Value
+	queue          *blockQueue
+	syncerRegistry *SyncerRegistry
+	callbacks      Callbacks
+	// doneOnce ensures [Callbacks.OnDone] is invoked at most once.
+	doneOnce sync.Once
+
+	// pivotInterval configures the pivot policy throttling; 0 falls back to defaultPivotInterval.
+	pivotInterval uint64
+	pivot         *pivotPolicy
+}
+
+// CoordinatorOption follows the functional options pattern for Coordinator.
+type CoordinatorOption = options.Option[Coordinator]
+
+// WithPivotInterval configures the interval-based pivot policy; 0 falls back to defaultPivotInterval.
+func WithPivotInterval(interval uint64) CoordinatorOption {
+	return options.Func[Coordinator](func(co *Coordinator) {
+		co.pivotInterval = interval
+	})
+}
+
+// NewCoordinator constructs a coordinator to orchestrate dynamic state sync across multiple syncers.
+func NewCoordinator(syncerRegistry *SyncerRegistry, cbs Callbacks, opts ...CoordinatorOption) *Coordinator {
+	co := &Coordinator{
+		queue:          newBlockQueue(),
+		syncerRegistry: syncerRegistry,
+		callbacks:      cbs,
+	}
+	options.ApplyTo(co, opts...)
+	co.state.Store(int32(StateIdle))
+
+	return co
+}
+
+// Start launches all syncers and returns immediately. Failures are monitored
+// in the background and transition the coordinator to [StateAborted].
+func (co *Coordinator) Start(ctx context.Context, initial message.Syncable) {
+	co.state.Store(int32(StateInitializing))
+	co.target.Store(initial)
+	co.pivot = newPivotPolicy(co.pivotInterval)
+
+	cctx, cancel := context.WithCancelCause(ctx)
+	g := co.syncerRegistry.StartAsync(cctx, initial)
+
+	co.state.Store(int32(StateRunning))
+
+	go func() {
+		if err := g.Wait(); err != nil {
+			co.finish(cancel, err)
+			return
+		}
+		// All syncers finished successfully: finalize syncers, then finalize VM and execute the queued batch.
+		if err := co.syncerRegistry.FinalizeAll(cctx); err != nil {
+			co.finish(cancel, err)
+			return
+		}
+		if err := co.ProcessQueuedBlockOperations(cctx); err != nil {
+			co.finish(cancel, err)
+			return
+		}
+		co.finish(cancel, nil)
+	}()
+}
+
+// ProcessQueuedBlockOperations finalizes the VM and processes queued block operations
+// in FIFO order. Called after syncers complete to finalize state and execute deferred operations.
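+// When invoked from [Coordinator.Start], any error returned here is routed to
+// finish and transitions the coordinator to [StateAborted].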
+func (co *Coordinator) ProcessQueuedBlockOperations(ctx context.Context) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
+	co.state.Store(int32(StateFinalizing))
+
+	if co.callbacks.FinalizeVM != nil {
+		if err := ctx.Err(); err != nil {
+			co.state.Store(int32(StateAborted))
+			return err
+		}
+
+		loaded := co.target.Load()
+		current, ok := loaded.(message.Syncable)
+		if !ok {
+			co.state.Store(int32(StateAborted))
+			return errInvalidTargetType
+		}
+		if err := co.callbacks.FinalizeVM(ctx, current); err != nil {
+			co.state.Store(int32(StateAborted))
+			return err
+		}
+	}
+
+	if err := ctx.Err(); err != nil {
+		co.state.Store(int32(StateAborted))
+		return err
+	}
+
+	co.state.Store(int32(StateExecutingBatch))
+
+	if err := co.executeBlockOperationBatch(ctx); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// UpdateSyncTarget broadcasts a new target to all syncers and removes stale blocks from the queue.
+// Only valid in the [StateRunning] state. Syncers manage cancellation themselves.
+func (co *Coordinator) UpdateSyncTarget(newTarget message.Syncable) error {
+	if co.CurrentState() != StateRunning {
+		return errInvalidState
+	}
+	if !co.pivot.shouldForward(newTarget.Height()) {
+		return nil
+	}
+
+	// Re-check state before modifying queue to handle concurrent transitions.
+	if co.CurrentState() != StateRunning {
+		return errInvalidState
+	}
+
+	// Remove blocks from queue that will never be executed (behind the new target).
+	co.queue.removeBelowHeight(newTarget.Height())
+
+	co.target.Store(newTarget)
+
+	if err := co.syncerRegistry.UpdateSyncTarget(newTarget); err != nil {
+		return err
+	}
+	co.pivot.advance()
+	return nil
+}
+
+// AddBlockOperation appends the block to the queue while in the [StateRunning] or
+// [StateExecutingBatch] state. Blocks enqueued during batch execution will be
+// processed in the next batch. Returns true if the block was queued, false if
+// the coordinator is in any other state or the block is nil.
+func (co *Coordinator) AddBlockOperation(b EthBlockWrapper, op BlockOperationType) bool {
+	if b == nil {
+		return false
+	}
+	state := co.CurrentState()
+	if state != StateRunning && state != StateExecutingBatch {
+		return false
+	}
+	return co.queue.enqueue(b, op)
+}
+
+func (co *Coordinator) CurrentState() State {
+	return State(co.state.Load())
+}
+
+// executeBlockOperationBatch executes queued block operations in FIFO order.
+// Partial completion is acceptable as operations are idempotent.
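+// Operations enqueued after the batch is snapshotted remain buffered for a later batch.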
+func (co *Coordinator) executeBlockOperationBatch(ctx context.Context) error { + operations := co.queue.dequeueBatch() + for i, op := range operations { + select { + case <-ctx.Done(): + return fmt.Errorf("operation %d/%d: %w", i+1, len(operations), errors.Join(errBatchCancelled, ctx.Err())) + default: + } + + var err error + switch op.operation { + case OpAccept: + err = op.block.Accept(ctx) + case OpReject: + err = op.block.Reject(ctx) + case OpVerify: + err = op.block.Verify(ctx) + } + if err != nil { + return fmt.Errorf("operation %d/%d (%v): %w", i+1, len(operations), op.operation, errors.Join(errBatchOperationFailed, err)) + } + } + return nil +} + +func (co *Coordinator) finish(cancel context.CancelCauseFunc, err error) { + if err != nil { + co.state.Store(int32(StateAborted)) + } else { + co.state.Store(int32(StateCompleted)) + } + if cancel != nil { + cancel(err) + } + if co.callbacks.OnDone != nil { + co.doneOnce.Do(func() { co.callbacks.OnDone(err) }) + } +} diff --git a/graft/coreth/plugin/evm/vmsync/coordinator_test.go b/graft/coreth/plugin/evm/vmsync/coordinator_test.go new file mode 100644 index 000000000000..678dbdb035fd --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/coordinator_test.go @@ -0,0 +1,137 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "context" + "errors" + "sync" + "testing" + + "github.com/ava-labs/libevm/common" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" +) + +func TestCoordinator_StateValidation(t *testing.T) { + co := NewCoordinator(NewSyncerRegistry(), Callbacks{}, WithPivotInterval(1)) + block := newMockBlock(100) + target := newTestSyncTarget(100) + + // States that reject both operations. + for _, state := range []State{StateIdle, StateInitializing, StateFinalizing, StateCompleted, StateAborted} { + co.state.Store(int32(state)) + require.False(t, co.AddBlockOperation(block, OpAccept), "state %d should reject block", state) + err := co.UpdateSyncTarget(target) + require.ErrorIs(t, err, errInvalidState, "state %d should reject target update", state) + } + + // Running: accepts both. + co.state.Store(int32(StateRunning)) + require.True(t, co.AddBlockOperation(block, OpAccept)) + require.NoError(t, co.UpdateSyncTarget(target)) + + // ExecutingBatch: accepts blocks, rejects target updates. + co.state.Store(int32(StateExecutingBatch)) + require.True(t, co.AddBlockOperation(block, OpAccept)) + err := co.UpdateSyncTarget(target) + require.ErrorIs(t, err, errInvalidState) + + // Nil block is always rejected. + co.state.Store(int32(StateRunning)) + require.False(t, co.AddBlockOperation(nil, OpAccept)) +} + +func TestCoordinator_UpdateSyncTarget_RemovesStaleBlocks(t *testing.T) { + co := NewCoordinator(NewSyncerRegistry(), Callbacks{}, WithPivotInterval(1)) + co.state.Store(int32(StateRunning)) + + for i := uint64(100); i <= 110; i++ { + co.AddBlockOperation(newMockBlock(i), OpAccept) + } + + require.NoError(t, co.UpdateSyncTarget(newTestSyncTarget(105))) + + batch := co.queue.dequeueBatch() + require.Len(t, batch, 5) // Only 106-110 remain. 
+} + +func TestCoordinator_Lifecycle(t *testing.T) { + t.Run("completes successfully", func(t *testing.T) { + registry := NewSyncerRegistry() + require.NoError(t, registry.Register(newMockSyncer("test", nil))) + + co, err := runCoordinator(t, registry, Callbacks{ + FinalizeVM: func(context.Context, message.Syncable) error { return nil }, + }) + + require.NoError(t, err) + require.Equal(t, StateCompleted, co.CurrentState()) + }) + + t.Run("aborts on syncer error", func(t *testing.T) { + expectedErr := errors.New("syncer failed") + registry := NewSyncerRegistry() + require.NoError(t, registry.Register(newMockSyncer("failing", expectedErr))) + + co, err := runCoordinator(t, registry, Callbacks{}) + + require.ErrorIs(t, err, expectedErr) + require.Equal(t, StateAborted, co.CurrentState()) + }) +} + +func TestCoordinator_ProcessQueuedBlockOperations(t *testing.T) { + t.Run("executes queued operations", func(t *testing.T) { + co := NewCoordinator(NewSyncerRegistry(), Callbacks{}) + co.state.Store(int32(StateRunning)) + co.target.Store(newTestSyncTarget(100)) + co.AddBlockOperation(newMockBlock(100), OpAccept) + + require.NoError(t, co.ProcessQueuedBlockOperations(t.Context())) + require.Equal(t, StateExecutingBatch, co.CurrentState()) + }) + + t.Run("returns error on block operation failure", func(t *testing.T) { + co := NewCoordinator(NewSyncerRegistry(), Callbacks{}) + co.state.Store(int32(StateRunning)) + co.target.Store(newTestSyncTarget(100)) + + failBlock := newMockBlock(100) + failBlock.acceptErr = errors.New("accept failed") + co.AddBlockOperation(failBlock, OpAccept) + + err := co.ProcessQueuedBlockOperations(t.Context()) + require.ErrorIs(t, err, errBatchOperationFailed) + }) +} + +// runCoordinator starts a coordinator and waits for completion. +func runCoordinator(t *testing.T, registry *SyncerRegistry, cbs Callbacks) (*Coordinator, error) { + t.Helper() + + var ( + errDone error + wg sync.WaitGroup + ) + wg.Add(1) + + cbs.OnDone = func(err error) { + errDone = err + wg.Done() + } + + co := NewCoordinator(registry, cbs) + co.Start(t.Context(), newTestSyncTarget(100)) + wg.Wait() + + return co, errDone +} + +func newTestSyncTarget(height uint64) message.Syncable { + hash := common.BytesToHash([]byte{byte(height)}) + root := common.BytesToHash([]byte{byte(height + 1)}) + return newSyncTarget(hash, root, height) +} diff --git a/graft/coreth/plugin/evm/vmsync/doubles_test.go b/graft/coreth/plugin/evm/vmsync/doubles_test.go index 7c79095df6c7..9ed1ecc1b65e 100644 --- a/graft/coreth/plugin/evm/vmsync/doubles_test.go +++ b/graft/coreth/plugin/evm/vmsync/doubles_test.go @@ -6,12 +6,39 @@ package vmsync import ( "context" "errors" + "math/big" "sync" "time" + "github.com/ava-labs/libevm/core/types" + + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" + syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" ) +// mockEthBlockWrapper implements EthBlockWrapper for testing. 
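+// Each lifecycle method returns its preset error, letting tests simulate failures.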
+type mockEthBlockWrapper struct { + ethBlock *types.Block + acceptErr error + rejectErr error + verifyErr error +} + +func newMockBlock(height uint64) *mockEthBlockWrapper { + header := &types.Header{Number: new(big.Int).SetUint64(height)} + return &mockEthBlockWrapper{ + ethBlock: types.NewBlockWithHeader(header), + } +} + +func (m *mockEthBlockWrapper) GetEthBlock() *types.Block { return m.ethBlock } +func (m *mockEthBlockWrapper) Accept(context.Context) error { return m.acceptErr } +func (m *mockEthBlockWrapper) Reject(context.Context) error { return m.rejectErr } +func (m *mockEthBlockWrapper) Verify(context.Context) error { return m.verifyErr } + +var _ EthBlockWrapper = (*mockEthBlockWrapper)(nil) + // FuncSyncer adapts a function to the simple Syncer shape used in tests. It is // useful for defining small, behavior-driven syncers inline. type FuncSyncer struct { @@ -22,8 +49,10 @@ type FuncSyncer struct { func (f FuncSyncer) Sync(ctx context.Context) error { return f.fn(ctx) } // Name returns the provided name or a default if unspecified. -func (FuncSyncer) Name() string { return "Test Name" } -func (FuncSyncer) ID() string { return "test_id" } +func (FuncSyncer) Name() string { return "Test Name" } +func (FuncSyncer) ID() string { return "test_id" } +func (FuncSyncer) UpdateTarget(_ message.Syncable) error { return nil } +func (FuncSyncer) Finalize(_ context.Context) error { return nil } var _ syncpkg.Syncer = FuncSyncer{} diff --git a/graft/coreth/plugin/evm/vmsync/finalizer.go b/graft/coreth/plugin/evm/vmsync/finalizer.go index b803d4e6fb80..709606d7f77b 100644 --- a/graft/coreth/plugin/evm/vmsync/finalizer.go +++ b/graft/coreth/plugin/evm/vmsync/finalizer.go @@ -8,8 +8,6 @@ import ( "errors" "fmt" - "github.com/ava-labs/libevm/core/types" - "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/versiondb" "github.com/ava-labs/avalanchego/graft/coreth/eth" @@ -21,13 +19,6 @@ import ( syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" ) -// EthBlockWrapper can be implemented by a concrete block wrapper type to -// return *types.Block, which is needed to update chain pointers at the -// end of the sync operation. -type EthBlockWrapper interface { - GetEthBlock() *types.Block -} - var ( errBlockNotFound = errors.New("block not found in state") errInvalidBlockType = errors.New("invalid block wrapper type") diff --git a/graft/coreth/plugin/evm/vmsync/pivot_policy.go b/graft/coreth/plugin/evm/vmsync/pivot_policy.go new file mode 100644 index 000000000000..ac808368b92c --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/pivot_policy.go @@ -0,0 +1,60 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import "sync/atomic" + +// defaultPivotInterval is the default number of blocks between sync target updates. +const defaultPivotInterval = uint64(10000) + +// pivotPolicy encapsulates the logic for deciding when to forward +// a new sync target based on a fixed block-height interval. It is +// safe for concurrent use. +type pivotPolicy struct { + interval uint64 + // nextHeight is the next height threshold at or beyond which we + // should forward an update. A value of 0 means uninitialized. + nextHeight atomic.Uint64 +} + +// newPivotPolicy creates a new pivot policy with the given interval. +// If interval is 0, defaultPivotInterval is used. 
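+// For example, with interval 100, a first summary at height 150 sets the initial
+// threshold to 200, so heights below 200 are not forwarded.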
+func newPivotPolicy(interval uint64) *pivotPolicy { + if interval == 0 { + interval = defaultPivotInterval + } + return &pivotPolicy{interval: interval} +} + +// shouldForward reports whether a summary at the given height should be +// forwarded, initializing the next threshold on first use. When it returns +// true, callers should follow up with advance(). +func (p *pivotPolicy) shouldForward(height uint64) bool { + if p == nil || p.interval == 0 { + return true + } + next := p.nextHeight.Load() + if next == 0 { + // Round up the initial height to the next multiple of interval. + // Ceil division: ((h + interval - 1) / interval) * interval + h := height + init := ((h + p.interval - 1) / p.interval) * p.interval + // Initialize once - if another goroutine wins, read the established value. + if !p.nextHeight.CompareAndSwap(0, init) { + next = p.nextHeight.Load() + } else { + next = init + } + } + return height >= next +} + +// advance moves the next threshold forward by one interval. Call this +// only after shouldForward has returned true and the update was issued. +func (p *pivotPolicy) advance() { + if p == nil || p.interval == 0 { + return + } + p.nextHeight.Add(p.interval) +} diff --git a/graft/coreth/plugin/evm/vmsync/pivot_policy_test.go b/graft/coreth/plugin/evm/vmsync/pivot_policy_test.go new file mode 100644 index 000000000000..5237a102c0d1 --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/pivot_policy_test.go @@ -0,0 +1,27 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPivotPolicy(t *testing.T) { + // Zero interval uses default. + require.Equal(t, defaultPivotInterval, newPivotPolicy(0).interval) + + // Test throttling behavior. + p := newPivotPolicy(100) + + // First call at 150 initializes threshold to ceil(150/100)*100 = 200 + require.False(t, p.shouldForward(150)) // 150 < 200 + require.False(t, p.shouldForward(199)) // 199 < 200 + require.True(t, p.shouldForward(200)) // 200 >= 200 + p.advance() // threshold becomes 300 + + require.False(t, p.shouldForward(250)) // 250 < 300 + require.True(t, p.shouldForward(300)) // 300 >= 300 +} diff --git a/graft/coreth/plugin/evm/vmsync/registry.go b/graft/coreth/plugin/evm/vmsync/registry.go index 5e9bc6031c3b..2bc8eac7383d 100644 --- a/graft/coreth/plugin/evm/vmsync/registry.go +++ b/graft/coreth/plugin/evm/vmsync/registry.go @@ -102,3 +102,30 @@ func (r *SyncerRegistry) StartAsync(ctx context.Context, summary message.Syncabl return g } + +// UpdateSyncTarget updates the sync target for all syncers. +// Note: Syncers manage cancellation themselves through their Sync() contexts. +func (r *SyncerRegistry) UpdateSyncTarget(newTarget message.Syncable) error { + for _, task := range r.syncers { + if err := task.syncer.UpdateTarget(newTarget); err != nil { + log.Error("failed updating sync target", "name", task.name, "err", err) + return err + } + log.Info("updated sync target", "name", task.name, "new_target", newTarget.GetBlockHash().Hex(), "height", newTarget.Height()) + } + return nil +} + +// FinalizeAll calls Finalize on all registered syncers. +// This should be called after all syncers have completed their Sync() operations +// and before finalizing the VM state. 
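+// Finalization stops at the first error so that the VM is never finalized on
+// top of a partially finalized syncer set.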
+func (r *SyncerRegistry) FinalizeAll(ctx context.Context) error { + for _, task := range r.syncers { + if err := task.syncer.Finalize(ctx); err != nil { + log.Error("failed finalizing syncer", "name", task.name, "err", err) + return fmt.Errorf("%s finalize failed: %w", task.name, err) + } + log.Info("finalized syncer", "name", task.name) + } + return nil +} diff --git a/graft/coreth/plugin/evm/vmsync/registry_test.go b/graft/coreth/plugin/evm/vmsync/registry_test.go index 55f17d15d01f..b7665bbcc415 100644 --- a/graft/coreth/plugin/evm/vmsync/registry_test.go +++ b/graft/coreth/plugin/evm/vmsync/registry_test.go @@ -38,8 +38,10 @@ func (m *mockSyncer) Sync(context.Context) error { return m.syncError } -func (m *mockSyncer) Name() string { return m.name } -func (m *mockSyncer) ID() string { return m.name } +func (m *mockSyncer) Name() string { return m.name } +func (m *mockSyncer) ID() string { return m.name } +func (*mockSyncer) UpdateTarget(_ message.Syncable) error { return nil } +func (*mockSyncer) Finalize(_ context.Context) error { return nil } // namedSyncer adapts an existing syncer with a provided name to satisfy Syncer with Name(). type namedSyncer struct { @@ -50,6 +52,13 @@ type namedSyncer struct { func (n *namedSyncer) Sync(ctx context.Context) error { return n.syncer.Sync(ctx) } func (n *namedSyncer) Name() string { return n.name } func (n *namedSyncer) ID() string { return n.name } +func (n *namedSyncer) UpdateTarget(newTarget message.Syncable) error { + return n.syncer.UpdateTarget(newTarget) +} + +func (n *namedSyncer) Finalize(ctx context.Context) error { + return n.syncer.Finalize(ctx) +} // syncerConfig describes a test syncer setup for RunSyncerTasks table tests. type syncerConfig struct { diff --git a/graft/coreth/plugin/evm/vmsync/strategy_dynamic.go b/graft/coreth/plugin/evm/vmsync/strategy_dynamic.go new file mode 100644 index 000000000000..6a2529d18fe9 --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/strategy_dynamic.go @@ -0,0 +1,106 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "context" + "fmt" + + "github.com/ava-labs/libevm/log" + + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" +) + +var _ SyncStrategy = (*dynamicStrategy)(nil) + +// dynamicStrategy runs syncers concurrently with block queueing. +// It wraps [Coordinator] to manage the sync lifecycle. +type dynamicStrategy struct { + coordinator *Coordinator +} + +func newDynamicStrategy(registry *SyncerRegistry, finalizer *finalizer, pivotInterval uint64) *dynamicStrategy { + coordinator := NewCoordinator( + registry, + Callbacks{ + FinalizeVM: finalizer.finalize, + OnDone: nil, // Set in Start to capture completion. + }, + WithPivotInterval(pivotInterval), + ) + return &dynamicStrategy{coordinator: coordinator} +} + +// Start launches the coordinator and blocks until sync completes or fails. +func (d *dynamicStrategy) Start(ctx context.Context, summary message.Syncable) error { + done := make(chan error, 1) + + // Wire up OnDone to signal completion. + d.coordinator.callbacks.OnDone = func(err error) { + if err != nil { + log.Error("dynamic state sync completed with error", "err", err) + } else { + log.Info("dynamic state sync completed successfully") + } + done <- err + } + + d.coordinator.Start(ctx, summary) + return <-done +} + +// OnBlockAccepted enqueues the block for deferred processing and updates the sync target. 
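+// Target updates are throttled by the coordinator's pivot policy, so most
+// accepted blocks are only enqueued.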
+func (d *dynamicStrategy) OnBlockAccepted(b EthBlockWrapper) (bool, error) {
+	if d.coordinator.CurrentState() == StateExecutingBatch {
+		// Still enqueue for the next batch, but don't update target.
+		return d.enqueue(b, OpAccept), nil
+	}
+
+	if !d.enqueue(b, OpAccept) {
+		return false, nil
+	}
+
+	ethb := b.GetEthBlock()
+	target := newSyncTarget(ethb.Hash(), ethb.Root(), ethb.NumberU64())
+	if err := d.coordinator.UpdateSyncTarget(target); err != nil {
+		// Block is enqueued but target update failed.
+		return true, fmt.Errorf("block enqueued but sync target update failed: %w", err)
+	}
+	return true, nil
+}
+
+// OnBlockRejected enqueues the block for deferred rejection.
+func (d *dynamicStrategy) OnBlockRejected(b EthBlockWrapper) (bool, error) {
+	return d.enqueue(b, OpReject), nil
+}
+
+// OnBlockVerified enqueues the block for deferred verification.
+func (d *dynamicStrategy) OnBlockVerified(b EthBlockWrapper) (bool, error) {
+	return d.enqueue(b, OpVerify), nil
+}
+
+// enqueue adds a block operation to the coordinator's queue.
+func (d *dynamicStrategy) enqueue(b EthBlockWrapper, op BlockOperationType) bool {
+	ok := d.coordinator.AddBlockOperation(b, op)
+	if !ok && b != nil { // nil-guard: only warn when there is a block to describe
+		if ethb := b.GetEthBlock(); ethb != nil {
+			log.Warn("could not enqueue block operation",
+				"hash", ethb.Hash(),
+				"height", ethb.NumberU64(),
+				"op", op.String(),
+			)
+		}
+	}
+	return ok
+}
+
+// CurrentState returns the coordinator's current state.
+func (d *dynamicStrategy) CurrentState() State {
+	return d.coordinator.CurrentState()
+}
+
+// UpdateSyncTarget updates the coordinator's sync target.
+func (d *dynamicStrategy) UpdateSyncTarget(target message.Syncable) error {
+	return d.coordinator.UpdateSyncTarget(target)
+}
diff --git a/graft/coreth/plugin/evm/vmsync/strategy_static.go b/graft/coreth/plugin/evm/vmsync/strategy_static.go
index 88afed6309ad..8f98fc61bc4a 100644
--- a/graft/coreth/plugin/evm/vmsync/strategy_static.go
+++ b/graft/coreth/plugin/evm/vmsync/strategy_static.go
@@ -34,3 +34,18 @@ func (s *staticStrategy) Start(ctx context.Context, summary message.Syncable) er
 	}
 	return s.finalizer.finalize(ctx, summary)
 }
+
+// OnBlockAccepted is a no-op for static sync since blocks are not queued.
+func (*staticStrategy) OnBlockAccepted(EthBlockWrapper) (bool, error) {
+	return false, nil
+}
+
+// OnBlockRejected is a no-op for static sync since blocks are not queued.
+func (*staticStrategy) OnBlockRejected(EthBlockWrapper) (bool, error) {
+	return false, nil
+}
+
+// OnBlockVerified is a no-op for static sync since blocks are not queued.
+func (*staticStrategy) OnBlockVerified(EthBlockWrapper) (bool, error) {
+	return false, nil
+}
diff --git a/graft/coreth/plugin/evm/vmsync/sync_target.go b/graft/coreth/plugin/evm/vmsync/sync_target.go
new file mode 100644
index 000000000000..539e3ed6e9ed
--- /dev/null
+++ b/graft/coreth/plugin/evm/vmsync/sync_target.go
@@ -0,0 +1,44 @@
+// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package vmsync
+
+import (
+	"context"
+
+	"github.com/ava-labs/libevm/common"
+
+	"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message"
+	"github.com/ava-labs/avalanchego/ids"
+	"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
+)
+
+var _ message.Syncable = (*syncTarget)(nil)
+
+// syncTarget is a minimal implementation of [message.Syncable] used internally
+// to advance the coordinator's sync target from engine-accepted blocks.
+// +// NOTE: Unlike [message.BlockSyncSummary], this is not serializable and should not +// be used for network communication. Only [message.Syncable.GetBlockHash], +// [message.Syncable.GetBlockRoot], and [message.Syncable.Height] are used in practice. +// The other methods are stubs to satisfy the interface. +type syncTarget struct { + hash common.Hash + root common.Hash + height uint64 +} + +func newSyncTarget(hash common.Hash, root common.Hash, height uint64) message.Syncable { + return &syncTarget{hash: hash, root: root, height: height} +} + +func (s *syncTarget) GetBlockHash() common.Hash { return s.hash } +func (s *syncTarget) GetBlockRoot() common.Hash { return s.root } + +func (s *syncTarget) ID() ids.ID { return ids.ID(s.hash) } +func (s *syncTarget) Height() uint64 { return s.height } +func (s *syncTarget) Bytes() []byte { return s.hash.Bytes() } +func (*syncTarget) Accept(context.Context) (block.StateSyncMode, error) { + // When used internally to advance targets, we always handle dynamically. + return block.StateSyncDynamic, nil +} diff --git a/graft/coreth/plugin/evm/wrapped_block.go b/graft/coreth/plugin/evm/wrapped_block.go index 2db44fa8b263..a227a0ff99d6 100644 --- a/graft/coreth/plugin/evm/wrapped_block.go +++ b/graft/coreth/plugin/evm/wrapped_block.go @@ -55,6 +55,7 @@ var ( errParentBeaconRootNonEmpty = errors.New("invalid non-empty parentBeaconRoot") errBlobGasUsedNilInCancun = errors.New("blob gas used must not be nil in Cancun") errBlobsNotEnabled = errors.New("blobs not enabled on avalanche networks") + errCouldNotNotifySyncClient = errors.New("could not notify sync client") ) var ( @@ -91,11 +92,21 @@ func wrapBlock(ethBlock *types.Block, vm *VM) (*wrappedBlock, error) { func (b *wrappedBlock) ID() ids.ID { return b.id } // Accept implements the snowman.Block interface -func (b *wrappedBlock) Accept(context.Context) error { - vm := b.vm - // Although returning an error from Accept is considered fatal, it is good - // practice to cleanup the batch we were modifying in the case of an error. - defer vm.versiondb.Abort() +// TODO(powerslider): Propagate context to the sync client. +func (b *wrappedBlock) Accept(_ context.Context) error { + // Notify sync client that engine accepted a block. + // If the block was enqueued for deferred processing, skip immediate execution. + if client := b.vm.SyncerClient(); client != nil { + deferred, err := client.OnEngineAccept(b) + if err != nil { + return fmt.Errorf("%w: %w", errCouldNotNotifySyncClient, err) + } + if deferred { + // Block was enqueued for deferred processing during dynamic state sync. + // It will be processed later from the queue, so skip immediate execution. + return nil + } + } blkID := b.ID() log.Debug("accepting block", @@ -111,17 +122,17 @@ func (b *wrappedBlock) Accept(context.Context) error { if err := b.handlePrecompileAccept(rules); err != nil { return err } - if err := vm.blockChain.Accept(b.ethBlock); err != nil { + if err := b.vm.blockChain.Accept(b.ethBlock); err != nil { return fmt.Errorf("chain could not accept %s: %w", blkID, err) } - if err := vm.PutLastAcceptedID(blkID); err != nil { + if err := b.vm.PutLastAcceptedID(blkID); err != nil { return fmt.Errorf("failed to put %s as the last accepted block: %w", blkID, err) } // Get pending operations on the vm's versionDB so we can apply them atomically // with the block extension's changes. 
- vdbBatch, err := vm.versiondb.CommitBatch() + vdbBatch, err := b.vm.versiondb.CommitBatch() if err != nil { return fmt.Errorf("could not create commit batch processing block[%s]: %w", blkID, err) } @@ -130,11 +141,17 @@ func (b *wrappedBlock) Accept(context.Context) error { // Apply any changes atomically with other pending changes to // the vm's versionDB. // Accept flushes the changes in the batch to the database. - return b.extension.Accept(vdbBatch) + if err := b.extension.Accept(vdbBatch); err != nil { + return err + } + } else { + // If there is no extension, we still need to apply the changes to the versionDB + if err := vdbBatch.Write(); err != nil { + return err + } } - // If there is no extension, we still need to apply the changes to the versionDB - return vdbBatch.Write() + return nil } // handlePrecompileAccept calls Accept on any logs generated with an active precompile address that implements @@ -173,7 +190,21 @@ func (b *wrappedBlock) handlePrecompileAccept(rules extras.Rules) error { // Reject implements the snowman.Block interface // If [b] contains an atomic transaction, attempt to re-issue it -func (b *wrappedBlock) Reject(context.Context) error { +func (b *wrappedBlock) Reject(_ context.Context) error { + // Notify sync client that engine rejected a block. + // If the block was enqueued for deferred processing, skip immediate execution. + if client := b.vm.SyncerClient(); client != nil { + deferred, err := client.OnEngineReject(b) + if err != nil { + return fmt.Errorf("%w: %w", errCouldNotNotifySyncClient, err) + } + if deferred { + // Block was enqueued for deferred processing during dynamic state sync. + // It will be processed later from the queue, so skip immediate execution. + return nil + } + } + blkID := b.ID() log.Debug("rejecting block", "hash", blkID.Hex(), @@ -205,7 +236,24 @@ func (b *wrappedBlock) Timestamp() time.Time { } // Verify implements the snowman.Block interface -func (b *wrappedBlock) Verify(context.Context) error { +// TODO(powerslider): Propagate context to the sync client. +func (b *wrappedBlock) Verify(_ context.Context) error { + // Notify sync client that engine verified a block. + // If the block was enqueued for deferred processing, skip immediate execution. + if client := b.vm.SyncerClient(); client != nil { + deferred, err := client.OnEngineVerify(b) + if err != nil { + return fmt.Errorf("%w: %w", errCouldNotNotifySyncClient, err) + } + if deferred { + // Block was enqueued for deferred processing during dynamic state sync. + // It will be processed later from the queue, so skip immediate execution. + // Note: Verify may be called multiple times with different contexts, but + // we only enqueue once and process once from the queue. + return nil + } + } + return b.verify(&precompileconfig.PredicateContext{ SnowCtx: b.vm.ctx, ProposerVMBlockCtx: nil, @@ -236,8 +284,25 @@ func (b *wrappedBlock) ShouldVerifyWithContext(context.Context) (bool, error) { return false, nil } -// VerifyWithContext implements the block.WithVerifyContext interface +// VerifyWithContext implements the block.WithVerifyContext interface. +// TODO(powerslider): Propagate context to the sync client. func (b *wrappedBlock) VerifyWithContext(_ context.Context, proposerVMBlockCtx *block.Context) error { + // Notify sync client that engine verified a block. + // If the block was enqueued for deferred processing, skip immediate execution. 
+ if client := b.vm.SyncerClient(); client != nil { + deferred, err := client.OnEngineVerify(b) + if err != nil { + return fmt.Errorf("%w: %w", errCouldNotNotifySyncClient, err) + } + if deferred { + // Block was enqueued for deferred processing during dynamic state sync. + // It will be processed later from the queue, so skip immediate execution. + // Note: VerifyWithContext may be called multiple times with different contexts, but + // we only enqueue once and process once from the queue. + return nil + } + } + return b.verify(&precompileconfig.PredicateContext{ SnowCtx: b.vm.ctx, ProposerVMBlockCtx: proposerVMBlockCtx, diff --git a/graft/coreth/sync/blocksync/syncer.go b/graft/coreth/sync/blocksync/syncer.go index 2f7340741234..10952fa7bb81 100644 --- a/graft/coreth/sync/blocksync/syncer.go +++ b/graft/coreth/sync/blocksync/syncer.go @@ -13,6 +13,8 @@ import ( "github.com/ava-labs/libevm/ethdb" "github.com/ava-labs/libevm/log" + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" + syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" statesyncclient "github.com/ava-labs/avalanchego/graft/coreth/sync/client" ) @@ -112,3 +114,11 @@ func (s *BlockSyncer) Sync(ctx context.Context) error { log.Info("fetched blocks from peer", "total", blocksToFetch) return batch.Write() } + +func (*BlockSyncer) UpdateTarget(_ message.Syncable) error { + return nil +} + +func (*BlockSyncer) Finalize(_ context.Context) error { + return nil +} diff --git a/graft/coreth/sync/statesync/code_syncer.go b/graft/coreth/sync/statesync/code_syncer.go index 1ac8c47b5d71..839188293090 100644 --- a/graft/coreth/sync/statesync/code_syncer.go +++ b/graft/coreth/sync/statesync/code_syncer.go @@ -115,6 +115,14 @@ func (c *CodeSyncer) Sync(ctx context.Context) error { return eg.Wait() } +func (*CodeSyncer) UpdateTarget(_ message.Syncable) error { + return nil +} + +func (*CodeSyncer) Finalize(_ context.Context) error { + return nil +} + // work fulfills any incoming requests from the producer channel by fetching code bytes from the network // and fulfilling them by updating the database. func (c *CodeSyncer) work(ctx context.Context) error { diff --git a/graft/coreth/sync/statesync/state_syncer.go b/graft/coreth/sync/statesync/state_syncer.go index a286431dd1aa..f955cbed8e88 100644 --- a/graft/coreth/sync/statesync/state_syncer.go +++ b/graft/coreth/sync/statesync/state_syncer.go @@ -17,6 +17,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/ava-labs/avalanchego/graft/coreth/core/state/snapshot" + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" syncclient "github.com/ava-labs/avalanchego/graft/coreth/sync/client" @@ -167,6 +168,14 @@ func (t *stateSync) Sync(ctx context.Context) error { return eg.Wait() } +func (*stateSync) UpdateTarget(_ message.Syncable) error { + return nil +} + +func (*stateSync) Finalize(_ context.Context) error { + return nil +} + // onStorageTrieFinished is called after a storage trie finishes syncing. func (t *stateSync) onStorageTrieFinished(root common.Hash) error { <-t.triesInProgressSem // allow another trie to start (release the semaphore) diff --git a/graft/coreth/sync/types.go b/graft/coreth/sync/types.go index 04b8b75bdf77..06194a50e15a 100644 --- a/graft/coreth/sync/types.go +++ b/graft/coreth/sync/types.go @@ -29,6 +29,12 @@ type Syncer interface { // "state_evm_state_sync", "state_atomic_sync"). 
Implementations should ensure this is unique and
	// stable across renames for logging/metrics/deduplication.
	ID() string
+
+	// UpdateTarget updates the syncer's target while running to support dynamic state sync.
+	UpdateTarget(newTarget message.Syncable) error
+
+	// Finalize is called after Sync has completed for every registered syncer, before the VM itself is finalized.
+	Finalize(ctx context.Context) error
 }
 
 // SummaryProvider is an interface for providing state summaries.
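
Note (reviewer sketch, not part of this diff): a syncer that does not track a moving sync target can satisfy the extended Syncer interface with no-op UpdateTarget/Finalize methods, mirroring the implementations added to BlockSyncer, CodeSyncer, and stateSync above. The package and type names below (vmsyncexample, noopTargetSyncer, "example_syncer") are illustrative only:

    package vmsyncexample

    import (
    	"context"

    	"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message"
    )

    // noopTargetSyncer sketches the minimal extended-Syncer surface: Sync does
    // the work, while UpdateTarget and Finalize are no-ops for syncers that do
    // not react to dynamic target changes.
    type noopTargetSyncer struct{}

    func (noopTargetSyncer) Name() string { return "Example Syncer" }
    func (noopTargetSyncer) ID() string   { return "example_syncer" }

    func (noopTargetSyncer) Sync(ctx context.Context) error {
    	// Fetch and persist data until done or the context is cancelled.
    	return ctx.Err()
    }

    // UpdateTarget is invoked via SyncerRegistry.UpdateSyncTarget when the
    // coordinator pivots; a dynamic syncer would swap its target root here.
    func (noopTargetSyncer) UpdateTarget(_ message.Syncable) error { return nil }

    // Finalize is invoked via SyncerRegistry.FinalizeAll after every syncer's
    // Sync has returned, before the VM is finalized.
    func (noopTargetSyncer) Finalize(_ context.Context) error { return nil }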