diff --git a/example_config.yaml b/example_config.yaml index 619339f..b7762cb 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -56,10 +56,13 @@ processors: addr: "localhost:9000" database: "default" table: "canonical_execution_transaction_structlog" - # debug: false # Enable debug logging for ClickHouse queries - # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2) - # bufferMaxRows: 100000 # Max rows before flush (default: 100000) - # bufferFlushInterval: "1s" # Max time before flush (default: 1s) + # debug: false # Enable debug logging for ClickHouse queries + # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2) + # bufferMaxRows: 100000 # Max rows before flush (default: 100000) + # bufferFlushInterval: "1s" # Max time before flush (default: 1s) + # bufferMaxConcurrentFlushes: 10 # Max concurrent ClickHouse inserts (default: 10) + # bufferCircuitBreakerMaxFailures: 5 # Consecutive failures to trip circuit (default: 5) + # bufferCircuitBreakerTimeout: "60s" # Open state duration before half-open (default: 60s) # Aggregated structlog processor (call frame level aggregation) transactionStructlogAgg: @@ -67,10 +70,13 @@ processors: addr: "localhost:9000" database: "default" table: "canonical_execution_transaction_structlog_agg" - # debug: false # Enable debug logging for ClickHouse queries - # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2) - # bufferMaxRows: 100000 # Max rows before flush (default: 100000) - # bufferFlushInterval: "1s" # Max time before flush (default: 1s) + # debug: false # Enable debug logging for ClickHouse queries + # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2) + # bufferMaxRows: 100000 # Max rows before flush (default: 100000) + # bufferFlushInterval: "1s" # Max time before flush (default: 1s) + # bufferMaxConcurrentFlushes: 10 # Max concurrent ClickHouse inserts (default: 10) + # bufferCircuitBreakerMaxFailures: 5 # Consecutive failures to trip circuit (default: 5) + # bufferCircuitBreakerTimeout: "60s" # Open state duration before half-open (default: 60s) # Simple transaction processor (lightweight - no debug traces) transactionSimple: @@ -78,10 +84,13 @@ processors: addr: "localhost:9000" database: "default" table: "execution_transaction" - # debug: false # Enable debug logging for ClickHouse queries - # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2) - # bufferMaxRows: 100000 # Max rows before flush (default: 100000) - # bufferFlushInterval: "1s" # Max time before flush (default: 1s) + # debug: false # Enable debug logging for ClickHouse queries + # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2) + # bufferMaxRows: 100000 # Max rows before flush (default: 100000) + # bufferFlushInterval: "1s" # Max time before flush (default: 1s) + # bufferMaxConcurrentFlushes: 10 # Max concurrent ClickHouse inserts (default: 10) + # bufferCircuitBreakerMaxFailures: 5 # Consecutive failures to trip circuit (default: 5) + # bufferCircuitBreakerTimeout: "60s" # Open state duration before half-open (default: 60s) # Application settings shutdownTimeout: 6m diff --git a/go.mod b/go.mod index 8a32246..1bae8bb 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/redis/go-redis/v9 v9.17.2 github.com/sirupsen/logrus v1.9.3 + 
github.com/sony/gobreaker/v2 v2.4.0 github.com/spf13/cobra v1.10.1 github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.40.0 diff --git a/go.sum b/go.sum index e165a32..fdd7b78 100644 --- a/go.sum +++ b/go.sum @@ -299,6 +299,8 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/sony/gobreaker/v2 v2.4.0 h1:g2KJRW1Ubty3+ZOcSEUN7K+REQJdN6yo6XvaML+jptg= +github.com/sony/gobreaker/v2 v2.4.0/go.mod h1:pTyFJgcZ3h2tdQVLZZruK2C0eoFL1fb/G83wK1ZQl+s= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s= diff --git a/pkg/common/metrics.go b/pkg/common/metrics.go index 4ed8f94..b3eba14 100644 --- a/pkg/common/metrics.go +++ b/pkg/common/metrics.go @@ -233,4 +233,20 @@ var ( Name: "execution_processor_row_buffer_pending_tasks", Help: "Current number of tasks waiting for their rows to be flushed", }, []string{"network", "processor", "table"}) + + RowBufferInflightFlushes = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "execution_processor_row_buffer_inflight_flushes", + Help: "Number of flush operations currently in progress", + }, []string{"network", "processor", "table"}) + + // Circuit breaker metrics for row buffer. + RowBufferCircuitOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "execution_processor_row_buffer_circuit_open", + Help: "Whether circuit breaker is open (1) or closed (0)", + }, []string{"network", "processor", "table"}) + + RowBufferCircuitRejections = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "execution_processor_row_buffer_circuit_rejections_total", + Help: "Total number of flushes rejected by circuit breaker", + }, []string{"network", "processor", "table"}) ) diff --git a/pkg/ethereum/execution/geth/rpc.go b/pkg/ethereum/execution/geth/rpc.go index 8dd9dfa..8b469ec 100644 --- a/pkg/ethereum/execution/geth/rpc.go +++ b/pkg/ethereum/execution/geth/rpc.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" + "github.com/sirupsen/logrus" pcommon "github.com/ethpandaops/execution-processor/pkg/common" "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" @@ -73,13 +74,56 @@ func (n *RPCNode) blockByNumber(ctx context.Context, blockNumber *big.Int) (exec return NewBlockAdapter(block), nil } +// maxBatchSize is the maximum number of blocks to request in a single batch RPC call. +// Most RPC nodes have a default limit of 100 (e.g., Erigon's --rpc.batch.limit). +const maxBatchSize = 100 + // blocksByNumbers fetches multiple blocks using batch RPC calls. // Returns blocks up to the first not-found (contiguous only). +// Large requests are automatically chunked to respect RPC batch limits. 
func (n *RPCNode) blocksByNumbers(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) { if len(numbers) == 0 { return []execution.Block{}, nil } + // If request exceeds batch limit, chunk it + if len(numbers) > maxBatchSize { + return n.blocksByNumbersChunked(ctx, numbers) + } + + return n.blocksByNumbersBatch(ctx, numbers) +} + +// blocksByNumbersChunked fetches blocks in chunks to respect RPC batch limits. +func (n *RPCNode) blocksByNumbersChunked(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) { + allBlocks := make([]execution.Block, 0, len(numbers)) + + for i := 0; i < len(numbers); i += maxBatchSize { + end := i + maxBatchSize + if end > len(numbers) { + end = len(numbers) + } + + chunk := numbers[i:end] + + blocks, err := n.blocksByNumbersBatch(ctx, chunk) + if err != nil { + return allBlocks, err + } + + allBlocks = append(allBlocks, blocks...) + + // If we got fewer blocks than requested, a block was not found - stop for contiguity + if len(blocks) < len(chunk) { + break + } + } + + return allBlocks, nil +} + +// blocksByNumbersBatch fetches a single batch of blocks (must be <= maxBatchSize). +func (n *RPCNode) blocksByNumbersBatch(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) { start := time.Now() network := n.Metadata().ChainID() @@ -132,12 +176,19 @@ func (n *RPCNode) blocksByNumbers(ctx context.Context, numbers []*big.Int) ([]ex // Check for individual call error - stop at first error for contiguity // We intentionally don't return this error as we want partial results if elem.Error != nil { + n.log.WithError(elem.Error).WithField("block_index", i).Debug("Batch element error, stopping") + break } // Check for nil/not-found block (null JSON response) if results[i] == nil || len(*results[i]) == 0 || string(*results[i]) == "null" { // Block not found - stop here for contiguity + n.log.WithFields(logrus.Fields{ + "block_index": i, + "block_number": numbers[i].String(), + }).Debug("Block not found in batch, stopping") + break } @@ -145,13 +196,18 @@ func (n *RPCNode) blocksByNumbers(ctx context.Context, numbers []*big.Int) ([]ex block, parseErr := parseBlockFromJSON(*results[i]) if parseErr != nil { // Parse error - stop here for contiguity + n.log.WithError(parseErr).WithFields(logrus.Fields{ + "block_index": i, + "block_number": numbers[i].String(), + }).Warn("Failed to parse block from JSON, stopping batch") + break } blocks = append(blocks, NewBlockAdapter(block)) } - return blocks, nil //nolint:nilerr // Intentionally returning partial results for contiguity + return blocks, nil } // toBlockNumArg converts a block number to the RPC argument format. 
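// Editor's note (illustration, not part of the patch): a minimal sketch of the chunk
// boundaries implied by the blocksByNumbersChunked loop above, assuming the
// maxBatchSize of 100 defined in rpc.go. The package and the int slice are hypothetical
// stand-ins; only the slicing arithmetic mirrors the patched code. For 250 requested
// blocks the loop issues three batch RPC calls, and if any batch returns fewer blocks
// than requested it stops early so the result stays contiguous.
package main

import "fmt"

const maxBatchSize = 100

func main() {
	numbers := make([]int, 250) // stand-in for the []*big.Int block numbers

	for i := 0; i < len(numbers); i += maxBatchSize {
		end := i + maxBatchSize
		if end > len(numbers) {
			end = len(numbers)
		}

		// Prints [0,100) [100,200) [200,250): three batches, each within the RPC limit.
		fmt.Printf("batch covers indexes [%d,%d)\n", i, end)
	}
}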
diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go index c0ede65..12ac099 100644 --- a/pkg/processor/manager.go +++ b/pkg/processor/manager.go @@ -564,16 +564,15 @@ func (m *Manager) processBlocks(ctx context.Context) bool { } else { workDone = true - // Track processing duration + // Track processing duration (block count is tracked by the processor itself) duration := time.Since(startTime) common.BlockProcessingDuration.WithLabelValues(m.network.Name, name).Observe(duration.Seconds()) - common.BlocksProcessed.WithLabelValues(m.network.Name, name).Inc() m.log.WithFields(logrus.Fields{ "processor": name, "duration": duration, - }).Debug("Successfully processed block") + }).Debug("Successfully processed blocks") } // Update head distance metric (regardless of success/failure to track current distance) diff --git a/pkg/processor/transaction/simple/block_processing.go b/pkg/processor/transaction/simple/block_processing.go index 6ed3dd9..60dd62c 100644 --- a/pkg/processor/transaction/simple/block_processing.go +++ b/pkg/processor/transaction/simple/block_processing.go @@ -137,10 +137,24 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { }).Debug("Fetched batch of blocks") // Process each block, stopping on first error + processedCount := 0 + for _, block := range blocks { if processErr := p.processBlock(ctx, block); processErr != nil { + // Record blocks that were successfully processed before the error + if processedCount > 0 { + common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount)) + } + return processErr } + + processedCount++ + } + + // Record all successfully processed blocks + if processedCount > 0 { + common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount)) } return nil diff --git a/pkg/processor/transaction/simple/config.go b/pkg/processor/transaction/simple/config.go index d379e67..f4ded18 100644 --- a/pkg/processor/transaction/simple/config.go +++ b/pkg/processor/transaction/simple/config.go @@ -9,8 +9,11 @@ import ( // Default buffer configuration values. const ( - DefaultBufferMaxRows = 100000 - DefaultBufferFlushInterval = time.Second + DefaultBufferMaxRows = 100000 + DefaultBufferFlushInterval = time.Second + DefaultBufferMaxConcurrentFlushes = 10 + DefaultBufferCircuitBreakerMaxFailures = 5 + DefaultBufferCircuitBreakerTimeout = 60 * time.Second ) // Config holds configuration for the simple transaction processor. @@ -20,8 +23,11 @@ type Config struct { Table string `yaml:"table"` // Row buffer settings for batched ClickHouse inserts - BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000 - BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s + BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000 + BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s + BufferMaxConcurrentFlushes int `yaml:"bufferMaxConcurrentFlushes"` // Max parallel flush ops. Default: 10 + BufferCircuitBreakerMaxFailures uint32 `yaml:"bufferCircuitBreakerMaxFailures"` // Consecutive failures to trip circuit. Default: 5 + BufferCircuitBreakerTimeout time.Duration `yaml:"bufferCircuitBreakerTimeout"` // Open state duration before half-open. Default: 60s // Block completion tracking MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. 
Default: 2 diff --git a/pkg/processor/transaction/simple/processor.go b/pkg/processor/transaction/simple/processor.go index 2550aa7..2d1fd50 100644 --- a/pkg/processor/transaction/simple/processor.go +++ b/pkg/processor/transaction/simple/processor.go @@ -85,6 +85,18 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { config.BufferFlushInterval = DefaultBufferFlushInterval } + if config.BufferMaxConcurrentFlushes <= 0 { + config.BufferMaxConcurrentFlushes = DefaultBufferMaxConcurrentFlushes + } + + if config.BufferCircuitBreakerMaxFailures <= 0 { + config.BufferCircuitBreakerMaxFailures = DefaultBufferCircuitBreakerMaxFailures + } + + if config.BufferCircuitBreakerTimeout <= 0 { + config.BufferCircuitBreakerTimeout = DefaultBufferCircuitBreakerTimeout + } + log := deps.Log.WithField("processor", ProcessorName) // Create the limiter for shared functionality @@ -129,21 +141,25 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { // Create the row buffer with the flush function processor.rowBuffer = rowbuffer.New( rowbuffer.Config{ - MaxRows: config.BufferMaxRows, - FlushInterval: config.BufferFlushInterval, - Network: deps.Network.Name, - Processor: ProcessorName, - Table: config.Table, + MaxRows: config.BufferMaxRows, + FlushInterval: config.BufferFlushInterval, + MaxConcurrentFlushes: config.BufferMaxConcurrentFlushes, + CircuitBreakerMaxFailures: config.BufferCircuitBreakerMaxFailures, + CircuitBreakerTimeout: config.BufferCircuitBreakerTimeout, + Network: deps.Network.Name, + Processor: ProcessorName, + Table: config.Table, }, processor.flushRows, log, ) processor.log.WithFields(logrus.Fields{ - "network": processor.network.Name, - "max_pending_block_range": config.MaxPendingBlockRange, - "buffer_max_rows": config.BufferMaxRows, - "buffer_flush_interval": config.BufferFlushInterval, + "network": processor.network.Name, + "max_pending_block_range": config.MaxPendingBlockRange, + "buffer_max_rows": config.BufferMaxRows, + "buffer_flush_interval": config.BufferFlushInterval, + "buffer_max_concurrent_flushes": config.BufferMaxConcurrentFlushes, }).Info("Simple transaction processor initialized") return processor, nil diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 19a292b..651b504 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -140,10 +140,24 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { }).Debug("Fetched batch of blocks") // Process each block, stopping on first error + processedCount := 0 + for _, block := range blocks { if processErr := p.processBlock(ctx, block); processErr != nil { + // Record blocks that were successfully processed before the error + if processedCount > 0 { + common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount)) + } + return processErr } + + processedCount++ + } + + // Record all successfully processed blocks + if processedCount > 0 { + common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount)) } return nil @@ -156,7 +170,7 @@ func (p *Processor) handleBlockNotFound(ctx context.Context, node execution.Node chainTip := new(big.Int).SetUint64(*latestBlock) diff := new(big.Int).Sub(nextBlock, chainTip).Int64() - if diff <= 5 { // Within 5 blocks of chain tip + if diff > 0 && diff <= 5 { // 1-5 blocks ahead of chain tip, might appear soon p.log.WithFields(logrus.Fields{ 
"network": p.network.Name, "block_number": nextBlock, diff --git a/pkg/processor/transaction/structlog/config.go b/pkg/processor/transaction/structlog/config.go index e948c1d..0bfc024 100644 --- a/pkg/processor/transaction/structlog/config.go +++ b/pkg/processor/transaction/structlog/config.go @@ -9,8 +9,11 @@ import ( // Default buffer configuration values. const ( - DefaultBufferMaxRows = 100000 - DefaultBufferFlushInterval = time.Second + DefaultBufferMaxRows = 100000 + DefaultBufferFlushInterval = time.Second + DefaultBufferMaxConcurrentFlushes = 10 + DefaultBufferCircuitBreakerMaxFailures = 5 + DefaultBufferCircuitBreakerTimeout = 60 * time.Second ) // Config holds configuration for transaction structlog processor. @@ -20,8 +23,11 @@ type Config struct { Table string `yaml:"table"` // Row buffer settings for batched ClickHouse inserts - BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000 - BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s + BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000 + BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s + BufferMaxConcurrentFlushes int `yaml:"bufferMaxConcurrentFlushes"` // Max parallel flush ops. Default: 10 + BufferCircuitBreakerMaxFailures uint32 `yaml:"bufferCircuitBreakerMaxFailures"` // Consecutive failures to trip circuit. Default: 5 + BufferCircuitBreakerTimeout time.Duration `yaml:"bufferCircuitBreakerTimeout"` // Open state duration before half-open. Default: 60s // Block completion tracking MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2 diff --git a/pkg/processor/transaction/structlog/processor.go b/pkg/processor/transaction/structlog/processor.go index f58e69d..c4ad3a6 100644 --- a/pkg/processor/transaction/structlog/processor.go +++ b/pkg/processor/transaction/structlog/processor.go @@ -93,6 +93,18 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { config.BufferFlushInterval = DefaultBufferFlushInterval } + if config.BufferMaxConcurrentFlushes <= 0 { + config.BufferMaxConcurrentFlushes = DefaultBufferMaxConcurrentFlushes + } + + if config.BufferCircuitBreakerMaxFailures <= 0 { + config.BufferCircuitBreakerMaxFailures = DefaultBufferCircuitBreakerMaxFailures + } + + if config.BufferCircuitBreakerTimeout <= 0 { + config.BufferCircuitBreakerTimeout = DefaultBufferCircuitBreakerTimeout + } + log := deps.Log.WithField("processor", ProcessorName) // Create the limiter for shared functionality @@ -138,21 +150,25 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { // Create the row buffer with the flush function processor.rowBuffer = rowbuffer.New( rowbuffer.Config{ - MaxRows: config.BufferMaxRows, - FlushInterval: config.BufferFlushInterval, - Network: deps.Network.Name, - Processor: ProcessorName, - Table: config.Table, + MaxRows: config.BufferMaxRows, + FlushInterval: config.BufferFlushInterval, + MaxConcurrentFlushes: config.BufferMaxConcurrentFlushes, + CircuitBreakerMaxFailures: config.BufferCircuitBreakerMaxFailures, + CircuitBreakerTimeout: config.BufferCircuitBreakerTimeout, + Network: deps.Network.Name, + Processor: ProcessorName, + Table: config.Table, }, processor.flushRows, log, ) processor.log.WithFields(logrus.Fields{ - "network": processor.network.Name, - "max_pending_block_range": config.MaxPendingBlockRange, - "buffer_max_rows": 
config.BufferMaxRows, - "buffer_flush_interval": config.BufferFlushInterval, + "network": processor.network.Name, + "max_pending_block_range": config.MaxPendingBlockRange, + "buffer_max_rows": config.BufferMaxRows, + "buffer_flush_interval": config.BufferFlushInterval, + "buffer_max_concurrent_flushes": config.BufferMaxConcurrentFlushes, }).Info("Detected network") return processor, nil diff --git a/pkg/processor/transaction/structlog/processor_test.go b/pkg/processor/transaction/structlog/processor_test.go index 6e394a2..8764372 100644 --- a/pkg/processor/transaction/structlog/processor_test.go +++ b/pkg/processor/transaction/structlog/processor_test.go @@ -715,3 +715,128 @@ func TestHeadDistanceMetricLabels(t *testing.T) { }) } } + +func TestHandleBlockNotFound_DiffCalculation(t *testing.T) { + // This test verifies the correct behavior of handleBlockNotFound: + // - Blocks at or behind chain tip (diff <= 0) should NOT trigger "not yet available" + // - Blocks 1-5 ahead of chain tip should trigger "not yet available" (might appear soon) + // - Blocks >5 ahead of chain tip should return "block not found" (something is wrong) + // + // The fixed logic: + // diff := nextBlock - chainTip + // if diff > 0 && diff <= 5 { return "block not yet available" } + testCases := []struct { + name string + nextBlock int64 + chainTip int64 + expectedDiff int64 + shouldBeAvailable bool // true = block should exist (at or behind tip) + shouldWaitForBlock bool // true = "not yet available" message (1-5 ahead) + shouldReturnNotFound bool // true = "block not found" message (>5 ahead) + }{ + { + name: "block far behind chain tip - should exist", + nextBlock: 24365273, + chainTip: 24365743, + expectedDiff: -470, + shouldBeAvailable: true, + shouldWaitForBlock: false, + shouldReturnNotFound: false, + }, + { + name: "block 100 blocks behind chain tip - should exist", + nextBlock: 100, + chainTip: 200, + expectedDiff: -100, + shouldBeAvailable: true, + shouldWaitForBlock: false, + shouldReturnNotFound: false, + }, + { + name: "block at chain tip - should exist", + nextBlock: 200, + chainTip: 200, + expectedDiff: 0, + shouldBeAvailable: true, + shouldWaitForBlock: false, + shouldReturnNotFound: false, + }, + { + name: "block 1 ahead of chain tip - wait for it", + nextBlock: 201, + chainTip: 200, + expectedDiff: 1, + shouldBeAvailable: false, + shouldWaitForBlock: true, + shouldReturnNotFound: false, + }, + { + name: "block 5 ahead of chain tip - wait for it", + nextBlock: 205, + chainTip: 200, + expectedDiff: 5, + shouldBeAvailable: false, + shouldWaitForBlock: true, + shouldReturnNotFound: false, + }, + { + name: "block 6 ahead of chain tip - not found", + nextBlock: 206, + chainTip: 200, + expectedDiff: 6, + shouldBeAvailable: false, + shouldWaitForBlock: false, + shouldReturnNotFound: true, + }, + { + name: "block 10 ahead of chain tip - not found", + nextBlock: 210, + chainTip: 200, + expectedDiff: 10, + shouldBeAvailable: false, + shouldWaitForBlock: false, + shouldReturnNotFound: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + nextBlock := big.NewInt(tc.nextBlock) + chainTip := big.NewInt(tc.chainTip) + + // Calculate diff as the code does + diff := new(big.Int).Sub(nextBlock, chainTip).Int64() + + assert.Equal(t, tc.expectedDiff, diff, "Diff calculation should match") + + // Fixed behavior: diff > 0 && diff <= 5 means "not yet available" + fixedCodeWaitsForBlock := diff > 0 && diff <= 5 + fixedCodeReturnsNotFound := diff > 5 + fixedCodeSaysAvailable := diff <= 0 + + 
assert.Equal(t, tc.shouldWaitForBlock, fixedCodeWaitsForBlock, + "Fixed code should wait for block when 1-5 blocks ahead") + assert.Equal(t, tc.shouldReturnNotFound, fixedCodeReturnsNotFound, + "Fixed code should return 'not found' when >5 blocks ahead") + assert.Equal(t, tc.shouldBeAvailable, fixedCodeSaysAvailable, + "Fixed code should say available when at or behind chain tip") + + // Verify mutual exclusivity + outcomes := 0 + + if fixedCodeWaitsForBlock { + outcomes++ + } + + if fixedCodeReturnsNotFound { + outcomes++ + } + + if fixedCodeSaysAvailable { + outcomes++ + } + + assert.Equal(t, 1, outcomes, "Exactly one outcome should be true") + }) + } +} diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go index 598e150..1b93eac 100644 --- a/pkg/processor/transaction/structlog_agg/block_processing.go +++ b/pkg/processor/transaction/structlog_agg/block_processing.go @@ -140,10 +140,24 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { }).Debug("Fetched batch of blocks") // Process each block, stopping on first error + processedCount := 0 + for _, block := range blocks { if processErr := p.processBlock(ctx, block); processErr != nil { + // Record blocks that were successfully processed before the error + if processedCount > 0 { + common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount)) + } + return processErr } + + processedCount++ + } + + // Record all successfully processed blocks + if processedCount > 0 { + common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount)) } return nil @@ -156,7 +170,7 @@ func (p *Processor) handleBlockNotFound(ctx context.Context, node execution.Node chainTip := new(big.Int).SetUint64(*latestBlock) diff := new(big.Int).Sub(nextBlock, chainTip).Int64() - if diff <= 5 { // Within 5 blocks of chain tip + if diff > 0 && diff <= 5 { // 1-5 blocks ahead of chain tip, might appear soon p.log.WithFields(logrus.Fields{ "network": p.network.Name, "block_number": nextBlock, diff --git a/pkg/processor/transaction/structlog_agg/config.go b/pkg/processor/transaction/structlog_agg/config.go index 3e0af8b..2acb376 100644 --- a/pkg/processor/transaction/structlog_agg/config.go +++ b/pkg/processor/transaction/structlog_agg/config.go @@ -9,8 +9,11 @@ import ( // Default buffer configuration values. const ( - DefaultBufferMaxRows = 100000 - DefaultBufferFlushInterval = time.Second + DefaultBufferMaxRows = 100000 + DefaultBufferFlushInterval = time.Second + DefaultBufferMaxConcurrentFlushes = 10 + DefaultBufferCircuitBreakerMaxFailures = 5 + DefaultBufferCircuitBreakerTimeout = 60 * time.Second ) // Config holds configuration for transaction structlog_agg processor. @@ -20,8 +23,11 @@ type Config struct { Table string `yaml:"table"` // Row buffer settings for batched ClickHouse inserts - BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000 - BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s + BufferMaxRows int `yaml:"bufferMaxRows"` // Max rows before flush. Default: 100000 + BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s + BufferMaxConcurrentFlushes int `yaml:"bufferMaxConcurrentFlushes"` // Max parallel flush ops. Default: 10 + BufferCircuitBreakerMaxFailures uint32 `yaml:"bufferCircuitBreakerMaxFailures"` // Consecutive failures to trip circuit. 
Default: 5 + BufferCircuitBreakerTimeout time.Duration `yaml:"bufferCircuitBreakerTimeout"` // Open state duration before half-open. Default: 60s // Block completion tracking MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2 diff --git a/pkg/processor/transaction/structlog_agg/processor.go b/pkg/processor/transaction/structlog_agg/processor.go index 4a765a7..891f45d 100644 --- a/pkg/processor/transaction/structlog_agg/processor.go +++ b/pkg/processor/transaction/structlog_agg/processor.go @@ -103,6 +103,18 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { config.BufferFlushInterval = DefaultBufferFlushInterval } + if config.BufferMaxConcurrentFlushes <= 0 { + config.BufferMaxConcurrentFlushes = DefaultBufferMaxConcurrentFlushes + } + + if config.BufferCircuitBreakerMaxFailures <= 0 { + config.BufferCircuitBreakerMaxFailures = DefaultBufferCircuitBreakerMaxFailures + } + + if config.BufferCircuitBreakerTimeout <= 0 { + config.BufferCircuitBreakerTimeout = DefaultBufferCircuitBreakerTimeout + } + log := deps.Log.WithField("processor", ProcessorName) // Create the limiter for shared functionality @@ -148,21 +160,25 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { // Create the row buffer with the flush function processor.rowBuffer = rowbuffer.New( rowbuffer.Config{ - MaxRows: config.BufferMaxRows, - FlushInterval: config.BufferFlushInterval, - Network: deps.Network.Name, - Processor: ProcessorName, - Table: config.Table, + MaxRows: config.BufferMaxRows, + FlushInterval: config.BufferFlushInterval, + MaxConcurrentFlushes: config.BufferMaxConcurrentFlushes, + CircuitBreakerMaxFailures: config.BufferCircuitBreakerMaxFailures, + CircuitBreakerTimeout: config.BufferCircuitBreakerTimeout, + Network: deps.Network.Name, + Processor: ProcessorName, + Table: config.Table, }, processor.flushRows, log, ) processor.log.WithFields(logrus.Fields{ - "network": processor.network.Name, - "max_pending_block_range": config.MaxPendingBlockRange, - "buffer_max_rows": config.BufferMaxRows, - "buffer_flush_interval": config.BufferFlushInterval, + "network": processor.network.Name, + "max_pending_block_range": config.MaxPendingBlockRange, + "buffer_max_rows": config.BufferMaxRows, + "buffer_flush_interval": config.BufferFlushInterval, + "buffer_max_concurrent_flushes": config.BufferMaxConcurrentFlushes, }).Info("Detected network") return processor, nil diff --git a/pkg/rowbuffer/buffer.go b/pkg/rowbuffer/buffer.go index c38d195..5a73ac9 100644 --- a/pkg/rowbuffer/buffer.go +++ b/pkg/rowbuffer/buffer.go @@ -5,11 +5,13 @@ package rowbuffer import ( "context" + "errors" "fmt" "sync" "time" "github.com/sirupsen/logrus" + "github.com/sony/gobreaker/v2" "github.com/ethpandaops/execution-processor/pkg/common" ) @@ -19,11 +21,16 @@ type FlushFunc[R any] func(ctx context.Context, rows []R) error // Config holds configuration for the row buffer. 
type Config struct { - MaxRows int // Flush threshold (default: 100000) - FlushInterval time.Duration // Max wait before flush (default: 1s) - Network string // For metrics - Processor string // For metrics - Table string // For metrics + MaxRows int // Flush threshold (default: 100000) + FlushInterval time.Duration // Max wait before flush (default: 1s) + MaxConcurrentFlushes int // Max parallel flush operations (default: 10) + Network string // For metrics + Processor string // For metrics + Table string // For metrics + + // Circuit breaker configuration + CircuitBreakerMaxFailures uint32 // Consecutive failures to trip circuit (default: 5) + CircuitBreakerTimeout time.Duration // Open state duration before half-open (default: 60s) } // waiter represents a task waiting for its rows to be flushed. @@ -46,6 +53,9 @@ type Buffer[R any] struct { stoppedChan chan struct{} wg sync.WaitGroup started bool + + flushSem chan struct{} // Semaphore to limit concurrent flushes + cb *gobreaker.CircuitBreaker[any] // Circuit breaker for flush operations } // New creates a new Buffer with the given configuration and flush function. @@ -59,14 +69,55 @@ func New[R any](cfg Config, flushFn FlushFunc[R], log logrus.FieldLogger) *Buffe cfg.FlushInterval = time.Second } + if cfg.MaxConcurrentFlushes <= 0 { + cfg.MaxConcurrentFlushes = 10 + } + + if cfg.CircuitBreakerMaxFailures <= 0 { + cfg.CircuitBreakerMaxFailures = 5 + } + + if cfg.CircuitBreakerTimeout <= 0 { + cfg.CircuitBreakerTimeout = 60 * time.Second + } + + bufLog := log.WithField("component", "rowbuffer") + + cbSettings := gobreaker.Settings{ + Name: fmt.Sprintf("rowbuffer-%s-%s", cfg.Processor, cfg.Table), + MaxRequests: 1, + Timeout: cfg.CircuitBreakerTimeout, + ReadyToTrip: func(counts gobreaker.Counts) bool { + return counts.ConsecutiveFailures >= cfg.CircuitBreakerMaxFailures + }, + OnStateChange: func(name string, from, to gobreaker.State) { + bufLog.WithFields(logrus.Fields{ + "circuit": name, + "from": from.String(), + "to": to.String(), + }).Warn("Circuit breaker state changed") + + state := 0.0 + if to == gobreaker.StateOpen { + state = 1.0 + } + + common.RowBufferCircuitOpen.WithLabelValues( + cfg.Network, cfg.Processor, cfg.Table, + ).Set(state) + }, + } + return &Buffer[R]{ rows: make([]R, 0, cfg.MaxRows), waiters: make([]waiter, 0, 64), config: cfg, flushFn: flushFn, - log: log.WithField("component", "rowbuffer"), + log: bufLog, stopChan: make(chan struct{}), stoppedChan: make(chan struct{}), + flushSem: make(chan struct{}, cfg.MaxConcurrentFlushes), + cb: gobreaker.NewCircuitBreaker[any](cbSettings), } } @@ -189,9 +240,21 @@ func (b *Buffer[R]) Submit(ctx context.Context, rows []R) error { // Perform flush outside of lock if triggered by size if shouldFlush { - go func() { + b.wg.Go(func() { + b.flushSem <- struct{}{} // Acquire semaphore (blocks if at limit) + + defer func() { <-b.flushSem }() // Release semaphore + + common.RowBufferInflightFlushes.WithLabelValues( + b.config.Network, b.config.Processor, b.config.Table, + ).Inc() + + defer common.RowBufferInflightFlushes.WithLabelValues( + b.config.Network, b.config.Processor, b.config.Table, + ).Dec() + _ = b.doFlush(context.Background(), flushRows, flushWaiters, "size") - }() + }) } // Wait for result or context cancellation @@ -244,15 +307,55 @@ func (b *Buffer[R]) flushOnTimer(ctx context.Context) { b.waiters = make([]waiter, 0, 64) b.mu.Unlock() - _ = b.doFlush(ctx, rows, waiters, "timer") + b.wg.Go(func() { + b.flushSem <- struct{}{} // Acquire semaphore (blocks if at limit) 
+ + defer func() { <-b.flushSem }() // Release semaphore + + common.RowBufferInflightFlushes.WithLabelValues( + b.config.Network, b.config.Processor, b.config.Table, + ).Inc() + + defer common.RowBufferInflightFlushes.WithLabelValues( + b.config.Network, b.config.Processor, b.config.Table, + ).Dec() + + _ = b.doFlush(ctx, rows, waiters, "timer") + }) } -// doFlush performs the actual flush and notifies all waiters. +// doFlush performs the actual flush through the circuit breaker and notifies all waiters. func (b *Buffer[R]) doFlush(ctx context.Context, rows []R, waiters []waiter, trigger string) error { if len(rows) == 0 { return nil } + // Execute through circuit breaker + _, err := b.cb.Execute(func() (any, error) { + return nil, b.executeFlush(ctx, rows, trigger) + }) + + // Track circuit breaker rejections + if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) { + common.RowBufferCircuitRejections.WithLabelValues( + b.config.Network, b.config.Processor, b.config.Table, + ).Inc() + } + + // Notify all waiters + for _, w := range waiters { + select { + case w.resultCh <- err: + default: + // Waiter may have timed out, skip + } + } + + return err +} + +// executeFlush performs the actual flush to ClickHouse. +func (b *Buffer[R]) executeFlush(ctx context.Context, rows []R, trigger string) error { start := time.Now() rowCount := len(rows) @@ -294,7 +397,6 @@ func (b *Buffer[R]) doFlush(ctx context.Context, rows []R, waiters []waiter, tri if err != nil { b.log.WithError(err).WithFields(logrus.Fields{ "rows": rowCount, - "waiters": len(waiters), "trigger": trigger, "duration": duration, "processor": b.config.Processor, @@ -303,7 +405,6 @@ func (b *Buffer[R]) doFlush(ctx context.Context, rows []R, waiters []waiter, tri } else { b.log.WithFields(logrus.Fields{ "rows": rowCount, - "waiters": len(waiters), "trigger": trigger, "duration": duration, "processor": b.config.Processor, @@ -311,15 +412,6 @@ func (b *Buffer[R]) doFlush(ctx context.Context, rows []R, waiters []waiter, tri }).Debug("ClickHouse flush completed") } - // Notify all waiters - for _, w := range waiters { - select { - case w.resultCh <- err: - default: - // Waiter may have timed out, skip - } - } - return err } diff --git a/pkg/rowbuffer/buffer_test.go b/pkg/rowbuffer/buffer_test.go index dbdbccc..8da74ce 100644 --- a/pkg/rowbuffer/buffer_test.go +++ b/pkg/rowbuffer/buffer_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/sirupsen/logrus" + "github.com/sony/gobreaker/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -382,3 +383,412 @@ func TestBuffer_MultipleFlushes(t *testing.T) { assert.GreaterOrEqual(t, flushCount.Load(), int32(2)) }) } + +func TestBuffer_ConcurrentFlushes(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var maxConcurrent atomic.Int32 + + var currentConcurrent atomic.Int32 + + buf := New(Config{ + MaxRows: 10, + FlushInterval: time.Hour, + MaxConcurrentFlushes: 3, + }, func(ctx context.Context, rows []int) error { + cur := currentConcurrent.Add(1) + + // Track max concurrent flushes observed + for { + maxVal := maxConcurrent.Load() + if cur <= maxVal || maxConcurrent.CompareAndSwap(maxVal, cur) { + break + } + } + + // Simulate some work + time.Sleep(10 * time.Millisecond) + + currentConcurrent.Add(-1) + + return nil + }, newTestLogger()) + + require.NoError(t, buf.Start(context.Background())) + + // Submit 50 rows in batches of 10 (triggers 5 flushes) + for i := range 5 { + go func(batch int) { + _ = 
buf.Submit(context.Background(), make([]int, 10)) + }(i) + } + + synctest.Wait() + + // Advance time to let flushes complete + time.Sleep(100 * time.Millisecond) + + synctest.Wait() + + require.NoError(t, buf.Stop(context.Background())) + + // Max concurrent should be <= 3 (the limit) + assert.LessOrEqual(t, maxConcurrent.Load(), int32(3)) + // And at least 1 flush should have happened + assert.GreaterOrEqual(t, maxConcurrent.Load(), int32(1)) + }) +} + +func TestBuffer_FlushSemaphoreLimit(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + flushStarted := make(chan struct{}, 10) + blockFlush := make(chan struct{}) + + var flushCount atomic.Int32 + + buf := New(Config{ + MaxRows: 5, + FlushInterval: time.Hour, + MaxConcurrentFlushes: 2, + }, func(ctx context.Context, rows []int) error { + flushStarted <- struct{}{} + + <-blockFlush + + flushCount.Add(1) + + return nil + }, newTestLogger()) + + require.NoError(t, buf.Start(context.Background())) + + // Submit 20 rows rapidly (triggers 4 flushes of 5 rows each) + for i := range 4 { + go func(batch int) { + _ = buf.Submit(context.Background(), make([]int, 5)) + }(i) + } + + synctest.Wait() + + // Only 2 flushes should start initially (semaphore limit) + startedCount := 0 + + for { + select { + case <-flushStarted: + startedCount++ + default: + goto done + } + } + + done: + assert.Equal(t, 2, startedCount, "only 2 flushes should start due to semaphore") + + // Unblock flushes + close(blockFlush) + + synctest.Wait() + + require.NoError(t, buf.Stop(context.Background())) + + // All 4 flushes should have completed + assert.Equal(t, int32(4), flushCount.Load()) + }) +} + +func TestBuffer_GracefulShutdown_WaitsForInflight(t *testing.T) { + flushStarted := make(chan struct{}) + blockFlush := make(chan struct{}) + + var flushCompleted atomic.Bool + + buf := New(Config{ + MaxRows: 10, + FlushInterval: time.Hour, + MaxConcurrentFlushes: 2, + }, func(ctx context.Context, rows []int) error { + close(flushStarted) + <-blockFlush + flushCompleted.Store(true) + + return nil + }, newTestLogger()) + + require.NoError(t, buf.Start(context.Background())) + + // Submit rows to trigger flush + go func() { + _ = buf.Submit(context.Background(), make([]int, 10)) + }() + + // Wait for flush to start + <-flushStarted + + // Start Stop() in goroutine - it should block until flush completes + stopDone := make(chan struct{}) + + go func() { + _ = buf.Stop(context.Background()) + + close(stopDone) + }() + + // Give Stop() time to potentially return early (it shouldn't) + time.Sleep(10 * time.Millisecond) + + // Stop should still be blocked + select { + case <-stopDone: + t.Fatal("Stop() returned before flush completed") + default: + // Expected - Stop is still waiting + } + + // Unblock the flush + close(blockFlush) + + // Now Stop should complete + select { + case <-stopDone: + assert.True(t, flushCompleted.Load(), "flush should have completed") + case <-time.After(time.Second): + t.Fatal("Stop() did not complete after flush unblocked") + } +} + +func TestBuffer_DefaultMaxConcurrentFlushes(t *testing.T) { + // Create buffer with Config{} (no MaxConcurrentFlushes set) + buf := New(Config{}, func(ctx context.Context, rows []int) error { + return nil + }, newTestLogger()) + + // Verify defaults to 10 (check via config) + assert.Equal(t, 10, buf.config.MaxConcurrentFlushes) + // Also verify the semaphore has capacity 10 + assert.Equal(t, 10, cap(buf.flushSem)) +} + +func TestBuffer_ConcurrentFlushErrors(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var 
flushNum atomic.Int32 + + expectedErr := errors.New("batch 2 error") + + buf := New(Config{ + MaxRows: 5, + FlushInterval: time.Hour, + MaxConcurrentFlushes: 2, + }, func(ctx context.Context, rows []int) error { + num := flushNum.Add(1) + + // Batch 2 fails, others succeed + if num == 2 { + return expectedErr + } + + return nil + }, newTestLogger()) + + require.NoError(t, buf.Start(context.Background())) + + errChan := make(chan error, 4) + + // Submit 4 batches of 5 rows each + for i := range 4 { + go func(batch int) { + errChan <- buf.Submit(context.Background(), make([]int, 5)) + }(i) + } + + synctest.Wait() + + require.NoError(t, buf.Stop(context.Background())) + + // Collect all errors + var errs []error + + for range 4 { + select { + case err := <-errChan: + errs = append(errs, err) + default: + } + } + + // Should have exactly one error (from batch 2) + errorCount := 0 + + for _, err := range errs { + if err != nil { + errorCount++ + + assert.ErrorIs(t, err, expectedErr) + } + } + + assert.Equal(t, 1, errorCount, "exactly one batch should have failed") + }) +} + +func TestBuffer_CircuitBreakerTrips(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + flushErr := errors.New("clickhouse error") + + var flushCount atomic.Int32 + + buf := New(Config{ + MaxRows: 5, + FlushInterval: time.Hour, + MaxConcurrentFlushes: 1, // Force sequential flushes for deterministic behavior + CircuitBreakerMaxFailures: 3, // Trip after 3 consecutive failures + CircuitBreakerTimeout: time.Hour, + }, func(ctx context.Context, rows []int) error { + flushCount.Add(1) + + return flushErr + }, newTestLogger()) + + require.NoError(t, buf.Start(context.Background())) + + defer func() { _ = buf.Stop(context.Background()) }() + + errChan := make(chan error, 5) + + // Submit 5 batches - first 3 should fail with flushErr, then circuit opens + for i := range 5 { + go func(batch int) { + errChan <- buf.Submit(context.Background(), make([]int, 5)) + }(i) + } + + synctest.Wait() + + // Collect errors + var flushErrCount int + + var circuitOpenCount int + + for range 5 { + select { + case err := <-errChan: + if errors.Is(err, flushErr) { + flushErrCount++ + } else if errors.Is(err, gobreaker.ErrOpenState) { + circuitOpenCount++ + } + default: + } + } + + // With sequential flushes, should have exactly 3 flush errors before circuit tripped + assert.Equal(t, 3, int(flushCount.Load()), "should have attempted exactly 3 flushes before circuit opened") + assert.Equal(t, 3, flushErrCount, "should have 3 flush errors") + assert.Equal(t, 2, circuitOpenCount, "should have 2 circuit open rejections") + }) +} + +func TestBuffer_CircuitBreakerRejectsWhenOpen(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + flushErr := errors.New("clickhouse error") + + buf := New(Config{ + MaxRows: 5, + FlushInterval: time.Hour, + CircuitBreakerMaxFailures: 1, // Trip after 1 failure + CircuitBreakerTimeout: time.Hour, + }, func(ctx context.Context, rows []int) error { + return flushErr + }, newTestLogger()) + + require.NoError(t, buf.Start(context.Background())) + + defer func() { _ = buf.Stop(context.Background()) }() + + errChan := make(chan error, 1) + + // First batch trips the circuit + go func() { + errChan <- buf.Submit(context.Background(), make([]int, 5)) + }() + + synctest.Wait() + + err := <-errChan + require.ErrorIs(t, err, flushErr, "first batch should fail with flush error") + + // Second batch should be rejected immediately by circuit breaker + go func() { + errChan <- buf.Submit(context.Background(), make([]int, 5)) + 
}() + + synctest.Wait() + + err = <-errChan + require.ErrorIs(t, err, gobreaker.ErrOpenState, "second batch should be rejected by open circuit") + }) +} + +func TestBuffer_CircuitBreakerRecovery(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var shouldFail atomic.Bool + shouldFail.Store(true) + + buf := New(Config{ + MaxRows: 5, + FlushInterval: time.Hour, + CircuitBreakerMaxFailures: 1, + CircuitBreakerTimeout: 100 * time.Millisecond, // Short timeout for testing + }, func(ctx context.Context, rows []int) error { + if shouldFail.Load() { + return errors.New("clickhouse error") + } + + return nil + }, newTestLogger()) + + require.NoError(t, buf.Start(context.Background())) + + defer func() { _ = buf.Stop(context.Background()) }() + + errChan := make(chan error, 1) + + // First batch trips the circuit + go func() { + errChan <- buf.Submit(context.Background(), make([]int, 5)) + }() + + synctest.Wait() + + err := <-errChan + require.Error(t, err, "first batch should fail") + + // Wait for circuit to transition to half-open + time.Sleep(150 * time.Millisecond) + + synctest.Wait() + + // Fix the "database" before the trial request + shouldFail.Store(false) + + // Next batch should succeed (half-open allows one trial) + go func() { + errChan <- buf.Submit(context.Background(), make([]int, 5)) + }() + + synctest.Wait() + + err = <-errChan + require.NoError(t, err, "batch after recovery should succeed") + }) +} + +func TestBuffer_CircuitBreakerDefaultConfig(t *testing.T) { + buf := New(Config{}, func(ctx context.Context, rows []int) error { + return nil + }, newTestLogger()) + + // Verify defaults + assert.Equal(t, uint32(5), buf.config.CircuitBreakerMaxFailures) + assert.Equal(t, 60*time.Second, buf.config.CircuitBreakerTimeout) +}
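// Editor's note (illustration, not part of the patch): a minimal usage sketch of the
// new rowbuffer knobs introduced in this diff, mirroring how the processors wire them
// up. Field names and the New/Start/Submit/Stop signatures follow pkg/rowbuffer as
// shown above; the network/processor/table values and the trivial flush function are
// assumptions for the example only (the real processors flush to ClickHouse).
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/sirupsen/logrus"

	"github.com/ethpandaops/execution-processor/pkg/rowbuffer"
)

func main() {
	ctx := context.Background()

	buf := rowbuffer.New(rowbuffer.Config{
		MaxRows:                   100000,           // flush once this many rows are buffered
		FlushInterval:             time.Second,      // or after this long, whichever comes first
		MaxConcurrentFlushes:      10,               // semaphore on parallel flush operations
		CircuitBreakerMaxFailures: 5,                // consecutive flush failures before the circuit opens
		CircuitBreakerTimeout:     60 * time.Second, // open-state duration before a half-open trial
		Network:                   "mainnet",
		Processor:                 "example",
		Table:                     "example_table",
	}, func(ctx context.Context, rows []int) error {
		fmt.Printf("flushing %d rows\n", len(rows))
		return nil
	}, logrus.New())

	_ = buf.Start(ctx)
	_ = buf.Submit(ctx, []int{1, 2, 3}) // blocks until these rows are flushed or ctx is cancelled
	_ = buf.Stop(ctx)                   // waits for in-flight flushes before returning
}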