Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: adds Notify for basic task notification #2210

Merged
merged 16 commits into from
Feb 26, 2020
Merged
Prev Previous commit
Next Next commit
rename recv() -> notified()
carllerche committed Feb 26, 2020
commit f1796f85bed827c9866e02ad3b257e45ce883697
106 changes: 54 additions & 52 deletions tokio/src/sync/notify.rs
Original file line number Diff line number Diff line change
@@ -14,23 +14,22 @@ use std::task::{Context, Poll, Waker};
/// another task to perform an operation.
///
/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits.
/// [`Notify::recv`] waits for a permit to become available and
/// [`Notify::notify_one`] sets a permit **if there currently are no available
/// permits**.
/// [`notfied().await`] waits for a permit to become available and [`notify()`]
carllerche marked this conversation as resolved.
Show resolved Hide resolved
/// sets a permit **if there currently are no available permits**.
///
/// The synchronization details of `Notify` are similar to
/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
/// value contains a single permit. [`Notify::recv`] waits for the permit to be
/// made available, consumes the permit, and resumes. [`Notify::notify_one`]
/// value contains a single permit. [`Notify::notfied`] waits for the permit to
/// be made available, consumes the permit, and resumes. [`Notify::notify_one`]
/// sets the permit, waking a pending task if there is one.
///
/// If `notify_one` is called **before** `recv()`, then the next call to
/// `recv()` will complete immediately, consuming the permit. Any subsequent
/// calls to `recv()` will wait for a new permit.
/// If `notify()` is called **before** `notfied().await`, then the next call to
/// `notified().await` will complete immediately, consuming the permit. Any
/// subsequent calls to `notified().await` will wait for a new permit.
///
/// If `notify_one` is called **multiple** times before `recv()`, only a
/// **single** permit is stored. The next call to `recv()` will complete
/// immediately, but the one after will wait for a new permit.
/// If `notify()` is called **multiple** times before `notified().await`, only a
/// **single** permit is stored. The next call to `notified().await` will
/// complete immediately, but the one after will wait for a new permit.
///
/// # Examples
///
@@ -46,19 +45,20 @@ use std::task::{Context, Poll, Waker};
/// let notify2 = notify.clone();
///
/// tokio::spawn(async move {
/// notify2.recv().await;
/// notify2.notified().await;
/// println!("received notification");
/// });
///
/// println!("sending notification");
/// notify.notify_one();
/// notify.notify();
/// }
/// ```
///
/// Unbound mpsc channel.
///
/// ```
/// use tokio::sync::Notify;
///
/// use std::collections::VecDeque;
/// use std::sync::Mutex;
///
@@ -73,7 +73,7 @@ use std::task::{Context, Poll, Waker};
/// .push_back(value);
///
/// // Notify the consumer a value is available
/// self.notify.notify_one();
/// self.notify.notify();
/// }
///
/// pub async fn recv(&self) -> T {
@@ -84,16 +84,16 @@ use std::task::{Context, Poll, Waker};
/// }
///
/// // Wait for values to be available
/// self.notify.recv().await;
/// self.notify.notified().await;
/// }
/// }
/// }
/// ```
///
/// [park]: std::thread::park
/// [unpark]: std::thread::Thread::unpark
/// [`Notify::recv`]: Notify::recv()
/// [`Notify::notify_one`]: Notify::notify_one()
/// [`notified().await`]: Notify::notified()
/// [`notify`]: Notify::notify()
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
@@ -110,20 +110,21 @@ struct Waiter {
notified: bool,
}

/// Future returned from `notified()`
#[derive(Debug)]
struct RecvFuture<'a> {
struct Notified<'a> {
/// The `Notify` being received on.
notify: &'a Notify,

/// The current state of the receiving process.
state: RecvState,
state: State,

/// Entry in the waiter `LinkedList`.
waiter: linked_list::Entry<Waiter>,
}

#[derive(Debug)]
enum RecvState {
enum State {
Init,
Waiting,
Done,
@@ -158,9 +159,11 @@ impl Notify {
/// Wait for a notification.
///
/// Each `Notify` value holds a single permit. If a permit is available from
/// an earlier call to `notify_one`, then `recv` will complete immediately,
/// consuming that permit. Otherwise, `recv()` waits for a permit to be made
/// available by the next call to `notify_one`
/// an earlier call to [`notify()`], then `notified().await` will complete
/// immediately, consuming that permit. Otherwise, `notified().await` waits
/// for a permit to be made available by the next call to `notify()`
carllerche marked this conversation as resolved.
Show resolved Hide resolved
///
/// [`notify()`]: Notify::notify
///
/// # Examples
///
@@ -174,17 +177,18 @@ impl Notify {
/// let notify2 = notify.clone();
///
/// tokio::spawn(async move {
/// notify2.recv().await;
/// println!("received a notification");
/// notify2.notified().await;
/// println!("received notification");
/// });
///
/// notify.notify_one();
/// println!("sending notification");
/// notify.notify();
/// }
/// ```
pub async fn recv(&self) {
RecvFuture {
pub async fn notified(&self) {
Notified {
notify: self,
state: RecvState::Init,
state: State::Init,
waiter: linked_list::Entry::new(Waiter {
waker: None,
notified: false,
@@ -197,12 +201,13 @@ impl Notify {
///
/// If a task is currently waiting, that task is notified. Otherwise, a
/// permit is stored in this `Notify` value and the **next** call to
/// `recv()` will complete immediately consuming the permit made available
/// by this call to `notify_one()`.
/// `notified().await` will complete immediately consuming the permit made
carllerche marked this conversation as resolved.
Show resolved Hide resolved
/// available by this call to `notify()`.
///
/// At most one permit may be stored by `Notify`. Many sequential calls to
/// `notify_one` will result in a single permit being stored. The next call
/// to `recv()` will complete immediately, but the one after that will wait.
/// `notify` will result in a single permit being stored. The next call to
/// `notified().await` will complete immediately, but the one after that
/// will wait.
///
/// # Examples
///
@@ -216,22 +221,23 @@ impl Notify {
/// let notify2 = notify.clone();
///
/// tokio::spawn(async move {
/// notify2.recv().await;
/// println!("received a notification");
/// notify2.notified().await;
/// println!("received notification");
/// });
///
/// notify.notify_one();
/// println!("sending notification");
/// notify.notify();
/// }
/// ```
pub fn notify_one(&self) {
pub fn notify(&self) {
// Load the current state
let mut curr = self.state.load(SeqCst);
carllerche marked this conversation as resolved.
Show resolved Hide resolved

// If the state is `EMPTY`, transition to `NOTIFIED` and return.
while let EMPTY | NOTIFIED = curr {
hawkw marked this conversation as resolved.
Show resolved Hide resolved
// The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
// happens-before synchronization must happen between this atomic
// operation and a task calling `recv()`.
// operation and a task calling `notified().await`.
let res = self.state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst);

match res {
@@ -306,16 +312,12 @@ fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) -
}
}

// ===== impl RecvFuture =====
// ===== impl Notified =====

impl RecvFuture<'_> {
impl Notified<'_> {
fn project(
carllerche marked this conversation as resolved.
Show resolved Hide resolved
self: Pin<&mut Self>,
) -> (
&Notify,
&mut RecvState,
Pin<&mut linked_list::Entry<Waiter>>,
) {
) -> (&Notify, &mut State, Pin<&mut linked_list::Entry<Waiter>>) {
unsafe {
// Safety: both `notify` and `state` are `Unpin`.

@@ -332,11 +334,11 @@ impl RecvFuture<'_> {
}
}

impl Future for RecvFuture<'_> {
impl Future for Notified<'_> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
use RecvState::*;
use State::*;

let (notify, state, mut waiter) = self.project();

@@ -452,16 +454,16 @@ impl Future for RecvFuture<'_> {
}
}

impl Drop for RecvFuture<'_> {
impl Drop for Notified<'_> {
fn drop(&mut self) {
use RecvState::*;
use State::*;

// Safety: The type only transitions to a "Waiting" state when pinned.
let (notify, state, mut waiter) = unsafe { Pin::new_unchecked(self).project() };

// This is where we ensure safety. The `RecvFuture` is being dropped,
// which means we must ensure that the waiter entry is no longer stored
// in the linked list.
// This is where we ensure safety. The `Notified` value is being
// dropped, which means we must ensure that the waiter entry is no
// longer stored in the linked list.
if let Waiting = *state {
let mut notify_state = WAITING;
let mut waiters = notify.waiters.lock().unwrap();
hawkw marked this conversation as resolved.
Show resolved Hide resolved
98 changes: 49 additions & 49 deletions tokio/tests/sync_notify.rs
Original file line number Diff line number Diff line change
@@ -9,94 +9,94 @@ trait AssertSend: Send + Sync {}
impl AssertSend for Notify {}

#[test]
fn notify_recv_one() {
fn notify_notified_one() {
let notify = Notify::new();
let mut recv = spawn(async { notify.recv().await });
let mut notified = spawn(async { notify.notified().await });

notify.notify_one();
assert_ready!(recv.poll());
notify.notify();
assert_ready!(notified.poll());
}

#[test]
fn recv_one_notify() {
fn notified_one_notify() {
let notify = Notify::new();
let mut recv = spawn(async { notify.recv().await });
let mut notified = spawn(async { notify.notified().await });

assert_pending!(recv.poll());
assert_pending!(notified.poll());

notify.notify_one();
assert!(recv.is_woken());
assert_ready!(recv.poll());
notify.notify();
assert!(notified.is_woken());
assert_ready!(notified.poll());
}

#[test]
fn recv_multi_notify() {
fn notified_multi_notify() {
let notify = Notify::new();
let mut recv1 = spawn(async { notify.recv().await });
let mut recv2 = spawn(async { notify.recv().await });
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

assert_pending!(recv1.poll());
assert_pending!(recv2.poll());
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

notify.notify_one();
assert!(recv1.is_woken());
assert!(!recv2.is_woken());
notify.notify();
assert!(notified1.is_woken());
assert!(!notified2.is_woken());

assert_ready!(recv1.poll());
assert_pending!(recv2.poll());
assert_ready!(notified1.poll());
assert_pending!(notified2.poll());
}

#[test]
fn notify_recv_multi() {
fn notify_notified_multi() {
let notify = Notify::new();

notify.notify_one();
notify.notify();

let mut recv1 = spawn(async { notify.recv().await });
let mut recv2 = spawn(async { notify.recv().await });
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

assert_ready!(recv1.poll());
assert_pending!(recv2.poll());
assert_ready!(notified1.poll());
assert_pending!(notified2.poll());

notify.notify_one();
notify.notify();

assert!(recv2.is_woken());
assert_ready!(recv2.poll());
assert!(notified2.is_woken());
assert_ready!(notified2.poll());
}

#[test]
fn recv_drop_recv_notify() {
fn notified_drop_notified_notify() {
let notify = Notify::new();
let mut recv1 = spawn(async { notify.recv().await });
let mut recv2 = spawn(async { notify.recv().await });
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

assert_pending!(recv1.poll());
assert_pending!(notified1.poll());

drop(recv1);
drop(notified1);

assert_pending!(recv2.poll());
assert_pending!(notified2.poll());

notify.notify_one();
assert!(recv2.is_woken());
assert_ready!(recv2.poll());
notify.notify();
assert!(notified2.is_woken());
assert_ready!(notified2.poll());
}

#[test]
fn recv_multi_notify_drop_one() {
fn notified_multi_notify_drop_one() {
let notify = Notify::new();
let mut recv1 = spawn(async { notify.recv().await });
let mut recv2 = spawn(async { notify.recv().await });
let mut notified1 = spawn(async { notify.notified().await });
let mut notified2 = spawn(async { notify.notified().await });

assert_pending!(recv1.poll());
assert_pending!(recv2.poll());
assert_pending!(notified1.poll());
assert_pending!(notified2.poll());

notify.notify_one();
notify.notify();

assert!(recv1.is_woken());
assert!(!recv2.is_woken());
assert!(notified1.is_woken());
assert!(!notified2.is_woken());

drop(recv1);
drop(notified1);

assert!(recv2.is_woken());
assert_ready!(recv2.poll());
assert!(notified2.is_woken());
assert_ready!(notified2.poll());
}