diff --git a/example_config.yaml b/example_config.yaml
index b7762cb..619339f 100644
--- a/example_config.yaml
+++ b/example_config.yaml
@@ -56,13 +56,10 @@ processors:
       addr: "localhost:9000"
       database: "default"
       table: "canonical_execution_transaction_structlog"
-      # debug: false                        # Enable debug logging for ClickHouse queries
-      # maxPendingBlockRange: 2             # Max distance between oldest incomplete and current block (default: 2)
-      # bufferMaxRows: 100000               # Max rows before flush (default: 100000)
-      # bufferFlushInterval: "1s"           # Max time before flush (default: 1s)
-      # bufferMaxConcurrentFlushes: 10      # Max concurrent ClickHouse inserts (default: 10)
-      # bufferCircuitBreakerMaxFailures: 5  # Consecutive failures to trip circuit (default: 5)
-      # bufferCircuitBreakerTimeout: "60s"  # Open state duration before half-open (default: 60s)
+      # debug: false               # Enable debug logging for ClickHouse queries
+      # maxPendingBlockRange: 2    # Max distance between oldest incomplete and current block (default: 2)
+      # bufferMaxRows: 100000      # Max rows before flush (default: 100000)
+      # bufferFlushInterval: "1s"  # Max time before flush (default: 1s)
 
   # Aggregated structlog processor (call frame level aggregation)
   transactionStructlogAgg:
@@ -70,13 +67,10 @@ processors:
       addr: "localhost:9000"
       database: "default"
       table: "canonical_execution_transaction_structlog_agg"
-      # debug: false                        # Enable debug logging for ClickHouse queries
-      # maxPendingBlockRange: 2             # Max distance between oldest incomplete and current block (default: 2)
-      # bufferMaxRows: 100000               # Max rows before flush (default: 100000)
-      # bufferFlushInterval: "1s"           # Max time before flush (default: 1s)
-      # bufferMaxConcurrentFlushes: 10      # Max concurrent ClickHouse inserts (default: 10)
-      # bufferCircuitBreakerMaxFailures: 5  # Consecutive failures to trip circuit (default: 5)
-      # bufferCircuitBreakerTimeout: "60s"  # Open state duration before half-open (default: 60s)
+      # debug: false               # Enable debug logging for ClickHouse queries
+      # maxPendingBlockRange: 2    # Max distance between oldest incomplete and current block (default: 2)
+      # bufferMaxRows: 100000      # Max rows before flush (default: 100000)
+      # bufferFlushInterval: "1s"  # Max time before flush (default: 1s)
 
   # Simple transaction processor (lightweight - no debug traces)
   transactionSimple:
@@ -84,13 +78,10 @@ processors:
       addr: "localhost:9000"
       database: "default"
       table: "execution_transaction"
-      # debug: false                        # Enable debug logging for ClickHouse queries
-      # maxPendingBlockRange: 2             # Max distance between oldest incomplete and current block (default: 2)
-      # bufferMaxRows: 100000               # Max rows before flush (default: 100000)
-      # bufferFlushInterval: "1s"           # Max time before flush (default: 1s)
-      # bufferMaxConcurrentFlushes: 10      # Max concurrent ClickHouse inserts (default: 10)
-      # bufferCircuitBreakerMaxFailures: 5  # Consecutive failures to trip circuit (default: 5)
-      # bufferCircuitBreakerTimeout: "60s"  # Open state duration before half-open (default: 60s)
+      # debug: false               # Enable debug logging for ClickHouse queries
+      # maxPendingBlockRange: 2    # Max distance between oldest incomplete and current block (default: 2)
+      # bufferMaxRows: 100000      # Max rows before flush (default: 100000)
+      # bufferFlushInterval: "1s"  # Max time before flush (default: 1s)
 
 # Application settings
 shutdownTimeout: 6m
diff --git a/go.mod b/go.mod
index 1bae8bb..8a32246 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,6 @@ require (
 	github.com/prometheus/client_golang v1.20.5
 	github.com/redis/go-redis/v9 v9.17.2
 	github.com/sirupsen/logrus v1.9.3
-	github.com/sony/gobreaker/v2 v2.4.0
 	github.com/spf13/cobra v1.10.1
 	github.com/stretchr/testify v1.11.1
 	github.com/testcontainers/testcontainers-go v0.40.0
diff --git a/go.sum b/go.sum
index fdd7b78..e165a32 100644
--- a/go.sum
+++ b/go.sum
@@ -299,8 +299,6 @@ github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp
 github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
 github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
 github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
-github.com/sony/gobreaker/v2 v2.4.0 h1:g2KJRW1Ubty3+ZOcSEUN7K+REQJdN6yo6XvaML+jptg=
-github.com/sony/gobreaker/v2 v2.4.0/go.mod h1:pTyFJgcZ3h2tdQVLZZruK2C0eoFL1fb/G83wK1ZQl+s=
 github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
 github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
 github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
diff --git a/pkg/common/metrics.go b/pkg/common/metrics.go
index b3eba14..4ed8f94 100644
--- a/pkg/common/metrics.go
+++ b/pkg/common/metrics.go
@@ -233,20 +233,4 @@ var (
 		Name: "execution_processor_row_buffer_pending_tasks",
 		Help: "Current number of tasks waiting for their rows to be flushed",
 	}, []string{"network", "processor", "table"})
-
-	RowBufferInflightFlushes = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "execution_processor_row_buffer_inflight_flushes",
-		Help: "Number of flush operations currently in progress",
-	}, []string{"network", "processor", "table"})
-
-	// Circuit breaker metrics for row buffer.
-	RowBufferCircuitOpen = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Name: "execution_processor_row_buffer_circuit_open",
-		Help: "Whether circuit breaker is open (1) or closed (0)",
-	}, []string{"network", "processor", "table"})
-
-	RowBufferCircuitRejections = promauto.NewCounterVec(prometheus.CounterOpts{
-		Name: "execution_processor_row_buffer_circuit_rejections_total",
-		Help: "Total number of flushes rejected by circuit breaker",
-	}, []string{"network", "processor", "table"})
 )
diff --git a/pkg/ethereum/execution/geth/rpc.go b/pkg/ethereum/execution/geth/rpc.go
index 8b469ec..8dd9dfa 100644
--- a/pkg/ethereum/execution/geth/rpc.go
+++ b/pkg/ethereum/execution/geth/rpc.go
@@ -13,7 +13,6 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/rpc"
-	"github.com/sirupsen/logrus"
 
 	pcommon "github.com/ethpandaops/execution-processor/pkg/common"
 	"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
@@ -74,56 +73,13 @@ func (n *RPCNode) blockByNumber(ctx context.Context, blockNumber *big.Int) (exec
 	return NewBlockAdapter(block), nil
 }
 
-// maxBatchSize is the maximum number of blocks to request in a single batch RPC call.
-// Most RPC nodes have a default limit of 100 (e.g., Erigon's --rpc.batch.limit).
-const maxBatchSize = 100
-
 // blocksByNumbers fetches multiple blocks using batch RPC calls.
 // Returns blocks up to the first not-found (contiguous only).
-// Large requests are automatically chunked to respect RPC batch limits.
 func (n *RPCNode) blocksByNumbers(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
 	if len(numbers) == 0 {
 		return []execution.Block{}, nil
 	}
 
-	// If request exceeds batch limit, chunk it
-	if len(numbers) > maxBatchSize {
-		return n.blocksByNumbersChunked(ctx, numbers)
-	}
-
-	return n.blocksByNumbersBatch(ctx, numbers)
-}
-
-// blocksByNumbersChunked fetches blocks in chunks to respect RPC batch limits.
-func (n *RPCNode) blocksByNumbersChunked(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
-	allBlocks := make([]execution.Block, 0, len(numbers))
-
-	for i := 0; i < len(numbers); i += maxBatchSize {
-		end := i + maxBatchSize
-		if end > len(numbers) {
-			end = len(numbers)
-		}
-
-		chunk := numbers[i:end]
-
-		blocks, err := n.blocksByNumbersBatch(ctx, chunk)
-		if err != nil {
-			return allBlocks, err
-		}
-
-		allBlocks = append(allBlocks, blocks...)
-
-		// If we got fewer blocks than requested, a block was not found - stop for contiguity
-		if len(blocks) < len(chunk) {
-			break
-		}
-	}
-
-	return allBlocks, nil
-}
-
-// blocksByNumbersBatch fetches a single batch of blocks (must be <= maxBatchSize).
-func (n *RPCNode) blocksByNumbersBatch(ctx context.Context, numbers []*big.Int) ([]execution.Block, error) {
 	start := time.Now()
 
 	network := n.Metadata().ChainID()
@@ -176,19 +132,12 @@ func (n *RPCNode) blocksByNumbersBatch(ctx context.Context, numbers []*big.Int)
 		// Check for individual call error - stop at first error for contiguity
 		// We intentionally don't return this error as we want partial results
 		if elem.Error != nil {
-			n.log.WithError(elem.Error).WithField("block_index", i).Debug("Batch element error, stopping")
-
 			break
 		}
 
 		// Check for nil/not-found block (null JSON response)
 		if results[i] == nil || len(*results[i]) == 0 || string(*results[i]) == "null" {
 			// Block not found - stop here for contiguity
-			n.log.WithFields(logrus.Fields{
-				"block_index":  i,
-				"block_number": numbers[i].String(),
-			}).Debug("Block not found in batch, stopping")
-
 			break
 		}
 
@@ -196,18 +145,13 @@ func (n *RPCNode) blocksByNumbersBatch(ctx context.Context, numbers []*big.Int)
 		block, parseErr := parseBlockFromJSON(*results[i])
 		if parseErr != nil {
 			// Parse error - stop here for contiguity
-			n.log.WithError(parseErr).WithFields(logrus.Fields{
-				"block_index":  i,
-				"block_number": numbers[i].String(),
-			}).Warn("Failed to parse block from JSON, stopping batch")
-
 			break
 		}
 
 		blocks = append(blocks, NewBlockAdapter(block))
 	}
 
-	return blocks, nil
+	return blocks, nil //nolint:nilerr // Intentionally returning partial results for contiguity
 }
 
 // toBlockNumArg converts a block number to the RPC argument format.
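Note for reviewers: with `maxBatchSize` and the chunking path gone, `blocksByNumbers` sends the whole request as one batch call, so callers must keep batches under the node's RPC batch limit themselves. A minimal caller-side sketch of the deleted behavior, using a stand-in `fetch` callback and an illustrative limit of 100; the helper names are hypothetical and not part of this change:

package main

import (
	"context"
	"fmt"
	"math/big"
)

const batchLimit = 100 // illustrative; match the node's limit (e.g. Erigon's --rpc.batch.limit)

// fetchChunked splits an oversized request and, like blocksByNumbers, keeps
// only a contiguous prefix: a short read means a block was missing, so stop.
func fetchChunked(ctx context.Context, fetch func(context.Context, []*big.Int) ([]string, error), numbers []*big.Int) ([]string, error) {
	all := make([]string, 0, len(numbers))

	for start := 0; start < len(numbers); start += batchLimit {
		end := min(start+batchLimit, len(numbers))

		got, err := fetch(ctx, numbers[start:end])
		if err != nil {
			return all, err
		}

		all = append(all, got...)

		if len(got) < end-start { // short read: stop for contiguity
			break
		}
	}

	return all, nil
}

func main() {
	fetch := func(_ context.Context, nums []*big.Int) ([]string, error) {
		hashes := make([]string, len(nums))
		for i, n := range nums {
			hashes[i] = "block-" + n.String()
		}
		return hashes, nil
	}

	got, err := fetchChunked(context.Background(), fetch, []*big.Int{big.NewInt(1), big.NewInt(2)})
	fmt.Println(len(got), err)
}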
diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go
index 12ac099..c0ede65 100644
--- a/pkg/processor/manager.go
+++ b/pkg/processor/manager.go
@@ -564,15 +564,16 @@ func (m *Manager) processBlocks(ctx context.Context) bool {
 		} else {
 			workDone = true
 
-			// Track processing duration (block count is tracked by the processor itself)
+			// Track processing duration
 			duration := time.Since(startTime)
 			common.BlockProcessingDuration.WithLabelValues(m.network.Name, name).Observe(duration.Seconds())
+			common.BlocksProcessed.WithLabelValues(m.network.Name, name).Inc()
 
 			m.log.WithFields(logrus.Fields{
 				"processor": name,
 				"duration":  duration,
-			}).Debug("Successfully processed blocks")
+			}).Debug("Successfully processed block")
 		}
 
 		// Update head distance metric (regardless of success/failure to track current distance)
diff --git a/pkg/processor/transaction/simple/block_processing.go b/pkg/processor/transaction/simple/block_processing.go
index 60dd62c..6ed3dd9 100644
--- a/pkg/processor/transaction/simple/block_processing.go
+++ b/pkg/processor/transaction/simple/block_processing.go
@@ -137,24 +137,10 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error {
 	}).Debug("Fetched batch of blocks")
 
 	// Process each block, stopping on first error
-	processedCount := 0
-
 	for _, block := range blocks {
 		if processErr := p.processBlock(ctx, block); processErr != nil {
-			// Record blocks that were successfully processed before the error
-			if processedCount > 0 {
-				common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
-			}
-
 			return processErr
 		}
-
-		processedCount++
-	}
-
-	// Record all successfully processed blocks
-	if processedCount > 0 {
-		common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
 	}
 
 	return nil
diff --git a/pkg/processor/transaction/simple/config.go b/pkg/processor/transaction/simple/config.go
index f4ded18..d379e67 100644
--- a/pkg/processor/transaction/simple/config.go
+++ b/pkg/processor/transaction/simple/config.go
@@ -9,11 +9,8 @@ import (
 
 // Default buffer configuration values.
 const (
-	DefaultBufferMaxRows                   = 100000
-	DefaultBufferFlushInterval             = time.Second
-	DefaultBufferMaxConcurrentFlushes      = 10
-	DefaultBufferCircuitBreakerMaxFailures = 5
-	DefaultBufferCircuitBreakerTimeout     = 60 * time.Second
+	DefaultBufferMaxRows       = 100000
+	DefaultBufferFlushInterval = time.Second
 )
 
 // Config holds configuration for the simple transaction processor.
@@ -23,11 +20,8 @@ type Config struct {
 	Table    string `yaml:"table"`
 
 	// Row buffer settings for batched ClickHouse inserts
-	BufferMaxRows                   int           `yaml:"bufferMaxRows"`                   // Max rows before flush. Default: 100000
-	BufferFlushInterval             time.Duration `yaml:"bufferFlushInterval"`             // Max time before flush. Default: 1s
-	BufferMaxConcurrentFlushes      int           `yaml:"bufferMaxConcurrentFlushes"`      // Max parallel flush ops. Default: 10
-	BufferCircuitBreakerMaxFailures uint32        `yaml:"bufferCircuitBreakerMaxFailures"` // Consecutive failures to trip circuit. Default: 5
-	BufferCircuitBreakerTimeout     time.Duration `yaml:"bufferCircuitBreakerTimeout"`     // Open state duration before half-open. Default: 60s
+	BufferMaxRows       int           `yaml:"bufferMaxRows"`       // Max rows before flush. Default: 100000
+	BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
 
 	// Block completion tracking
 	MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
diff --git a/pkg/processor/transaction/simple/processor.go b/pkg/processor/transaction/simple/processor.go
index 2d1fd50..2550aa7 100644
--- a/pkg/processor/transaction/simple/processor.go
+++ b/pkg/processor/transaction/simple/processor.go
@@ -85,18 +85,6 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
 		config.BufferFlushInterval = DefaultBufferFlushInterval
 	}
 
-	if config.BufferMaxConcurrentFlushes <= 0 {
-		config.BufferMaxConcurrentFlushes = DefaultBufferMaxConcurrentFlushes
-	}
-
-	if config.BufferCircuitBreakerMaxFailures <= 0 {
-		config.BufferCircuitBreakerMaxFailures = DefaultBufferCircuitBreakerMaxFailures
-	}
-
-	if config.BufferCircuitBreakerTimeout <= 0 {
-		config.BufferCircuitBreakerTimeout = DefaultBufferCircuitBreakerTimeout
-	}
-
 	log := deps.Log.WithField("processor", ProcessorName)
 
 	// Create the limiter for shared functionality
@@ -141,25 +129,21 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
 	// Create the row buffer with the flush function
 	processor.rowBuffer = rowbuffer.New(
 		rowbuffer.Config{
-			MaxRows:                   config.BufferMaxRows,
-			FlushInterval:             config.BufferFlushInterval,
-			MaxConcurrentFlushes:      config.BufferMaxConcurrentFlushes,
-			CircuitBreakerMaxFailures: config.BufferCircuitBreakerMaxFailures,
-			CircuitBreakerTimeout:     config.BufferCircuitBreakerTimeout,
-			Network:                   deps.Network.Name,
-			Processor:                 ProcessorName,
-			Table:                     config.Table,
+			MaxRows:       config.BufferMaxRows,
+			FlushInterval: config.BufferFlushInterval,
+			Network:       deps.Network.Name,
+			Processor:     ProcessorName,
+			Table:         config.Table,
 		},
 		processor.flushRows,
 		log,
 	)
 
 	processor.log.WithFields(logrus.Fields{
-		"network":                       processor.network.Name,
-		"max_pending_block_range":       config.MaxPendingBlockRange,
-		"buffer_max_rows":               config.BufferMaxRows,
-		"buffer_flush_interval":         config.BufferFlushInterval,
-		"buffer_max_concurrent_flushes": config.BufferMaxConcurrentFlushes,
+		"network":                  processor.network.Name,
+		"max_pending_block_range":  config.MaxPendingBlockRange,
+		"buffer_max_rows":          config.BufferMaxRows,
+		"buffer_flush_interval":    config.BufferFlushInterval,
 	}).Info("Simple transaction processor initialized")
 
 	return processor, nil
diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go
index 651b504..19a292b 100644
--- a/pkg/processor/transaction/structlog/block_processing.go
+++ b/pkg/processor/transaction/structlog/block_processing.go
@@ -140,24 +140,10 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error {
 	}).Debug("Fetched batch of blocks")
 
 	// Process each block, stopping on first error
-	processedCount := 0
-
 	for _, block := range blocks {
 		if processErr := p.processBlock(ctx, block); processErr != nil {
-			// Record blocks that were successfully processed before the error
-			if processedCount > 0 {
-				common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
-			}
-
 			return processErr
 		}
-
-		processedCount++
-	}
-
-	// Record all successfully processed blocks
-	if processedCount > 0 {
-		common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
 	}
 
 	return nil
@@ -170,7 +156,7 @@ func (p *Processor) handleBlockNotFound(ctx context.Context, node execution.Node
 	chainTip := new(big.Int).SetUint64(*latestBlock)
 	diff := new(big.Int).Sub(nextBlock, chainTip).Int64()
 
-	if diff > 0 && diff <= 5 { // 1-5 blocks ahead of chain tip, might appear soon
+	if diff <= 5 { // Within 5 blocks of chain tip
 		p.log.WithFields(logrus.Fields{
 			"network":      p.network.Name,
 			"block_number": nextBlock,
diff --git a/pkg/processor/transaction/structlog/config.go b/pkg/processor/transaction/structlog/config.go
index 0bfc024..e948c1d 100644
--- a/pkg/processor/transaction/structlog/config.go
+++ b/pkg/processor/transaction/structlog/config.go
@@ -9,11 +9,8 @@ import (
 
 // Default buffer configuration values.
 const (
-	DefaultBufferMaxRows                   = 100000
-	DefaultBufferFlushInterval             = time.Second
-	DefaultBufferMaxConcurrentFlushes      = 10
-	DefaultBufferCircuitBreakerMaxFailures = 5
-	DefaultBufferCircuitBreakerTimeout     = 60 * time.Second
+	DefaultBufferMaxRows       = 100000
+	DefaultBufferFlushInterval = time.Second
 )
 
 // Config holds configuration for transaction structlog processor.
@@ -23,11 +20,8 @@ type Config struct {
 	Table    string `yaml:"table"`
 
 	// Row buffer settings for batched ClickHouse inserts
-	BufferMaxRows                   int           `yaml:"bufferMaxRows"`                   // Max rows before flush. Default: 100000
-	BufferFlushInterval             time.Duration `yaml:"bufferFlushInterval"`             // Max time before flush. Default: 1s
-	BufferMaxConcurrentFlushes      int           `yaml:"bufferMaxConcurrentFlushes"`      // Max parallel flush ops. Default: 10
-	BufferCircuitBreakerMaxFailures uint32        `yaml:"bufferCircuitBreakerMaxFailures"` // Consecutive failures to trip circuit. Default: 5
-	BufferCircuitBreakerTimeout     time.Duration `yaml:"bufferCircuitBreakerTimeout"`     // Open state duration before half-open. Default: 60s
+	BufferMaxRows       int           `yaml:"bufferMaxRows"`       // Max rows before flush. Default: 100000
+	BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
 
 	// Block completion tracking
 	MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
diff --git a/pkg/processor/transaction/structlog/processor.go b/pkg/processor/transaction/structlog/processor.go
index c4ad3a6..f58e69d 100644
--- a/pkg/processor/transaction/structlog/processor.go
+++ b/pkg/processor/transaction/structlog/processor.go
@@ -93,18 +93,6 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
 		config.BufferFlushInterval = DefaultBufferFlushInterval
 	}
 
-	if config.BufferMaxConcurrentFlushes <= 0 {
-		config.BufferMaxConcurrentFlushes = DefaultBufferMaxConcurrentFlushes
-	}
-
-	if config.BufferCircuitBreakerMaxFailures <= 0 {
-		config.BufferCircuitBreakerMaxFailures = DefaultBufferCircuitBreakerMaxFailures
-	}
-
-	if config.BufferCircuitBreakerTimeout <= 0 {
-		config.BufferCircuitBreakerTimeout = DefaultBufferCircuitBreakerTimeout
-	}
-
 	log := deps.Log.WithField("processor", ProcessorName)
 
 	// Create the limiter for shared functionality
@@ -150,25 +138,21 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
 	// Create the row buffer with the flush function
 	processor.rowBuffer = rowbuffer.New(
 		rowbuffer.Config{
-			MaxRows:                   config.BufferMaxRows,
-			FlushInterval:             config.BufferFlushInterval,
-			MaxConcurrentFlushes:      config.BufferMaxConcurrentFlushes,
-			CircuitBreakerMaxFailures: config.BufferCircuitBreakerMaxFailures,
-			CircuitBreakerTimeout:     config.BufferCircuitBreakerTimeout,
-			Network:                   deps.Network.Name,
-			Processor:                 ProcessorName,
-			Table:                     config.Table,
+			MaxRows:       config.BufferMaxRows,
+			FlushInterval: config.BufferFlushInterval,
+			Network:       deps.Network.Name,
+			Processor:     ProcessorName,
+			Table:         config.Table,
 		},
 		processor.flushRows,
 		log,
 	)
 
 	processor.log.WithFields(logrus.Fields{
-		"network":                       processor.network.Name,
-		"max_pending_block_range":       config.MaxPendingBlockRange,
-		"buffer_max_rows":               config.BufferMaxRows,
-		"buffer_flush_interval":         config.BufferFlushInterval,
-		"buffer_max_concurrent_flushes": config.BufferMaxConcurrentFlushes,
+		"network":                  processor.network.Name,
+		"max_pending_block_range":  config.MaxPendingBlockRange,
+		"buffer_max_rows":          config.BufferMaxRows,
+		"buffer_flush_interval":    config.BufferFlushInterval,
 	}).Info("Detected network")
 
 	return processor, nil
diff --git a/pkg/processor/transaction/structlog/processor_test.go b/pkg/processor/transaction/structlog/processor_test.go
index 8764372..6e394a2 100644
--- a/pkg/processor/transaction/structlog/processor_test.go
+++ b/pkg/processor/transaction/structlog/processor_test.go
@@ -715,128 +715,3 @@ func TestHeadDistanceMetricLabels(t *testing.T) {
 		})
 	}
 }
-
-func TestHandleBlockNotFound_DiffCalculation(t *testing.T) {
-	// This test verifies the correct behavior of handleBlockNotFound:
-	// - Blocks at or behind chain tip (diff <= 0) should NOT trigger "not yet available"
-	// - Blocks 1-5 ahead of chain tip should trigger "not yet available" (might appear soon)
-	// - Blocks >5 ahead of chain tip should return "block not found" (something is wrong)
-	//
-	// The fixed logic:
-	//   diff := nextBlock - chainTip
-	//   if diff > 0 && diff <= 5 { return "block not yet available" }
-	testCases := []struct {
-		name                 string
-		nextBlock            int64
-		chainTip             int64
-		expectedDiff         int64
-		shouldBeAvailable    bool // true = block should exist (at or behind tip)
-		shouldWaitForBlock   bool // true = "not yet available" message (1-5 ahead)
-		shouldReturnNotFound bool // true = "block not found" message (>5 ahead)
-	}{
-		{
-			name:                 "block far behind chain tip - should exist",
-			nextBlock:            24365273,
-			chainTip:             24365743,
-			expectedDiff:         -470,
-			shouldBeAvailable:    true,
-			shouldWaitForBlock:   false,
-			shouldReturnNotFound: false,
-		},
-		{
-			name:                 "block 100 blocks behind chain tip - should exist",
-			nextBlock:            100,
-			chainTip:             200,
-			expectedDiff:         -100,
-			shouldBeAvailable:    true,
-			shouldWaitForBlock:   false,
-			shouldReturnNotFound: false,
-		},
-		{
-			name:                 "block at chain tip - should exist",
-			nextBlock:            200,
-			chainTip:             200,
-			expectedDiff:         0,
-			shouldBeAvailable:    true,
-			shouldWaitForBlock:   false,
-			shouldReturnNotFound: false,
-		},
-		{
-			name:                 "block 1 ahead of chain tip - wait for it",
-			nextBlock:            201,
-			chainTip:             200,
-			expectedDiff:         1,
-			shouldBeAvailable:    false,
-			shouldWaitForBlock:   true,
-			shouldReturnNotFound: false,
-		},
-		{
-			name:                 "block 5 ahead of chain tip - wait for it",
-			nextBlock:            205,
-			chainTip:             200,
-			expectedDiff:         5,
-			shouldBeAvailable:    false,
-			shouldWaitForBlock:   true,
-			shouldReturnNotFound: false,
-		},
-		{
-			name:                 "block 6 ahead of chain tip - not found",
-			nextBlock:            206,
-			chainTip:             200,
-			expectedDiff:         6,
-			shouldBeAvailable:    false,
-			shouldWaitForBlock:   false,
-			shouldReturnNotFound: true,
-		},
-		{
-			name:                 "block 10 ahead of chain tip - not found",
-			nextBlock:            210,
-			chainTip:             200,
-			expectedDiff:         10,
-			shouldBeAvailable:    false,
-			shouldWaitForBlock:   false,
-			shouldReturnNotFound: true,
-		},
-	}
-
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			nextBlock := big.NewInt(tc.nextBlock)
-			chainTip := big.NewInt(tc.chainTip)
-
-			// Calculate diff as the code does
-			diff := new(big.Int).Sub(nextBlock, chainTip).Int64()
-
-			assert.Equal(t, tc.expectedDiff, diff, "Diff calculation should match")
-
-			// Fixed behavior: diff > 0 && diff <= 5 means "not yet available"
-			fixedCodeWaitsForBlock := diff > 0 && diff <= 5
-			fixedCodeReturnsNotFound := diff > 5
-			fixedCodeSaysAvailable := diff <= 0
-
-			assert.Equal(t, tc.shouldWaitForBlock, fixedCodeWaitsForBlock,
-				"Fixed code should wait for block when 1-5 blocks ahead")
-			assert.Equal(t, tc.shouldReturnNotFound, fixedCodeReturnsNotFound,
-				"Fixed code should return 'not found' when >5 blocks ahead")
-			assert.Equal(t, tc.shouldBeAvailable, fixedCodeSaysAvailable,
-				"Fixed code should say available when at or behind chain tip")
-
-			// Verify mutual exclusivity
-			outcomes := 0
-
-			if fixedCodeWaitsForBlock {
-				outcomes++
-			}
-
-			if fixedCodeReturnsNotFound {
-				outcomes++
-			}
-
-			if fixedCodeSaysAvailable {
-				outcomes++
-			}
-
-			assert.Equal(t, 1, outcomes, "Exactly one outcome should be true")
-		})
-	}
-}
diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go
index 1b93eac..598e150 100644
--- a/pkg/processor/transaction/structlog_agg/block_processing.go
+++ b/pkg/processor/transaction/structlog_agg/block_processing.go
@@ -140,24 +140,10 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error {
 	}).Debug("Fetched batch of blocks")
 
 	// Process each block, stopping on first error
-	processedCount := 0
-
 	for _, block := range blocks {
 		if processErr := p.processBlock(ctx, block); processErr != nil {
-			// Record blocks that were successfully processed before the error
-			if processedCount > 0 {
-				common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
-			}
-
 			return processErr
 		}
-
-		processedCount++
-	}
-
-	// Record all successfully processed blocks
-	if processedCount > 0 {
-		common.BlocksProcessed.WithLabelValues(p.network.Name, p.Name()).Add(float64(processedCount))
 	}
 
 	return nil
@@ -170,7 +156,7 @@ func (p *Processor) handleBlockNotFound(ctx context.Context, node execution.Node
 	chainTip := new(big.Int).SetUint64(*latestBlock)
 	diff := new(big.Int).Sub(nextBlock, chainTip).Int64()
 
-	if diff > 0 && diff <= 5 { // 1-5 blocks ahead of chain tip, might appear soon
+	if diff <= 5 { // Within 5 blocks of chain tip
 		p.log.WithFields(logrus.Fields{
 			"network":      p.network.Name,
 			"block_number": nextBlock,
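One behavioral consequence of this file and the matching manager.go change above: `common.BlocksProcessed` is no longer advanced by the processors with the per-batch block count; the manager now calls `Inc()` once per successful `ProcessNextBlock`. A call that handles an N-block batch advances the counter by 1, so the series effectively counts processing iterations rather than blocks. A tiny client_golang sketch of the two accounting styles (the metric name here is made up for illustration):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Illustrative counter, not the repo's registration.
	blocksProcessed := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_blocks_processed_total",
		Help: "Example counter mirroring common.BlocksProcessed",
	}, []string{"network", "processor"})

	c := blocksProcessed.WithLabelValues("mainnet", "structlog")

	// Before this diff: each processor added the batch's block count.
	c.Add(42) // a 42-block batch advanced the counter by 42

	// After this diff: the manager increments once per successful
	// ProcessNextBlock call, however many blocks the batch held.
	c.Inc() // the same batch now advances the counter by 1

	fmt.Println("rate() over this series now approximates iterations/s, not blocks/s")
}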
diff --git a/pkg/processor/transaction/structlog_agg/config.go b/pkg/processor/transaction/structlog_agg/config.go
index 2acb376..3e0af8b 100644
--- a/pkg/processor/transaction/structlog_agg/config.go
+++ b/pkg/processor/transaction/structlog_agg/config.go
@@ -9,11 +9,8 @@ import (
 
 // Default buffer configuration values.
 const (
-	DefaultBufferMaxRows                   = 100000
-	DefaultBufferFlushInterval             = time.Second
-	DefaultBufferMaxConcurrentFlushes      = 10
-	DefaultBufferCircuitBreakerMaxFailures = 5
-	DefaultBufferCircuitBreakerTimeout     = 60 * time.Second
+	DefaultBufferMaxRows       = 100000
+	DefaultBufferFlushInterval = time.Second
 )
 
 // Config holds configuration for transaction structlog_agg processor.
@@ -23,11 +20,8 @@ type Config struct {
 	Table    string `yaml:"table"`
 
 	// Row buffer settings for batched ClickHouse inserts
-	BufferMaxRows                   int           `yaml:"bufferMaxRows"`                   // Max rows before flush. Default: 100000
-	BufferFlushInterval             time.Duration `yaml:"bufferFlushInterval"`             // Max time before flush. Default: 1s
-	BufferMaxConcurrentFlushes      int           `yaml:"bufferMaxConcurrentFlushes"`      // Max parallel flush ops. Default: 10
-	BufferCircuitBreakerMaxFailures uint32        `yaml:"bufferCircuitBreakerMaxFailures"` // Consecutive failures to trip circuit. Default: 5
-	BufferCircuitBreakerTimeout     time.Duration `yaml:"bufferCircuitBreakerTimeout"`     // Open state duration before half-open. Default: 60s
+	BufferMaxRows       int           `yaml:"bufferMaxRows"`       // Max rows before flush. Default: 100000
+	BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s
 
 	// Block completion tracking
 	MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
diff --git a/pkg/processor/transaction/structlog_agg/processor.go b/pkg/processor/transaction/structlog_agg/processor.go
index 891f45d..4a765a7 100644
--- a/pkg/processor/transaction/structlog_agg/processor.go
+++ b/pkg/processor/transaction/structlog_agg/processor.go
@@ -103,18 +103,6 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
 		config.BufferFlushInterval = DefaultBufferFlushInterval
 	}
 
-	if config.BufferMaxConcurrentFlushes <= 0 {
-		config.BufferMaxConcurrentFlushes = DefaultBufferMaxConcurrentFlushes
-	}
-
-	if config.BufferCircuitBreakerMaxFailures <= 0 {
-		config.BufferCircuitBreakerMaxFailures = DefaultBufferCircuitBreakerMaxFailures
-	}
-
-	if config.BufferCircuitBreakerTimeout <= 0 {
-		config.BufferCircuitBreakerTimeout = DefaultBufferCircuitBreakerTimeout
-	}
-
 	log := deps.Log.WithField("processor", ProcessorName)
 
 	// Create the limiter for shared functionality
@@ -160,25 +148,21 @@ func New(deps *Dependencies, config *Config) (*Processor, error) {
 	// Create the row buffer with the flush function
 	processor.rowBuffer = rowbuffer.New(
 		rowbuffer.Config{
-			MaxRows:                   config.BufferMaxRows,
-			FlushInterval:             config.BufferFlushInterval,
-			MaxConcurrentFlushes:      config.BufferMaxConcurrentFlushes,
-			CircuitBreakerMaxFailures: config.BufferCircuitBreakerMaxFailures,
-			CircuitBreakerTimeout:     config.BufferCircuitBreakerTimeout,
-			Network:                   deps.Network.Name,
-			Processor:                 ProcessorName,
-			Table:                     config.Table,
+			MaxRows:       config.BufferMaxRows,
+			FlushInterval: config.BufferFlushInterval,
+			Network:       deps.Network.Name,
+			Processor:     ProcessorName,
+			Table:         config.Table,
 		},
 		processor.flushRows,
 		log,
 	)
 
 	processor.log.WithFields(logrus.Fields{
-		"network":                       processor.network.Name,
-		"max_pending_block_range":       config.MaxPendingBlockRange,
-		"buffer_max_rows":               config.BufferMaxRows,
-		"buffer_flush_interval":         config.BufferFlushInterval,
-		"buffer_max_concurrent_flushes": config.BufferMaxConcurrentFlushes,
+		"network":                  processor.network.Name,
+		"max_pending_block_range":  config.MaxPendingBlockRange,
+		"buffer_max_rows":          config.BufferMaxRows,
+		"buffer_flush_interval":    config.BufferFlushInterval,
 	}).Info("Detected network")
 
 	return processor, nil
diff --git a/pkg/rowbuffer/buffer.go b/pkg/rowbuffer/buffer.go
index 5a73ac9..c38d195 100644
--- a/pkg/rowbuffer/buffer.go
+++ b/pkg/rowbuffer/buffer.go
@@ -5,13 +5,11 @@ package rowbuffer
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"sync"
 	"time"
 
 	"github.com/sirupsen/logrus"
-	"github.com/sony/gobreaker/v2"
 
 	"github.com/ethpandaops/execution-processor/pkg/common"
 )
@@ -21,16 +19,11 @@ type FlushFunc[R any] func(ctx context.Context, rows []R) error
 
 // Config holds configuration for the row buffer.
 type Config struct {
-	MaxRows              int           // Flush threshold (default: 100000)
-	FlushInterval        time.Duration // Max wait before flush (default: 1s)
-	MaxConcurrentFlushes int           // Max parallel flush operations (default: 10)
-	Network              string        // For metrics
-	Processor            string        // For metrics
-	Table                string        // For metrics
-
-	// Circuit breaker configuration
-	CircuitBreakerMaxFailures uint32        // Consecutive failures to trip circuit (default: 5)
-	CircuitBreakerTimeout     time.Duration // Open state duration before half-open (default: 60s)
+	MaxRows       int           // Flush threshold (default: 100000)
+	FlushInterval time.Duration // Max wait before flush (default: 1s)
+	Network       string        // For metrics
+	Processor     string        // For metrics
+	Table         string        // For metrics
 }
 
 // waiter represents a task waiting for its rows to be flushed.
@@ -53,9 +46,6 @@ type Buffer[R any] struct {
 	stoppedChan chan struct{}
 	wg          sync.WaitGroup
 	started     bool
-
-	flushSem chan struct{}                  // Semaphore to limit concurrent flushes
-	cb       *gobreaker.CircuitBreaker[any] // Circuit breaker for flush operations
 }
 
 // New creates a new Buffer with the given configuration and flush function.
@@ -69,55 +59,14 @@ func New[R any](cfg Config, flushFn FlushFunc[R], log logrus.FieldLogger) *Buffe
 		cfg.FlushInterval = time.Second
 	}
 
-	if cfg.MaxConcurrentFlushes <= 0 {
-		cfg.MaxConcurrentFlushes = 10
-	}
-
-	if cfg.CircuitBreakerMaxFailures <= 0 {
-		cfg.CircuitBreakerMaxFailures = 5
-	}
-
-	if cfg.CircuitBreakerTimeout <= 0 {
-		cfg.CircuitBreakerTimeout = 60 * time.Second
-	}
-
-	bufLog := log.WithField("component", "rowbuffer")
-
-	cbSettings := gobreaker.Settings{
-		Name:        fmt.Sprintf("rowbuffer-%s-%s", cfg.Processor, cfg.Table),
-		MaxRequests: 1,
-		Timeout:     cfg.CircuitBreakerTimeout,
-		ReadyToTrip: func(counts gobreaker.Counts) bool {
-			return counts.ConsecutiveFailures >= cfg.CircuitBreakerMaxFailures
-		},
-		OnStateChange: func(name string, from, to gobreaker.State) {
-			bufLog.WithFields(logrus.Fields{
-				"circuit": name,
-				"from":    from.String(),
-				"to":      to.String(),
-			}).Warn("Circuit breaker state changed")
-
-			state := 0.0
-			if to == gobreaker.StateOpen {
-				state = 1.0
-			}
-
-			common.RowBufferCircuitOpen.WithLabelValues(
-				cfg.Network, cfg.Processor, cfg.Table,
-			).Set(state)
-		},
-	}
-
 	return &Buffer[R]{
 		rows:        make([]R, 0, cfg.MaxRows),
 		waiters:     make([]waiter, 0, 64),
 		config:      cfg,
 		flushFn:     flushFn,
-		log:         bufLog,
+		log:         log.WithField("component", "rowbuffer"),
 		stopChan:    make(chan struct{}),
 		stoppedChan: make(chan struct{}),
-		flushSem:    make(chan struct{}, cfg.MaxConcurrentFlushes),
-		cb:          gobreaker.NewCircuitBreaker[any](cbSettings),
 	}
 }
@@ -240,21 +189,9 @@ func (b *Buffer[R]) Submit(ctx context.Context, rows []R) error {
 
 	// Perform flush outside of lock if triggered by size
 	if shouldFlush {
-		b.wg.Go(func() {
-			b.flushSem <- struct{}{} // Acquire semaphore (blocks if at limit)
-
-			defer func() { <-b.flushSem }() // Release semaphore
-
-			common.RowBufferInflightFlushes.WithLabelValues(
-				b.config.Network, b.config.Processor, b.config.Table,
-			).Inc()
-
-			defer common.RowBufferInflightFlushes.WithLabelValues(
-				b.config.Network, b.config.Processor, b.config.Table,
-			).Dec()
-
+		go func() {
 			_ = b.doFlush(context.Background(), flushRows, flushWaiters, "size")
-		})
+		}()
 	}
 
 	// Wait for result or context cancellation
@@ -307,55 +244,15 @@ func (b *Buffer[R]) flushOnTimer(ctx context.Context) {
 	b.waiters = make([]waiter, 0, 64)
 	b.mu.Unlock()
 
-	b.wg.Go(func() {
-		b.flushSem <- struct{}{} // Acquire semaphore (blocks if at limit)
-
-		defer func() { <-b.flushSem }() // Release semaphore
-
-		common.RowBufferInflightFlushes.WithLabelValues(
-			b.config.Network, b.config.Processor, b.config.Table,
-		).Inc()
-
-		defer common.RowBufferInflightFlushes.WithLabelValues(
-			b.config.Network, b.config.Processor, b.config.Table,
-		).Dec()
-
-		_ = b.doFlush(ctx, rows, waiters, "timer")
-	})
+	_ = b.doFlush(ctx, rows, waiters, "timer")
 }
 
-// doFlush performs the actual flush through the circuit breaker and notifies all waiters.
+// doFlush performs the actual flush and notifies all waiters.
 func (b *Buffer[R]) doFlush(ctx context.Context, rows []R, waiters []waiter, trigger string) error {
 	if len(rows) == 0 {
 		return nil
 	}
 
-	// Execute through circuit breaker
-	_, err := b.cb.Execute(func() (any, error) {
-		return nil, b.executeFlush(ctx, rows, trigger)
-	})
-
-	// Track circuit breaker rejections
-	if errors.Is(err, gobreaker.ErrOpenState) || errors.Is(err, gobreaker.ErrTooManyRequests) {
-		common.RowBufferCircuitRejections.WithLabelValues(
-			b.config.Network, b.config.Processor, b.config.Table,
-		).Inc()
-	}
-
-	// Notify all waiters
-	for _, w := range waiters {
-		select {
-		case w.resultCh <- err:
-		default:
-			// Waiter may have timed out, skip
-		}
-	}
-
-	return err
-}
-
-// executeFlush performs the actual flush to ClickHouse.
-func (b *Buffer[R]) executeFlush(ctx context.Context, rows []R, trigger string) error {
 	start := time.Now()
 
 	rowCount := len(rows)
@@ -397,6 +294,7 @@ func (b *Buffer[R]) executeFlush(ctx context.Context, rows []R, trigger string)
 	if err != nil {
 		b.log.WithError(err).WithFields(logrus.Fields{
 			"rows":      rowCount,
+			"waiters":   len(waiters),
 			"trigger":   trigger,
 			"duration":  duration,
 			"processor": b.config.Processor,
@@ -405,6 +303,7 @@ func (b *Buffer[R]) executeFlush(ctx context.Context, rows []R, trigger string)
 	} else {
 		b.log.WithFields(logrus.Fields{
 			"rows":      rowCount,
+			"waiters":   len(waiters),
 			"trigger":   trigger,
 			"duration":  duration,
 			"processor": b.config.Processor,
@@ -412,6 +311,15 @@ func (b *Buffer[R]) executeFlush(ctx context.Context, rows []R, trigger string)
 		}).Debug("ClickHouse flush completed")
 	}
 
+	// Notify all waiters
+	for _, w := range waiters {
+		select {
+		case w.resultCh <- err:
+		default:
+			// Waiter may have timed out, skip
+		}
+	}
+
 	return err
 }
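Taken together, the rowbuffer is back to a single code path: size-triggered flushes run in a plain goroutine, timer flushes run inline, and waiter notification moves into the tail of the now-merged flush function. A minimal usage sketch of the surviving API as it appears in this diff (New, Start, Submit, Stop); the row type and logger wiring are illustrative:

package main

import (
	"context"
	"fmt"

	"github.com/sirupsen/logrus"

	"github.com/ethpandaops/execution-processor/pkg/rowbuffer"
)

func main() {
	// Unset fields fall back to the two remaining defaults:
	// MaxRows 100000 and FlushInterval 1s.
	buf := rowbuffer.New(rowbuffer.Config{
		MaxRows:   1000,
		Network:   "mainnet",
		Processor: "example",
		Table:     "example_table",
	}, func(ctx context.Context, rows []string) error {
		// Flush callback: the real processors insert into ClickHouse here.
		fmt.Printf("flushing %d rows\n", len(rows))

		return nil
	}, logrus.New())

	ctx := context.Background()
	_ = buf.Start(ctx)

	// Submit blocks until the batch holding these rows is flushed
	// (by size or by the timer) and returns that flush's error, if any.
	if err := buf.Submit(ctx, []string{"a", "b"}); err != nil {
		fmt.Println("flush failed:", err)
	}

	_ = buf.Stop(ctx)
}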
diff --git a/pkg/rowbuffer/buffer_test.go b/pkg/rowbuffer/buffer_test.go
index 8da74ce..dbdbccc 100644
--- a/pkg/rowbuffer/buffer_test.go
+++ b/pkg/rowbuffer/buffer_test.go
@@ -10,7 +10,6 @@ import (
 	"time"
 
 	"github.com/sirupsen/logrus"
-	"github.com/sony/gobreaker/v2"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -383,412 +382,3 @@ func TestBuffer_MultipleFlushes(t *testing.T) {
 		assert.GreaterOrEqual(t, flushCount.Load(), int32(2))
 	})
 }
-
-func TestBuffer_ConcurrentFlushes(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		var maxConcurrent atomic.Int32
-
-		var currentConcurrent atomic.Int32
-
-		buf := New(Config{
-			MaxRows:              10,
-			FlushInterval:        time.Hour,
-			MaxConcurrentFlushes: 3,
-		}, func(ctx context.Context, rows []int) error {
-			cur := currentConcurrent.Add(1)
-
-			// Track max concurrent flushes observed
-			for {
-				maxVal := maxConcurrent.Load()
-				if cur <= maxVal || maxConcurrent.CompareAndSwap(maxVal, cur) {
-					break
-				}
-			}
-
-			// Simulate some work
-			time.Sleep(10 * time.Millisecond)
-
-			currentConcurrent.Add(-1)
-
-			return nil
-		}, newTestLogger())
-
-		require.NoError(t, buf.Start(context.Background()))
-
-		// Submit 50 rows in batches of 10 (triggers 5 flushes)
-		for i := range 5 {
-			go func(batch int) {
-				_ = buf.Submit(context.Background(), make([]int, 10))
-			}(i)
-		}
-
-		synctest.Wait()
-
-		// Advance time to let flushes complete
-		time.Sleep(100 * time.Millisecond)
-
-		synctest.Wait()
-
-		require.NoError(t, buf.Stop(context.Background()))
-
-		// Max concurrent should be <= 3 (the limit)
-		assert.LessOrEqual(t, maxConcurrent.Load(), int32(3))
-		// And at least 1 flush should have happened
-		assert.GreaterOrEqual(t, maxConcurrent.Load(), int32(1))
-	})
-}
-
-func TestBuffer_FlushSemaphoreLimit(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		flushStarted := make(chan struct{}, 10)
-		blockFlush := make(chan struct{})
-
-		var flushCount atomic.Int32
-
-		buf := New(Config{
-			MaxRows:              5,
-			FlushInterval:        time.Hour,
-			MaxConcurrentFlushes: 2,
-		}, func(ctx context.Context, rows []int) error {
-			flushStarted <- struct{}{}
-
-			<-blockFlush
-
-			flushCount.Add(1)
-
-			return nil
-		}, newTestLogger())
-
-		require.NoError(t, buf.Start(context.Background()))
-
-		// Submit 20 rows rapidly (triggers 4 flushes of 5 rows each)
-		for i := range 4 {
-			go func(batch int) {
-				_ = buf.Submit(context.Background(), make([]int, 5))
-			}(i)
-		}
-
-		synctest.Wait()
-
-		// Only 2 flushes should start initially (semaphore limit)
-		startedCount := 0
-
-		for {
-			select {
-			case <-flushStarted:
-				startedCount++
-			default:
-				goto done
-			}
-		}
-
-	done:
-		assert.Equal(t, 2, startedCount, "only 2 flushes should start due to semaphore")
-
-		// Unblock flushes
-		close(blockFlush)
-
-		synctest.Wait()
-
-		require.NoError(t, buf.Stop(context.Background()))
-
-		// All 4 flushes should have completed
-		assert.Equal(t, int32(4), flushCount.Load())
-	})
-}
-
-func TestBuffer_GracefulShutdown_WaitsForInflight(t *testing.T) {
-	flushStarted := make(chan struct{})
-	blockFlush := make(chan struct{})
-
-	var flushCompleted atomic.Bool
-
-	buf := New(Config{
-		MaxRows:              10,
-		FlushInterval:        time.Hour,
-		MaxConcurrentFlushes: 2,
-	}, func(ctx context.Context, rows []int) error {
-		close(flushStarted)
-		<-blockFlush
-		flushCompleted.Store(true)
-
-		return nil
-	}, newTestLogger())
-
-	require.NoError(t, buf.Start(context.Background()))
-
-	// Submit rows to trigger flush
-	go func() {
-		_ = buf.Submit(context.Background(), make([]int, 10))
-	}()
-
-	// Wait for flush to start
-	<-flushStarted
-
-	// Start Stop() in goroutine - it should block until flush completes
-	stopDone := make(chan struct{})
-
-	go func() {
-		_ = buf.Stop(context.Background())
-
-		close(stopDone)
-	}()
-
-	// Give Stop() time to potentially return early (it shouldn't)
-	time.Sleep(10 * time.Millisecond)
-
-	// Stop should still be blocked
-	select {
-	case <-stopDone:
-		t.Fatal("Stop() returned before flush completed")
-	default:
-		// Expected - Stop is still waiting
-	}
-
-	// Unblock the flush
-	close(blockFlush)
-
-	// Now Stop should complete
-	select {
-	case <-stopDone:
-		assert.True(t, flushCompleted.Load(), "flush should have completed")
-	case <-time.After(time.Second):
-		t.Fatal("Stop() did not complete after flush unblocked")
-	}
-}
-
-func TestBuffer_DefaultMaxConcurrentFlushes(t *testing.T) {
-	// Create buffer with Config{} (no MaxConcurrentFlushes set)
-	buf := New(Config{}, func(ctx context.Context, rows []int) error {
-		return nil
-	}, newTestLogger())
-
-	// Verify defaults to 10 (check via config)
-	assert.Equal(t, 10, buf.config.MaxConcurrentFlushes)
-	// Also verify the semaphore has capacity 10
-	assert.Equal(t, 10, cap(buf.flushSem))
-}
-
-func TestBuffer_ConcurrentFlushErrors(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		var flushNum atomic.Int32
-
-		expectedErr := errors.New("batch 2 error")
-
-		buf := New(Config{
-			MaxRows:              5,
-			FlushInterval:        time.Hour,
-			MaxConcurrentFlushes: 2,
-		}, func(ctx context.Context, rows []int) error {
-			num := flushNum.Add(1)
-
-			// Batch 2 fails, others succeed
-			if num == 2 {
-				return expectedErr
-			}
-
-			return nil
-		}, newTestLogger())
-
-		require.NoError(t, buf.Start(context.Background()))
-
-		errChan := make(chan error, 4)
-
-		// Submit 4 batches of 5 rows each
-		for i := range 4 {
-			go func(batch int) {
-				errChan <- buf.Submit(context.Background(), make([]int, 5))
-			}(i)
-		}
-
-		synctest.Wait()
-
-		require.NoError(t, buf.Stop(context.Background()))
-
-		// Collect all errors
-		var errs []error
-
-		for range 4 {
-			select {
-			case err := <-errChan:
-				errs = append(errs, err)
-			default:
-			}
-		}
-
-		// Should have exactly one error (from batch 2)
-		errorCount := 0
-
-		for _, err := range errs {
-			if err != nil {
-				errorCount++
-
-				assert.ErrorIs(t, err, expectedErr)
-			}
-		}
-
-		assert.Equal(t, 1, errorCount, "exactly one batch should have failed")
-	})
-}
-
-func TestBuffer_CircuitBreakerTrips(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		flushErr := errors.New("clickhouse error")
-
-		var flushCount atomic.Int32
-
-		buf := New(Config{
-			MaxRows:                   5,
-			FlushInterval:             time.Hour,
-			MaxConcurrentFlushes:      1, // Force sequential flushes for deterministic behavior
-			CircuitBreakerMaxFailures: 3, // Trip after 3 consecutive failures
-			CircuitBreakerTimeout:     time.Hour,
-		}, func(ctx context.Context, rows []int) error {
-			flushCount.Add(1)
-
-			return flushErr
-		}, newTestLogger())
-
-		require.NoError(t, buf.Start(context.Background()))
-
-		defer func() { _ = buf.Stop(context.Background()) }()
-
-		errChan := make(chan error, 5)
-
-		// Submit 5 batches - first 3 should fail with flushErr, then circuit opens
-		for i := range 5 {
-			go func(batch int) {
-				errChan <- buf.Submit(context.Background(), make([]int, 5))
-			}(i)
-		}
-
-		synctest.Wait()
-
-		// Collect errors
-		var flushErrCount int
-
-		var circuitOpenCount int
-
-		for range 5 {
-			select {
-			case err := <-errChan:
-				if errors.Is(err, flushErr) {
-					flushErrCount++
-				} else if errors.Is(err, gobreaker.ErrOpenState) {
-					circuitOpenCount++
-				}
-			default:
-			}
-		}
-
-		// With sequential flushes, should have exactly 3 flush errors before circuit tripped
-		assert.Equal(t, 3, int(flushCount.Load()), "should have attempted exactly 3 flushes before circuit opened")
-		assert.Equal(t, 3, flushErrCount, "should have 3 flush errors")
-		assert.Equal(t, 2, circuitOpenCount, "should have 2 circuit open rejections")
-	})
-}
-
-func TestBuffer_CircuitBreakerRejectsWhenOpen(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		flushErr := errors.New("clickhouse error")
-
-		buf := New(Config{
-			MaxRows:                   5,
-			FlushInterval:             time.Hour,
-			CircuitBreakerMaxFailures: 1, // Trip after 1 failure
-			CircuitBreakerTimeout:     time.Hour,
-		}, func(ctx context.Context, rows []int) error {
-			return flushErr
-		}, newTestLogger())
-
-		require.NoError(t, buf.Start(context.Background()))
-
-		defer func() { _ = buf.Stop(context.Background()) }()
-
-		errChan := make(chan error, 1)
-
-		// First batch trips the circuit
-		go func() {
-			errChan <- buf.Submit(context.Background(), make([]int, 5))
-		}()
-
-		synctest.Wait()
-
-		err := <-errChan
-		require.ErrorIs(t, err, flushErr, "first batch should fail with flush error")
-
-		// Second batch should be rejected immediately by circuit breaker
-		go func() {
-			errChan <- buf.Submit(context.Background(), make([]int, 5))
-		}()
-
-		synctest.Wait()
-
-		err = <-errChan
-		require.ErrorIs(t, err, gobreaker.ErrOpenState, "second batch should be rejected by open circuit")
-	})
-}
-
-func TestBuffer_CircuitBreakerRecovery(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		var shouldFail atomic.Bool
-		shouldFail.Store(true)
-
-		buf := New(Config{
-			MaxRows:                   5,
-			FlushInterval:             time.Hour,
-			CircuitBreakerMaxFailures: 1,
-			CircuitBreakerTimeout:     100 * time.Millisecond, // Short timeout for testing
-		}, func(ctx context.Context, rows []int) error {
-			if shouldFail.Load() {
-				return errors.New("clickhouse error")
-			}
-
-			return nil
-		}, newTestLogger())
-
-		require.NoError(t, buf.Start(context.Background()))
-
-		defer func() { _ = buf.Stop(context.Background()) }()
-
-		errChan := make(chan error, 1)
-
-		// First batch trips the circuit
-		go func() {
-			errChan <- buf.Submit(context.Background(), make([]int, 5))
-		}()
-
-		synctest.Wait()
-
-		err := <-errChan
-		require.Error(t, err, "first batch should fail")
-
-		// Wait for circuit to transition to half-open
-		time.Sleep(150 * time.Millisecond)
-
-		synctest.Wait()
-
-		// Fix the "database" before the trial request
-		shouldFail.Store(false)
-
-		// Next batch should succeed (half-open allows one trial)
-		go func() {
-			errChan <- buf.Submit(context.Background(), make([]int, 5))
-		}()
-
-		synctest.Wait()
-
-		err = <-errChan
-		require.NoError(t, err, "batch after recovery should succeed")
-	})
-}
-
-func TestBuffer_CircuitBreakerDefaultConfig(t *testing.T) {
-	buf := New(Config{}, func(ctx context.Context, rows []int) error {
-		return nil
-	}, newTestLogger())
-
-	// Verify defaults
-	assert.Equal(t, uint32(5), buf.config.CircuitBreakerMaxFailures)
-	assert.Equal(t, 60*time.Second, buf.config.CircuitBreakerTimeout)
-}
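With the breaker and semaphore tests gone, error propagation is the main surviving contract: a Submit whose batch flush fails should get that error back directly. A hedged sketch of a replacement regression test in the style of the remaining ones (synctest wrapper, newTestLogger); it is not part of this diff:

func TestBuffer_SubmitSurfacesFlushError(t *testing.T) {
	synctest.Test(t, func(t *testing.T) {
		flushErr := errors.New("clickhouse error")

		// MaxRows 2 makes the two submitted rows trigger a size flush,
		// whose error should be delivered back to the waiting Submit.
		buf := New(Config{
			MaxRows:       2,
			FlushInterval: time.Hour,
		}, func(ctx context.Context, rows []int) error {
			return flushErr
		}, newTestLogger())

		require.NoError(t, buf.Start(context.Background()))

		defer func() { _ = buf.Stop(context.Background()) }()

		err := buf.Submit(context.Background(), make([]int, 2))
		require.ErrorIs(t, err, flushErr)
	})
}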