Skip to content

Commit

Permalink
fix use-after-free of Task with futures2
Browse files Browse the repository at this point in the history
Code stolen from tokio-rs#254
  • Loading branch information
kpp committed Apr 11, 2018
1 parent 372400e commit ad74995
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 17 deletions.
3 changes: 0 additions & 3 deletions tokio-threadpool/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ use num_cpus;
use tokio_executor::Enter;
use tokio_executor::park::Park;

#[cfg(feature = "unstable-futures")]
use futures2;

/// Builds a thread pool with custom configuration values.
///
/// Methods can be chanined in order to set the configuration values. The thread
Expand Down
111 changes: 102 additions & 9 deletions tokio-threadpool/src/futures2_wake.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use inner::Pool;
use pool::Pool;
use notifier::Notifier;

use std::marker::PhantomData;
Expand All @@ -13,6 +13,11 @@ pub(crate) struct Futures2Wake {
id: usize,
}

// Futures2Wake doesn't need to drop_id on drop,
// because the Futures2Wake is **only** ever held in:
// - ::task::Task, which handles drop itself
// - futures::Waker, that a user could have cloned. When that drops,
// it will call drop_raw, so we don't need to double drop.
impl Futures2Wake {
pub(crate) fn new(id: usize, inner: &Arc<Pool>) -> Futures2Wake {
let notifier = Arc::new(Notifier {
Expand All @@ -22,12 +27,6 @@ impl Futures2Wake {
}
}

impl Drop for Futures2Wake {
fn drop(&mut self) {
self.notifier.drop_id(self.id)
}
}

struct ArcWrapped(PhantomData<Futures2Wake>);

unsafe impl futures2::task::UnsafeWake for ArcWrapped {
Expand All @@ -52,9 +51,103 @@ unsafe impl futures2::task::UnsafeWake for ArcWrapped {
}
}

pub(crate) fn into_unsafe_wake(rc: Arc<Futures2Wake>) -> *mut futures2::task::UnsafeWake {
unsafe {
mem::transmute::<Arc<Futures2Wake>, *mut ArcWrapped>(rc)
}
}

pub(crate) fn into_waker(rc: Arc<Futures2Wake>) -> futures2::task::Waker {
unsafe {
let ptr = mem::transmute::<Arc<Futures2Wake>, *mut ArcWrapped>(rc);
futures2::task::Waker::new(ptr)
futures2::task::Waker::new(into_unsafe_wake(rc))
}
}


#[cfg(test)]
mod tests {
// We want most tests as integration tests, but these ones are special:
//
// This is testing that Task drop never happens more than it should,
// causing use-after-free bugs. ;_;

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Relaxed, Release};
use ::{Sender, Shutdown, ThreadPool};

use futures2;
use futures2::prelude::*;

static TASK_DROPS: AtomicUsize = ::std::sync::atomic::ATOMIC_USIZE_INIT;

pub(super) fn on_task_drop() {
TASK_DROPS.fetch_add(1, Release);
}

fn reset_task_drops() {
TASK_DROPS.store(0, Release);
}

fn spawn_pool<F>(pool: &mut Sender, f: F)
where F: Future<Item = (), Error = ()> + Send + 'static
{
futures2::executor::Executor::spawn(
pool,
Box::new(f.map_err(|_| panic!()))
).unwrap()
}

fn await_shutdown(shutdown: Shutdown) {
::futures::Future::wait(shutdown).unwrap()
}

#[test]
fn task_drop_counts() {
extern crate env_logger;
let _ = env_logger::init();

struct Always;

impl Future for Always {
type Item = ();
type Error = ();

fn poll(&mut self, _: &mut futures2::task::Context) -> Poll<(), ()> {
Ok(Async::Ready(()))
}
}

reset_task_drops();

let pool = ThreadPool::new();
let mut tx = pool.sender().clone();
spawn_pool(&mut tx, Always);
await_shutdown(pool.shutdown());

// We've never cloned the waker/notifier, so should only be 1 drop
assert_eq!(TASK_DROPS.load(Relaxed), 1);


struct Park;

impl Future for Park {
type Item = ();
type Error = ();

fn poll(&mut self, cx: &mut futures2::task::Context) -> Poll<(), ()> {
cx.waker().clone().wake();
Ok(Async::Ready(()))
}
}

reset_task_drops();

let pool = ThreadPool::new();
let mut tx = pool.sender().clone();
spawn_pool(&mut tx, Park);
await_shutdown(pool.shutdown());

// We've cloned the task once, so should be 2 drops
assert_eq!(TASK_DROPS.load(Relaxed), 2);
}
}
82 changes: 82 additions & 0 deletions tokio-threadpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,85 @@ pub use sender::Sender;
pub use shutdown::Shutdown;
pub use thread_pool::ThreadPool;
pub use worker::Worker;

#[cfg(not(feature = "unstable-futures"))]
#[cfg(test)]
mod tests {
// We want most tests as integration tests, but these ones are special:
//
// This is testing that Task drop never happens more than it should,
// causing use-after-free bugs. ;_;

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Relaxed, Release};
use super::{Sender, Shutdown, ThreadPool};

#[cfg(not(feature = "unstable-futures"))]
use futures::{Poll, Async, Future};

static TASK_DROPS: AtomicUsize = ::std::sync::atomic::ATOMIC_USIZE_INIT;

fn reset_task_drops() {
TASK_DROPS.store(0, Release);
}

fn spawn_pool<F>(pool: &mut Sender, f: F)
where F: Future<Item = (), Error = ()> + Send + 'static
{
pool.spawn(f).unwrap()
}

fn await_shutdown(shutdown: Shutdown) {
shutdown.wait().unwrap()
}

#[test]
fn task_drop_counts() {
extern crate env_logger;
let _ = env_logger::init();

struct Always;

impl Future for Always {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::Ready(()))
}
}

reset_task_drops();

let pool = ThreadPool::new();
let mut tx = pool.sender().clone();
spawn_pool(&mut tx, Always);
await_shutdown(pool.shutdown());

// We've never cloned the waker/notifier, so should only be 1 drop
assert_eq!(TASK_DROPS.load(Relaxed), 1);


struct Park;

impl Future for Park {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
::futures::task::current().notify();
Ok(Async::Ready(()))
}
}

reset_task_drops();

let pool = ThreadPool::new();
let mut tx = pool.sender().clone();
spawn_pool(&mut tx, Park);
await_shutdown(pool.shutdown());

// We've cloned the task once, so should be 2 drops
assert_eq!(TASK_DROPS.load(Relaxed), 2);
}
}
3 changes: 2 additions & 1 deletion tokio-threadpool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use task::Task;
use worker::{self, Worker, WorkerId};

use futures::task::AtomicTask;
#[cfg(feature = "unstable-futures")]
use futures2;

use std::cell::UnsafeCell;
use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed};
Expand Down Expand Up @@ -172,7 +174,6 @@ impl Pool {
trace!("worker_terminated; num_workers={}", prev - 1);

if 1 == prev {
trace!("notifying shutdown task");
self.shutdown_task.notify();
}
}
Expand Down
6 changes: 4 additions & 2 deletions tokio-threadpool/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,11 @@ impl<'a> futures2::executor::Executor for &'a Sender {
// execution.

// Create a new task for the future
let task = Task::new2(f, |id| into_waker(Arc::new(Futures2Wake::new(id, &self.inner))));
let task = Task::new2(f, |id| {
into_waker(Arc::new(Futures2Wake::new(id, &self.inner)))
});

self.inner.submit(task, &self.inner);
self.inner.submit(Arc::new(task), &self.inner);

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions tokio-threadpool/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ impl Task {
let mut inner = Box::new(Task {
next: AtomicPtr::new(ptr::null_mut()),
state: AtomicUsize::new(State::new().into()),
future: None,
future: UnsafeCell::new(None),
});

let waker = make_waker((&*inner) as *const _ as usize);
let tls = futures2::task::LocalMap::new();
inner.future = Some(TaskFuture::Futures2 { waker, tls, fut });
inner.future = UnsafeCell::new(Some(TaskFuture::Futures2 { waker, tls, fut }));

Task { ptr: Box::into_raw(inner) }
}
Expand Down

0 comments on commit ad74995

Please sign in to comment.