Skip to content

Commit

Permalink
Finish updating tokio-sync (#1130)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche committed Jun 7, 2019
1 parent c5c379c commit 1f08354
Show file tree
Hide file tree
Showing 23 changed files with 709 additions and 585 deletions.
10 changes: 8 additions & 2 deletions tokio-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ Synchronization utilities.
categories = ["asynchronous"]
publish = false

[features]
async-traits = ["async-sink", "futures-core-preview"]

[dependencies]
fnv = "1.0.6"
tokio-futures = { version = "0.2.0", path = "../tokio-futures" }
async-sink = { git = "https://github.com/tokio-rs/async", optional = true }
futures-core-preview = { version = "0.3.0-alpha.16", optional = true }

[dev-dependencies]
async-util = { git = "https://github.com/tokio-rs/async" }
env_logger = { version = "0.5", default-features = false }
pin-utils = "0.1.0-alpha.4"
# tokio = { version = "0.2.0", path = "../tokio" }
tokio-test = { version = "0.2.0", path = "../tokio-test" }
loom = { version = "0.1.1", features = ["futures"] }
loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] }
21 changes: 16 additions & 5 deletions tokio-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,29 @@ macro_rules! debug {
}
}

/*
/// Unwrap a ready value or propagate `Poll::Pending`.
#[macro_export]
macro_rules! ready {
($e:expr) => {{
use std::task::Poll::{Pending, Ready};

match $e {
Ready(v) => v,
Pending => return Pending,
}
}};
}

macro_rules! if_fuzz {
($($t:tt)*) => {{
if false { $($t)* }
}}
}
*/

// pub mod lock;
pub mod lock;
mod loom;
// pub mod mpsc;
pub mod mpsc;
pub mod oneshot;
// pub mod semaphore;
pub mod semaphore;
pub mod task;
pub mod watch;
14 changes: 7 additions & 7 deletions tokio-sync/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
//! [`LockGuard`]: struct.LockGuard.html

use crate::semaphore;
use futures::Async;

use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::task::Poll::Ready;
use std::task::{Context, Poll};

/// An asynchronous mutual exclusion primitive useful for protecting shared data
///
Expand Down Expand Up @@ -103,22 +105,20 @@ impl<T> Lock<T> {
/// Try to acquire the lock.
///
/// If the lock is already held, the current task is notified when it is released.
pub fn poll_lock(&mut self) -> Async<LockGuard<T>> {
if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
pub fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> {
ready!(self.permit.poll_acquire(cx, &self.inner.s)).unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
}) {
return Async::NotReady;
}
});

// We want to move the acquired permit into the guard,
// and leave an unacquired one in self.
let acquired = Self {
inner: self.inner.clone(),
permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()),
};
Async::Ready(LockGuard(acquired))
Ready(LockGuard(acquired))
}
}

Expand Down
8 changes: 2 additions & 6 deletions tokio-sync/src/loom.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
pub(crate) mod futures {
// pub(crate) use crate::task::AtomicTask;
pub(crate) use crate::task::AtomicWaker;
}

pub(crate) use std::task;

pub(crate) mod sync {
pub(crate) use std::sync::atomic;
// pub(crate) use std::sync::Arc;
pub(crate) use std::sync::Arc;

use std::cell::UnsafeCell;

Expand All @@ -33,8 +31,6 @@ pub(crate) mod sync {
}
}

/*
pub(crate) fn yield_now() {
::std::sync::atomic::spin_loop_hint();
}
*/
65 changes: 36 additions & 29 deletions tokio-sync/src/mpsc/bounded.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use super::chan;
use futures::{Poll, Sink, StartSend, Stream};

use std::fmt;
use std::task::{Context, Poll};

#[cfg(feature = "async-traits")]
use std::pin::Pin;

/// Send values to the associated `Receiver`.
///
Expand Down Expand Up @@ -127,6 +131,11 @@ impl<T> Receiver<T> {
Receiver { chan }
}

/// TODO: Dox
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
Expand All @@ -136,12 +145,12 @@ impl<T> Receiver<T> {
}
}

impl<T> Stream for Receiver<T> {
#[cfg(feature = "async-traits")]
impl<T> futures_core::Stream for Receiver<T> {
type Item = T;
type Error = RecvError;

fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
self.chan.recv().map_err(|_| RecvError(()))
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
Receiver::poll_next(self.get_mut(), cx)
}
}

Expand All @@ -165,13 +174,13 @@ impl<T> Sender<T> {
///
/// This method returns:
///
/// - `Ok(Async::Ready(_))` if capacity is reserved for a single message.
/// - `Ok(Async::NotReady)` if the channel may not have capacity, in which
/// - `Poll::Ready(Ok(_))` if capacity is reserved for a single message.
/// - `Poll::Pending` if the channel may not have capacity, in which
/// case the current task is queued to be notified once
/// capacity is available;
/// - `Err(SendError)` if the receiver has been dropped.
pub fn poll_ready(&mut self) -> Poll<(), SendError> {
self.chan.poll_ready().map_err(|_| SendError(()))
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
self.chan.poll_ready(cx).map_err(|_| SendError(()))
}

/// Attempts to send a message on this `Sender`, returning the message
Expand All @@ -182,31 +191,29 @@ impl<T> Sender<T> {
}
}

impl<T> Sink for Sender<T> {
type SinkItem = T;
type SinkError = SendError;
#[cfg(feature = "async-traits")]
impl<T> async_sink::Sink<T> for Sender<T> {
type Error = SendError;

fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
use futures::Async::*;
use futures::AsyncSink;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sender::poll_ready(self.get_mut(), cx)
}

match self.poll_ready()? {
Ready(_) => {
self.try_send(msg).map_err(|_| SendError(()))?;
Ok(AsyncSink::Ready)
}
NotReady => Ok(AsyncSink::NotReady(msg)),
}
fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
self.as_mut()
.try_send(msg)
.map_err(|err| {
assert!(err.is_full(), "call `poll_ready` before sending");
SendError(())
})
}

fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
use futures::Async::Ready;
Ok(Ready(()))
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn close(&mut self) -> Poll<(), Self::SinkError> {
use futures::Async::Ready;
Ok(Ready(()))
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}

Expand Down
46 changes: 23 additions & 23 deletions tokio-sync/src/mpsc/chan.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use super::list;
use crate::loom::{
futures::AtomicTask,
futures::AtomicWaker,
sync::atomic::AtomicUsize,
sync::{Arc, CausalCell},
};
use futures::Poll;
use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

/// Channel sender
pub(crate) struct Tx<T, S: Semaphore> {
Expand Down Expand Up @@ -61,7 +62,8 @@ pub(crate) trait Semaphore {

fn add_permit(&self);

fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>;
fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit)
-> Poll<Result<(), ()>>;

fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;

Expand All @@ -81,8 +83,8 @@ struct Chan<T, S> {
/// Coordinates access to channel's capacity.
semaphore: S,

/// Receiver task. Notified when a value is pushed into the channel.
rx_task: AtomicTask,
/// Receiver waker. Notified when a value is pushed into the channel.
rx_waker: AtomicWaker,

/// Tracks the number of outstanding sender handles.
///
Expand All @@ -101,7 +103,7 @@ where
fmt.debug_struct("Chan")
.field("tx", &self.tx)
.field("semaphore", &self.semaphore)
.field("rx_task", &self.rx_task)
.field("rx_waker", &self.rx_waker)
.field("tx_count", &self.tx_count)
.field("rx_fields", &"...")
.finish()
Expand Down Expand Up @@ -138,7 +140,7 @@ where
let chan = Arc::new(Chan {
tx,
semaphore,
rx_task: AtomicTask::new(),
rx_waker: AtomicWaker::new(),
tx_count: AtomicUsize::new(1),
rx_fields: CausalCell::new(RxFields {
list: rx,
Expand All @@ -163,8 +165,8 @@ where
}

/// TODO: Docs
pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> {
self.inner.semaphore.poll_acquire(&mut self.permit)
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
self.inner.semaphore.poll_acquire(cx, &mut self.permit)
}

/// Send a message and notify the receiver.
Expand All @@ -177,7 +179,7 @@ where
self.inner.tx.push(value);

// Notify the rx task
self.inner.rx_task.notify();
self.inner.rx_waker.wake();

// Release the permit
self.inner.semaphore.forget(&mut self.permit);
Expand Down Expand Up @@ -217,7 +219,7 @@ where
self.inner.tx.close();

// Notify the receiver
self.inner.rx_task.notify();
self.inner.rx_waker.wake();
}
}

Expand Down Expand Up @@ -246,9 +248,8 @@ where
}

/// Receive the next value
pub(crate) fn recv(&mut self) -> Poll<Option<T>, ()> {
pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read::*;
use futures::Async::*;

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
Expand All @@ -258,7 +259,7 @@ where
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
self.inner.semaphore.add_permit();
return Ok(Ready(Some(value)));
return Ready(Some(value));
}
Some(Closed) => {
// TODO: This check may not be required as it most
Expand All @@ -268,7 +269,7 @@ where
// which ensures that if dropping the tx handle is
// visible, then all messages sent are also visible.
assert!(self.inner.semaphore.is_idle());
return Ok(Ready(None));
return Ready(None);
}
None => {} // fall through
}
Expand All @@ -277,7 +278,7 @@ where

try_recv!();

self.inner.rx_task.register();
self.inner.rx_waker.register_by_ref(cx.waker());

// It is possible that a value was pushed between attempting to read
// and registering the task, so we have to check the channel a
Expand All @@ -291,9 +292,9 @@ where
);

if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
Ok(Ready(None))
Ready(None)
} else {
Ok(NotReady)
Pending
}
})
}
Expand Down Expand Up @@ -372,8 +373,8 @@ impl Semaphore for (crate::semaphore::Semaphore, usize) {
self.0.available_permits() == self.1
}

fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> {
permit.poll_acquire(&self.0).map_err(|_| ())
fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll<Result<(), ()>> {
permit.poll_acquire(cx, &self.0).map_err(|_| ())
}

fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
Expand Down Expand Up @@ -415,9 +416,8 @@ impl Semaphore for AtomicUsize {
self.load(Acquire) >> 1 == 0
}

fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> {
use futures::Async::Ready;
self.try_acquire(permit).map(Ready).map_err(|_| ())
fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll<Result<(), ()>> {
Ready(self.try_acquire(permit).map_err(|_| ()))
}

fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
Expand Down
Loading

0 comments on commit 1f08354

Please sign in to comment.