Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: sync watch local version with shared version #3731

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,42 @@ impl<T> Receiver<T> {
}
}

/// Synchronizes the local version counter with the shared one.
///
/// This can be useful in situations where the receiver has been cloned from
/// one whose local version counter is outdated. In this case, it may return
/// values already received by other receivers, which is not always the desired
/// behavior.
///
/// # Examples
///
/// ```
/// use tokio::{sync::watch, time::sleep};
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = watch::channel(0);
/// tokio::spawn(async move {
/// tx.send(1).unwrap();
/// sleep(Duration::from_millis(50)).await;
/// tx.send(2).unwrap();
/// });
///
/// let mut rx2 = rx.clone();
/// rx2.changed().await.unwrap();
/// assert_eq!(1, *rx2.borrow());
///
/// let mut rx2 = rx.clone();
/// rx2.sync_version();
/// rx2.changed().await.unwrap();
/// assert_eq!(2, *rx2.borrow());
/// }
/// ```
pub fn sync_version(&mut self) {
self.version = self.shared.version.load(SeqCst);
}
Comment on lines +306 to +308
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a method for doing something like this seems reasonable. I don't really like the name though.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I admit that I have a hard time finding an ideal name as well.

Here are some proposals:
reconcile_version | reconcile_local_version | reconcile_version_counter
sync_version_counter | sync_local_version | synchronize_version

Maybe @pluyckx who reported the issue could help us on this?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it is hard to find a good function name. If I look at the names you propose, I see you add some information about the object's implementation details.

Maybe a name like sync or update is better? In fact we are synchronizing the object's state. However, it is still not clear when/why you should call this function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, this function is very specific and advanced, because it skips arbitrary amount of updates, and it is very easy to have a time-of-check-to-time-of-use bug (imagine that your task has been interrupted for an hour after the Receiver was creared, but before sync_version was called, highly likely applocation did not want to skip all this hour of updates, so it will behave incorrectly). That's why I'd give somewhat longer name, like skip_all_pending_notifications.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name skip_all_pending_notifications is not perfect. As a user the word notifications confuses me. skip_all_pending_changes seems more appropriate. You wait for a change, not for a notification if you look at the API of a Watch. Not sure if the word all is necessary in the name.

The only question I have is, do we skip the changes or do we accept the last one? In the latter case then name accept_last_change is maybe a better option.

For me skip is a synonym for ignore in this case. So when calling borrow after calling skip... I expect the old value and not the latest sent to the Watch.


cfg_process_driver! {
pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
maybe_changed(&self.shared, &mut self.version)
Expand Down