Skip to content

Commit

Permalink
rt: remotely abort tasks on JoinHandle::abort (#3934)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn authored Jul 6, 2021
1 parent a5ee2f0 commit e385108
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 9 deletions.
8 changes: 8 additions & 0 deletions tokio/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# 1.5.1 (July 6, 2021)

### Fixed

- runtime: remotely abort tasks on `JoinHandle::abort` ([#3934])

[#3934]: https://github.com/tokio-rs/tokio/pull/3934

# 1.5.0 (April 12, 2021)

### Added
Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.0.x" git tag.
version = "1.5.0"
version = "1.5.1"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/runtime/task/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ where
self.complete(Err(err), true)
}

/// Remotely abort the task
///
/// This is similar to `shutdown` except that it asks the runtime to perform
/// the shutdown. This is necessary to avoid the shutdown happening in the
/// wrong thread for non-Send tasks.
pub(super) fn remote_abort(self) {
if self.header().state.transition_to_notified_and_cancel() {
self.core().scheduler.schedule(Notified(self.to_task()));
}
}

// ====== internal ======

fn complete(self, output: super::Result<T::Output>, is_join_interested: bool) {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl<T> JoinHandle<T> {
/// ```
pub fn abort(&self) {
if let Some(raw) = self.raw {
raw.shutdown();
raw.remote_abort();
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions tokio/src/runtime/task/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub(super) struct Vtable {
/// The join handle has been dropped
pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>),

/// The task is remotely aborted
pub(super) remote_abort: unsafe fn(NonNull<Header>),

/// Scheduler is being shutdown
pub(super) shutdown: unsafe fn(NonNull<Header>),
}
Expand All @@ -33,6 +36,7 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
dealloc: dealloc::<T, S>,
try_read_output: try_read_output::<T, S>,
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
remote_abort: remote_abort::<T, S>,
shutdown: shutdown::<T, S>,
}
}
Expand Down Expand Up @@ -89,6 +93,11 @@ impl RawTask {
let vtable = self.header().vtable;
unsafe { (vtable.shutdown)(self.ptr) }
}

pub(super) fn remote_abort(self) {
let vtable = self.header().vtable;
unsafe { (vtable.remote_abort)(self.ptr) }
}
}

impl Clone for RawTask {
Expand Down Expand Up @@ -125,6 +134,11 @@ unsafe fn drop_join_handle_slow<T: Future, S: Schedule>(ptr: NonNull<Header>) {
harness.drop_join_handle_slow()
}

unsafe fn remote_abort<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.remote_abort()
}

unsafe fn shutdown<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.shutdown()
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/runtime/task/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,15 @@ impl State {
prev.will_need_queueing()
}

/// Set the cancelled bit and transition the state to `NOTIFIED`.
///
/// Returns `true` if the task needs to be submitted to the pool for
/// execution
pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
let prev = Snapshot(self.val.fetch_or(NOTIFIED | CANCELLED, AcqRel));
prev.will_need_queueing()
}

/// Set the `CANCELLED` bit and attempt to transition to `Running`.
///
/// Returns `true` if the transition to `Running` succeeded.
Expand Down
3 changes: 0 additions & 3 deletions tokio/tests/macros_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,6 @@ async fn join_with_select() {
async fn use_future_in_if_condition() {
use tokio::time::{self, Duration};

let sleep = time::sleep(Duration::from_millis(50));
tokio::pin!(sleep);

tokio::select! {
_ = time::sleep(Duration::from_millis(50)), if false => {
panic!("if condition ignored")
Expand Down
55 changes: 51 additions & 4 deletions tokio/tests/task_abort.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use std::thread::sleep;
use std::time::Duration;

/// Checks that a suspended task can be aborted without panicking as reported in
/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
#[test]
Expand Down Expand Up @@ -62,18 +65,16 @@ fn test_abort_without_panic_3662() {
// This runs in a separate thread so it doesn't have immediate
// thread-local access to the executor. It does however transition
// the underlying task to be completed, which will cause it to be
// dropped (in this thread no less).
// dropped (but not in this thread).
assert!(!drop_flag2.load(Ordering::SeqCst));
j.abort();
// TODO: is this guaranteed at this point?
// assert!(drop_flag2.load(Ordering::SeqCst));
j
})
.join()
.unwrap();

assert!(drop_flag.load(Ordering::SeqCst));
let result = task.await;
assert!(drop_flag.load(Ordering::SeqCst));
assert!(result.unwrap_err().is_cancelled());

// Note: We do the following to trigger a deferred task cleanup.
Expand All @@ -91,3 +92,49 @@ fn test_abort_without_panic_3662() {
i.await.unwrap();
});
}

/// Checks that a suspended LocalSet task can be aborted from a remote thread
/// without panicking and without running the tasks destructor on the wrong thread.
/// <https://github.com/tokio-rs/tokio/issues/3929>
#[test]
fn remote_abort_local_set_3929() {
struct DropCheck {
created_on: std::thread::ThreadId,
not_send: std::marker::PhantomData<*const ()>,
}

impl DropCheck {
fn new() -> Self {
Self {
created_on: std::thread::current().id(),
not_send: std::marker::PhantomData,
}
}
}
impl Drop for DropCheck {
fn drop(&mut self) {
if std::thread::current().id() != self.created_on {
panic!("non-Send value dropped in another thread!");
}
}
}

let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();

let check = DropCheck::new();
let jh = local.spawn_local(async move {
futures::future::pending::<()>().await;
drop(check);
});

let jh2 = std::thread::spawn(move || {
sleep(Duration::from_millis(50));
jh.abort();
});

rt.block_on(local);
jh2.join().unwrap();
}

0 comments on commit e385108

Please sign in to comment.