diff --git a/cumulus/pallets/parachain-system/src/lib.rs b/cumulus/pallets/parachain-system/src/lib.rs index 85d12ed473467..05eac9454473f 100644 --- a/cumulus/pallets/parachain-system/src/lib.rs +++ b/cumulus/pallets/parachain-system/src/lib.rs @@ -1277,12 +1277,17 @@ impl Pallet { let (messages, hashed_messages) = horizontal_messages.messages(); let mut mqc_heads = >::get(); + Self::prune_closed_mqc_heads(ingress_channels, &mut mqc_heads); + if messages.is_empty() { Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads); let last_processed_msg = InboundMessageId { sent_at: relay_parent_number, reverse_idx: 0 }; + LastProcessedHrmpMessage::::put(last_processed_msg); HrmpWatermark::::put(relay_parent_number); + LastHrmpMqcHeads::::put(&mqc_heads); // write back in case of modification + return T::DbWeight::get().reads_writes(1, 2); } @@ -1302,7 +1307,9 @@ impl Pallet { } last_processed_msg.sent_at = msg.sent_at; } - >::put(&mqc_heads); + + LastHrmpMqcHeads::::put(&mqc_heads); + for (sender, msg) in hashed_messages { Self::check_hrmp_message_metadata( ingress_channels, @@ -1334,6 +1341,19 @@ impl Pallet { weight_used.saturating_add(T::DbWeight::get().reads_writes(2, 3)) } + /// Remove all MQC heads that do not correspond to an open channel. + fn prune_closed_mqc_heads( + ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)], + mqc_heads: &mut BTreeMap, + ) { + // Complexity is O(N * lg N) but could be optimized for O(N) + mqc_heads.retain(|para, _| { + ingress_channels + .binary_search_by_key(para, |&(channel_sender, _)| channel_sender) + .is_ok() + }); + } + /// Drop blocks from the unincluded segment with respect to the latest parachain head. fn maybe_drop_included_ancestors( relay_state_proof: &RelayChainStateProof, diff --git a/cumulus/pallets/parachain-system/src/tests.rs b/cumulus/pallets/parachain-system/src/tests.rs index 4b69d674e9eca..c5ede55b5875c 100755 --- a/cumulus/pallets/parachain-system/src/tests.rs +++ b/cumulus/pallets/parachain-system/src/tests.rs @@ -1422,6 +1422,106 @@ fn receive_hrmp() { .add(3, || {}); } +// A channel that was force removed from RC state will clean up any remaining state. +#[test] +fn receive_hrmp_channel_suddenly_removed_from_relay_state() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + // 300 - one new message + sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&mk_hrmp(1, 1)).head()); + }, + 2 => { + // 300 - is gone, this should trigger the cleanup + }, + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.horizontal_messages.insert(ParaId::from(300), vec![mk_hrmp(1, 1)]); + }, + 2 => {}, + _ => unreachable!(), + }) + .add(1, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!(&*m, &[(ParaId::from(300), 1, vec![1])], "Received on channel 300"); + m.clear(); + }); + assert!( + LastHrmpMqcHeads::::get().contains_key(&ParaId::from(300)), + "Channel 300 should be present" + ); + }) + .add(2, || { + assert_eq!( + LastHrmpMqcHeads::::get() + .into_iter() + .map(|(para, _)| para) + .collect::>(), + vec![], + "Channel 300 should be removed" + ); + }); +} + +// Same as above but other code path since another channel contains a message. +#[test] +fn receive_hrmp_channel_suddenly_removed_from_relay_state2() { + BlockTests::new() + .with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num { + 1 => { + // 200 - one new message + sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&mk_hrmp(1, 1)).head()); + // 300 - one new message + sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&mk_hrmp(1, 1)).head()); + }, + 2 => { + // 200 - no new messages, mqc stayed the same. + sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head = + Some(MessageQueueChain::default().extend_hrmp(&mk_hrmp(1, 1)).head()); + // 300 - is gone, this should trigger the cleanup + }, + _ => unreachable!(), + }) + .with_inherent_data(|_, relay_block_num, data| match relay_block_num { + 1 => { + data.horizontal_messages.insert(ParaId::from(200), vec![mk_hrmp(1, 1)]); + data.horizontal_messages.insert(ParaId::from(300), vec![mk_hrmp(1, 1)]); + }, + 2 => {}, + _ => unreachable!(), + }) + .add(1, || { + HANDLED_XCMP_MESSAGES.with(|m| { + let mut m = m.borrow_mut(); + assert_eq!( + &*m, + &[(ParaId::from(200), 1, vec![1]), (ParaId::from(300), 1, vec![1])] + ); + m.clear(); + }); + assert!( + LastHrmpMqcHeads::::get().contains_key(&ParaId::from(300)), + "Channel 300 should be present" + ); + }) + .add(2, || { + assert_eq!( + LastHrmpMqcHeads::::get() + .into_iter() + .map(|(para, _)| para) + .collect::>(), + vec![ParaId::from(200)], + "Channel 300 should be removed but 200 should be present" + ); + }); +} + #[test] fn receive_hrmp_empty_channel() { BlockTests::new() diff --git a/prdoc/pr_10324.prdoc b/prdoc/pr_10324.prdoc new file mode 100644 index 0000000000000..96217312f60c6 --- /dev/null +++ b/prdoc/pr_10324.prdoc @@ -0,0 +1,8 @@ +title: Cleanup HRMP channels that were force removed from RC state +doc: +- audience: Runtime Dev + description: |- + Cleanup old LastHrmpMqcHeads entries when the corresponding channel was remove from RC state +crates: +- name: cumulus-pallet-parachain-system + bump: patch