diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 3b09c0d4b10..05f736d3e50 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -702,7 +702,7 @@ impl Builder { } } - fn get_cfg(&self) -> driver::Cfg { + fn get_cfg(&self, workers: usize) -> driver::Cfg { driver::Cfg { enable_pause_time: match self.kind { Kind::CurrentThread => true, @@ -715,6 +715,7 @@ impl Builder { enable_time: self.enable_time, start_paused: self.start_paused, nevents: self.nevents, + workers, } } @@ -1095,7 +1096,7 @@ impl Builder { use crate::runtime::scheduler::{self, CurrentThread}; use crate::runtime::{runtime::Scheduler, Config}; - let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; + let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?; // Blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); @@ -1248,7 +1249,7 @@ cfg_rt_multi_thread! { let core_threads = self.worker_threads.unwrap_or_else(num_cpus); - let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; + let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?; // Create the blocking pool let blocking_pool = @@ -1295,7 +1296,7 @@ cfg_rt_multi_thread! { use crate::runtime::scheduler::MultiThreadAlt; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); - let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; + let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?; // Create the blocking pool let blocking_pool = diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 62e4fc9474c..76918114bc3 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -3,7 +3,7 @@ use crate::runtime::coop; use std::cell::Cell; -#[cfg(any(feature = "rt", feature = "macros"))] +#[cfg(any(feature = "rt", feature = "macros", feature = "time"))] use crate::util::rand::FastRand; cfg_rt! { @@ -57,7 +57,7 @@ struct Context { #[cfg(feature = "rt")] runtime: Cell, - #[cfg(any(feature = "rt", feature = "macros"))] + #[cfg(any(feature = "rt", feature = "macros", feature = "time"))] rng: Cell>, /// Tracks the amount of "work" a task may still do before yielding back to @@ -100,7 +100,7 @@ tokio_thread_local! { #[cfg(feature = "rt")] runtime: Cell::new(EnterRuntime::NotEntered), - #[cfg(any(feature = "rt", feature = "macros"))] + #[cfg(any(feature = "rt", feature = "macros", feature = "time"))] rng: Cell::new(None), budget: Cell::new(coop::Budget::unconstrained()), @@ -121,7 +121,11 @@ tokio_thread_local! { } } -#[cfg(any(feature = "macros", all(feature = "sync", feature = "rt")))] +#[cfg(any( + feature = "time", + feature = "macros", + all(feature = "sync", feature = "rt") +))] pub(crate) fn thread_rng_n(n: u32) -> u32 { CONTEXT.with(|ctx| { let mut rng = ctx.rng.get().unwrap_or_else(FastRand::new); diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 64928228b46..11aa7abb7b7 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -40,6 +40,7 @@ pub(crate) struct Cfg { pub(crate) enable_pause_time: bool, pub(crate) start_paused: bool, pub(crate) nevents: usize, + pub(crate) workers: usize, } impl Driver { @@ -48,7 +49,8 @@ impl Driver { let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); - let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, &clock); + let (time_driver, time_handle) = + create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers); Ok(( Self { inner: time_driver }, @@ -306,9 +308,10 @@ cfg_time! { enable: bool, io_stack: IoStack, clock: &Clock, + workers: usize, ) -> (TimeDriver, TimeHandle) { if enable { - let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock); + let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock, workers as u32); (TimeDriver::Enabled { driver }, Some(handle)) } else { @@ -361,6 +364,7 @@ cfg_not_time! { _enable: bool, io_stack: IoStack, _clock: &Clock, + _workers: usize, ) -> (TimeDriver, TimeHandle) { (io_stack, ()) } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 83e70795f4f..9f0dd98dfdc 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -742,6 +742,11 @@ impl Context { pub(crate) fn defer(&self, waker: &Waker) { self.defer.defer(waker); } + + #[allow(dead_code)] + pub(crate) fn get_worker_index(&self) -> usize { + self.worker.index + } } impl Core { diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index c315e382291..63ae0a49743 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -1311,6 +1311,11 @@ impl Context { fn shared(&self) -> &Shared { &self.handle.shared } + + #[cfg_attr(not(feature = "time"), allow(dead_code))] + pub(crate) fn get_worker_index(&self) -> usize { + self.index + } } impl Core { diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 7991ee0dc0a..834077caa3d 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -58,6 +58,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicU64; use crate::loom::sync::atomic::Ordering; +use crate::runtime::context; use crate::runtime::scheduler; use crate::sync::AtomicWaker; use crate::time::Instant; @@ -328,6 +329,8 @@ pub(super) type EntryList = crate::util::linked_list::LinkedList Self { + pub(super) fn new(shard_id: u32) -> Self { Self { + shard_id, cached_when: AtomicU64::new(0), pointers: linked_list::Pointers::new(), state: StateCell::default(), @@ -438,6 +442,11 @@ impl TimerShared { pub(super) fn might_be_registered(&self) -> bool { self.state.might_be_registered() } + + /// Gets the shard id. + pub(super) fn shard_id(&self) -> u32 { + self.shard_id + } } unsafe impl linked_list::Link for TimerShared { @@ -485,8 +494,10 @@ impl TimerEntry { fn inner(&self) -> &TimerShared { let inner = unsafe { &*self.inner.get() }; if inner.is_none() { + let shard_size = self.driver.driver().time().inner.get_shard_size(); + let shard_id = generate_shard_id(shard_size); unsafe { - *self.inner.get() = Some(TimerShared::new()); + *self.inner.get() = Some(TimerShared::new(shard_id)); } } return inner.as_ref().unwrap(); @@ -643,3 +654,25 @@ impl Drop for TimerEntry { unsafe { Pin::new_unchecked(self) }.as_mut().cancel(); } } + +// Generates a shard id. If current thread is a worker thread, we use its worker index as a shard id. +// Otherwise, we use a random number generator to obtain the shard id. +cfg_rt! { + fn generate_shard_id(shard_size: u32) -> u32 { + let id = context::with_scheduler(|ctx| match ctx { + Some(scheduler::Context::CurrentThread(_ctx)) => 0, + #[cfg(feature = "rt-multi-thread")] + Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] + Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, + None => context::thread_rng_n(shard_size), + }); + id % shard_size + } +} + +cfg_not_rt! { + fn generate_shard_id(shard_size: u32) -> u32 { + context::thread_rng_n(shard_size) + } +} diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 8cd51c5cb4a..37b04ef0000 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -12,6 +12,7 @@ use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION}; mod handle; pub(crate) use self::handle::Handle; +use self::wheel::Wheel; mod source; pub(crate) use source::TimeSource; @@ -25,9 +26,28 @@ use crate::time::error::Error; use crate::time::{Clock, Duration}; use crate::util::WakeList; +use crate::loom::sync::atomic::AtomicU64; use std::fmt; use std::{num::NonZeroU64, ptr::NonNull}; +struct AtomicOptionNonZeroU64(AtomicU64); + +// A helper type to store the `next_wake`. +impl AtomicOptionNonZeroU64 { + fn new(val: Option) -> Self { + Self(AtomicU64::new(val.map_or(0, NonZeroU64::get))) + } + + fn store(&self, val: Option) { + self.0 + .store(val.map_or(0, NonZeroU64::get), Ordering::Relaxed); + } + + fn load(&self) -> Option { + NonZeroU64::new(self.0.load(Ordering::Relaxed)) + } +} + /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. /// /// A `Driver` instance tracks the state necessary for managing time and @@ -91,8 +111,11 @@ pub(crate) struct Driver { /// Timer state shared between `Driver`, `Handle`, and `Registration`. struct Inner { - // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex - pub(super) state: Mutex, + /// The earliest time at which we promise to wake up without unparking. + next_wake: AtomicOptionNonZeroU64, + + /// Sharded Timer wheels. + wheels: Box<[Mutex]>, /// True if the driver is being shutdown. pub(super) is_shutdown: AtomicBool, @@ -107,15 +130,6 @@ struct Inner { did_wake: AtomicBool, } -/// Time state shared which must be protected by a `Mutex` -struct InnerState { - /// The earliest time at which we promise to wake up without unparking. - next_wake: Option, - - /// Timer wheel. - wheel: wheel::Wheel, -} - // ===== impl Driver ===== impl Driver { @@ -123,18 +137,20 @@ impl Driver { /// thread and `time_source` to get the current time and convert to ticks. /// /// Specifying the source of time is useful when testing. - pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) { + pub(crate) fn new(park: IoStack, clock: &Clock, shards: u32) -> (Driver, Handle) { + assert!(shards > 0); + let time_source = TimeSource::new(clock); + let wheels: Vec<_> = (0..shards) + .map(|_| Mutex::new(wheel::Wheel::new())) + .collect(); let handle = Handle { time_source, inner: Inner { - state: Mutex::new(InnerState { - next_wake: None, - wheel: wheel::Wheel::new(), - }), + next_wake: AtomicOptionNonZeroU64::new(None), + wheels: wheels.into_boxed_slice(), is_shutdown: AtomicBool::new(false), - #[cfg(feature = "test-util")] did_wake: AtomicBool::new(false), }, @@ -164,24 +180,30 @@ impl Driver { // Advance time forward to the end of time. - handle.process_at_time(u64::MAX); + handle.process_at_time(0, u64::MAX); self.park.shutdown(rt_handle); } fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option) { let handle = rt_handle.time(); - let mut lock = handle.inner.state.lock(); - assert!(!handle.is_shutdown()); - let next_wake = lock.wheel.next_expiration_time(); - lock.next_wake = - next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); - - drop(lock); - - match next_wake { + // Finds out the min expiration time to park. + let expiration_time = (0..rt_handle.time().inner.get_shard_size()) + .filter_map(|id| { + let lock = rt_handle.time().inner.lock_sharded_wheel(id); + lock.next_expiration_time() + }) + .min(); + + rt_handle + .time() + .inner + .next_wake + .store(next_wake_time(expiration_time)); + + match expiration_time { Some(when) => { let now = handle.time_source.now(rt_handle.clock()); // Note that we effectively round up to 1ms here - this avoids @@ -245,30 +267,59 @@ impl Driver { } } +// Helper function to turn expiration_time into next_wake_time. +// Since the `park_timeout` will round up to 1ms for avoiding very +// short-duration microsecond-resolution sleeps, we do the same here. +// The conversion is as follows +// None => None +// Some(0) => Some(1) +// Some(i) => Some(i) +fn next_wake_time(expiration_time: Option) -> Option { + expiration_time.and_then(|v| { + if v == 0 { + NonZeroU64::new(1) + } else { + NonZeroU64::new(v) + } + }) +} + impl Handle { /// Runs timer related logic, and returns the next wakeup time pub(self) fn process(&self, clock: &Clock) { let now = self.time_source().now(clock); + // For fairness, randomly select one to start. + let shards = self.inner.get_shard_size(); + let start = crate::runtime::context::thread_rng_n(shards); + self.process_at_time(start, now); + } - self.process_at_time(now); + pub(self) fn process_at_time(&self, start: u32, now: u64) { + let shards = self.inner.get_shard_size(); + + let expiration_time = (start..shards + start) + .filter_map(|i| self.process_at_sharded_time(i, now)) + .min(); + + self.inner.next_wake.store(next_wake_time(expiration_time)); } - pub(self) fn process_at_time(&self, mut now: u64) { + // Returns the next wakeup time of this shard. + pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) -> Option { let mut waker_list = WakeList::new(); + let mut lock = self.inner.lock_sharded_wheel(id); - let mut lock = self.inner.lock(); - - if now < lock.wheel.elapsed() { + if now < lock.elapsed() { // Time went backwards! This normally shouldn't happen as the Rust language // guarantees that an Instant is monotonic, but can happen when running // Linux in a VM on a Windows host due to std incorrectly trusting the // hardware clock to be monotonic. // // See for more information. - now = lock.wheel.elapsed(); + now = lock.elapsed(); } - while let Some(entry) = lock.wheel.poll(now) { + while let Some(entry) = lock.poll(now) { debug_assert!(unsafe { entry.is_pending() }); // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. @@ -281,19 +332,15 @@ impl Handle { waker_list.wake_all(); - lock = self.inner.lock(); + lock = self.inner.lock_sharded_wheel(id); } } } - - lock.next_wake = lock - .wheel - .poll_at() - .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); - + let next_wake_up = lock.poll_at(); drop(lock); waker_list.wake_all(); + next_wake_up } /// Removes a registered timer from the driver. @@ -308,10 +355,10 @@ impl Handle { /// `add_entry` must not be called concurrently. pub(self) unsafe fn clear_entry(&self, entry: NonNull) { unsafe { - let mut lock = self.inner.lock(); + let mut lock = self.inner.lock_sharded_wheel(entry.as_ref().shard_id()); if entry.as_ref().might_be_registered() { - lock.wheel.remove(entry); + lock.remove(entry); } entry.as_ref().handle().fire(Ok(())); @@ -331,12 +378,12 @@ impl Handle { entry: NonNull, ) { let waker = unsafe { - let mut lock = self.inner.lock(); + let mut lock = self.inner.lock_sharded_wheel(entry.as_ref().shard_id()); // We may have raced with a firing/deregistration, so check before // deregistering. if unsafe { entry.as_ref().might_be_registered() } { - lock.wheel.remove(entry); + lock.remove(entry); } // Now that we have exclusive control of this entry, mint a handle to reinsert it. @@ -350,10 +397,12 @@ impl Handle { // Note: We don't have to worry about racing with some other resetting // thread, because add_entry and reregister require exclusive control of // the timer entry. - match unsafe { lock.wheel.insert(entry) } { + match unsafe { lock.insert(entry) } { Ok(when) => { - if lock + if self + .inner .next_wake + .load() .map(|next_wake| when < next_wake.get()) .unwrap_or(true) { @@ -389,15 +438,25 @@ impl Handle { // ===== impl Inner ===== impl Inner { - /// Locks the driver's inner structure - pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { - self.state.lock() + /// Locks the driver's sharded wheel structure. + pub(super) fn lock_sharded_wheel( + &self, + shard_id: u32, + ) -> crate::loom::sync::MutexGuard<'_, Wheel> { + let index = shard_id % (self.wheels.len() as u32); + // Safety: This modulo operation ensures that the index is not out of bounds. + unsafe { self.wheels.get_unchecked(index as usize).lock() } } // Check whether the driver has been shutdown pub(super) fn is_shutdown(&self) -> bool { self.is_shutdown.load(Ordering::SeqCst) } + + // Gets the number of shards. + fn get_shard_size(&self) -> u32 { + self.wheels.len() as u32 + } } impl fmt::Debug for Inner { diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 520dc00a462..676cf55f9c6 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -68,7 +68,7 @@ fn single_timer() { // This may or may not return Some (depending on how it races with the // thread). If it does return None, however, the timer should complete // synchronously. - time.process_at_time(time.time_source().now(clock) + 2_000_000_000); + time.process_at_time(0, time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); }) @@ -102,7 +102,7 @@ fn drop_timer() { let clock = handle.inner.driver().clock(); // advance 2s in the future. - time.process_at_time(time.time_source().now(clock) + 2_000_000_000); + time.process_at_time(0, time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); }) @@ -138,7 +138,7 @@ fn change_waker() { let clock = handle.inner.driver().clock(); // advance 2s - time.process_at_time(time.time_source().now(clock) + 2_000_000_000); + time.process_at_time(0, time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); }) @@ -181,6 +181,7 @@ fn reset_future() { // This may or may not return a wakeup time. handle.process_at_time( + 0, handle .time_source() .instant_to_tick(start + Duration::from_millis(1500)), @@ -189,6 +190,7 @@ fn reset_future() { assert!(!finished_early.load(Ordering::Relaxed)); handle.process_at_time( + 0, handle .time_source() .instant_to_tick(start + Duration::from_millis(2500)), @@ -231,7 +233,7 @@ fn poll_process_levels() { } for t in 1..normal_or_miri(1024, 64) { - handle.inner.driver().time().process_at_time(t as u64); + handle.inner.driver().time().process_at_time(0, t as u64); for (deadline, future) in entries.iter_mut().enumerate() { let mut context = Context::from_waker(noop_waker_ref()); @@ -260,8 +262,8 @@ fn poll_process_levels_targeted() { let handle = handle.inner.driver().time(); - handle.process_at_time(62); + handle.process_at_time(0, 62); assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); - handle.process_at_time(192); - handle.process_at_time(192); + handle.process_at_time(0, 192); + handle.process_at_time(0, 192); } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index d821ec897cf..7cf371195ff 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -49,7 +49,7 @@ cfg_rt! { pub(crate) mod sharded_list; } -#[cfg(any(feature = "rt", feature = "macros"))] +#[cfg(any(feature = "rt", feature = "macros", feature = "time"))] pub(crate) mod rand; cfg_rt! { diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 67c45693c9c..aad85b973ff 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -71,6 +71,7 @@ impl FastRand { #[cfg(any( feature = "macros", feature = "rt-multi-thread", + feature = "time", all(feature = "sync", feature = "rt") ))] pub(crate) fn fastrand_n(&mut self, n: u32) -> u32 {