Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,41 @@ processors:
addr: "localhost:9000"
database: "default"
table: "canonical_execution_transaction_structlog"
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
# bufferMaxConcurrentFlushes: 10 # Max concurrent ClickHouse inserts (default: 10)
# bufferCircuitBreakerMaxFailures: 5 # Consecutive failures to trip circuit (default: 5)
# bufferCircuitBreakerTimeout: "60s" # Open state duration before half-open (default: 60s)

# Aggregated structlog processor (call frame level aggregation)
transactionStructlogAgg:
enabled: false
addr: "localhost:9000"
database: "default"
table: "canonical_execution_transaction_structlog_agg"
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
# bufferMaxConcurrentFlushes: 10 # Max concurrent ClickHouse inserts (default: 10)
# bufferCircuitBreakerMaxFailures: 5 # Consecutive failures to trip circuit (default: 5)
# bufferCircuitBreakerTimeout: "60s" # Open state duration before half-open (default: 60s)

# Simple transaction processor (lightweight - no debug traces)
transactionSimple:
enabled: false
addr: "localhost:9000"
database: "default"
table: "execution_transaction"
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)
# bufferMaxConcurrentFlushes: 10 # Max concurrent ClickHouse inserts (default: 10)
# bufferCircuitBreakerMaxFailures: 5 # Consecutive failures to trip circuit (default: 5)
# bufferCircuitBreakerTimeout: "60s" # Open state duration before half-open (default: 60s)

# Application settings
shutdownTimeout: 6m
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/prometheus/client_golang v1.20.5
github.com/redis/go-redis/v9 v9.17.2
github.com/sirupsen/logrus v1.9.3
github.com/sony/gobreaker/v2 v2.4.0
github.com/spf13/cobra v1.10.1
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.40.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/sony/gobreaker/v2 v2.4.0 h1:g2KJRW1Ubty3+ZOcSEUN7K+REQJdN6yo6XvaML+jptg=
github.com/sony/gobreaker/v2 v2.4.0/go.mod h1:pTyFJgcZ3h2tdQVLZZruK2C0eoFL1fb/G83wK1ZQl+s=
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
Expand Down
16 changes: 16 additions & 0 deletions pkg/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,20 @@ var (
Name: "execution_processor_row_buffer_pending_tasks",
Help: "Current number of tasks waiting for their rows to be flushed",
}, []string{"network", "processor", "table"})

RowBufferInflightFlushes = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "execution_processor_row_buffer_inflight_flushes",
Help: "Number of flush operations currently in progress",
}, []string{"network", "processor", "table"})

// Circuit breaker metrics for row buffer.
RowBufferCircuitOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "execution_processor_row_buffer_circuit_open",
Help: "Whether circuit breaker is open (1) or closed (0)",
}, []string{"network", "processor", "table"})

RowBufferCircuitRejections = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "execution_processor_row_buffer_circuit_rejections_total",
Help: "Total number of flushes rejected by circuit breaker",
}, []string{"network", "processor", "table"})
)
58 changes: 57 additions & 1 deletion pkg/ethereum/execution/geth/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/sirupsen/logrus"

pcommon "github.com/ethpandaops/execution-processor/pkg/common"
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
Expand Down Expand Up @@ -73,13 +74,56 @@ func (n *RPCNode) blockByNumber(ctx context.Context, blockNumber *big.Int) (exec
return NewBlockAdapter(block), nil
}

// maxBatchSize is the maximum number of blocks to request in a single batch RPC call.
// Most RPC nodes have a default limit of 100 (e.g., Erigon's --rpc.batch.limit).
const maxBatchSize = 100

// blocksByNumbers fetches multiple blocks via batch RPC calls.
// Only a contiguous prefix of the requested blocks is returned:
// fetching stops at the first block that is not found. Requests
// larger than maxBatchSize are transparently split into multiple
// batches so RPC server batch limits are respected.
func (n *RPCNode) blocksByNumbers(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
	switch {
	case len(numbers) == 0:
		// Nothing requested; return an empty (non-nil) slice.
		return []execution.Block{}, nil
	case len(numbers) > maxBatchSize:
		// Too large for one batch call; delegate to the chunked path.
		return n.blocksByNumbersChunked(ctx, numbers)
	default:
		return n.blocksByNumbersBatch(ctx, numbers)
	}
}

// blocksByNumbersChunked splits an oversized request into maxBatchSize
// pieces and fetches them sequentially. On a batch error, the blocks
// collected so far are returned together with the error. Fetching also
// stops early when a batch comes back short, which indicates a missing
// block and preserves the contiguity guarantee of blocksByNumbers.
func (n *RPCNode) blocksByNumbersChunked(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
	collected := make([]execution.Block, 0, len(numbers))

	for start := 0; start < len(numbers); start += maxBatchSize {
		limit := start + maxBatchSize
		if limit > len(numbers) {
			limit = len(numbers)
		}

		batch := numbers[start:limit]

		fetched, err := n.blocksByNumbersBatch(ctx, batch)
		if err != nil {
			// Surface the error but keep whatever was fetched so far.
			return collected, err
		}

		collected = append(collected, fetched...)

		// A short batch means a block was not found; stop here so the
		// result stays contiguous.
		if len(fetched) < len(batch) {
			return collected, nil
		}
	}

	return collected, nil
}

// blocksByNumbersBatch fetches a single batch of blocks (must be <= maxBatchSize).
func (n *RPCNode) blocksByNumbersBatch(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
start := time.Now()
network := n.Metadata().ChainID()

Expand Down Expand Up @@ -132,26 +176,38 @@ func (n *RPCNode) blocksByNumbers(ctx context.Context, numbers []*big.Int) ([]ex
// Check for individual call error - stop at first error for contiguity
// We intentionally don't return this error as we want partial results
if elem.Error != nil {
n.log.WithError(elem.Error).WithField("block_index", i).Debug("Batch element error, stopping")

break
}

// Check for nil/not-found block (null JSON response)
if results[i] == nil || len(*results[i]) == 0 || string(*results[i]) == "null" {
// Block not found - stop here for contiguity
n.log.WithFields(logrus.Fields{
"block_index": i,
"block_number": numbers[i].String(),
}).Debug("Block not found in batch, stopping")

break
}

// Parse the block from JSON
block, parseErr := parseBlockFromJSON(*results[i])
if parseErr != nil {
// Parse error - stop here for contiguity
n.log.WithError(parseErr).WithFields(logrus.Fields{
"block_index": i,
"block_number": numbers[i].String(),
}).Warn("Failed to parse block from JSON, stopping batch")

break
}

blocks = append(blocks, NewBlockAdapter(block))
}

return blocks, nil //nolint:nilerr // Intentionally returning partial results for contiguity
return blocks, nil
}

// toBlockNumArg converts a block number to the RPC argument format.
Expand Down
5 changes: 2 additions & 3 deletions pkg/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,16 +564,15 @@ func (m *Manager) processBlocks(ctx context.Context) bool {
} else {
workDone = true

// Track processing duration
// Track processing duration (block count is tracked by the processor itself)
duration := time.Since(startTime)

common.BlockProcessingDuration.WithLabelValues(m.network.Name, name).Observe(duration.Seconds())
common.BlocksProcessed.WithLabelValues(m.network.Name, name).Inc()

m.log.WithFields(logrus.Fields{
"processor": name,
"duration": duration,
}).Debug("Successfully processed block")
}).Debug("Successfully processed blocks")
}

// Update head distance metric (regardless of success/failure to track current distance)
Expand Down
14 changes: 14 additions & 0 deletions pkg/processor/transaction/simple/block_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,24 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error {
}).Debug("Fetched batch of blocks")

// Process each block, stopping on first error
processedCount := 0

for _, block := range blocks {
if processErr := p.processBlock(ctx, block); processErr != nil {
// Record blocks that were successfully processed before the error
if processedCount > 0 {
common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
}

return processErr
}

processedCount++
}

// Record all successfully processed blocks
if processedCount > 0 {
common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
}

return nil
Expand Down
14 changes: 10 additions & 4 deletions pkg/processor/transaction/simple/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (

// Default buffer configuration values.
const (
DefaultBufferMaxRows = 100000
DefaultBufferFlushInterval = time.Second
DefaultBufferMaxRows = 100000
DefaultBufferFlushInterval = time.Second
DefaultBufferMaxConcurrentFlushes = 10
DefaultBufferCircuitBreakerMaxFailures = 5
DefaultBufferCircuitBreakerTimeout = 60 * time.Second
)

// Config holds configuration for the simple transaction processor.
Expand All @@ -20,8 +23,11 @@ type Config struct {
Table string `yaml:"table"`

// Row buffer settings for batched ClickHouse inserts
BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000
BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000
BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
BufferMaxConcurrentFlushes int `yaml:"bufferMaxConcurrentFlushes"` // Max parallel flush ops. Default: 10
BufferCircuitBreakerMaxFailures uint32 `yaml:"bufferCircuitBreakerMaxFailures"` // Consecutive failures to trip circuit. Default: 5
BufferCircuitBreakerTimeout time.Duration `yaml:"bufferCircuitBreakerTimeout"` // Open state duration before half-open. Default: 60s

// Block completion tracking
MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
Expand Down
34 changes: 25 additions & 9 deletions pkg/processor/transaction/simple/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
config.BufferFlushInterval = DefaultBufferFlushInterval
}

if config.BufferMaxConcurrentFlushes <= 0 {
config.BufferMaxConcurrentFlushes = DefaultBufferMaxConcurrentFlushes
}

if config.BufferCircuitBreakerMaxFailures <= 0 {
config.BufferCircuitBreakerMaxFailures = DefaultBufferCircuitBreakerMaxFailures
}

if config.BufferCircuitBreakerTimeout <= 0 {
config.BufferCircuitBreakerTimeout = DefaultBufferCircuitBreakerTimeout
}

log := deps.Log.WithField("processor", ProcessorName)

// Create the limiter for shared functionality
Expand Down Expand Up @@ -129,21 +141,25 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
// Create the row buffer with the flush function
processor.rowBuffer = rowbuffer.New(
rowbuffer.Config{
MaxRows: config.BufferMaxRows,
FlushInterval: config.BufferFlushInterval,
Network: deps.Network.Name,
Processor: ProcessorName,
Table: config.Table,
MaxRows: config.BufferMaxRows,
FlushInterval: config.BufferFlushInterval,
MaxConcurrentFlushes: config.BufferMaxConcurrentFlushes,
CircuitBreakerMaxFailures: config.BufferCircuitBreakerMaxFailures,
CircuitBreakerTimeout: config.BufferCircuitBreakerTimeout,
Network: deps.Network.Name,
Processor: ProcessorName,
Table: config.Table,
},
processor.flushRows,
log,
)

processor.log.WithFields(logrus.Fields{
"network": processor.network.Name,
"max_pending_block_range": config.MaxPendingBlockRange,
"buffer_max_rows": config.BufferMaxRows,
"buffer_flush_interval": config.BufferFlushInterval,
"network": processor.network.Name,
"max_pending_block_range": config.MaxPendingBlockRange,
"buffer_max_rows": config.BufferMaxRows,
"buffer_flush_interval": config.BufferFlushInterval,
"buffer_max_concurrent_flushes": config.BufferMaxConcurrentFlushes,
}).Info("Simple transaction processor initialized")

return processor, nil
Expand Down
16 changes: 15 additions & 1 deletion pkg/processor/transaction/structlog/block_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,24 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error {
}).Debug("Fetched batch of blocks")

// Process each block, stopping on first error
processedCount := 0

for _, block := range blocks {
if processErr := p.processBlock(ctx, block); processErr != nil {
// Record blocks that were successfully processed before the error
if processedCount > 0 {
common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
}

return processErr
}

processedCount++
}

// Record all successfully processed blocks
if processedCount > 0 {
common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
}

return nil
Expand All @@ -156,7 +170,7 @@ func (p *Processor) handleBlockNotFound(ctx context.Context, node execution.Node
chainTip := new(big.Int).SetUint64(*latestBlock)
diff := new(big.Int).Sub(nextBlock, chainTip).Int64()

if diff <= 5 { // Within 5 blocks of chain tip
if diff > 0 && diff <= 5 { // 1-5 blocks ahead of chain tip, might appear soon
p.log.WithFields(logrus.Fields{
"network": p.network.Name,
"block_number": nextBlock,
Expand Down
14 changes: 10 additions & 4 deletions pkg/processor/transaction/structlog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (

// Default buffer configuration values.
const (
DefaultBufferMaxRows = 100000
DefaultBufferFlushInterval = time.Second
DefaultBufferMaxRows = 100000
DefaultBufferFlushInterval = time.Second
DefaultBufferMaxConcurrentFlushes = 10
DefaultBufferCircuitBreakerMaxFailures = 5
DefaultBufferCircuitBreakerTimeout = 60 * time.Second
)

// Config holds configuration for transaction structlog processor.
Expand All @@ -20,8 +23,11 @@ type Config struct {
Table string `yaml:"table"`

// Row buffer settings for batched ClickHouse inserts
BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000
BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000
BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
BufferMaxConcurrentFlushes int `yaml:"bufferMaxConcurrentFlushes"` // Max parallel flush ops. Default: 10
BufferCircuitBreakerMaxFailures uint32 `yaml:"bufferCircuitBreakerMaxFailures"` // Consecutive failures to trip circuit. Default: 5
BufferCircuitBreakerTimeout time.Duration `yaml:"bufferCircuitBreakerTimeout"` // Open state duration before half-open. Default: 60s

// Block completion tracking
MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
Expand Down
Loading