Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio-stream: WatchStream: yield the current value #3576

Merged
merged 3 commits into from
Mar 5, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 53 additions & 5 deletions tokio-stream/src/wrappers/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,74 @@ use tokio::sync::watch::error::RecvError;

/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
///
/// This stream will always start by yielding the value present the Receiver when the WatchStream
/// is constructed. As such, you are advised to construct the WatchStream before using the Sender.
/// If you don't, you may receive the current value twice.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{StreamExt, wrappers::WatchStream};
/// use tokio::sync::watch;
///
/// let (tx, rx) = watch::channel("hello");
///
/// let mut rx = WatchStream::new(rx);
/// tx.send("goodbye").unwrap();
///
/// assert_eq!(rx.next().await, Some("hello"));
/// assert_eq!(rx.next().await, Some("goodbye"));
/// # }
/// ```
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{StreamExt, wrappers::WatchStream};
/// use tokio::sync::watch;
///
/// let (tx, rx) = watch::channel("hello");
///
/// // NOT RECOMMENDED!
/// tx.send("goodbye").unwrap();
/// let mut rx = WatchStream::new(rx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make sense to swap these lines.

Copy link
Contributor Author

@krallin krallin Mar 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure — done!

///
/// tokio::task::spawn(async move {
/// tokio::time::sleep(std::time::Duration::from_millis(10)).await;
/// tx.send("hello again").unwrap();
/// });
///
/// // goodbye will be received twice
/// assert_eq!(rx.next().await, Some("goodbye"));
/// assert_eq!(rx.next().await, Some("goodbye"));
/// assert_eq!(rx.next().await, Some("hello again"));
/// # }
/// ```
///
/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
/// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct WatchStream<T> {
inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
}

async fn make_future<T: Clone + Send + Sync>(
mut rx: Receiver<T>,
) -> (Result<(), RecvError>, Receiver<T>) {
) -> (Result<T, RecvError>, Receiver<T>) {
let result = rx.changed().await;
let result = result.map(|()| (*rx.borrow()).clone());
(result, rx)
}

impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
/// Create a new `WatchStream`.
pub fn new(rx: Receiver<T>) -> Self {
let initial = (*rx.borrow()).clone();

Self {
inner: ReusableBoxFuture::new(make_future(rx)),
inner: ReusableBoxFuture::new(async move { (Ok(initial), rx) }),
}
krallin marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand All @@ -39,8 +88,7 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (result, rx) = ready!(self.inner.poll(cx));
match result {
Ok(_) => {
let received = (*rx.borrow()).clone();
Ok(received) => {
self.inner.set(make_future(rx));
Poll::Ready(Some(received))
}
Expand Down