Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions mempool/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ type EVMMempoolIterator struct {
// It combines iterators from both transaction pools and selects transactions based on fee priority.
// Returns nil if both iterators are empty or nil. The bondDenom parameter specifies the native
// token denomination for fee comparisons, and chainId is used for EVM transaction conversion.
func NewEVMMempoolIterator(evmIterator *miner.TransactionsByPriceAndNonce, cosmosIterator mempool.Iterator, logger log.Logger, txConfig client.TxConfig, bondDenom string, chainID *big.Int, blockchain *Blockchain) mempool.Iterator {
func NewEVMMempoolIterator(
evmIterator *miner.TransactionsByPriceAndNonce,
cosmosIterator mempool.Iterator,
logger log.Logger,
txConfig client.TxConfig,
bondDenom string,
blockchain *Blockchain,
) mempool.Iterator {
// Check if we have any transactions at all
hasEVM := evmIterator != nil && !evmIterator.Empty()
hasCosmos := cosmosIterator != nil && cosmosIterator.Tx() != nil
Expand All @@ -64,7 +71,7 @@ func NewEVMMempoolIterator(evmIterator *miner.TransactionsByPriceAndNonce, cosmo
logger: logger,
txConfig: txConfig,
bondDenom: bondDenom,
chainID: chainID,
chainID: blockchain.Config().ChainID,
blockchain: blockchain,
}
}
Expand Down
74 changes: 41 additions & 33 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,17 +404,45 @@ func (m *ExperimentalEVMMempool) ReapNewValidTxs(maxBytes uint64, maxGas uint64)
func (m *ExperimentalEVMMempool) Select(goCtx context.Context, i [][]byte) sdkmempool.Iterator {
m.mtx.Lock()
defer m.mtx.Unlock()
ctx := sdk.UnwrapSDKContext(goCtx)

// Wait for the legacypool to Reset at >= blockHeight (this may have
// already happened), to ensure all txs in pending pool are valid.
m.legacyTxPool.WaitForReorgHeight(ctx, ctx.BlockHeight())
return m.buildIterator(goCtx, i)
}

evmIterator, cosmosIterator := m.getIterators(goCtx, i)
// SelectBy iterates through transactions until the provided filter function returns false.
// It uses the same unified iterator as Select but allows early termination based on
// custom criteria defined by the filter function.
func (m *ExperimentalEVMMempool) SelectBy(goCtx context.Context, txs [][]byte, filter func(sdk.Tx) bool) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	// Walk the combined EVM+Cosmos iterator; stop as soon as the filter
	// rejects a transaction or the iterator is exhausted (nil).
	it := m.buildIterator(goCtx, txs)
	for it != nil {
		if !filter(it.Tx()) {
			return
		}
		it = it.Next()
	}
}

// buildIterator ensures that EVM mempool has checked txs for reorgs up to COMMITTED
// block height and then returns a combined iterator over EVM & Cosmos txs.
func (m *ExperimentalEVMMempool) buildIterator(ctx context.Context, txs [][]byte) sdkmempool.Iterator {
	sdkCtx := sdk.UnwrapSDKContext(ctx)

	// The SDK context carries the height of the next PROPOSED block, so the
	// previously COMMITTED block is one behind it; wait for the legacy pool's
	// reorg loop to have processed up to that committed height.
	m.legacyTxPool.WaitForReorgHeight(ctx, sdkCtx.BlockHeight()-1)

	evmIter, cosmosIter := m.getIterators(ctx, txs)

	return NewEVMMempoolIterator(
		evmIter,
		cosmosIter,
		m.logger,
		m.txConfig,
		m.vmKeeper.GetEvmCoinInfo(sdkCtx).Denom,
		m.blockchain,
	)
}

// CountTx returns the total number of transactions in both EVM and Cosmos pools.
Expand Down Expand Up @@ -519,23 +547,6 @@ func (m *ExperimentalEVMMempool) shouldRemoveFromEVMPool(hash common.Hash, reaso
return true
}

// SelectBy iterates through transactions until the provided filter function returns false.
// It uses the same unified iterator as Select but allows early termination based on
// custom criteria defined by the filter function.
func (m *ExperimentalEVMMempool) SelectBy(goCtx context.Context, i [][]byte, f func(sdk.Tx) bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
ctx := sdk.UnwrapSDKContext(goCtx)

evmIterator, cosmosIterator := m.getIterators(goCtx, i)

combinedIterator := NewEVMMempoolIterator(evmIterator, cosmosIterator, m.logger, m.txConfig, m.vmKeeper.GetEvmCoinInfo(ctx).Denom, m.blockchain.Config().ChainID, m.blockchain)

for combinedIterator != nil && f(combinedIterator.Tx()) {
combinedIterator = combinedIterator.Next()
}
}

// SetEventBus sets CometBFT event bus to listen for new block header event.
func (m *ExperimentalEVMMempool) SetEventBus(eventBus *cmttypes.EventBus) {
if m.HasEventBus() {
Expand Down Expand Up @@ -594,29 +605,26 @@ func (m *ExperimentalEVMMempool) getEVMMessage(tx sdk.Tx) (*evmtypes.MsgEthereum
// getIterators prepares iterators over pending EVM and Cosmos transactions.
// It configures EVM transactions with proper base fee filtering and priority ordering,
// while setting up the Cosmos iterator with the provided exclusion list.
func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, i [][]byte) (*miner.TransactionsByPriceAndNonce, sdkmempool.Iterator) {
func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, txs [][]byte) (*miner.TransactionsByPriceAndNonce, sdkmempool.Iterator) {
ctx := sdk.UnwrapSDKContext(goCtx)
baseFee := m.vmKeeper.GetBaseFee(ctx)
var baseFeeUint *uint256.Int
if baseFee != nil {
baseFeeUint = uint256.MustFromBig(baseFee)
}

m.logger.Debug("getting iterators")

pendingFilter := txpool.PendingFilter{
evmPendingTxs := m.txPool.Pending(txpool.PendingFilter{
MinTip: m.minTip,
BaseFee: baseFeeUint,
BlobFee: nil,
OnlyPlainTxs: true,
OnlyBlobTxs: false,
}
evmPendingTxes := m.txPool.Pending(pendingFilter)
orderedEVMPendingTxes := miner.NewTransactionsByPriceAndNonce(nil, evmPendingTxes, baseFee)
})

cosmosPendingTxes := m.cosmosPool.Select(ctx, i)
evmIterator := miner.NewTransactionsByPriceAndNonce(nil, evmPendingTxs, baseFee)
cosmosIterator := m.cosmosPool.Select(ctx, txs)

return orderedEVMPendingTxes, cosmosPendingTxes
return evmIterator, cosmosIterator
}

func (m *ExperimentalEVMMempool) TrackTx(hash common.Hash) error {
Expand Down
11 changes: 7 additions & 4 deletions mempool/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,11 @@
// throttleTxMeter counts how many transactions are rejected due to too-many-changes between
// txpool reorgs.
throttleTxMeter = metrics.NewRegisteredMeter("txpool/throttle", nil)

// reorgDurationTimer measures how long time a txpool reorg takes.
reorgDurationTimer = metrics.NewRegisteredTimer("txpool/reorgtime", nil)
reorgDurationTimer = metrics.NewRegisteredTimer("txpool/reorgtime", nil)
reorgWaitDurationTimer = metrics.NewRegisteredTimer("txpool/reorgwaittime", nil)

// dropBetweenReorgHistogram counts how many drops we experience between two reorg runs. It is expected
// that this number is pretty low, since txpool reorgs happen very frequently.
dropBetweenReorgHistogram = metrics.NewRegisteredHistogram("txpool/dropbetweenreorg", nil, metrics.NewExpDecaySample(1028, 0.015))
Expand Down Expand Up @@ -1392,9 +1395,7 @@

// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) {
defer func(t0 time.Time) {
reorgDurationTimer.Update(time.Since(t0))
}(time.Now())
defer func(t0 time.Time) { reorgDurationTimer.UpdateSince(t0) }(time.Now())
defer close(done)

var promoteAddrs []common.Address
Expand Down Expand Up @@ -2107,6 +2108,8 @@
// height >= height. If the context is cancelled or the pool is shutting down,
// this will also return.
func (pool *LegacyPool) WaitForReorgHeight(ctx context.Context, height int64) {
defer func(start time.Time) { reorgWaitDurationTimer.UpdateSince(start) }(time.Now())

for pool.latestReorgHeight.Load() < height {
// reorg loop has not run at the target height, subscribe to the
// outcome of the next reorg loop iteration to know when to check again
Expand Down
Loading