Skip to content

Commit

Permalink
watch: fix spurious wakeup (#3244)
Browse files Browse the repository at this point in the history
Co-authored-by: @tijsvd
  • Loading branch information
Darksonn authored Dec 10, 2020
1 parent 60f1631 commit d2676db
Showing 1 changed file with 56 additions and 15 deletions.
71 changes: 56 additions & 15 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@
use crate::sync::Notify;

use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::{Relaxed, SeqCst};
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
use std::ops;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::{Arc, RwLock, RwLockReadGuard};

/// Receives values from the associated [`Sender`](struct@Sender).
///
Expand Down Expand Up @@ -241,19 +241,19 @@ impl<T> Receiver<T> {
/// }
/// ```
pub async fn changed(&mut self) -> Result<(), error::RecvError> {
// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
// the notification request is dropped.
let notified = self.shared.notify_rx.notified();

if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
return ret;
loop {
// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
// the notification request is dropped.
let notified = self.shared.notify_rx.notified();

if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
return ret;
}

notified.await;
// loop around again in case the wake-up was spurious
}

notified.await;

maybe_changed(&self.shared, &mut self.version)
.expect("[bug] failed to observe change after notificaton.")
}
}

Expand Down Expand Up @@ -390,3 +390,44 @@ impl<T> ops::Deref for Ref<'_, T> {
self.inner.deref()
}
}

#[cfg(all(test, loom))]
mod tests {
use futures::future::FutureExt;
use loom::thread;

// test for https://github.com/tokio-rs/tokio/issues/3168
#[test]
fn watch_spurious_wakeup() {
loom::model(|| {
let (send, mut recv) = crate::sync::watch::channel(0i32);

send.send(1).unwrap();

let send_thread = thread::spawn(move || {
send.send(2).unwrap();
send
});

recv.changed().now_or_never();

let send = send_thread.join().unwrap();
let recv_thread = thread::spawn(move || {
recv.changed().now_or_never();
recv.changed().now_or_never();
recv
});

send.send(3).unwrap();

let mut recv = recv_thread.join().unwrap();
let send_thread = thread::spawn(move || {
send.send(2).unwrap();
});

recv.changed().now_or_never();

send_thread.join().unwrap();
});
}
}

0 comments on commit d2676db

Please sign in to comment.