Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish updating tokio-sync #1130

Merged
merged 13 commits into from
Jun 7, 2019
10 changes: 8 additions & 2 deletions tokio-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ Synchronization utilities.
categories = ["asynchronous"]
publish = false

[features]
async-traits = ["async-sink", "futures-core-preview"]

[dependencies]
fnv = "1.0.6"
tokio-futures = { version = "0.2.0", path = "../tokio-futures" }
async-sink = { git = "https://github.com/tokio-rs/async", optional = true }
futures-core-preview = { version = "0.3.0-alpha.16", optional = true }

[dev-dependencies]
async-util = { git = "https://github.com/tokio-rs/async" }
env_logger = { version = "0.5", default-features = false }
pin-utils = "0.1.0-alpha.4"
# tokio = { version = "0.2.0", path = "../tokio" }
tokio-test = { version = "0.2.0", path = "../tokio-test" }
loom = { version = "0.1.1", features = ["futures"] }
loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] }
21 changes: 16 additions & 5 deletions tokio-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,29 @@ macro_rules! debug {
}
}

/*
/// Unwrap a ready value or propagate `Poll::Pending`.
#[macro_export]
macro_rules! ready {
($e:expr) => {{
use std::task::Poll::{Pending, Ready};

match $e {
Ready(v) => v,
Pending => return Pending,
}
}};
}

macro_rules! if_fuzz {
($($t:tt)*) => {{
if false { $($t)* }
}}
}
*/

// pub mod lock;
pub mod lock;
mod loom;
// pub mod mpsc;
pub mod mpsc;
pub mod oneshot;
// pub mod semaphore;
pub mod semaphore;
pub mod task;
pub mod watch;
14 changes: 7 additions & 7 deletions tokio-sync/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
//! [`LockGuard`]: struct.LockGuard.html

use crate::semaphore;
use futures::Async;

use std::cell::UnsafeCell;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::task::Poll::Ready;
use std::task::{Context, Poll};

/// An asynchronous mutual exclusion primitive useful for protecting shared data
///
Expand Down Expand Up @@ -103,22 +105,20 @@ impl<T> Lock<T> {
/// Try to acquire the lock.
///
/// If the lock is already held, the current task is notified when it is released.
pub fn poll_lock(&mut self) -> Async<LockGuard<T>> {
if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
pub fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> {
ready!(self.permit.poll_acquire(cx, &self.inner.s)).unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
}) {
return Async::NotReady;
}
});

// We want to move the acquired permit into the guard,
// and leave an unacquired one in self.
let acquired = Self {
inner: self.inner.clone(),
permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()),
};
Async::Ready(LockGuard(acquired))
Ready(LockGuard(acquired))
}
}

Expand Down
8 changes: 2 additions & 6 deletions tokio-sync/src/loom.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
pub(crate) mod futures {
// pub(crate) use crate::task::AtomicTask;
pub(crate) use crate::task::AtomicWaker;
}

pub(crate) use std::task;

pub(crate) mod sync {
pub(crate) use std::sync::atomic;
// pub(crate) use std::sync::Arc;
pub(crate) use std::sync::Arc;

use std::cell::UnsafeCell;

Expand All @@ -33,8 +31,6 @@ pub(crate) mod sync {
}
}

/*
pub(crate) fn yield_now() {
::std::sync::atomic::spin_loop_hint();
}
*/
65 changes: 36 additions & 29 deletions tokio-sync/src/mpsc/bounded.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use super::chan;
use futures::{Poll, Sink, StartSend, Stream};

use std::fmt;
use std::task::{Context, Poll};

#[cfg(feature = "async-traits")]
use std::pin::Pin;

/// Send values to the associated `Receiver`.
///
Expand Down Expand Up @@ -127,6 +131,11 @@ impl<T> Receiver<T> {
Receiver { chan }
}

/// TODO: Dox
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.chan.recv(cx)
}

/// Closes the receiving half of a channel, without dropping it.
///
/// This prevents any further messages from being sent on the channel while
Expand All @@ -136,12 +145,12 @@ impl<T> Receiver<T> {
}
}

impl<T> Stream for Receiver<T> {
#[cfg(feature = "async-traits")]
impl<T> futures_core::Stream for Receiver<T> {
type Item = T;
type Error = RecvError;

fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
self.chan.recv().map_err(|_| RecvError(()))
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
Receiver::poll_next(self.get_mut(), cx)
}
}

Expand All @@ -165,13 +174,13 @@ impl<T> Sender<T> {
///
/// This method returns:
///
/// - `Ok(Async::Ready(_))` if capacity is reserved for a single message.
/// - `Ok(Async::NotReady)` if the channel may not have capacity, in which
/// - `Poll::Ready(Ok(_))` if capacity is reserved for a single message.
/// - `Poll::Pending` if the channel may not have capacity, in which
/// case the current task is queued to be notified once
/// capacity is available;
/// - `Err(SendError)` if the receiver has been dropped.
pub fn poll_ready(&mut self) -> Poll<(), SendError> {
self.chan.poll_ready().map_err(|_| SendError(()))
/// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
self.chan.poll_ready(cx).map_err(|_| SendError(()))
}

/// Attempts to send a message on this `Sender`, returning the message
Expand All @@ -182,31 +191,29 @@ impl<T> Sender<T> {
}
}

impl<T> Sink for Sender<T> {
type SinkItem = T;
type SinkError = SendError;
#[cfg(feature = "async-traits")]
impl<T> async_sink::Sink<T> for Sender<T> {
type Error = SendError;

fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
use futures::Async::*;
use futures::AsyncSink;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Sender::poll_ready(self.get_mut(), cx)
}

match self.poll_ready()? {
Ready(_) => {
self.try_send(msg).map_err(|_| SendError(()))?;
Ok(AsyncSink::Ready)
}
NotReady => Ok(AsyncSink::NotReady(msg)),
}
fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
self.as_mut()
.try_send(msg)
.map_err(|err| {
assert!(err.is_full(), "call `poll_ready` before sending");
SendError(())
})
}

fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
use futures::Async::Ready;
Ok(Ready(()))
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn close(&mut self) -> Poll<(), Self::SinkError> {
use futures::Async::Ready;
Ok(Ready(()))
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}

Expand Down
46 changes: 23 additions & 23 deletions tokio-sync/src/mpsc/chan.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use super::list;
use crate::loom::{
futures::AtomicTask,
futures::AtomicWaker,
sync::atomic::AtomicUsize,
sync::{Arc, CausalCell},
};
use futures::Poll;
use std::fmt;
use std::process;
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};

/// Channel sender
pub(crate) struct Tx<T, S: Semaphore> {
Expand Down Expand Up @@ -61,7 +62,8 @@ pub(crate) trait Semaphore {

fn add_permit(&self);

fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>;
fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit)
-> Poll<Result<(), ()>>;

fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>;

Expand All @@ -81,8 +83,8 @@ struct Chan<T, S> {
/// Coordinates access to channel's capacity.
semaphore: S,

/// Receiver task. Notified when a value is pushed into the channel.
rx_task: AtomicTask,
/// Receiver waker. Notified when a value is pushed into the channel.
rx_waker: AtomicWaker,

/// Tracks the number of outstanding sender handles.
///
Expand All @@ -101,7 +103,7 @@ where
fmt.debug_struct("Chan")
.field("tx", &self.tx)
.field("semaphore", &self.semaphore)
.field("rx_task", &self.rx_task)
.field("rx_waker", &self.rx_waker)
.field("tx_count", &self.tx_count)
.field("rx_fields", &"...")
.finish()
Expand Down Expand Up @@ -138,7 +140,7 @@ where
let chan = Arc::new(Chan {
tx,
semaphore,
rx_task: AtomicTask::new(),
rx_waker: AtomicWaker::new(),
tx_count: AtomicUsize::new(1),
rx_fields: CausalCell::new(RxFields {
list: rx,
Expand All @@ -163,8 +165,8 @@ where
}

/// TODO: Docs
pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> {
self.inner.semaphore.poll_acquire(&mut self.permit)
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
self.inner.semaphore.poll_acquire(cx, &mut self.permit)
}

/// Send a message and notify the receiver.
Expand All @@ -177,7 +179,7 @@ where
self.inner.tx.push(value);

// Notify the rx task
self.inner.rx_task.notify();
self.inner.rx_waker.wake();

// Release the permit
self.inner.semaphore.forget(&mut self.permit);
Expand Down Expand Up @@ -217,7 +219,7 @@ where
self.inner.tx.close();

// Notify the receiver
self.inner.rx_task.notify();
self.inner.rx_waker.wake();
}
}

Expand Down Expand Up @@ -246,9 +248,8 @@ where
}

/// Receive the next value
pub(crate) fn recv(&mut self) -> Poll<Option<T>, ()> {
pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read::*;
use futures::Async::*;

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
Expand All @@ -258,7 +259,7 @@ where
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
self.inner.semaphore.add_permit();
return Ok(Ready(Some(value)));
return Ready(Some(value));
}
Some(Closed) => {
// TODO: This check may not be required as it most
Expand All @@ -268,7 +269,7 @@ where
// which ensures that if dropping the tx handle is
// visible, then all messages sent are also visible.
assert!(self.inner.semaphore.is_idle());
return Ok(Ready(None));
return Ready(None);
}
None => {} // fall through
}
Expand All @@ -277,7 +278,7 @@ where

try_recv!();

self.inner.rx_task.register();
self.inner.rx_waker.register_by_ref(cx.waker());

// It is possible that a value was pushed between attempting to read
// and registering the task, so we have to check the channel a
Expand All @@ -291,9 +292,9 @@ where
);

if rx_fields.rx_closed && self.inner.semaphore.is_idle() {
Ok(Ready(None))
Ready(None)
} else {
Ok(NotReady)
Pending
}
})
}
Expand Down Expand Up @@ -372,8 +373,8 @@ impl Semaphore for (crate::semaphore::Semaphore, usize) {
self.0.available_permits() == self.1
}

fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> {
permit.poll_acquire(&self.0).map_err(|_| ())
fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll<Result<(), ()>> {
permit.poll_acquire(cx, &self.0).map_err(|_| ())
}

fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> {
Expand Down Expand Up @@ -415,9 +416,8 @@ impl Semaphore for AtomicUsize {
self.load(Acquire) >> 1 == 0
}

fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> {
use futures::Async::Ready;
self.try_acquire(permit).map(Ready).map_err(|_| ())
fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll<Result<(), ()>> {
Ready(self.try_acquire(permit).map_err(|_| ()))
}

fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> {
Expand Down
Loading