Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix use-after-free of Task with futures2 #254

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 144 additions & 10 deletions tokio-threadpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,9 @@ impl<'a> futures2::executor::Executor for &'a Sender {
// execution.

// Create a new task for the future
let task = Task::new2(f, |id| into_waker(Arc::new(Futures2Wake::new(id, &self.inner))));
let task = Task::new2(f, |id| {
into_unsafe_wake(Arc::new(Futures2Wake::new(id, &self.inner)))
});

self.inner.submit(task, &self.inner);

Expand Down Expand Up @@ -1090,7 +1092,6 @@ impl Inner {
trace!("worker_terminated; num_workers={}", prev - 1);

if 1 == prev {
trace!("notifying shutdown task");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Er, woops, did you want to keep this? I was just cleaning up a bunch of logs I had used to debug this, and thought this was one I had added.

self.shutdown_task.notify();
}
}
Expand Down Expand Up @@ -2222,6 +2223,11 @@ impl fmt::Debug for Callback {

// ===== impl Futures2Wake =====

// Futures2Wake doesn't need to drop_id on drop,
// because the Futures2Wake is **only** ever held in:
// - ::task::Task, which handles drop itself
// - futures::Waker, that a user could have cloned. When that drops,
// it will call drop_raw, so we don't need to double drop.
#[cfg(feature = "unstable-futures")]
impl Futures2Wake {
fn new(id: usize, inner: &Arc<Inner>) -> Futures2Wake {
Expand All @@ -2232,12 +2238,6 @@ impl Futures2Wake {
}
}

#[cfg(feature = "unstable-futures")]
impl Drop for Futures2Wake {
fn drop(&mut self) {
self.notifier.drop_id(self.id)
}
}

#[cfg(feature = "unstable-futures")]
struct ArcWrapped(PhantomData<Futures2Wake>);
Expand Down Expand Up @@ -2265,10 +2265,144 @@ unsafe impl futures2::task::UnsafeWake for ArcWrapped {
}
}

#[cfg(feature = "unstable-futures")]
fn into_unsafe_wake(rc: Arc<Futures2Wake>) -> *mut futures2::task::UnsafeWake {
unsafe {
mem::transmute::<Arc<Futures2Wake>, *mut ArcWrapped>(rc)
}
}

#[cfg(feature = "unstable-futures")]
fn into_waker(rc: Arc<Futures2Wake>) -> futures2::task::Waker {
unsafe {
let ptr = mem::transmute::<Arc<Futures2Wake>, *mut ArcWrapped>(rc);
futures2::task::Waker::new(ptr)
futures2::task::Waker::new(into_unsafe_wake(rc))
}
}

#[cfg(test)]
mod tests {
// We want most tests as integration tests, but these ones are special:
//
// This is testing that Task drop never happens more than it should,
// causing use-after-free bugs. ;_;

use super::{AtomicUsize, Relaxed, Release, Sender, Shutdown, ThreadPool};

#[cfg(not(feature = "unstable-futures"))]
use futures::{Poll, Async, Future};

#[cfg(feature = "unstable-futures")]
use futures2;
#[cfg(feature = "unstable-futures")]
use futures2::prelude::*;

static TASK_DROPS: AtomicUsize = ::std::sync::atomic::ATOMIC_USIZE_INIT;

pub(super) fn on_task_drop() {
TASK_DROPS.fetch_add(1, Release);
}

fn reset_task_drops() {
TASK_DROPS.store(0, Release);
}

#[cfg(not(feature = "unstable-futures"))]
fn spawn_pool<F>(pool: &mut Sender, f: F)
where F: Future<Item = (), Error = ()> + Send + 'static
{
pool.spawn(f).unwrap()
}

#[cfg(feature = "unstable-futures")]
fn spawn_pool<F>(pool: &mut Sender, f: F)
where F: Future<Item = (), Error = ()> + Send + 'static
{
futures2::executor::Executor::spawn(
pool,
Box::new(f.map_err(|_| panic!()))
).unwrap()
}

#[cfg(feature = "unstable-futures")]
fn await_shutdown(shutdown: Shutdown) {
::futures::Future::wait(shutdown).unwrap()
}
#[cfg(not(feature = "unstable-futures"))]
fn await_shutdown(shutdown: Shutdown) {
shutdown.wait().unwrap()
}

#[test]
fn task_drop_counts() {
extern crate env_logger;
let _ = env_logger::init();

struct Always;

#[cfg(not(feature = "unstable-futures"))]
impl Future for Always {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::Ready(()))
}
}

#[cfg(feature = "unstable-futures")]
impl Future for Always {
type Item = ();
type Error = ();

fn poll(&mut self, _: &mut futures2::task::Context) -> Poll<(), ()> {
Ok(Async::Ready(()))
}
}

reset_task_drops();

let pool = ThreadPool::new();
let mut tx = pool.sender().clone();
spawn_pool(&mut tx, Always);
await_shutdown(pool.shutdown());

// We've never cloned the waker/notifier, so should only be 1 drop
assert_eq!(TASK_DROPS.load(Relaxed), 1);


struct Park;

#[cfg(not(feature = "unstable-futures"))]
impl Future for Park {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
::futures::task::current().notify();
Ok(Async::Ready(()))
}
}

#[cfg(feature = "unstable-futures")]
impl Future for Park {
type Item = ();
type Error = ();

fn poll(&mut self, cx: &mut futures2::task::Context) -> Poll<(), ()> {
cx.waker().clone().wake();
Ok(Async::Ready(()))
}
}

reset_task_drops();

let pool = ThreadPool::new();
let mut tx = pool.sender().clone();
spawn_pool(&mut tx, Park);
await_shutdown(pool.shutdown());

// We've cloned the task once, so should be 2 drops
assert_eq!(TASK_DROPS.load(Relaxed), 2);
}
}

27 changes: 24 additions & 3 deletions tokio-threadpool/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ enum TaskFuture {
#[cfg(feature = "unstable-futures")]
Futures2 {
tls: futures2::task::LocalMap,
waker: futures2::task::Waker,
// We hold a *mut UnsafeWake in here because we don't want
// to run Waker::drop when this future is dropped. That would
// trigger Task::drop.
//
// The Task will drop correctly afterwards.
waker: *mut futures2::task::UnsafeWake,
fut: BoxFuture2,
}
}
Expand Down Expand Up @@ -102,7 +107,7 @@ impl Task {
/// Create a new task handle for a futures 0.2 future
#[cfg(feature = "unstable-futures")]
pub fn new2<F>(fut: BoxFuture2, make_waker: F) -> Task
where F: FnOnce(usize) -> futures2::task::Waker
where F: FnOnce(usize) -> *mut futures2::task::UnsafeWake
{
let mut inner = Box::new(Inner {
next: AtomicPtr::new(ptr::null_mut()),
Expand Down Expand Up @@ -297,6 +302,12 @@ impl Clone for Task {

impl Drop for Task {
fn drop(&mut self) {
#[cfg(test)]
{
// Hook for tests that we never drop too many times.
super::tests::on_task_drop();
}

// Because `fetch_sub` is already atomic, we do not need to synchronize
// with other threads unless we are going to delete the object. This
// same logic applies to the below `fetch_sub` to the `weak` count.
Expand Down Expand Up @@ -502,7 +513,16 @@ impl TaskFuture {

#[cfg(feature = "unstable-futures")]
TaskFuture::Futures2 { ref mut fut, ref waker, ref mut tls } => {
let mut cx = futures2::task::Context::new(tls, waker, exec);
// we don't want to construct a concrete Waker yet,
// since that needs to clone. We just need to give a reference.
// If the future clones, all is fine.
let w = unsafe {
mem::transmute::<
&*mut futures2::task::UnsafeWake,
&futures2::task::Waker,
>(waker)
};
let mut cx = futures2::task::Context::new(tls, w, exec);
match fut.poll(&mut cx).unwrap() {
futures2::Async::Pending => Ok(Async::NotReady),
futures2::Async::Ready(x) => Ok(Async::Ready(x)),
Expand All @@ -511,3 +531,4 @@ impl TaskFuture {
}
}
}