Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

(async backing) parachain-system: track limitations for unincluded blocks #2438

Merged
merged 26 commits into from
Apr 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pallets/parachain-system/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ cumulus-primitives-core = { path = "../../primitives/core", default-features = f
cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent", default-features = false }

[dev-dependencies]
assert_matches = "1.5"
hex-literal = "0.4.1"
lazy_static = "1.4"

Expand Down
185 changes: 174 additions & 11 deletions pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use frame_system::{ensure_none, ensure_root};
use polkadot_parachain::primitives::RelayChainBlockNumber;
use scale_info::TypeInfo;
use sp_runtime::{
traits::{Block as BlockT, BlockNumberProvider, Hash},
traits::{Block as BlockT, BlockNumberProvider, Hash, Zero},
transaction_validity::{
InvalidTransaction, TransactionLongevity, TransactionSource, TransactionValidity,
ValidTransaction,
Expand All @@ -59,11 +59,16 @@ use xcm::latest::XcmHash;

mod migration;
mod relay_state_snapshot;
mod unincluded_segment;
#[macro_use]
pub mod validate_block;
#[cfg(test)]
mod tests;

use unincluded_segment::{
Ancestor, HrmpChannelUpdate, SegmentTracker, TotalBandwidthLimits, UsedBandwidth,
};

/// Register the `validate_block` function that is used by parachains to validate blocks on a
/// validator.
///
Expand Down Expand Up @@ -232,7 +237,7 @@ pub mod pallet {
},
};

<PendingUpwardMessages<T>>::mutate(|up| {
let (ump_msg_count, ump_total_bytes) = <PendingUpwardMessages<T>>::mutate(|up| {
let (count, size) = relevant_messaging_state.relay_dispatch_queue_size;

let available_capacity = cmp::min(
Expand All @@ -243,24 +248,32 @@ pub mod pallet {

// Count the number of messages we can possibly fit in the given constraints, i.e.
// available_capacity and available_size.
let num = up
let (num, total_size) = up
.iter()
.scan((available_capacity as usize, available_size as usize), |state, msg| {
let (cap_left, size_left) = *state;
match (cap_left.checked_sub(1), size_left.checked_sub(msg.len())) {
(Some(new_cap), Some(new_size)) => {
.scan((0u32, 0u32), |state, msg| {
let (cap_used, size_used) = *state;
let new_cap = cap_used.saturating_add(1);
let new_size = size_used.saturating_add(msg.len() as u32);
match available_capacity
.checked_sub(new_cap)
.and(available_size.checked_sub(new_size))
{
Some(_) => {
*state = (new_cap, new_size);
Some(())
Some(*state)
},
_ => None,
}
})
.count();
.last()
.unwrap_or_default();

// TODO: #274 Return back messages that do not longer fit into the queue.

UpwardMessages::<T>::put(&up[..num]);
*up = up.split_off(num);
UpwardMessages::<T>::put(&up[..num as usize]);
*up = up.split_off(num as usize);

(num, total_size)
});

// Sending HRMP messages is a little bit more involved. There are the following
Expand All @@ -282,6 +295,43 @@ pub mod pallet {
.map(|(recipient, data)| OutboundHrmpMessage { recipient, data })
.collect::<Vec<_>>();

if MaxUnincludedLen::<T>::get().map_or(false, |max_len| !max_len.is_zero()) {
// NOTE: these limits don't account for the amount of processed messages from
// downward and horizontal queues.
//
// This is correct because:
// - inherent never contains messages that were previously processed.
// - current implementation always attempts to exhaust each message queue.
slumber marked this conversation as resolved.
Show resolved Hide resolved
//
// <https://github.com/paritytech/cumulus/issues/2472>
let limits = TotalBandwidthLimits::new(&relevant_messaging_state);

let hrmp_outgoing = outbound_messages
.iter()
.map(|msg| {
(
msg.recipient,
HrmpChannelUpdate { msg_count: 1, total_bytes: msg.data.len() as u32 },
)
})
.collect();
let used_bandwidth =
UsedBandwidth { ump_msg_count, ump_total_bytes, hrmp_outgoing };
// The bandwidth constructed was ensured to satisfy relay chain constraints.
let ancestor = Ancestor::new_unchecked(used_bandwidth);

let watermark = HrmpWatermark::<T>::get();
AggregatedUnincludedSegment::<T>::mutate(|agg| {
let agg = agg.get_or_insert_with(SegmentTracker::default);
// TODO: In order of this panic to be correct, outbound message source should
// respect bandwidth limits as well.
// <https://github.com/paritytech/cumulus/issues/2471>
agg.append(&ancestor, watermark, &limits)
.expect("unincluded segment limits exceeded");
slumber marked this conversation as resolved.
Show resolved Hide resolved
});
// Check in `on_initialize` guarantees there's space for this block.
UnincludedSegment::<T>::append(ancestor);
}
HrmpOutboundMessages::<T>::put(outbound_messages);
}

Expand All @@ -296,6 +346,23 @@ pub mod pallet {
weight += T::DbWeight::get().writes(1);
}

// New para head was unknown during block finalization, update it.
if MaxUnincludedLen::<T>::get().map_or(false, |max_len| !max_len.is_zero()) {
<UnincludedSegment<T>>::mutate(|chain| {
if let Some(ancestor) = chain.last_mut() {
let parent = frame_system::Pallet::<T>::parent_hash();
// Ancestor is the latest finalized block, thus current parent is
// its output head.
ancestor.replace_para_head_hash(parent);
}
});
weight += T::DbWeight::get().reads_writes(1, 1);

// Weight used during finalization.
weight += T::DbWeight::get().reads_writes(2, 2);
}
weight += T::DbWeight::get().reads(1);

// Remove the validation from the old block.
ValidationData::<T>::kill();
ProcessedDownwardMessages::<T>::kill();
Expand Down Expand Up @@ -336,6 +403,9 @@ pub mod pallet {
4 + hrmp_max_message_num_per_candidate as u64,
);

// Always try to read `MaxUnincludedLen` in `on_finalize`.
weight += T::DbWeight::get().reads(1);

weight
}
}
Expand Down Expand Up @@ -364,6 +434,12 @@ pub mod pallet {
"ValidationData must be updated only once in a block",
);

// NOTE: the inherent data is expected to be unique, even if this block is built
// in the context of the same relay parent as the previous one. In particular,
// the inherent shouldn't contain messages that were already processed by any of the
// ancestors.
//
// This invariant should be upheld by the `ProvideInherent` implementation.
let ParachainInherentData {
validation_data: vfp,
relay_chain_state,
Expand Down Expand Up @@ -442,6 +518,7 @@ pub mod pallet {
horizontal_messages,
vfp.relay_parent_number,
);
total_weight += Self::maybe_drop_included_ancestors(&relay_state_proof);

Ok(PostDispatchInfo { actual_weight: Some(total_weight), pays_fee: Pays::No })
}
Expand Down Expand Up @@ -544,6 +621,29 @@ pub mod pallet {
Unauthorized,
}

/// Maximum number of latest included block descendants the runtime is allowed to accept. In other words,
/// these are ancestor of the block being currently executed, not yet sent to the relay chain runtime.
///
/// This value is optional, but once set to `Some` by the governance, should never go back to `None`.
/// Requires latest included para head to be present in the relay chain storage proof.
#[pallet::storage]
pub(super) type MaxUnincludedLen<T: Config> = StorageValue<_, T::BlockNumber, OptionQuery>;

/// Latest included block descendants the runtime accepted. In other words, these are
/// ancestors of the block being currently executed, not yet sent to the relay chain runtime.
///
/// The segment length is limited by [`MaxUnincludedLen`].
#[pallet::storage]
pub(super) type UnincludedSegment<T: Config> =
StorageValue<_, Vec<Ancestor<T::Hash>>, ValueQuery>;

/// Storage field that keeps track of bandwidth used by the unincluded segment along with the latest
/// the latest HRMP watermark. Used for limiting the acceptance of new blocks with respect to relay
/// chain constraints.
#[pallet::storage]
pub(super) type AggregatedUnincludedSegment<T: Config> =
StorageValue<_, SegmentTracker<T::Hash>, OptionQuery>;

/// In case of a scheduled upgrade, this storage field contains the validation code to be applied.
///
/// As soon as the relay chain gives us the go-ahead signal, we will overwrite the [`:code`][well_known_keys::CODE]
Expand Down Expand Up @@ -960,6 +1060,69 @@ impl<T: Config> Pallet<T> {
weight_used
}

/// Drop blocks from the unincluded segment with respect to the latest parachain head.
///
/// No-op if [`MaxUnincludedLen`] is not set.
fn maybe_drop_included_ancestors(relay_state_proof: &RelayChainStateProof) -> Weight {
let mut weight_used = Weight::zero();
// If `MaxUnincludedLen` is present in the storage, parachain head
// is always expected to be included into the relay storage proof.
let para_head_with_len = <MaxUnincludedLen<T>>::get().map(|max_len| {
(
relay_state_proof
.read_included_para_head()
.expect("Invalid para head in relay chain state proof"),
max_len,
)
});
weight_used += T::DbWeight::get().reads(1);
let Some((para_head, max_len)) = para_head_with_len else { return weight_used };

let para_head_hash = T::Hashing::hash(&para_head.0);
if !max_len.is_zero() {
let (dropped, left_count): (Vec<Ancestor<T::Hash>>, u32) =
<UnincludedSegment<T>>::mutate(|chain| {
// Drop everything up to the block with an included para head, if present.
let idx = chain
.iter()
.position(|block| {
let head_hash = block.para_head_hash().expect(
"para head hash is updated during block initialization; qed",
);
head_hash == &para_head_hash
})
.map_or(0, |idx| idx + 1); // inclusive.

let left_count = (idx..chain.len()).count() as u32;
let dropped = chain.drain(..idx).collect();
(dropped, left_count)
});
weight_used += T::DbWeight::get().reads_writes(1, 1);

// sanity-check there's place for the block at finalization phase.
//
// If this fails, the max segment len is reached and parachain should wait
// for ancestor's inclusion.
assert!(
max_len > left_count.into(),
"no space left for the block in the unincluded segment"
);

if !dropped.is_empty() {
<AggregatedUnincludedSegment<T>>::mutate(|agg| {
let agg = agg.as_mut().expect(
"dropped part of the segment wasn't empty, hence value exists; qed",
);
for block in dropped {
agg.subtract(&block);
}
});
weight_used += T::DbWeight::get().reads_writes(1, 1);
}
}
weight_used
}

/// Put a new validation function into a particular location where polkadot
/// monitors for updates. Calling this function notifies polkadot that a new
/// upgrade has been scheduled.
Expand Down
10 changes: 10 additions & 0 deletions pallets/parachain-system/src/relay_state_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub enum Error {
HrmpEgressChannelIndex(ReadEntryErr),
/// The channel identified by the sender and receiver cannot be extracted.
HrmpChannel(ParaId, ParaId, ReadEntryErr),
/// The latest included parachain head cannot be extracted.
ParaHead(ReadEntryErr),
}

#[derive(Debug)]
Expand Down Expand Up @@ -235,6 +237,14 @@ impl RelayChainStateProof {
.map_err(Error::Config)
}

/// Read latest included parachain [head data](`relay_chain::HeadData`) from the relay chain state proof.
///
/// Returns an error if anything failed at reading or decoding.
pub fn read_included_para_head(&self) -> Result<relay_chain::HeadData, Error> {
read_entry(&self.trie_backend, &relay_chain::well_known_keys::para_head(self.para_id), None)
.map_err(Error::ParaHead)
}

/// Read the [`Slot`](relay_chain::Slot) from the relay chain state proof.
///
/// The slot is slot of the relay chain block this state proof was extracted from.
Expand Down
Loading