diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index db65e5a5e29..ef75dc12bab 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -271,6 +271,42 @@ impl Receiver { } } + /// Synchronizes the local version counter with the shared one. + /// + /// This can be useful in situations where the receiver has been cloned from + /// one whose local version counter is outdated. In this case, it may return + /// values already received by other receivers, which is not always the desired + /// behavior. + /// + /// # Examples + /// + /// ``` + /// use tokio::{sync::watch, time::sleep}; + /// use std::time::Duration; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = watch::channel(0); + /// tokio::spawn(async move { + /// tx.send(1).unwrap(); + /// sleep(Duration::from_millis(50)).await; + /// tx.send(2).unwrap(); + /// }); + /// + /// let mut rx2 = rx.clone(); + /// rx2.changed().await.unwrap(); + /// assert_eq!(1, *rx2.borrow()); + /// + /// let mut rx2 = rx.clone(); + /// rx2.sync_version(); + /// rx2.changed().await.unwrap(); + /// assert_eq!(2, *rx2.borrow()); + /// } + /// ``` + pub fn sync_version(&mut self) { + self.version = self.shared.version.load(SeqCst); + } + cfg_process_driver! { pub(crate) fn try_has_changed(&mut self) -> Option> { maybe_changed(&self.shared, &mut self.version)