Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Scheduler to Support Relay Chain Block Number Provider #6362

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,13 @@ parameter_types! {
// Scheduler bounds used in production builds (benchmarking uses larger values below).
#[cfg(not(feature = "runtime-benchmarks"))]
parameter_types! {
// Maximum number of tasks that may be scheduled in a single block's agenda.
pub const MaxScheduledPerBlock: u32 = 50;
// Maximum number of distinct block numbers that may have a scheduled agenda at once;
// bounds the scheduler's block-number queue introduced alongside this constant.
pub const MaxScheduledBlocks: u32 = 50;
}

// Inflated scheduler bounds for benchmarking, so worst-case weights are measured
// against larger agendas and queues than production allows.
#[cfg(feature = "runtime-benchmarks")]
parameter_types! {
// Benchmark upper bound on tasks per block agenda.
pub const MaxScheduledPerBlock: u32 = 200;
// Benchmark upper bound on distinct block numbers holding a scheduled agenda.
pub const MaxScheduledBlocks: u32 = 200;
}

impl pallet_scheduler::Config for Runtime {
Expand All @@ -622,6 +624,8 @@ impl pallet_scheduler::Config for Runtime {
type WeightInfo = weights::pallet_scheduler::WeightInfo<Runtime>;
type OriginPrivilegeCmp = EqualOrGreatestRootCmp;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Runtime>;
type MaxScheduledBlocks = MaxScheduledBlocks;
}

parameter_types! {
Expand Down
3 changes: 3 additions & 0 deletions polkadot/runtime/rococo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ parameter_types! {
pub MaximumSchedulerWeight: Weight = Perbill::from_percent(80) *
BlockWeights::get().max_block;
pub const MaxScheduledPerBlock: u32 = 50;
pub const MaxScheduledBlocks: u32 = 50;
pub const NoPreimagePostponement: Option<u32> = Some(10);
}

Expand Down Expand Up @@ -331,6 +332,8 @@ impl pallet_scheduler::Config for Runtime {
type WeightInfo = weights::pallet_scheduler::WeightInfo<Runtime>;
type OriginPrivilegeCmp = OriginPrivilegeCmp;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Runtime>;
type MaxScheduledBlocks = MaxScheduledBlocks;
}

parameter_types! {
Expand Down
3 changes: 3 additions & 0 deletions polkadot/runtime/westend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ parameter_types! {
pub MaximumSchedulerWeight: frame_support::weights::Weight = Perbill::from_percent(80) *
BlockWeights::get().max_block;
pub const MaxScheduledPerBlock: u32 = 50;
pub const MaxScheduledBlocks: u32 = 50;
pub const NoPreimagePostponement: Option<u32> = Some(10);
}

Expand All @@ -247,6 +248,8 @@ impl pallet_scheduler::Config for Runtime {
type WeightInfo = weights::pallet_scheduler::WeightInfo<Runtime>;
type OriginPrivilegeCmp = frame_support::traits::EqualPrivilegeOnly;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Runtime>;
type MaxScheduledBlocks = MaxScheduledBlocks;
}

parameter_types! {
Expand Down
5 changes: 5 additions & 0 deletions substrate/bin/node/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,11 @@ impl pallet_scheduler::Config for Runtime {
type WeightInfo = pallet_scheduler::weights::SubstrateWeight<Runtime>;
type OriginPrivilegeCmp = EqualPrivilegeOnly;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Runtime>;
#[cfg(feature = "runtime-benchmarks")]
type MaxScheduledBlocks = ConstU32<512>;
#[cfg(not(feature = "runtime-benchmarks"))]
type MaxScheduledBlocks = ConstU32<50>;
}

impl pallet_glutton::Config for Runtime {
Expand Down
2 changes: 2 additions & 0 deletions substrate/frame/democracy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ impl pallet_scheduler::Config for Test {
type WeightInfo = ();
type OriginPrivilegeCmp = EqualPrivilegeOnly;
type Preimages = ();
type BlockNumberProvider = frame_system::Pallet<Test>;
type MaxScheduledBlocks = ConstU32<100>;
}

#[derive_impl(pallet_balances::config_preludes::TestDefaultConfig)]
Expand Down
2 changes: 2 additions & 0 deletions substrate/frame/referenda/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ impl pallet_scheduler::Config for Test {
type WeightInfo = ();
type OriginPrivilegeCmp = EqualPrivilegeOnly;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Test>;
type MaxScheduledBlocks = ConstU32<100>;
}
#[derive_impl(pallet_balances::config_preludes::TestDefaultConfig)]
impl pallet_balances::Config for Test {
Expand Down
70 changes: 50 additions & 20 deletions substrate/frame/scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ use frame_system::{
use scale_info::TypeInfo;
use sp_io::hashing::blake2_256;
use sp_runtime::{
traits::{BadOrigin, Dispatchable, One, Saturating, Zero},
traits::{BadOrigin, BlockNumberProvider, Dispatchable, One, Saturating, Zero},
BoundedVec, DispatchError, RuntimeDebug,
};

Expand Down Expand Up @@ -292,6 +292,13 @@ pub mod pallet {

/// The preimage provider with which we look up call hashes to get the call.
type Preimages: QueryPreimage<H = Self::Hashing> + StorePreimage;

/// Provider for the block number. Normally this is the `frame_system` pallet.
type BlockNumberProvider: BlockNumberProvider<BlockNumber = BlockNumberFor<Self>>;
gupnik marked this conversation as resolved.
Show resolved Hide resolved

/// The maximum number of blocks that can be scheduled.
#[pallet::constant]
type MaxScheduledBlocks: Get<u32>;
}

#[pallet::storage]
Expand Down Expand Up @@ -325,6 +332,11 @@ pub mod pallet {
pub(crate) type Lookup<T: Config> =
StorageMap<_, Twox64Concat, TaskName, TaskAddress<BlockNumberFor<T>>>;

/// The queue of block numbers that have scheduled agendas.
///
/// Maintained in ascending order: insertions locate their slot via binary search,
/// so lookups during servicing can also binary-search. Bounded by
/// `T::MaxScheduledBlocks`; an entry is removed once its agenda has been fully
/// serviced or cleaned up.
#[pallet::storage]
pub(crate) type Queue<T: Config> =
StorageValue<_, BoundedVec<BlockNumberFor<T>, T::MaxScheduledBlocks>, ValueQuery>;

/// Events type.
#[pallet::event]
#[pallet::generate_deposit(pub(super) fn deposit_event)]
Expand Down Expand Up @@ -376,7 +388,8 @@ pub mod pallet {
#[pallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
/// Execute the scheduled calls
fn on_initialize(now: BlockNumberFor<T>) -> Weight {
fn on_initialize(_do_not_use_local_block_number: BlockNumberFor<T>) -> Weight {
let now = T::BlockNumberProvider::current_block_number();
let mut weight_counter = WeightMeter::with_limit(T::MaximumWeight::get());
Self::service_agendas(&mut weight_counter, now, u32::max_value());
weight_counter.consumed()
Expand Down Expand Up @@ -889,7 +902,7 @@ impl<T: Config> Pallet<T> {
fn resolve_time(
when: DispatchTime<BlockNumberFor<T>>,
) -> Result<BlockNumberFor<T>, DispatchError> {
let now = frame_system::Pallet::<T>::block_number();
let now = T::BlockNumberProvider::current_block_number();

let when = match when {
DispatchTime::At(x) => x,
Expand Down Expand Up @@ -926,17 +939,23 @@ impl<T: Config> Pallet<T> {
let mut agenda = Agenda::<T>::get(when);
let index = if (agenda.len() as u32) < T::MaxScheduledPerBlock::get() {
// will always succeed due to the above check.
let _ = agenda.try_push(Some(what));
let _ = agenda.try_push(Some(what.clone()));
agenda.len() as u32 - 1
} else {
if let Some(hole_index) = agenda.iter().position(|i| i.is_none()) {
agenda[hole_index] = Some(what);
agenda[hole_index] = Some(what.clone());
hole_index as u32
} else {
return Err((DispatchError::Exhausted, what))
}
};
Agenda::<T>::insert(when, agenda);
Queue::<T>::mutate(|q| {
if let Err(index) = q.binary_search_by_key(&when, |x| *x) {
q.try_insert(index, when).map_err(|_| (DispatchError::Exhausted, what))?;
}
Ok(())
})?;
Ok(index)
}

Expand All @@ -952,6 +971,11 @@ impl<T: Config> Pallet<T> {
Some(_) => {},
None => {
Agenda::<T>::remove(when);
Queue::<T>::mutate(|q| {
if let Ok(index) = q.binary_search_by_key(&when, |x| *x) {
q.remove(index);
}
});
},
}
}
Expand Down Expand Up @@ -1157,24 +1181,30 @@ impl<T: Config> Pallet<T> {
return
}

let mut incomplete_since = now + One::one();
let mut when = IncompleteSince::<T>::take().unwrap_or(now);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why would not it work with IncompleteSince, without the block Queue?
How we determine the MaxScheduledBlocks bound?
With the IncompleteSince we iterate over blocks that might have no task to execute, and this might make a situation with many incomplete blocks even worse. But probably not too much? One more read?
Both solutions need a strategy for a situation when there are too many tasks that cannot be completed and the task queue only grows, if such a strategy is not yet in place.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the IncompleteSince we iterate over blocks that might have no task to execute, and this might make a situation with many incomplete blocks even worse. But probably not too much? One more read?

Yes, but then this becomes unbounded in case too many blocks are skipped. The idea behind using the Queue is to bound this to a sufficient number.

How we determine the MaxScheduledBlocks bound?

This should be determined similar to the existing MaxScheduledPerBlock?

Both solutions need a strategy for a situation when there are too many tasks that cannot be completed and the task queue only grows, if such a strategy is not yet in place.

There is already a retry mechanism and the task is purged if the retry count is exceeded (even if failed).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Queue not only bounds how many blocks are going to be processed from the past. It also bounds how many blocks we can schedule for. If the number is 50, we can schedule only 50 jobs with distinct schedule times.

The MaxScheduledPerBlock for me seems simpler to define, because the block size is an existing constraint the system has. But how many distinct schedule time points you can have is something new.

Retries work in case a certain task fails while its function call is being executed (not when the scheduler itself fails). I meant a case when there are many (or few but too heavy) overdue tasks (task_block < now), so that the scheduler never completes them (or needs too much time) and cannot exit such an overdue state to start processing tasks on time. Do we handle such a case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Queue not only bounds how many blocks are going to be processed from the past. It also bounds how many blocks we can schedule for. If the number is 50, we can schedule only 50 jobs with distinct schedule times.

Indeed, I do not find it quite comfortable to run a for loop with IncompleteSince when there could be an unknown number of blocks passed between the successive runs. You could always keep the MaxScheduledBlocks on the higher side that would give you a similar experience?

I meant a case when there are many (or few but too heavy) overdue tasks (task_block < now), so that the scheduler never (or needs too many time) to complete them and exist such overdue state to start processing tasks in time. Do we handle such case?

But this stays as an issue even in the current implementation? The change here just makes it bounded, so that the scheduling itself is blocked in such a case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can put a quite big bound on the MaxScheduledBlocks, it is just a vec of block numbers.

let mut executed = 0;
let queue = Queue::<T>::get();
let end_index = match queue.binary_search_by_key(&now, |x| *x) {
Ok(end_index) => end_index.saturating_add(1),
Err(end_index) => {
if end_index == 0 {
return;
}
end_index
},
};

let max_items = T::MaxScheduledPerBlock::get();
let mut count_down = max;
let service_agenda_base_weight = T::WeightInfo::service_agenda_base(max_items);
while count_down > 0 && when <= now && weight.can_consume(service_agenda_base_weight) {
if !Self::service_agenda(weight, &mut executed, now, when, u32::max_value()) {
incomplete_since = incomplete_since.min(when);
let mut index = 0;
while index < end_index {
let when = queue[index];
let mut executed = 0;
if !Self::service_agenda(weight, &mut executed, now, when, max) {
break;
}
when.saturating_inc();
count_down.saturating_dec();
}
incomplete_since = incomplete_since.min(when);
if incomplete_since <= now {
IncompleteSince::<T>::put(incomplete_since);
index.saturating_inc();
}

Queue::<T>::mutate(|queue| {
queue.drain(0..index);
});
}

/// Returns `true` if the agenda was fully completed, `false` if it should be revisited at a
Expand Down
8 changes: 8 additions & 0 deletions substrate/frame/scheduler/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ impl Config for Test {
type WeightInfo = TestWeightInfo;
type OriginPrivilegeCmp = EqualPrivilegeOnly;
type Preimages = Preimage;
type BlockNumberProvider = frame_system::Pallet<Self>;
type MaxScheduledBlocks = ConstU32<20>;
}

pub type LoggerCall = logger::Call<Test>;
Expand All @@ -244,6 +246,12 @@ pub fn run_to_block(n: u64) {
}
}

/// Jump the chain directly to block `n` and run the scheduler's hooks for it.
///
/// Unlike `run_to_block`, the blocks between the current height and `n` are
/// skipped entirely (their hooks never run), which is exactly the situation a
/// relay-chain-driven block number provider creates for a parachain scheduler.
pub fn go_to_block(n: u64) {
System::set_block_number(n);
Scheduler::on_initialize(n);
Scheduler::on_finalize(n);
}

pub fn root() -> OriginCaller {
system::RawOrigin::Root.into()
}
42 changes: 38 additions & 4 deletions substrate/frame/scheduler/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,7 @@ fn on_initialize_weight_is_correct() {
));

// Will include the named periodic only
System::set_block_number(1);
assert_eq!(
Scheduler::on_initialize(1),
TestWeightInfo::service_agendas_base() +
Expand All @@ -1648,6 +1649,7 @@ fn on_initialize_weight_is_correct() {
assert_eq!(logger::log(), vec![(root(), 2600u32)]);

// Will include anon and anon periodic
System::set_block_number(2);
assert_eq!(
Scheduler::on_initialize(2),
TestWeightInfo::service_agendas_base() +
Expand All @@ -1663,6 +1665,7 @@ fn on_initialize_weight_is_correct() {
assert_eq!(logger::log(), vec![(root(), 2600u32), (root(), 69u32), (root(), 42u32)]);

// Will include named only
System::set_block_number(3);
assert_eq!(
Scheduler::on_initialize(3),
TestWeightInfo::service_agendas_base() +
Expand All @@ -1678,11 +1681,9 @@ fn on_initialize_weight_is_correct() {
);

// Will contain none
System::set_block_number(4);
let actual_weight = Scheduler::on_initialize(4);
assert_eq!(
actual_weight,
TestWeightInfo::service_agendas_base() + TestWeightInfo::service_agenda_base(0)
);
assert_eq!(actual_weight, TestWeightInfo::service_agendas_base());
});
}

Expand Down Expand Up @@ -3035,3 +3036,36 @@ fn unavailable_call_is_detected() {
assert!(!Preimage::is_requested(&hash));
});
}

/// A task scheduled for block 4 must still execute when block numbers are
/// skipped: jumping straight from block 3 to block 6 (blocks 4 and 5 are never
/// initialized) should service the overdue agenda exactly once, and advancing
/// much further must not re-execute the already-completed task.
#[test]
fn basic_scheduling_with_skipped_blocks_works() {
new_test_ext().execute_with(|| {
// Call to schedule.
let call =
RuntimeCall::Logger(LoggerCall::log { i: 42, weight: Weight::from_parts(10, 0) });

// BaseCallFilter should be implemented to accept `Logger::log` runtime call which is
// implemented for `BaseFilter` in the mock runtime.
assert!(!<Test as frame_system::Config>::BaseCallFilter::contains(&call));

// Schedule call to be executed at the 4th block.
assert_ok!(Scheduler::do_schedule(
DispatchTime::At(4),
None,
127,
root(),
Preimage::bound(call).unwrap()
));

// `log` runtime call should not have executed yet at block 3.
go_to_block(3);
assert!(logger::log().is_empty());

go_to_block(6);
// Blocks 4 and 5 were skipped, but the task queued for block 4 must still
// have been serviced when block 6 was initialized.
assert_eq!(logger::log(), vec![(root(), 42u32)]);

go_to_block(100);
// The completed task must not run a second time on later blocks.
assert_eq!(logger::log(), vec![(root(), 42u32)]);
});
}
Loading