diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 98e47658560..dfa186dda4f 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -53,9 +53,7 @@ impl Handle { { let (handle, notified) = me.shared.owned.bind(future, me.clone(), id); - if let Some(notified) = notified { - me.schedule_task(notified, false); - } + me.schedule_option_task_without_yield(notified); handle } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 6ae11463373..577c0cc3d84 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -1024,6 +1024,12 @@ impl Handle { }) } + pub(super) fn schedule_option_task_without_yield(&self, task: Option) { + if let Some(task) = task { + self.schedule_task(task, false); + } + } + fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { core.stats.inc_local_schedule_count(); diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 110933e58f0..dbaa330937e 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -211,17 +211,32 @@ impl Cell { /// Allocates a new task cell, containing the header, trailer, and core /// structures. pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box> { + // Separated into a non-generic function to reduce LLVM codegen + fn new_header( + state: State, + vtable: &'static Vtable, + #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id: Option, + ) -> Header { + Header { + state, + queue_next: UnsafeCell::new(None), + vtable, + owner_id: UnsafeCell::new(0), + #[cfg(all(tokio_unstable, feature = "tracing"))] + tracing_id, + } + } + #[cfg(all(tokio_unstable, feature = "tracing"))] let tracing_id = future.id(); + let vtable = raw::vtable::(); let result = Box::new(Cell { - header: Header { + header: new_header( state, - queue_next: UnsafeCell::new(None), - vtable: raw::vtable::(), - owner_id: UnsafeCell::new(0), + vtable, #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id, - }, + ), core: Core { scheduler, stage: CoreStage { @@ -229,26 +244,33 @@ impl Cell { }, task_id, }, - trailer: Trailer { - waker: UnsafeCell::new(None), - owned: linked_list::Pointers::new(), - }, + trailer: Trailer::new(), }); #[cfg(debug_assertions)] { - let trailer_addr = (&result.trailer) as *const Trailer as usize; - let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(&result.header)) }; - assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize); - - let scheduler_addr = (&result.core.scheduler) as *const S as usize; - let scheduler_ptr = - unsafe { Header::get_scheduler::(NonNull::from(&result.header)) }; - assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize); - - let id_addr = (&result.core.task_id) as *const Id as usize; - let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(&result.header)) }; - assert_eq!(id_addr, id_ptr.as_ptr() as usize); + // Using a separate function for this code avoids instantiating it separately for every `T`. + unsafe fn check(header: &Header, trailer: &Trailer, scheduler: &S, task_id: &Id) { + let trailer_addr = trailer as *const Trailer as usize; + let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(header)) }; + assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize); + + let scheduler_addr = scheduler as *const S as usize; + let scheduler_ptr = unsafe { Header::get_scheduler::(NonNull::from(header)) }; + assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize); + + let id_addr = task_id as *const Id as usize; + let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(header)) }; + assert_eq!(id_addr, id_ptr.as_ptr() as usize); + } + unsafe { + check( + &result.header, + &result.trailer, + &result.core.scheduler, + &result.core.task_id, + ); + } } result @@ -442,6 +464,13 @@ impl Header { } impl Trailer { + fn new() -> Self { + Trailer { + waker: UnsafeCell::new(None), + owned: linked_list::Pointers::new(), + } + } + pub(super) unsafe fn set_waker(&self, waker: Option) { self.waker.with_mut(|ptr| { *ptr = waker; diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 8e3c3d14fa0..13c46bbf7a5 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -2,8 +2,9 @@ use crate::future::Future; use crate::runtime::task::core::{Cell, Core, Header, Trailer}; use crate::runtime::task::state::{Snapshot, State}; use crate::runtime::task::waker::waker_ref; -use crate::runtime::task::{JoinError, Notified, RawTask, Schedule, Task}; +use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task}; +use std::any::Any; use std::mem; use std::mem::ManuallyDrop; use std::panic; @@ -192,6 +193,15 @@ where match self.state().transition_to_running() { TransitionToRunning::Success => { + // Separated to reduce LLVM codegen + fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture { + match result { + TransitionToIdle::Ok => PollFuture::Done, + TransitionToIdle::OkNotified => PollFuture::Notified, + TransitionToIdle::OkDealloc => PollFuture::Dealloc, + TransitionToIdle::Cancelled => PollFuture::Complete, + } + } let header_ptr = self.header_ptr(); let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&waker_ref); @@ -202,17 +212,13 @@ where return PollFuture::Complete; } - match self.state().transition_to_idle() { - TransitionToIdle::Ok => PollFuture::Done, - TransitionToIdle::OkNotified => PollFuture::Notified, - TransitionToIdle::OkDealloc => PollFuture::Dealloc, - TransitionToIdle::Cancelled => { - // The transition to idle failed because the task was - // cancelled during the poll. - cancel_task(self.core()); - PollFuture::Complete - } + let transition_res = self.state().transition_to_idle(); + if let TransitionToIdle::Cancelled = transition_res { + // The transition to idle failed because the task was + // cancelled during the poll. + cancel_task(self.core()); } + transition_result_to_poll_future(transition_res) } TransitionToRunning::Cancelled => { cancel_task(self.core()); @@ -447,13 +453,16 @@ fn cancel_task(core: &Core) { core.drop_future_or_output(); })); + core.store_output(Err(panic_result_to_join_error(core.task_id, res))); +} + +fn panic_result_to_join_error( + task_id: Id, + res: Result<(), Box>, +) -> JoinError { match res { - Ok(()) => { - core.store_output(Err(JoinError::cancelled(core.task_id))); - } - Err(panic) => { - core.store_output(Err(JoinError::panic(core.task_id, panic))); - } + Ok(()) => JoinError::cancelled(task_id), + Err(panic) => JoinError::panic(task_id, panic), } } @@ -482,10 +491,7 @@ fn poll_future(core: &Core, cx: Context<'_>) -> Po let output = match output { Ok(Poll::Pending) => return Poll::Pending, Ok(Poll::Ready(output)) => Ok(output), - Err(panic) => { - core.scheduler.unhandled_panic(); - Err(JoinError::panic(core.task_id, panic)) - } + Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)), }; // Catch and ignore panics if the future panics on drop. @@ -499,3 +505,13 @@ fn poll_future(core: &Core, cx: Context<'_>) -> Po Poll::Ready(()) } + +#[cold] +fn panic_to_error( + scheduler: &S, + task_id: Id, + panic: Box, +) -> JoinError { + scheduler.unhandled_panic(); + JoinError::panic(task_id, panic) +} diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index fb7dbdc1d95..930a0099d3d 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -96,7 +96,15 @@ impl OwnedTasks { T::Output: Send + 'static, { let (task, notified, join) = super::new_task(task, scheduler, id); + let notified = unsafe { self.bind_inner(task, notified) }; + (join, notified) + } + /// The part of `bind` that's the same for every type of future. + unsafe fn bind_inner(&self, task: Task, notified: Notified) -> Option> + where + S: Schedule, + { unsafe { // safety: We just created the task, so we have exclusive access // to the field. @@ -108,10 +116,10 @@ impl OwnedTasks { drop(lock); drop(notified); task.shutdown(); - (join, None) + None } else { lock.list.push_front(task); - (join, Some(notified)) + Some(notified) } } diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index 76e8a6cbf55..be5602dcba3 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -11,18 +11,22 @@ cfg_trace! { #[inline] #[track_caller] pub(crate) fn task(task: F, kind: &'static str, name: Option<&str>, id: u64) -> Instrumented { + #[track_caller] + fn get_span(kind: &'static str, name: Option<&str>, id: u64) -> tracing::Span { + let location = std::panic::Location::caller(); + tracing::trace_span!( + target: "tokio::task", + "runtime.spawn", + %kind, + task.name = %name.unwrap_or_default(), + task.id = id, + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ) + } use tracing::instrument::Instrument; - let location = std::panic::Location::caller(); - let span = tracing::trace_span!( - target: "tokio::task", - "runtime.spawn", - %kind, - task.name = %name.unwrap_or_default(), - task.id = id, - loc.file = location.file(), - loc.line = location.line(), - loc.col = location.column(), - ); + let span = get_span(kind, name, id); task.instrument(span) }