Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Runtime] Bound XCMP queue #2302

Merged
merged 26 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cumulus/pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
use codec::{Decode, Encode};
use cumulus_primitives_core::{
relay_chain, AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo,
GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, MessageSendError,
GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, ListChannelInfos, MessageSendError,
OutboundHrmpMessage, ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender,
XcmpMessageHandler, XcmpMessageSource,
};
Expand Down Expand Up @@ -1023,6 +1023,13 @@ impl<T: Config> FeeTracker for Pallet<T> {
}
}

impl<T: Config> ListChannelInfos for Pallet<T> {
fn outgoing_channels() -> Vec<ParaId> {
let Some(state) = Self::relevant_messaging_state() else { return Vec::new() };
state.egress_channels.into_iter().map(|(id, _)| id).collect()
}
}

impl<T: Config> GetChannelInfo for Pallet<T> {
fn get_channel_status(id: ParaId) -> ChannelStatus {
// Note, that we are using `relevant_messaging_state` which may be from the previous
Expand Down
105 changes: 80 additions & 25 deletions cumulus/pallets/xcmp-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub mod weights;
pub use weights::WeightInfo;

use bounded_collections::BoundedBTreeSet;
use codec::{Decode, DecodeLimit, Encode};
use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
use cumulus_primitives_core::{
relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
Expand Down Expand Up @@ -105,7 +105,6 @@ pub mod pallet {

#[pallet::pallet]
#[pallet::storage_version(migration::STORAGE_VERSION)]
#[pallet::without_storage_info]
pub struct Pallet<T>(_);

#[pallet::config]
Expand All @@ -132,6 +131,25 @@ pub mod pallet {
#[pallet::constant]
type MaxInboundSuspended: Get<u32>;

/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
///
/// If this is reached, then no further messages can be sent to channels that do not yet
/// have a message queued. This should be set to the expected maximum of outbound channels
/// which is determined by [`Self::ChannelInfo`]. It is important to set this correctly,
/// since otherwise the congestion control protocol will not work as intended and messages
/// may be dropped. This value increases the PoV and should therefore not be picked too
/// high.
#[pallet::constant]
type MaxActiveOutboundChannels: Get<u32>;

/// The maximal page size for HRMP message pages.
///
/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
/// benchmarking. The limit for the size of a message is slightly below this, since some
/// overhead is incurred for encoding the format.
#[pallet::constant]
type MaxPageSize: Get<u32>;

/// The origin that is allowed to resume or suspend the XCMP queue.
type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;

Expand Down Expand Up @@ -276,6 +294,10 @@ pub mod pallet {
AlreadySuspended,
/// The execution is already resumed.
AlreadyResumed,
/// There are too many active outbound channels.
TooManyOutboundChannels,
/// The message is too big.
TooBig,
}

/// The suspended inbound XCMP channels. All others are not suspended.
Expand All @@ -297,19 +319,28 @@ pub mod pallet {
/// case of the need to send a high-priority signal message this block.
/// The bool is true if there is a signal message waiting to be sent.
#[pallet::storage]
pub(super) type OutboundXcmpStatus<T: Config> =
StorageValue<_, Vec<OutboundChannelDetails>, ValueQuery>;
pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
_,
BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
ValueQuery,
>;

// The new way of doing it:
/// The messages outbound in a given XCMP channel.
#[pallet::storage]
pub(super) type OutboundXcmpMessages<T: Config> =
StorageDoubleMap<_, Blake2_128Concat, ParaId, Twox64Concat, u16, Vec<u8>, ValueQuery>;
pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
_,
Blake2_128Concat,
ParaId,
Twox64Concat,
u16,
BoundedVec<u8, T::MaxPageSize>,
ValueQuery,
>;

/// Any signal messages waiting to be sent.
#[pallet::storage]
pub(super) type SignalMessages<T: Config> =
StorageMap<_, Blake2_128Concat, ParaId, Vec<u8>, ValueQuery>;
StorageMap<_, Blake2_128Concat, ParaId, BoundedVec<u8, T::MaxPageSize>, ValueQuery>;

/// The configuration which controls the dynamics of the outbound queue.
#[pallet::storage]
Expand All @@ -331,15 +362,14 @@ pub mod pallet {
StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
}

#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub enum OutboundState {
Ok,
Suspended,
}

/// Struct containing detailed information about the outbound channel.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo)]
#[cfg_attr(feature = "std", derive(Debug))]
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
pub struct OutboundChannelDetails {
/// The `ParaId` of the parachain that this channel is connected with.
recipient: ParaId,
Expand Down Expand Up @@ -375,7 +405,7 @@ impl OutboundChannelDetails {
}
}

#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub struct QueueConfigData {
/// The number of pages which must be in the queue for the other side to be told to suspend
/// their sending.
Expand Down Expand Up @@ -478,7 +508,9 @@ impl<T: Config> Pallet<T> {
{
details
} else {
all_channels.push(OutboundChannelDetails::new(recipient));
all_channels
.try_push(OutboundChannelDetails::new(recipient))
.map_err(|_| MessageSendError::TooManyChannels)?;
all_channels
.last_mut()
.expect("can't be empty; a new element was just pushed; qed")
Expand All @@ -503,7 +535,7 @@ impl<T: Config> Pallet<T> {
if page.len() + encoded_fragment.len() > max_message_size {
return None
}
page.extend_from_slice(&encoded_fragment[..]);
page.try_extend(encoded_fragment.iter().cloned()).ok()?;
Some(page.len())
},
)
Expand All @@ -521,7 +553,9 @@ impl<T: Config> Pallet<T> {
new_page.extend_from_slice(&encoded_fragment[..]);
let last_page_size = new_page.len();
let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
<OutboundXcmpMessages<T>>::insert(recipient, page_index, new_page);
let bounded_page =
BoundedVec::try_from(new_page).map_err(|_| MessageSendError::TooBig)?;
<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
<OutboundXcmpStatus<T>>::put(all_channels);
(number_of_pages, last_page_size)
};
Expand All @@ -543,17 +577,21 @@ impl<T: Config> Pallet<T> {

/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
/// block.
fn send_signal(dest: ParaId, signal: ChannelSignal) {
fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
let mut s = <OutboundXcmpStatus<T>>::get();
if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
details.signals_exist = true;
} else {
s.push(OutboundChannelDetails::new(dest).with_signals());
s.try_push(OutboundChannelDetails::new(dest).with_signals())
.map_err(|_| Error::<T>::TooManyOutboundChannels)?;
}
<SignalMessages<T>>::mutate(dest, |page| {
*page = (XcmpMessageFormat::Signals, signal).encode();
});

let page = BoundedVec::try_from((XcmpMessageFormat::Signals, signal).encode())
.map_err(|_| Error::<T>::TooBig)?;
ggwpez marked this conversation as resolved.
Show resolved Hide resolved

<SignalMessages<T>>::insert(dest, page);
<OutboundXcmpStatus<T>>::put(s);
Ok(())
}

fn suspend_channel(target: ParaId) {
Expand All @@ -563,7 +601,13 @@ impl<T: Config> Pallet<T> {
defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
details.state = OutboundState::Suspended;
} else {
s.push(OutboundChannelDetails::new(target).with_suspended_state());
if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
// Nothing that we can do here. The outbound channel does not exist either, so
// there should be no message going out as well. The next time that the channel
// can be created it will again get the suspension from the remote side. It can
// therefore result in a few lost messages, but should eventually self-repair.
defensive!("Cannot pause channel; too many outbound channels");
}
0xmovses marked this conversation as resolved.
Show resolved Hide resolved
}
});
}
Expand Down Expand Up @@ -664,13 +708,17 @@ impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
let suspended = suspended_channels.contains(&para);

if suspended && fp.pages <= resume_threshold {
Self::send_signal(para, ChannelSignal::Resume);
// If the resuming fails then it is not critical. We will retry in the future.
let _ = Self::send_signal(para, ChannelSignal::Resume);

suspended_channels.remove(&para);
<InboundXcmpSuspended<T>>::put(suspended_channels);
} else if !suspended && fp.pages >= suspend_threshold {
log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
Self::send_signal(para, ChannelSignal::Suspend);
if let Err(_) = Self::send_signal(para, ChannelSignal::Suspend) {
// It will retry if `drop_threshold` is not reached, but it could be too late.
defensive!("Could not send suspension signal; future messages may be dropped.");
}

if let Err(err) = suspended_channels.try_insert(para) {
log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err);
Expand Down Expand Up @@ -842,7 +890,7 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
// since it's so unlikely then for now we just drop it.
defensive!("WARNING: oversize message in queue - dropping");
} else {
result.push((para_id, page));
result.push((para_id, page.into_inner()));
}

let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
Expand Down Expand Up @@ -890,7 +938,14 @@ impl<T: Config> XcmpMessageSource for Pallet<T> {
let pruned = old_statuses_len - statuses.len();
// removing an item from status implies a message being sent, so the result messages must
// be no less than the pruned channels.
statuses.rotate_left(result.len().saturating_sub(pruned));

// TODO <https://github.com/paritytech/parity-common/pull/800>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be done now

Copy link
Member Author

@ggwpez ggwpez Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
let mut statuses_inner = statuses.into_inner();
statuses_inner.rotate_left(result.len().saturating_sub(pruned));
statuses = BoundedVec::try_from(statuses_inner)
.expect("Rotating does not change the length; it still fits; qed");
}

<OutboundXcmpStatus<T>>::put(statuses);

Expand Down
26 changes: 14 additions & 12 deletions cumulus/pallets/xcmp-queue/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

//! A module that is responsible for migration of storage.

pub mod v5;

use crate::{Config, OverweightIndex, Pallet, QueueConfig, QueueConfigData, DEFAULT_POV_SIZE};
use cumulus_primitives_core::XcmpMessageFormat;
use frame_support::{
Expand All @@ -25,7 +27,7 @@ use frame_support::{
};

/// The current storage version.
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(4);
pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(5);

pub const LOG: &str = "runtime::xcmp-queue-migration";

Expand Down Expand Up @@ -264,9 +266,9 @@ pub mod v4 {

/// Migrates `QueueConfigData` to v4, removing deprecated fields and bumping page
/// thresholds to at least the default values.
pub struct UncheckedMigrationToV4<T: Config>(PhantomData<T>);
pub struct UncheckedMigrateV3ToV4<T: Config>(PhantomData<T>);

impl<T: Config> OnRuntimeUpgrade for UncheckedMigrationToV4<T> {
impl<T: Config> OnRuntimeUpgrade for UncheckedMigrateV3ToV4<T> {
fn on_runtime_upgrade() -> Weight {
let translate = |pre: v2::QueueConfigData| -> QueueConfigData {
let pre_default = v2::QueueConfigData::default();
Expand Down Expand Up @@ -299,13 +301,13 @@ pub mod v4 {
}
}

/// [`UncheckedMigrationToV4`] wrapped in a
/// [`UncheckedMigrateV3ToV4`] wrapped in a
/// [`VersionedMigration`](frame_support::migrations::VersionedMigration), ensuring the
/// migration is only performed when on-chain version is 3.
pub type MigrationToV4<T> = frame_support::migrations::VersionedMigration<
pub type MigrateV3ToV4<T> = frame_support::migrations::VersionedMigration<
3,
4,
UncheckedMigrationToV4<T>,
UncheckedMigrateV3ToV4<T>,
Pallet<T>,
<T as frame_system::Config>::DbWeight,
>;
Expand Down Expand Up @@ -372,10 +374,10 @@ mod tests {
&v2.encode(),
);

let bytes = v4::MigrationToV4::<Test>::pre_upgrade();
let bytes = v4::MigrateV3ToV4::<Test>::pre_upgrade();
assert!(bytes.is_ok());
v4::MigrationToV4::<Test>::on_runtime_upgrade();
assert!(v4::MigrationToV4::<Test>::post_upgrade(bytes.unwrap()).is_ok());
v4::MigrateV3ToV4::<Test>::on_runtime_upgrade();
assert!(v4::MigrateV3ToV4::<Test>::post_upgrade(bytes.unwrap()).is_ok());

let v4 = QueueConfig::<Test>::get();

Expand All @@ -401,10 +403,10 @@ mod tests {
&v2.encode(),
);

let bytes = v4::MigrationToV4::<Test>::pre_upgrade();
let bytes = v4::MigrateV3ToV4::<Test>::pre_upgrade();
assert!(bytes.is_ok());
v4::MigrationToV4::<Test>::on_runtime_upgrade();
assert!(v4::MigrationToV4::<Test>::post_upgrade(bytes.unwrap()).is_ok());
v4::MigrateV3ToV4::<Test>::on_runtime_upgrade();
assert!(v4::MigrateV3ToV4::<Test>::post_upgrade(bytes.unwrap()).is_ok());

let v4 = QueueConfig::<Test>::get();

Expand Down
Loading