Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion cumulus/pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1277,12 +1277,17 @@ impl<T: Config> Pallet<T> {
let (messages, hashed_messages) = horizontal_messages.messages();
let mut mqc_heads = <LastHrmpMqcHeads<T>>::get();

Self::prune_closed_mqc_heads(ingress_channels, &mut mqc_heads);

if messages.is_empty() {
Self::check_hrmp_mcq_heads(ingress_channels, &mut mqc_heads);
let last_processed_msg =
InboundMessageId { sent_at: relay_parent_number, reverse_idx: 0 };

LastProcessedHrmpMessage::<T>::put(last_processed_msg);
HrmpWatermark::<T>::put(relay_parent_number);
LastHrmpMqcHeads::<T>::put(&mqc_heads); // write back in case of modification

return T::DbWeight::get().reads_writes(1, 2);
}

Expand All @@ -1302,7 +1307,9 @@ impl<T: Config> Pallet<T> {
}
last_processed_msg.sent_at = msg.sent_at;
}
<LastHrmpMqcHeads<T>>::put(&mqc_heads);

LastHrmpMqcHeads::<T>::put(&mqc_heads);

for (sender, msg) in hashed_messages {
Self::check_hrmp_message_metadata(
ingress_channels,
Expand Down Expand Up @@ -1334,6 +1341,19 @@ impl<T: Config> Pallet<T> {
weight_used.saturating_add(T::DbWeight::get().reads_writes(2, 3))
}

/// Remove all MQC heads that do not correspond to an open channel.
fn prune_closed_mqc_heads(
ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
mqc_heads: &mut BTreeMap<ParaId, MessageQueueChain>,
) {
// Complexity is O(N * lg N) but could be optimized for O(N)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you expand on how this could be done ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Mqc Heads and ingress channels are both sorted. So instead of always calling binary search, you can keep an index and increment it. Although just writing this, I think the increment would possibly need to happen multiple times (loop).
Not sure if its worth it in practice, hence why i used the trivial approach here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant if you can write this in the comment :D

It sounds easy to implement. But anyway, yes, probably it won't bring a very big performance benefit so we can also leave it as it is for the moment.

mqc_heads.retain(|para, _| {
ingress_channels
.binary_search_by_key(para, |&(channel_sender, _)| channel_sender)
.is_ok()
});
}

/// Drop blocks from the unincluded segment with respect to the latest parachain head.
fn maybe_drop_included_ancestors(
relay_state_proof: &RelayChainStateProof,
Expand Down
100 changes: 100 additions & 0 deletions cumulus/pallets/parachain-system/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,106 @@ fn receive_hrmp() {
.add(3, || {});
}

// A channel that was force removed from RC state will clean up any remaining state.
#[test]
fn receive_hrmp_channel_suddenly_removed_from_relay_state() {
BlockTests::new()
.with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num {
1 => {
// 300 - one new message
sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head =
Some(MessageQueueChain::default().extend_hrmp(&mk_hrmp(1, 1)).head());
},
2 => {
// 300 - is gone, this should trigger the cleanup
},
_ => unreachable!(),
})
.with_inherent_data(|_, relay_block_num, data| match relay_block_num {
1 => {
data.horizontal_messages.insert(ParaId::from(300), vec![mk_hrmp(1, 1)]);
},
2 => {},
_ => unreachable!(),
})
.add(1, || {
HANDLED_XCMP_MESSAGES.with(|m| {
let mut m = m.borrow_mut();
assert_eq!(&*m, &[(ParaId::from(300), 1, vec![1])], "Received on channel 300");
m.clear();
});
assert!(
LastHrmpMqcHeads::<Test>::get().contains_key(&ParaId::from(300)),
"Channel 300 should be present"
);
})
.add(2, || {
assert_eq!(
LastHrmpMqcHeads::<Test>::get()
.into_iter()
.map(|(para, _)| para)
.collect::<Vec<_>>(),
vec![],
"Channel 300 should be removed"
);
});
}

// Same as above but other code path since another channel contains a message.
#[test]
fn receive_hrmp_channel_suddenly_removed_from_relay_state2() {
BlockTests::new()
.with_relay_sproof_builder(|_, relay_block_num, sproof| match relay_block_num {
1 => {
// 200 - one new message
sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head =
Some(MessageQueueChain::default().extend_hrmp(&mk_hrmp(1, 1)).head());
// 300 - one new message
sproof.upsert_inbound_channel(ParaId::from(300)).mqc_head =
Some(MessageQueueChain::default().extend_hrmp(&mk_hrmp(1, 1)).head());
},
2 => {
// 200 - no new messages, mqc stayed the same.
sproof.upsert_inbound_channel(ParaId::from(200)).mqc_head =
Some(MessageQueueChain::default().extend_hrmp(&mk_hrmp(1, 1)).head());
// 300 - is gone, this should trigger the cleanup
},
_ => unreachable!(),
})
.with_inherent_data(|_, relay_block_num, data| match relay_block_num {
1 => {
data.horizontal_messages.insert(ParaId::from(200), vec![mk_hrmp(1, 1)]);
data.horizontal_messages.insert(ParaId::from(300), vec![mk_hrmp(1, 1)]);
},
2 => {},
_ => unreachable!(),
})
.add(1, || {
HANDLED_XCMP_MESSAGES.with(|m| {
let mut m = m.borrow_mut();
assert_eq!(
&*m,
&[(ParaId::from(200), 1, vec![1]), (ParaId::from(300), 1, vec![1])]
);
m.clear();
});
assert!(
LastHrmpMqcHeads::<Test>::get().contains_key(&ParaId::from(300)),
"Channel 300 should be present"
);
})
.add(2, || {
assert_eq!(
LastHrmpMqcHeads::<Test>::get()
.into_iter()
.map(|(para, _)| para)
.collect::<Vec<_>>(),
vec![ParaId::from(200)],
"Channel 300 should be removed but 200 should be present"
);
});
}

#[test]
fn receive_hrmp_empty_channel() {
BlockTests::new()
Expand Down
8 changes: 8 additions & 0 deletions prdoc/pr_10324.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
title: Cleanup HRMP channels that were force removed from RC state
doc:
- audience: Runtime Dev
description: |-
Cleanup old LastHrmpMqcHeads entries when the corresponding channel was remove from RC state
crates:
- name: cumulus-pallet-parachain-system
bump: patch
Loading