Skip to content

Commit

Permalink
Improve Node of FuturesUnordered
Browse files Browse the repository at this point in the history
  • Loading branch information
MajorBreakfast committed Jun 22, 2018
1 parent f0c8787 commit d7cc312
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 54 deletions.
7 changes: 3 additions & 4 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::{Arc, Weak};
use std::usize;

use futures_core::{Stream, Future, Poll};
use futures_core::task::{self, AtomicWaker, LocalWaker};
use futures_core::task::{self, AtomicWaker};

mod abort;
mod inner;
Expand All @@ -21,7 +21,7 @@ mod node;

use self::inner::{Inner, Dequeue};
use self::iter_mut::IterMut;
use self::node::{Node, NodeToHandle};
use self::node::Node;

/// A set of `Future`s which may complete in any order.
///
Expand Down Expand Up @@ -325,8 +325,7 @@ impl<T> Stream for FuturesUnordered<T>
// the internal allocation, appropriately accessing fields and
// deallocating the node if need be.
let res = {
let notify = NodeToHandle(bomb.node.as_ref().unwrap());
let local_waker = LocalWaker::from(notify);
let local_waker = bomb.node.as_ref().unwrap().local_waker();
let mut cx = cx.with_waker(&local_waker);

// Safety: We won't move the future ever again
Expand Down
103 changes: 53 additions & 50 deletions futures-util/src/stream/futures_unordered/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub(super) struct Node<T> {
}

impl<T> Node<T> {
pub(super) fn notify(me: &Arc<Node<T>>) {
let inner = match me.queue.upgrade() {
pub(super) fn wake(self: &Arc<Node<T>>) {
let inner = match self.queue.upgrade() {
Some(inner) => inner,
None => return,
};
Expand All @@ -52,12 +52,39 @@ impl<T> Node<T> {
// implementation guarantees that if we set the `queued` flag true that
// there's a reference count held by the main `FuturesUnordered` queue
// still.
let prev = me.queued.swap(true, SeqCst);
let prev = self.queued.swap(true, SeqCst);
if !prev {
inner.enqueue(&**me);
inner.enqueue(&**self);
inner.parent.wake();
}
}

// Saftey: The returned `NonNull<Unsafe>` needs to be put into a `Waker`
// or `LocalWaker`
unsafe fn clone_as_unsafe_wake_without_lifetime(self: &Arc<Node<T>>)
-> NonNull<UnsafeWake>
{
let clone = self.clone();

// Safety: This is save because an `Arc` is a struct which contains
// a single field that is a pointer.
let ptr = mem::transmute::<Arc<Node<T>>, NonNull<ArcNode<T>>>(clone);

let ptr = ptr as NonNull<UnsafeWake>;

// Hide lifetime of `T`
// Safety: This is safe because `UnsafeWake` is guaranteed not to
// touch `T`
mem::transmute::<NonNull<UnsafeWake>, NonNull<UnsafeWake>>(ptr)
}

pub(super) fn local_waker(self: &Arc<Node<T>>) -> LocalWaker {
unsafe { LocalWaker::new(self.clone_as_unsafe_wake_without_lifetime()) }
}

pub(super) fn waker(self: &Arc<Node<T>>) -> Waker {
unsafe { Waker::new(self.clone_as_unsafe_wake_without_lifetime()) }
}
}

impl<T> Drop for Node<T> {
Expand All @@ -78,68 +105,44 @@ impl<T> Drop for Node<T> {
}
}

#[allow(missing_debug_implementations)]
pub(super) struct NodeToHandle<'a, T: 'a>(pub(super) &'a Arc<Node<T>>);

impl<'a, T> Clone for NodeToHandle<'a, T> {
fn clone(&self) -> Self {
NodeToHandle(self.0)
}
}

#[doc(hidden)]
impl<'a, T> From<NodeToHandle<'a, T>> for LocalWaker {
fn from(handle: NodeToHandle<'a, T>) -> LocalWaker {
unsafe {
let ptr: Arc<Node<T>> = handle.0.clone();
let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr);
let ptr = ptr as *mut UnsafeWake;
// Hide lifetime
let ptr = mem::transmute::<*mut UnsafeWake, *mut UnsafeWake>(ptr);
LocalWaker::new(NonNull::new(ptr).unwrap())
}
}
}

#[doc(hidden)]
impl<'a, T> From<NodeToHandle<'a, T>> for Waker {
fn from(handle: NodeToHandle<'a, T>) -> Waker {
unsafe {
let ptr: Arc<Node<T>> = handle.0.clone();
let ptr = mem::transmute::<Arc<Node<T>>, *mut ArcNode<T>>(ptr);
let ptr = ptr as *mut UnsafeWake;
// Hide lifetime
let ptr = mem::transmute::<*mut UnsafeWake, *mut UnsafeWake>(ptr);
Waker::new(NonNull::new(ptr).unwrap())
}
}
}

// `ArcNode<T>` represents conceptually the struct an `Arc<Node<T>>` points to.
// `*const ArcNode<T>` is equal to `Arc<Node<T>>`
// It may only be used through references because its layout obviously doesn't
// match the real inner struct of an `Arc` which (currently) has the form
// `{ strong, weak, data }`.
struct ArcNode<T>(PhantomData<T>);

// We should never touch `T` on any thread other than the one owning
// We should never touch the future `T` on any thread other than the one owning
// `FuturesUnordered`, so this should be a safe operation.
unsafe impl<T> Send for ArcNode<T> {}
unsafe impl<T> Sync for ArcNode<T> {}

// We need to implement `UnsafeWake` trait directly and can't implement `Wake`
// for `Node<T>` because `T`, the future, isn't required to have a static
// lifetime. `UnsafeWake` lets us forget about `T` and its lifetime. This is
// safe because neither `drop_raw` nor `wake` touch `T`. This is the case even
// though `drop_raw` runs the destructor for `Node<T>` because its destructor is
// guaranteed to not touch `T`. `T` must already have been dropped by the time
// it runs. See `Drop` impl for `Node<T>` for more details.
unsafe impl<T> UnsafeWake for ArcNode<T> {
#[inline]
unsafe fn clone_raw(&self) -> Waker {
let me: *const ArcNode<T> = self;
let me: *const *const ArcNode<T> = &me;
let me = &*(me as *const Arc<Node<T>>);
NodeToHandle(me).into()
let node = &*(&me as *const *const ArcNode<T> as *const Arc<Node<T>>);
Node::waker(node)
}

#[inline]
unsafe fn drop_raw(&self) {
let mut me: *const ArcNode<T> = self;
let me = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
ptr::drop_in_place(me);
let node_ptr = &mut me as *mut *const ArcNode<T> as *mut Arc<Node<T>>;
ptr::drop_in_place(node_ptr);
}

#[inline]
unsafe fn wake(&self) {
let me: *const ArcNode<T> = self;
let me: *const *const ArcNode<T> = &me;
let me = me as *const Arc<Node<T>>;
Node::notify(&*me)
let node = &*(&me as *const *const ArcNode<T> as *const Arc<Node<T>>);
Node::wake(node)
}
}

0 comments on commit d7cc312

Please sign in to comment.