From cb67f28fe3c99956959669277fde889bc2dc252e Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 27 Oct 2022 08:51:03 -0700 Subject: [PATCH] rt: switch io::handle refs with scheduler:Handle (#5128) The `schedule::Handle` reference is the internal runtime handle. This patch replaces owned refs to `runtime::io::Handle` with `scheduler::Handle`. --- tokio/src/io/async_fd.rs | 8 +- tokio/src/io/bsd/poll_aio.rs | 5 +- tokio/src/io/poll_evented.rs | 8 +- tokio/src/runtime/context.rs | 18 ----- tokio/src/runtime/driver.rs | 11 +-- tokio/src/runtime/handle.rs | 4 +- tokio/src/runtime/io/mod.rs | 32 -------- tokio/src/runtime/io/registration.rs | 20 +++-- tokio/src/runtime/scheduler/mod.rs | 21 +++++ tokio/src/time/sleep.rs | 113 ++++++++++++--------------- 10 files changed, 101 insertions(+), 139 deletions(-) diff --git a/tokio/src/io/async_fd.rs b/tokio/src/io/async_fd.rs index eb544b95a0b..92fc6b38fd2 100644 --- a/tokio/src/io/async_fd.rs +++ b/tokio/src/io/async_fd.rs @@ -1,5 +1,6 @@ use crate::io::Interest; -use crate::runtime::io::{Handle, ReadyEvent, Registration}; +use crate::runtime::io::{ReadyEvent, Registration}; +use crate::runtime::scheduler; use mio::unix::SourceFd; use std::io; @@ -200,12 +201,13 @@ impl AsyncFd { where T: AsRawFd, { - Self::new_with_handle_and_interest(inner, Handle::current(), interest) + Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest) } + #[track_caller] pub(crate) fn new_with_handle_and_interest( inner: T, - handle: Handle, + handle: scheduler::Handle, interest: Interest, ) -> io::Result { let fd = inner.as_raw_fd(); diff --git a/tokio/src/io/bsd/poll_aio.rs b/tokio/src/io/bsd/poll_aio.rs index ceb83135c16..6ac9e2880e1 100644 --- a/tokio/src/io/bsd/poll_aio.rs +++ b/tokio/src/io/bsd/poll_aio.rs @@ -1,7 +1,8 @@ //! Use POSIX AIO futures with Tokio. use crate::io::interest::Interest; -use crate::runtime::io::{Handle, ReadyEvent, Registration}; +use crate::runtime::io::{ReadyEvent, Registration}; +use crate::runtime::scheduler; use mio::event::Source; use mio::Registry; use mio::Token; @@ -118,7 +119,7 @@ impl Aio { fn new_with_interest(io: E, interest: Interest) -> io::Result { let mut io = MioSource(io); - let handle = Handle::current(); + let handle = scheduler::Handle::current(); let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; Ok(Self { io, registration }) } diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 974fdd1a389..240d0d4ad40 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -1,5 +1,6 @@ use crate::io::interest::Interest; -use crate::runtime::io::{Handle, Registration}; +use crate::runtime::io::Registration; +use crate::runtime::scheduler; use mio::event::Source; use std::fmt; @@ -103,13 +104,14 @@ impl PollEvented { #[track_caller] #[cfg_attr(feature = "signal", allow(unused))] pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result { - Self::new_with_interest_and_handle(io, interest, Handle::current()) + Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current()) } + #[track_caller] pub(crate) fn new_with_interest_and_handle( mut io: E, interest: Interest, - handle: Handle, + handle: scheduler::Handle, ) -> io::Result { let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; Ok(Self { diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index e2e0917c2ec..9b20ecbcb50 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -24,24 +24,6 @@ pub(crate) fn current() -> Handle { } } -cfg_io_driver! { - #[track_caller] - pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { - match CONTEXT.try_with(|ctx| { - let ctx = ctx.borrow(); - ctx.as_ref() - .expect(crate::util::error::CONTEXT_MISSING_ERROR) - .inner - .driver() - .io - .clone() - }) { - Ok(io_handle) => io_handle, - Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), - } - } -} - cfg_signal_internal! { #[cfg(unix)] pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index aeba46061a1..7da12e54d99 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -82,10 +82,11 @@ impl Handle { } cfg_io_driver! { + #[track_caller] pub(crate) fn io(&self) -> &crate::runtime::io::Handle { self.io .as_ref() - .expect("A Tokio 1.x context was found, but I/O is disabled. Call `enable_io` on the runtime builder to enable I/O.") + .expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.") } } @@ -170,14 +171,6 @@ cfg_io_driver! { } } - #[track_caller] - pub(crate) fn expect(self, msg: &'static str) -> crate::runtime::io::Handle { - match self { - IoHandle::Enabled(v) => v, - IoHandle::Disabled(..) => panic!("{}", msg), - } - } - pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> { match self { IoHandle::Enabled(v) => Some(v), diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 5f5603f8a0b..91974252610 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -96,7 +96,9 @@ impl Handle { /// ``` #[track_caller] pub fn current() -> Self { - context::current() + Handle { + inner: scheduler::Handle::current(), + } } /// Returns a Handle view over the currently running Runtime diff --git a/tokio/src/runtime/io/mod.rs b/tokio/src/runtime/io/mod.rs index 6ba48a717c2..26dd6a2164f 100644 --- a/tokio/src/runtime/io/mod.rs +++ b/tokio/src/runtime/io/mod.rs @@ -236,38 +236,6 @@ impl fmt::Debug for Driver { } } -// ===== impl Handle ===== - -cfg_rt! { - impl Handle { - /// Returns a handle to the current reactor. - /// - /// # Panics - /// - /// This function panics if there is no current reactor set and `rt` feature - /// flag is not enabled. - #[track_caller] - pub(crate) fn current() -> Self { - crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.") - } - } -} - -cfg_not_rt! { - impl Handle { - /// Returns a handle to the current reactor. - /// - /// # Panics - /// - /// This function panics if there is no current reactor set, or if the `rt` - /// feature flag is not enabled. - #[track_caller] - pub(crate) fn current() -> Self { - panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) - } - } -} - cfg_net! { cfg_metrics! { impl Handle { diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 7b823460ad6..1e9d9fc25de 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -2,6 +2,7 @@ use crate::io::interest::Interest; use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo}; +use crate::runtime::scheduler; use crate::util::slab; use mio::event::Source; @@ -43,8 +44,8 @@ cfg_io_driver! { /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] pub(crate) struct Registration { - /// Handle to the associated driver. - handle: Handle, + /// Handle to the associated runtime. + handle: scheduler::Handle, /// Reference to state stored by the driver. shared: slab::Ref, @@ -66,12 +67,13 @@ impl Registration { /// /// - `Ok` if the registration happened successfully /// - `Err` if an error was encountered during registration + #[track_caller] pub(crate) fn new_with_interest_and_handle( io: &mut impl Source, interest: Interest, - handle: Handle, + handle: scheduler::Handle, ) -> io::Result { - let shared = handle.inner.add_source(io, interest)?; + let shared = handle.io().inner.add_source(io, interest)?; Ok(Registration { handle, shared }) } @@ -93,7 +95,7 @@ impl Registration { /// /// `Err` is returned if an error is encountered. pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { - self.handle.inner.deregister_source(io) + self.handle().inner.deregister_source(io) } pub(crate) fn clear_readiness(&self, event: ReadyEvent) { @@ -146,7 +148,7 @@ impl Registration { let coop = ready!(crate::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); - if self.handle.inner.is_shutdown() { + if self.handle().inner.is_shutdown() { return Poll::Ready(Err(gone())); } @@ -195,6 +197,10 @@ impl Registration { res => res, } } + + fn handle(&self) -> &Handle { + self.handle.io() + } } impl Drop for Registration { @@ -224,7 +230,7 @@ cfg_io_readiness! { pin!(fut); crate::future::poll_fn(|cx| { - if self.handle.inner.is_shutdown() { + if self.handle().inner.is_shutdown() { return Poll::Ready(Err(io::Error::new( io::ErrorKind::Other, crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 0f7dae49ece..31546673bbe 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -40,6 +40,13 @@ impl Handle { } } + cfg_io_driver! { + #[track_caller] + pub(crate) fn io(&self) -> &crate::runtime::io::Handle { + self.driver().io() + } + } + cfg_time! { #[track_caller] pub(crate) fn time(&self) -> &crate::runtime::time::Handle { @@ -62,6 +69,11 @@ cfg_rt! { use crate::util::RngSeedGenerator; impl Handle { + #[track_caller] + pub(crate) fn current() -> Handle { + crate::runtime::context::current().inner + } + pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner { match self { Handle::CurrentThread(h) => &h.blocking_spawner, @@ -156,3 +168,12 @@ cfg_rt! { } } } + +cfg_not_rt! { + impl Handle { + #[track_caller] + pub(crate) fn current() -> Handle { + panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) + } + } +} diff --git a/tokio/src/time/sleep.rs b/tokio/src/time/sleep.rs index 306b45d582c..5eb99c559f6 100644 --- a/tokio/src/time/sleep.rs +++ b/tokio/src/time/sleep.rs @@ -248,76 +248,61 @@ cfg_not_trace! { } impl Sleep { - cfg_rt! { - #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] - #[track_caller] - pub(crate) fn new_timeout( - deadline: Instant, - location: Option<&'static Location<'static>>, - ) -> Sleep { - use crate::runtime::Handle; - - let handle = Handle::current().inner; - let entry = TimerEntry::new(&handle, deadline); - - #[cfg(all(tokio_unstable, feature = "tracing"))] - let inner = { - let handle = &handle.time(); - let time_source = handle.time_source(); - let deadline_tick = time_source.deadline_to_tick(deadline); - let duration = deadline_tick.saturating_sub(time_source.now()); - - let location = location.expect("should have location if tracing"); - let resource_span = tracing::trace_span!( - "runtime.resource", - concrete_type = "Sleep", - kind = "timer", - loc.file = location.file(), - loc.line = location.line(), - loc.col = location.column(), + #[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))] + #[track_caller] + pub(crate) fn new_timeout( + deadline: Instant, + location: Option<&'static Location<'static>>, + ) -> Sleep { + use crate::runtime::scheduler; + + let handle = scheduler::Handle::current(); + let entry = TimerEntry::new(&handle, deadline); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let inner = { + let handle = &handle.time(); + let time_source = handle.time_source(); + let deadline_tick = time_source.deadline_to_tick(deadline); + let duration = deadline_tick.saturating_sub(time_source.now()); + + let location = location.expect("should have location if tracing"); + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Sleep", + kind = "timer", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + let async_op_span = resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + duration = duration, + duration.unit = "ms", + duration.op = "override", ); - let async_op_span = resource_span.in_scope(|| { - tracing::trace!( - target: "runtime::resource::state_update", - duration = duration, - duration.unit = "ms", - duration.op = "override", - ); - - tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout") - }); - - let async_op_poll_span = - async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); - - let ctx = trace::AsyncOpTracingCtx { - async_op_span, - async_op_poll_span, - resource_span, - }; - - Inner { - deadline, - ctx, - } + tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout") + }); + + let async_op_poll_span = + async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll")); + + let ctx = trace::AsyncOpTracingCtx { + async_op_span, + async_op_poll_span, + resource_span, }; - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let inner = Inner { deadline }; + Inner { deadline, ctx } + }; - Sleep { inner, entry } - } - } + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let inner = Inner { deadline }; - cfg_not_rt! { - #[track_caller] - pub(crate) fn new_timeout( - _deadline: Instant, - _location: Option<&'static Location<'static>>, - ) -> Sleep { - panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR) - } + Sleep { inner, entry } } pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {