From 18bf41ee7f1ddddecba6f7dda3f5186215e29854 Mon Sep 17 00:00:00 2001 From: Aymerick Valette Date: Mon, 26 Apr 2021 21:43:49 +0000 Subject: [PATCH 1/2] add a method to sync the local receiver version with the shared one --- tokio/src/sync/watch.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index db65e5a5e29..f5ce87149f2 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -271,6 +271,42 @@ impl Receiver { } } + /// Synchronizes the local version counter with the shared one. + /// + /// This can be useful in situations where the receiver has been cloned from + /// one whose local version counter is outdated. In this case, it may return + /// values already received by other receivers, which is not always the desired + /// behavior. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// use std::time::Duration; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = watch::channel(0); + /// tokio::spawn(async move { + /// tx.send(1).unwrap(); + /// sleep(Duration::from_millis(50)).await; + /// tx.send(2).unwrap(); + /// }); + /// + /// let mut rx2 = rx.clone(); + /// rx2.changed().await.unwrap(); + /// assert_eq!(1, *rx2.borrow()); + /// + /// let mut rx2 = rx.clone(); + /// rx2.sync_version(); + /// rx2.changed().await.unwrap(); + /// assert_eq!(2, *rx2.borrow()); + /// } + /// ``` + pub fn sync_version(&mut self) { + self.version = self.shared.version.load(SeqCst); + } + cfg_process_driver! { pub(crate) fn try_has_changed(&mut self) -> Option> { maybe_changed(&self.shared, &mut self.version) From e3d8ba4737ad558237962f82b33c0d8deadf41f0 Mon Sep 17 00:00:00 2001 From: Aymerick Valette Date: Tue, 27 Apr 2021 08:03:51 +0000 Subject: [PATCH 2/2] fix example import --- tokio/src/sync/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index f5ce87149f2..ef75dc12bab 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -281,7 +281,7 @@ impl Receiver { /// # Examples /// /// ``` - /// use tokio::sync::watch; + /// use tokio::{sync::watch, time::sleep}; /// use std::time::Duration; /// /// #[tokio::main]