From e03071e2471e64d4145c2548e918acbfcae2ac03 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Wed, 3 Dec 2025 21:57:52 +0200 Subject: [PATCH 1/3] refactor(vmsync): extract SyncStrategy pattern for static state sync Restructure the vmsync package to separate concerns and prepare for future dynamic state sync support. - Extract `finalizer.go` - VM state finalization logic (`finishSync`, `commitMarkers`) moved to dedicated `finalizer` struct. - Add `SyncStrategy` interface with `Start` method for sync execution. - Implement `staticStrategy` for current sequential sync behavior. - Simplify `client.go` to orchestrate sync lifecycle via strategy. This refactoring enables adding dynamic state sync as a separate strategy implementation without modifying existing static sync logic. resolves #4651 Signed-off-by: Tsvetan Dimitrov (tsvetan.dimitrov@avalabs.org) --- graft/coreth/plugin/evm/vmsync/client.go | 340 ++++++------------ graft/coreth/plugin/evm/vmsync/finalizer.go | 152 ++++++++ .../plugin/evm/vmsync/strategy_static.go | 38 ++ 3 files changed, 309 insertions(+), 221 deletions(-) create mode 100644 graft/coreth/plugin/evm/vmsync/finalizer.go create mode 100644 graft/coreth/plugin/evm/vmsync/strategy_static.go diff --git a/graft/coreth/plugin/evm/vmsync/client.go b/graft/coreth/plugin/evm/vmsync/client.go index 435e5b836560..efd7c4a72076 100644 --- a/graft/coreth/plugin/evm/vmsync/client.go +++ b/graft/coreth/plugin/evm/vmsync/client.go @@ -5,11 +5,10 @@ package vmsync import ( "context" + "errors" "fmt" "sync" - "github.com/ava-labs/libevm/common" - "github.com/ava-labs/libevm/core/types" "github.com/ava-labs/libevm/ethdb" "github.com/ava-labs/libevm/log" @@ -17,23 +16,25 @@ import ( "github.com/ava-labs/avalanchego/database/versiondb" "github.com/ava-labs/avalanchego/graft/coreth/core/state/snapshot" "github.com/ava-labs/avalanchego/graft/coreth/eth" - "github.com/ava-labs/avalanchego/graft/coreth/params" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" - "github.com/ava-labs/avalanchego/graft/coreth/sync/blocksync" - "github.com/ava-labs/avalanchego/graft/coreth/sync/statesync" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/vms/components/chain" syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" + "github.com/ava-labs/avalanchego/graft/coreth/sync/blocksync" syncclient "github.com/ava-labs/avalanchego/graft/coreth/sync/client" + "github.com/ava-labs/avalanchego/graft/coreth/sync/statesync" ) // BlocksToFetch is the number of the block parents the state syncs to. // The last 256 block hashes are necessary to support the BLOCKHASH opcode. const BlocksToFetch = 256 -var stateSyncSummaryKey = []byte("stateSyncSummary") +var ( + errSkipSync = fmt.Errorf("skip sync") + stateSyncSummaryKey = []byte("stateSyncSummary") +) // BlockAcceptor provides a mechanism to update the last accepted block ID during state synchronization. // This interface is used by the state sync process to ensure the blockchain state @@ -42,11 +43,11 @@ type BlockAcceptor interface { PutLastAcceptedID(ids.ID) error } -// EthBlockWrapper can be implemented by a concrete block wrapper type to -// return *types.Block, which is needed to update chain pointers at the -// end of the sync operation. -type EthBlockWrapper interface { - GetEthBlock() *types.Block +// SyncStrategy defines how state sync is executed. +// Implementations handle the sync lifecycle differently based on sync mode. +type SyncStrategy interface { + // Start begins the sync process and blocks until completion or error. + Start(ctx context.Context) error } type ClientConfig struct { @@ -76,21 +77,16 @@ type ClientConfig struct { } type client struct { - *ClientConfig - + config *ClientConfig resumableSummary message.Syncable - - cancel context.CancelFunc - wg sync.WaitGroup - - // State Sync results - summary message.Syncable - err error + cancel context.CancelFunc + wg sync.WaitGroup + err error } func NewClient(config *ClientConfig) Client { return &client{ - ClientConfig: config, + config: config, } } @@ -107,37 +103,37 @@ type Client interface { } // StateSyncEnabled returns [client.enabled], which is set in the chain's config file. -func (client *client) StateSyncEnabled(context.Context) (bool, error) { - return client.Enabled, nil +func (c *client) StateSyncEnabled(context.Context) (bool, error) { + return c.config.Enabled, nil } // GetOngoingSyncStateSummary returns a state summary that was previously started // and not finished, and sets [resumableSummary] if one was found. // Returns [database.ErrNotFound] if no ongoing summary is found or if [client.skipResume] is true. -func (client *client) GetOngoingSyncStateSummary(context.Context) (block.StateSummary, error) { - if client.SkipResume { +func (c *client) GetOngoingSyncStateSummary(context.Context) (block.StateSummary, error) { + if c.config.SkipResume { return nil, database.ErrNotFound } - summaryBytes, err := client.MetadataDB.Get(stateSyncSummaryKey) + summaryBytes, err := c.config.MetadataDB.Get(stateSyncSummaryKey) if err != nil { return nil, err // includes the [database.ErrNotFound] case } - summary, err := client.Parser.Parse(summaryBytes, client.acceptSyncSummary) + summary, err := c.config.Parser.Parse(summaryBytes, c.acceptSyncSummary) if err != nil { return nil, fmt.Errorf("failed to parse saved state sync summary to SyncSummary: %w", err) } - client.resumableSummary = summary + c.resumableSummary = summary return summary, nil } // ClearOngoingSummary clears any marker of an ongoing state sync summary -func (client *client) ClearOngoingSummary() error { - if err := client.MetadataDB.Delete(stateSyncSummaryKey); err != nil { +func (c *client) ClearOngoingSummary() error { + if err := c.config.MetadataDB.Delete(stateSyncSummaryKey); err != nil { return fmt.Errorf("failed to clear ongoing summary: %w", err) } - if err := client.VerDB.Commit(); err != nil { + if err := c.config.VerDB.Commit(); err != nil { return fmt.Errorf("failed to commit db while clearing ongoing summary: %w", err) } @@ -145,121 +141,55 @@ func (client *client) ClearOngoingSummary() error { } // ParseStateSummary parses [summaryBytes] to [commonEng.Summary] -func (client *client) ParseStateSummary(_ context.Context, summaryBytes []byte) (block.StateSummary, error) { - return client.Parser.Parse(summaryBytes, client.acceptSyncSummary) -} - -func (client *client) stateSync(ctx context.Context) error { - // Create and register all syncers. - registry := NewSyncerRegistry() - - if err := client.registerSyncers(registry); err != nil { - return err - } - - // Run all registered syncers sequentially. - return registry.RunSyncerTasks(ctx, client.summary) +func (c *client) ParseStateSummary(_ context.Context, summaryBytes []byte) (block.StateSummary, error) { + return c.config.Parser.Parse(summaryBytes, c.acceptSyncSummary) } -func (client *client) registerSyncers(registry *SyncerRegistry) error { - // Register block syncer. - blockSyncer, err := client.createBlockSyncer(client.summary.GetBlockHash(), client.summary.Height()) - if err != nil { - return fmt.Errorf("failed to create block syncer: %w", err) - } - - codeQueue, err := client.createCodeQueue() - if err != nil { - return fmt.Errorf("failed to create code queue: %w", err) - } - - codeSyncer, err := client.createCodeSyncer(codeQueue.CodeHashes()) - if err != nil { - return fmt.Errorf("failed to create code syncer: %w", err) - } - - stateSyncer, err := client.createEVMSyncer(codeQueue) - if err != nil { - return fmt.Errorf("failed to create EVM state syncer: %w", err) - } - - var atomicSyncer syncpkg.Syncer - if client.Extender != nil { - atomicSyncer, err = client.createAtomicSyncer() - if err != nil { - return fmt.Errorf("failed to create atomic syncer: %w", err) +// acceptSyncSummary returns true if sync will be performed and launches the state sync process +// in a goroutine. +func (c *client) acceptSyncSummary(summary message.Syncable) (block.StateSyncMode, error) { + if err := c.prepareForSync(summary); err != nil { + if errors.Is(err, errSkipSync) { + return block.StateSyncSkipped, nil } + return block.StateSyncSkipped, err } - syncers := []syncpkg.Syncer{ - blockSyncer, - codeSyncer, - stateSyncer, - } - if atomicSyncer != nil { - syncers = append(syncers, atomicSyncer) - } - - for _, s := range syncers { - if err := registry.Register(s); err != nil { - return fmt.Errorf("failed to register %s syncer: %w", s.Name(), err) - } + registry, err := c.newSyncerRegistry(summary) + if err != nil { + return block.StateSyncSkipped, fmt.Errorf("failed to create syncer registry: %w", err) } - return nil -} - -func (client *client) createBlockSyncer(fromHash common.Hash, fromHeight uint64) (syncpkg.Syncer, error) { - return blocksync.NewSyncer( - client.Client, - client.ChainDB, - fromHash, - fromHeight, - BlocksToFetch, - ) -} - -func (client *client) createEVMSyncer(queue *statesync.CodeQueue) (syncpkg.Syncer, error) { - return statesync.NewSyncer( - client.Client, - client.ChainDB, - client.summary.GetBlockRoot(), - queue, - client.RequestSize, + finalizer := newFinalizer( + c.config.Chain, + c.config.State, + c.config.Acceptor, + c.config.VerDB, + c.config.MetadataDB, + c.config.Extender, + c.config.LastAcceptedHeight, ) -} -func (client *client) createCodeQueue() (*statesync.CodeQueue, error) { - return statesync.NewCodeQueue( - client.ChainDB, - client.StateSyncDone, - ) -} + strategy := newStaticStrategy(registry, finalizer, summary) -func (client *client) createCodeSyncer(codeHashes <-chan common.Hash) (syncpkg.Syncer, error) { - return statesync.NewCodeSyncer(client.Client, client.ChainDB, codeHashes) + return c.startAsync(strategy), nil } -func (client *client) createAtomicSyncer() (syncpkg.Syncer, error) { - return client.Extender.CreateSyncer(client.Client, client.VerDB, client.summary) -} - -// acceptSyncSummary returns true if sync will be performed and launches the state sync process -// in a goroutine. -func (client *client) acceptSyncSummary(proposedSummary message.Syncable) (block.StateSyncMode, error) { - isResume := client.resumableSummary != nil && - proposedSummary.GetBlockHash() == client.resumableSummary.GetBlockHash() +// prepareForSync handles resume check and snapshot wipe before sync starts. +func (c *client) prepareForSync(summary message.Syncable) error { + isResume := c.resumableSummary != nil && + summary.GetBlockHash() == c.resumableSummary.GetBlockHash() if !isResume { // Skip syncing if the blockchain is not significantly ahead of local state, // since bootstrapping would be faster. // (Also ensures we don't sync to a height prior to local state.) - if client.LastAcceptedHeight+client.MinBlocks > proposedSummary.Height() { + if c.config.LastAcceptedHeight+c.config.MinBlocks > summary.Height() { log.Info( "last accepted too close to most recent syncable block, skipping state sync", - "lastAccepted", client.LastAcceptedHeight, - "syncableHeight", proposedSummary.Height(), + "lastAccepted", c.config.LastAcceptedHeight, + "syncableHeight", summary.Height(), ) - return block.StateSyncSkipped, nil + return errSkipSync } // Wipe the snapshot completely if we are not resuming from an existing sync, so that we do not @@ -268,140 +198,108 @@ func (client *client) acceptSyncSummary(proposedSummary message.Syncable) (block // sync marker will be wiped, so we do not accidentally resume progress from an incorrect version // of the snapshot. (if switching between versions that come before this change and back this could // lead to the snapshot not being cleaned up correctly) - <-snapshot.WipeSnapshot(client.ChainDB, true) + <-snapshot.WipeSnapshot(c.config.ChainDB, true) // Reset the snapshot generator here so that when state sync completes, snapshots will not attempt to read an // invalid generator. // Note: this must be called after WipeSnapshot is called so that we do not invalidate a partially generated snapshot. - snapshot.ResetSnapshotGeneration(client.ChainDB) + snapshot.ResetSnapshotGeneration(c.config.ChainDB) } - client.summary = proposedSummary // Update the current state sync summary key in the database // Note: this must be performed after WipeSnapshot finishes so that we do not start a state sync // session from a partially wiped snapshot. - if err := client.MetadataDB.Put(stateSyncSummaryKey, proposedSummary.Bytes()); err != nil { - return block.StateSyncSkipped, fmt.Errorf("failed to write state sync summary key to disk: %w", err) + if err := c.config.MetadataDB.Put(stateSyncSummaryKey, summary.Bytes()); err != nil { + return fmt.Errorf("failed to write state sync summary key to disk: %w", err) } - if err := client.VerDB.Commit(); err != nil { - return block.StateSyncSkipped, fmt.Errorf("failed to commit db: %w", err) + if err := c.config.VerDB.Commit(); err != nil { + return fmt.Errorf("failed to commit db: %w", err) } - log.Info("Starting state sync", "summary", proposedSummary) + return nil +} - // create a cancellable ctx for the state sync goroutine +// startAsync launches the sync strategy in a background goroutine. +func (c *client) startAsync(strategy SyncStrategy) block.StateSyncMode { ctx, cancel := context.WithCancel(context.Background()) - client.cancel = cancel - client.wg.Add(1) // track the state sync goroutine so we can wait for it on shutdown + c.cancel = cancel + + c.wg.Add(1) go func() { - defer client.wg.Done() + defer c.wg.Done() defer cancel() - if err := client.stateSync(ctx); err != nil { - client.err = err - } else { - client.err = client.finishSync() + if err := strategy.Start(ctx); err != nil { + c.err = err } // notify engine regardless of whether err == nil, // this error will be propagated to the engine when it calls // vm.SetState(snow.Bootstrapping) - log.Info("stateSync completed, notifying engine", "err", client.err) - close(client.StateSyncDone) + log.Info("state sync completed, notifying engine", "err", c.err) + close(c.config.StateSyncDone) }() - return block.StateSyncStatic, nil + + log.Info("state sync started", "mode", block.StateSyncStatic) + return block.StateSyncStatic } -func (client *client) Shutdown() error { - if client.cancel != nil { - client.cancel() +func (c *client) Shutdown() error { + if c.cancel != nil { + c.cancel() } - client.wg.Wait() // wait for the background goroutine to exit + c.wg.Wait() // wait for the background goroutine to exit return nil } -// finishSync is responsible for updating disk and memory pointers so the VM is prepared -// for bootstrapping. Executes any shared memory operations from the atomic trie to shared memory. -func (client *client) finishSync() error { - stateBlock, err := client.State.GetBlock(context.TODO(), ids.ID(client.summary.GetBlockHash())) - if err != nil { - return fmt.Errorf("could not get block by hash from client state: %s", client.summary.GetBlockHash()) - } +// Error returns a non-nil error if one occurred during the sync. +func (c *client) Error() error { return c.err } - wrapper, ok := stateBlock.(*chain.BlockWrapper) - if !ok { - return fmt.Errorf("could not convert block(%T) to *chain.BlockWrapper", wrapper) - } - wrappedBlock := wrapper.Block +// newSyncerRegistry creates a registry with all required syncers for the given summary. +func (c *client) newSyncerRegistry(summary message.Syncable) (*SyncerRegistry, error) { + registry := NewSyncerRegistry() - evmBlockGetter, ok := wrappedBlock.(EthBlockWrapper) - if !ok { - return fmt.Errorf("could not convert block(%T) to evm.EthBlockWrapper", stateBlock) + blockSyncer, err := blocksync.NewSyncer( + c.config.Client, c.config.ChainDB, + summary.GetBlockHash(), summary.Height(), + BlocksToFetch, + ) + if err != nil { + return nil, fmt.Errorf("failed to create block syncer: %w", err) } - block := evmBlockGetter.GetEthBlock() - - if block.Hash() != client.summary.GetBlockHash() { - return fmt.Errorf("attempted to set last summary block to unexpected block hash: (%s != %s)", block.Hash(), client.summary.GetBlockHash()) - } - if block.NumberU64() != client.summary.Height() { - return fmt.Errorf("attempted to set last summary block to unexpected block number: (%d != %d)", block.NumberU64(), client.summary.Height()) + codeQueue, err := statesync.NewCodeQueue(c.config.ChainDB, c.config.StateSyncDone) + if err != nil { + return nil, fmt.Errorf("failed to create code queue: %w", err) } - // BloomIndexer needs to know that some parts of the chain are not available - // and cannot be indexed. This is done by calling [AddCheckpoint] here. - // Since the indexer uses sections of size [params.BloomBitsBlocks] (= 4096), - // each block is indexed in section number [blockNumber/params.BloomBitsBlocks]. - // To allow the indexer to start with the block we just synced to, - // we create a checkpoint for its parent. - // Note: This requires assuming the synced block height is divisible - // by [params.BloomBitsBlocks]. - parentHeight := block.NumberU64() - 1 - parentHash := block.ParentHash() - client.Chain.BloomIndexer().AddCheckpoint(parentHeight/params.BloomBitsBlocks, parentHash) - - if err := client.Chain.BlockChain().ResetToStateSyncedBlock(block); err != nil { - return err + codeSyncer, err := statesync.NewCodeSyncer(c.config.Client, c.config.ChainDB, codeQueue.CodeHashes()) + if err != nil { + return nil, fmt.Errorf("failed to create code syncer: %w", err) } - if client.Extender != nil { - if err := client.Extender.OnFinishBeforeCommit(client.LastAcceptedHeight, client.summary); err != nil { - return err - } + stateSyncer, err := statesync.NewSyncer( + c.config.Client, c.config.ChainDB, + summary.GetBlockRoot(), + codeQueue, c.config.RequestSize, + ) + if err != nil { + return nil, fmt.Errorf("failed to create EVM state syncer: %w", err) } - if err := client.commitVMMarkers(); err != nil { - return fmt.Errorf("error updating vm markers, height=%d, hash=%s, err=%w", block.NumberU64(), block.Hash(), err) - } + syncers := []syncpkg.Syncer{blockSyncer, codeSyncer, stateSyncer} - if err := client.State.SetLastAcceptedBlock(wrappedBlock); err != nil { - return err + if c.config.Extender != nil { + atomicSyncer, err := c.config.Extender.CreateSyncer(c.config.Client, c.config.VerDB, summary) + if err != nil { + return nil, fmt.Errorf("failed to create atomic syncer: %w", err) + } + syncers = append(syncers, atomicSyncer) } - if client.Extender != nil { - return client.Extender.OnFinishAfterCommit(block.NumberU64()) + for _, s := range syncers { + if err := registry.Register(s); err != nil { + return nil, fmt.Errorf("failed to register %s syncer: %w", s.Name(), err) + } } - return nil -} - -// commitVMMarkers updates the following markers in the VM's database -// and commits them atomically: -// - updates atomic trie so it will have necessary metadata for the last committed root -// - updates atomic trie so it will resume applying operations to shared memory on initialize -// - updates lastAcceptedKey -// - removes state sync progress markers -func (client *client) commitVMMarkers() error { - // Mark the previously last accepted block for the shared memory cursor, so that we will execute shared - // memory operations from the previously last accepted block to [vm.syncSummary] when ApplyToSharedMemory - // is called. - id := ids.ID(client.summary.GetBlockHash()) - if err := client.Acceptor.PutLastAcceptedID(id); err != nil { - return err - } - if err := client.MetadataDB.Delete(stateSyncSummaryKey); err != nil { - return err - } - return client.VerDB.Commit() + return registry, nil } - -// Error returns a non-nil error if one occurred during the sync. -func (client *client) Error() error { return client.err } diff --git a/graft/coreth/plugin/evm/vmsync/finalizer.go b/graft/coreth/plugin/evm/vmsync/finalizer.go new file mode 100644 index 000000000000..b803d4e6fb80 --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/finalizer.go @@ -0,0 +1,152 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "context" + "errors" + "fmt" + + "github.com/ava-labs/libevm/core/types" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/versiondb" + "github.com/ava-labs/avalanchego/graft/coreth/eth" + "github.com/ava-labs/avalanchego/graft/coreth/params" + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/vms/components/chain" + + syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" +) + +// EthBlockWrapper can be implemented by a concrete block wrapper type to +// return *types.Block, which is needed to update chain pointers at the +// end of the sync operation. +type EthBlockWrapper interface { + GetEthBlock() *types.Block +} + +var ( + errBlockNotFound = errors.New("block not found in state") + errInvalidBlockType = errors.New("invalid block wrapper type") + errBlockHashMismatch = errors.New("block hash mismatch") + errBlockHeightMismatch = errors.New("block height mismatch") + errFinalizeCancelled = errors.New("finalize cancelled") + errCommitMarkers = errors.New("failed to commit VM markers") +) + +// finalizer handles VM state finalization after sync completes. +type finalizer struct { + chain *eth.Ethereum + state *chain.State + acceptor BlockAcceptor + verDB *versiondb.Database + metadataDB database.Database + extender syncpkg.Extender + lastAcceptedHeight uint64 +} + +// newFinalizer creates a new finalizer with the given dependencies. +func newFinalizer( + chain *eth.Ethereum, + state *chain.State, + acceptor BlockAcceptor, + verDB *versiondb.Database, + metadataDB database.Database, + extender syncpkg.Extender, + lastAcceptedHeight uint64, +) *finalizer { + return &finalizer{ + chain: chain, + state: state, + acceptor: acceptor, + verDB: verDB, + metadataDB: metadataDB, + extender: extender, + lastAcceptedHeight: lastAcceptedHeight, + } +} + +// finalize updates disk and memory pointers so the VM is prepared for bootstrapping. +// Executes any shared memory operations from the atomic trie to shared memory. +func (f *finalizer) finalize(ctx context.Context, summary message.Syncable) error { + stateBlock, err := f.state.GetBlock(ctx, ids.ID(summary.GetBlockHash())) + if err != nil { + return fmt.Errorf("%w: hash=%s", errBlockNotFound, summary.GetBlockHash()) + } + + wrapper, ok := stateBlock.(*chain.BlockWrapper) + if !ok { + return fmt.Errorf("%w: got %T, want *chain.BlockWrapper", errInvalidBlockType, stateBlock) + } + wrappedBlock := wrapper.Block + + evmBlockGetter, ok := wrappedBlock.(EthBlockWrapper) + if !ok { + return fmt.Errorf("%w: got %T, want EthBlockWrapper", errInvalidBlockType, wrappedBlock) + } + + block := evmBlockGetter.GetEthBlock() + + if block.Hash() != summary.GetBlockHash() { + return fmt.Errorf("%w: got %s, want %s", errBlockHashMismatch, block.Hash(), summary.GetBlockHash()) + } + if block.NumberU64() != summary.Height() { + return fmt.Errorf("%w: got %d, want %d", errBlockHeightMismatch, block.NumberU64(), summary.Height()) + } + + // BloomIndexer needs to know that some parts of the chain are not available + // and cannot be indexed. This is done by calling [AddCheckpoint] here. + // Since the indexer uses sections of size [params.BloomBitsBlocks] (= 4096), + // each block is indexed in section number [blockNumber/params.BloomBitsBlocks]. + // To allow the indexer to start with the block we just synced to, + // we create a checkpoint for its parent. + // Note: This requires assuming the synced block height is divisible + // by [params.BloomBitsBlocks]. + parentHeight := block.NumberU64() - 1 + parentHash := block.ParentHash() + f.chain.BloomIndexer().AddCheckpoint(parentHeight/params.BloomBitsBlocks, parentHash) + + if err := ctx.Err(); err != nil { + return fmt.Errorf("%w: %w", errFinalizeCancelled, err) + } + if err := f.chain.BlockChain().ResetToStateSyncedBlock(block); err != nil { + return err + } + + if f.extender != nil { + if err := f.extender.OnFinishBeforeCommit(f.lastAcceptedHeight, summary); err != nil { + return err + } + } + + if err := f.commitMarkers(summary); err != nil { + return fmt.Errorf("%w: height=%d, hash=%s: %w", errCommitMarkers, block.NumberU64(), block.Hash(), err) + } + + if err := f.state.SetLastAcceptedBlock(wrappedBlock); err != nil { + return err + } + + if f.extender != nil { + if err := f.extender.OnFinishAfterCommit(block.NumberU64()); err != nil { + return err + } + } + + return nil +} + +// commitMarkers updates VM database markers atomically. +func (f *finalizer) commitMarkers(summary message.Syncable) error { + id := ids.ID(summary.GetBlockHash()) + if err := f.acceptor.PutLastAcceptedID(id); err != nil { + return err + } + if err := f.metadataDB.Delete(stateSyncSummaryKey); err != nil { + return err + } + return f.verDB.Commit() +} diff --git a/graft/coreth/plugin/evm/vmsync/strategy_static.go b/graft/coreth/plugin/evm/vmsync/strategy_static.go new file mode 100644 index 000000000000..ba1d6c7aae67 --- /dev/null +++ b/graft/coreth/plugin/evm/vmsync/strategy_static.go @@ -0,0 +1,38 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package vmsync + +import ( + "context" + + "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" +) + +var _ SyncStrategy = (*staticStrategy)(nil) + +// staticStrategy runs syncers sequentially without block queueing. +// This is the default sync mode where all syncers complete before +// finalization, with no concurrent block processing. +type staticStrategy struct { + registry *SyncerRegistry + finalizer *finalizer + summary message.Syncable +} + +func newStaticStrategy(registry *SyncerRegistry, finalizer *finalizer, summary message.Syncable) *staticStrategy { + return &staticStrategy{ + registry: registry, + finalizer: finalizer, + summary: summary, + } +} + +// Start begins the sync process and blocks until completion or error. +// For static sync, this runs all syncers and then finalizes the VM state. +func (s *staticStrategy) Start(ctx context.Context) error { + if err := s.registry.RunSyncerTasks(ctx, s.summary); err != nil { + return err + } + return s.finalizer.finalize(ctx, s.summary) +} From a5f7cbc13532f45ab80f577c315ed0838f061a34 Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Wed, 3 Dec 2025 23:19:05 +0200 Subject: [PATCH 2/3] refactor(vmsync): refinements on sync strategy definition --- graft/coreth/plugin/evm/vmsync/client.go | 12 ++++++------ graft/coreth/plugin/evm/vmsync/strategy_static.go | 10 ++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/graft/coreth/plugin/evm/vmsync/client.go b/graft/coreth/plugin/evm/vmsync/client.go index efd7c4a72076..c2e77a7c18b8 100644 --- a/graft/coreth/plugin/evm/vmsync/client.go +++ b/graft/coreth/plugin/evm/vmsync/client.go @@ -17,14 +17,14 @@ import ( "github.com/ava-labs/avalanchego/graft/coreth/core/state/snapshot" "github.com/ava-labs/avalanchego/graft/coreth/eth" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/message" + "github.com/ava-labs/avalanchego/graft/coreth/sync/blocksync" + "github.com/ava-labs/avalanchego/graft/coreth/sync/statesync" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/vms/components/chain" syncpkg "github.com/ava-labs/avalanchego/graft/coreth/sync" - "github.com/ava-labs/avalanchego/graft/coreth/sync/blocksync" syncclient "github.com/ava-labs/avalanchego/graft/coreth/sync/client" - "github.com/ava-labs/avalanchego/graft/coreth/sync/statesync" ) // BlocksToFetch is the number of the block parents the state syncs to. @@ -32,7 +32,7 @@ import ( const BlocksToFetch = 256 var ( - errSkipSync = fmt.Errorf("skip sync") + errSkipSync = errors.New("skip sync") stateSyncSummaryKey = []byte("stateSyncSummary") ) @@ -47,7 +47,7 @@ type BlockAcceptor interface { // Implementations handle the sync lifecycle differently based on sync mode. type SyncStrategy interface { // Start begins the sync process and blocks until completion or error. - Start(ctx context.Context) error + Start(ctx context.Context, summary message.Syncable) error } type ClientConfig struct { @@ -170,7 +170,7 @@ func (c *client) acceptSyncSummary(summary message.Syncable) (block.StateSyncMod c.config.LastAcceptedHeight, ) - strategy := newStaticStrategy(registry, finalizer, summary) + strategy := newStaticStrategy(registry, finalizer) return c.startAsync(strategy), nil } @@ -228,7 +228,7 @@ func (c *client) startAsync(strategy SyncStrategy) block.StateSyncMode { defer c.wg.Done() defer cancel() - if err := strategy.Start(ctx); err != nil { + if err := strategy.Start(ctx, c.resumableSummary); err != nil { c.err = err } // notify engine regardless of whether err == nil, diff --git a/graft/coreth/plugin/evm/vmsync/strategy_static.go b/graft/coreth/plugin/evm/vmsync/strategy_static.go index ba1d6c7aae67..88afed6309ad 100644 --- a/graft/coreth/plugin/evm/vmsync/strategy_static.go +++ b/graft/coreth/plugin/evm/vmsync/strategy_static.go @@ -17,22 +17,20 @@ var _ SyncStrategy = (*staticStrategy)(nil) type staticStrategy struct { registry *SyncerRegistry finalizer *finalizer - summary message.Syncable } -func newStaticStrategy(registry *SyncerRegistry, finalizer *finalizer, summary message.Syncable) *staticStrategy { +func newStaticStrategy(registry *SyncerRegistry, finalizer *finalizer) *staticStrategy { return &staticStrategy{ registry: registry, finalizer: finalizer, - summary: summary, } } // Start begins the sync process and blocks until completion or error. // For static sync, this runs all syncers and then finalizes the VM state. -func (s *staticStrategy) Start(ctx context.Context) error { - if err := s.registry.RunSyncerTasks(ctx, s.summary); err != nil { +func (s *staticStrategy) Start(ctx context.Context, summary message.Syncable) error { + if err := s.registry.RunSyncerTasks(ctx, summary); err != nil { return err } - return s.finalizer.finalize(ctx, s.summary) + return s.finalizer.finalize(ctx, summary) } From 0283266ae4a69eca623f58998abc57d513423d2c Mon Sep 17 00:00:00 2001 From: Tsvetan Dimitrov Date: Wed, 3 Dec 2025 23:33:07 +0200 Subject: [PATCH 3/3] fix(vmsync): pass summary to startAsync instead of using resumableSummary --- graft/coreth/plugin/evm/vmsync/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/graft/coreth/plugin/evm/vmsync/client.go b/graft/coreth/plugin/evm/vmsync/client.go index c2e77a7c18b8..7ad0ef090350 100644 --- a/graft/coreth/plugin/evm/vmsync/client.go +++ b/graft/coreth/plugin/evm/vmsync/client.go @@ -172,7 +172,7 @@ func (c *client) acceptSyncSummary(summary message.Syncable) (block.StateSyncMod strategy := newStaticStrategy(registry, finalizer) - return c.startAsync(strategy), nil + return c.startAsync(strategy, summary), nil } // prepareForSync handles resume check and snapshot wipe before sync starts. @@ -219,7 +219,7 @@ func (c *client) prepareForSync(summary message.Syncable) error { } // startAsync launches the sync strategy in a background goroutine. -func (c *client) startAsync(strategy SyncStrategy) block.StateSyncMode { +func (c *client) startAsync(strategy SyncStrategy, summary message.Syncable) block.StateSyncMode { ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel @@ -228,7 +228,7 @@ func (c *client) startAsync(strategy SyncStrategy) block.StateSyncMode { defer c.wg.Done() defer cancel() - if err := strategy.Start(ctx, c.resumableSummary); err != nil { + if err := strategy.Start(ctx, summary); err != nil { c.err = err } // notify engine regardless of whether err == nil,