Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions graft/coreth/plugin/evm/atomic/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ func (s *Syncer) Sync(ctx context.Context) error {
return s.syncer.Sync(ctx)
}

func (*Syncer) UpdateTarget(_ message.Syncable) error {
return nil
}

func (*Syncer) Finalize(_ context.Context) error {
return nil
}

// addZeroes returns the big-endian representation of `height`, prefixed with [common.HashLength] zeroes.
func addZeroes(height uint64) []byte {
// Key format is [height(8 bytes)][blockchainID(32 bytes)]. Start should be the
Expand Down
94 changes: 94 additions & 0 deletions graft/coreth/plugin/evm/vmsync/block_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package vmsync

import "sync"

// BlockOperationType represents the type of operation to perform on a block.
type BlockOperationType int

const (
OpAccept BlockOperationType = iota
OpReject
OpVerify
)

// String returns the string representation of the block operation.
func (op BlockOperationType) String() string {
switch op {
case OpAccept:
return "accept"
case OpReject:
return "reject"
case OpVerify:
return "verify"
default:
return "unknown"
}
}

// blockOperation represents a queued block operation.
type blockOperation struct {
block EthBlockWrapper
operation BlockOperationType
}

// blockQueue buffers block operations (accept/reject/verify) that arrive while
// the coordinator is in the Running state. Operations are processed in FIFO order.
// It is cleared (drained) on UpdateSyncTarget to avoid drops and is snapshotted
// at finalization via DequeueBatch. Enqueue is always allowed; a DequeueBatch
// only captures the current buffered operations and clears them, and new enqueues
// after the snapshot are not part of that batch.
type blockQueue struct {
mu sync.Mutex
// buffered operations accumulated before finalization
items []blockOperation
}

// newBlockQueue creates a new empty queue.
func newBlockQueue() *blockQueue {
return &blockQueue{}
}

// enqueue appends a block operation to the buffer. Returns true if the operation
// was queued, false if the block is nil.
func (q *blockQueue) enqueue(b EthBlockWrapper, op BlockOperationType) bool {
if b == nil {
return false
}
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, blockOperation{
block: b,
operation: op,
})
return true
}

// dequeueBatch returns the current buffered operations and clears the buffer. New
// arrivals after the snapshot are not included and remain buffered for later.
func (q *blockQueue) dequeueBatch() []blockOperation {
q.mu.Lock()
defer q.mu.Unlock()
out := q.items
q.items = nil
return out
}

// removeBelowHeight removes all queued blocks with height <= targetHeight.
// This is called after UpdateSyncTarget to remove blocks that will never be executed
// because the sync target has advanced past them.
func (q *blockQueue) removeBelowHeight(targetHeight uint64) {
q.mu.Lock()
defer q.mu.Unlock()

filtered := q.items[:0]
for _, op := range q.items {
ethBlock := op.block.GetEthBlock()
if ethBlock != nil && ethBlock.NumberU64() > targetHeight {
filtered = append(filtered, op)
}
}
q.items = filtered
}
76 changes: 76 additions & 0 deletions graft/coreth/plugin/evm/vmsync/block_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package vmsync

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestBlockQueue_EnqueueAndDequeue(t *testing.T) {
q := newBlockQueue()

// Nil block should be rejected.
require.False(t, q.enqueue(nil, OpAccept))

// Enqueue blocks.
for i := uint64(100); i < 105; i++ {
require.True(t, q.enqueue(newMockBlock(i), OpAccept))
}

// Dequeue returns all in FIFO order and clears queue.
batch := q.dequeueBatch()
require.Len(t, batch, 5)
for i, op := range batch {
require.Equal(t, uint64(100+i), op.block.GetEthBlock().NumberU64())
}

// Queue is now empty.
require.Empty(t, q.dequeueBatch())
}

func TestBlockQueue_RemoveBelowHeight(t *testing.T) {
q := newBlockQueue()

// Enqueue blocks at heights 100-110.
for i := uint64(100); i <= 110; i++ {
q.enqueue(newMockBlock(i), OpAccept)
}

// Remove blocks at or below height 105.
q.removeBelowHeight(105)

// Only blocks > 105 should remain (106, 107, 108, 109, 110).
batch := q.dequeueBatch()
require.Len(t, batch, 5)
require.Equal(t, uint64(106), batch[0].block.GetEthBlock().NumberU64())
}

func TestBlockQueue_ConcurrentAccess(t *testing.T) {
t.Parallel()

q := newBlockQueue()
const numGoroutines = 10
const numOps = 100

var wg sync.WaitGroup
wg.Add(numGoroutines)

for g := 0; g < numGoroutines; g++ {
go func(id int) {
defer wg.Done()
for i := 0; i < numOps; i++ {
q.enqueue(newMockBlock(uint64(id*numOps+i)), OpAccept)
}
}(g)
}

wg.Wait()

// All operations should have been enqueued.
batch := q.dequeueBatch()
require.Len(t, batch, numGoroutines*numOps)
}
Loading
Loading