Improve SchedulerStatus code and test as follow-up #1797

Merged
7 changes: 5 additions & 2 deletions runtime/src/installed_scheduler_pool.rs
@@ -292,6 +292,8 @@ impl WaitReason {
 pub enum SchedulerStatus {
     /// Unified scheduler is disabled or installed scheduler is consumed by wait_for_termination().
     /// Note that transition to Unavailable from {Active, Stale} is one-way (i.e. one-time).
+    /// Also, this variant is transiently used as a placeholder internally when transitioning
+    /// scheduler statuses, which isn't observable unless panic is happening.
     Unavailable,
     /// Scheduler is installed into a bank; could be running or just be idling.
     /// This will be transitioned to Stale after certain time has passed if its bank hasn't been
@@ -329,7 +331,7 @@ impl SchedulerStatus {
             return;
         }
         let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
-            unreachable!("not active: {:?}", self);
+            unreachable!("not active: {self:?}");
         };
         let (pool, result_with_timings) = f(scheduler);
         *self = Self::Stale(pool, result_with_timings);
@@ -549,7 +551,8 @@ impl BankWithSchedulerInner {
         let scheduler = self.scheduler.read().unwrap();
         // Re-register a new timeout listener only after acquiring the read lock;
         // Otherwise, the listener would again put scheduler into Stale before the read
-        // lock under an extremely-rare race condition, causing panic below.
+        // lock under an extremely-rare race condition, causing panic below in
+        // active_scheduler().
         pool.register_timeout_listener(self.do_create_timeout_listener());
         f(scheduler.active_scheduler())
     }

@ryoqun (Member, Author) commented on Jun 20, 2024:

    before this pr, commenting out this register_timeout_listener() won't fail any tests... ;)

    not anymore
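The new doc line on `Unavailable` describes a common Rust idiom: `mem::replace` must leave some value behind while ownership of the current variant is taken out, so the "empty" variant doubles as a transient placeholder that is only ever observed if the transition panics midway. A minimal self-contained sketch of the pattern (illustrative names, not the actual SchedulerStatus API):

```rust
use std::mem;

#[derive(Debug)]
enum Status {
    Unavailable,
    Active(String),     // stand-in for the installed scheduler
    Stale(String, u64), // stand-in for (pool, result_with_timings)
}

impl Status {
    fn maybe_transition_from_active_to_stale(
        &mut self,
        f: impl FnOnce(String) -> (String, u64),
    ) {
        if !matches!(self, Self::Active(_)) {
            return;
        }
        // Park `Unavailable` in `self` while the active scheduler is moved
        // out; it is observable only if `f` panics before the write-back.
        let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
            unreachable!("not active: {self:?}");
        };
        let (pool, result_with_timings) = f(scheduler);
        *self = Self::Stale(pool, result_with_timings);
    }
}

fn main() {
    let mut status = Status::Active("scheduler".to_string());
    status.maybe_transition_from_active_to_stale(|scheduler| (scheduler, 42));
    println!("{status:?}"); // Stale("scheduler", 42)
}
```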
11 changes: 11 additions & 0 deletions unified-scheduler-pool/src/lib.rs
@@ -340,6 +340,8 @@ where
         context: SchedulingContext,
         result_with_timings: ResultWithTimings,
     ) -> S {
+        assert_matches!(result_with_timings, (Ok(_), _));
+
         // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
         // returned recently
         if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop()
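Two behaviors meet in this hunk: the new assertion pins the invariant that a scheduler is only resumed with a non-error result, and the Vec-backed `pop()` gives last-in-first-out reuse so the most recently returned (warmest) scheduler is taken first. A rough sketch of both, with stand-in types rather than the crate's real ones, and `assert!(matches!(...))` in place of the `assert_matches!` macro:

```rust
use std::{sync::Mutex, time::Instant};

// Stand-in for the crate's ResultWithTimings pair.
type ResultWithTimings = (Result<(), String>, u64);

struct Pool {
    // Vec used as a stack: push on return, pop on take => FILO reuse.
    scheduler_inners: Mutex<Vec<(String, Instant)>>,
}

impl Pool {
    fn do_take_resumed_scheduler(&self, result_with_timings: ResultWithTimings) -> String {
        // Stale => active transitions must not carry an Err; surface any
        // violation immediately rather than deep inside scheduling.
        assert!(matches!(result_with_timings, (Ok(_), _)));

        self.scheduler_inners
            .lock()
            .expect("not poisoned")
            .pop() // FILO: prefer the most recently pooled (warmest) inner
            .map(|(inner, _pooled_at)| inner)
            .unwrap_or_else(|| "freshly spawned scheduler".to_string())
    }
}
```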
Expand Down Expand Up @@ -1711,6 +1713,10 @@ mod tests {
&CheckPoint::TimeoutListenerTriggered(0),
&CheckPoint::TimeoutListenerTriggered(1),
&TestCheckPoint::AfterTimeoutListenerTriggered,
&TestCheckPoint::BeforeTimeoutListenerTriggered,
&CheckPoint::TimeoutListenerTriggered(0),
&CheckPoint::TimeoutListenerTriggered(1),
&TestCheckPoint::AfterTimeoutListenerTriggered,
]);

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
@@ -1778,6 +1784,11 @@ mod tests {
         bank.schedule_transaction_executions([(tx_after_stale, &1)].into_iter())
             .unwrap();

+        // Observe the second occurrence of TimeoutListenerTriggered(1), which indicates a new
+        // timeout listener is registered correctly again for the reactivated scheduler.
+        sleepless_testing::at(TestCheckPoint::BeforeTimeoutListenerTriggered);
+        sleepless_testing::at(TestCheckPoint::AfterTimeoutListenerTriggered);
+
         let (result, timings) = bank.wait_for_completed_scheduler().unwrap();
         assert_matches!(result, Ok(()));
         // ResultWithTimings should be carried over across active=>stale=>active transitions.
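The two `sleepless_testing::at()` calls pin the interleaving between the test thread and the pool's timeout-listener thread: execution only proceeds once both sides have reached the matching checkpoints. As a rough analogy only (this is not the sleepless_testing API), the same rendezvous could be expressed with plain barriers:

```rust
use std::{
    sync::{Arc, Barrier},
    thread,
};

fn main() {
    // One rendezvous before the timeout fires and one after, between the
    // test thread and a stand-in "timeout listener" thread.
    let before = Arc::new(Barrier::new(2));
    let after = Arc::new(Barrier::new(2));

    let listener = {
        let (before, after) = (Arc::clone(&before), Arc::clone(&after));
        thread::spawn(move || {
            before.wait(); // ~ BeforeTimeoutListenerTriggered
            // ... trigger the timeout here, ~ TimeoutListenerTriggered(0|1) ...
            after.wait(); // ~ AfterTimeoutListenerTriggered
        })
    };

    before.wait();
    // Once this wait returns, the listener is guaranteed to have fired
    // in between the two rendezvous points.
    after.wait();
    listener.join().unwrap();
}
```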
124 changes: 77 additions & 47 deletions unified-scheduler-pool/src/sleepless_testing.rs
@@ -26,28 +26,29 @@ pub(crate) trait BuilderTracked: Sized {
 }

 #[cfg(not(test))]
-pub(crate) use sleepless_testing_dummy::*;
+pub(crate) use dummy::*;
 #[cfg(test)]
-pub(crate) use sleepless_testing_real::*;
+pub(crate) use real::*;

 #[cfg(test)]
-mod sleepless_testing_real {
+mod real {
     use {
         lazy_static::lazy_static,
+        log::trace,
         std::{
             cmp::Ordering::{Equal, Greater, Less},
-            collections::{HashMap, HashSet},
+            collections::HashMap,
             fmt::Debug,
             sync::{Arc, Condvar, Mutex},
-            thread::{current, JoinHandle, ThreadId},
+            thread::{current, panicking, JoinHandle, ThreadId},
         },
     };

     #[derive(Debug)]
     struct Progress {
         _name: String,
         check_points: Vec<String>,
-        current_check_point: Mutex<String>,
+        current_index: Mutex<usize>,
         condvar: Condvar,
     }

@@ -61,61 +62,88 @@ mod sleepless_testing_real {
                 .into_iter()
                 .chain(check_points)
                 .collect::<Vec<_>>();
-            let check_points_set = check_points.iter().collect::<HashSet<_>>();
-            assert_eq!(check_points.len(), check_points_set.len());
-
             Self {
                 _name: name,
                 check_points,
-                current_check_point: Mutex::new(initial_check_point),
+                current_index: Mutex::new(0),
                 condvar: Condvar::new(),
             }
         }

         fn change_current_check_point(&self, anchored_check_point: String) {
-            let Some(anchored_index) = self
-                .check_points
-                .iter()
-                .position(|check_point| check_point == &anchored_check_point)
+            let mut current_index = self.current_index.lock().unwrap();
+
+            let Some(anchored_index) = self.anchored_index(*current_index, &anchored_check_point)
             else {
                 // Ignore unrecognizable checkpoints...
+                trace!("Ignore {} at {:?}", anchored_check_point, current());
                 return;
             };

-            let mut current_check_point = self.current_check_point.lock().unwrap();
-
-            let should_change =
-                match anchored_index.cmp(&self.expected_next_index(&current_check_point)) {
-                    Equal => true,
-                    Greater => {
-                        // anchor is one of future check points; block the current thread until
-                        // that happens
-                        current_check_point = self
-                            .condvar
-                            .wait_while(current_check_point, |current_check_point| {
-                                anchored_index != self.expected_next_index(current_check_point)
-                            })
-                            .unwrap();
-                        true
-                    }
-                    // anchor is already observed.
-                    Less => false,
-                };
+            let next_index = self.expected_next_index(*current_index);
+            let should_change = match anchored_index.cmp(&next_index) {
+                Equal => true,
+                Greater => {
+                    trace!("Blocked on {} at {:?}", anchored_check_point, current());
+                    // anchor is one of future check points; block the current thread until
+                    // that happens
+                    current_index = self
+                        .condvar
+                        .wait_while(current_index, |&mut current_index| {
+                            let Some(anchored_index) =
+                                self.anchored_index(current_index, &anchored_check_point)
+                            else {
+                                // don't wait. seems the progress is made by other threads
+                                // anchored to the same checkpoint.
+                                return false;
+                            };
+                            let next_index = self.expected_next_index(current_index);
+
+                            // determine we should wait further or not
+                            match anchored_index.cmp(&next_index) {
+                                Equal => false,
+                                Greater => {
+                                    trace!(
+                                        "Re-blocked on {} ({} != {}) at {:?}",
+                                        anchored_check_point,
+                                        anchored_index,
+                                        next_index,
+                                        current()
+                                    );
+                                    true
+                                }
+                                Less => unreachable!(),
+                            }
+                        })
+                        .unwrap();
+                    true
+                }
+                Less => unreachable!(),
+            };

             if should_change {
-                *current_check_point = anchored_check_point;
+                if *current_index != anchored_index {
+                    trace!("Progressed to: {} at {:?}", anchored_check_point, current());
+                    *current_index = anchored_index;
+                }
+
                 self.condvar.notify_all();
             }
         }

-        fn expected_next_index(&self, current_check_point: &String) -> usize {
-            let current_index = self
-                .check_points
-                .iter()
-                .position(|check_point| check_point == current_check_point)
-                .unwrap();
+        fn expected_next_index(&self, current_index: usize) -> usize {
             current_index.checked_add(1).unwrap()
         }
+
+        fn anchored_index(
+            &self,
+            current_index: usize,
+            anchored_check_point: &String,
+        ) -> Option<usize> {
+            self.check_points[current_index..]
+                .iter()
+                .position(|check_point| check_point == anchored_check_point)
+                .map(|subslice_index| subslice_index.checked_add(current_index).unwrap())
+        }
     }

     lazy_static! {

@@ -142,11 +170,13 @@
         }

         fn deactivate(&self) {
-            assert_eq!(
-                *self.0.check_points.last().unwrap(),
-                *self.0.current_check_point.lock().unwrap(),
-                "unfinished progress"
-            );
+            if !panicking() {
+                assert_eq!(
+                    self.0.check_points.len().checked_sub(1).unwrap(),
+                    *self.0.current_index.lock().unwrap(),
+                    "unfinished progress"
+                );
+            }
             THREAD_REGISTRY.lock().unwrap().remove(&self.1).unwrap();
         }
     }
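Wrapping the assertion in `panicking()` is the standard guard for asserting during teardown: if the test has already panicked, a second panic while unwinding would abort the process and mask the original failure message. A minimal illustration of the pattern (hypothetical type, not the module's real one):

```rust
use std::thread::panicking;

struct ProgressGuard {
    finished: bool,
}

impl Drop for ProgressGuard {
    fn drop(&mut self) {
        // Skip the check while unwinding: panicking inside a panic aborts
        // the process and hides the original assertion failure.
        if !panicking() {
            assert!(self.finished, "unfinished progress");
        }
    }
}

fn main() {
    let mut guard = ProgressGuard { finished: false };
    guard.finished = true; // mark all checkpoints as reached
} // drop runs here; no panic since `finished` is true
```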
@@ -299,7 +329,7 @@ mod sleepless_testing_real {
 }

 #[cfg(not(test))]
-mod sleepless_testing_dummy {
+mod dummy {
     use std::fmt::Debug;

     #[inline]
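The core of this rewrite is that Progress now tracks a cursor (`current_index`) instead of the current checkpoint string, and `anchored_index()` searches only from that cursor forward. That is what lets the checkpoint list contain duplicates (the old `HashSet` uniqueness assertion is gone), which the extended test above relies on. A condensed, self-contained sketch of the mechanism under that simplification (not the actual module, which also handles tracing and per-thread registration):

```rust
use std::sync::{Condvar, Mutex};

struct Progress {
    check_points: Vec<String>, // may now contain duplicate entries
    current_index: Mutex<usize>,
    condvar: Condvar,
}

impl Progress {
    // Search for `anchor` only at or after the cursor; restricting the
    // search to the suffix is what disambiguates duplicated checkpoints.
    fn anchored_index(&self, current_index: usize, anchor: &str) -> Option<usize> {
        self.check_points[current_index..]
            .iter()
            .position(|check_point| check_point == anchor)
            .map(|subslice_index| subslice_index + current_index)
    }

    fn change_current_check_point(&self, anchor: &str) {
        let mut current_index = self.current_index.lock().unwrap();
        if self.anchored_index(*current_index, anchor).is_none() {
            return; // unrecognized checkpoints are ignored
        }

        // Wait until `anchor` is exactly the next expected checkpoint. If a
        // thread anchored to the same (duplicated) name consumes it first,
        // anchored_index() returns None and waiting stops.
        current_index = self
            .condvar
            .wait_while(current_index, |&mut index| {
                self.anchored_index(index, anchor)
                    .is_some_and(|anchored| anchored != index + 1)
            })
            .unwrap();

        if self.anchored_index(*current_index, anchor) == Some(*current_index + 1) {
            *current_index += 1; // progress to the anchored checkpoint
        }
        self.condvar.notify_all();
    }
}
```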