diff --git a/Cargo.lock b/Cargo.lock index 2f44bc62a88c..396df57e0b63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4442,6 +4442,8 @@ dependencies = [ "pallet-vesting", "parity-scale-codec", "polkadot-primitives", + "rand 0.7.3", + "rand_chacha 0.2.2", "rustc-hex", "serde", "serde_derive", diff --git a/roadmap/implementors-guide/src/runtime/inclusioninherent.md b/roadmap/implementors-guide/src/runtime/inclusioninherent.md index 0fceb78f7f95..bd5ecc375a93 100644 --- a/roadmap/implementors-guide/src/runtime/inclusioninherent.md +++ b/roadmap/implementors-guide/src/runtime/inclusioninherent.md @@ -17,9 +17,8 @@ Included: Option<()>, ## Entry Points * `inclusion`: This entry-point accepts two parameters: [`Bitfields`](/type-definitions.html#signed-availability-bitfield) and [`BackedCandidates`](/type-definitions.html#backed-candidate). - 1. The `Bitfields` are first forwarded to the `process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. - 1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it, and add timed-out cores to the free cores. + 1. The `Bitfields` are first forwarded to the `process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. Annotate each of these freed cores with `FreedReason::Concluded`. + 1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it, and add timed-out cores to the free cores, annotated with `FreedReason::TimedOut`. 1. Invoke `Scheduler::schedule(freed)` - 1. Pass the `BackedCandidates` along with the output of `Scheduler::scheduled` to the `Inclusion::process_candidates` routine, getting a list of all newly-occupied cores. 1. Call `Scheduler::occupied` for all scheduled cores where a backed candidate was submitted. 1. 
If all of the above succeeds, set `Included` to `Some(())`. diff --git a/roadmap/implementors-guide/src/runtime/paras.md b/roadmap/implementors-guide/src/runtime/paras.md index b523c1fe20c3..d22021d68116 100644 --- a/roadmap/implementors-guide/src/runtime/paras.md +++ b/roadmap/implementors-guide/src/runtime/paras.md @@ -56,6 +56,8 @@ Storage layout: ```rust /// All parachains. Ordered ascending by ParaId. Parathreads are not included. Parachains: Vec, +/// All parathreads. +Parathreads: map ParaId => Option<()>, /// The head-data of every registered para. Heads: map ParaId => Option; /// The validation code of every live para. @@ -94,6 +96,7 @@ OutgoingParas: Vec; 1. Clean up outgoing paras. This means removing the entries under `Heads`, `ValidationCode`, `FutureCodeUpgrades`, and `FutureCode`. An according entry should be added to `PastCode`, `PastCodeMeta`, and `PastCodePruning` using the outgoing `ParaId` and removed `ValidationCode` value. This is because any outdated validation code must remain available on-chain for a determined amount of blocks, and validation code outdated by de-registering the para is still subject to that invariant. 1. Apply all incoming paras by initializing the `Heads` and `ValidationCode` using the genesis parameters. 1. Amend the `Parachains` list to reflect changes in registered parachains. +1. Amend the `Parathreads` set to reflect changes in registered parathreads. ## Initialization @@ -106,6 +109,7 @@ OutgoingParas: Vec; * `schedule_code_upgrade(ParaId, ValidationCode, expected_at: BlockNumber)`: Schedule a future code upgrade of the given parachain, to be applied after inclusion of a block of the same parachain executed in the context of a relay-chain block with number >= `expected_at`. * `note_new_head(ParaId, HeadData, BlockNumber)`: note that a para has progressed to a new head, where the new head was executed in the context of a relay-chain block with given number. 
This will apply pending code upgrades based on the block number provided. * `validation_code_at(ParaId, at: BlockNumber, assume_intermediate: Option)`: Fetches the validation code to be used when validating a block in the context of the given relay-chain height. A second block number parameter may be used to tell the lookup to proceed as if an intermediate parablock has been included at the given relay-chain height. This may return past, current, or (with certain choices of `assume_intermediate`) future code. `assume_intermediate`, if provided, must be before `at`. If the validation code has been pruned, this will return `None`. +* `is_parathread(ParaId) -> bool`: Returns true if the para ID references any live parathread. ## Finalization diff --git a/roadmap/implementors-guide/src/runtime/scheduler.md b/roadmap/implementors-guide/src/runtime/scheduler.md index e7d80d8d7d0a..b365308743b1 100644 --- a/roadmap/implementors-guide/src/runtime/scheduler.md +++ b/roadmap/implementors-guide/src/runtime/scheduler.md @@ -90,14 +90,15 @@ struct ParathreadEntry { // A queued parathread entry, pre-assigned to a core. struct QueuedParathread { - claim: ParathreadEntry, - core: CoreIndex, + claim: ParathreadEntry, + /// offset within the set of para-threads ranged `0..config.parathread_cores`. + core_offset: u32, } struct ParathreadQueue { - queue: Vec, - // this value is between 0 and config.parathread_cores - next_core: CoreIndex, + queue: Vec, + /// offset within the set of para-threads ranged `0..config.parathread_cores`. + next_core_offset: u32, } enum CoreOccupied { @@ -105,12 +106,22 @@ enum CoreOccupied { Parachain, } +enum AssignmentKind { + Parachain, + Parathread(CollatorId, u32), +} + struct CoreAssignment { core: CoreIndex, para_id: ParaId, - collator: Option, + kind: AssignmentKind, group_idx: GroupIndex, } +// reasons a core might be freed. 
+enum FreedReason { + Concluded, + TimedOut, +} ``` Storage layout: @@ -150,6 +161,7 @@ Actions: - First, we obtain "shuffled validators" `SV` by shuffling the validators using the `SessionChangeNotification`'s random seed. - The groups are selected by partitioning `SV`. The first V % N groups will have (V / N) + 1 members, while the remaining groups will have (V / N) members each. 1. Prune the parathread queue to remove all retries beyond `configuration.parathread_retries`. + - Also prune all parathread claims corresponding to de-registered parathreads. - all pruned claims should have their entry removed from the parathread index. - assign all non-pruned claims to new cores if the number of parathread cores has changed between the `new_config` and `old_config` of the `SessionChangeNotification`. - Assign claims in equal balance across all cores if rebalancing, and set the `next_core` of the `ParathreadQueue` by incrementing the relative index of the last assigned core and taking it modulo the number of parathread cores. @@ -172,15 +184,16 @@ Actions: - The core used for the parathread claim is the `next_core` field of the `ParathreadQueue` and adding `Paras::parachains().len()` to it. - `next_core` is then updated by adding 1 and taking it modulo `config.parathread_cores`. - The claim is then added to the claim index. -- `schedule(Vec)`: schedule new core assignments, with a parameter indicating previously-occupied cores which are to be considered returned. +- `schedule(Vec<(CoreIndex, FreedReason)>)`: schedule new core assignments, with a parameter indicating previously-occupied cores which are to be considered returned and why they are being returned. - All freed parachain cores should be assigned to their respective parachain - - All freed parathread cores should have the claim removed from the claim index. + - All freed parathread cores whose reason for freeing was `FreedReason::Concluded` should have the claim removed from the claim index. 
+ - All freed parathread cores whose reason for freeing was `FreedReason::TimedOut` should have the claim added to the parathread queue again without retries incremented - All freed parathread cores should take the next parathread entry from the queue. - The i'th validator group will be assigned to the `(i+k)%n`'th core at any point in time, where `k` is the number of rotations that have occurred in the session, and `n` is the total number of cores. This makes upcoming rotations within the same session predictable. - `scheduled() -> Vec`: Get currently scheduled core assignments. - `occupied(Vec)`. Note that the given cores have become occupied. - - Fails if any given cores were not scheduled. - - Fails if the given cores are not sorted ascending by core index + - Behavior undefined if any given cores were not scheduled. + - Behavior undefined if the given cores are not sorted ascending by core index - This clears them from `Scheduled` and marks each corresponding `core` in the `AvailabilityCores` as occupied. - Since both the availability cores and the newly-occupied cores lists are sorted ascending, this method can be implemented efficiently. - `core_para(CoreIndex) -> ParaId`: return the currently-scheduled or occupied ParaId for the given core. 
diff --git a/runtime/parachains/Cargo.toml b/runtime/parachains/Cargo.toml index adaf400a2e2a..5288e267b67e 100644 --- a/runtime/parachains/Cargo.toml +++ b/runtime/parachains/Cargo.toml @@ -35,6 +35,9 @@ frame-benchmarking = { git = "https://github.com/paritytech/substrate", branch = primitives = { package = "polkadot-primitives", path = "../../primitives", default-features = false } libsecp256k1 = { version = "0.3.2", default-features = false, optional = true } +rand = { version = "0.7", default-features = false } +rand_chacha = { version = "0.2.2", default-features = false } + [dev-dependencies] hex-literal = "0.2.1" keyring = { package = "sp-keyring", git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/runtime/parachains/src/configuration.rs b/runtime/parachains/src/configuration.rs index b747dbf85cd2..8849abe25132 100644 --- a/runtime/parachains/src/configuration.rs +++ b/runtime/parachains/src/configuration.rs @@ -33,7 +33,7 @@ use system::ensure_root; /// All configuration of the runtime with respect to parachains and parathreads. #[derive(Clone, Encode, Decode, PartialEq, Default)] #[cfg_attr(test, derive(Debug))] -pub struct HostConfiguration { +pub struct HostConfiguration { /// The minimum frequency at which parachains can update their validation code. pub validation_upgrade_frequency: BlockNumber, /// The delay, in blocks, before a validation upgrade is applied. @@ -49,7 +49,7 @@ pub struct HostConfiguration { pub parathread_cores: u32, /// The number of retries that a parathread author has to submit their block. pub parathread_retries: u32, - /// How often parachain groups should be rotated across parachains. + /// How often parachain groups should be rotated across parachains. Must be non-zero. pub parachain_rotation_frequency: BlockNumber, /// The availability period, in blocks, for parachains. 
This is the amount of blocks /// after inclusion that validators have to make the block available and signal its availability to diff --git a/runtime/parachains/src/initializer.rs b/runtime/parachains/src/initializer.rs index c942de4756c1..2b86d19654cd 100644 --- a/runtime/parachains/src/initializer.rs +++ b/runtime/parachains/src/initializer.rs @@ -25,11 +25,29 @@ use primitives::{ parachain::{ValidatorId}, }; use frame_support::{ - decl_storage, decl_module, decl_error, + decl_storage, decl_module, decl_error, traits::Randomness, }; -use crate::{configuration, paras}; +use crate::{configuration::{self, HostConfiguration}, paras, scheduler}; + +/// Information about a session change that has just occurred. +#[derive(Default, Clone)] +pub struct SessionChangeNotification { + /// The new validators in the session. + pub validators: Vec, + /// The queued validators for the following session. + pub queued: Vec, + /// The configuration before handling the session change. + pub prev_config: HostConfiguration, + /// The configuration after handling the session change. + pub new_config: HostConfiguration, + /// A secure random seed for the session, gathered from BABE. + pub random_seed: [u8; 32], +} -pub trait Trait: system::Trait + configuration::Trait + paras::Trait { } +pub trait Trait: system::Trait + configuration::Trait + paras::Trait + scheduler::Trait { + /// A randomness beacon. + type Randomness: Randomness; +} decl_storage! { trait Store for Module as Initializer { @@ -62,7 +80,8 @@ decl_module! { // - Inclusion // - Validity let total_weight = configuration::Module::::initializer_initialize(now) + - paras::Module::::initializer_initialize(now); + paras::Module::::initializer_initialize(now) + + scheduler::Module::::initializer_initialize(now); HasInitialized::set(Some(())); @@ -70,6 +89,9 @@ decl_module! { } fn on_finalize() { + // reverse initialization order. 
+ + scheduler::Module::::initializer_finalize(); paras::Module::::initializer_finalize(); configuration::Module::::initializer_finalize(); HasInitialized::take(); @@ -90,8 +112,32 @@ impl Module { let validators: Vec<_> = validators.map(|(_, v)| v).collect(); let queued: Vec<_> = queued.map(|(_, v)| v).collect(); + let prev_config = >::config(); + + let random_seed = { + let mut buf = [0u8; 32]; + let random_hash = T::Randomness::random(&b"paras"[..]); + let len = sp_std::cmp::min(32, random_hash.as_ref().len()); + buf[..len].copy_from_slice(&random_hash.as_ref()[..len]); + buf + }; + + // We can't pass the new config into the thing that determines the new config, + // so we don't pass the `SessionChangeNotification` into this module. configuration::Module::::initializer_on_new_session(&validators, &queued); - paras::Module::::initializer_on_new_session(&validators, &queued); + + let new_config = >::config(); + + let notification = SessionChangeNotification { + validators, + queued, + prev_config, + new_config, + random_seed, + }; + + paras::Module::::initializer_on_new_session(¬ification); + scheduler::Module::::initializer_on_new_session(¬ification); } } diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index 6f08545008a9..79a6b5bcc442 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -30,7 +30,7 @@ use primitives::{ }; use frame_support::{ impl_outer_origin, impl_outer_dispatch, parameter_types, - weights::Weight, + weights::Weight, traits::Randomness as RandomnessT, }; /// A test runtime struct. @@ -47,6 +47,14 @@ impl_outer_dispatch! { } } +pub struct TestRandomness; + +impl RandomnessT for TestRandomness { + fn random(_subject: &[u8]) -> H256 { + Default::default() + } +} + parameter_types! 
{ pub const BlockHashCount: u32 = 250; pub const MaximumBlockWeight: Weight = 4 * 1024 * 1024; @@ -80,12 +88,16 @@ impl system::Trait for Test { type OnKilledAccount = (); } -impl crate::initializer::Trait for Test { } +impl crate::initializer::Trait for Test { + type Randomness = TestRandomness; +} impl crate::configuration::Trait for Test { } impl crate::paras::Trait for Test { } +impl crate::scheduler::Trait for Test { } + pub type System = system::Module; /// Mocked initializer. @@ -97,6 +109,9 @@ pub type Configuration = crate::configuration::Module; /// Mocked paras. pub type Paras = crate::paras::Module; +/// Mocked scheduler. +pub type Scheduler = crate::scheduler::Module; + /// Create a new set of test externalities. pub fn new_test_ext(state: GenesisConfig) -> TestExternalities { let mut t = state.system.build_storage::().unwrap(); diff --git a/runtime/parachains/src/paras.rs b/runtime/parachains/src/paras.rs index 51f76d6df774..9d8c74ca0932 100644 --- a/runtime/parachains/src/paras.rs +++ b/runtime/parachains/src/paras.rs @@ -27,7 +27,7 @@ use sp_std::prelude::*; use sp_std::marker::PhantomData; use sp_runtime::traits::One; use primitives::{ - parachain::{ValidatorId, Id as ParaId, ValidationCode, HeadData}, + parachain::{Id as ParaId, ValidationCode, HeadData}, }; use frame_support::{ decl_storage, decl_module, decl_error, @@ -35,7 +35,7 @@ use frame_support::{ weights::Weight, }; use codec::{Encode, Decode}; -use crate::configuration; +use crate::{configuration, initializer::SessionChangeNotification}; #[cfg(feature = "std")] use serde::{Serialize, Deserialize}; @@ -159,11 +159,11 @@ impl ParaPastCodeMeta { #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] pub struct ParaGenesisArgs { /// The initial head data to use. - genesis_head: HeadData, + pub genesis_head: HeadData, /// The initial validation code to use. - validation_code: ValidationCode, + pub validation_code: ValidationCode, /// True if parachain, false if parathread. 
- parachain: bool, + pub parachain: bool, } @@ -171,6 +171,8 @@ decl_storage! { trait Store for Module as Paras { /// All parachains. Ordered ascending by ParaId. Parathreads are not included. Parachains get(fn parachains): Vec; + /// All parathreads. + Parathreads: map hasher(twox_64_concat) ParaId => Option<()>; /// The head-data of every registered para. Heads get(fn parachain_head): map hasher(twox_64_concat) ParaId => Option; /// The validation code of every live para. @@ -253,7 +255,7 @@ impl Module { pub(crate) fn initializer_finalize() { } /// Called by the initializer to note that a new session has started. - pub(crate) fn initializer_on_new_session(_validators: &[ValidatorId], _queued: &[ValidatorId]) { + pub(crate) fn initializer_on_new_session(_notification: &SessionChangeNotification) { let now = >::block_number(); let mut parachains = Self::clean_up_outgoing(now); Self::apply_incoming(&mut parachains); @@ -268,6 +270,8 @@ impl Module { for outgoing_para in outgoing { if let Ok(i) = parachains.binary_search(&outgoing_para) { parachains.remove(i); + } else { + ::Parathreads::remove(&outgoing_para); } ::Heads::remove(&outgoing_para); @@ -296,6 +300,8 @@ impl Module { if let Err(i) = parachains.binary_search(&upcoming_para) { parachains.insert(i, upcoming_para); } + } else { + ::Parathreads::insert(&upcoming_para, ()); } ::Heads::insert(&upcoming_para, genesis_data.genesis_head); @@ -515,6 +521,11 @@ impl Module { } } } + + /// Whether a para ID corresponds to any live parathread. 
+ pub(crate) fn is_parathread(id: ParaId) -> bool { + Parathreads::get(&id).is_some() + } } #[cfg(test)] @@ -536,7 +547,7 @@ mod tests { System::set_block_number(b + 1); if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) { - Paras::initializer_on_new_session(&[], &[]); + Paras::initializer_on_new_session(&Default::default()); } Paras::initializer_initialize(b + 1); } @@ -1040,18 +1051,24 @@ mod tests { ); assert_eq!(::UpcomingParas::get(), vec![c, b, a]); + assert!(::Parathreads::get(&a).is_none()); + // run to block without session change. run_to_block(2, None); assert_eq!(Paras::parachains(), Vec::new()); assert_eq!(::UpcomingParas::get(), vec![c, b, a]); + assert!(::Parathreads::get(&a).is_none()); + run_to_block(3, Some(vec![3])); assert_eq!(Paras::parachains(), vec![c, b]); assert_eq!(::UpcomingParas::get(), Vec::new()); + assert!(::Parathreads::get(&a).is_some()); + assert_eq!(Paras::current_code(&a), Some(vec![2].into())); assert_eq!(Paras::current_code(&b), Some(vec![1].into())); assert_eq!(Paras::current_code(&c), Some(vec![3].into())); diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 1f45de2df705..4f0554407774 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -13,3 +13,1512 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . + +//! The scheduler module for parachains and parathreads. +//! +//! This module is responsible for two main tasks: +//! - Partitioning validators into groups and assigning groups to parachains and parathreads +//! - Scheduling parachains and parathreads +//! +//! It aims to achieve these tasks with these goals in mind: +//! - It should be possible to know at least a block ahead-of-time, ideally more, +//! which validators are going to be assigned to which parachains. +//! - Parachains that have a candidate pending availability in this fork of the chain +//! 
should not be assigned. +//! - Validator assignments should not be gameable. Malicious cartels should not be able to +//! manipulate the scheduler to assign themselves as desired. +//! - High or close to optimal throughput of parachains and parathreads. Work among validator groups should be balanced. +//! +//! The Scheduler manages resource allocation using the concept of "Availability Cores". +//! There will be one availability core for each parachain, and a fixed number of cores +//! used for multiplexing parathreads. Validators will be partitioned into groups, with the same +//! number of groups as availability cores. Validator groups will be assigned to different availability cores +//! over time. + +use sp_std::prelude::*; +use sp_std::convert::TryInto; +use primitives::{ + parachain::{Id as ParaId, CollatorId, ValidatorIndex}, +}; +use frame_support::{ + decl_storage, decl_module, decl_error, + weights::Weight, +}; +use codec::{Encode, Decode}; +use sp_runtime::traits::{Saturating, Zero}; + +use rand::{SeedableRng, seq::SliceRandom}; +use rand_chacha::ChaCha20Rng; + +use crate::{configuration, paras, initializer::SessionChangeNotification}; + +/// The unique (during session) index of a core. +#[derive(Encode, Decode, Default, PartialOrd, Ord, Eq, PartialEq, Clone, Copy)] +#[cfg_attr(test, derive(Debug))] +pub struct CoreIndex(u32); + +/// The unique (during session) index of a validator group. +#[derive(Encode, Decode, Default, Clone, Copy)] +#[cfg_attr(test, derive(PartialEq, Debug))] +pub struct GroupIndex(u32); + +/// A claim on authoring the next block for a given parathread. +#[derive(Clone, Encode, Decode, Default)] +#[cfg_attr(test, derive(PartialEq, Debug))] +pub struct ParathreadClaim(pub ParaId, pub CollatorId); + +/// An entry tracking a claim to ensure it does not pass the maximum number of retries. 
+#[derive(Clone, Encode, Decode, Default)] +#[cfg_attr(test, derive(PartialEq, Debug))] +pub struct ParathreadEntry { + claim: ParathreadClaim, + retries: u32, +} + +/// A queued parathread entry, pre-assigned to a core. +#[derive(Encode, Decode, Default)] +#[cfg_attr(test, derive(PartialEq, Debug))] +pub struct QueuedParathread { + claim: ParathreadEntry, + core_offset: u32, +} + +/// The queue of all parathread claims. +#[derive(Encode, Decode, Default)] +#[cfg_attr(test, derive(PartialEq, Debug))] +pub struct ParathreadClaimQueue { + queue: Vec, + // this value is between 0 and config.parathread_cores + next_core_offset: u32, +} + +impl ParathreadClaimQueue { + /// Queue a parathread entry to be processed. + /// + /// Provide the entry and the number of parathread cores, which must be greater than 0. + fn enqueue_entry(&mut self, entry: ParathreadEntry, n_parathread_cores: u32) { + let core_offset = self.next_core_offset; + self.next_core_offset = (self.next_core_offset + 1) % n_parathread_cores; + + self.queue.push(QueuedParathread { + claim: entry, + core_offset, + }) + } + + // Take next queued entry with given core offset, if any. + fn take_next_on_core(&mut self, core_offset: u32) -> Option { + let pos = self.queue.iter().position(|queued| queued.core_offset == core_offset); + pos.map(|i| self.queue.remove(i).claim) + } +} + +/// What is occupying a specific availability core. +#[derive(Clone, Encode, Decode)] +#[cfg_attr(test, derive(PartialEq, Debug))] +pub(crate) enum CoreOccupied { + Parathread(ParathreadEntry), + Parachain, +} + +/// The assignment type. +#[derive(Clone, Encode, Decode)] +#[cfg_attr(test, derive(PartialEq, Debug))] +pub enum AssignmentKind { + Parachain, + Parathread(CollatorId, u32), +} + +/// How a free core is scheduled to be assigned. +#[derive(Encode, Decode)] +#[cfg_attr(test, derive(PartialEq, Debug))] +pub struct CoreAssignment { + /// The core that is assigned. 
+ pub core: CoreIndex, + /// The unique ID of the para that is assigned to the core. + pub para_id: ParaId, + /// The kind of the assignment. + pub kind: AssignmentKind, + /// The index of the validator group assigned to the core. + pub group_idx: GroupIndex, +} + +impl CoreAssignment { + /// Get the ID of a collator who is required to collate this block. + #[allow(unused)] + pub(crate) fn required_collator(&self) -> Option<&CollatorId> { + match self.kind { + AssignmentKind::Parachain => None, + AssignmentKind::Parathread(ref id, _) => Some(id), + } + } + + #[allow(unused)] + fn to_core_occupied(&self) -> CoreOccupied { + match self.kind { + AssignmentKind::Parachain => CoreOccupied::Parachain, + AssignmentKind::Parathread(ref collator, retries) => CoreOccupied::Parathread( + ParathreadEntry { + claim: ParathreadClaim(self.para_id, collator.clone()), + retries, + } + ), + } + } +} + +/// Reasons a core might be freed +#[allow(unused)] +pub enum FreedReason { + /// The core's work concluded and the parablock assigned to it is considered available. + Concluded, + /// The core's work timed out. + TimedOut, +} + +pub trait Trait: system::Trait + configuration::Trait + paras::Trait { } + +decl_storage! { + trait Store for Module as ParaScheduler { + /// All the validator groups. One for each core. + /// + /// Bound: The number of cores is the sum of the numbers of parachains and parathread multiplexers. + /// Reasonably, 100-1000. The dominant factor is the number of validators: safe upper bound at 10k. + ValidatorGroups: Vec>; + + /// A queue of upcoming claims and which core they should be mapped onto. + /// + /// The number of queued claims is bounded at the `scheduling_lookahead` + /// multiplied by the number of parathread multiplexer cores. Reasonably, 10 * 50 = 500. + ParathreadQueue: ParathreadClaimQueue; + /// One entry for each availability core. Entries are `None` if the core is not currently occupied. 
Can be + /// temporarily `Some` if scheduled but not occupied. + /// The i'th parachain belongs to the i'th core, with the remaining cores all being + /// parathread-multiplexers. + /// + /// Bounded by the number of cores: one for each parachain and parathread multiplexer. + AvailabilityCores: Vec>; + /// An index used to ensure that only one claim on a parathread exists in the queue or is + /// currently being handled by an occupied core. + /// + /// Bounded by the number of parathread cores and scheduling lookahead. Reasonably, 10 * 50 = 500. + ParathreadClaimIndex: Vec; + /// The block number where the session start occurred. Used to track how many group rotations have occurred. + SessionStartBlock: T::BlockNumber; + /// Currently scheduled cores - free but up to be occupied. Ephemeral storage item that's wiped on finalization. + /// + /// Bounded by the number of cores: one for each parachain and parathread multiplexer. + Scheduled get(fn scheduled): Vec; // sorted ascending by CoreIndex. + } +} + +decl_error! { + pub enum Error for Module { } +} + +decl_module! { + /// The scheduler module. + pub struct Module for enum Call where origin: ::Origin { + type Error = Error; + } +} + +impl Module { + /// Called by the initializer to initialize the scheduler module. + pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight { + Self::schedule(Vec::new()); + + 0 + } + + /// Called by the initializer to finalize the scheduler module. + pub(crate) fn initializer_finalize() { + // Free all scheduled cores and return parathread claims to queue, with retries incremented. 
+ let config = >::config(); + ParathreadQueue::mutate(|queue| { + for core_assignment in Scheduled::take() { + if let AssignmentKind::Parathread(collator, retries) = core_assignment.kind { + let entry = ParathreadEntry { + claim: ParathreadClaim(core_assignment.para_id, collator), + retries: retries + 1, + }; + + if entry.retries <= config.parathread_retries { + queue.enqueue_entry(entry, config.parathread_cores); + } + } + } + }) + + } + + /// Called by the initializer to note that a new session has started. + pub(crate) fn initializer_on_new_session(notification: &SessionChangeNotification) { + let &SessionChangeNotification { + ref validators, + ref random_seed, + ref new_config, + .. + } = notification; + let config = new_config; + + let mut thread_queue = ParathreadQueue::get(); + let n_parachains = >::parachains().len() as u32; + let n_cores = n_parachains + config.parathread_cores; + + >::set(>::block_number()); + AvailabilityCores::mutate(|cores| { + // clear all occupied cores. + for maybe_occupied in cores.iter_mut() { + if let Some(CoreOccupied::Parathread(claim)) = maybe_occupied.take() { + let queued = QueuedParathread { + claim, + core_offset: 0, // this gets set later in the re-balancing. + }; + + thread_queue.queue.push(queued); + } + } + + cores.resize(n_cores as _, None); + }); + + // shuffle validators into groups. 
+ if n_cores == 0 || validators.is_empty() { + ValidatorGroups::set(Vec::new()); + } else { + let mut rng: ChaCha20Rng = SeedableRng::from_seed(*random_seed); + + let mut shuffled_indices: Vec<_> = (0..validators.len()) + .enumerate() + .map(|(i, _)| i as ValidatorIndex) + .collect(); + + shuffled_indices.shuffle(&mut rng); + + let group_base_size = validators.len() / n_cores as usize; + let n_larger_groups = validators.len() % n_cores as usize; + let groups: Vec> = (0..n_cores).map(|core_id| { + let n_members = if (core_id as usize) < n_larger_groups { + group_base_size + 1 + } else { + group_base_size + }; + + shuffled_indices.drain(shuffled_indices.len() - n_members ..).rev().collect() + }).collect(); + + ValidatorGroups::set(groups); + } + + // prune out all parathread claims with too many retries. + // assign all non-pruned claims to new cores, if they've changed. + ParathreadClaimIndex::mutate(|claim_index| { + // wipe all parathread metadata if no parathread cores are configured. + if config.parathread_cores == 0 { + thread_queue = ParathreadClaimQueue { + queue: Vec::new(), + next_core_offset: 0, + }; + claim_index.clear(); + return; + } + + // prune out all entries beyond retry or that no longer correspond to live parathread. + thread_queue.queue.retain(|queued| { + let will_keep = queued.claim.retries <= config.parathread_retries + && >::is_parathread(queued.claim.claim.0); + + if !will_keep { + let claim_para = queued.claim.claim.0; + + // clean up the pruned entry from the index. + if let Ok(i) = claim_index.binary_search(&claim_para) { + claim_index.remove(i); + } + } + + will_keep + }); + + // do re-balancing of claims. + { + for (i, queued) in thread_queue.queue.iter_mut().enumerate() { + queued.core_offset = (i as u32) % config.parathread_cores; + } + + thread_queue.next_core_offset = + ((thread_queue.queue.len()) as u32) % config.parathread_cores; + } + }); + ParathreadQueue::set(thread_queue); + } + + /// Add a parathread claim to the queue. 
If there is a competing claim in the queue or currently + /// assigned to a core, this call will fail. This call will also fail if the queue is full. + /// + /// Fails if the claim does not correspond to any live parathread. + #[allow(unused)] + pub fn add_parathread_claim(claim: ParathreadClaim) { + if !>::is_parathread(claim.0) { return } + + let config = >::config(); + let queue_max_size = config.parathread_cores * config.scheduling_lookahead; + + ParathreadQueue::mutate(|queue| { + if queue.queue.len() >= queue_max_size as usize { return } + + let para_id = claim.0; + + let competes_with_another = ParathreadClaimIndex::mutate(|index| { + match index.binary_search(¶_id) { + Ok(_) => true, + Err(i) => { + index.insert(i, para_id); + false + } + } + }); + + if competes_with_another { return } + + let entry = ParathreadEntry { claim, retries: 0 }; + queue.enqueue_entry(entry, config.parathread_cores); + }) + } + + /// Schedule all unassigned cores, where possible. Provide a list of cores that should be considered + /// newly-freed along with the reason for them being freed. The list is assumed to be sorted in + /// ascending order by core index. + pub(crate) fn schedule(just_freed_cores: Vec<(CoreIndex, FreedReason)>) { + let mut cores = AvailabilityCores::get(); + let config = >::config(); + + for (freed_index, freed_reason) in just_freed_cores { + if (freed_index.0 as usize) < cores.len() { + match cores[freed_index.0 as usize].take() { + None => continue, + Some(CoreOccupied::Parachain) => {}, + Some(CoreOccupied::Parathread(entry)) => { + match freed_reason { + FreedReason::Concluded => { + // After a parathread candidate has successfully been included, + // open it up for further claims! + ParathreadClaimIndex::mutate(|index| { + if let Ok(i) = index.binary_search(&entry.claim.0) { + index.remove(i); + } + }) + } + FreedReason::TimedOut => { + // If a parathread candidate times out, it's not the collator's fault, + // so we don't increment retries. 
+ ParathreadQueue::mutate(|queue| { + queue.enqueue_entry(entry, config.parathread_cores); + }) + } + } + } + } + } + } + + let parachains = >::parachains(); + let mut scheduled = Scheduled::get(); + let mut parathread_queue = ParathreadQueue::get(); + let now = >::block_number(); + + if ValidatorGroups::get().is_empty() { return } + + { + let mut prev_scheduled_in_order = scheduled.iter().enumerate().peekable(); + + // Updates to the previous list of scheduled updates and the position of where to insert + // them, without accounting for prior updates. + let mut scheduled_updates: Vec<(usize, CoreAssignment)> = Vec::new(); + + // single-sweep O(n) in the number of cores. + for (core_index, _core) in cores.iter().enumerate().filter(|(_, ref c)| c.is_none()) { + let schedule_and_insert_at = { + // advance the iterator until just before the core index we are looking at now. + while prev_scheduled_in_order.peek().map_or( + false, + |(_, assign)| (assign.core.0 as usize) < core_index, + ) { + let _ = prev_scheduled_in_order.next(); + } + + // check the first entry already scheduled with core index >= than the one we + // are looking at. 3 cases: + // 1. No such entry, clearly this core is not scheduled, so we need to schedule and put at the end. + // 2. Entry exists and has same index as the core we are inspecting. do not schedule again. + // 3. Entry exists and has higher index than the core we are inspecting. schedule and note + // insertion position. + prev_scheduled_in_order.peek().map_or( + Some(scheduled.len()), + |(idx_in_scheduled, assign)| if (assign.core.0 as usize) == core_index { + None + } else { + Some(*idx_in_scheduled) + }, + ) + }; + + let schedule_and_insert_at = match schedule_and_insert_at { + None => continue, + Some(at) => at, + }; + + let core = CoreIndex(core_index as u32); + + let core_assignment = if core_index < parachains.len() { + // parachain core. 
+ Some(CoreAssignment { + kind: AssignmentKind::Parachain, + para_id: parachains[core_index], + core: core.clone(), + group_idx: Self::group_assigned_to_core(core, now) + .expect("core is not out of bounds and we are guaranteed \ + to be after the most recent session start; qed"), + }) + } else { + // parathread core offset, rel. to beginning. + let core_offset = (core_index - parachains.len()) as u32; + + parathread_queue.take_next_on_core(core_offset).map(|entry| CoreAssignment { + kind: AssignmentKind::Parathread(entry.claim.1, entry.retries), + para_id: entry.claim.0, + core: core.clone(), + group_idx: Self::group_assigned_to_core(core, now) + .expect("core is not out of bounds and we are guaranteed \ + to be after the most recent session start; qed"), + }) + }; + + if let Some(assignment) = core_assignment { + scheduled_updates.push((schedule_and_insert_at, assignment)) + } + } + + // at this point, because `Scheduled` is guaranteed to be sorted and we navigated unassigned + // core indices in ascending order, we can enact the updates prepared by the previous actions. + // + // while inserting, we have to account for the amount of insertions already done. + // + // This is O(n) as well, capped at n operations, where n is the number of cores. + for (num_insertions_before, (insert_at, to_insert)) in scheduled_updates.into_iter().enumerate() { + let insert_at = num_insertions_before + insert_at; + scheduled.insert(insert_at, to_insert); + } + + // scheduled is guaranteed to be sorted after this point because it was sorted before, and we + // applied sorted updates at their correct positions, accounting for the offsets of previous + // insertions. + } + + Scheduled::set(scheduled); + ParathreadQueue::set(parathread_queue); + } + + /// Note that the given cores have become occupied. Behavior undefined if any of the given cores were not scheduled + /// or the slice is not sorted ascending by core index. 
+ /// + /// Complexity: O(n) in the number of scheduled cores, which is capped at the number of total cores. + /// This is efficient in the case that most scheduled cores are occupied. + #[allow(unused)] + pub(crate) fn occupied(now_occupied: &[CoreIndex]) { + if now_occupied.is_empty() { return } + + let mut availability_cores = AvailabilityCores::get(); + Scheduled::mutate(|scheduled| { + // The constraints on the function require that now_occupied is a sorted subset of the + // `scheduled` cores, which are also sorted. + + let mut occupied_iter = now_occupied.iter().cloned().peekable(); + scheduled.retain(|assignment| { + let retain = occupied_iter.peek().map_or(true, |occupied_idx| { + occupied_idx != &assignment.core + }); + + if !retain { + // remove this entry - it's now occupied. and begin inspecting the next extry + // of the occupied iterator. + let _ = occupied_iter.next(); + + availability_cores[assignment.core.0 as usize] = Some(assignment.to_core_occupied()); + } + + retain + }) + }); + + AvailabilityCores::set(availability_cores); + } + + /// Get the para (chain or thread) ID assigned to a particular core or index, if any. Core indices + /// out of bounds will return `None`, as will indices of unassigned cores. + #[allow(unused)] + pub(crate) fn core_para(core_index: CoreIndex) -> Option { + let cores = AvailabilityCores::get(); + match cores.get(core_index.0 as usize).and_then(|c| c.as_ref()) { + None => None, + Some(CoreOccupied::Parachain) => { + let parachains = >::parachains(); + Some(parachains[core_index.0 as usize]) + } + Some(CoreOccupied::Parathread(ref entry)) => Some(entry.claim.0), + } + } + + /// Get the validators in the given group, if the group index is valid for this session. + #[allow(unused)] + pub(crate) fn group_validators(group_index: GroupIndex) -> Option> { + ValidatorGroups::get().get(group_index.0 as usize).map(|g| g.clone()) + } + + /// Get the group assigned to a specific core by index at the current block number. 
Result undefined if the core index is unknown + /// or the block number is less than the session start index. + pub(crate) fn group_assigned_to_core(core: CoreIndex, at: T::BlockNumber) -> Option { + let config = >::config(); + let session_start_block = >::get(); + + if at < session_start_block { return None } + + if config.parachain_rotation_frequency.is_zero() { + // interpret this as "no rotations" + return Some(GroupIndex(core.0)); + } + + let validator_groups = ValidatorGroups::get(); + + if core.0 as usize >= validator_groups.len() { return None } + + let rotations_since_session_start: T::BlockNumber = + (at - session_start_block) / config.parachain_rotation_frequency.into(); + + let rotations_since_session_start + = match >::try_into(rotations_since_session_start) + { + Ok(i) => i, + Err(_) => 0, // can only happen if rotations occur only once every u32::max(), + // so functionally no difference in behavior. + }; + + let group_idx = (core.0 as usize + rotations_since_session_start as usize) % validator_groups.len(); + Some(GroupIndex(group_idx as u32)) + } + + /// Returns an optional predicate that should be used for timing out occupied cores. + /// + /// If `None`, no timing-out should be done. The predicate accepts the index of the core, and the + /// block number since which it has been occupied, and the respective parachain and parathread + /// timeouts, i.e. only within `max(config.chain_availability_period, config.thread_availability_period)` + /// of the last rotation would this return `Some`. + /// + /// This really should not be a box, but is working around a compiler limitation described here: + /// https://users.rust-lang.org/t/cannot-unify-associated-type-in-impl-fn-with-concrete-type/44129 + /// which prevents us from testing the code if using `impl Trait`. 
+ #[allow(unused)] + pub(crate) fn availability_timeout_predicate() -> Option bool>> { + let now = >::block_number(); + let config = >::config(); + + let session_start = >::get(); + let blocks_since_session_start = now.saturating_sub(session_start); + let blocks_since_last_rotation = blocks_since_session_start % config.parachain_rotation_frequency; + + let absolute_cutoff = sp_std::cmp::max( + config.chain_availability_period, + config.thread_availability_period, + ); + + let availability_cores = AvailabilityCores::get(); + + if blocks_since_last_rotation >= absolute_cutoff { + None + } else { + Some(Box::new(move |core_index: CoreIndex, pending_since| { + match availability_cores.get(core_index.0 as usize) { + None => true, // out-of-bounds, doesn't really matter what is returned. + Some(None) => true, // core not occupied, still doesn't really matter. + Some(Some(CoreOccupied::Parachain)) => { + if blocks_since_last_rotation >= config.chain_availability_period { + false // no pruning except recently after rotation. + } else { + now.saturating_sub(pending_since) >= config.chain_availability_period + } + } + Some(Some(CoreOccupied::Parathread(_))) => { + if blocks_since_last_rotation >= config.thread_availability_period { + false // no pruning except recently after rotation. 
+ } else { + now.saturating_sub(pending_since) >= config.thread_availability_period + } + } + } + })) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use primitives::{BlockNumber, parachain::ValidatorId}; + use frame_support::traits::{OnFinalize, OnInitialize}; + use keyring::Sr25519Keyring; + + use crate::mock::{new_test_ext, Configuration, Paras, System, Scheduler, GenesisConfig as MockGenesisConfig}; + use crate::initializer::SessionChangeNotification; + use crate::configuration::HostConfiguration; + use crate::paras::ParaGenesisArgs; + + fn run_to_block( + to: BlockNumber, + new_session: impl Fn(BlockNumber) -> Option>, + ) { + while System::block_number() < to { + let b = System::block_number(); + + Scheduler::initializer_finalize(); + Paras::initializer_finalize(); + + System::on_finalize(b); + + System::on_initialize(b + 1); + System::set_block_number(b + 1); + + if let Some(notification) = new_session(b + 1) { + Paras::initializer_on_new_session(¬ification); + Scheduler::initializer_on_new_session(¬ification); + } + + Paras::initializer_initialize(b + 1); + Scheduler::initializer_initialize(b + 1); + } + } + + fn default_config() -> HostConfiguration { + HostConfiguration { + parathread_cores: 3, + parachain_rotation_frequency: 10, + chain_availability_period: 3, + thread_availability_period: 5, + scheduling_lookahead: 2, + parathread_retries: 1, + ..Default::default() + } + } + + #[test] + fn add_parathread_claim_works() { + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + let thread_id = ParaId::from(10); + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + new_test_ext(genesis_config).execute_with(|| { + Paras::schedule_para_initialize(thread_id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + + 
assert!(!Paras::is_parathread(thread_id)); + + run_to_block(10, |n| if n == 10 { Some(Default::default()) } else { None }); + + assert!(Paras::is_parathread(thread_id)); + + { + Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator.clone())); + let queue = ParathreadQueue::get(); + assert_eq!(queue.next_core_offset, 1); + assert_eq!(queue.queue.len(), 1); + assert_eq!(queue.queue[0], QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_id, collator.clone()), + retries: 0, + }, + core_offset: 0, + }); + } + + // due to the index, completing claims are not allowed. + { + let collator2 = CollatorId::from(Sr25519Keyring::Bob.public()); + Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator2.clone())); + let queue = ParathreadQueue::get(); + assert_eq!(queue.next_core_offset, 1); + assert_eq!(queue.queue.len(), 1); + assert_eq!(queue.queue[0], QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_id, collator.clone()), + retries: 0, + }, + core_offset: 0, + }); + } + + // claims on non-live parathreads have no effect. 
+ { + let thread_id2 = ParaId::from(11); + Scheduler::add_parathread_claim(ParathreadClaim(thread_id2, collator.clone())); + let queue = ParathreadQueue::get(); + assert_eq!(queue.next_core_offset, 1); + assert_eq!(queue.queue.len(), 1); + assert_eq!(queue.queue[0], QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_id, collator.clone()), + retries: 0, + }, + core_offset: 0, + }); + } + }) + } + + #[test] + fn cannot_add_claim_when_no_parathread_cores() { + let config = { + let mut config = default_config(); + config.parathread_cores = 0; + config + }; + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config, + ..Default::default() + }, + ..Default::default() + }; + + let thread_id = ParaId::from(10); + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + new_test_ext(genesis_config).execute_with(|| { + Paras::schedule_para_initialize(thread_id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + + assert!(!Paras::is_parathread(thread_id)); + + run_to_block(10, |n| if n == 10 { Some(Default::default()) } else { None }); + + assert!(Paras::is_parathread(thread_id)); + + Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator.clone())); + assert_eq!(ParathreadQueue::get(), Default::default()); + }); + } + + #[test] + fn session_change_prunes_cores_beyond_retries_and_those_from_non_live_parathreads() { + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + let max_parathread_retries = default_config().parathread_retries; + + let thread_a = ParaId::from(1); + let thread_b = ParaId::from(2); + let thread_c = ParaId::from(3); + let thread_d = ParaId::from(4); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + new_test_ext(genesis_config).execute_with(|| 
{ + assert_eq!(Configuration::config(), default_config()); + + // threads a, b, and c will be live in next session, but not d. + { + Paras::schedule_para_initialize(thread_a, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + + Paras::schedule_para_initialize(thread_b, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + + Paras::schedule_para_initialize(thread_c, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + } + + // set up a queue as if n_cores was 4 and with some with many retries. + ParathreadQueue::put({ + let mut queue = ParathreadClaimQueue::default(); + + // Will be pruned: too many retries. + queue.enqueue_entry(ParathreadEntry { + claim: ParathreadClaim(thread_a, collator.clone()), + retries: max_parathread_retries + 1, + }, 4); + + // Will not be pruned. + queue.enqueue_entry(ParathreadEntry { + claim: ParathreadClaim(thread_b, collator.clone()), + retries: max_parathread_retries, + }, 4); + + // Will not be pruned. + queue.enqueue_entry(ParathreadEntry { + claim: ParathreadClaim(thread_c, collator.clone()), + retries: 0, + }, 4); + + // Will be pruned: not a live parathread. 
+ queue.enqueue_entry(ParathreadEntry { + claim: ParathreadClaim(thread_d, collator.clone()), + retries: 0, + }, 4); + + queue + }); + + ParathreadClaimIndex::put(vec![thread_a, thread_b, thread_c, thread_d]); + + run_to_block( + 10, + |b| match b { + 10 => Some(SessionChangeNotification { + new_config: Configuration::config(), + ..Default::default() + }), + _ => None, + } + ); + assert_eq!(Configuration::config(), default_config()); + + let queue = ParathreadQueue::get(); + assert_eq!( + queue.queue, + vec![ + QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_b, collator.clone()), + retries: max_parathread_retries, + }, + core_offset: 0, + }, + QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_c, collator.clone()), + retries: 0, + }, + core_offset: 1, + }, + ] + ); + assert_eq!(queue.next_core_offset, 2); + + assert_eq!(ParathreadClaimIndex::get(), vec![thread_b, thread_c]); + }) + } + + #[test] + fn session_change_shuffles_validators() { + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + assert_eq!(default_config().parathread_cores, 3); + new_test_ext(genesis_config).execute_with(|| { + let chain_a = ParaId::from(1); + let chain_b = ParaId::from(2); + + // ensure that we have 5 groups by registering 2 parachains. 
+ Paras::schedule_para_initialize(chain_a, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: true, + }); + Paras::schedule_para_initialize(chain_b, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: true, + }); + + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Bob.public()), + ValidatorId::from(Sr25519Keyring::Charlie.public()), + ValidatorId::from(Sr25519Keyring::Dave.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ValidatorId::from(Sr25519Keyring::Ferdie.public()), + ValidatorId::from(Sr25519Keyring::One.public()), + ], + random_seed: [99; 32], + ..Default::default() + }), + _ => None, + }); + + let groups = ValidatorGroups::get(); + assert_eq!(groups.len(), 5); + + // first two groups have the overflow. 
+ for i in 0..2 { + assert_eq!(groups[i].len(), 2); + } + + for i in 2..5 { + assert_eq!(groups[i].len(), 1); + } + }); + } + + #[test] + fn schedule_schedules() { + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + let chain_a = ParaId::from(1); + let chain_b = ParaId::from(2); + + let thread_a = ParaId::from(3); + let thread_b = ParaId::from(4); + let thread_c = ParaId::from(5); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + new_test_ext(genesis_config).execute_with(|| { + assert_eq!(default_config().parathread_cores, 3); + + // register 2 parachains + schedule_blank_para(chain_a, true); + schedule_blank_para(chain_b, true); + + // and 3 parathreads + schedule_blank_para(thread_a, false); + schedule_blank_para(thread_b, false); + schedule_blank_para(thread_c, false); + + // start a new session to activate, 5 validators for 5 cores. 
+ run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Bob.public()), + ValidatorId::from(Sr25519Keyring::Charlie.public()), + ValidatorId::from(Sr25519Keyring::Dave.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + { + let scheduled = Scheduler::scheduled(); + assert_eq!(scheduled.len(), 2); + + assert_eq!(scheduled[0], CoreAssignment { + core: CoreIndex(0), + para_id: chain_a, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(0), + }); + + assert_eq!(scheduled[1], CoreAssignment { + core: CoreIndex(1), + para_id: chain_b, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(1), + }); + } + + // add a couple of parathread claims. + Scheduler::add_parathread_claim(ParathreadClaim(thread_a, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_c, collator.clone())); + + run_to_block(2, |_| None); + + { + let scheduled = Scheduler::scheduled(); + assert_eq!(scheduled.len(), 4); + + assert_eq!(scheduled[0], CoreAssignment { + core: CoreIndex(0), + para_id: chain_a, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(0), + }); + + assert_eq!(scheduled[1], CoreAssignment { + core: CoreIndex(1), + para_id: chain_b, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(1), + }); + + assert_eq!(scheduled[2], CoreAssignment{ + core: CoreIndex(2), + para_id: thread_a, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(2), + }); + + assert_eq!(scheduled[3], CoreAssignment{ + core: CoreIndex(3), + para_id: thread_c, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(3), + }); + } + }); + } + + #[test] + fn schedule_schedules_including_just_freed() { + let genesis_config = MockGenesisConfig { + configuration: 
crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + let chain_a = ParaId::from(1); + let chain_b = ParaId::from(2); + + let thread_a = ParaId::from(3); + let thread_b = ParaId::from(4); + let thread_c = ParaId::from(5); + let thread_d = ParaId::from(6); + let thread_e = ParaId::from(7); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + new_test_ext(genesis_config).execute_with(|| { + assert_eq!(default_config().parathread_cores, 3); + + // register 2 parachains + schedule_blank_para(chain_a, true); + schedule_blank_para(chain_b, true); + + // and 5 parathreads + schedule_blank_para(thread_a, false); + schedule_blank_para(thread_b, false); + schedule_blank_para(thread_c, false); + schedule_blank_para(thread_d, false); + schedule_blank_para(thread_e, false); + + // start a new session to activate, 5 validators for 5 cores. + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Bob.public()), + ValidatorId::from(Sr25519Keyring::Charlie.public()), + ValidatorId::from(Sr25519Keyring::Dave.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + // add a couple of parathread claims now that the parathreads are live. + Scheduler::add_parathread_claim(ParathreadClaim(thread_a, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_c, collator.clone())); + + run_to_block(2, |_| None); + + assert_eq!(Scheduler::scheduled().len(), 4); + + // cores 0, 1, 2, and 3 should be occupied. mark them as such. 
+ Scheduler::occupied(&[CoreIndex(0), CoreIndex(1), CoreIndex(2), CoreIndex(3)]); + + { + let cores = AvailabilityCores::get(); + + assert!(cores[0].is_some()); + assert!(cores[1].is_some()); + assert!(cores[2].is_some()); + assert!(cores[3].is_some()); + assert!(cores[4].is_none()); + + assert!(Scheduler::scheduled().is_empty()); + } + + // add a couple more parathread claims - the claim on `b` will go to the 3rd parathread core (4) + // and the claim on `d` will go back to the 1st parathread core (2). The claim on `e` then + // will go for core `3`. + Scheduler::add_parathread_claim(ParathreadClaim(thread_b, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_d, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_e, collator.clone())); + + run_to_block(3, |_| None); + + { + let scheduled = Scheduler::scheduled(); + + // cores 0 and 1 are occupied by parachains. cores 2 and 3 are occupied by parathread + // claims. core 4 was free. + assert_eq!(scheduled.len(), 1); + assert_eq!(scheduled[0], CoreAssignment { + core: CoreIndex(4), + para_id: thread_b, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(4), + }); + } + + // now note that cores 0, 2, and 3 were freed. + Scheduler::schedule(vec![ + (CoreIndex(0), FreedReason::Concluded), + (CoreIndex(2), FreedReason::Concluded), + (CoreIndex(3), FreedReason::TimedOut), // should go back on queue. + ]); + + { + let scheduled = Scheduler::scheduled(); + + // 1 thing scheduled before, + 3 cores freed. 
+ assert_eq!(scheduled.len(), 4); + assert_eq!(scheduled[0], CoreAssignment { + core: CoreIndex(0), + para_id: chain_a, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(0), + }); + assert_eq!(scheduled[1], CoreAssignment { + core: CoreIndex(2), + para_id: thread_d, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(2), + }); + assert_eq!(scheduled[2], CoreAssignment { + core: CoreIndex(3), + para_id: thread_e, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(3), + }); + assert_eq!(scheduled[3], CoreAssignment { + core: CoreIndex(4), + para_id: thread_b, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(4), + }); + + // the prior claim on thread A concluded, but the claim on thread C was marked as + // timed out. + let index = ParathreadClaimIndex::get(); + let parathread_queue = ParathreadQueue::get(); + + // thread A claim should have been wiped, but thread C claim should remain. + assert_eq!(index, vec![thread_b, thread_c, thread_d, thread_e]); + + // Although C was descheduled, the core `4` was occupied so C goes back on the queue. + assert_eq!(parathread_queue.queue.len(), 1); + assert_eq!(parathread_queue.queue[0], QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_c, collator.clone()), + retries: 0, // retries not incremented by timeout - validators' fault. + }, + core_offset: 2, // reassigned to next core. thread_e claim was on offset 1. 
+ }); + } + }); + } + + #[test] + fn schedule_rotates_groups() { + let config = { + let mut config = default_config(); + + // make sure parathread requests don't retry-out + config.parathread_retries = config.parachain_rotation_frequency * 3; + config.parathread_cores = 2; + config + }; + + let rotation_frequency = config.parachain_rotation_frequency; + let parathread_cores = config.parathread_cores; + + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: config.clone(), + ..Default::default() + }, + ..Default::default() + }; + + let thread_a = ParaId::from(1); + let thread_b = ParaId::from(2); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + new_test_ext(genesis_config).execute_with(|| { + assert_eq!(default_config().parathread_cores, 3); + + schedule_blank_para(thread_a, false); + schedule_blank_para(thread_b, false); + + // start a new session to activate, 5 validators for 5 cores. 
+ run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: config.clone(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + let session_start_block = ::SessionStartBlock::get(); + assert_eq!(session_start_block, 1); + + Scheduler::add_parathread_claim(ParathreadClaim(thread_a, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_b, collator.clone())); + + run_to_block(2, |_| None); + + let assert_groups_rotated = |rotations: u32| { + let scheduled = Scheduler::scheduled(); + assert_eq!(scheduled.len(), 2); + assert_eq!(scheduled[0].group_idx, GroupIndex((0u32 + rotations) % parathread_cores)); + assert_eq!(scheduled[1].group_idx, GroupIndex((1u32 + rotations) % parathread_cores)); + }; + + assert_groups_rotated(0); + + // one block before first rotation. + run_to_block(rotation_frequency, |_| None); + + let rotations_since_session_start = (rotation_frequency - session_start_block) / rotation_frequency; + assert_eq!(rotations_since_session_start, 0); + assert_groups_rotated(0); + + // first rotation. + run_to_block(rotation_frequency + 1, |_| None); + assert_groups_rotated(1); + + // one block before second rotation. + run_to_block(rotation_frequency * 2, |_| None); + assert_groups_rotated(1); + + // second rotation. 
+ run_to_block(rotation_frequency * 2 + 1, |_| None); + assert_groups_rotated(2); + }); + } + + #[test] + fn parathread_claims_are_pruned_after_retries() { + let max_retries = default_config().parathread_retries; + + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + let thread_a = ParaId::from(1); + let thread_b = ParaId::from(2); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + new_test_ext(genesis_config).execute_with(|| { + assert_eq!(default_config().parathread_cores, 3); + + schedule_blank_para(thread_a, false); + schedule_blank_para(thread_b, false); + + // start a new session to activate, 5 validators for 5 cores. + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + Scheduler::add_parathread_claim(ParathreadClaim(thread_a, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_b, collator.clone())); + + run_to_block(2, |_| None); + assert_eq!(Scheduler::scheduled().len(), 2); + + run_to_block(2 + max_retries, |_| None); + assert_eq!(Scheduler::scheduled().len(), 2); + + run_to_block(2 + max_retries + 1, |_| None); + assert_eq!(Scheduler::scheduled().len(), 0); + }); + } + + #[test] + fn availability_predicate_works() { + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + let HostConfiguration { + 
parachain_rotation_frequency, + chain_availability_period, + thread_availability_period, + .. + } = default_config(); + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + assert!(chain_availability_period < thread_availability_period && + thread_availability_period < parachain_rotation_frequency); + + let chain_a = ParaId::from(1); + let thread_a = ParaId::from(2); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + new_test_ext(genesis_config).execute_with(|| { + schedule_blank_para(chain_a, true); + schedule_blank_para(thread_a, false); + + // start a new session with our chain & thread registered. + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Bob.public()), + ValidatorId::from(Sr25519Keyring::Charlie.public()), + ValidatorId::from(Sr25519Keyring::Dave.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + // assign some availability cores. + { + AvailabilityCores::mutate(|cores| { + cores[0] = Some(CoreOccupied::Parachain); + cores[1] = Some(CoreOccupied::Parathread(ParathreadEntry { + claim: ParathreadClaim(thread_a, collator), + retries: 0, + })) + }); + } + + run_to_block(1 + thread_availability_period, |_| None); + assert!(Scheduler::availability_timeout_predicate().is_none()); + + run_to_block(1 + parachain_rotation_frequency, |_| None); + + { + let pred = Scheduler::availability_timeout_predicate() + .expect("predicate exists recently after rotation"); + + let now = System::block_number(); + let would_be_timed_out = now - thread_availability_period; + for i in 0..AvailabilityCores::get().len() { + // returns true for unoccupied cores. 
+ // And can time out both threads and chains at this stage. + assert!(pred(CoreIndex(i as u32), would_be_timed_out)); + } + + assert!(!pred(CoreIndex(0), now)); // assigned: chain + assert!(!pred(CoreIndex(1), now)); // assigned: thread + assert!(pred(CoreIndex(2), now)); + + // check the tighter bound on chains vs threads. + assert!(pred(CoreIndex(0), now - chain_availability_period)); + assert!(!pred(CoreIndex(1), now - chain_availability_period)); + + // check the threshold is exact. + assert!(!pred(CoreIndex(0), now - chain_availability_period + 1)); + assert!(!pred(CoreIndex(1), now - thread_availability_period + 1)); + } + + run_to_block(1 + parachain_rotation_frequency + chain_availability_period, |_| None); + + { + let pred = Scheduler::availability_timeout_predicate() + .expect("predicate exists recently after rotation"); + + let would_be_timed_out = System::block_number() - thread_availability_period; + + assert!(!pred(CoreIndex(0), would_be_timed_out)); // chains can't be timed out now. + assert!(pred(CoreIndex(1), would_be_timed_out)); // but threads can. + } + + run_to_block(1 + parachain_rotation_frequency + thread_availability_period, |_| None); + + assert!(Scheduler::availability_timeout_predicate().is_none()); + }); + } +}