Skip to content

Commit aac1ba7

Browse files
authored
feat: skipped l1 messages (#380)
* feat: skipped l1 messages * fix: comments * fix: function proxy issue
1 parent 3d80ab8 commit aac1ba7

File tree

10 files changed

+241
-13
lines changed

10 files changed

+241
-13
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,7 @@ impl<
426426
let batch_info = batch.batch_info;
427427
tracing::info!(target: "scroll::chain_orchestrator", batch_info = ?batch_info, num_blocks = batch.attributes.len(), "Handling derived batch");
428428

429+
let skipped_l1_messages = batch.skipped_l1_messages.clone();
429430
let batch_reconciliation_result =
430431
reconcile_batch(&self.l2_client, batch, self.engine.fcs()).await?;
431432
let aggregated_actions = batch_reconciliation_result.aggregate_actions();
@@ -493,7 +494,9 @@ impl<
493494
batch_reconciliation_result.into_batch_consolidation_outcome(reorg_results).await?;
494495

495496
// Insert the batch consolidation outcome into the database.
496-
let consolidation_outcome = batch_consolidation_outcome.clone();
497+
let mut consolidation_outcome = batch_consolidation_outcome.clone();
498+
consolidation_outcome.with_skipped_l1_messages(skipped_l1_messages);
499+
497500
self.database.insert_batch_consolidation_outcome(consolidation_outcome).await?;
498501

499502
Ok(Some(ChainOrchestratorEvent::BatchConsolidated(batch_consolidation_outcome)))

crates/database/db/src/db.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,8 @@ impl DatabaseWriteOperations for Database {
167167
&self,
168168
block_number: u64,
169169
) -> Result<u64, DatabaseError> {
170-
self.tx_mut(
171-
move |tx| async move { tx.delete_l2_blocks_gt_block_number(block_number).await },
172-
)
173-
.await
170+
self.tx_mut(move |tx| async move { tx.delete_batches_gt_block_number(block_number).await })
171+
.await
174172
}
175173

176174
async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result<u64, DatabaseError> {
@@ -186,6 +184,14 @@ impl DatabaseWriteOperations for Database {
186184
.await
187185
}
188186

187+
async fn update_skipped_l1_messages(&self, indexes: Vec<u64>) -> Result<(), DatabaseError> {
188+
self.tx_mut(move |tx| {
189+
let indexes = indexes.clone();
190+
async move { tx.update_skipped_l1_messages(indexes).await }
191+
})
192+
.await
193+
}
194+
189195
async fn delete_l1_messages_gt(
190196
&self,
191197
l1_block_number: u64,

crates/database/db/src/models/l1_message.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub struct Model {
1818
sender: Vec<u8>,
1919
input: Vec<u8>,
2020
pub(crate) l2_block_number: Option<i64>,
21+
skipped: bool,
2122
}
2223

2324
/// The relation for the L1 message model.
@@ -40,6 +41,7 @@ impl From<L1MessageEnvelope> for ActiveModel {
4041
sender: ActiveValue::Set(value.transaction.sender.to_vec()),
4142
input: ActiveValue::Set(value.transaction.input.to_vec()),
4243
l2_block_number: ActiveValue::Set(value.l2_block_number.map(|b| b as i64)),
44+
skipped: ActiveValue::Set(false),
4345
}
4446
}
4547
}

crates/database/db/src/operations.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ pub trait DatabaseWriteOperations {
5858
/// Insert an [`L1MessageEnvelope`] into the database.
5959
async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError>;
6060

61+
/// Sets the `skipped` column to true for the provided list of L1 messages queue indexes.
62+
async fn update_skipped_l1_messages(&self, indexes: Vec<u64>) -> Result<(), DatabaseError>;
63+
6164
/// Delete all [`L1MessageEnvelope`]s with a block number greater than the provided block
6265
/// number and return them.
6366
async fn delete_l1_messages_gt(
@@ -329,6 +332,15 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
329332
}
330333
}
331334

335+
async fn update_skipped_l1_messages(&self, indexes: Vec<u64>) -> Result<(), DatabaseError> {
336+
Ok(models::l1_message::Entity::update_many()
337+
.col_expr(models::l1_message::Column::Skipped, Expr::value(true))
338+
.filter(models::l1_message::Column::QueueIndex.is_in(indexes.iter().map(|&x| x as i64)))
339+
.exec(self.get_connection())
340+
.await
341+
.map(|_| ())?)
342+
}
343+
332344
async fn delete_l1_messages_gt(
333345
&self,
334346
l1_block_number: u64,
@@ -531,6 +543,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
531543
self.insert_block(block.block_info, outcome.batch_info).await?;
532544
self.update_l1_messages_with_l2_block(block).await?;
533545
}
546+
self.update_skipped_l1_messages(outcome.skipped_l1_messages).await?;
534547
Ok(())
535548
}
536549

@@ -867,9 +880,10 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
867880

868881
// Create a filter condition for messages that have an L1 block number less than or
869882
// equal to the finalized block number and have not been included in an L2 block
870-
// (i.e. L2BlockNumber is null).
883+
// (i.e. L2BlockNumber is null) nor skipped.
871884
let condition = Condition::all()
872885
.add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64))
886+
.add(models::l1_message::Column::Skipped.eq(false))
873887
.add(models::l1_message::Column::L2BlockNumber.is_null());
874888
// Yield n messages matching the condition ordered by increasing queue index.
875889
Ok(models::l1_message::Entity::find()
@@ -899,9 +913,10 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
899913
};
900914
// Create a filter condition for messages that have an L1 block number less than
901915
// or equal to the target block number and have not been included in an L2 block
902-
// (i.e. L2BlockNumber is null).
916+
// (i.e. L2BlockNumber is null) nor skipped.
903917
let condition = Condition::all()
904918
.add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64))
919+
.add(models::l1_message::Column::Skipped.eq(false))
905920
.add(models::l1_message::Column::L2BlockNumber.is_null());
906921
// Yield n messages matching the condition ordered by increasing queue index.
907922
Ok(models::l1_message::Entity::find()

crates/database/migration/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod m20250929_161536_add_additional_indexes;
1515
mod m20251001_125444_add_index_processed;
1616
mod m20251005_160938_add_initial_l1_block_numbers;
1717
mod m20251013_140946_add_initial_l1_processed_block_number;
18+
mod m20251021_070729_add_skipped_column;
1819

1920
mod migration_info;
2021
pub use migration_info::{
@@ -42,6 +43,7 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
4243
Box::new(m20251001_125444_add_index_processed::Migration),
4344
Box::new(m20251005_160938_add_initial_l1_block_numbers::Migration),
4445
Box::new(m20251013_140946_add_initial_l1_processed_block_number::Migration),
46+
Box::new(m20251021_070729_add_skipped_column::Migration),
4547
]
4648
}
4749
}

crates/database/migration/src/m20250304_125946_add_l1_msg_table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,5 @@ pub(crate) enum L1Message {
4646
Value,
4747
Sender,
4848
Input,
49+
Skipped,
4950
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
use crate::m20250304_125946_add_l1_msg_table::L1Message;
2+
use sea_orm_migration::prelude::*;
3+
4+
#[derive(DeriveMigrationName)]
5+
pub struct Migration;
6+
7+
#[async_trait::async_trait]
8+
impl MigrationTrait for Migration {
9+
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
10+
// Add the `skipped` column to the `l1_message` table.
11+
manager
12+
.alter_table(
13+
Table::alter()
14+
.table(L1Message::Table)
15+
.add_column(
16+
ColumnDef::new(L1Message::Skipped).boolean().not_null().default(false),
17+
)
18+
.to_owned(),
19+
)
20+
.await?;
21+
22+
// Add index on `skipped` for the `l1_message` table.
23+
manager
24+
.create_index(
25+
Index::create()
26+
.name("idx_l1_message_skipped")
27+
.col(L1Message::Skipped)
28+
.table(L1Message::Table)
29+
.to_owned(),
30+
)
31+
.await?;
32+
33+
Ok(())
34+
}
35+
36+
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
37+
// drop the `skipped` column on the `l1_message` table.
38+
manager
39+
.alter_table(
40+
Table::alter().table(L1Message::Table).drop_column(L1Message::Skipped).to_owned(),
41+
)
42+
.await?;
43+
44+
// Drop index `skipped` for the `l1_message` table.
45+
manager
46+
.drop_index(
47+
Index::drop().name("idx_l1_message_skipped").table(L1Message::Table).to_owned(),
48+
)
49+
.await?;
50+
51+
Ok(())
52+
}
53+
}

crates/derivation-pipeline/src/lib.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ pub struct BatchDerivationResult {
244244
pub attributes: Vec<DerivedAttributes>,
245245
/// The batch info associated with the derived attributes.
246246
pub batch_info: BatchInfo,
247+
/// The list of skipped L1 messages indexes.
248+
pub skipped_l1_messages: Vec<u64>,
247249
}
248250

249251
/// The derived attributes along with the block number they correspond to.
@@ -287,8 +289,11 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
287289
iter_l1_messages_from_payload(&l1_provider, payload_data, l1_v2_message_queue_start_index)
288290
.await?;
289291

290-
let skipped_l1_messages = decoded.data.skipped_l1_message_bitmap.clone().unwrap_or_default();
291-
let mut skipped_l1_messages = skipped_l1_messages.into_iter();
292+
let skipped_l1_messages_bitmap =
293+
decoded.data.skipped_l1_message_bitmap.clone().unwrap_or_default();
294+
let mut skipped_l1_messages_bitmap = skipped_l1_messages_bitmap.into_iter();
295+
let mut skipped_l1_messages = Vec::new();
296+
292297
let blocks = decoded.data.into_l2_blocks();
293298
let mut attributes = Vec::with_capacity(blocks.len());
294299

@@ -304,8 +309,10 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
304309
let mut txs = Vec::with_capacity(block.context.num_transactions as usize);
305310
for _ in 0..block.context.num_l1_messages {
306311
// check if the next l1 message should be skipped.
307-
if matches!(skipped_l1_messages.next(), Some(bit) if bit) {
308-
let _ = l1_messages_iter.next();
312+
if matches!(skipped_l1_messages_bitmap.next(), Some(bit) if bit) {
313+
if let Some(msg) = l1_messages_iter.next() {
314+
skipped_l1_messages.push(msg.transaction.queue_index)
315+
}
309316
continue;
310317
}
311318

@@ -346,6 +353,7 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
346353
Ok(BatchDerivationResult {
347354
attributes,
348355
batch_info: BatchInfo { index: batch.index, hash: batch.hash },
356+
skipped_l1_messages,
349357
})
350358
}
351359

crates/primitives/src/batch.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,18 +60,25 @@ pub struct BatchConsolidationOutcome {
6060
pub batch_info: BatchInfo,
6161
/// The consolidation outcomes for each block in the batch.
6262
pub blocks: Vec<L2BlockInfoWithL1Messages>,
63+
/// The list of skipped L1 messages index.
64+
pub skipped_l1_messages: Vec<u64>,
6365
}
6466

6567
impl BatchConsolidationOutcome {
6668
/// Creates a new empty batch consolidation outcome for the given batch info.
6769
pub const fn new(batch_info: BatchInfo) -> Self {
68-
Self { batch_info, blocks: Vec::new() }
70+
Self { batch_info, blocks: Vec::new(), skipped_l1_messages: Vec::new() }
6971
}
7072

7173
/// Pushes a block consolidation outcome to the batch.
7274
pub fn push_block(&mut self, block: L2BlockInfoWithL1Messages) {
7375
self.blocks.push(block);
7476
}
77+
78+
/// Adds the skipped L1 messages indexes.
79+
pub fn with_skipped_l1_messages(&mut self, skipped: Vec<u64>) {
80+
self.skipped_l1_messages = skipped;
81+
}
7582
}
7683

7784
/// The outcome of consolidating a block with the L2 chain.

0 commit comments

Comments
 (0)