From 5aca75fbd75e23ef7633afe65da52cfb1e63dfbe Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 6 Apr 2023 23:21:22 +0400 Subject: [PATCH 01/26] unincluded segment draft --- pallets/parachain-system/src/lib.rs | 51 ++++++++ .../src/relay_state_snapshot.rs | 13 ++ .../src/unincluded_segment.rs | 117 ++++++++++++++++++ .../parachain-inherent/src/client_side.rs | 1 + 4 files changed, 182 insertions(+) create mode 100644 pallets/parachain-system/src/unincluded_segment.rs diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index ac3b8648c65..26efae7e20c 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -59,11 +59,14 @@ use xcm::latest::XcmHash; mod migration; mod relay_state_snapshot; +mod unincluded_segment; #[macro_use] pub mod validate_block; #[cfg(test)] mod tests; +use unincluded_segment::{BlockTracker, SegmentTracker}; + /// Register the `validate_block` function that is used by parachains to validate blocks on a /// validator. /// @@ -296,6 +299,34 @@ pub mod pallet { weight += T::DbWeight::get().writes(1); } + let para_head = ParaHead::::take(); + weight += T::DbWeight::get().reads_writes(1, 1); + + // Update unincluded segment related storage values. + if let Some(para_head) = para_head { + let dropped: Vec = >::mutate(|chain| { + let idx = + chain.iter().take_while(|block| block.para_head() != ¶_head).count(); + // Drop the block with an included para head too. + let idx = chain.len().min(idx + 1); + + chain.drain(..idx).collect() + }); + weight += T::DbWeight::get().reads_writes(1, 1); + + if !dropped.is_empty() { + >::mutate(|agg| { + let agg = agg.as_mut().expect( + "dropped part of the segment wasn't empty, hence value exists; qed", + ); + for block in dropped { + agg.used_bandwidth.subtract(block.used_bandwidth()); + } + }); + weight += T::DbWeight::get().reads_writes(1, 1); + } + } + // Remove the validation from the old block. ValidationData::::kill(); ProcessedDownwardMessages::::kill(); @@ -424,6 +455,13 @@ pub mod pallet { .read_messaging_state_snapshot() .expect("Invalid messaging state in relay chain state proof"); + if let Some(para_head) = relay_state_proof + .read_para_head() + .expect("Invalid para head in relay chain state proof") + { + >::put(para_head); + } + >::put(&vfp); >::put(relay_chain_state); >::put(relevant_messaging_state.clone()); @@ -544,6 +582,19 @@ pub mod pallet { Unauthorized, } + /// Latest parachain head data included in the relay chain. + /// + /// This value is optional since it requires extra relay-chain state proof. + #[pallet::storage] + pub(super) type ParaHead = StorageValue<_, relay_chain::HeadData, OptionQuery>; + + #[pallet::storage] + pub(super) type UnincludedSegment = StorageValue<_, Vec, ValueQuery>; + + #[pallet::storage] + pub(super) type AggregatedUnincludedSegment = + StorageValue<_, SegmentTracker, OptionQuery>; + /// In case of a scheduled upgrade, this storage field contains the validation code to be applied. /// /// As soon as the relay chain gives us the go-ahead signal, we will overwrite the [`:code`][well_known_keys::CODE] diff --git a/pallets/parachain-system/src/relay_state_snapshot.rs b/pallets/parachain-system/src/relay_state_snapshot.rs index 0a6426a8012..2fd1eb3707a 100644 --- a/pallets/parachain-system/src/relay_state_snapshot.rs +++ b/pallets/parachain-system/src/relay_state_snapshot.rs @@ -85,6 +85,8 @@ pub enum Error { HrmpEgressChannelIndex(ReadEntryErr), /// The channel identified by the sender and receiver cannot be extracted. 
HrmpChannel(ParaId, ParaId, ReadEntryErr), + /// The latest included parachain head cannot be extracted. + ParaHead(ReadEntryErr), } #[derive(Debug)] @@ -235,6 +237,17 @@ impl RelayChainStateProof { .map_err(Error::Config) } + /// Read parachain [head data](`relay_chain::HeadData`) from the relay chain state proof. + /// + /// Returns an error if anything failed at reading or decoding. + pub fn read_para_head(&self) -> Result, Error> { + read_optional_entry( + &self.trie_backend, + &relay_chain::well_known_keys::para_head(self.para_id), + ) + .map_err(Error::ParaHead) + } + /// Read the [`Slot`](relay_chain::Slot) from the relay chain state proof. /// /// The slot is slot of the relay chain block this state proof was extracted from. diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs new file mode 100644 index 00000000000..e578a640b66 --- /dev/null +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -0,0 +1,117 @@ +// Copyright 2023 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use codec::{Decode, Encode}; +use cumulus_primitives_core::{relay_chain, ParaId}; +use scale_info::TypeInfo; +use sp_std::collections::btree_map::BTreeMap; + +pub struct BandwidthLimits {} + +pub enum LimitExceededError {} + +#[derive(Default, Copy, Clone, Encode, Decode, TypeInfo)] +pub struct HrmpChannelSize { + pub msg_count: u32, + pub total_bytes: u32, +} + +impl HrmpChannelSize { + pub fn is_empty(&self) -> bool { + self.msg_count == 0 && self.total_bytes == 0 + } + + pub fn append( + &self, + other: &Self, + limits: &BandwidthLimits, + ) -> Result { + let mut new = *self; + + new.msg_count = new.msg_count.saturating_add(other.msg_count); + new.total_bytes = new.total_bytes.saturating_add(other.total_bytes); + + Ok(new) + } + + pub fn subtract(&mut self, other: &Self) { + self.msg_count -= other.msg_count; + self.total_bytes -= other.total_bytes; + } +} + +#[derive(Default, Clone, Encode, Decode, TypeInfo)] +pub struct UsedBandwidth { + pub ump_msg_count: u32, + pub ump_total_bytes: u32, + pub hrmp_outgoing: BTreeMap, +} + +impl UsedBandwidth { + pub fn append( + &self, + other: &Self, + limits: &BandwidthLimits, + ) -> Result { + let mut new = self.clone(); + + new.ump_msg_count = new.ump_msg_count.saturating_add(other.ump_msg_count); + new.ump_total_bytes = new.ump_total_bytes.saturating_add(other.ump_total_bytes); + + for (id, channel) in other.hrmp_outgoing.iter() { + new.hrmp_outgoing.entry(*id).or_default().append(channel, limits)?; + } + + Ok(new) + } + + pub fn subtract(&mut self, other: &Self) { + self.ump_msg_count -= other.ump_msg_count; + self.ump_total_bytes -= other.ump_total_bytes; + + for (id, channel) in other.hrmp_outgoing.iter() { + let entry = self + .hrmp_outgoing + .get_mut(id) + .expect("entry's been inserted earlier with `append`; qed"); + entry.subtract(channel); + } + + 
self.hrmp_outgoing.retain(|_, channel| !channel.is_empty()); + } +} + +#[derive(Encode, Decode, TypeInfo)] +pub struct BlockTracker { + used_bandwidth: UsedBandwidth, + para_head: relay_chain::HeadData, +} + +impl BlockTracker { + pub fn used_bandwidth(&self) -> &UsedBandwidth { + &self.used_bandwidth + } + + pub fn para_head(&self) -> &relay_chain::HeadData { + &self.para_head + } +} + +#[derive(Encode, Decode, TypeInfo)] +pub struct SegmentTracker { + pub used_bandwidth: UsedBandwidth, + pub hrmp_watermark: relay_chain::BlockNumber, +} diff --git a/primitives/parachain-inherent/src/client_side.rs b/primitives/parachain-inherent/src/client_side.rs index 6f2cd5eb504..a6e0bd1641b 100644 --- a/primitives/parachain-inherent/src/client_side.rs +++ b/primitives/parachain-inherent/src/client_side.rs @@ -104,6 +104,7 @@ async fn collect_relay_storage_proof( relevant_keys.push(relay_well_known_keys::hrmp_egress_channel_index(para_id)); relevant_keys.push(relay_well_known_keys::upgrade_go_ahead_signal(para_id)); relevant_keys.push(relay_well_known_keys::upgrade_restriction_signal(para_id)); + relevant_keys.push(relay_well_known_keys::para_head(para_id)); relevant_keys.extend(ingress_channels.into_iter().map(|sender| { relay_well_known_keys::hrmp_channels(HrmpChannelId { sender, recipient: para_id }) })); From 169441bde421d431094ce9b6c0b09159b5a582e0 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 7 Apr 2023 19:14:55 +0400 Subject: [PATCH 02/26] read para head from storage proof --- pallets/parachain-system/src/lib.rs | 32 +++++++++++++++-------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 26efae7e20c..886621ae089 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -299,8 +299,23 @@ pub mod pallet { weight += T::DbWeight::get().writes(1); } - let para_head = ParaHead::::take(); - weight += T::DbWeight::get().reads_writes(1, 1); + let para_head = { + let relay_chain_state = Self::relay_state_proof() + .expect("relay state proof must be present in storage"); + let validation_data = + Self::validation_data().expect("validation data must be present in storage"); + + let relay_state_proof = RelayChainStateProof::new( + T::SelfParaId::get(), + validation_data.relay_parent_storage_root, + relay_chain_state, + ) + .expect("Invalid relay chain state proof"); + relay_state_proof + .read_para_head() + .expect("Invalid para head in relay chain state proof") + }; + weight += T::DbWeight::get().reads(2); // Update unincluded segment related storage values. if let Some(para_head) = para_head { @@ -455,13 +470,6 @@ pub mod pallet { .read_messaging_state_snapshot() .expect("Invalid messaging state in relay chain state proof"); - if let Some(para_head) = relay_state_proof - .read_para_head() - .expect("Invalid para head in relay chain state proof") - { - >::put(para_head); - } - >::put(&vfp); >::put(relay_chain_state); >::put(relevant_messaging_state.clone()); @@ -582,12 +590,6 @@ pub mod pallet { Unauthorized, } - /// Latest parachain head data included in the relay chain. - /// - /// This value is optional since it requires extra relay-chain state proof. 
- #[pallet::storage] - pub(super) type ParaHead = StorageValue<_, relay_chain::HeadData, OptionQuery>; - #[pallet::storage] pub(super) type UnincludedSegment = StorageValue<_, Vec, ValueQuery>; From b9ba0c18fa0279409bb62bc14055eaa4838fe258 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 7 Apr 2023 19:26:41 +0400 Subject: [PATCH 03/26] read_para_head -> read_included_para_head --- pallets/parachain-system/src/lib.rs | 2 +- pallets/parachain-system/src/relay_state_snapshot.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 886621ae089..26fa4610265 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -312,7 +312,7 @@ pub mod pallet { ) .expect("Invalid relay chain state proof"); relay_state_proof - .read_para_head() + .read_included_para_head() .expect("Invalid para head in relay chain state proof") }; weight += T::DbWeight::get().reads(2); diff --git a/pallets/parachain-system/src/relay_state_snapshot.rs b/pallets/parachain-system/src/relay_state_snapshot.rs index 2fd1eb3707a..a74118d8701 100644 --- a/pallets/parachain-system/src/relay_state_snapshot.rs +++ b/pallets/parachain-system/src/relay_state_snapshot.rs @@ -237,10 +237,10 @@ impl RelayChainStateProof { .map_err(Error::Config) } - /// Read parachain [head data](`relay_chain::HeadData`) from the relay chain state proof. + /// Read latest included parachain [head data](`relay_chain::HeadData`) from the relay chain state proof. /// /// Returns an error if anything failed at reading or decoding. - pub fn read_para_head(&self) -> Result, Error> { + pub fn read_included_para_head(&self) -> Result, Error> { read_optional_entry( &self.trie_backend, &relay_chain::well_known_keys::para_head(self.para_id), From dcebe471a7791864c60b2d76c7be70ef4e486b55 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 7 Apr 2023 19:54:30 +0400 Subject: [PATCH 04/26] Provide pub interface --- pallets/parachain-system/src/lib.rs | 2 +- .../src/unincluded_segment.rs | 40 ++++++++++++------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 26fa4610265..458cb13d21b 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -335,7 +335,7 @@ pub mod pallet { "dropped part of the segment wasn't empty, hence value exists; qed", ); for block in dropped { - agg.used_bandwidth.subtract(block.used_bandwidth()); + agg.subtract(&block); } }); weight += T::DbWeight::get().reads_writes(1, 1); diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index e578a640b66..a51fc1ca33b 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -30,15 +30,11 @@ pub struct HrmpChannelSize { } impl HrmpChannelSize { - pub fn is_empty(&self) -> bool { + fn is_empty(&self) -> bool { self.msg_count == 0 && self.total_bytes == 0 } - pub fn append( - &self, - other: &Self, - limits: &BandwidthLimits, - ) -> Result { + fn append(&self, other: &Self, limits: &BandwidthLimits) -> Result { let mut new = *self; new.msg_count = new.msg_count.saturating_add(other.msg_count); @@ -47,7 +43,7 @@ impl HrmpChannelSize { Ok(new) } - pub fn subtract(&mut self, other: &Self) { + fn subtract(&mut self, other: &Self) { self.msg_count -= other.msg_count; self.total_bytes -= other.total_bytes; } @@ -61,11 
+57,7 @@ pub struct UsedBandwidth { } impl UsedBandwidth { - pub fn append( - &self, - other: &Self, - limits: &BandwidthLimits, - ) -> Result { + fn append(&self, other: &Self, limits: &BandwidthLimits) -> Result { let mut new = self.clone(); new.ump_msg_count = new.ump_msg_count.saturating_add(other.ump_msg_count); @@ -78,7 +70,7 @@ impl UsedBandwidth { Ok(new) } - pub fn subtract(&mut self, other: &Self) { + fn subtract(&mut self, other: &Self) { self.ump_msg_count -= other.ump_msg_count; self.ump_total_bytes -= other.ump_total_bytes; @@ -112,6 +104,24 @@ impl BlockTracker { #[derive(Encode, Decode, TypeInfo)] pub struct SegmentTracker { - pub used_bandwidth: UsedBandwidth, - pub hrmp_watermark: relay_chain::BlockNumber, + used_bandwidth: UsedBandwidth, + hrmp_watermark: relay_chain::BlockNumber, +} + +impl SegmentTracker { + pub fn append( + &mut self, + block: &BlockTracker, + hrmp_watermark: relay_chain::BlockNumber, + limits: &BandwidthLimits, + ) -> Result<(), LimitExceededError> { + self.used_bandwidth = self.used_bandwidth.append(block.used_bandwidth(), limits)?; + self.hrmp_watermark = hrmp_watermark; + + Ok(()) + } + + pub fn subtract(&mut self, block: &BlockTracker) { + self.used_bandwidth.subtract(block.used_bandwidth()); + } } From 7af8e6771616275ba35fe01d9f126f74bb750c85 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 7 Apr 2023 20:50:20 +0400 Subject: [PATCH 05/26] add errors --- .../src/unincluded_segment.rs | 67 +++++++++++++++++-- 1 file changed, 61 insertions(+), 6 deletions(-) diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index a51fc1ca33b..931f1268712 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -19,9 +19,23 @@ use cumulus_primitives_core::{relay_chain, ParaId}; use scale_info::TypeInfo; use sp_std::collections::btree_map::BTreeMap; -pub struct BandwidthLimits {} +pub struct HrmpOutboundLimits { + pub bytes_remaining: u32, + pub messages_remaining: u32, +} -pub enum LimitExceededError {} +pub struct TotalBandwidthLimits { + pub ump_messages_remaining: u32, + pub ump_bytes_remaining: u32, + pub hrmp_outgoing: BTreeMap, +} + +pub enum LimitExceededError { + HrmpMessagesOverflow { recipient: ParaId, messages_remaining: u32, messages_submitted: u32 }, + HrmpBytesOverflow { recipient: ParaId, bytes_remaining: u32, bytes_submitted: u32 }, + UmpMessagesOverflow { messages_remaining: u32, messages_submitted: u32 }, + UmpBytesOverflow { bytes_remaining: u32, bytes_submitted: u32 }, +} #[derive(Default, Copy, Clone, Encode, Decode, TypeInfo)] pub struct HrmpChannelSize { @@ -34,11 +48,35 @@ impl HrmpChannelSize { self.msg_count == 0 && self.total_bytes == 0 } - fn append(&self, other: &Self, limits: &BandwidthLimits) -> Result { + fn append( + &self, + other: &Self, + recipient: ParaId, + limits: &TotalBandwidthLimits, + ) -> Result { + let limits = limits + .hrmp_outgoing + .get(&recipient) + .expect("limit for declared hrmp channel must be present; qed"); + let mut new = *self; new.msg_count = new.msg_count.saturating_add(other.msg_count); + if new.msg_count > limits.messages_remaining { + return Err(LimitExceededError::HrmpMessagesOverflow { + recipient, + messages_remaining: limits.messages_remaining, + messages_submitted: new.msg_count, + }) + } new.total_bytes = new.total_bytes.saturating_add(other.total_bytes); + if new.total_bytes > limits.bytes_remaining { + return Err(LimitExceededError::HrmpBytesOverflow { + 
recipient, + bytes_remaining: limits.bytes_remaining, + bytes_submitted: new.total_bytes, + }) + } Ok(new) } @@ -57,14 +95,31 @@ pub struct UsedBandwidth { } impl UsedBandwidth { - fn append(&self, other: &Self, limits: &BandwidthLimits) -> Result { + fn append( + &self, + other: &Self, + limits: &TotalBandwidthLimits, + ) -> Result { let mut new = self.clone(); new.ump_msg_count = new.ump_msg_count.saturating_add(other.ump_msg_count); + if new.ump_msg_count > limits.ump_messages_remaining { + return Err(LimitExceededError::UmpMessagesOverflow { + messages_remaining: limits.ump_messages_remaining, + messages_submitted: new.ump_msg_count, + }) + } new.ump_total_bytes = new.ump_total_bytes.saturating_add(other.ump_total_bytes); + if new.ump_total_bytes > limits.ump_bytes_remaining { + return Err(LimitExceededError::UmpBytesOverflow { + bytes_remaining: limits.ump_bytes_remaining, + bytes_submitted: new.ump_total_bytes, + }) + } for (id, channel) in other.hrmp_outgoing.iter() { - new.hrmp_outgoing.entry(*id).or_default().append(channel, limits)?; + let current = new.hrmp_outgoing.entry(*id).or_default(); + *current = current.append(channel, *id, limits)?; } Ok(new) @@ -113,7 +168,7 @@ impl SegmentTracker { &mut self, block: &BlockTracker, hrmp_watermark: relay_chain::BlockNumber, - limits: &BandwidthLimits, + limits: &TotalBandwidthLimits, ) -> Result<(), LimitExceededError> { self.used_bandwidth = self.used_bandwidth.append(block.used_bandwidth(), limits)?; self.hrmp_watermark = hrmp_watermark; From f60558c33f9910512fc823462c3571d95998a63f Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 10 Apr 2023 21:53:48 +0400 Subject: [PATCH 06/26] fix unincluded segment update --- pallets/parachain-system/src/lib.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 458cb13d21b..c831adb539f 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -320,10 +320,11 @@ pub mod pallet { // Update unincluded segment related storage values. if let Some(para_head) = para_head { let dropped: Vec = >::mutate(|chain| { - let idx = - chain.iter().take_while(|block| block.para_head() != ¶_head).count(); - // Drop the block with an included para head too. - let idx = chain.len().min(idx + 1); + // Drop everything up to the block with an included para head, if present. + let idx = chain + .iter() + .position(|block| block.para_head() == ¶_head) + .map_or(0, |idx| idx + 1); // inclusive. chain.drain(..idx).collect() }); From 9b63c04ba5c6ba61aea6580d60b6253dc9f67324 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 10 Apr 2023 21:58:53 +0400 Subject: [PATCH 07/26] BlockTracker -> Ancestor --- pallets/parachain-system/src/lib.rs | 6 +++--- pallets/parachain-system/src/unincluded_segment.rs | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index c831adb539f..aceaf1eb9f2 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -65,7 +65,7 @@ pub mod validate_block; #[cfg(test)] mod tests; -use unincluded_segment::{BlockTracker, SegmentTracker}; +use unincluded_segment::{Ancestor, SegmentTracker}; /// Register the `validate_block` function that is used by parachains to validate blocks on a /// validator. @@ -319,7 +319,7 @@ pub mod pallet { // Update unincluded segment related storage values. 
if let Some(para_head) = para_head { - let dropped: Vec = >::mutate(|chain| { + let dropped: Vec = >::mutate(|chain| { // Drop everything up to the block with an included para head, if present. let idx = chain .iter() @@ -592,7 +592,7 @@ pub mod pallet { } #[pallet::storage] - pub(super) type UnincludedSegment = StorageValue<_, Vec, ValueQuery>; + pub(super) type UnincludedSegment = StorageValue<_, Vec, ValueQuery>; #[pallet::storage] pub(super) type AggregatedUnincludedSegment = diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 931f1268712..15ccaf12fbc 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -142,12 +142,12 @@ impl UsedBandwidth { } #[derive(Encode, Decode, TypeInfo)] -pub struct BlockTracker { +pub struct Ancestor { used_bandwidth: UsedBandwidth, para_head: relay_chain::HeadData, } -impl BlockTracker { +impl Ancestor { pub fn used_bandwidth(&self) -> &UsedBandwidth { &self.used_bandwidth } @@ -166,7 +166,7 @@ pub struct SegmentTracker { impl SegmentTracker { pub fn append( &mut self, - block: &BlockTracker, + block: &Ancestor, hrmp_watermark: relay_chain::BlockNumber, limits: &TotalBandwidthLimits, ) -> Result<(), LimitExceededError> { @@ -176,7 +176,7 @@ impl SegmentTracker { Ok(()) } - pub fn subtract(&mut self, block: &BlockTracker) { + pub fn subtract(&mut self, block: &Ancestor) { self.used_bandwidth.subtract(block.used_bandwidth()); } } From a32577bd2a6562cfb6413b235791b4465ff7b2e0 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Tue, 11 Apr 2023 02:14:47 +0400 Subject: [PATCH 08/26] add a dmp limit --- pallets/parachain-system/src/unincluded_segment.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 15ccaf12fbc..3f5fbad2a5f 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -28,6 +28,7 @@ pub struct TotalBandwidthLimits { pub ump_messages_remaining: u32, pub ump_bytes_remaining: u32, pub hrmp_outgoing: BTreeMap, + pub dmp_remaining_messages: u32, } pub enum LimitExceededError { @@ -35,6 +36,7 @@ pub enum LimitExceededError { HrmpBytesOverflow { recipient: ParaId, bytes_remaining: u32, bytes_submitted: u32 }, UmpMessagesOverflow { messages_remaining: u32, messages_submitted: u32 }, UmpBytesOverflow { bytes_remaining: u32, bytes_submitted: u32 }, + DmpMessagesUnderflow { messages_remaining: u32, messages_processed: u32 }, } #[derive(Default, Copy, Clone, Encode, Decode, TypeInfo)] @@ -92,6 +94,7 @@ pub struct UsedBandwidth { pub ump_msg_count: u32, pub ump_total_bytes: u32, pub hrmp_outgoing: BTreeMap, + pub dmp_processed_count: u32, } impl UsedBandwidth { @@ -116,6 +119,13 @@ impl UsedBandwidth { bytes_submitted: new.ump_total_bytes, }) } + new.dmp_processed_count = new.dmp_processed_count.saturating_add(other.dmp_processed_count); + if new.dmp_processed_count > limits.dmp_remaining_messages { + return Err(LimitExceededError::DmpMessagesUnderflow { + messages_remaining: limits.dmp_remaining_messages, + messages_processed: new.dmp_processed_count, + }) + } for (id, channel) in other.hrmp_outgoing.iter() { let current = new.hrmp_outgoing.entry(*id).or_default(); @@ -128,6 +138,7 @@ impl UsedBandwidth { fn subtract(&mut self, other: &Self) { self.ump_msg_count -= other.ump_msg_count; self.ump_total_bytes -= other.ump_total_bytes; + 
self.dmp_processed_count -= other.dmp_processed_count; for (id, channel) in other.hrmp_outgoing.iter() { let entry = self From 2f9e6c23be3f06993f7a691ca2101f218888a135 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Tue, 11 Apr 2023 02:27:03 +0400 Subject: [PATCH 09/26] Read para head depending on the storage switch --- pallets/parachain-system/src/lib.rs | 14 +++++++++++--- .../parachain-system/src/relay_state_snapshot.rs | 9 +++------ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index aceaf1eb9f2..f33a3456c2e 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -299,7 +299,9 @@ pub mod pallet { weight += T::DbWeight::get().writes(1); } - let para_head = { + // If `MaxUnincludedLen` is present in the storage, parachain head + // is always expected to be included into the relay storage proof. + let para_head = >::get().map(|_max_len| { let relay_chain_state = Self::relay_state_proof() .expect("relay state proof must be present in storage"); let validation_data = @@ -314,11 +316,14 @@ pub mod pallet { relay_state_proof .read_included_para_head() .expect("Invalid para head in relay chain state proof") - }; - weight += T::DbWeight::get().reads(2); + }); + weight += T::DbWeight::get().reads(1); // Update unincluded segment related storage values. if let Some(para_head) = para_head { + // Weight used for reading para head. + weight += T::DbWeight::get().reads(2); + let dropped: Vec = >::mutate(|chain| { // Drop everything up to the block with an included para head, if present. let idx = chain @@ -591,6 +596,9 @@ pub mod pallet { Unauthorized, } + #[pallet::storage] + pub(super) type MaxUnincludedLen = StorageValue<_, T::BlockNumber, OptionQuery>; + #[pallet::storage] pub(super) type UnincludedSegment = StorageValue<_, Vec, ValueQuery>; diff --git a/pallets/parachain-system/src/relay_state_snapshot.rs b/pallets/parachain-system/src/relay_state_snapshot.rs index a74118d8701..9da5a03ce83 100644 --- a/pallets/parachain-system/src/relay_state_snapshot.rs +++ b/pallets/parachain-system/src/relay_state_snapshot.rs @@ -240,12 +240,9 @@ impl RelayChainStateProof { /// Read latest included parachain [head data](`relay_chain::HeadData`) from the relay chain state proof. /// /// Returns an error if anything failed at reading or decoding. - pub fn read_included_para_head(&self) -> Result, Error> { - read_optional_entry( - &self.trie_backend, - &relay_chain::well_known_keys::para_head(self.para_id), - ) - .map_err(Error::ParaHead) + pub fn read_included_para_head(&self) -> Result { + read_entry(&self.trie_backend, &relay_chain::well_known_keys::para_head(self.para_id), None) + .map_err(Error::ParaHead) } /// Read the [`Slot`](relay_chain::Slot) from the relay chain state proof. From 8617f8cbe133436aafcb9d36f3ea52907b6cf951 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Wed, 12 Apr 2023 02:28:38 +0400 Subject: [PATCH 10/26] doc comments --- .../src/unincluded_segment.rs | 90 +++++++++++++++++-- 1 file changed, 82 insertions(+), 8 deletions(-) diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 3f5fbad2a5f..52d98b08f7c 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -14,42 +14,93 @@ // You should have received a copy of the GNU General Public License // along with Cumulus. If not, see . +//! 
Primitives used for tracking message queues constraints in an unincluded block segment +//! of the parachain. +//! +//! Unincluded segment describes a chain of latest included block descendants, which are not yet +//! sent to relay chain. + use codec::{Decode, Encode}; use cumulus_primitives_core::{relay_chain, ParaId}; use scale_info::TypeInfo; use sp_std::collections::btree_map::BTreeMap; +/// Constraints on outbound HRMP channel. pub struct HrmpOutboundLimits { + /// The maximum bytes that can be written to the channel. pub bytes_remaining: u32, + /// The maximum messages that can be written to the channel. pub messages_remaining: u32, } +/// Constraints imposed on the entire segment, i.e. based on the latest included parablock. pub struct TotalBandwidthLimits { + /// The amount of UMP messages remaining. pub ump_messages_remaining: u32, + /// The amount of UMP bytes remaining. pub ump_bytes_remaining: u32, + /// The limitations of all registered outbound HRMP channels. pub hrmp_outgoing: BTreeMap, + /// Number of remaining DMP messages. pub dmp_remaining_messages: u32, } +/// The error type for updating bandwidth used by a segment. pub enum LimitExceededError { - HrmpMessagesOverflow { recipient: ParaId, messages_remaining: u32, messages_submitted: u32 }, - HrmpBytesOverflow { recipient: ParaId, bytes_remaining: u32, bytes_submitted: u32 }, - UmpMessagesOverflow { messages_remaining: u32, messages_submitted: u32 }, - UmpBytesOverflow { bytes_remaining: u32, bytes_submitted: u32 }, - DmpMessagesUnderflow { messages_remaining: u32, messages_processed: u32 }, + /// Too many messages submitted to HRMP channel. + HrmpMessagesOverflow { + /// Parachain id of the recipient. + recipient: ParaId, + /// The amount of remaining messages in the capacity of the channel. + messages_remaining: u32, + /// The amount of messages submitted to the channel. + messages_submitted: u32, + }, + /// Too many bytes submitted to HRMP channel. + HrmpBytesOverflow { + /// Parachain id of the recipient. + recipient: ParaId, + /// The amount of remaining bytes in the capacity of the channel. + bytes_remaining: u32, + /// The amount of bytes submitted to the channel. + bytes_submitted: u32, + }, + UmpMessagesOverflow { + messages_remaining: u32, + messages_submitted: u32, + }, + /// Too many messages submitted to UMP queue. + UmpBytesOverflow { + /// The amount of remaining messages in the capacity of UMP. + bytes_remaining: u32, + /// The amount of messages submitted to UMP. + bytes_submitted: u32, + }, + /// Too many messages processed from DMP. + DmpMessagesUnderflow { + /// The amount of messages waiting to be processed from DMP. + messages_remaining: u32, + /// The amount of DMP messages processed. + messages_processed: u32, + }, } +/// The number of messages and size in bytes submitted to HRMP channel. #[derive(Default, Copy, Clone, Encode, Decode, TypeInfo)] -pub struct HrmpChannelSize { +pub struct HrmpChannelUpdate { + /// The amount of messages submitted to the channel. pub msg_count: u32, + /// The amount of bytes submitted to the channel. pub total_bytes: u32, } -impl HrmpChannelSize { +impl HrmpChannelUpdate { + /// Returns `true` if the update is empty, `false` otherwise. fn is_empty(&self) -> bool { self.msg_count == 0 && self.total_bytes == 0 } + /// Tries to append another update, respecting given bandwidth limits. fn append( &self, other: &Self, @@ -83,21 +134,31 @@ impl HrmpChannelSize { Ok(new) } + /// Subtracts previously added channel update. 
fn subtract(&mut self, other: &Self) { self.msg_count -= other.msg_count; self.total_bytes -= other.total_bytes; } } +/// Bandwidth used by a parachain block(s). +/// +/// This struct can be created with pub items, however, it should +/// never hit the storage directly to avoid bypassing limitations checks. #[derive(Default, Clone, Encode, Decode, TypeInfo)] pub struct UsedBandwidth { + /// The amount of UMP messages sent. pub ump_msg_count: u32, + /// The amount of UMP bytes sent. pub ump_total_bytes: u32, - pub hrmp_outgoing: BTreeMap, + /// Outbound HRMP channels updates. + pub hrmp_outgoing: BTreeMap, + /// The amount of DMP messages processed. pub dmp_processed_count: u32, } impl UsedBandwidth { + /// Tries to append another update, respecting given bandwidth limits. fn append( &self, other: &Self, @@ -135,6 +196,7 @@ impl UsedBandwidth { Ok(new) } + /// Subtracts previously added bandwidth update. fn subtract(&mut self, other: &Self) { self.ump_msg_count -= other.ump_msg_count; self.ump_total_bytes -= other.ump_total_bytes; @@ -152,29 +214,40 @@ impl UsedBandwidth { } } +/// Ancestors of the block being currently executed which are not yet included +/// into the relay chain. #[derive(Encode, Decode, TypeInfo)] pub struct Ancestor { + /// Bandwidth used by this block. used_bandwidth: UsedBandwidth, + /// Output head data of this block. para_head: relay_chain::HeadData, } impl Ancestor { + /// Returns [`UsedBandwidth`] of this block. pub fn used_bandwidth(&self) -> &UsedBandwidth { &self.used_bandwidth } + /// Returns [output head data](`relay_chain::HeadData`) of this block. pub fn para_head(&self) -> &relay_chain::HeadData { &self.para_head } } +/// Struct that keeps track of bandwidth used by the unincluded part of the chain +/// along with the latest HRMP watermark. #[derive(Encode, Decode, TypeInfo)] pub struct SegmentTracker { + /// Bandwidth used by the segment. used_bandwidth: UsedBandwidth, + /// The mark which specifies the block number up to which all inbound HRMP messages are processed. hrmp_watermark: relay_chain::BlockNumber, } impl SegmentTracker { + /// Tries to append another block to the tracker, respecting given bandwidth limits. pub fn append( &mut self, block: &Ancestor, @@ -187,6 +260,7 @@ impl SegmentTracker { Ok(()) } + /// Removes previously added block from the tracker. pub fn subtract(&mut self, block: &Ancestor) { self.used_bandwidth.subtract(block.used_bandwidth()); } From 12971828afd747d5a365641cbd04035375f5a943 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Wed, 12 Apr 2023 17:37:54 +0400 Subject: [PATCH 11/26] storage items docs --- pallets/parachain-system/src/lib.rs | 12 ++++++++++++ pallets/parachain-system/src/unincluded_segment.rs | 2 +- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index f33a3456c2e..80263988146 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -596,12 +596,24 @@ pub mod pallet { Unauthorized, } + /// Maximum number of latest included block descendants the runtime is allowed to accept. In other words, + /// these are ancestor of the block being currently executed, not yet sent to the relay chain runtime. + /// + /// This value is optional, but once set to `Some` by the governance, should never go back to `None`. + /// Requires latest included para head to be present in the relay chain storage proof. 
#[pallet::storage] pub(super) type MaxUnincludedLen = StorageValue<_, T::BlockNumber, OptionQuery>; + /// Latest included block descendants the runtime accepted. In other words, these are + /// ancestor of the block being currently executed, not yet sent to the relay chain runtime. + /// + /// The segment length is limited by [`MaxUnincludedLen`]. #[pallet::storage] pub(super) type UnincludedSegment = StorageValue<_, Vec, ValueQuery>; + /// Storage field that keeps track of bandwidth used by the unincluded segment + /// along with the latest HRMP watermark. Used for limiting the acceptance of new + /// blocks with respect to relay chain constraints. #[pallet::storage] pub(super) type AggregatedUnincludedSegment = StorageValue<_, SegmentTracker, OptionQuery>; diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 52d98b08f7c..2a50c2d5f54 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -214,7 +214,7 @@ impl UsedBandwidth { } } -/// Ancestors of the block being currently executed which are not yet included +/// Ancestor of the block being currently executed, not yet included /// into the relay chain. #[derive(Encode, Decode, TypeInfo)] pub struct Ancestor { From b439fbdabe605b5fc241afc56ee7fd79ae6511cb Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Wed, 12 Apr 2023 20:21:43 +0400 Subject: [PATCH 12/26] add a sanity check on block initialize --- pallets/parachain-system/src/lib.rs | 66 ++++++++++++++++++----------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 80263988146..50d4e654674 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -301,7 +301,7 @@ pub mod pallet { // If `MaxUnincludedLen` is present in the storage, parachain head // is always expected to be included into the relay storage proof. - let para_head = >::get().map(|_max_len| { + let para_head_with_len = >::get().map(|max_len| { let relay_chain_state = Self::relay_state_proof() .expect("relay state proof must be present in storage"); let validation_data = @@ -313,38 +313,54 @@ pub mod pallet { relay_chain_state, ) .expect("Invalid relay chain state proof"); - relay_state_proof - .read_included_para_head() - .expect("Invalid para head in relay chain state proof") + ( + relay_state_proof + .read_included_para_head() + .expect("Invalid para head in relay chain state proof"), + max_len, + ) }); weight += T::DbWeight::get().reads(1); // Update unincluded segment related storage values. - if let Some(para_head) = para_head { + if let Some((para_head, max_len)) = para_head_with_len { // Weight used for reading para head. weight += T::DbWeight::get().reads(2); - let dropped: Vec = >::mutate(|chain| { - // Drop everything up to the block with an included para head, if present. - let idx = chain - .iter() - .position(|block| block.para_head() == ¶_head) - .map_or(0, |idx| idx + 1); // inclusive. 
- - chain.drain(..idx).collect() - }); - weight += T::DbWeight::get().reads_writes(1, 1); - - if !dropped.is_empty() { - >::mutate(|agg| { - let agg = agg.as_mut().expect( - "dropped part of the segment wasn't empty, hence value exists; qed", - ); - for block in dropped { - agg.subtract(&block); - } - }); + if max_len > 0u32.into() { + let (dropped, left_count): (Vec, u32) = + >::mutate(|chain| { + // Drop everything up to the block with an included para head, if present. + let idx = chain + .iter() + .position(|block| block.para_head() == ¶_head) + .map_or(0, |idx| idx + 1); // inclusive. + + let left_count = (idx..chain.len()).count() as u32; + let dropped = chain.drain(..idx).collect(); + (dropped, left_count) + }); weight += T::DbWeight::get().reads_writes(1, 1); + + // sanity-check there's place for the block at finalization phase. + // + // TODO: this potentially restricts parachains from decreasing `MaxUnincludedLen` value. + assert!( + max_len > left_count.into(), + "no space left for the block in the unincluded segment" + ); + + if !dropped.is_empty() { + >::mutate(|agg| { + let agg = agg.as_mut().expect( + "dropped part of the segment wasn't empty, hence value exists; qed", + ); + for block in dropped { + agg.subtract(&block); + } + }); + weight += T::DbWeight::get().reads_writes(1, 1); + } } } From be8062698532bae3dbaab0164a1899e6c633e948 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Wed, 12 Apr 2023 21:59:18 +0400 Subject: [PATCH 13/26] Check watermark --- .../src/unincluded_segment.rs | 43 +++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 2a50c2d5f54..b8556a6d237 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -46,7 +46,7 @@ pub struct TotalBandwidthLimits { } /// The error type for updating bandwidth used by a segment. -pub enum LimitExceededError { +pub enum BandwidthUpdateError { /// Too many messages submitted to HRMP channel. HrmpMessagesOverflow { /// Parachain id of the recipient. @@ -65,15 +65,18 @@ pub enum LimitExceededError { /// The amount of bytes submitted to the channel. bytes_submitted: u32, }, + /// Too many messages submitted to UMP queue. UmpMessagesOverflow { + /// The amount of remaining messages in the capacity of UMP. messages_remaining: u32, + /// The amount of messages submitted to UMP. messages_submitted: u32, }, - /// Too many messages submitted to UMP queue. + /// Too many bytes submitted to UMP. UmpBytesOverflow { - /// The amount of remaining messages in the capacity of UMP. + /// The amount of remaining bytes in the capacity of UMP. bytes_remaining: u32, - /// The amount of messages submitted to UMP. + /// The amount of bytes submitted to UMP. bytes_submitted: u32, }, /// Too many messages processed from DMP. @@ -83,6 +86,13 @@ pub enum LimitExceededError { /// The amount of DMP messages processed. messages_processed: u32, }, + /// Invalid HRMP watermark. + InvalidHrmpWatermark { + /// HRMP watermark submitted by the candidate. + submitted: relay_chain::BlockNumber, + /// Latest tracked HRMP watermark. + latest: relay_chain::BlockNumber, + }, } /// The number of messages and size in bytes submitted to HRMP channel. 
@@ -106,7 +116,7 @@ impl HrmpChannelUpdate { other: &Self, recipient: ParaId, limits: &TotalBandwidthLimits, - ) -> Result { + ) -> Result { let limits = limits .hrmp_outgoing .get(&recipient) @@ -116,7 +126,7 @@ impl HrmpChannelUpdate { new.msg_count = new.msg_count.saturating_add(other.msg_count); if new.msg_count > limits.messages_remaining { - return Err(LimitExceededError::HrmpMessagesOverflow { + return Err(BandwidthUpdateError::HrmpMessagesOverflow { recipient, messages_remaining: limits.messages_remaining, messages_submitted: new.msg_count, @@ -124,7 +134,7 @@ impl HrmpChannelUpdate { } new.total_bytes = new.total_bytes.saturating_add(other.total_bytes); if new.total_bytes > limits.bytes_remaining { - return Err(LimitExceededError::HrmpBytesOverflow { + return Err(BandwidthUpdateError::HrmpBytesOverflow { recipient, bytes_remaining: limits.bytes_remaining, bytes_submitted: new.total_bytes, @@ -163,26 +173,26 @@ impl UsedBandwidth { &self, other: &Self, limits: &TotalBandwidthLimits, - ) -> Result { + ) -> Result { let mut new = self.clone(); new.ump_msg_count = new.ump_msg_count.saturating_add(other.ump_msg_count); if new.ump_msg_count > limits.ump_messages_remaining { - return Err(LimitExceededError::UmpMessagesOverflow { + return Err(BandwidthUpdateError::UmpMessagesOverflow { messages_remaining: limits.ump_messages_remaining, messages_submitted: new.ump_msg_count, }) } new.ump_total_bytes = new.ump_total_bytes.saturating_add(other.ump_total_bytes); if new.ump_total_bytes > limits.ump_bytes_remaining { - return Err(LimitExceededError::UmpBytesOverflow { + return Err(BandwidthUpdateError::UmpBytesOverflow { bytes_remaining: limits.ump_bytes_remaining, bytes_submitted: new.ump_total_bytes, }) } new.dmp_processed_count = new.dmp_processed_count.saturating_add(other.dmp_processed_count); if new.dmp_processed_count > limits.dmp_remaining_messages { - return Err(LimitExceededError::DmpMessagesUnderflow { + return Err(BandwidthUpdateError::DmpMessagesUnderflow { messages_remaining: limits.dmp_remaining_messages, messages_processed: new.dmp_processed_count, }) @@ -253,7 +263,14 @@ impl SegmentTracker { block: &Ancestor, hrmp_watermark: relay_chain::BlockNumber, limits: &TotalBandwidthLimits, - ) -> Result<(), LimitExceededError> { + ) -> Result<(), BandwidthUpdateError> { + if hrmp_watermark <= self.hrmp_watermark { + return Err(BandwidthUpdateError::InvalidHrmpWatermark { + submitted: hrmp_watermark, + latest: self.hrmp_watermark, + }) + } + self.used_bandwidth = self.used_bandwidth.append(block.used_bandwidth(), limits)?; self.hrmp_watermark = hrmp_watermark; @@ -263,5 +280,7 @@ impl SegmentTracker { /// Removes previously added block from the tracker. pub fn subtract(&mut self, block: &Ancestor) { self.used_bandwidth.subtract(block.used_bandwidth()); + // Watermark doesn't need to be updated since the is always dropped + // from the tail of the segment. 
} } From d1f064a37036bc3091cbf707b2ce232e9e77be6a Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 13 Apr 2023 02:27:05 +0400 Subject: [PATCH 14/26] append to the segment on block finalize --- pallets/parachain-system/src/lib.rs | 87 ++++++++++++++++--- .../src/unincluded_segment.rs | 34 +++++++- 2 files changed, 105 insertions(+), 16 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 50d4e654674..9c32c95d938 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -48,7 +48,7 @@ use frame_system::{ensure_none, ensure_root}; use polkadot_parachain::primitives::RelayChainBlockNumber; use scale_info::TypeInfo; use sp_runtime::{ - traits::{Block as BlockT, BlockNumberProvider, Hash}, + traits::{Block as BlockT, BlockNumberProvider, Hash, Zero}, transaction_validity::{ InvalidTransaction, TransactionLongevity, TransactionSource, TransactionValidity, ValidTransaction, @@ -65,7 +65,9 @@ pub mod validate_block; #[cfg(test)] mod tests; -use unincluded_segment::{Ancestor, SegmentTracker}; +use unincluded_segment::{ + Ancestor, HrmpChannelUpdate, SegmentTracker, TotalBandwidthLimits, UsedBandwidth, +}; /// Register the `validate_block` function that is used by parachains to validate blocks on a /// validator. @@ -235,7 +237,7 @@ pub mod pallet { }, }; - >::mutate(|up| { + let (ump_msg_count, ump_total_bytes) = >::mutate(|up| { let (count, size) = relevant_messaging_state.relay_dispatch_queue_size; let available_capacity = cmp::min( @@ -246,24 +248,32 @@ pub mod pallet { // Count the number of messages we can possibly fit in the given constraints, i.e. // available_capacity and available_size. - let num = up + let (num, total_size) = up .iter() - .scan((available_capacity as usize, available_size as usize), |state, msg| { - let (cap_left, size_left) = *state; - match (cap_left.checked_sub(1), size_left.checked_sub(msg.len())) { - (Some(new_cap), Some(new_size)) => { + .scan((0u32, 0u32), |state, msg| { + let (cap_used, size_used) = *state; + let new_cap = cap_used.saturating_add(1); + let new_size = size_used.saturating_add(msg.len() as u32); + match available_capacity + .checked_sub(new_cap) + .and(available_size.checked_sub(new_size)) + { + Some(_) => { *state = (new_cap, new_size); - Some(()) + Some(*state) }, _ => None, } }) - .count(); + .last() + .unwrap_or_default(); // TODO: #274 Return back messages that do not longer fit into the queue. - UpwardMessages::::put(&up[..num]); - *up = up.split_off(num); + UpwardMessages::::put(&up[..num as usize]); + *up = up.split_off(num as usize); + + (num, total_size) }); // Sending HRMP messages is a little bit more involved. There are the following @@ -285,6 +295,40 @@ pub mod pallet { .map(|(recipient, data)| OutboundHrmpMessage { recipient, data }) .collect::>(); + if MaxUnincludedLen::::get().filter(|len| !len.is_zero()).is_some() { + let dmp_remaining_messages = PendingDownwardMessages::::get().len() as u32; + let limits = + TotalBandwidthLimits::new(&relevant_messaging_state, dmp_remaining_messages); + + let hrmp_outgoing = outbound_messages + .iter() + .map(|msg| { + ( + msg.recipient, + HrmpChannelUpdate { msg_count: 1, total_bytes: msg.data.len() as u32 }, + ) + }) + .collect(); + let dmp_processed_count = ProcessedDownwardMessages::::get(); + let used_bandwidth = UsedBandwidth { + ump_msg_count, + ump_total_bytes, + hrmp_outgoing, + dmp_processed_count, + }; + // The bandwidth constructed was ensured to satisfy relay chain constraints. 
+ let ancestor = Ancestor::new_unchecked(used_bandwidth, todo!()); + + // Check in `on_initialize` guarantees there's space for this block. + UnincludedSegment::::append(ancestor); + + let watermark = HrmpWatermark::::get(); + AggregatedUnincludedSegment::::mutate(|agg| { + let agg = agg.get_or_insert_with(SegmentTracker::default); + agg.append(&ancestor, watermark, &limits) + .expect("unincluded segment limits exceeded"); + }); + } HrmpOutboundMessages::::put(outbound_messages); } @@ -327,7 +371,10 @@ pub mod pallet { // Weight used for reading para head. weight += T::DbWeight::get().reads(2); - if max_len > 0u32.into() { + if !max_len.is_zero() { + // Weight used by `on_finalize` in case the len is non-zero. + weight += T::DbWeight::get().reads_writes(4, 2); + let (dropped, left_count): (Vec, u32) = >::mutate(|chain| { // Drop everything up to the block with an included para head, if present. @@ -371,8 +418,9 @@ pub mod pallet { UpwardMessages::::kill(); HrmpOutboundMessages::::kill(); CustomValidationHeadData::::kill(); + PendingDownwardMessages::::kill(); - weight += T::DbWeight::get().writes(6); + weight += T::DbWeight::get().writes(7); // Here, in `on_initialize` we must report the weight for both `on_initialize` and // `on_finalize`. @@ -404,6 +452,9 @@ pub mod pallet { 4 + hrmp_max_message_num_per_candidate as u64, ); + // Always try to read `MaxUnincludedLen` in `on_finalize`. + weight += T::DbWeight::get().reads(1); + weight } } @@ -496,6 +547,7 @@ pub mod pallet { >::put(relay_chain_state); >::put(relevant_messaging_state.clone()); >::put(host_config); + >::put(downward_messages.clone()); ::on_validation_data(&vfp); @@ -634,6 +686,13 @@ pub mod pallet { pub(super) type AggregatedUnincludedSegment = StorageValue<_, SegmentTracker, OptionQuery>; + /// Downward messages sent by the relay chain waiting to be processed. + /// + /// Updated on every block. + #[pallet::storage] + pub(super) type PendingDownwardMessages = + StorageValue<_, Vec, ValueQuery>; + /// In case of a scheduled upgrade, this storage field contains the validation code to be applied. /// /// As soon as the relay chain gives us the go-ahead signal, we will overwrite the [`:code`][well_known_keys::CODE] diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index b8556a6d237..f53a83a2a8d 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -20,6 +20,7 @@ //! Unincluded segment describes a chain of latest included block descendants, which are not yet //! sent to relay chain. +use super::relay_state_snapshot::MessagingStateSnapshot; use codec::{Decode, Encode}; use cumulus_primitives_core::{relay_chain, ParaId}; use scale_info::TypeInfo; @@ -45,7 +46,31 @@ pub struct TotalBandwidthLimits { pub dmp_remaining_messages: u32, } +impl TotalBandwidthLimits { + /// Creates new limits from the messaging state. 
+ pub fn new(messaging_state: &MessagingStateSnapshot, dmp_remaining_messages: u32) -> Self { + let (ump_messages_remaining, ump_bytes_remaining) = + messaging_state.relay_dispatch_queue_size; + let hrmp_outgoing = messaging_state + .egress_channels + .iter() + .map(|(id, channel)| { + ( + *id, + HrmpOutboundLimits { + bytes_remaining: channel.max_total_size, + messages_remaining: channel.max_capacity, + }, + ) + }) + .collect(); + + Self { ump_messages_remaining, ump_bytes_remaining, hrmp_outgoing, dmp_remaining_messages } + } +} + /// The error type for updating bandwidth used by a segment. +#[derive(Debug)] pub enum BandwidthUpdateError { /// Too many messages submitted to HRMP channel. HrmpMessagesOverflow { @@ -235,6 +260,11 @@ pub struct Ancestor { } impl Ancestor { + /// Creates new ancestor without validating the bandwidth used. + pub fn new_unchecked(used_bandwidth: UsedBandwidth, para_head: relay_chain::HeadData) -> Self { + Self { used_bandwidth, para_head } + } + /// Returns [`UsedBandwidth`] of this block. pub fn used_bandwidth(&self) -> &UsedBandwidth { &self.used_bandwidth @@ -248,7 +278,7 @@ impl Ancestor { /// Struct that keeps track of bandwidth used by the unincluded part of the chain /// along with the latest HRMP watermark. -#[derive(Encode, Decode, TypeInfo)] +#[derive(Default, Encode, Decode, TypeInfo)] pub struct SegmentTracker { /// Bandwidth used by the segment. used_bandwidth: UsedBandwidth, @@ -264,7 +294,7 @@ impl SegmentTracker { hrmp_watermark: relay_chain::BlockNumber, limits: &TotalBandwidthLimits, ) -> Result<(), BandwidthUpdateError> { - if hrmp_watermark <= self.hrmp_watermark { + if self.hrmp_watermark > 0 && hrmp_watermark <= self.hrmp_watermark { return Err(BandwidthUpdateError::InvalidHrmpWatermark { submitted: hrmp_watermark, latest: self.hrmp_watermark, From 303884cc1e0244eedda2a5f23961e219e7d6929a Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 17 Apr 2023 17:55:01 +0400 Subject: [PATCH 15/26] Move segment update into set_validation_data --- pallets/parachain-system/src/lib.rs | 125 +++++++++++++--------------- 1 file changed, 57 insertions(+), 68 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 9c32c95d938..c70da88f5e4 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -343,74 +343,6 @@ pub mod pallet { weight += T::DbWeight::get().writes(1); } - // If `MaxUnincludedLen` is present in the storage, parachain head - // is always expected to be included into the relay storage proof. - let para_head_with_len = >::get().map(|max_len| { - let relay_chain_state = Self::relay_state_proof() - .expect("relay state proof must be present in storage"); - let validation_data = - Self::validation_data().expect("validation data must be present in storage"); - - let relay_state_proof = RelayChainStateProof::new( - T::SelfParaId::get(), - validation_data.relay_parent_storage_root, - relay_chain_state, - ) - .expect("Invalid relay chain state proof"); - ( - relay_state_proof - .read_included_para_head() - .expect("Invalid para head in relay chain state proof"), - max_len, - ) - }); - weight += T::DbWeight::get().reads(1); - - // Update unincluded segment related storage values. - if let Some((para_head, max_len)) = para_head_with_len { - // Weight used for reading para head. - weight += T::DbWeight::get().reads(2); - - if !max_len.is_zero() { - // Weight used by `on_finalize` in case the len is non-zero. 
- weight += T::DbWeight::get().reads_writes(4, 2); - - let (dropped, left_count): (Vec, u32) = - >::mutate(|chain| { - // Drop everything up to the block with an included para head, if present. - let idx = chain - .iter() - .position(|block| block.para_head() == ¶_head) - .map_or(0, |idx| idx + 1); // inclusive. - - let left_count = (idx..chain.len()).count() as u32; - let dropped = chain.drain(..idx).collect(); - (dropped, left_count) - }); - weight += T::DbWeight::get().reads_writes(1, 1); - - // sanity-check there's place for the block at finalization phase. - // - // TODO: this potentially restricts parachains from decreasing `MaxUnincludedLen` value. - assert!( - max_len > left_count.into(), - "no space left for the block in the unincluded segment" - ); - - if !dropped.is_empty() { - >::mutate(|agg| { - let agg = agg.as_mut().expect( - "dropped part of the segment wasn't empty, hence value exists; qed", - ); - for block in dropped { - agg.subtract(&block); - } - }); - weight += T::DbWeight::get().reads_writes(1, 1); - } - } - } - // Remove the validation from the old block. ValidationData::::kill(); ProcessedDownwardMessages::::kill(); @@ -562,6 +494,7 @@ pub mod pallet { horizontal_messages, vfp.relay_parent_number, ); + total_weight += Self::maybe_drop_included_ancestors(&relay_state_proof); Ok(PostDispatchInfo { actual_weight: Some(total_weight), pays_fee: Pays::No }) } @@ -1109,6 +1042,62 @@ impl Pallet { weight_used } + /// Drop blocks from the unincluded segment with respect to the latest parachain head. + /// + /// No-op if [`MaxUnincludedLen`] is not set. + fn maybe_drop_included_ancestors(relay_state_proof: &RelayChainStateProof) -> Weight { + let mut weight_used = Weight::zero(); + // If `MaxUnincludedLen` is present in the storage, parachain head + // is always expected to be included into the relay storage proof. + let para_head_with_len = >::get().map(|max_len| { + ( + relay_state_proof + .read_included_para_head() + .expect("Invalid para head in relay chain state proof"), + max_len, + ) + }); + weight_used += T::DbWeight::get().reads(1); + let Some((para_head, max_len)) = para_head_with_len else { return weight_used }; + + if !max_len.is_zero() { + let (dropped, left_count): (Vec, u32) = + >::mutate(|chain| { + // Drop everything up to the block with an included para head, if present. + let idx = chain + .iter() + .position(|block| block.para_head() == ¶_head) + .map_or(0, |idx| idx + 1); // inclusive. + + let left_count = (idx..chain.len()).count() as u32; + let dropped = chain.drain(..idx).collect(); + (dropped, left_count) + }); + weight_used += T::DbWeight::get().reads_writes(1, 1); + + // sanity-check there's place for the block at finalization phase. + // + // TODO: this potentially restricts parachains from decreasing `MaxUnincludedLen` value. + assert!( + max_len > left_count.into(), + "no space left for the block in the unincluded segment" + ); + + if !dropped.is_empty() { + >::mutate(|agg| { + let agg = agg.as_mut().expect( + "dropped part of the segment wasn't empty, hence value exists; qed", + ); + for block in dropped { + agg.subtract(&block); + } + }); + weight_used += T::DbWeight::get().reads_writes(1, 1); + } + } + weight_used + } + /// Put a new validation function into a particular location where polkadot /// monitors for updates. Calling this function notifies polkadot that a new /// upgrade has been scheduled. 
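
To summarize the bookkeeping this series builds up in unincluded_segment.rs: every block in the unincluded segment records the bandwidth it consumed (`UsedBandwidth` inside an `Ancestor`), the aggregated `SegmentTracker` appends that usage while checking it against the `TotalBandwidthLimits` derived from the latest included relay-chain state, and the usage is subtracted again once the relay chain reports the ancestor as included. Below is a minimal, self-contained sketch of that append/subtract flow; `Usage`, `Limits` and `Segment` are simplified stand-ins for illustration only, not the pallet's actual types, and only the UMP counters are modelled.

// Simplified model of the unincluded-segment bandwidth accounting.
// `Usage` stands in for `UsedBandwidth`, `Limits` for `TotalBandwidthLimits`,
// and `Segment` for `SegmentTracker`; HRMP and DMP tracking are omitted.

#[derive(Default, Clone, Copy)]
struct Usage {
    ump_msg_count: u32,
    ump_total_bytes: u32,
}

struct Limits {
    ump_messages_remaining: u32,
    ump_bytes_remaining: u32,
}

#[derive(Default)]
struct Segment {
    total: Usage,
    ancestors: Vec<Usage>,
}

impl Segment {
    // Appending fails if the aggregate would exceed the limits, analogous to
    // `SegmentTracker::append` returning a `BandwidthUpdateError`.
    fn append(&mut self, block: Usage, limits: &Limits) -> Result<(), &'static str> {
        let msgs = self.total.ump_msg_count.saturating_add(block.ump_msg_count);
        let bytes = self.total.ump_total_bytes.saturating_add(block.ump_total_bytes);
        if msgs > limits.ump_messages_remaining {
            return Err("UMP message limit exceeded");
        }
        if bytes > limits.ump_bytes_remaining {
            return Err("UMP byte limit exceeded");
        }
        self.total = Usage { ump_msg_count: msgs, ump_total_bytes: bytes };
        self.ancestors.push(block);
        Ok(())
    }

    // Dropping the oldest ancestor releases its bandwidth, analogous to
    // `SegmentTracker::subtract` when a block becomes included.
    fn drop_included(&mut self) {
        if !self.ancestors.is_empty() {
            let block = self.ancestors.remove(0);
            self.total.ump_msg_count -= block.ump_msg_count;
            self.total.ump_total_bytes -= block.ump_total_bytes;
        }
    }
}

fn main() {
    let limits = Limits { ump_messages_remaining: 10, ump_bytes_remaining: 1024 };
    let mut segment = Segment::default();
    segment.append(Usage { ump_msg_count: 4, ump_total_bytes: 256 }, &limits).unwrap();
    segment.append(Usage { ump_msg_count: 3, ump_total_bytes: 512 }, &limits).unwrap();
    // The relay chain reports the first block as included; its bandwidth is freed.
    segment.drop_included();
    assert_eq!(segment.total.ump_msg_count, 3);
}

In the pallet itself the same flow is split in two: `on_finalize` appends the just-built block to the segment (patch 14), while the `set_validation_data` inherent of the next block calls `maybe_drop_included_ancestors` to remove ancestors that the freshly read relay-chain para head proves to be included (patch 15).
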
From ae4a4433cbbeff531e24ab0e9ca06c8ca5d06980 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 17 Apr 2023 23:29:47 +0400 Subject: [PATCH 16/26] Resolve para head todo --- pallets/parachain-system/src/lib.rs | 49 ++++++++++++++----- .../src/unincluded_segment.rs | 34 ++++++++----- 2 files changed, 57 insertions(+), 26 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index c70da88f5e4..3009a30b456 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -295,7 +295,7 @@ pub mod pallet { .map(|(recipient, data)| OutboundHrmpMessage { recipient, data }) .collect::>(); - if MaxUnincludedLen::::get().filter(|len| !len.is_zero()).is_some() { + if MaxUnincludedLen::::get().map_or(false, |max_len| !max_len.is_zero()) { let dmp_remaining_messages = PendingDownwardMessages::::get().len() as u32; let limits = TotalBandwidthLimits::new(&relevant_messaging_state, dmp_remaining_messages); @@ -317,10 +317,7 @@ pub mod pallet { dmp_processed_count, }; // The bandwidth constructed was ensured to satisfy relay chain constraints. - let ancestor = Ancestor::new_unchecked(used_bandwidth, todo!()); - - // Check in `on_initialize` guarantees there's space for this block. - UnincludedSegment::::append(ancestor); + let ancestor = Ancestor::new_unchecked(used_bandwidth); let watermark = HrmpWatermark::::get(); AggregatedUnincludedSegment::::mutate(|agg| { @@ -328,6 +325,8 @@ pub mod pallet { agg.append(&ancestor, watermark, &limits) .expect("unincluded segment limits exceeded"); }); + // Check in `on_initialize` guarantees there's space for this block. + UnincludedSegment::::append(ancestor); } HrmpOutboundMessages::::put(outbound_messages); } @@ -343,6 +342,23 @@ pub mod pallet { weight += T::DbWeight::get().writes(1); } + // New para head was unknown during block finalization, update it. + if MaxUnincludedLen::::get().map_or(false, |max_len| !max_len.is_zero()) { + >::mutate(|chain| { + if let Some(ancestor) = chain.last_mut() { + let parent = frame_system::Pallet::::parent_hash(); + // Ancestor is the latest finalized block, thus current parent is + // its output head. + ancestor.replace_para_head_hash(parent); + } + }); + weight += T::DbWeight::get().reads_writes(1, 1); + + // Weight used during finalization. + weight += T::DbWeight::get().reads_writes(4, 2); + } + weight += T::DbWeight::get().reads(1); + // Remove the validation from the old block. ValidationData::::kill(); ProcessedDownwardMessages::::kill(); @@ -606,18 +622,19 @@ pub mod pallet { pub(super) type MaxUnincludedLen = StorageValue<_, T::BlockNumber, OptionQuery>; /// Latest included block descendants the runtime accepted. In other words, these are - /// ancestor of the block being currently executed, not yet sent to the relay chain runtime. + /// ancestors of the block being currently executed, not yet sent to the relay chain runtime. /// /// The segment length is limited by [`MaxUnincludedLen`]. #[pallet::storage] - pub(super) type UnincludedSegment = StorageValue<_, Vec, ValueQuery>; + pub(super) type UnincludedSegment = + StorageValue<_, Vec>, ValueQuery>; - /// Storage field that keeps track of bandwidth used by the unincluded segment - /// along with the latest HRMP watermark. Used for limiting the acceptance of new - /// blocks with respect to relay chain constraints. + /// Storage field that keeps track of bandwidth used by the unincluded segment along with the latest + /// the latest HRMP watermark. 
Used for limiting the acceptance of new blocks with respect to relay + /// chain constraints. #[pallet::storage] pub(super) type AggregatedUnincludedSegment = - StorageValue<_, SegmentTracker, OptionQuery>; + StorageValue<_, SegmentTracker, OptionQuery>; /// Downward messages sent by the relay chain waiting to be processed. /// @@ -1060,13 +1077,19 @@ impl Pallet { weight_used += T::DbWeight::get().reads(1); let Some((para_head, max_len)) = para_head_with_len else { return weight_used }; + let para_head_hash = T::Hashing::hash(¶_head.0); if !max_len.is_zero() { - let (dropped, left_count): (Vec, u32) = + let (dropped, left_count): (Vec>, u32) = >::mutate(|chain| { // Drop everything up to the block with an included para head, if present. let idx = chain .iter() - .position(|block| block.para_head() == ¶_head) + .position(|block| { + let head_hash = block.para_head_hash().expect( + "para head hash is updated during block initialization; qed", + ); + head_hash == ¶_head_hash + }) .map_or(0, |idx| idx + 1); // inclusive. let left_count = (idx..chain.len()).count() as u32; diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index f53a83a2a8d..7e122751b03 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -24,7 +24,7 @@ use super::relay_state_snapshot::MessagingStateSnapshot; use codec::{Decode, Encode}; use cumulus_primitives_core::{relay_chain, ParaId}; use scale_info::TypeInfo; -use sp_std::collections::btree_map::BTreeMap; +use sp_std::{collections::btree_map::BTreeMap, marker::PhantomData}; /// Constraints on outbound HRMP channel. pub struct HrmpOutboundLimits { @@ -252,17 +252,18 @@ impl UsedBandwidth { /// Ancestor of the block being currently executed, not yet included /// into the relay chain. #[derive(Encode, Decode, TypeInfo)] -pub struct Ancestor { +pub struct Ancestor { /// Bandwidth used by this block. used_bandwidth: UsedBandwidth, - /// Output head data of this block. - para_head: relay_chain::HeadData, + /// Output head data hash of this block. This may be optional in case the head data has not + /// yet been posted on chain, but should be updated during initialization of the next block. + para_head_hash: Option, } -impl Ancestor { +impl Ancestor { /// Creates new ancestor without validating the bandwidth used. - pub fn new_unchecked(used_bandwidth: UsedBandwidth, para_head: relay_chain::HeadData) -> Self { - Self { used_bandwidth, para_head } + pub fn new_unchecked(used_bandwidth: UsedBandwidth) -> Self { + Self { used_bandwidth, para_head_hash: None } } /// Returns [`UsedBandwidth`] of this block. @@ -271,26 +272,33 @@ impl Ancestor { } /// Returns [output head data](`relay_chain::HeadData`) of this block. - pub fn para_head(&self) -> &relay_chain::HeadData { - &self.para_head + pub fn para_head_hash(&self) -> Option<&H> { + self.para_head_hash.as_ref() + } + + /// Set para head hash of this block. + pub fn replace_para_head_hash(&mut self, para_head_hash: H) { + self.para_head_hash.replace(para_head_hash); } } /// Struct that keeps track of bandwidth used by the unincluded part of the chain /// along with the latest HRMP watermark. #[derive(Default, Encode, Decode, TypeInfo)] -pub struct SegmentTracker { +pub struct SegmentTracker { /// Bandwidth used by the segment. used_bandwidth: UsedBandwidth, /// The mark which specifies the block number up to which all inbound HRMP messages are processed. 
hrmp_watermark: relay_chain::BlockNumber, + /// `H` is the type of para head hash. + phantom_data: PhantomData, } -impl SegmentTracker { +impl SegmentTracker { /// Tries to append another block to the tracker, respecting given bandwidth limits. pub fn append( &mut self, - block: &Ancestor, + block: &Ancestor, hrmp_watermark: relay_chain::BlockNumber, limits: &TotalBandwidthLimits, ) -> Result<(), BandwidthUpdateError> { @@ -308,7 +316,7 @@ impl SegmentTracker { } /// Removes previously added block from the tracker. - pub fn subtract(&mut self, block: &Ancestor) { + pub fn subtract(&mut self, block: &Ancestor) { self.used_bandwidth.subtract(block.used_bandwidth()); // Watermark doesn't need to be updated since the is always dropped // from the tail of the segment. From 4639b0d7c6ca268b5e171e2b0514ac294489d9be Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 17 Apr 2023 23:43:27 +0400 Subject: [PATCH 17/26] option watermark --- .../parachain-system/src/unincluded_segment.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 7e122751b03..21e42a65ec3 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -289,7 +289,7 @@ pub struct SegmentTracker { /// Bandwidth used by the segment. used_bandwidth: UsedBandwidth, /// The mark which specifies the block number up to which all inbound HRMP messages are processed. - hrmp_watermark: relay_chain::BlockNumber, + hrmp_watermark: Option, /// `H` is the type of para head hash. phantom_data: PhantomData, } @@ -302,15 +302,17 @@ impl SegmentTracker { hrmp_watermark: relay_chain::BlockNumber, limits: &TotalBandwidthLimits, ) -> Result<(), BandwidthUpdateError> { - if self.hrmp_watermark > 0 && hrmp_watermark <= self.hrmp_watermark { - return Err(BandwidthUpdateError::InvalidHrmpWatermark { - submitted: hrmp_watermark, - latest: self.hrmp_watermark, - }) + if let Some(watermark) = self.hrmp_watermark.as_ref() { + if &hrmp_watermark <= watermark { + return Err(BandwidthUpdateError::InvalidHrmpWatermark { + submitted: hrmp_watermark, + latest: *watermark, + }) + } } self.used_bandwidth = self.used_bandwidth.append(block.used_bandwidth(), limits)?; - self.hrmp_watermark = hrmp_watermark; + self.hrmp_watermark.replace(hrmp_watermark); Ok(()) } From 0e4b14a7e54dcebae3f0f5d6231bb44d88dd8733 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 17 Apr 2023 23:53:25 +0400 Subject: [PATCH 18/26] fix comment --- pallets/parachain-system/src/unincluded_segment.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 21e42a65ec3..570d192e4d9 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -271,7 +271,7 @@ impl Ancestor { &self.used_bandwidth } - /// Returns [output head data](`relay_chain::HeadData`) of this block. + /// Returns hashed [output head data](`relay_chain::HeadData`) of this block. 
pub fn para_head_hash(&self) -> Option<&H> { self.para_head_hash.as_ref() } From 29d92e801365a4e294980fa4a4ffffc381804e06 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 20 Apr 2023 01:26:24 +0400 Subject: [PATCH 19/26] Drop dmq check --- pallets/parachain-system/src/lib.rs | 24 +++--------------- .../src/unincluded_segment.rs | 25 +++---------------- 2 files changed, 8 insertions(+), 41 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 3009a30b456..c6cbbe52f78 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -296,9 +296,7 @@ pub mod pallet { .collect::>(); if MaxUnincludedLen::::get().map_or(false, |max_len| !max_len.is_zero()) { - let dmp_remaining_messages = PendingDownwardMessages::::get().len() as u32; - let limits = - TotalBandwidthLimits::new(&relevant_messaging_state, dmp_remaining_messages); + let limits = TotalBandwidthLimits::new(&relevant_messaging_state); let hrmp_outgoing = outbound_messages .iter() @@ -309,13 +307,8 @@ pub mod pallet { ) }) .collect(); - let dmp_processed_count = ProcessedDownwardMessages::::get(); - let used_bandwidth = UsedBandwidth { - ump_msg_count, - ump_total_bytes, - hrmp_outgoing, - dmp_processed_count, - }; + let used_bandwidth = + UsedBandwidth { ump_msg_count, ump_total_bytes, hrmp_outgoing }; // The bandwidth constructed was ensured to satisfy relay chain constraints. let ancestor = Ancestor::new_unchecked(used_bandwidth); @@ -366,9 +359,8 @@ pub mod pallet { UpwardMessages::::kill(); HrmpOutboundMessages::::kill(); CustomValidationHeadData::::kill(); - PendingDownwardMessages::::kill(); - weight += T::DbWeight::get().writes(7); + weight += T::DbWeight::get().writes(6); // Here, in `on_initialize` we must report the weight for both `on_initialize` and // `on_finalize`. @@ -495,7 +487,6 @@ pub mod pallet { >::put(relay_chain_state); >::put(relevant_messaging_state.clone()); >::put(host_config); - >::put(downward_messages.clone()); ::on_validation_data(&vfp); @@ -636,13 +627,6 @@ pub mod pallet { pub(super) type AggregatedUnincludedSegment = StorageValue<_, SegmentTracker, OptionQuery>; - /// Downward messages sent by the relay chain waiting to be processed. - /// - /// Updated on every block. - #[pallet::storage] - pub(super) type PendingDownwardMessages = - StorageValue<_, Vec, ValueQuery>; - /// In case of a scheduled upgrade, this storage field contains the validation code to be applied. /// /// As soon as the relay chain gives us the go-ahead signal, we will overwrite the [`:code`][well_known_keys::CODE] diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 570d192e4d9..4481bc11b91 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -42,13 +42,13 @@ pub struct TotalBandwidthLimits { pub ump_bytes_remaining: u32, /// The limitations of all registered outbound HRMP channels. pub hrmp_outgoing: BTreeMap, - /// Number of remaining DMP messages. - pub dmp_remaining_messages: u32, + // Downward queue is not checked against limits since new block always + // exhausts available messages from it. } impl TotalBandwidthLimits { /// Creates new limits from the messaging state. 
- pub fn new(messaging_state: &MessagingStateSnapshot, dmp_remaining_messages: u32) -> Self { + pub fn new(messaging_state: &MessagingStateSnapshot) -> Self { let (ump_messages_remaining, ump_bytes_remaining) = messaging_state.relay_dispatch_queue_size; let hrmp_outgoing = messaging_state @@ -65,7 +65,7 @@ impl TotalBandwidthLimits { }) .collect(); - Self { ump_messages_remaining, ump_bytes_remaining, hrmp_outgoing, dmp_remaining_messages } + Self { ump_messages_remaining, ump_bytes_remaining, hrmp_outgoing } } } @@ -104,13 +104,6 @@ pub enum BandwidthUpdateError { /// The amount of bytes submitted to UMP. bytes_submitted: u32, }, - /// Too many messages processed from DMP. - DmpMessagesUnderflow { - /// The amount of messages waiting to be processed from DMP. - messages_remaining: u32, - /// The amount of DMP messages processed. - messages_processed: u32, - }, /// Invalid HRMP watermark. InvalidHrmpWatermark { /// HRMP watermark submitted by the candidate. @@ -188,8 +181,6 @@ pub struct UsedBandwidth { pub ump_total_bytes: u32, /// Outbound HRMP channels updates. pub hrmp_outgoing: BTreeMap, - /// The amount of DMP messages processed. - pub dmp_processed_count: u32, } impl UsedBandwidth { @@ -215,13 +206,6 @@ impl UsedBandwidth { bytes_submitted: new.ump_total_bytes, }) } - new.dmp_processed_count = new.dmp_processed_count.saturating_add(other.dmp_processed_count); - if new.dmp_processed_count > limits.dmp_remaining_messages { - return Err(BandwidthUpdateError::DmpMessagesUnderflow { - messages_remaining: limits.dmp_remaining_messages, - messages_processed: new.dmp_processed_count, - }) - } for (id, channel) in other.hrmp_outgoing.iter() { let current = new.hrmp_outgoing.entry(*id).or_default(); @@ -235,7 +219,6 @@ impl UsedBandwidth { fn subtract(&mut self, other: &Self) { self.ump_msg_count -= other.ump_msg_count; self.ump_total_bytes -= other.ump_total_bytes; - self.dmp_processed_count -= other.dmp_processed_count; for (id, channel) in other.hrmp_outgoing.iter() { let entry = self From e28312b5ed510069d99ce545735dddd629e6c7f1 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 20 Apr 2023 02:19:23 +0400 Subject: [PATCH 20/26] fix weight --- pallets/parachain-system/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index c6cbbe52f78..8bd7cf7131d 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -348,7 +348,7 @@ pub mod pallet { weight += T::DbWeight::get().reads_writes(1, 1); // Weight used during finalization. - weight += T::DbWeight::get().reads_writes(4, 2); + weight += T::DbWeight::get().reads_writes(2, 2); } weight += T::DbWeight::get().reads(1); From 72b913f1d2697d5cd467c83ff23b76c5856793d1 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 20 Apr 2023 20:05:21 +0400 Subject: [PATCH 21/26] doc-comments on inherent invariant --- pallets/parachain-system/src/lib.rs | 12 ++++++++++++ pallets/parachain-system/src/unincluded_segment.rs | 2 -- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 8bd7cf7131d..6de08efb59f 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -296,6 +296,12 @@ pub mod pallet { .collect::>(); if MaxUnincludedLen::::get().map_or(false, |max_len| !max_len.is_zero()) { + // NOTE: these limits don't account for the amount of processed messages from + // downward and horizontal queues. 
+ // + // This is correct because: + // - inherent never contains messages that were previously processed. + // - current implementation always attempts to exhaust each message queue. let limits = TotalBandwidthLimits::new(&relevant_messaging_state); let hrmp_outgoing = outbound_messages @@ -423,6 +429,12 @@ pub mod pallet { "ValidationData must be updated only once in a block", ); + // NOTE: the inherent data is expected to be unique, even if this block is built + // in the context of the same relay parent as the previous one. In particular, + // the inherent shouldn't contain messages that were already processed by any of the + // ancestors. + // + // This invariant should be upheld by the node-side. let ParachainInherentData { validation_data: vfp, relay_chain_state, diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 4481bc11b91..ec3c6216f03 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -42,8 +42,6 @@ pub struct TotalBandwidthLimits { pub ump_bytes_remaining: u32, /// The limitations of all registered outbound HRMP channels. pub hrmp_outgoing: BTreeMap, - // Downward queue is not checked against limits since new block always - // exhausts available messages from it. } impl TotalBandwidthLimits { From f39c7fce03baccaf3ffcf9938707219ff63c6b15 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 21 Apr 2023 17:06:08 +0400 Subject: [PATCH 22/26] Remove TODO --- pallets/parachain-system/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 6de08efb59f..97307a774e0 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -1096,7 +1096,8 @@ impl Pallet { // sanity-check there's place for the block at finalization phase. // - // TODO: this potentially restricts parachains from decreasing `MaxUnincludedLen` value. + // If this fails, the max segment len is reached and parachain should wait + // for ancestor's inclusion. assert!( max_len > left_count.into(), "no space left for the block in the unincluded segment" From 8ad0eee4483e2f5a94a9b2ab0a8f35d20e887f16 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 21 Apr 2023 17:16:14 +0400 Subject: [PATCH 23/26] add todo --- pallets/parachain-system/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 97307a774e0..e813a3aab27 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -321,6 +321,9 @@ pub mod pallet { let watermark = HrmpWatermark::::get(); AggregatedUnincludedSegment::::mutate(|agg| { let agg = agg.get_or_insert_with(SegmentTracker::default); + // TODO: In order of this panic to be correct, outbound message source should + // respect bandwidth limits as well. 
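To make the limit check concrete, a standalone sketch (not part of the patch; types and error variants are trimmed down, and unknown recipients get a zero budget, which is an assumption of this sketch) of the kind of checked accumulation `HrmpChannelUpdate::append` performs against a channel's remaining budget:

use std::collections::BTreeMap;

type ParaId = u32;

#[derive(Default, Copy, Clone, Debug)]
struct HrmpChannelUpdate {
    msg_count: u32,
    total_bytes: u32,
}

#[derive(Copy, Clone, Debug)]
struct HrmpOutboundLimits {
    bytes_remaining: u32,
    messages_remaining: u32,
}

#[derive(Debug, PartialEq)]
enum UpdateError {
    MessagesOverflow { recipient: ParaId },
    BytesOverflow { recipient: ParaId },
}

impl HrmpChannelUpdate {
    /// Returns the accumulated update, or an error if the channel budget is exceeded.
    fn append(
        &self,
        other: &Self,
        recipient: ParaId,
        limits: &BTreeMap<ParaId, HrmpOutboundLimits>,
    ) -> Result<Self, UpdateError> {
        // Unknown recipients get a zero budget in this sketch.
        let limit = limits
            .get(&recipient)
            .copied()
            .unwrap_or(HrmpOutboundLimits { bytes_remaining: 0, messages_remaining: 0 });

        let mut new = *self;
        new.msg_count = new.msg_count.saturating_add(other.msg_count);
        new.total_bytes = new.total_bytes.saturating_add(other.total_bytes);

        if new.msg_count > limit.messages_remaining {
            return Err(UpdateError::MessagesOverflow { recipient });
        }
        if new.total_bytes > limit.bytes_remaining {
            return Err(UpdateError::BytesOverflow { recipient });
        }
        Ok(new)
    }
}

fn main() {
    let limits: BTreeMap<ParaId, HrmpOutboundLimits> =
        [(7, HrmpOutboundLimits { bytes_remaining: 10, messages_remaining: 1 })].into();
    let update = HrmpChannelUpdate { msg_count: 1, total_bytes: 4 };
    // The second message exceeds the per-channel message budget.
    assert!(update.append(&update, 7, &limits).is_err());
}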
+ // agg.append(&ancestor, watermark, &limits) .expect("unincluded segment limits exceeded"); }); From ea6b35cc09c743e8e75ca70b5bc5ed57ccbff3aa Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 21 Apr 2023 20:33:48 +0400 Subject: [PATCH 24/26] primitives tests --- Cargo.lock | 1 + pallets/parachain-system/Cargo.toml | 1 + .../src/unincluded_segment.rs | 306 +++++++++++++++++- 3 files changed, 307 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 97e893c299e..10b520c0ad7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2150,6 +2150,7 @@ dependencies = [ name = "cumulus-pallet-parachain-system" version = "0.1.0" dependencies = [ + "assert_matches", "bytes", "cumulus-pallet-parachain-system-proc-macro", "cumulus-primitives-core", diff --git a/pallets/parachain-system/Cargo.toml b/pallets/parachain-system/Cargo.toml index 668457be002..0d6f1954b9e 100644 --- a/pallets/parachain-system/Cargo.toml +++ b/pallets/parachain-system/Cargo.toml @@ -36,6 +36,7 @@ cumulus-primitives-core = { path = "../../primitives/core", default-features = f cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent", default-features = false } [dev-dependencies] +assert_matches = "1.5" hex-literal = "0.4.1" lazy_static = "1.4" diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index ec3c6216f03..d0e5dd47f61 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -112,7 +112,7 @@ pub enum BandwidthUpdateError { } /// The number of messages and size in bytes submitted to HRMP channel. -#[derive(Default, Copy, Clone, Encode, Decode, TypeInfo)] +#[derive(Debug, Default, Copy, Clone, Encode, Decode, TypeInfo)] pub struct HrmpChannelUpdate { /// The amount of messages submitted to the channel. pub msg_count: u32, @@ -305,3 +305,307 @@ impl SegmentTracker { // from the tail of the segment. 
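Before the tests below, a minimal standalone sketch (not part of the patch; the tracker is reduced to the watermark alone) of the invariant the `Option`-based HRMP watermark enforces: the first appended block sets the watermark unconditionally, and every subsequent block must strictly advance it:

type RelayBlockNumber = u32;

#[derive(Default, Debug)]
struct WatermarkTracker {
    hrmp_watermark: Option<RelayBlockNumber>,
}

#[derive(Debug, PartialEq)]
struct InvalidHrmpWatermark {
    submitted: RelayBlockNumber,
    latest: RelayBlockNumber,
}

impl WatermarkTracker {
    fn append(&mut self, hrmp_watermark: RelayBlockNumber) -> Result<(), InvalidHrmpWatermark> {
        if let Some(latest) = self.hrmp_watermark {
            // A later block must advance the watermark strictly.
            if hrmp_watermark <= latest {
                return Err(InvalidHrmpWatermark { submitted: hrmp_watermark, latest });
            }
        }
        self.hrmp_watermark = Some(hrmp_watermark);
        Ok(())
    }
}

fn main() {
    let mut tracker = WatermarkTracker::default();
    assert!(tracker.append(0).is_ok()); // accepted even at 0: the segment was empty
    assert!(tracker.append(0).is_err()); // must now strictly increase
    assert!(tracker.append(5).is_ok());
}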
} } + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + + #[test] + fn hrmp_msg_count_limits() { + let para_0 = ParaId::from(0); + let para_0_limits = HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: 5 }; + + let para_1 = ParaId::from(1); + let para_1_limits = HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: 3 }; + let hrmp_outgoing = [(para_0, para_0_limits), (para_1, para_1_limits)].into(); + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing, + }; + + let mut hrmp_update = HrmpChannelUpdate::default(); + assert!(hrmp_update.is_empty()); + + for _ in 0..5 { + hrmp_update = hrmp_update + .append(&HrmpChannelUpdate { msg_count: 1, total_bytes: 10 }, para_0, &limits) + .expect("update is withing the limits"); + } + assert_matches!( + hrmp_update.append( + &HrmpChannelUpdate { msg_count: 1, total_bytes: 10 }, + para_0, + &limits, + ), + Err(BandwidthUpdateError::HrmpMessagesOverflow { + recipient, + messages_remaining, + messages_submitted, + }) if recipient == para_0 && messages_remaining == 5 && messages_submitted == 6 + ); + + let mut hrmp_update = HrmpChannelUpdate::default(); + hrmp_update = hrmp_update + .append(&HrmpChannelUpdate { msg_count: 2, total_bytes: 10 }, para_1, &limits) + .expect("update is withing the limits"); + assert_matches!( + hrmp_update.append( + &HrmpChannelUpdate { msg_count: 3, total_bytes: 10 }, + para_1, + &limits, + ), + Err(BandwidthUpdateError::HrmpMessagesOverflow { + recipient, + messages_remaining, + messages_submitted, + }) if recipient == para_1 && messages_remaining == 3 && messages_submitted == 5 + ); + } + + #[test] + fn hrmp_bytes_limits() { + let para_0 = ParaId::from(0); + let para_0_limits = + HrmpOutboundLimits { bytes_remaining: 25, messages_remaining: u32::MAX }; + + let hrmp_outgoing = [(para_0, para_0_limits)].into(); + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing, + }; + + let mut hrmp_update = HrmpChannelUpdate::default(); + assert!(hrmp_update.is_empty()); + + for _ in 0..5 { + hrmp_update = hrmp_update + .append(&HrmpChannelUpdate { msg_count: 1, total_bytes: 4 }, para_0, &limits) + .expect("update is withing the limits"); + } + assert_matches!( + hrmp_update.append( + &HrmpChannelUpdate { msg_count: 1, total_bytes: 6 }, + para_0, + &limits, + ), + Err(BandwidthUpdateError::HrmpBytesOverflow { + recipient, + bytes_remaining, + bytes_submitted, + }) if recipient == para_0 && bytes_remaining == 25 && bytes_submitted == 26 + ); + } + + #[test] + fn hrmp_limits_with_segment() { + let create_used_hrmp = + |hrmp_outgoing| UsedBandwidth { ump_msg_count: 0, ump_total_bytes: 0, hrmp_outgoing }; + + let para_0 = ParaId::from(0); + let para_0_limits = HrmpOutboundLimits { bytes_remaining: 30, messages_remaining: 10 }; + + let para_1 = ParaId::from(1); + let para_1_limits = HrmpOutboundLimits { bytes_remaining: 20, messages_remaining: 3 }; + let hrmp_outgoing = [(para_0, para_0_limits), (para_1, para_1_limits)].into(); + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing, + }; + + let mut segment = SegmentTracker::default(); + + let para_0_update = HrmpChannelUpdate { msg_count: 1, total_bytes: 6 }; + let ancestor_0 = Ancestor { + used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), + para_head_hash: None::, + }; + segment.append(&ancestor_0, 0, &limits).expect("update is withing the limits"); 
+ + for watermark in 1..5 { + let ancestor = Ancestor { + used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), + para_head_hash: None::, + }; + segment + .append(&ancestor, watermark, &limits) + .expect("update is withing the limits"); + } + + let para_0_update = HrmpChannelUpdate { msg_count: 1, total_bytes: 1 }; + let ancestor_5 = Ancestor { + used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), + para_head_hash: None::, + }; + assert_matches!( + segment.append(&ancestor_5, 5, &limits), + Err(BandwidthUpdateError::HrmpBytesOverflow { + recipient, + bytes_remaining, + bytes_submitted, + }) if recipient == para_0 && bytes_remaining == 30 && bytes_submitted == 31 + ); + // Remove the first ancestor from the segment to make space. + segment.subtract(&ancestor_0); + segment.append(&ancestor_5, 5, &limits).expect("update is withing the limits"); + + let para_1_update = HrmpChannelUpdate { msg_count: 3, total_bytes: 10 }; + let ancestor = Ancestor { + used_bandwidth: create_used_hrmp([(para_1, para_1_update)].into()), + para_head_hash: None::, + }; + segment.append(&ancestor, 6, &limits).expect("update is withing the limits"); + + assert_matches!( + segment.append(&ancestor, 7, &limits), + Err(BandwidthUpdateError::HrmpMessagesOverflow { + recipient, + messages_remaining, + messages_submitted, + }) if recipient == para_1 && messages_remaining == 3 && messages_submitted == 6 + ); + } + + #[test] + fn ump_limits_with_segment() { + let create_used_ump = |(ump_msg_count, ump_total_bytes)| UsedBandwidth { + ump_msg_count, + ump_total_bytes, + hrmp_outgoing: BTreeMap::default(), + }; + + let limits = TotalBandwidthLimits { + ump_messages_remaining: 5, + ump_bytes_remaining: 50, + hrmp_outgoing: BTreeMap::default(), + }; + + let mut segment = SegmentTracker::default(); + + let ancestor_0 = Ancestor { + used_bandwidth: create_used_ump((1, 10)), + para_head_hash: None::, + }; + segment.append(&ancestor_0, 0, &limits).expect("update is withing the limits"); + + for watermark in 1..4 { + let ancestor = Ancestor { + used_bandwidth: create_used_ump((1, 10)), + para_head_hash: None::, + }; + segment + .append(&ancestor, watermark, &limits) + .expect("update is withing the limits"); + } + + let ancestor_4 = Ancestor { + used_bandwidth: create_used_ump((1, 30)), + para_head_hash: None::, + }; + assert_matches!( + segment.append(&ancestor_4, 4, &limits), + Err(BandwidthUpdateError::UmpBytesOverflow { + bytes_remaining, + bytes_submitted, + }) if bytes_remaining == 50 && bytes_submitted == 70 + ); + + let ancestor = Ancestor { + used_bandwidth: create_used_ump((1, 5)), + para_head_hash: None::, + }; + segment.append(&ancestor, 4, &limits).expect("update is withing the limits"); + assert_matches!( + segment.append(&ancestor, 5, &limits), + Err(BandwidthUpdateError::UmpMessagesOverflow { + messages_remaining, + messages_submitted, + }) if messages_remaining == 5 && messages_submitted == 6 + ); + } + + #[test] + fn segment_hrmp_watermark() { + let mut segment = SegmentTracker::default(); + + let ancestor = Ancestor { + used_bandwidth: UsedBandwidth::default(), + para_head_hash: None::, + }; + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing: BTreeMap::default(), + }; + + segment + .append(&ancestor, 0, &limits) + .expect("nothing to compare the watermark with in default segment"); + assert_matches!( + segment.append(&ancestor, 0, &limits), + Err(BandwidthUpdateError::InvalidHrmpWatermark { + submitted, + latest, + }) if 
submitted == 0 && latest == 0 + ); + + for watermark in 1..5 { + segment.append(&ancestor, watermark, &limits).expect("hrmp watermark is valid"); + } + for watermark in 0..5 { + assert_matches!( + segment.append(&ancestor, watermark, &limits), + Err(BandwidthUpdateError::InvalidHrmpWatermark { + submitted, + latest, + }) if submitted == watermark && latest == 4 + ); + } + } + + #[test] + fn segment_drops_empty_hrmp_channels() { + let create_used_hrmp = + |hrmp_outgoing| UsedBandwidth { ump_msg_count: 0, ump_total_bytes: 0, hrmp_outgoing }; + + let para_0 = ParaId::from(0); + let para_0_limits = + HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: u32::MAX }; + + let para_1 = ParaId::from(1); + let para_1_limits = + HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: u32::MAX }; + let hrmp_outgoing = [(para_0, para_0_limits), (para_1, para_1_limits)].into(); + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing, + }; + + let mut segment = SegmentTracker::default(); + + let para_0_update = HrmpChannelUpdate { msg_count: 1, total_bytes: 1 }; + let ancestor_0 = Ancestor { + used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), + para_head_hash: None::, + }; + segment.append(&ancestor_0, 0, &limits).expect("update is withing the limits"); + let para_1_update = HrmpChannelUpdate { msg_count: 3, total_bytes: 10 }; + let ancestor_1 = Ancestor { + used_bandwidth: create_used_hrmp([(para_1, para_1_update)].into()), + para_head_hash: None::, + }; + segment.append(&ancestor_1, 1, &limits).expect("update is withing the limits"); + + assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 2); + + segment.subtract(&ancestor_0); + assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 1); + + segment.subtract(&ancestor_1); + assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 0); + } +} From 163b5164d7f96d57b7c17f7d618e44a579e516cb Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sat, 22 Apr 2023 02:51:28 +0400 Subject: [PATCH 25/26] pallet tests --- pallets/parachain-system/src/tests.rs | 97 ++++++++++++++++++++++++++- test/relay-sproof-builder/src/lib.rs | 5 ++ 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/pallets/parachain-system/src/tests.rs b/pallets/parachain-system/src/tests.rs index 70e4c106bf2..8edbabaf5a7 100755 --- a/pallets/parachain-system/src/tests.rs +++ b/pallets/parachain-system/src/tests.rs @@ -38,6 +38,7 @@ use sp_runtime::{ traits::{BlakeTwo256, IdentityLookup}, DispatchErrorWithPostInfo, }; +use sp_std::collections::vec_deque::VecDeque; use sp_version::RuntimeVersion; use std::cell::RefCell; @@ -231,6 +232,11 @@ struct BlockTests { persisted_validation_data_hook: Option>, inherent_data_hook: Option>, + inclusion_delay: Option, + max_unincluded_len: Option, + + included_para_head: Option, + pending_blocks: VecDeque, } impl BlockTests { @@ -291,9 +297,25 @@ impl BlockTests { self } + fn with_unincluded_segment(mut self, inclusion_delay: usize, max_unincluded_len: u64) -> Self { + self.inclusion_delay.replace(inclusion_delay); + self.max_unincluded_len.replace(max_unincluded_len); + self + } + fn run(&mut self) { self.ran = true; wasm_ext().execute_with(|| { + let mut parent_head_data = { + let header = Header::new_from_number(0); + relay_chain::HeadData(header.encode()) + }; + + if let Some(max_unincluded_len) = self.max_unincluded_len { + // Initialize included head if the segment is enabled. 
+ self.included_para_head.replace(parent_head_data.clone()); + >::put(max_unincluded_len); + } for BlockTest { n, within_block, after_block } in self.tests.iter() { // clear pending updates, as applicable if let Some(upgrade_block) = self.pending_upgrade { @@ -303,11 +325,17 @@ impl BlockTests { } // begin initialization + let parent_hash = BlakeTwo256::hash(&parent_head_data.0); System::reset_events(); - System::initialize(&n, &Default::default(), &Default::default()); + System::initialize(&n, &parent_hash, &Default::default()); // now mess with the storage the way validate_block does let mut sproof_builder = RelayStateSproofBuilder::default(); + sproof_builder.included_para_head = self + .included_para_head + .clone() + .unwrap_or_else(|| parent_head_data.clone()) + .into(); if let Some(ref hook) = self.relay_sproof_builder_hook { hook(self, *n as RelayChainBlockNumber, &mut sproof_builder); } @@ -364,7 +392,23 @@ impl BlockTests { } // clean up - System::finalize(); + let header = System::finalize(); + let head_data = relay_chain::HeadData(header.encode()); + parent_head_data = head_data.clone(); + match self.inclusion_delay { + Some(delay) if delay > 0 => { + self.pending_blocks.push_back(head_data); + if self.pending_blocks.len() > delay { + let included = self.pending_blocks.pop_front().unwrap(); + + self.included_para_head.replace(included); + } + }, + _ => { + self.included_para_head.replace(head_data); + }, + } + if let Some(after_block) = after_block { after_block(); } @@ -387,6 +431,55 @@ fn block_tests_run_on_drop() { BlockTests::new().add(123, || panic!("if this test passes, block tests run properly")); } +#[test] +fn unincluded_segment_works() { + BlockTests::new() + .with_unincluded_segment(1, 10) + .add_with_post_test( + 123, + || {}, + || { + let segment = >::get(); + assert_eq!(segment.len(), 1); + assert!(>::get().is_some()); + }, + ) + .add_with_post_test( + 124, + || {}, + || { + let segment = >::get(); + assert_eq!(segment.len(), 2); + }, + ) + .add_with_post_test( + 125, + || {}, + || { + let segment = >::get(); + // Block 123 was popped from the segment, the len is still 2. + assert_eq!(segment.len(), 2); + }, + ); +} + +#[test] +#[should_panic] +fn unincluded_segment_is_limited() { + BlockTests::new() + .with_unincluded_segment(10, 1) + .add_with_post_test( + 123, + || {}, + || { + let segment = >::get(); + assert_eq!(segment.len(), 1); + assert!(>::get().is_some()); + }, + ) + .add(124, || {}); // The previous block wasn't included yet, should panic in `create_inherent`. 
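For illustration, a standalone sketch (not part of the patch; block heads are plain integers) of the inclusion-delay bookkeeping the test harness performs with its `VecDeque`: a finalized head only becomes the included head once `delay` further blocks have been produced.

use std::collections::VecDeque;

/// Tracks which produced head counts as included, given a fixed inclusion delay.
struct InclusionSimulator {
    delay: usize,
    pending: VecDeque<u64>,
    included: Option<u64>,
}

impl InclusionSimulator {
    fn new(delay: usize) -> Self {
        Self { delay, pending: VecDeque::new(), included: None }
    }

    /// Called after each block is finalized with its head.
    fn on_finalized(&mut self, head: u64) {
        if self.delay == 0 {
            self.included = Some(head);
            return;
        }
        self.pending.push_back(head);
        if self.pending.len() > self.delay {
            // The oldest pending head is now considered included on the relay chain.
            self.included = self.pending.pop_front();
        }
    }
}

fn main() {
    let mut sim = InclusionSimulator::new(1);
    sim.on_finalized(123);
    assert_eq!(sim.included, None); // not yet included, the segment holds one block
    sim.on_finalized(124);
    assert_eq!(sim.included, Some(123)); // included one block later
}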
+} + #[test] fn events() { BlockTests::new() diff --git a/test/relay-sproof-builder/src/lib.rs b/test/relay-sproof-builder/src/lib.rs index decc6ee3aa0..b63ecdf5fc7 100644 --- a/test/relay-sproof-builder/src/lib.rs +++ b/test/relay-sproof-builder/src/lib.rs @@ -46,6 +46,7 @@ pub struct RelayStateSproofBuilder { pub current_epoch: u64, pub randomness: relay_chain::Hash, pub additional_key_values: Vec<(Vec, Vec)>, + pub included_para_head: Option, } impl Default for RelayStateSproofBuilder { @@ -73,6 +74,7 @@ impl Default for RelayStateSproofBuilder { current_epoch: 0u64, randomness: relay_chain::Hash::default(), additional_key_values: vec![], + included_para_head: None, } } } @@ -124,6 +126,9 @@ impl RelayStateSproofBuilder { dmq_mqc_head.encode(), ); } + if let Some(para_head) = self.included_para_head { + insert(relay_chain::well_known_keys::para_head(self.para_id), para_head.encode()); + } if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_size { insert( relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id), From 515f790368d101969168541f23d3742cf50ea8ea Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Sat, 22 Apr 2023 02:53:52 +0400 Subject: [PATCH 26/26] doc comments --- pallets/parachain-system/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index e813a3aab27..3edcc6dfb14 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -302,6 +302,8 @@ pub mod pallet { // This is correct because: // - inherent never contains messages that were previously processed. // - current implementation always attempts to exhaust each message queue. + // + // let limits = TotalBandwidthLimits::new(&relevant_messaging_state); let hrmp_outgoing = outbound_messages @@ -437,7 +439,7 @@ pub mod pallet { // the inherent shouldn't contain messages that were already processed by any of the // ancestors. // - // This invariant should be upheld by the node-side. + // This invariant should be upheld by the `ProvideInherent` implementation. let ParachainInherentData { validation_data: vfp, relay_chain_state,