diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index b377ca7f77b..eb61fd03c66 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -53,10 +53,10 @@ use crate::sync::Notify; +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::atomic::Ordering::{Relaxed, SeqCst}; +use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; use std::ops; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::sync::{Arc, RwLock, RwLockReadGuard}; /// Receives values from the associated [`Sender`](struct@Sender). /// @@ -241,19 +241,19 @@ impl Receiver { /// } /// ``` pub async fn changed(&mut self) -> Result<(), error::RecvError> { - // In order to avoid a race condition, we first request a notification, - // **then** check the current value's version. If a new version exists, - // the notification request is dropped. - let notified = self.shared.notify_rx.notified(); - - if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { - return ret; + loop { + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. + let notified = self.shared.notify_rx.notified(); + + if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { + return ret; + } + + notified.await; + // loop around again in case the wake-up was spurious } - - notified.await; - - maybe_changed(&self.shared, &mut self.version) - .expect("[bug] failed to observe change after notificaton.") } } @@ -390,3 +390,44 @@ impl ops::Deref for Ref<'_, T> { self.inner.deref() } } + +#[cfg(all(test, loom))] +mod tests { + use futures::future::FutureExt; + use loom::thread; + + // test for https://github.com/tokio-rs/tokio/issues/3168 + #[test] + fn watch_spurious_wakeup() { + loom::model(|| { + let (send, mut recv) = crate::sync::watch::channel(0i32); + + send.send(1).unwrap(); + + let send_thread = thread::spawn(move || { + send.send(2).unwrap(); + send + }); + + recv.changed().now_or_never(); + + let send = send_thread.join().unwrap(); + let recv_thread = thread::spawn(move || { + recv.changed().now_or_never(); + recv.changed().now_or_never(); + recv + }); + + send.send(3).unwrap(); + + let mut recv = recv_thread.join().unwrap(); + let send_thread = thread::spawn(move || { + send.send(2).unwrap(); + }); + + recv.changed().now_or_never(); + + send_thread.join().unwrap(); + }); + } +}