diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index 9becb130165..94ab4e3852f 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -60,6 +60,14 @@ a kernel bug. ([#3803]) [#3775]: https://github.com/tokio-rs/tokio/pull/3775 [#3780]: https://github.com/tokio-rs/tokio/pull/3780 +# 1.5.1 (July 6, 2021) + +### Fixed + +- runtime: remotely abort tasks on `JoinHandle::abort` ([#3934]) + +[#3934]: https://github.com/tokio-rs/tokio/pull/3934 + # 1.5.0 (April 12, 2021) ### Added diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 7d596e36e1a..c9c99c76949 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -164,6 +164,17 @@ where self.complete(Err(err), true) } + /// Remotely abort the task + /// + /// This is similar to `shutdown` except that it asks the runtime to perform + /// the shutdown. This is necessary to avoid the shutdown happening in the + /// wrong thread for non-Send tasks. + pub(super) fn remote_abort(self) { + if self.header().state.transition_to_notified_and_cancel() { + self.core().scheduler.schedule(Notified(self.to_task())); + } + } + // ====== internal ====== fn complete(self, output: super::Result, is_join_interested: bool) { diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index dedfb387949..2fe40a72195 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -192,7 +192,7 @@ impl JoinHandle { /// ``` pub fn abort(&self) { if let Some(raw) = self.raw { - raw.shutdown(); + raw.remote_abort(); } } } diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index cae56d037da..39336cee904 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -22,6 +22,9 @@ pub(super) struct Vtable { /// The join handle has been dropped pub(super) drop_join_handle_slow: unsafe fn(NonNull
), + /// The task is remotely aborted + pub(super) remote_abort: unsafe fn(NonNull
), + /// Scheduler is being shutdown pub(super) shutdown: unsafe fn(NonNull
), } @@ -33,6 +36,7 @@ pub(super) fn vtable() -> &'static Vtable { dealloc: dealloc::, try_read_output: try_read_output::, drop_join_handle_slow: drop_join_handle_slow::, + remote_abort: remote_abort::, shutdown: shutdown::, } } @@ -89,6 +93,11 @@ impl RawTask { let vtable = self.header().vtable; unsafe { (vtable.shutdown)(self.ptr) } } + + pub(super) fn remote_abort(self) { + let vtable = self.header().vtable; + unsafe { (vtable.remote_abort)(self.ptr) } + } } impl Clone for RawTask { @@ -125,6 +134,11 @@ unsafe fn drop_join_handle_slow(ptr: NonNull
) { harness.drop_join_handle_slow() } +unsafe fn remote_abort(ptr: NonNull
) { + let harness = Harness::::from_raw(ptr); + harness.remote_abort() +} + unsafe fn shutdown(ptr: NonNull
) { let harness = Harness::::from_raw(ptr); harness.shutdown() diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 21e90430db2..da0c567d1a8 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -177,6 +177,15 @@ impl State { prev.will_need_queueing() } + /// Set the cancelled bit and transition the state to `NOTIFIED`. + /// + /// Returns `true` if the task needs to be submitted to the pool for + /// execution + pub(super) fn transition_to_notified_and_cancel(&self) -> bool { + let prev = Snapshot(self.val.fetch_or(NOTIFIED | CANCELLED, AcqRel)); + prev.will_need_queueing() + } + /// Set the `CANCELLED` bit and attempt to transition to `Running`. /// /// Returns `true` if the transition to `Running` succeeded. diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 1d72ac3a25c..e96dcf0c5b4 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -1,6 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::thread::sleep; +use std::time::Duration; + /// Checks that a suspended task can be aborted without panicking as reported in /// issue #3157: . #[test] @@ -62,18 +65,16 @@ fn test_abort_without_panic_3662() { // This runs in a separate thread so it doesn't have immediate // thread-local access to the executor. It does however transition // the underlying task to be completed, which will cause it to be - // dropped (in this thread no less). + // dropped (but not in this thread). assert!(!drop_flag2.load(Ordering::SeqCst)); j.abort(); - // TODO: is this guaranteed at this point? - // assert!(drop_flag2.load(Ordering::SeqCst)); j }) .join() .unwrap(); - assert!(drop_flag.load(Ordering::SeqCst)); let result = task.await; + assert!(drop_flag.load(Ordering::SeqCst)); assert!(result.unwrap_err().is_cancelled()); // Note: We do the following to trigger a deferred task cleanup. @@ -91,3 +92,49 @@ fn test_abort_without_panic_3662() { i.await.unwrap(); }); } + +/// Checks that a suspended LocalSet task can be aborted from a remote thread +/// without panicking and without running the tasks destructor on the wrong thread. +/// +#[test] +fn remote_abort_local_set_3929() { + struct DropCheck { + created_on: std::thread::ThreadId, + not_send: std::marker::PhantomData<*const ()>, + } + + impl DropCheck { + fn new() -> Self { + Self { + created_on: std::thread::current().id(), + not_send: std::marker::PhantomData, + } + } + } + impl Drop for DropCheck { + fn drop(&mut self) { + if std::thread::current().id() != self.created_on { + panic!("non-Send value dropped in another thread!"); + } + } + } + + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + + let check = DropCheck::new(); + let jh = local.spawn_local(async move { + futures::future::pending::<()>().await; + drop(check); + }); + + let jh2 = std::thread::spawn(move || { + sleep(Duration::from_millis(50)); + jh.abort(); + }); + + rt.block_on(local); + jh2.join().unwrap(); +}