-
Notifications
You must be signed in to change notification settings - Fork 0
feat(rowbuffer): add circuit breaker and concurrent flush limiting #71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Savid
merged 3 commits into
release/embed-mode-structlog-agg
from
feat/rowbuffer-circuit-breaker-concurrency
Feb 2, 2026
Merged
feat(rowbuffer): add circuit breaker and concurrent flush limiting #71
Savid
merged 3 commits into
release/embed-mode-structlog-agg
from
feat/rowbuffer-circuit-breaker-concurrency
Feb 2, 2026
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
- Add circuit breaker using sony/gobreaker to protect against cascading failures when ClickHouse is unavailable. Configurable via bufferCircuitBreakerMaxFailures and bufferCircuitBreakerTimeout. - Add semaphore to limit concurrent flush operations, configurable via bufferMaxConcurrentFlushes (default: 10). - Add metrics: inflight_flushes, circuit_open, circuit_rejections_total - Fix RPC batch fetching to automatically chunk requests exceeding the 100 block limit imposed by most RPC nodes. - Fix BlocksProcessed metric to correctly count all blocks in a batch by moving increment to processors. - Fix block not found diff calculation: use diff > 0 && diff <= 5 to correctly distinguish blocks that might appear soon from blocks that should already exist. - Add comprehensive tests for circuit breaker, concurrent flushes, semaphore limiting, and block diff calculation.
…ircuit-breaker-concurrency
…ircuit-breaker-concurrency
mattevans
approved these changes
Feb 2, 2026
Savid
added a commit
that referenced
this pull request
Feb 3, 2026
* feat(structlog): add call frame tracking to identify EVM call contexts Introduce CallTracker to assign sequential frame IDs and maintain call paths during opcode traversal. This enables accurate identification of which contract call each opcode belongs to, even when the same contract is called multiple times. Extend Structlog with CallFrameID and CallFramePath fields to persist the tracking information alongside each opcode record. Update extractCallAddress to handle all CALL-type opcodes (CALL, CALLCODE, DELEGATECALL, STATICCALL) for complete call target extraction. * test: add comprehensive unit tests for extractCallAddress function * test: add unit tests for CREATE/CREATE2 address extraction feat: detect CREATE/CREATE2 opcodes and fetch contract address from receipt refactor: replace extractCallAddress with extractCallAddressWithCreate to handle contract creation addresses * style(transaction_processing.go): move comment to line above log to match Go style * refactor(processor): replace receipt-based CREATE address lookup with trace-based computation - Remove fetchCreateAddress and hasCreateOpcode helpers - Introduce ComputeCreateAddresses to extract addresses directly from trace - Update extractCallAddressWithCreate signature to accept index and map - Rename queue name from "transaction-structlog" to "transaction_structlog" - Expand test coverage for nested and failed CREATE scenarios * fix(call_tracker): align root frame depth with EVM traces (depth 1) The root frame now starts at depth 1 instead of 0 to match the actual EVM structlog output, where execution begins at depth 1. All tests updated to reflect the new depth semantics. * feat(structlog): add GasSelf field to isolate CALL/CREATE overhead from child gas Introduce GasSelf to represent the gas consumed by an opcode *excluding* any gas spent in child frames. For CALL/CREATE opcodes this yields the pure call overhead (warm/cold access, memory expansion, value transfer); for all other opcodes GasSelf equals GasUsed. This allows accurate aggregation of total execution gas without double counting. - Add ComputeGasSelf() helper that subtracts the sum of *direct* child GasUsed values from each CALL/CREATE opcode’s GasUsed. - Extend Structlog struct and ClickHouse schema with new GasSelf column. - Update both ProcessTransaction() and ExtractStructlogs() to populate the new field. - Provide extensive unit tests covering nested, sibling and edge cases. * test(structlog): fix and expand address extraction tests for CALL opcodes test(structlog): add comprehensive format_address tests for 20-byte padding refactor(structlog): introduce formatAddress to normalize stack values to 42-char addresses * WIP: current changes * fix(structlog): correct EOA call detection to prevent phantom synthetic frames Replace heuristic precompile check with go-ethereum's canonical list to avoid mis-classifying early EOAs/contracts as precompiles. Tighten the EOA detection logic to only create synthetic frames when call depth remains unchanged (depth == nextDepth) instead of the previous nextDepth <= depth, eliminating phantom frames for failed calls and out-of-gas scenarios. * chore: accidental commit * refactor: remove unused ParityTrace types and helper from execution package * feat: add Node interface and EmbeddedNode for library embedding Introduce abstraction layer for execution nodes to support both RPC-based and embedded usage patterns. - Add Node interface defining the contract for execution data providers - Add DataSource interface for hosts to provide data directly - Add EmbeddedNode implementation that delegates to a DataSource - Rename existing implementation to RPCNode - Add NewPoolWithNodes() for creating pools with pre-created nodes - Update pool to work with Node interface instead of concrete types * refactor: abstract execution types to remove CGO dependency Replace go-ethereum types (Block, Transaction, Receipt) with abstract interfaces to enable library embedding without CGO. Move geth-specific RPC implementation to pkg/ethereum/execution/geth/ subpackage with build tags, allowing clean separation between embedded and RPC modes. * refactor(structlog): add embedded-mode support to eliminate 99% of stack allocations Introduce dual-mode processing (RPC vs embedded) in StructLog to reduce memory pressure. Embedded mode pre-computes GasUsed and CallToAddress in the tracer, avoiding post-processing passes and stack allocations. - Add GasUsed and CallToAddress fields to StructLog - Document operation modes and field semantics - Extract call address via new helper, supporting both modes - Add opcode constants and isCallOpcode utility * perf(embedded_node): force DisableStack=true in DebugTraceTransaction for embedded mode Reduces memory pressure by skipping full stack capture; the tracer already extracts CallToAddress directly for CALL-family opcodes. * feat: add support for pre-computed GasUsed values from embedded tracer Introduce hasPrecomputedGasUsed() to detect when the tracer already populates GasUsed (embedded mode). Skip expensive post-processing computation in that case, falling back to RPC mode calculation only when needed. This maintains backward compatibility while optimizing performance for embedded mode traces. * style(transaction_processing.go): add blank line after batch slice creation for readability * refactor: change ChainID return type from int32 to int64 docs: remove trailing whitespace in README * test(embedded_node_test.go): remove obsolete TestEmbeddedNode_DelegatesToDataSource_ChainID test * fix(pool): register OnReady callbacks before spawning goroutines Eliminates race condition where MarkReady() could fire before callbacks were registered, causing missed ready events. * refactor(embedded_node): add debug logging for OnReady callback execution refactor(pool): add debug logging for OnReady callback registration and execution * fix(transaction_processing.go): extract pre-computed GasUsed values from structlogs in embedded mode instead of skipping post-processing * test(gas_cost_test.go): add unit tests for hasPrecomputedGasUsed function * feat: detect pre-computed CREATE/CREATE2 addresses to skip expensive scan Introduce hasPrecomputedCreateAddresses() to check whether the tracer already populated CallToAddress for CREATE/CREATE2 opcodes. When true (embedded mode), we skip the costly ComputeCreateAddresses() pass, improving performance for large traces. * style(embedded_node.go): add blank line before callback execution for readability * lint * refactor(structlog): remove ProgramCounter field from Structlog struct The field is no longer needed for downstream processing and simplifies the data model by eliminating redundant information. * feat: introduce transaction call_frame processor for EVM execution analysis Add a new processor that aggregates EVM structlog traces into per-call-frame metrics. It extracts gas usage, opcode counts, error counts, and call hierarchy for every CALL/CREATE frame, including synthetic EOA frames. - Add Config, Processor, and all supporting components - Integrate into Manager: config validation, initialization, task enqueue - Provide forwards & backwards processing modes via Asynq tasks - Emit ClickHouse rows with frame-level gas, depth, type, and parent links - Include comprehensive unit tests for aggregation logic * refactor(call_frame): unify root frame CallType to empty string and drop "EOA" label Replace "ROOT" and "EOA" magic strings with an empty string for the root frame and rely on the initiating CALL opcode for synthetic EOA frames. This removes special-case values and keeps the field strictly tied to actual CALL opcodes, simplifying downstream consumers. * fix(aggregator): skip synthetic EOA rows in opcode count fix(aggregator): compute intrinsic gas only when error_count = 0 feat(aggregator): add SetRootTargetAddress to set root frame target fix(transaction_processing): set root frame target_address from tx.To() fix(transaction_processing): emit consistent root frame for simple transfers * refactor(processor): replace call_frame processor with structlog_agg processor The call_frame processor is superseded by a new structlog_agg implementation that aggregates structlog data into call frame rows with per-opcode statistics. This provides richer analytics while maintaining the same downstream table structure. - Remove all call_frame processor code - Introduce structlog_agg with frame-aware aggregation logic - Update config, manager and imports to use the new processor - Keep the same ClickHouse table name for compatibility * fix: replace leftover "call_frame" references with "structlog_agg" docs: update comments and error messages to match actual processor name * fix(aggregator): skip gas refund and intrinsic gas for failed transactions test(aggregator): add unit tests for failed vs successful tx gas handling Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: prevent uint64 underflow in gas calculations and correct intrinsic gas for failed txs - Add underflow guards in ComputeGasUsed and computeGasUsed when structlog gas values are corrupted or out of order, falling back to GasCost. - Reorder arithmetic in computeIntrinsicGas to avoid underflow when receiptGas < gasCumulative. - Always compute intrinsic gas for root frame, even on failed transactions, since it is charged before EVM execution starts. - Add StartBlock/EndBlock config options to allow reprocessing ranges. - Extend test coverage for all underflow scenarios and failed-tx behavior. * fix(geth): sanitize corrupted gasCost values from Erigon underflow bug Erigon's debug_traceTransaction can return huge gasCost values when an unsigned integer underflow occurs in callGas(). Detect and clamp any gasCost that exceeds the available gas to the actual gas left, matching Reth's behavior. Prevents downstream consumers from seeing implausibly large gas costs. * fix(aggregator): count REVERT as error even when opcode has no error field REVERT executes successfully (no opcode error) but still causes the transaction to fail. The failure is indicated by trace.Failed=true. Update Finalize to set ErrorCount=1 in this case and use the corrected value when deciding whether to apply gas refunds. Add test TestFrameAggregator_RevertWithoutOpcodeError to ensure REVERT transactions are correctly flagged as failed and receive no refund. * fix(state): handle nullable UInt32 column in getLimiterMaxBlock Use toUInt64OrZero to cast nullable UInt32 to UInt64, returning 0 for NULL. Treat 0 as "no data" and return genesis block, preventing nil pointer panic. * fix(manager): start state manager in embedded mode to ensure ClickHouse connections * fix(state): replace toUInt64OrZero with ifNull+toUInt64 for nullable UInt32 The new expression avoids potential type-casting issues in ch-go by explicitly handling NULL values before casting to UInt64. * fix(leaderelection): add callback-based notification for guaranteed delivery The channel-based leadership notification was dropping events when the buffer (size 10) was full, causing instances to get stuck thinking they were still leader when they had lost leadership. This adds OnLeadershipChange(callback) to the Elector interface for guaranteed delivery via synchronous callback invocation. The channel API is preserved for backward compatibility but marked as deprecated. Changes: - Add LeadershipCallback type and OnLeadershipChange method to interface - Implement callback storage and notification in RedisElector - Update handlers to call callbacks first, then channel (best-effort) - Update Manager to use callbacks instead of monitorLeadership goroutine - Add tests for callback invocation, multiple callbacks, guaranteed delivery, and slow callback handling * fix: reorder block processing steps to prevent race condition Move block registration (ClickHouse & Redis) before enqueueing tasks to guarantee the block record exists before any task can complete. Calculate expected task count upfront and warn if enqueueing fails. * refactor(leaderelection): replace hand-rolled Redis locking with redsync (#60) * refactor(leaderelection): replace hand-rolled Redis locking with redsync Replace manual SetNX/Lua script implementation with github.com/go-redsync/redsync/v4 for distributed mutex handling. Changes: - Add redsync v4.15.0 dependency - Simplify Elector interface: remove LeadershipChannel() and GetLeaderID() - Rewrite RedisElector using redsync.Mutex with: - WithSetNXOnExtend() for Redis restart resilience - WithDriftFactor(0.01) for clock drift safety - WithTries(1) for non-blocking acquisition attempts - Update tests to use callback-based notification only - Reduce implementation from ~374 to ~260 lines Breaking changes: - LeadershipChannel() removed (use OnLeadershipChange callback instead) - GetLeaderID() removed (redsync uses opaque internal values) * fix logging * perf: move expensive metrics updates to background workers Replace synchronous, per-block metrics collection with dedicated background goroutines that refresh heavy metrics every 15 s. This removes costly ClickHouse queries from the hot path and reduces block-processing latency. - Remove updateBlockMetrics() from both structlog & structlog_agg - Add start/stopMetricsWorker() helpers and runMetricsWorker() loop - Keep only lightweight BlockHeight update in ProcessNextBlock - Introduce limiter cache in state.Manager to avoid repeated MAX() queries; refresh every 6 s via background goroutine - Update limiter query to use ORDER BY … LIMIT 1 for index usage - Add comprehensive tests for cache hit/miss, concurrent access, single-start guarantees and refresh behaviour * perf(manager.go): remove FINAL keyword from queryLimiterMaxBlock query to avoid full table scan and improve performance * style(manager.go): remove "(optimized)" from debug log message * fix(pending.go): use SetNX with 30min TTL to prevent duplicate block processing (#64) feat(pending.go): add ErrBlockAlreadyBeingProcessed sentinel error refactor(block_processing): acquire Redis lock before ClickHouse mark to prevent races fix(state): include milliseconds in ClickHouse timestamp format * feat(processor): zero-interval processing mode as default (#65) Change default processing behavior to run as fast as possible with no delay between cycles. When no work is available, a 10ms backoff prevents CPU spin. - Remove DefaultInterval (was 10s), add DefaultNoWorkBackoff (10ms) - Remove automatic interval assignment in config validation - Change processBlocks to return bool indicating if work was done - Rewrite runBlockProcessing to use default case for continuous processing - Fixed interval mode (interval > 0) still supported for rate limiting * Add async insert settings to processors (#67) - Add configurable asyncInsert and waitForAsyncInsert settings to each processor's ClickHouse writes to reduce part creation pressure - Use *bool pointers to distinguish omitted config (defaults to true) from explicit false values - Remove unused chunkSize and progressLogThreshold config options - Simplify structlog ProcessTransaction by removing OnInput streaming pattern in favor of simple batch insert - Remove DefaultChunkSize and DefaultProgressLogThreshold from tracker - Update tests to remove chunk-related assertions * feat(rowbuffer): add row batching system for ClickHouse inserts (#68) * feat(rowbuffer): add row batching system for ClickHouse inserts Implements a generic row buffer that pools rows in memory across concurrent tasks and flushes when hitting a row limit (100k default) or timer (1s default). - Add pkg/rowbuffer with Buffer[R any] generic type using Go 1.25 features - Add comprehensive tests using testing/synctest for deterministic concurrency - Add Prometheus metrics for flush operations and pending state - Update structlog, structlog_agg, and simple processors to use row buffer - Replace asyncInsert/waitForAsyncInsert config with bufferMaxRows/bufferFlushInterval This reduces ClickHouse distributed pending inserts by batching many small transaction inserts into fewer large batches. * refactor(logging): improve rowbuffer and pending tracker logs - Change "Decremented pending task count" from debug to trace level - Add processor and table fields to ClickHouse flush logs - Rename flush log messages for clarity ("ClickHouse flush completed/failed") * feat(processor): add batch block fetching for improved throughput (#69) Implement batch block fetching to request multiple blocks at once using BatchCallContext RPC, with proper backpressure handling and contiguous-only batching. Changes: - Add BlocksByNumbers to Node/DataSource interfaces for batch RPC - Implement batch RPC using go-ethereum's BatchCallContext - Add GetAvailableCapacity and ValidateBatchWithinLeash to Limiter - Add NextBlocks method to State Manager for batch number generation - Add InitBlocks batch method to PendingTracker using Redis pipeline - Refactor ProcessNextBlock in all processors to support batch fetching - Add exponential backoff with jitter for backpressure handling The batch size is limited by min(maxPendingBlockRange, availableCapacity) and stops at the first missing/not-found block to maintain contiguity. * feat(tracker): replace counter-based tracking with SET-based BlockCompletionTracker (#66) * feat(tracker): replace counter-based tracking with SET-based BlockCompletionTracker - Replace PendingTracker (counter-based) with BlockCompletionTracker (SET-based) - Use deterministic TaskID for deduplication via asynq.TaskID() - Track completed taskIDs in Redis SETs (idempotent SADD) - Remove asynq.Retention since deduplication now handled by Redis SETs - Fix Redis key prefix to be a proper prefix (prefix:key instead of key:prefix) - Add 30min TTL to all block tracking keys (completed, expected, block_meta) - Support orphaned block detection and reprocessing - Change default processing interval to zero (immediate processing) * docs(config): add staleBlockDetection to example config Document the new stale block detection configuration options introduced with the SET-based block completion tracker. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com> * feat(processor): add high-priority reprocess queue for orphaned blocks (#72) Add a dedicated reprocess queue with highest priority (20) that ensures orphaned and stale blocks are processed before regular forwards (10) or backwards (5) processing tasks. Changes: - Add ProcessReprocessQueue and PrefixedProcessReprocessQueue helpers - Enable StrictPriority in asynq server for strict queue ordering - Update all processors to return 3 queues with reprocess as highest priority - Update ReprocessBlock methods to enqueue directly to reprocess queue - Handle non-existent queue gracefully in monitoring (normal for reprocess) - Add comprehensive tests for new queue helpers * feat(rowbuffer): add circuit breaker and concurrent flush limiting (#71) - Add circuit breaker using sony/gobreaker to protect against cascading failures when ClickHouse is unavailable. Configurable via bufferCircuitBreakerMaxFailures and bufferCircuitBreakerTimeout. - Add semaphore to limit concurrent flush operations, configurable via bufferMaxConcurrentFlushes (default: 10). - Add metrics: inflight_flushes, circuit_open, circuit_rejections_total - Fix RPC batch fetching to automatically chunk requests exceeding the 100 block limit imposed by most RPC nodes. - Fix BlocksProcessed metric to correctly count all blocks in a batch by moving increment to processors. - Fix block not found diff calculation: use diff > 0 && diff <= 5 to correctly distinguish blocks that might appear soon from blocks that should already exist. - Add comprehensive tests for circuit breaker, concurrent flushes, semaphore limiting, and block diff calculation. * Revert "feat(rowbuffer): add circuit breaker and concurrent flush limiting (#71)" (#73) This reverts commit fec76c6. --------- Co-authored-by: Andrew Davis <1709934+Savid@users.noreply.github.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Add circuit breaker using sony/gobreaker to protect against cascading failures when ClickHouse is unavailable. Configurable via bufferCircuitBreakerMaxFailures and bufferCircuitBreakerTimeout.
Add semaphore to limit concurrent flush operations, configurable via bufferMaxConcurrentFlushes (default: 10).
Add metrics: inflight_flushes, circuit_open, circuit_rejections_total
Fix RPC batch fetching to automatically chunk requests exceeding the 100 block limit imposed by most RPC nodes.
Fix BlocksProcessed metric to correctly count all blocks in a batch by moving increment to processors.
Fix block not found diff calculation: use diff > 0 && diff <= 5 to correctly distinguish blocks that might appear soon from blocks that should already exist.
Add comprehensive tests for circuit breaker, concurrent flushes, semaphore limiting, and block diff calculation.