Skip to content

Commit 3d80ab8

Browse files
authored
feat(watcher): Configureable block query range (#377)
* feat(watcher): Configureable block query range * fix: CI check
1 parent 0c6705e commit 3d80ab8

File tree

7 files changed

+45
-14
lines changed

7 files changed

+45
-14
lines changed

crates/node/src/args.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,18 @@ impl ScrollRollupNodeConfig {
327327
let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
328328
if let Some(provider) = l1_provider.filter(|_| !self.test) {
329329
tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher");
330-
(None, Some(L1Watcher::spawn(provider, l1_start_block_number, node_config).await))
330+
(
331+
None,
332+
Some(
333+
L1Watcher::spawn(
334+
provider,
335+
l1_start_block_number,
336+
node_config,
337+
self.l1_provider_args.logs_query_block_range,
338+
)
339+
.await,
340+
),
341+
)
331342
} else {
332343
// Create a channel for L1 notifications that we can use to inject L1 messages for
333344
// testing
@@ -604,6 +615,9 @@ pub struct L1ProviderArgs {
604615
/// The initial backoff for the provider.
605616
#[arg(long = "l1.initial-backoff", id = "l1_initial_backoff", value_name = "L1_INITIAL_BACKOFF", default_value_t = constants::L1_PROVIDER_INITIAL_BACKOFF)]
606617
pub initial_backoff: u64,
618+
/// The logs query block range.
619+
#[arg(long = "l1.query-range", id = "l1_query_range", value_name = "L1_QUERY_RANGE", default_value_t = constants::LOGS_QUERY_BLOCK_RANGE)]
620+
pub logs_query_block_range: u64,
607621
}
608622

609623
/// The arguments for the Beacon provider.

crates/node/src/constants.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ pub(crate) const L1_PROVIDER_MAX_RETRIES: u32 = 10;
88
/// The initial backoff for the L1 provider.
99
pub(crate) const L1_PROVIDER_INITIAL_BACKOFF: u64 = 100;
1010

11+
/// The block range used to fetch L1 logs.
12+
pub(crate) const LOGS_QUERY_BLOCK_RANGE: u64 = 500;
13+
1114
/// The max retries for the L2 provider.
1215
pub(crate) const L2_PROVIDER_MAX_RETRIES: u32 = u32::MAX;
1316

crates/node/tests/sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> {
5353
compute_units_per_second: 500,
5454
max_retries: 10,
5555
initial_backoff: 100,
56+
logs_query_block_range: 500,
5657
},
5758
engine_driver_args: EngineDriverArgs { sync_at_startup: false },
5859
sequencer_args: SequencerArgs {

crates/watcher/src/lib.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@ use std::{
2929
};
3030
use tokio::sync::mpsc;
3131

32-
/// The block range used to fetch L1 logs.
33-
pub const LOGS_QUERY_BLOCK_RANGE: u64 = 500;
3432
/// The maximum count of unfinalized blocks we can have in Ethereum.
3533
pub const MAX_UNFINALIZED_BLOCK_COUNT: usize = 96;
3634

@@ -84,6 +82,8 @@ pub struct L1Watcher<EP> {
8482
metrics: WatcherMetrics,
8583
/// Whether the watcher is synced to the L1 head.
8684
is_synced: bool,
85+
/// The log query block range.
86+
log_query_block_range: u64,
8787
}
8888

8989
/// The L1 notification type yielded by the [`L1Watcher`].
@@ -158,10 +158,11 @@ where
158158
execution_provider: EP,
159159
start_block: Option<u64>,
160160
config: Arc<NodeConfig>,
161+
log_query_block_range: u64,
161162
) -> mpsc::Receiver<Arc<L1Notification>> {
162163
tracing::trace!(target: "scroll::watcher", ?start_block, ?config, "spawning L1 watcher");
163164

164-
let (tx, rx) = mpsc::channel(LOGS_QUERY_BLOCK_RANGE as usize);
165+
let (tx, rx) = mpsc::channel(log_query_block_range as usize);
165166

166167
let fetch_block_number = async |tag: BlockNumberOrTag| {
167168
let block = loop {
@@ -192,6 +193,7 @@ where
192193
config,
193194
metrics: WatcherMetrics::default(),
194195
is_synced: false,
196+
log_query_block_range,
195197
};
196198

197199
// notify at spawn.
@@ -612,7 +614,7 @@ where
612614
async fn update_current_block(&mut self, latest: &Block) -> L1WatcherResult<()> {
613615
self.current_block_number = self
614616
.current_block_number
615-
.saturating_add(LOGS_QUERY_BLOCK_RANGE)
617+
.saturating_add(self.log_query_block_range)
616618
.min(latest.header.number);
617619
self.notify(L1Notification::Processed(self.current_block_number)).await
618620
}
@@ -637,7 +639,8 @@ where
637639

638640
/// Returns the next range of logs, for the block range in
639641
/// \[[`current_block`](field@L1Watcher::current_block_number);
640-
/// [`current_block`](field@L1Watcher::current_block_number) + [`LOGS_QUERY_BLOCK_RANGE`]\].
642+
/// [`current_block`](field@L1Watcher::current_block_number) +
643+
/// [`field@L1Watcher::log_query_block_range`]\].
641644
async fn next_filtered_logs(&self, latest_block_number: u64) -> L1WatcherResult<Vec<Log>> {
642645
// set the block range for the query
643646
let address_book = &self.config.address_book;
@@ -654,7 +657,7 @@ where
654657
]);
655658
let to_block = self
656659
.current_block_number
657-
.saturating_add(LOGS_QUERY_BLOCK_RANGE)
660+
.saturating_add(self.log_query_block_range)
658661
.min(latest_block_number);
659662

660663
// skip a block for `from_block` since `self.current_block_number` is the last indexed
@@ -679,6 +682,8 @@ mod tests {
679682
use arbitrary::Arbitrary;
680683
use scroll_l1::abi::calls::commitBatchCall;
681684

685+
const LOG_QUERY_BLOCK_RANGE: u64 = 500;
686+
682687
// Returns a L1Watcher along with the receiver end of the L1Notifications.
683688
fn l1_watcher(
684689
unfinalized_blocks: Vec<Header>,
@@ -699,7 +704,7 @@ mod tests {
699704
vec![latest],
700705
);
701706

702-
let (tx, rx) = mpsc::channel(LOGS_QUERY_BLOCK_RANGE as usize);
707+
let (tx, rx) = mpsc::channel(LOG_QUERY_BLOCK_RANGE as usize);
703708
(
704709
L1Watcher {
705710
execution_provider: provider,
@@ -710,6 +715,7 @@ mod tests {
710715
config: Arc::new(NodeConfig::mainnet()),
711716
metrics: WatcherMetrics::default(),
712717
is_synced: false,
718+
log_query_block_range: LOG_QUERY_BLOCK_RANGE,
713719
},
714720
rx,
715721
)

crates/watcher/tests/indexing.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio::select;
1818
async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()> {
1919
const CHAIN_LEN: usize = 200;
2020
const HALF_CHAIN_LEN: usize = 100;
21+
const LOGS_QUERY_BLOCK_RANGE: u64 = 500;
2122

2223
// Given
2324
let (finalized, latest, headers) = chain(CHAIN_LEN);
@@ -58,7 +59,8 @@ async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()>
5859
);
5960

6061
// spawn the watcher and verify received notifications are consistent.
61-
let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await;
62+
let mut l1_watcher =
63+
L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await;
6264
let mut prev_block_number = 0;
6365
let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(2));
6466
let _ = ticker.tick().await;

crates/watcher/tests/logs.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> {
2020

2121
const CHAIN_LEN: usize = 200;
2222
const HALF_CHAIN_LEN: usize = CHAIN_LEN / 2;
23+
const LOGS_QUERY_BLOCK_RANGE: u64 = 500;
2324

2425
// Given
2526
let (finalized, _, headers) = chain(CHAIN_LEN);
@@ -62,7 +63,8 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> {
6263
);
6364

6465
// spawn the watcher and verify received notifications are consistent.
65-
let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await;
66+
let mut l1_watcher =
67+
L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await;
6668
let mut received_logs = Vec::new();
6769
loop {
6870
let notification = l1_watcher.recv().await.map(|notif| (*notif).clone());

crates/watcher/tests/reorg.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use rollup_node_primitives::NodeConfig;
1010
use rollup_node_watcher::{
1111
random, test_utils::provider::MockProvider, Block, L1Notification, L1Watcher,
1212
};
13+
const LOGS_QUERY_BLOCK_RANGE: u64 = 500;
1314

1415
// Generate a set blocks that will be fed to the l1 watcher.
1516
// Every fork_cycle blocks, generates a small reorg.
@@ -71,7 +72,8 @@ async fn test_should_detect_reorg() -> eyre::Result<()> {
7172
);
7273

7374
// spawn the watcher and verify received notifications are consistent.
74-
let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await;
75+
let mut l1_watcher =
76+
L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await;
7577

7678
// skip the first two events
7779
l1_watcher.recv().await.unwrap();
@@ -92,7 +94,7 @@ async fn test_should_detect_reorg() -> eyre::Result<()> {
9294
}
9395

9496
if latest_number == latest.header.number {
95-
continue
97+
continue;
9698
}
9799

98100
let mut notification = l1_watcher.recv().await.unwrap();
@@ -172,7 +174,8 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> {
172174
);
173175

174176
// spawn the watcher and verify received notifications are consistent.
175-
let mut l1_watcher = L1Watcher::spawn(mock_provider, None, Arc::new(config)).await;
177+
let mut l1_watcher =
178+
L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await;
176179

177180
// skip the first two events
178181
l1_watcher.recv().await.unwrap();
@@ -193,7 +196,7 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> {
193196
}
194197

195198
if latest_number == latest.header.number {
196-
continue
199+
continue;
197200
}
198201

199202
let mut notification = l1_watcher.recv().await.unwrap();

0 commit comments

Comments
 (0)