Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,692 changes: 1,363 additions & 1,329 deletions Cargo.lock

Large diffs are not rendered by default.

61 changes: 34 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,52 @@ publish = false

[workspace.dependencies]
# reth
reth = { git = "https://github.com/paradigmxyz/reth" }
reth-chainspec = { git = "https://github.com/paradigmxyz/reth" }
reth-discv5 = { git = "https://github.com/paradigmxyz/reth" }
reth-execution-errors = { git = "https://github.com/paradigmxyz/reth" }
reth-execution-types = { git = "https://github.com/paradigmxyz/reth" }
reth-exex = { git = "https://github.com/paradigmxyz/reth", features = ["serde"] }
reth-eth-wire = { git = "https://github.com/paradigmxyz/reth" }
reth-evm = { git = "https://github.com/paradigmxyz/reth" }
reth-network = { git = "https://github.com/paradigmxyz/reth", features = ["test-utils"] }
reth-network-api = { git = "https://github.com/paradigmxyz/reth" }
reth-network-peers = { git = "https://github.com/paradigmxyz/reth" }
reth-node-api = { git = "https://github.com/paradigmxyz/reth" }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth" }
reth-primitives = { git = "https://github.com/paradigmxyz/reth" }
reth-provider = { git = "https://github.com/paradigmxyz/reth" }
reth-revm = { git = "https://github.com/paradigmxyz/reth" }
reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth" }
reth-tracing = { git = "https://github.com/paradigmxyz/reth" }
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-discv5 = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-execution-errors = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-execution-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3", features = ["serde"] }
reth-eth-wire = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-network = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3", features = ["test-utils"] }
reth-network-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-network-peers = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-provider = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-revm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-tracing = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }

# alloy
alloy-eips = { version = "0.5.4", default-features = false }
alloy-consensus = { version = "0.5.4", default-features = false }
alloy-primitives = { version = "0.8.9", default-features = false }
alloy-eips = { version = "0.7", default-features = false }
alloy-genesis = { version = "0.7", default-features = false }
alloy-consensus = { version = "0.7", default-features = false }
alloy-rlp = "0.3.4"
alloy-rpc-types = { version = "0.5.4", features = [
alloy-rpc-types = { version = "0.7", features = [
"eth",
], default-features = false }
alloy-signer = { version = "0.5.4", default-features = false }
alloy-signer-local = { version = "0.5.4", default-features = false }
alloy-signer = { version = "0.7", default-features = false }
alloy-signer-local = { version = "0.7", default-features = false }

alloy-primitives = { version = "0.8", default-features = false }
alloy-sol-types = { version = "0.8", features = ["json"] }

discv5 = "0.8"
enr = "0.12.1"
secp256k1 = { version = "0.29", default-features = false, features = [
"global-context",
"recovery",
] }

# async
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1"
tokio-tungstenite = { version = "0.23", features = ["native-tls"] }
tokio-tungstenite = { version = "0.24", features = ["native-tls"] }

# serde
serde = "1"
Expand All @@ -68,5 +75,5 @@ serde_json = "1"
eyre = "0.6"

# testing
reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth" }
reth-testing-utils = { git = "https://github.com/paradigmxyz/reth" }
reth-exex-test-utils = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
reth-testing-utils = { git = "https://github.com/paradigmxyz/reth", tag = "v1.1.3" }
39 changes: 19 additions & 20 deletions backfill/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
mod rpc;

use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};

use crate::rpc::{BackfillRpcExt, BackfillRpcExtApiServer};
use alloy_primitives::BlockNumber;
use clap::{Args, Parser};
use eyre::OptionExt;
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use jsonrpsee::tracing::instrument;
use reth::{
api::NodeTypes,
chainspec::EthereumChainSpecParser,
primitives::{BlockId, BlockNumberOrTag},
providers::{BlockIdReader, BlockReader, HeaderProvider, StateProviderFactory},
primitives::EthPrimitives,
providers::BlockIdReader,
rpc::types::{BlockId, BlockNumberOrTag},
};
use reth_evm::execute::BlockExecutorProvider;
use reth_execution_types::Chain;
use reth_exex::{BackfillJob, BackfillJobFactory, ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_tracing::tracing::{error, info};
use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};
use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore};

use crate::rpc::{BackfillRpcExt, BackfillRpcExtApiServer};

/// The message type used to communicate with the ExEx.
enum BackfillMessage {
/// Start a backfill job for the given range.
Expand Down Expand Up @@ -55,7 +54,10 @@ struct BackfillExEx<Node: FullNodeComponents> {
backfill_jobs: HashMap<u64, oneshot::Sender<oneshot::Sender<()>>>,
}

impl<Node: FullNodeComponents> BackfillExEx<Node> {
impl<Node> BackfillExEx<Node>
where
Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>,
{
/// Creates a new instance of the ExEx.
fn new(
ctx: ExExContext<Node>,
Expand Down Expand Up @@ -191,7 +193,7 @@ impl<Node: FullNodeComponents> BackfillExEx<Node> {
backfill_tx: mpsc::UnboundedSender<BackfillMessage>,
cancel_rx: oneshot::Receiver<oneshot::Sender<()>>,
) {
let backfill = backfill_with_job(job);
let backfill = backfill_with_job(job.into_stream());

tokio::select! {
result = backfill => {
Expand All @@ -214,15 +216,12 @@ impl<Node: FullNodeComponents> BackfillExEx<Node> {

/// Backfills the given range of blocks in parallel, calling the
/// [`process_committed_chain`] method for each block.
async fn backfill_with_job<
E: BlockExecutorProvider + Send,
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Send + Unpin + 'static,
>(
job: BackfillJob<E, P>,
) -> eyre::Result<()> {
job
// Convert the backfill job into a parallel stream
.into_stream()
async fn backfill_with_job<S, E>(st: S) -> eyre::Result<()>
where
S: Stream<Item = Result<Chain, E>>,
E: Into<eyre::Error>,
{
st
// Covert the block execution error into `eyre`
.map_err(Into::into)
// Process each block, returning early if an error occurs
Expand Down Expand Up @@ -304,7 +303,7 @@ fn main() -> eyre::Result<()> {
.backfill_job_factory
.backfill(args.from_block.unwrap_or(1)..=to_block);

backfill_with_job(job).await.map_err(|err| {
backfill_with_job(job.into_stream()).await.map_err(|err| {
eyre::eyre!("failed to backfill for the provided args: {err:?}")
})?;
}
Expand Down
2 changes: 1 addition & 1 deletion discv5/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ reth.workspace = true

# networking
discv5.workspace = true
enr = "0.12"
enr.workspace = true

# async
futures-util.workspace = true
Expand Down
1 change: 1 addition & 0 deletions in-memory-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ reth-node-ethereum.workspace = true
reth-tracing.workspace = true
reth-execution-types.workspace = true

alloy-consensus.workspace = true
eyre.workspace = true
futures-util.workspace = true

Expand Down
26 changes: 16 additions & 10 deletions in-memory-state/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#![warn(unused_crate_dependencies)]

use alloy_consensus::BlockHeader;
use futures_util::{FutureExt, TryStreamExt};
use reth::{api::NodeTypes, primitives::EthPrimitives};
use reth_execution_types::ExecutionOutcome;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
Expand All @@ -27,7 +29,10 @@ impl<Node: FullNodeComponents> InMemoryStateExEx<Node> {
}
}

impl<Node: FullNodeComponents + Unpin> Future for InMemoryStateExEx<Node> {
impl<Node> Future for InMemoryStateExEx<Node>
where
Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>> + Unpin,
{
type Output = eyre::Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -40,11 +45,11 @@ impl<Node: FullNodeComponents + Unpin> Future for InMemoryStateExEx<Node> {
}
ExExNotification::ChainReorged { old, new } => {
// revert to block before the reorg
this.execution_outcome.revert_to(new.first().number - 1);
this.execution_outcome.revert_to(new.first().number() - 1);
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
this.execution_outcome.revert_to(old.first().number - 1);
this.execution_outcome.revert_to(old.first().number() - 1);
info!(reverted_chain = ?old.range(), "Received revert");
}
};
Expand Down Expand Up @@ -76,7 +81,8 @@ fn main() -> eyre::Result<()> {

#[cfg(test)]
mod tests {
use reth::revm::db::BundleState;
use super::*;
use reth::{primitives::SealedBlockWithSenders, revm::db::BundleState};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex_test_utils::{test_exex_context, PollOnce};
use reth_testing_utils::generators::{self, random_block, random_receipt, BlockParams};
Expand All @@ -87,20 +93,20 @@ mod tests {
let mut rng = &mut generators::rng();

let (ctx, handle) = test_exex_context().await?;
let mut exex = pin!(super::InMemoryStateExEx::new(ctx));
let mut exex = pin!(InMemoryStateExEx::new(ctx));

let mut expected_state = ExecutionOutcome::default();

// Generate first block and its state
let block_1 =
let block_1: SealedBlockWithSenders =
random_block(&mut rng, 0, BlockParams { tx_count: Some(1), ..Default::default() })
.seal_with_senders()
.ok_or(eyre::eyre!("failed to recover senders"))?;
let block_number_1 = block_1.number;
let block_number_1 = block_1.header().number();
let execution_outcome1 = ExecutionOutcome::new(
BundleState::default(),
vec![random_receipt(&mut rng, &block_1.body.transactions[0], None)].into(),
block_1.number,
block_1.header().number(),
vec![],
);
// Extend the expected state with the first block
Expand All @@ -117,14 +123,14 @@ mod tests {
assert_eq!(exex.as_mut().execution_outcome, expected_state);

// Generate second block and its state
let block_2 =
let block_2: SealedBlockWithSenders =
random_block(&mut rng, 1, BlockParams { tx_count: Some(2), ..Default::default() })
.seal_with_senders()
.ok_or(eyre::eyre!("failed to recover senders"))?;
let execution_outcome2 = ExecutionOutcome::new(
BundleState::default(),
vec![random_receipt(&mut rng, &block_2.body.transactions[0], None)].into(),
block_2.number,
block_2.header().number(),
vec![],
);
// Extend the expected execution outcome with the second block
Expand Down
4 changes: 3 additions & 1 deletion op-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ reth-node-ethereum.workspace = true
reth-primitives.workspace = true
reth-tracing.workspace = true
reth.workspace = true
reth-evm.workspace = true

# alloy
alloy-primitives.workspace = true
Expand All @@ -23,13 +24,14 @@ alloy-eips.workspace = true
# misc
eyre.workspace = true
futures.workspace = true
rusqlite = { version = "0.31.0", features = ["bundled"] }
rusqlite = { version = "0.32", features = ["bundled"] }
tokio.workspace = true

[dev-dependencies]
# reth
reth-exex-test-utils.workspace = true
reth-testing-utils.workspace = true
alloy-primitives = { workspace = true, features = ["rand"]}

# alloy
alloy-consensus.workspace = true
Expand Down
30 changes: 18 additions & 12 deletions op-bridge/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use alloy_primitives::{address, Address};
use alloy_sol_types::{sol, SolEventInterface};
use futures::{Future, FutureExt, TryStreamExt};
use reth::api::NodeTypes;
use reth_execution_types::Chain;
use reth_exex::{ExExContext, ExExEvent};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_primitives::{Log, SealedBlockWithSenders, TransactionSigned};
use reth_primitives::{EthPrimitives, Log, SealedBlockWithSenders, TransactionSigned};
use reth_tracing::tracing::info;
use rusqlite::Connection;

Expand All @@ -24,10 +25,13 @@ const OP_BRIDGES: [Address; 6] = [
/// Initializes the ExEx.
///
/// Opens up a SQLite database and creates the tables (if they don't exist).
async fn init<Node: FullNodeComponents>(
async fn init<Node>(
ctx: ExExContext<Node>,
mut connection: Connection,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
) -> eyre::Result<impl Future<Output = eyre::Result<()>>>
where
Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>,
{
create_tables(&mut connection)?;

Ok(op_bridge_exex(ctx, connection))
Expand Down Expand Up @@ -98,10 +102,13 @@ fn create_tables(connection: &mut Connection) -> rusqlite::Result<()> {

/// An example of ExEx that listens to ETH bridging events from OP Stack chains
/// and stores deposits and withdrawals in a SQLite database.
async fn op_bridge_exex<Node: FullNodeComponents>(
async fn op_bridge_exex<Node>(
mut ctx: ExExContext<Node>,
connection: Connection,
) -> eyre::Result<()> {
) -> eyre::Result<()>
where
Node: FullNodeComponents<Types: NodeTypes<Primitives = EthPrimitives>>,
{
// Process all new chain state notifications
while let Some(notification) = ctx.notifications.try_next().await? {
// Revert all deposits and withdrawals
Expand Down Expand Up @@ -219,7 +226,8 @@ fn decode_chain_into_events(
.flat_map(|(block, receipts)| {
block
.body
.transactions()
.transactions
.iter()
.zip(receipts.iter().flatten())
.map(move |(tx, receipt)| (block, tx, receipt))
})
Expand Down Expand Up @@ -270,8 +278,7 @@ fn main() -> eyre::Result<()> {

#[cfg(test)]
mod tests {
use std::pin::pin;

use super::*;
use alloy_consensus::TxLegacy;
use alloy_eips::eip7685::Requests;
use alloy_primitives::{Address, TxKind, U256};
Expand All @@ -280,12 +287,11 @@ mod tests {
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex_test_utils::{test_exex_context, PollOnce};
use reth_primitives::{
Block, BlockBody, Header, Log, Receipt, Transaction, TransactionSigned, TxType,
Block, BlockBody, BlockExt, Header, Log, Receipt, Transaction, TransactionSigned, TxType,
};
use reth_testing_utils::generators::sign_tx_with_random_key_pair;
use rusqlite::Connection;

use crate::{L1StandardBridge, OP_BRIDGES};
use std::pin::pin;

/// Given the address of a bridge contract and an event, construct a transaction signed with a
/// random private key and a receipt for that transaction.
Expand Down Expand Up @@ -351,7 +357,7 @@ mod tests {
body: BlockBody { transactions: vec![deposit_tx, withdrawal_tx], ..Default::default() },
}
.seal_slow()
.seal_with_senders()
.seal_with_senders::<Block>()
.ok_or_else(|| eyre::eyre!("failed to recover senders"))?;

// Construct a chain
Expand Down
Loading
Loading