Skip to content

Conversation

@Savid
Copy link
Member

@Savid Savid commented Feb 2, 2026

  • Add circuit breaker using sony/gobreaker to protect against cascading failures when ClickHouse is unavailable. Configurable via bufferCircuitBreakerMaxFailures and bufferCircuitBreakerTimeout.

  • Add semaphore to limit concurrent flush operations, configurable via bufferMaxConcurrentFlushes (default: 10).

  • Add metrics: inflight_flushes, circuit_open, circuit_rejections_total

  • Fix RPC batch fetching to automatically chunk requests exceeding the 100 block limit imposed by most RPC nodes.

  • Fix BlocksProcessed metric to correctly count all blocks in a batch by moving increment to processors.

  • Fix block not found diff calculation: use diff > 0 && diff <= 5 to correctly distinguish blocks that might appear soon from blocks that should already exist.

  • Add comprehensive tests for circuit breaker, concurrent flushes, semaphore limiting, and block diff calculation.

Savid added 3 commits February 2, 2026 11:59
- Add circuit breaker using sony/gobreaker to protect against cascading
  failures when ClickHouse is unavailable. Configurable via
  bufferCircuitBreakerMaxFailures and bufferCircuitBreakerTimeout.

- Add semaphore to limit concurrent flush operations, configurable via
  bufferMaxConcurrentFlushes (default: 10).

- Add metrics: inflight_flushes, circuit_open, circuit_rejections_total

- Fix RPC batch fetching to automatically chunk requests exceeding the
  100 block limit imposed by most RPC nodes.

- Fix BlocksProcessed metric to correctly count all blocks in a batch
  by moving increment to processors.

- Fix block not found diff calculation: use diff > 0 && diff <= 5 to
  correctly distinguish blocks that might appear soon from blocks that
  should already exist.

- Add comprehensive tests for circuit breaker, concurrent flushes,
  semaphore limiting, and block diff calculation.
@Savid Savid merged commit fec76c6 into release/embed-mode-structlog-agg Feb 2, 2026
3 checks passed
Savid added a commit that referenced this pull request Feb 2, 2026
Savid added a commit that referenced this pull request Feb 2, 2026
Savid added a commit that referenced this pull request Feb 3, 2026
* feat(structlog): add call frame tracking to identify EVM call contexts

Introduce CallTracker to assign sequential frame IDs and maintain
call paths during opcode traversal. This enables accurate
identification of which contract call each opcode belongs to,
even when the same contract is called multiple times.

Extend Structlog with CallFrameID and CallFramePath fields to
persist the tracking information alongside each opcode record.

Update extractCallAddress to handle all CALL-type opcodes
(CALL, CALLCODE, DELEGATECALL, STATICCALL) for complete
call target extraction.

* test: add comprehensive unit tests for extractCallAddress function

* test: add unit tests for CREATE/CREATE2 address extraction
feat: detect CREATE/CREATE2 opcodes and fetch contract address from receipt
refactor: replace extractCallAddress with extractCallAddressWithCreate to
handle contract creation addresses

* style(transaction_processing.go): move comment to line above log to match Go style

* refactor(processor): replace receipt-based CREATE address lookup with trace-based computation

- Remove fetchCreateAddress and hasCreateOpcode helpers
- Introduce ComputeCreateAddresses to extract addresses directly from trace
- Update extractCallAddressWithCreate signature to accept index and map
- Rename queue name from "transaction-structlog" to "transaction_structlog"
- Expand test coverage for nested and failed CREATE scenarios

* fix(call_tracker): align root frame depth with EVM traces (depth 1)

The root frame now starts at depth 1 instead of 0 to match the
actual EVM structlog output, where execution begins at depth 1.
All tests updated to reflect the new depth semantics.

* feat(structlog): add GasSelf field to isolate CALL/CREATE overhead from child gas

Introduce GasSelf to represent the gas consumed by an opcode *excluding*
any gas spent in child frames. For CALL/CREATE opcodes this yields the
pure call overhead (warm/cold access, memory expansion, value transfer);
for all other opcodes GasSelf equals GasUsed. This allows accurate
aggregation of total execution gas without double counting.

- Add ComputeGasSelf() helper that subtracts the sum of *direct* child
 GasUsed values from each CALL/CREATE opcode’s GasUsed.
- Extend Structlog struct and ClickHouse schema with new GasSelf column.
- Update both ProcessTransaction() and ExtractStructlogs() to populate
 the new field.
- Provide extensive unit tests covering nested, sibling and edge cases.

* test(structlog): fix and expand address extraction tests for CALL opcodes
test(structlog): add comprehensive format_address tests for 20-byte padding
refactor(structlog): introduce formatAddress to normalize stack values to 42-char addresses

* WIP: current changes

* fix(structlog): correct EOA call detection to prevent phantom synthetic frames

Replace heuristic precompile check with go-ethereum's canonical list to
avoid mis-classifying early EOAs/contracts as precompiles. Tighten the
EOA detection logic to only create synthetic frames when call depth
remains unchanged (depth == nextDepth) instead of the previous
nextDepth <= depth, eliminating phantom frames for failed calls and
out-of-gas scenarios.

* chore: accidental commit

* refactor: remove unused ParityTrace types and helper from execution package

* feat: add Node interface and EmbeddedNode for library embedding

Introduce abstraction layer for execution nodes to support both RPC-based
and embedded usage patterns.

- Add Node interface defining the contract for execution data providers
- Add DataSource interface for hosts to provide data directly
- Add EmbeddedNode implementation that delegates to a DataSource
- Rename existing implementation to RPCNode
- Add NewPoolWithNodes() for creating pools with pre-created nodes
- Update pool to work with Node interface instead of concrete types

* refactor: abstract execution types to remove CGO dependency

Replace go-ethereum types (Block, Transaction, Receipt) with abstract
interfaces to enable library embedding without CGO. Move geth-specific
RPC implementation to pkg/ethereum/execution/geth/ subpackage with
build tags, allowing clean separation between embedded and RPC modes.

* refactor(structlog): add embedded-mode support to eliminate 99% of stack allocations

Introduce dual-mode processing (RPC vs embedded) in StructLog to reduce
memory pressure. Embedded mode pre-computes GasUsed and CallToAddress
in the tracer, avoiding post-processing passes and stack allocations.

- Add GasUsed and CallToAddress fields to StructLog
- Document operation modes and field semantics
- Extract call address via new helper, supporting both modes
- Add opcode constants and isCallOpcode utility

* perf(embedded_node): force DisableStack=true in DebugTraceTransaction for embedded mode

Reduces memory pressure by skipping full stack capture; the tracer
already extracts CallToAddress directly for CALL-family opcodes.

* feat: add support for pre-computed GasUsed values from embedded tracer

Introduce hasPrecomputedGasUsed() to detect when the tracer already
populates GasUsed (embedded mode). Skip expensive post-processing
computation in that case, falling back to RPC mode calculation only
when needed. This maintains backward compatibility while optimizing
performance for embedded mode traces.

* style(transaction_processing.go): add blank line after batch slice creation for readability

* refactor: change ChainID return type from int32 to int64
docs: remove trailing whitespace in README

* test(embedded_node_test.go): remove obsolete TestEmbeddedNode_DelegatesToDataSource_ChainID test

* fix(pool): register OnReady callbacks before spawning goroutines

Eliminates race condition where MarkReady() could fire before
callbacks were registered, causing missed ready events.

* refactor(embedded_node): add debug logging for OnReady callback execution
refactor(pool): add debug logging for OnReady callback registration and execution

* fix(transaction_processing.go): extract pre-computed GasUsed values from structlogs in embedded mode instead of skipping post-processing

* test(gas_cost_test.go): add unit tests for hasPrecomputedGasUsed function

* feat: detect pre-computed CREATE/CREATE2 addresses to skip expensive scan

Introduce hasPrecomputedCreateAddresses() to check whether the tracer
already populated CallToAddress for CREATE/CREATE2 opcodes.
When true (embedded mode), we skip the costly ComputeCreateAddresses()
pass, improving performance for large traces.

* style(embedded_node.go): add blank line before callback execution for readability

* lint

* refactor(structlog): remove ProgramCounter field from Structlog struct
The field is no longer needed for downstream processing and
simplifies the data model by eliminating redundant information.

* feat: introduce transaction call_frame processor for EVM execution analysis

Add a new processor that aggregates EVM structlog traces into per-call-frame
metrics. It extracts gas usage, opcode counts, error counts, and call
hierarchy for every CALL/CREATE frame, including synthetic EOA frames.

- Add Config, Processor, and all supporting components
- Integrate into Manager: config validation, initialization, task enqueue
- Provide forwards & backwards processing modes via Asynq tasks
- Emit ClickHouse rows with frame-level gas, depth, type, and parent links
- Include comprehensive unit tests for aggregation logic

* refactor(call_frame): unify root frame CallType to empty string and drop "EOA" label

Replace "ROOT" and "EOA" magic strings with an empty string for the root
frame and rely on the initiating CALL opcode for synthetic EOA frames.
This removes special-case values and keeps the field strictly tied to
actual CALL opcodes, simplifying downstream consumers.

* fix(aggregator): skip synthetic EOA rows in opcode count
fix(aggregator): compute intrinsic gas only when error_count = 0
feat(aggregator): add SetRootTargetAddress to set root frame target
fix(transaction_processing): set root frame target_address from tx.To()
fix(transaction_processing): emit consistent root frame for simple transfers

* refactor(processor): replace call_frame processor with structlog_agg processor

The call_frame processor is superseded by a new structlog_agg
implementation that aggregates structlog data into call frame rows
with per-opcode statistics. This provides richer analytics while
maintaining the same downstream table structure.

- Remove all call_frame processor code
- Introduce structlog_agg with frame-aware aggregation logic
- Update config, manager and imports to use the new processor
- Keep the same ClickHouse table name for compatibility

* fix: replace leftover "call_frame" references with "structlog_agg"
docs: update comments and error messages to match actual processor name

* fix(aggregator): skip gas refund and intrinsic gas for failed transactions
test(aggregator): add unit tests for failed vs successful tx gas handling

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: prevent uint64 underflow in gas calculations and correct intrinsic gas for failed txs

- Add underflow guards in ComputeGasUsed and computeGasUsed when structlog
 gas values are corrupted or out of order, falling back to GasCost.
- Reorder arithmetic in computeIntrinsicGas to avoid underflow when
 receiptGas < gasCumulative.
- Always compute intrinsic gas for root frame, even on failed transactions,
 since it is charged before EVM execution starts.
- Add StartBlock/EndBlock config options to allow reprocessing ranges.
- Extend test coverage for all underflow scenarios and failed-tx behavior.

* fix(geth): sanitize corrupted gasCost values from Erigon underflow bug

Erigon's debug_traceTransaction can return huge gasCost values when
an unsigned integer underflow occurs in callGas(). Detect and clamp
any gasCost that exceeds the available gas to the actual gas left,
matching Reth's behavior. Prevents downstream consumers from seeing
implausibly large gas costs.

* fix(aggregator): count REVERT as error even when opcode has no error field

REVERT executes successfully (no opcode error) but still causes the
transaction to fail. The failure is indicated by trace.Failed=true.
Update Finalize to set ErrorCount=1 in this case and use the corrected
value when deciding whether to apply gas refunds.

Add test TestFrameAggregator_RevertWithoutOpcodeError to ensure REVERT
transactions are correctly flagged as failed and receive no refund.

* fix(state): handle nullable UInt32 column in getLimiterMaxBlock
Use toUInt64OrZero to cast nullable UInt32 to UInt64, returning 0 for NULL.
Treat 0 as "no data" and return genesis block, preventing nil pointer panic.

* fix(manager): start state manager in embedded mode to ensure ClickHouse connections

* fix(state): replace toUInt64OrZero with ifNull+toUInt64 for nullable UInt32
The new expression avoids potential type-casting issues in ch-go by
explicitly handling NULL values before casting to UInt64.

* fix(leaderelection): add callback-based notification for guaranteed delivery

The channel-based leadership notification was dropping events when the
buffer (size 10) was full, causing instances to get stuck thinking they
were still leader when they had lost leadership.

This adds OnLeadershipChange(callback) to the Elector interface for
guaranteed delivery via synchronous callback invocation. The channel
API is preserved for backward compatibility but marked as deprecated.

Changes:
- Add LeadershipCallback type and OnLeadershipChange method to interface
- Implement callback storage and notification in RedisElector
- Update handlers to call callbacks first, then channel (best-effort)
- Update Manager to use callbacks instead of monitorLeadership goroutine
- Add tests for callback invocation, multiple callbacks, guaranteed
  delivery, and slow callback handling

* fix: reorder block processing steps to prevent race condition

Move block registration (ClickHouse & Redis) before enqueueing tasks
to guarantee the block record exists before any task can complete.
Calculate expected task count upfront and warn if enqueueing fails.

* refactor(leaderelection): replace hand-rolled Redis locking with redsync (#60)

* refactor(leaderelection): replace hand-rolled Redis locking with redsync

Replace manual SetNX/Lua script implementation with github.com/go-redsync/redsync/v4
for distributed mutex handling.

Changes:
- Add redsync v4.15.0 dependency
- Simplify Elector interface: remove LeadershipChannel() and GetLeaderID()
- Rewrite RedisElector using redsync.Mutex with:
  - WithSetNXOnExtend() for Redis restart resilience
  - WithDriftFactor(0.01) for clock drift safety
  - WithTries(1) for non-blocking acquisition attempts
- Update tests to use callback-based notification only
- Reduce implementation from ~374 to ~260 lines

Breaking changes:
- LeadershipChannel() removed (use OnLeadershipChange callback instead)
- GetLeaderID() removed (redsync uses opaque internal values)

* fix logging

* perf: move expensive metrics updates to background workers

Replace synchronous, per-block metrics collection with dedicated
background goroutines that refresh heavy metrics every 15 s.
This removes costly ClickHouse queries from the hot path and
reduces block-processing latency.

- Remove updateBlockMetrics() from both structlog & structlog_agg
- Add start/stopMetricsWorker() helpers and runMetricsWorker() loop
- Keep only lightweight BlockHeight update in ProcessNextBlock
- Introduce limiter cache in state.Manager to avoid repeated
 MAX() queries; refresh every 6 s via background goroutine
- Update limiter query to use ORDER BY … LIMIT 1 for index usage
- Add comprehensive tests for cache hit/miss, concurrent access,
 single-start guarantees and refresh behaviour

* perf(manager.go): remove FINAL keyword from queryLimiterMaxBlock query
to avoid full table scan and improve performance

* style(manager.go): remove "(optimized)" from debug log message

* fix(pending.go): use SetNX with 30min TTL to prevent duplicate block processing (#64)

feat(pending.go): add ErrBlockAlreadyBeingProcessed sentinel error
refactor(block_processing): acquire Redis lock before ClickHouse mark to prevent races
fix(state): include milliseconds in ClickHouse timestamp format

* feat(processor): zero-interval processing mode as default (#65)

Change default processing behavior to run as fast as possible with no
delay between cycles. When no work is available, a 10ms backoff prevents
CPU spin.

- Remove DefaultInterval (was 10s), add DefaultNoWorkBackoff (10ms)
- Remove automatic interval assignment in config validation
- Change processBlocks to return bool indicating if work was done
- Rewrite runBlockProcessing to use default case for continuous processing
- Fixed interval mode (interval > 0) still supported for rate limiting

* Add async insert settings to processors (#67)

- Add configurable asyncInsert and waitForAsyncInsert settings to each
  processor's ClickHouse writes to reduce part creation pressure
- Use *bool pointers to distinguish omitted config (defaults to true)
  from explicit false values
- Remove unused chunkSize and progressLogThreshold config options
- Simplify structlog ProcessTransaction by removing OnInput streaming
  pattern in favor of simple batch insert
- Remove DefaultChunkSize and DefaultProgressLogThreshold from tracker
- Update tests to remove chunk-related assertions

* feat(rowbuffer): add row batching system for ClickHouse inserts (#68)

* feat(rowbuffer): add row batching system for ClickHouse inserts

Implements a generic row buffer that pools rows in memory across concurrent
tasks and flushes when hitting a row limit (100k default) or timer (1s default).

- Add pkg/rowbuffer with Buffer[R any] generic type using Go 1.25 features
- Add comprehensive tests using testing/synctest for deterministic concurrency
- Add Prometheus metrics for flush operations and pending state
- Update structlog, structlog_agg, and simple processors to use row buffer
- Replace asyncInsert/waitForAsyncInsert config with bufferMaxRows/bufferFlushInterval

This reduces ClickHouse distributed pending inserts by batching many small
transaction inserts into fewer large batches.

* refactor(logging): improve rowbuffer and pending tracker logs

- Change "Decremented pending task count" from debug to trace level
- Add processor and table fields to ClickHouse flush logs
- Rename flush log messages for clarity ("ClickHouse flush completed/failed")

* feat(processor): add batch block fetching for improved throughput (#69)

Implement batch block fetching to request multiple blocks at once using
BatchCallContext RPC, with proper backpressure handling and contiguous-only
batching.

Changes:
- Add BlocksByNumbers to Node/DataSource interfaces for batch RPC
- Implement batch RPC using go-ethereum's BatchCallContext
- Add GetAvailableCapacity and ValidateBatchWithinLeash to Limiter
- Add NextBlocks method to State Manager for batch number generation
- Add InitBlocks batch method to PendingTracker using Redis pipeline
- Refactor ProcessNextBlock in all processors to support batch fetching
- Add exponential backoff with jitter for backpressure handling

The batch size is limited by min(maxPendingBlockRange, availableCapacity)
and stops at the first missing/not-found block to maintain contiguity.

* feat(tracker): replace counter-based tracking with SET-based BlockCompletionTracker (#66)

* feat(tracker): replace counter-based tracking with SET-based BlockCompletionTracker

- Replace PendingTracker (counter-based) with BlockCompletionTracker (SET-based)
- Use deterministic TaskID for deduplication via asynq.TaskID()
- Track completed taskIDs in Redis SETs (idempotent SADD)
- Remove asynq.Retention since deduplication now handled by Redis SETs
- Fix Redis key prefix to be a proper prefix (prefix:key instead of key:prefix)
- Add 30min TTL to all block tracking keys (completed, expected, block_meta)
- Support orphaned block detection and reprocessing
- Change default processing interval to zero (immediate processing)

* docs(config): add staleBlockDetection to example config

Document the new stale block detection configuration options
introduced with the SET-based block completion tracker.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

* feat(processor): add high-priority reprocess queue for orphaned blocks (#72)

Add a dedicated reprocess queue with highest priority (20) that ensures
orphaned and stale blocks are processed before regular forwards (10) or
backwards (5) processing tasks.

Changes:
- Add ProcessReprocessQueue and PrefixedProcessReprocessQueue helpers
- Enable StrictPriority in asynq server for strict queue ordering
- Update all processors to return 3 queues with reprocess as highest priority
- Update ReprocessBlock methods to enqueue directly to reprocess queue
- Handle non-existent queue gracefully in monitoring (normal for reprocess)
- Add comprehensive tests for new queue helpers

* feat(rowbuffer): add circuit breaker and concurrent flush limiting (#71)

- Add circuit breaker using sony/gobreaker to protect against cascading
  failures when ClickHouse is unavailable. Configurable via
  bufferCircuitBreakerMaxFailures and bufferCircuitBreakerTimeout.

- Add semaphore to limit concurrent flush operations, configurable via
  bufferMaxConcurrentFlushes (default: 10).

- Add metrics: inflight_flushes, circuit_open, circuit_rejections_total

- Fix RPC batch fetching to automatically chunk requests exceeding the
  100 block limit imposed by most RPC nodes.

- Fix BlocksProcessed metric to correctly count all blocks in a batch
  by moving increment to processors.

- Fix block not found diff calculation: use diff > 0 && diff <= 5 to
  correctly distinguish blocks that might appear soon from blocks that
  should already exist.

- Add comprehensive tests for circuit breaker, concurrent flushes,
  semaphore limiting, and block diff calculation.

* Revert "feat(rowbuffer): add circuit breaker and concurrent flush limiting (#71)" (#73)

This reverts commit fec76c6.

---------

Co-authored-by: Andrew Davis <1709934+Savid@users.noreply.github.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants