From ee626f5587dd2455c80f00519bc9bccb9eae3118 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Fri, 3 May 2024 23:18:06 +0800 Subject: [PATCH 01/31] first commit --- tokio/src/runtime/builder.rs | 7 +- tokio/src/runtime/driver.rs | 7 +- tokio/src/runtime/time/entry.rs | 14 +++- tokio/src/runtime/time/mod.rs | 143 ++++++++++++++++++++++---------- tokio/src/util/mod.rs | 2 +- 5 files changed, 121 insertions(+), 52 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 3b09c0d4b10..d3b1297e5bd 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -702,7 +702,7 @@ impl Builder { } } - fn get_cfg(&self) -> driver::Cfg { + fn get_cfg(&self, workers: usize) -> driver::Cfg { driver::Cfg { enable_pause_time: match self.kind { Kind::CurrentThread => true, @@ -715,6 +715,7 @@ impl Builder { enable_time: self.enable_time, start_paused: self.start_paused, nevents: self.nevents, + workers, } } @@ -1095,7 +1096,7 @@ impl Builder { use crate::runtime::scheduler::{self, CurrentThread}; use crate::runtime::{runtime::Scheduler, Config}; - let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; + let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?; // Blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); @@ -1248,7 +1249,7 @@ cfg_rt_multi_thread! { let core_threads = self.worker_threads.unwrap_or_else(num_cpus); - let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; + let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?; // Create the blocking pool let blocking_pool = diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 64928228b46..9f14823600c 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -40,6 +40,7 @@ pub(crate) struct Cfg { pub(crate) enable_pause_time: bool, pub(crate) start_paused: bool, pub(crate) nevents: usize, + pub(crate) workers: usize, } impl Driver { @@ -48,7 +49,8 @@ impl Driver { let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); - let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, &clock); + let (time_driver, time_handle) = + create_time_driver(cfg.enable_time, io_stack, &clock, cfg.workers); Ok(( Self { inner: time_driver }, @@ -306,9 +308,10 @@ cfg_time! { enable: bool, io_stack: IoStack, clock: &Clock, + workers: usize, ) -> (TimeDriver, TimeHandle) { if enable { - let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock); + let (driver, handle) = crate::runtime::time::Driver::new(io_stack, clock, workers as u32); (TimeDriver::Enabled { driver }, Some(handle)) } else { diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index a6be0e62a13..224cd36cd23 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -328,6 +328,8 @@ pub(super) type EntryList = crate::util::linked_list::LinkedList Self { + pub(super) fn new(shard_id: u32) -> Self { Self { + shard_id, cached_when: AtomicU64::new(0), true_when: AtomicU64::new(0), pointers: linked_list::Pointers::new(), @@ -443,6 +446,11 @@ impl TimerShared { pub(super) fn might_be_registered(&self) -> bool { self.state.might_be_registered() } + + /// Gets the shard id. + pub(super) fn shard_id(&self) -> u32 { + self.shard_id + } } unsafe impl linked_list::Link for TimerShared { @@ -490,8 +498,10 @@ impl TimerEntry { fn inner(&self) -> &TimerShared { let inner = unsafe { &*self.inner.get() }; if inner.is_none() { + let shard_id = + super::rand::thread_rng_n(self.driver.driver().time().inner.get_shard_size()); unsafe { - *self.inner.get() = Some(TimerShared::new()); + *self.inner.get() = Some(TimerShared::new(shard_id)); } } return inner.as_ref().unwrap(); diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 8cd51c5cb4a..d1e20bf3bc0 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -12,6 +12,7 @@ use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION}; mod handle; pub(crate) use self::handle::Handle; +use self::wheel::Wheel; mod source; pub(crate) use source::TimeSource; @@ -26,6 +27,7 @@ use crate::time::{Clock, Duration}; use crate::util::WakeList; use std::fmt; +use std::sync::atomic::AtomicU64; use std::{num::NonZeroU64, ptr::NonNull}; /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. @@ -91,8 +93,11 @@ pub(crate) struct Driver { /// Timer state shared between `Driver`, `Handle`, and `Registration`. struct Inner { - // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex - pub(super) state: Mutex, + /// The earliest time at which we promise to wake up without unparking. + next_wake: AtomicU64, + + /// Sharded Timer wheels. + wheels: Box<[Mutex]>, /// True if the driver is being shutdown. pub(super) is_shutdown: AtomicBool, @@ -107,15 +112,6 @@ struct Inner { did_wake: AtomicBool, } -/// Time state shared which must be protected by a `Mutex` -struct InnerState { - /// The earliest time at which we promise to wake up without unparking. - next_wake: Option, - - /// Timer wheel. - wheel: wheel::Wheel, -} - // ===== impl Driver ===== impl Driver { @@ -123,18 +119,20 @@ impl Driver { /// thread and `time_source` to get the current time and convert to ticks. /// /// Specifying the source of time is useful when testing. - pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) { + pub(crate) fn new(park: IoStack, clock: &Clock, shards: u32) -> (Driver, Handle) { let time_source = TimeSource::new(clock); + let mut wheels = vec![]; + for _ in 0..shards { + wheels.push(Mutex::new(wheel::Wheel::new())); + } + let handle = Handle { time_source, inner: Inner { - state: Mutex::new(InnerState { - next_wake: None, - wheel: wheel::Wheel::new(), - }), + next_wake: AtomicU64::new(0), + wheels: wheels.into_boxed_slice(), is_shutdown: AtomicBool::new(false), - #[cfg(feature = "test-util")] did_wake: AtomicBool::new(false), }, @@ -171,15 +169,24 @@ impl Driver { fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option) { let handle = rt_handle.time(); - let mut lock = handle.inner.state.lock(); - assert!(!handle.is_shutdown()); - let next_wake = lock.wheel.next_expiration_time(); - lock.next_wake = - next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); + // Finds out the min expiration time to park. + let mut next_wake: Option = None; + for id in 0..rt_handle.time().inner.get_shard_size() { + let lock = unsafe { rt_handle.time().inner.lock_sharded_wheel(id) }; - drop(lock); + if let Some(expiration_time) = lock.next_expiration_time() { + next_wake = Some(match next_wake { + Some(t) => t.min(expiration_time), + None => expiration_time, + }); + } + } + + rt_handle.time().inner.set_next_wake( + next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())), + ); match next_wake { Some(when) => { @@ -253,22 +260,30 @@ impl Handle { self.process_at_time(now); } - pub(self) fn process_at_time(&self, mut now: u64) { - let mut waker_list = WakeList::new(); + pub(self) fn process_at_time(&self, now: u64) { + let shards = self.inner.get_shard_size(); + // For fairness, randomly select one to start. + let start = rand::thread_rng_n(shards); + for i in start..shards + start { + self.process_at_sharded_time(i, now); + } + } - let mut lock = self.inner.lock(); + pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) { + let mut waker_list = WakeList::new(); + let mut lock = self.inner.lock_sharded_wheel(id); - if now < lock.wheel.elapsed() { + if now < lock.elapsed() { // Time went backwards! This normally shouldn't happen as the Rust language // guarantees that an Instant is monotonic, but can happen when running // Linux in a VM on a Windows host due to std incorrectly trusting the // hardware clock to be monotonic. // // See for more information. - now = lock.wheel.elapsed(); + now = lock.elapsed(); } - while let Some(entry) = lock.wheel.poll(now) { + while let Some(entry) = lock.poll(now) { debug_assert!(unsafe { entry.is_pending() }); // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. @@ -281,15 +296,15 @@ impl Handle { waker_list.wake_all(); - lock = self.inner.lock(); + lock = self.inner.lock_sharded_wheel(id); } } } - lock.next_wake = lock - .wheel - .poll_at() - .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); + self.inner.set_next_wake( + lock.poll_at() + .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())), + ); drop(lock); @@ -308,10 +323,10 @@ impl Handle { /// `add_entry` must not be called concurrently. pub(self) unsafe fn clear_entry(&self, entry: NonNull) { unsafe { - let mut lock = self.inner.lock(); + let mut lock = self.inner.lock_sharded_wheel(entry.as_ref().shard_id()); if entry.as_ref().might_be_registered() { - lock.wheel.remove(entry); + lock.remove(entry); } entry.as_ref().handle().fire(Ok(())); @@ -331,12 +346,12 @@ impl Handle { entry: NonNull, ) { let waker = unsafe { - let mut lock = self.inner.lock(); + let mut lock = self.inner.lock_sharded_wheel(entry.as_ref().shard_id()); // We may have raced with a firing/deregistration, so check before // deregistering. if unsafe { entry.as_ref().might_be_registered() } { - lock.wheel.remove(entry); + lock.remove(entry); } // Now that we have exclusive control of this entry, mint a handle to reinsert it. @@ -350,10 +365,11 @@ impl Handle { // Note: We don't have to worry about racing with some other resetting // thread, because add_entry and reregister require exclusive control of // the timer entry. - match unsafe { lock.wheel.insert(entry) } { + match unsafe { lock.insert(entry) } { Ok(when) => { - if lock - .next_wake + if self + .inner + .get_next_wake() .map(|next_wake| when < next_wake.get()) .unwrap_or(true) { @@ -389,15 +405,35 @@ impl Handle { // ===== impl Inner ===== impl Inner { - /// Locks the driver's inner structure - pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { - self.state.lock() + /// Locks the driver's sharded wheel structure. + pub(super) fn lock_sharded_wheel( + &self, + shard_id: u32, + ) -> crate::loom::sync::MutexGuard<'_, Wheel> { + let index = shard_id % (self.wheels.len() as u32); + // Safety: This modulo operation ensures that the index is not out of bounds. + unsafe { self.wheels.get_unchecked(index as usize).lock() } } // Check whether the driver has been shutdown pub(super) fn is_shutdown(&self) -> bool { self.is_shutdown.load(Ordering::SeqCst) } + + // Gets the number of shards. + fn get_shard_size(&self) -> u32 { + self.wheels.len() as u32 + } + + fn set_next_wake(&self, value: Option) { + let val = value.map(|v| v.get()).unwrap_or(0); + self.next_wake.store(val, Ordering::Relaxed); + } + + fn get_next_wake(&self) -> Option { + let val = self.next_wake.load(Ordering::Relaxed); + NonZeroU64::new(val) + } } impl fmt::Debug for Inner { @@ -406,5 +442,24 @@ impl fmt::Debug for Inner { } } +mod rand { + use crate::util::rand::FastRand; + use std::cell::Cell; + + // Used by `TimerEntry`. + pub(crate) fn thread_rng_n(n: u32) -> u32 { + thread_local! { + static THREAD_RNG: Cell = Cell::new(FastRand::new()); + } + + THREAD_RNG.with(|cell_rng| { + let mut rng = cell_rng.get(); + let ret = rng.fastrand_n(n); + cell_rng.set(rng); + ret + }) + } +} + #[cfg(test)] mod tests; diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index d821ec897cf..7cf371195ff 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -49,7 +49,7 @@ cfg_rt! { pub(crate) mod sharded_list; } -#[cfg(any(feature = "rt", feature = "macros"))] +#[cfg(any(feature = "rt", feature = "macros", feature = "time"))] pub(crate) mod rand; cfg_rt! { From ed42fae8ea3717788bd3b4606f0b06b6a5e887aa Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sat, 4 May 2024 15:15:47 +0800 Subject: [PATCH 02/31] fix args in build_alt_threaded_runtime --- tokio/src/runtime/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index d3b1297e5bd..05f736d3e50 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1296,7 +1296,7 @@ cfg_rt_multi_thread! { use crate::runtime::scheduler::MultiThreadAlt; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); - let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; + let (driver, driver_handle) = driver::Driver::new(self.get_cfg(core_threads))?; // Create the blocking pool let blocking_pool = From 1915c5763d03ef0c847f5be5f941d77907617a81 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sat, 4 May 2024 15:28:17 +0800 Subject: [PATCH 03/31] fix ci --- tokio/src/runtime/driver.rs | 1 + tokio/src/runtime/time/mod.rs | 2 +- tokio/src/util/rand.rs | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 9f14823600c..11aa7abb7b7 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -364,6 +364,7 @@ cfg_not_time! { _enable: bool, io_stack: IoStack, _clock: &Clock, + _workers: usize, ) -> (TimeDriver, TimeHandle) { (io_stack, ()) } diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index d1e20bf3bc0..eedf45c9d2b 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -26,8 +26,8 @@ use crate::time::error::Error; use crate::time::{Clock, Duration}; use crate::util::WakeList; +use crate::loom::sync::atomic::AtomicU64; use std::fmt; -use std::sync::atomic::AtomicU64; use std::{num::NonZeroU64, ptr::NonNull}; /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 67c45693c9c..aad85b973ff 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -71,6 +71,7 @@ impl FastRand { #[cfg(any( feature = "macros", feature = "rt-multi-thread", + feature = "time", all(feature = "sync", feature = "rt") ))] pub(crate) fn fastrand_n(&mut self, n: u32) -> u32 { From 3f9a443cc2b3a3c5361e1a27c7df7189a8a326cb Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sat, 4 May 2024 20:54:15 +0800 Subject: [PATCH 04/31] adopt code review suggestions from FrankReh --- tokio/src/runtime/time/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index eedf45c9d2b..94979d3f7f1 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -120,12 +120,12 @@ impl Driver { /// /// Specifying the source of time is useful when testing. pub(crate) fn new(park: IoStack, clock: &Clock, shards: u32) -> (Driver, Handle) { - let time_source = TimeSource::new(clock); + assert!(shards > 0); - let mut wheels = vec![]; - for _ in 0..shards { - wheels.push(Mutex::new(wheel::Wheel::new())); - } + let time_source = TimeSource::new(clock); + let wheels: Vec<_> = (0..shards) + .map(|_| Mutex::new(wheel::Wheel::new())) + .collect(); let handle = Handle { time_source, @@ -174,7 +174,7 @@ impl Driver { // Finds out the min expiration time to park. let mut next_wake: Option = None; for id in 0..rt_handle.time().inner.get_shard_size() { - let lock = unsafe { rt_handle.time().inner.lock_sharded_wheel(id) }; + let lock = rt_handle.time().inner.lock_sharded_wheel(id); if let Some(expiration_time) = lock.next_expiration_time() { next_wake = Some(match next_wake { From 5954e4f3114b72c85971512d506656b943cd8c40 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 10:24:01 +0800 Subject: [PATCH 05/31] fix set_next_wake_up&& small refactor --- tokio/src/runtime/time/mod.rs | 39 ++++++++++++++++------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 94979d3f7f1..6af10eb4d85 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -172,17 +172,12 @@ impl Driver { assert!(!handle.is_shutdown()); // Finds out the min expiration time to park. - let mut next_wake: Option = None; - for id in 0..rt_handle.time().inner.get_shard_size() { - let lock = rt_handle.time().inner.lock_sharded_wheel(id); - - if let Some(expiration_time) = lock.next_expiration_time() { - next_wake = Some(match next_wake { - Some(t) => t.min(expiration_time), - None => expiration_time, - }); - } - } + let next_wake = (0..rt_handle.time().inner.get_shard_size()) + .filter_map(|id| { + let lock = rt_handle.time().inner.lock_sharded_wheel(id); + lock.next_expiration_time() + }) + .min(); rt_handle.time().inner.set_next_wake( next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())), @@ -264,12 +259,16 @@ impl Handle { let shards = self.inner.get_shard_size(); // For fairness, randomly select one to start. let start = rand::thread_rng_n(shards); - for i in start..shards + start { - self.process_at_sharded_time(i, now); - } + + let next_wake_up = (start..shards + start) + .filter_map(|i| self.process_at_sharded_time(i, now)) + .min(); + + self.inner.set_next_wake(next_wake_up); } - pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) { + // Returns the next wakeup time of this shard. + pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) -> Option { let mut waker_list = WakeList::new(); let mut lock = self.inner.lock_sharded_wheel(id); @@ -300,15 +299,13 @@ impl Handle { } } } - - self.inner.set_next_wake( - lock.poll_at() - .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())), - ); - + let next_wake_up = lock + .poll_at() + .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); drop(lock); waker_list.wake_all(); + next_wake_up } /// Removes a registered timer from the driver. From 565432b6ee1211a3abc1aca8d9e6343e53f6479b Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 14:09:37 +0800 Subject: [PATCH 06/31] thread-local id --- .../src/runtime/scheduler/multi_thread/worker.rs | 4 ++++ tokio/src/runtime/time/entry.rs | 15 +++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 83e70795f4f..8d1c4dddb49 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -742,6 +742,10 @@ impl Context { pub(crate) fn defer(&self, waker: &Waker) { self.defer.defer(waker); } + + pub(crate) fn get_worker_index(&self) -> usize { + self.worker.index + } } impl Core { diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 224cd36cd23..955fbfa3049 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -58,6 +58,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicU64; use crate::loom::sync::atomic::Ordering; +use crate::runtime::context; use crate::runtime::scheduler; use crate::sync::AtomicWaker; use crate::time::Instant; @@ -498,8 +499,7 @@ impl TimerEntry { fn inner(&self) -> &TimerShared { let inner = unsafe { &*self.inner.get() }; if inner.is_none() { - let shard_id = - super::rand::thread_rng_n(self.driver.driver().time().inner.get_shard_size()); + let shard_id = self.get_shard_id(); unsafe { *self.inner.get() = Some(TimerShared::new(shard_id)); } @@ -507,6 +507,17 @@ impl TimerEntry { return inner.as_ref().unwrap(); } + fn get_shard_id(&self) -> u32 { + let shard_size = self.driver.driver().time().inner.get_shard_size(); + use scheduler::Context::MultiThread; + let id = context::with_scheduler(|ctx| match ctx { + Some(MultiThread(ctx)) => ctx.get_worker_index() as u32, + Some(_) => 1, + _ => super::rand::thread_rng_n(shard_size), + }); + id % shard_size + } + pub(crate) fn deadline(&self) -> Instant { self.deadline } From 71bb30c7169d31d83b82c74f2a4ac33178aa0d19 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 14:53:39 +0800 Subject: [PATCH 07/31] use crate::runtime::context::thread_rng_n --- tokio/src/runtime/context.rs | 6 +++++- tokio/src/runtime/time/entry.rs | 11 +++++++++-- tokio/src/runtime/time/mod.rs | 21 +-------------------- 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 62e4fc9474c..f639173b212 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -121,7 +121,11 @@ tokio_thread_local! { } } -#[cfg(any(feature = "macros", all(feature = "sync", feature = "rt")))] +#[cfg(any( + feature = "time", + feature = "macros", + all(feature = "sync", feature = "rt") +))] pub(crate) fn thread_rng_n(n: u32) -> u32 { CONTEXT.with(|ctx| { let mut rng = ctx.rng.get().unwrap_or_else(FastRand::new); diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 955fbfa3049..72b04033a0c 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -507,13 +507,20 @@ impl TimerEntry { return inner.as_ref().unwrap(); } + // Gets the shard id. If current thread is a worker thread, we use its worker index as a shard id. + // Otherwise, we use a random number generator to obtain the shard id. fn get_shard_id(&self) -> u32 { let shard_size = self.driver.driver().time().inner.get_shard_size(); use scheduler::Context::MultiThread; + + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] + use scheduler::Context::MultiThreadAlt; let id = context::with_scheduler(|ctx| match ctx { Some(MultiThread(ctx)) => ctx.get_worker_index() as u32, - Some(_) => 1, - _ => super::rand::thread_rng_n(shard_size), + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] + Some(MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, + Some(_) => 0, + _ => crate::runtime::context::thread_rng_n(shard_size), }); id % shard_size } diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 6af10eb4d85..d6058b5881f 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -258,7 +258,7 @@ impl Handle { pub(self) fn process_at_time(&self, now: u64) { let shards = self.inner.get_shard_size(); // For fairness, randomly select one to start. - let start = rand::thread_rng_n(shards); + let start = crate::runtime::context::thread_rng_n(shards); let next_wake_up = (start..shards + start) .filter_map(|i| self.process_at_sharded_time(i, now)) @@ -439,24 +439,5 @@ impl fmt::Debug for Inner { } } -mod rand { - use crate::util::rand::FastRand; - use std::cell::Cell; - - // Used by `TimerEntry`. - pub(crate) fn thread_rng_n(n: u32) -> u32 { - thread_local! { - static THREAD_RNG: Cell = Cell::new(FastRand::new()); - } - - THREAD_RNG.with(|cell_rng| { - let mut rng = cell_rng.get(); - let ret = rng.fastrand_n(n); - cell_rng.set(rng); - ret - }) - } -} - #[cfg(test)] mod tests; From 8bf56ca4d3ca68aa69eba231e7d08fb92c4e6bd0 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 15:00:16 +0800 Subject: [PATCH 08/31] rm feature --- tokio/src/util/mod.rs | 2 +- tokio/src/util/rand.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 7cf371195ff..d821ec897cf 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -49,7 +49,7 @@ cfg_rt! { pub(crate) mod sharded_list; } -#[cfg(any(feature = "rt", feature = "macros", feature = "time"))] +#[cfg(any(feature = "rt", feature = "macros"))] pub(crate) mod rand; cfg_rt! { diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index aad85b973ff..67c45693c9c 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -71,7 +71,6 @@ impl FastRand { #[cfg(any( feature = "macros", feature = "rt-multi-thread", - feature = "time", all(feature = "sync", feature = "rt") ))] pub(crate) fn fastrand_n(&mut self, n: u32) -> u32 { From e4288c6070efb8cbf1eb2fec54c60bcdf5849da1 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 15:05:19 +0800 Subject: [PATCH 09/31] fix multi_thread_alt --- tokio/src/runtime/scheduler/multi_thread_alt/worker.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index c315e382291..f7871d10e64 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -1311,6 +1311,10 @@ impl Context { fn shared(&self) -> &Shared { &self.handle.shared } + + pub(crate) fn get_worker_index(&self) -> usize { + self.index + } } impl Core { From 7cdddd62ebb9aa1a40efa0bdabd33ea04a3fabeb Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 15:10:31 +0800 Subject: [PATCH 10/31] ci: allow(dead_code) --- tokio/src/runtime/scheduler/multi_thread/worker.rs | 3 ++- tokio/src/runtime/scheduler/multi_thread_alt/worker.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 8d1c4dddb49..9aa6077fa00 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -742,7 +742,8 @@ impl Context { pub(crate) fn defer(&self, waker: &Waker) { self.defer.defer(waker); } - + + #[allow(dead_code)] pub(crate) fn get_worker_index(&self) -> usize { self.worker.index } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index f7871d10e64..e02dee8679c 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -1311,7 +1311,8 @@ impl Context { fn shared(&self) -> &Shared { &self.handle.shared } - + + #[allow(dead_code)] pub(crate) fn get_worker_index(&self) -> usize { self.index } From 178baeb16ac8ec0bbb6e395f462464bac3cced86 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 15:12:05 +0800 Subject: [PATCH 11/31] ci: rustfmt --- tokio/src/runtime/scheduler/multi_thread/worker.rs | 2 +- tokio/src/runtime/scheduler/multi_thread_alt/worker.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 9aa6077fa00..9f0dd98dfdc 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -742,7 +742,7 @@ impl Context { pub(crate) fn defer(&self, waker: &Waker) { self.defer.defer(waker); } - + #[allow(dead_code)] pub(crate) fn get_worker_index(&self) -> usize { self.worker.index diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index e02dee8679c..c9e5beed2be 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -1311,7 +1311,7 @@ impl Context { fn shared(&self) -> &Shared { &self.handle.shared } - + #[allow(dead_code)] pub(crate) fn get_worker_index(&self) -> usize { self.index From 2de08f295f22c72102f8029bbb07c671586ebc23 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 15:37:31 +0800 Subject: [PATCH 12/31] fix: condition feature --- tokio/src/runtime/time/entry.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 72b04033a0c..b92524bc925 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -511,15 +511,13 @@ impl TimerEntry { // Otherwise, we use a random number generator to obtain the shard id. fn get_shard_id(&self) -> u32 { let shard_size = self.driver.driver().time().inner.get_shard_size(); - use scheduler::Context::MultiThread; - - #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] - use scheduler::Context::MultiThreadAlt; let id = context::with_scheduler(|ctx| match ctx { - Some(MultiThread(ctx)) => ctx.get_worker_index() as u32, + #[cfg(feature = "rt")] + Some(scheduler::Context::CurrentThread(ctx)) => 0, + #[cfg(feature = "rt-multi-thread")] + Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] - Some(MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, - Some(_) => 0, + Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, _ => crate::runtime::context::thread_rng_n(shard_size), }); id % shard_size From c808e815c609f91dd289716b1dde507c7945b18b Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 15:39:15 +0800 Subject: [PATCH 13/31] fix: ci --- tokio/src/runtime/time/entry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index b92524bc925..1a6af26f35a 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -513,7 +513,7 @@ impl TimerEntry { let shard_size = self.driver.driver().time().inner.get_shard_size(); let id = context::with_scheduler(|ctx| match ctx { #[cfg(feature = "rt")] - Some(scheduler::Context::CurrentThread(ctx)) => 0, + Some(scheduler::Context::CurrentThread(_ctx)) => 0, #[cfg(feature = "rt-multi-thread")] Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] From 11ce15afbd2432775e6210bb18f5422bc5966687 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 15:50:54 +0800 Subject: [PATCH 14/31] use cfg_rt and cfg_not_rt --- tokio/src/runtime/time/entry.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 1a6af26f35a..1d1e787278b 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -509,18 +509,27 @@ impl TimerEntry { // Gets the shard id. If current thread is a worker thread, we use its worker index as a shard id. // Otherwise, we use a random number generator to obtain the shard id. - fn get_shard_id(&self) -> u32 { - let shard_size = self.driver.driver().time().inner.get_shard_size(); - let id = context::with_scheduler(|ctx| match ctx { - #[cfg(feature = "rt")] - Some(scheduler::Context::CurrentThread(_ctx)) => 0, - #[cfg(feature = "rt-multi-thread")] - Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, - #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] - Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, - _ => crate::runtime::context::thread_rng_n(shard_size), - }); - id % shard_size + cfg_rt! { + fn get_shard_id(&self) -> u32 { + let shard_size = self.driver.driver().time().inner.get_shard_size(); + let id = context::with_scheduler(|ctx| match ctx { + Some(scheduler::Context::CurrentThread(_ctx)) => 0, + #[cfg(feature = "rt-multi-thread")] + Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] + Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, + _ => crate::runtime::context::thread_rng_n(shard_size), + }); + id % shard_size + } + } + + cfg_not_rt! { + fn get_shard_id(&self) -> u32 { + let shard_size = self.driver.driver().time().inner.get_shard_size(); + let id = crate::runtime::context::thread_rng_n(shard_size); + id % shard_size + } } pub(crate) fn deadline(&self) -> Instant { From 1d309bf0eaa69b7ea29080d9d1b178078b60b0a3 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 15:56:09 +0800 Subject: [PATCH 15/31] add time feature --- tokio/src/runtime/context.rs | 2 +- tokio/src/util/rand.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index f639173b212..59520db1ed9 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -57,7 +57,7 @@ struct Context { #[cfg(feature = "rt")] runtime: Cell, - #[cfg(any(feature = "rt", feature = "macros"))] + #[cfg(any(feature = "rt", feature = "macros", feature = "time"))] rng: Cell>, /// Tracks the amount of "work" a task may still do before yielding back to diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 67c45693c9c..aad85b973ff 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -71,6 +71,7 @@ impl FastRand { #[cfg(any( feature = "macros", feature = "rt-multi-thread", + feature = "time", all(feature = "sync", feature = "rt") ))] pub(crate) fn fastrand_n(&mut self, n: u32) -> u32 { From 256c2790db2e0f110926f7f37e8d6fab68d9e3a9 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 16:05:12 +0800 Subject: [PATCH 16/31] add time feature again --- tokio/src/runtime/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 59520db1ed9..bde5b339545 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -3,7 +3,7 @@ use crate::runtime::coop; use std::cell::Cell; -#[cfg(any(feature = "rt", feature = "macros"))] +#[cfg(any(feature = "rt", feature = "macros", feature = "time"))] use crate::util::rand::FastRand; cfg_rt! { From 6c8aa239081fca2af61e1adef41fda2d2d45316d Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 16:27:39 +0800 Subject: [PATCH 17/31] do not use random in context when shutdown --- tokio/src/runtime/time/mod.rs | 12 ++++++------ tokio/src/runtime/time/tests/mod.rs | 16 +++++++++------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index d6058b5881f..a730f283de5 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -162,7 +162,7 @@ impl Driver { // Advance time forward to the end of time. - handle.process_at_time(u64::MAX); + handle.process_at_time(0, u64::MAX); self.park.shutdown(rt_handle); } @@ -251,14 +251,14 @@ impl Handle { /// Runs timer related logic, and returns the next wakeup time pub(self) fn process(&self, clock: &Clock) { let now = self.time_source().now(clock); - - self.process_at_time(now); + // For fairness, randomly select one to start. + let shards = self.inner.get_shard_size(); + let start = crate::runtime::context::thread_rng_n(shards); + self.process_at_time(start, now); } - pub(self) fn process_at_time(&self, now: u64) { + pub(self) fn process_at_time(&self, start: u32, now: u64) { let shards = self.inner.get_shard_size(); - // For fairness, randomly select one to start. - let start = crate::runtime::context::thread_rng_n(shards); let next_wake_up = (start..shards + start) .filter_map(|i| self.process_at_sharded_time(i, now)) diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 520dc00a462..676cf55f9c6 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -68,7 +68,7 @@ fn single_timer() { // This may or may not return Some (depending on how it races with the // thread). If it does return None, however, the timer should complete // synchronously. - time.process_at_time(time.time_source().now(clock) + 2_000_000_000); + time.process_at_time(0, time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); }) @@ -102,7 +102,7 @@ fn drop_timer() { let clock = handle.inner.driver().clock(); // advance 2s in the future. - time.process_at_time(time.time_source().now(clock) + 2_000_000_000); + time.process_at_time(0, time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); }) @@ -138,7 +138,7 @@ fn change_waker() { let clock = handle.inner.driver().clock(); // advance 2s - time.process_at_time(time.time_source().now(clock) + 2_000_000_000); + time.process_at_time(0, time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); }) @@ -181,6 +181,7 @@ fn reset_future() { // This may or may not return a wakeup time. handle.process_at_time( + 0, handle .time_source() .instant_to_tick(start + Duration::from_millis(1500)), @@ -189,6 +190,7 @@ fn reset_future() { assert!(!finished_early.load(Ordering::Relaxed)); handle.process_at_time( + 0, handle .time_source() .instant_to_tick(start + Duration::from_millis(2500)), @@ -231,7 +233,7 @@ fn poll_process_levels() { } for t in 1..normal_or_miri(1024, 64) { - handle.inner.driver().time().process_at_time(t as u64); + handle.inner.driver().time().process_at_time(0, t as u64); for (deadline, future) in entries.iter_mut().enumerate() { let mut context = Context::from_waker(noop_waker_ref()); @@ -260,8 +262,8 @@ fn poll_process_levels_targeted() { let handle = handle.inner.driver().time(); - handle.process_at_time(62); + handle.process_at_time(0, 62); assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); - handle.process_at_time(192); - handle.process_at_time(192); + handle.process_at_time(0, 192); + handle.process_at_time(0, 192); } From 1fb35c938a213528ebd760acead04df211482e57 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 16:34:15 +0800 Subject: [PATCH 18/31] add time feature --- tokio/src/util/rand.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index aad85b973ff..985396f41ef 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -1,5 +1,7 @@ +#[cfg(any(feature = "rt", feature = "time"))] +mod rt; + cfg_rt! { - mod rt; pub(crate) use rt::RngSeedGenerator; cfg_unstable! { From 2497479bcd31b379c9d8f3370dae618c4290d7e3 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 16:45:14 +0800 Subject: [PATCH 19/31] fix time feature --- tokio/src/runtime/context.rs | 2 +- tokio/src/runtime/time/entry.rs | 2 +- tokio/src/util/mod.rs | 2 +- tokio/src/util/rand.rs | 4 +--- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index bde5b339545..76918114bc3 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -100,7 +100,7 @@ tokio_thread_local! { #[cfg(feature = "rt")] runtime: Cell::new(EnterRuntime::NotEntered), - #[cfg(any(feature = "rt", feature = "macros"))] + #[cfg(any(feature = "rt", feature = "macros", feature = "time"))] rng: Cell::new(None), budget: Cell::new(coop::Budget::unconstrained()), diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 1d1e787278b..4b3127b765f 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -527,7 +527,7 @@ impl TimerEntry { cfg_not_rt! { fn get_shard_id(&self) -> u32 { let shard_size = self.driver.driver().time().inner.get_shard_size(); - let id = crate::runtime::context::thread_rng_n(shard_size); + let id = context::thread_rng_n(shard_size); id % shard_size } } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index d821ec897cf..7cf371195ff 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -49,7 +49,7 @@ cfg_rt! { pub(crate) mod sharded_list; } -#[cfg(any(feature = "rt", feature = "macros"))] +#[cfg(any(feature = "rt", feature = "macros", feature = "time"))] pub(crate) mod rand; cfg_rt! { diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 985396f41ef..aad85b973ff 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -1,7 +1,5 @@ -#[cfg(any(feature = "rt", feature = "time"))] -mod rt; - cfg_rt! { + mod rt; pub(crate) use rt::RngSeedGenerator; cfg_unstable! { From 18dd900d9961c162d99b0ac9257d72b049869a28 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Sun, 5 May 2024 16:57:30 +0800 Subject: [PATCH 20/31] use context::thread_rng_n --- tokio/src/runtime/time/entry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 4b3127b765f..906dac0a04b 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -518,7 +518,7 @@ impl TimerEntry { Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, - _ => crate::runtime::context::thread_rng_n(shard_size), + _ => context::thread_rng_n(shard_size), }); id % shard_size } From fea8e32cdde40ebd4c2813ab92d8375d1cf99ee5 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Tue, 14 May 2024 19:08:28 +0800 Subject: [PATCH 21/31] feat: try to avoid lock in clear_entry --- tokio/src/runtime/time/entry.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 906dac0a04b..6164683b43d 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -542,8 +542,9 @@ impl TimerEntry { /// Cancels and deregisters the timer. This operation is irreversible. pub(crate) fn cancel(self: Pin<&mut Self>) { - // Avoid calling the `clear_entry` method, because it has not been initialized yet. - if !self.is_inner_init() { + // Avoid calling the `clear_entry` method, because it has not been + // initialized or registered. + if !self.is_inner_init() || !self.inner().might_be_registered() { return; } // We need to perform an acq/rel fence with the driver thread, and the From 5ecfd75c1819083ae8feb24dc24d7c5ee33e7905 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Tue, 14 May 2024 19:20:34 +0800 Subject: [PATCH 22/31] revert head --- tokio/src/runtime/time/entry.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 6164683b43d..906dac0a04b 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -542,9 +542,8 @@ impl TimerEntry { /// Cancels and deregisters the timer. This operation is irreversible. pub(crate) fn cancel(self: Pin<&mut Self>) { - // Avoid calling the `clear_entry` method, because it has not been - // initialized or registered. - if !self.is_inner_init() || !self.inner().might_be_registered() { + // Avoid calling the `clear_entry` method, because it has not been initialized yet. + if !self.is_inner_init() { return; } // We need to perform an acq/rel fence with the driver thread, and the From 16ff7ae59e1952237ae00bbcbdc2a289b13f4420 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Fri, 17 May 2024 21:30:51 +0800 Subject: [PATCH 23/31] style: rename get_shard_id method to generate_shard_id function --- tokio/src/runtime/time/entry.rs | 50 ++++++++++++++++----------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 17c9f3ff933..2028f2137b7 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -494,7 +494,8 @@ impl TimerEntry { fn inner(&self) -> &TimerShared { let inner = unsafe { &*self.inner.get() }; if inner.is_none() { - let shard_id = self.get_shard_id(); + let shard_size = self.driver.driver().time().inner.get_shard_size(); + let shard_id = generate_shard_id(shard_size); unsafe { *self.inner.get() = Some(TimerShared::new(shard_id)); } @@ -502,31 +503,6 @@ impl TimerEntry { return inner.as_ref().unwrap(); } - // Gets the shard id. If current thread is a worker thread, we use its worker index as a shard id. - // Otherwise, we use a random number generator to obtain the shard id. - cfg_rt! { - fn get_shard_id(&self) -> u32 { - let shard_size = self.driver.driver().time().inner.get_shard_size(); - let id = context::with_scheduler(|ctx| match ctx { - Some(scheduler::Context::CurrentThread(_ctx)) => 0, - #[cfg(feature = "rt-multi-thread")] - Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, - #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] - Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, - _ => context::thread_rng_n(shard_size), - }); - id % shard_size - } - } - - cfg_not_rt! { - fn get_shard_id(&self) -> u32 { - let shard_size = self.driver.driver().time().inner.get_shard_size(); - let id = context::thread_rng_n(shard_size); - id % shard_size - } - } - pub(crate) fn deadline(&self) -> Instant { self.deadline } @@ -678,3 +654,25 @@ impl Drop for TimerEntry { unsafe { Pin::new_unchecked(self) }.as_mut().cancel(); } } + +// Generates a shard id. If current thread is a worker thread, we use its worker index as a shard id. +// Otherwise, we use a random number generator to obtain the shard id. +cfg_rt! { + fn generate_shard_id(shard_size: u32) -> u32 { + let id = context::with_scheduler(|ctx| match ctx { + Some(scheduler::Context::CurrentThread(_ctx)) => 0, + #[cfg(feature = "rt-multi-thread")] + Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, + #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] + Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, + _ => context::thread_rng_n(shard_size), + }); + id % shard_size + } +} + +cfg_not_rt! { + fn generate_shard_id(shard_size: usize) -> u32 { + context::thread_rng_n(shard_size) % shard_size + } +} From b5d8ca1bf907c4e8be2b5e0d2711cf2af0640520 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Fri, 17 May 2024 21:38:24 +0800 Subject: [PATCH 24/31] fix: change shard_size type from usize to u32 --- tokio/src/runtime/time/entry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 2028f2137b7..af415059773 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -672,7 +672,7 @@ cfg_rt! { } cfg_not_rt! { - fn generate_shard_id(shard_size: usize) -> u32 { + fn generate_shard_id(shard_size: u32) -> u32 { context::thread_rng_n(shard_size) % shard_size } } From ba8fc74cf2abdf431672fdd62af2303f9551555d Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Fri, 17 May 2024 21:40:06 +0800 Subject: [PATCH 25/31] rm the unnecessary mold operation --- tokio/src/runtime/time/entry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index af415059773..c1301d66525 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -673,6 +673,6 @@ cfg_rt! { cfg_not_rt! { fn generate_shard_id(shard_size: u32) -> u32 { - context::thread_rng_n(shard_size) % shard_size + context::thread_rng_n(shard_size) } } From 4a2c88ec40155e4d01b618c26994264988705a03 Mon Sep 17 00:00:00 2001 From: Weijia Jiang Date: Mon, 20 May 2024 19:13:41 +0800 Subject: [PATCH 26/31] Update tokio/src/runtime/time/entry.rs Co-authored-by: Alice Ryhl --- tokio/src/runtime/time/entry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index c1301d66525..834077caa3d 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -665,7 +665,7 @@ cfg_rt! { Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))] Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, - _ => context::thread_rng_n(shard_size), + None => context::thread_rng_n(shard_size), }); id % shard_size } From c069805b1fcbec120e8fb0c56e6f9da7e01d90fe Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Mon, 20 May 2024 22:07:02 +0800 Subject: [PATCH 27/31] feat: add AtomicOptionNonZeroU64 helper type --- tokio/src/runtime/time/mod.rs | 56 ++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index a730f283de5..84c870b44cf 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -30,6 +30,33 @@ use crate::loom::sync::atomic::AtomicU64; use std::fmt; use std::{num::NonZeroU64, ptr::NonNull}; +struct AtomicOptionNonZeroU64(AtomicU64); + +// A helper type to store the `next_wake`. +impl AtomicOptionNonZeroU64 { + fn new(val: Option) -> Self { + Self { + 0: AtomicU64::new(Self::turn(val)), + } + } + + fn store(&self, val: Option) { + self.0.store(Self::turn(val), Ordering::Relaxed); + } + + fn load(&self) -> Option { + NonZeroU64::new(self.0.load(Ordering::Relaxed)) + } + + fn turn(val: Option) -> u64 { + match val { + Some(0) => 1, + Some(i) => i, + None => 0, + } + } +} + /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. /// /// A `Driver` instance tracks the state necessary for managing time and @@ -94,7 +121,7 @@ pub(crate) struct Driver { /// Timer state shared between `Driver`, `Handle`, and `Registration`. struct Inner { /// The earliest time at which we promise to wake up without unparking. - next_wake: AtomicU64, + next_wake: AtomicOptionNonZeroU64, /// Sharded Timer wheels. wheels: Box<[Mutex]>, @@ -130,7 +157,7 @@ impl Driver { let handle = Handle { time_source, inner: Inner { - next_wake: AtomicU64::new(0), + next_wake: AtomicOptionNonZeroU64::new(Some(0)), wheels: wheels.into_boxed_slice(), is_shutdown: AtomicBool::new(false), #[cfg(feature = "test-util")] @@ -179,9 +206,7 @@ impl Driver { }) .min(); - rt_handle.time().inner.set_next_wake( - next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())), - ); + rt_handle.time().inner.next_wake.store(next_wake); match next_wake { Some(when) => { @@ -264,11 +289,11 @@ impl Handle { .filter_map(|i| self.process_at_sharded_time(i, now)) .min(); - self.inner.set_next_wake(next_wake_up); + self.inner.next_wake.store(next_wake_up); } // Returns the next wakeup time of this shard. - pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) -> Option { + pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) -> Option { let mut waker_list = WakeList::new(); let mut lock = self.inner.lock_sharded_wheel(id); @@ -299,9 +324,7 @@ impl Handle { } } } - let next_wake_up = lock - .poll_at() - .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); + let next_wake_up = lock.poll_at(); drop(lock); waker_list.wake_all(); @@ -366,7 +389,8 @@ impl Handle { Ok(when) => { if self .inner - .get_next_wake() + .next_wake + .load() .map(|next_wake| when < next_wake.get()) .unwrap_or(true) { @@ -421,16 +445,6 @@ impl Inner { fn get_shard_size(&self) -> u32 { self.wheels.len() as u32 } - - fn set_next_wake(&self, value: Option) { - let val = value.map(|v| v.get()).unwrap_or(0); - self.next_wake.store(val, Ordering::Relaxed); - } - - fn get_next_wake(&self) -> Option { - let val = self.next_wake.load(Ordering::Relaxed); - NonZeroU64::new(val) - } } impl fmt::Debug for Inner { From 5e3d1b5ffe0d66baf1c71db804ae23b7e95e9b9b Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Mon, 20 May 2024 22:09:22 +0800 Subject: [PATCH 28/31] fix: ci --- tokio/src/runtime/time/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 84c870b44cf..b72334aa2ef 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -35,9 +35,7 @@ struct AtomicOptionNonZeroU64(AtomicU64); // A helper type to store the `next_wake`. impl AtomicOptionNonZeroU64 { fn new(val: Option) -> Self { - Self { - 0: AtomicU64::new(Self::turn(val)), - } + Self(AtomicU64::new(Self::turn(val))) } fn store(&self, val: Option) { From 8ca8ddb5e5b3051269036dff69d3e581661074f6 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Mon, 20 May 2024 22:14:59 +0800 Subject: [PATCH 29/31] fix: next_wake shoud be in new method --- tokio/src/runtime/time/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index b72334aa2ef..01f22d42ae6 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -155,7 +155,7 @@ impl Driver { let handle = Handle { time_source, inner: Inner { - next_wake: AtomicOptionNonZeroU64::new(Some(0)), + next_wake: AtomicOptionNonZeroU64::new(None), wheels: wheels.into_boxed_slice(), is_shutdown: AtomicBool::new(false), #[cfg(feature = "test-util")] From 0c408f09ee698ee61dbff0982078a9abc220f673 Mon Sep 17 00:00:00 2001 From: wathenjiang Date: Tue, 21 May 2024 22:35:53 +0800 Subject: [PATCH 30/31] add the helper next_wake_time function --- tokio/src/runtime/time/mod.rs | 48 ++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 01f22d42ae6..37b04ef0000 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -34,25 +34,18 @@ struct AtomicOptionNonZeroU64(AtomicU64); // A helper type to store the `next_wake`. impl AtomicOptionNonZeroU64 { - fn new(val: Option) -> Self { - Self(AtomicU64::new(Self::turn(val))) + fn new(val: Option) -> Self { + Self(AtomicU64::new(val.map_or(0, NonZeroU64::get))) } - fn store(&self, val: Option) { - self.0.store(Self::turn(val), Ordering::Relaxed); + fn store(&self, val: Option) { + self.0 + .store(val.map_or(0, NonZeroU64::get), Ordering::Relaxed); } fn load(&self) -> Option { NonZeroU64::new(self.0.load(Ordering::Relaxed)) } - - fn turn(val: Option) -> u64 { - match val { - Some(0) => 1, - Some(i) => i, - None => 0, - } - } } /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. @@ -197,16 +190,20 @@ impl Driver { assert!(!handle.is_shutdown()); // Finds out the min expiration time to park. - let next_wake = (0..rt_handle.time().inner.get_shard_size()) + let expiration_time = (0..rt_handle.time().inner.get_shard_size()) .filter_map(|id| { let lock = rt_handle.time().inner.lock_sharded_wheel(id); lock.next_expiration_time() }) .min(); - rt_handle.time().inner.next_wake.store(next_wake); + rt_handle + .time() + .inner + .next_wake + .store(next_wake_time(expiration_time)); - match next_wake { + match expiration_time { Some(when) => { let now = handle.time_source.now(rt_handle.clock()); // Note that we effectively round up to 1ms here - this avoids @@ -270,6 +267,23 @@ impl Driver { } } +// Helper function to turn expiration_time into next_wake_time. +// Since the `park_timeout` will round up to 1ms for avoiding very +// short-duration microsecond-resolution sleeps, we do the same here. +// The conversion is as follows +// None => None +// Some(0) => Some(1) +// Some(i) => Some(i) +fn next_wake_time(expiration_time: Option) -> Option { + expiration_time.and_then(|v| { + if v == 0 { + NonZeroU64::new(1) + } else { + NonZeroU64::new(v) + } + }) +} + impl Handle { /// Runs timer related logic, and returns the next wakeup time pub(self) fn process(&self, clock: &Clock) { @@ -283,11 +297,11 @@ impl Handle { pub(self) fn process_at_time(&self, start: u32, now: u64) { let shards = self.inner.get_shard_size(); - let next_wake_up = (start..shards + start) + let expiration_time = (start..shards + start) .filter_map(|i| self.process_at_sharded_time(i, now)) .min(); - self.inner.next_wake.store(next_wake_up); + self.inner.next_wake.store(next_wake_time(expiration_time)); } // Returns the next wakeup time of this shard. From 71fd4ff3f0cd277924fb577008b1281143f1732d Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 22 May 2024 13:30:51 +0200 Subject: [PATCH 31/31] Update tokio/src/runtime/scheduler/multi_thread_alt/worker.rs --- tokio/src/runtime/scheduler/multi_thread_alt/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index c9e5beed2be..63ae0a49743 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -1312,7 +1312,7 @@ impl Context { &self.handle.shared } - #[allow(dead_code)] + #[cfg_attr(not(feature = "time"), allow(dead_code))] pub(crate) fn get_worker_index(&self) -> usize { self.index }