Skip to content

Commit

Permalink
rt: fix panic in task abort when off rt (#3159)
Browse files Browse the repository at this point in the history
A call to `JoinHandle::abort` releases a task. When called from outside of the runtime,
this panics due to the current implementation checking for a thread-local worker context.

This change makes accessing the thread-local context optional under release, by falling
back to remotely marking a task remotely as dropped. Behaving the same as if the core
was stolen by another worker.

Fixes #3157
  • Loading branch information
udoprog authored Dec 4, 2020
1 parent 00500d1 commit a125ebd
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 37 deletions.
94 changes: 57 additions & 37 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,52 +629,72 @@ impl task::Schedule for Arc<Worker> {
fn release(&self, task: &Task) -> Option<Task> {
use std::ptr::NonNull;

CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
enum Immediate {
Removed(Option<Task>),
Core(bool),
}

if self.eq(&cx.worker) {
let mut maybe_core = cx.core.borrow_mut();
let immediate = CURRENT.with(|maybe_cx| {
let cx = match maybe_cx {
Some(cx) => cx,
None => return Immediate::Core(false),
};

if let Some(core) = &mut *maybe_core {
// Directly remove the task
//
// safety: the task is inserted in the list in `bind`.
unsafe {
let ptr = NonNull::from(task.header());
return core.tasks.remove(ptr);
}
if !self.eq(&cx.worker) {
return Immediate::Core(cx.core.borrow().is_some());
}

let mut maybe_core = cx.core.borrow_mut();

if let Some(core) = &mut *maybe_core {
// Directly remove the task
//
// safety: the task is inserted in the list in `bind`.
unsafe {
let ptr = NonNull::from(task.header());
return Immediate::Removed(core.tasks.remove(ptr));
}
}

// Track the task to be released by the worker that owns it
//
// Safety: We get a new handle without incrementing the ref-count.
// A ref-count is held by the "owned" linked list and it is only
// ever removed from that list as part of the release process: this
// method or popping the task from `pending_drop`. Thus, we can rely
// on the ref-count held by the linked-list to keep the memory
// alive.
//
// When the task is removed from the stack, it is forgotten instead
// of dropped.
let task = unsafe { Task::from_raw(task.header().into()) };
Immediate::Core(false)
});

self.remote().pending_drop.push(task);
// Checks if we were called from within a worker, allowing for immediate
// removal of a scheduled task. Else we have to go through the slower
// process below where we remotely mark a task as dropped.
let worker_has_core = match immediate {
Immediate::Removed(task) => return task,
Immediate::Core(worker_has_core) => worker_has_core,
};

if cx.core.borrow().is_some() {
return None;
}
// Track the task to be released by the worker that owns it
//
// Safety: We get a new handle without incrementing the ref-count.
// A ref-count is held by the "owned" linked list and it is only
// ever removed from that list as part of the release process: this
// method or popping the task from `pending_drop`. Thus, we can rely
// on the ref-count held by the linked-list to keep the memory
// alive.
//
// When the task is removed from the stack, it is forgotten instead
// of dropped.
let task = unsafe { Task::from_raw(task.header().into()) };

// The worker core has been handed off to another thread. In the
// event that the scheduler is currently shutting down, the thread
// that owns the task may be waiting on the release to complete
// shutdown.
if self.inject().is_closed() {
self.remote().unpark.unpark();
}
self.remote().pending_drop.push(task);

None
})
if worker_has_core {
return None;
}

// The worker core has been handed off to another thread. In the
// event that the scheduler is currently shutting down, the thread
// that owns the task may be waiting on the release to complete
// shutdown.
if self.inject().is_closed() {
self.remote().unpark.unpark();
}

None
}

fn schedule(&self, task: Notified) {
Expand Down
26 changes: 26 additions & 0 deletions tokio/tests/task_abort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

/// Checks that a suspended task can be aborted without panicking as reported in
/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
#[test]
fn test_abort_without_panic_3157() {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.worker_threads(1)
.build()
.unwrap();

rt.block_on(async move {
let handle = tokio::spawn(async move {
println!("task started");
tokio::time::sleep(std::time::Duration::new(100, 0)).await
});

// wait for task to sleep.
tokio::time::sleep(std::time::Duration::new(1, 0)).await;

handle.abort();
let _ = handle.await;
});
}

0 comments on commit a125ebd

Please sign in to comment.