From ad74995a031da1eb14260511b25bf566e23a531c Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Wed, 11 Apr 2018 17:19:35 +0300 Subject: [PATCH] fix use-after-free of Task with futures2 Code stolen from https://github.com/tokio-rs/tokio/pull/254 --- tokio-threadpool/src/builder.rs | 3 - tokio-threadpool/src/futures2_wake.rs | 111 +++++++++++++++++++++++--- tokio-threadpool/src/lib.rs | 82 +++++++++++++++++++ tokio-threadpool/src/pool/mod.rs | 3 +- tokio-threadpool/src/sender.rs | 6 +- tokio-threadpool/src/task/mod.rs | 4 +- 6 files changed, 192 insertions(+), 17 deletions(-) diff --git a/tokio-threadpool/src/builder.rs b/tokio-threadpool/src/builder.rs index 9583206c255..d4e8bc035d4 100644 --- a/tokio-threadpool/src/builder.rs +++ b/tokio-threadpool/src/builder.rs @@ -15,9 +15,6 @@ use num_cpus; use tokio_executor::Enter; use tokio_executor::park::Park; -#[cfg(feature = "unstable-futures")] -use futures2; - /// Builds a thread pool with custom configuration values. /// /// Methods can be chanined in order to set the configuration values. The thread diff --git a/tokio-threadpool/src/futures2_wake.rs b/tokio-threadpool/src/futures2_wake.rs index ed9d4552c55..639bc928bdb 100644 --- a/tokio-threadpool/src/futures2_wake.rs +++ b/tokio-threadpool/src/futures2_wake.rs @@ -1,4 +1,4 @@ -use inner::Pool; +use pool::Pool; use notifier::Notifier; use std::marker::PhantomData; @@ -13,6 +13,11 @@ pub(crate) struct Futures2Wake { id: usize, } +// Futures2Wake doesn't need to drop_id on drop, +// because the Futures2Wake is **only** ever held in: +// - ::task::Task, which handles drop itself +// - futures::Waker, that a user could have cloned. When that drops, +// it will call drop_raw, so we don't need to double drop. impl Futures2Wake { pub(crate) fn new(id: usize, inner: &Arc) -> Futures2Wake { let notifier = Arc::new(Notifier { @@ -22,12 +27,6 @@ impl Futures2Wake { } } -impl Drop for Futures2Wake { - fn drop(&mut self) { - self.notifier.drop_id(self.id) - } -} - struct ArcWrapped(PhantomData); unsafe impl futures2::task::UnsafeWake for ArcWrapped { @@ -52,9 +51,103 @@ unsafe impl futures2::task::UnsafeWake for ArcWrapped { } } +pub(crate) fn into_unsafe_wake(rc: Arc) -> *mut futures2::task::UnsafeWake { + unsafe { + mem::transmute::, *mut ArcWrapped>(rc) + } +} + pub(crate) fn into_waker(rc: Arc) -> futures2::task::Waker { unsafe { - let ptr = mem::transmute::, *mut ArcWrapped>(rc); - futures2::task::Waker::new(ptr) + futures2::task::Waker::new(into_unsafe_wake(rc)) + } +} + + +#[cfg(test)] +mod tests { + // We want most tests as integration tests, but these ones are special: + // + // This is testing that Task drop never happens more than it should, + // causing use-after-free bugs. ;_; + + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::{Relaxed, Release}; + use ::{Sender, Shutdown, ThreadPool}; + + use futures2; + use futures2::prelude::*; + + static TASK_DROPS: AtomicUsize = ::std::sync::atomic::ATOMIC_USIZE_INIT; + + pub(super) fn on_task_drop() { + TASK_DROPS.fetch_add(1, Release); + } + + fn reset_task_drops() { + TASK_DROPS.store(0, Release); + } + + fn spawn_pool(pool: &mut Sender, f: F) + where F: Future + Send + 'static + { + futures2::executor::Executor::spawn( + pool, + Box::new(f.map_err(|_| panic!())) + ).unwrap() + } + + fn await_shutdown(shutdown: Shutdown) { + ::futures::Future::wait(shutdown).unwrap() + } + + #[test] + fn task_drop_counts() { + extern crate env_logger; + let _ = env_logger::init(); + + struct Always; + + impl Future for Always { + type Item = (); + type Error = (); + + fn poll(&mut self, _: &mut futures2::task::Context) -> Poll<(), ()> { + Ok(Async::Ready(())) + } + } + + reset_task_drops(); + + let pool = ThreadPool::new(); + let mut tx = pool.sender().clone(); + spawn_pool(&mut tx, Always); + await_shutdown(pool.shutdown()); + + // We've never cloned the waker/notifier, so should only be 1 drop + assert_eq!(TASK_DROPS.load(Relaxed), 1); + + + struct Park; + + impl Future for Park { + type Item = (); + type Error = (); + + fn poll(&mut self, cx: &mut futures2::task::Context) -> Poll<(), ()> { + cx.waker().clone().wake(); + Ok(Async::Ready(())) + } + } + + reset_task_drops(); + + let pool = ThreadPool::new(); + let mut tx = pool.sender().clone(); + spawn_pool(&mut tx, Park); + await_shutdown(pool.shutdown()); + + // We've cloned the task once, so should be 2 drops + assert_eq!(TASK_DROPS.load(Relaxed), 2); } } diff --git a/tokio-threadpool/src/lib.rs b/tokio-threadpool/src/lib.rs index 5c33b09db24..5964f1af1e5 100644 --- a/tokio-threadpool/src/lib.rs +++ b/tokio-threadpool/src/lib.rs @@ -36,3 +36,85 @@ pub use sender::Sender; pub use shutdown::Shutdown; pub use thread_pool::ThreadPool; pub use worker::Worker; + +#[cfg(not(feature = "unstable-futures"))] +#[cfg(test)] +mod tests { + // We want most tests as integration tests, but these ones are special: + // + // This is testing that Task drop never happens more than it should, + // causing use-after-free bugs. ;_; + + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::{Relaxed, Release}; + use super::{Sender, Shutdown, ThreadPool}; + + #[cfg(not(feature = "unstable-futures"))] + use futures::{Poll, Async, Future}; + + static TASK_DROPS: AtomicUsize = ::std::sync::atomic::ATOMIC_USIZE_INIT; + + fn reset_task_drops() { + TASK_DROPS.store(0, Release); + } + + fn spawn_pool(pool: &mut Sender, f: F) + where F: Future + Send + 'static + { + pool.spawn(f).unwrap() + } + + fn await_shutdown(shutdown: Shutdown) { + shutdown.wait().unwrap() + } + + #[test] + fn task_drop_counts() { + extern crate env_logger; + let _ = env_logger::init(); + + struct Always; + + impl Future for Always { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(Async::Ready(())) + } + } + + reset_task_drops(); + + let pool = ThreadPool::new(); + let mut tx = pool.sender().clone(); + spawn_pool(&mut tx, Always); + await_shutdown(pool.shutdown()); + + // We've never cloned the waker/notifier, so should only be 1 drop + assert_eq!(TASK_DROPS.load(Relaxed), 1); + + + struct Park; + + impl Future for Park { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + ::futures::task::current().notify(); + Ok(Async::Ready(())) + } + } + + reset_task_drops(); + + let pool = ThreadPool::new(); + let mut tx = pool.sender().clone(); + spawn_pool(&mut tx, Park); + await_shutdown(pool.shutdown()); + + // We've cloned the task once, so should be 2 drops + assert_eq!(TASK_DROPS.load(Relaxed), 2); + } +} diff --git a/tokio-threadpool/src/pool/mod.rs b/tokio-threadpool/src/pool/mod.rs index 91305c2bc46..44559e4e73a 100644 --- a/tokio-threadpool/src/pool/mod.rs +++ b/tokio-threadpool/src/pool/mod.rs @@ -14,6 +14,8 @@ use task::Task; use worker::{self, Worker, WorkerId}; use futures::task::AtomicTask; +#[cfg(feature = "unstable-futures")] +use futures2; use std::cell::UnsafeCell; use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed}; @@ -172,7 +174,6 @@ impl Pool { trace!("worker_terminated; num_workers={}", prev - 1); if 1 == prev { - trace!("notifying shutdown task"); self.shutdown_task.notify(); } } diff --git a/tokio-threadpool/src/sender.rs b/tokio-threadpool/src/sender.rs index d540e55894f..4efe43d4649 100644 --- a/tokio-threadpool/src/sender.rs +++ b/tokio-threadpool/src/sender.rs @@ -227,9 +227,11 @@ impl<'a> futures2::executor::Executor for &'a Sender { // execution. // Create a new task for the future - let task = Task::new2(f, |id| into_waker(Arc::new(Futures2Wake::new(id, &self.inner)))); + let task = Task::new2(f, |id| { + into_waker(Arc::new(Futures2Wake::new(id, &self.inner))) + }); - self.inner.submit(task, &self.inner); + self.inner.submit(Arc::new(task), &self.inner); Ok(()) } diff --git a/tokio-threadpool/src/task/mod.rs b/tokio-threadpool/src/task/mod.rs index 3361789a160..1a1438732c1 100644 --- a/tokio-threadpool/src/task/mod.rs +++ b/tokio-threadpool/src/task/mod.rs @@ -82,12 +82,12 @@ impl Task { let mut inner = Box::new(Task { next: AtomicPtr::new(ptr::null_mut()), state: AtomicUsize::new(State::new().into()), - future: None, + future: UnsafeCell::new(None), }); let waker = make_waker((&*inner) as *const _ as usize); let tls = futures2::task::LocalMap::new(); - inner.future = Some(TaskFuture::Futures2 { waker, tls, fut }); + inner.future = UnsafeCell::new(Some(TaskFuture::Futures2 { waker, tls, fut })); Task { ptr: Box::into_raw(inner) } }