diff --git a/mempool/iterator.go b/mempool/iterator.go index b7a75b91c..b5c95121e 100644 --- a/mempool/iterator.go +++ b/mempool/iterator.go @@ -45,7 +45,14 @@ type EVMMempoolIterator struct { // It combines iterators from both transaction pools and selects transactions based on fee priority. // Returns nil if both iterators are empty or nil. The bondDenom parameter specifies the native // token denomination for fee comparisons, and chainId is used for EVM transaction conversion. -func NewEVMMempoolIterator(evmIterator *miner.TransactionsByPriceAndNonce, cosmosIterator mempool.Iterator, logger log.Logger, txConfig client.TxConfig, bondDenom string, chainID *big.Int, blockchain *Blockchain) mempool.Iterator { +func NewEVMMempoolIterator( + evmIterator *miner.TransactionsByPriceAndNonce, + cosmosIterator mempool.Iterator, + logger log.Logger, + txConfig client.TxConfig, + bondDenom string, + blockchain *Blockchain, +) mempool.Iterator { // Check if we have any transactions at all hasEVM := evmIterator != nil && !evmIterator.Empty() hasCosmos := cosmosIterator != nil && cosmosIterator.Tx() != nil @@ -64,7 +71,7 @@ func NewEVMMempoolIterator(evmIterator *miner.TransactionsByPriceAndNonce, cosmo logger: logger, txConfig: txConfig, bondDenom: bondDenom, - chainID: chainID, + chainID: blockchain.Config().ChainID, blockchain: blockchain, } } diff --git a/mempool/mempool.go b/mempool/mempool.go index 25268b1d8..01b720c74 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -404,17 +404,45 @@ func (m *ExperimentalEVMMempool) ReapNewValidTxs(maxBytes uint64, maxGas uint64) func (m *ExperimentalEVMMempool) Select(goCtx context.Context, i [][]byte) sdkmempool.Iterator { m.mtx.Lock() defer m.mtx.Unlock() - ctx := sdk.UnwrapSDKContext(goCtx) - // Wait for the legacypool to Reset at >= blockHeight (this may have - // already happened), to ensure all txs in pending pool are valid. - m.legacyTxPool.WaitForReorgHeight(ctx, ctx.BlockHeight()) + return m.buildIterator(goCtx, i) +} - evmIterator, cosmosIterator := m.getIterators(goCtx, i) +// SelectBy iterates through transactions until the provided filter function returns false. +// It uses the same unified iterator as Select but allows early termination based on +// custom criteria defined by the filter function. +func (m *ExperimentalEVMMempool) SelectBy(goCtx context.Context, txs [][]byte, filter func(sdk.Tx) bool) { + m.mtx.Lock() + defer m.mtx.Unlock() - combinedIterator := NewEVMMempoolIterator(evmIterator, cosmosIterator, m.logger, m.txConfig, m.vmKeeper.GetEvmCoinInfo(ctx).Denom, m.blockchain.Config().ChainID, m.blockchain) + iter := m.buildIterator(goCtx, txs) - return combinedIterator + for iter != nil && filter(iter.Tx()) { + iter = iter.Next() + } +} + +// buildIterator ensures that EVM mempool has checked txs for reorgs up to COMMITTED +// block height and then returns a combined iterator over EVM & Cosmos txs. +func (m *ExperimentalEVMMempool) buildIterator(ctx context.Context, txs [][]byte) sdkmempool.Iterator { + sdkCtx := sdk.UnwrapSDKContext(ctx) + + // context has a block height of the next PROPOSED block, + // but we need to wait for the reorg to complete on the previous COMMITTED block. + committedHeight := sdkCtx.BlockHeight() - 1 + + m.legacyTxPool.WaitForReorgHeight(ctx, committedHeight) + + evmIterator, cosmosIterator := m.getIterators(ctx, txs) + + return NewEVMMempoolIterator( + evmIterator, + cosmosIterator, + m.logger, + m.txConfig, + m.vmKeeper.GetEvmCoinInfo(sdkCtx).Denom, + m.blockchain, + ) } // CountTx returns the total number of transactions in both EVM and Cosmos pools. @@ -519,23 +547,6 @@ func (m *ExperimentalEVMMempool) shouldRemoveFromEVMPool(hash common.Hash, reaso return true } -// SelectBy iterates through transactions until the provided filter function returns false. -// It uses the same unified iterator as Select but allows early termination based on -// custom criteria defined by the filter function. -func (m *ExperimentalEVMMempool) SelectBy(goCtx context.Context, i [][]byte, f func(sdk.Tx) bool) { - m.mtx.Lock() - defer m.mtx.Unlock() - ctx := sdk.UnwrapSDKContext(goCtx) - - evmIterator, cosmosIterator := m.getIterators(goCtx, i) - - combinedIterator := NewEVMMempoolIterator(evmIterator, cosmosIterator, m.logger, m.txConfig, m.vmKeeper.GetEvmCoinInfo(ctx).Denom, m.blockchain.Config().ChainID, m.blockchain) - - for combinedIterator != nil && f(combinedIterator.Tx()) { - combinedIterator = combinedIterator.Next() - } -} - // SetEventBus sets CometBFT event bus to listen for new block header event. func (m *ExperimentalEVMMempool) SetEventBus(eventBus *cmttypes.EventBus) { if m.HasEventBus() { @@ -594,7 +605,7 @@ func (m *ExperimentalEVMMempool) getEVMMessage(tx sdk.Tx) (*evmtypes.MsgEthereum // getIterators prepares iterators over pending EVM and Cosmos transactions. // It configures EVM transactions with proper base fee filtering and priority ordering, // while setting up the Cosmos iterator with the provided exclusion list. -func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, i [][]byte) (*miner.TransactionsByPriceAndNonce, sdkmempool.Iterator) { +func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, txs [][]byte) (*miner.TransactionsByPriceAndNonce, sdkmempool.Iterator) { ctx := sdk.UnwrapSDKContext(goCtx) baseFee := m.vmKeeper.GetBaseFee(ctx) var baseFeeUint *uint256.Int @@ -602,21 +613,18 @@ func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, i [][]byte) baseFeeUint = uint256.MustFromBig(baseFee) } - m.logger.Debug("getting iterators") - - pendingFilter := txpool.PendingFilter{ + evmPendingTxs := m.txPool.Pending(txpool.PendingFilter{ MinTip: m.minTip, BaseFee: baseFeeUint, BlobFee: nil, OnlyPlainTxs: true, OnlyBlobTxs: false, - } - evmPendingTxes := m.txPool.Pending(pendingFilter) - orderedEVMPendingTxes := miner.NewTransactionsByPriceAndNonce(nil, evmPendingTxes, baseFee) + }) - cosmosPendingTxes := m.cosmosPool.Select(ctx, i) + evmIterator := miner.NewTransactionsByPriceAndNonce(nil, evmPendingTxs, baseFee) + cosmosIterator := m.cosmosPool.Select(ctx, txs) - return orderedEVMPendingTxes, cosmosPendingTxes + return evmIterator, cosmosIterator } func (m *ExperimentalEVMMempool) TrackTx(hash common.Hash) error { diff --git a/mempool/txpool/legacypool/legacypool.go b/mempool/txpool/legacypool/legacypool.go index a094d5eed..f39eef0a5 100644 --- a/mempool/txpool/legacypool/legacypool.go +++ b/mempool/txpool/legacypool/legacypool.go @@ -162,8 +162,11 @@ var ( // throttleTxMeter counts how many transactions are rejected due to too-many-changes between // txpool reorgs. throttleTxMeter = metrics.NewRegisteredMeter("txpool/throttle", nil) + // reorgDurationTimer measures how long time a txpool reorg takes. - reorgDurationTimer = metrics.NewRegisteredTimer("txpool/reorgtime", nil) + reorgDurationTimer = metrics.NewRegisteredTimer("txpool/reorgtime", nil) + reorgWaitDurationTimer = metrics.NewRegisteredTimer("txpool/reorgwaittime", nil) + // dropBetweenReorgHistogram counts how many drops we experience between two reorg runs. It is expected // that this number is pretty low, since txpool reorgs happen very frequently. dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015)) @@ -1392,9 +1395,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { // runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop. func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) { - defer func(t0 time.Time) { - reorgDurationTimer.Update(time.Since(t0)) - }(time.Now()) + defer func(t0 time.Time) { reorgDurationTimer.UpdateSince(t0) }(time.Now()) defer close(done) var promoteAddrs []common.Address @@ -2107,6 +2108,8 @@ func (pool *LegacyPool) Clear() { // height >= height. If the context is cancelled or the pool is shutting down, // this will also return. func (pool *LegacyPool) WaitForReorgHeight(ctx context.Context, height int64) { + defer func(start time.Time) { reorgWaitDurationTimer.UpdateSince(start) }(time.Now()) + for pool.latestReorgHeight.Load() < height { // reorg loop has not run at the target height, subscribe to the // outcome of the next reorg loop iteration to know when to check again