Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support aborting a task remotely in a multithreaded runtime without panicking #3159

Merged
merged 2 commits into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 57 additions & 37 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,52 +629,72 @@ impl task::Schedule for Arc<Worker> {
fn release(&self, task: &Task) -> Option<Task> {
use std::ptr::NonNull;

CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
enum Immediate {
Removed(Option<Task>),
Core(bool),
}

if self.eq(&cx.worker) {
let mut maybe_core = cx.core.borrow_mut();
let immediate = CURRENT.with(|maybe_cx| {
let cx = match maybe_cx {
Some(cx) => cx,
None => return Immediate::Core(false),
};

if let Some(core) = &mut *maybe_core {
// Directly remove the task
//
// safety: the task is inserted in the list in `bind`.
unsafe {
let ptr = NonNull::from(task.header());
return core.tasks.remove(ptr);
}
if !self.eq(&cx.worker) {
return Immediate::Core(cx.core.borrow().is_some());
}

let mut maybe_core = cx.core.borrow_mut();

if let Some(core) = &mut *maybe_core {
// Directly remove the task
//
// safety: the task is inserted in the list in `bind`.
unsafe {
let ptr = NonNull::from(task.header());
return Immediate::Removed(core.tasks.remove(ptr));
}
}

// Track the task to be released by the worker that owns it
//
// Safety: We get a new handle without incrementing the ref-count.
// A ref-count is held by the "owned" linked list and it is only
// ever removed from that list as part of the release process: this
// method or popping the task from `pending_drop`. Thus, we can rely
// on the ref-count held by the linked-list to keep the memory
// alive.
//
// When the task is removed from the stack, it is forgotten instead
// of dropped.
let task = unsafe { Task::from_raw(task.header().into()) };
Immediate::Core(false)
});

self.remote().pending_drop.push(task);
// Checks if we were called from within a worker, allowing for immediate
// removal of a scheduled task. Else we have to go through the slower
// process below where we remotely mark a task as dropped.
let worker_has_core = match immediate {
Immediate::Removed(task) => return task,
Immediate::Core(worker_has_core) => worker_has_core,
};

if cx.core.borrow().is_some() {
return None;
}
// Track the task to be released by the worker that owns it
//
// Safety: We get a new handle without incrementing the ref-count.
// A ref-count is held by the "owned" linked list and it is only
// ever removed from that list as part of the release process: this
// method or popping the task from `pending_drop`. Thus, we can rely
// on the ref-count held by the linked-list to keep the memory
// alive.
//
// When the task is removed from the stack, it is forgotten instead
// of dropped.
let task = unsafe { Task::from_raw(task.header().into()) };

// The worker core has been handed off to another thread. In the
// event that the scheduler is currently shutting down, the thread
// that owns the task may be waiting on the release to complete
// shutdown.
if self.inject().is_closed() {
self.remote().unpark.unpark();
}
self.remote().pending_drop.push(task);

None
})
if worker_has_core {
return None;
}

// The worker core has been handed off to another thread. In the
// event that the scheduler is currently shutting down, the thread
// that owns the task may be waiting on the release to complete
// shutdown.
if self.inject().is_closed() {
self.remote().unpark.unpark();
}

None
}

fn schedule(&self, task: Notified) {
Expand Down
26 changes: 26 additions & 0 deletions tokio/tests/task_abort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

/// Checks that a suspended task can be aborted without panicking as reported in
/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
#[test]
fn test_abort_without_panic_3157() {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.worker_threads(1)
.build()
.unwrap();

rt.block_on(async move {
let handle = tokio::spawn(async move {
println!("task started");
tokio::time::sleep(std::time::Duration::new(100, 0)).await
});

// wait for task to sleep.
tokio::time::sleep(std::time::Duration::new(1, 0)).await;

handle.abort();
let _ = handle.await;
});
}