diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index 5c959172acb..e169a2becb8 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -30,7 +30,7 @@ signal = ["tokio/signal"] [dependencies] futures-core = { version = "0.3.0" } pin-project-lite = "0.2.0" -tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] } +tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] } tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true } [dev-dependencies] diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 64688aa3e62..1daca101014 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -72,10 +72,10 @@ impl Stream for WatchStream { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let (result, rx) = ready!(self.inner.poll(cx)); + let (result, mut rx) = ready!(self.inner.poll(cx)); match result { Ok(_) => { - let received = (*rx.borrow()).clone(); + let received = (*rx.borrow_and_update()).clone(); self.inner.set(make_future(rx)); Poll::Ready(Some(received)) } diff --git a/tokio-stream/tests/watch.rs b/tokio-stream/tests/watch.rs new file mode 100644 index 00000000000..23aebf06d70 --- /dev/null +++ b/tokio-stream/tests/watch.rs @@ -0,0 +1,30 @@ +use tokio::sync::watch; +use tokio_stream::wrappers::WatchStream; +use tokio_stream::StreamExt; + +#[tokio::test] +async fn message_not_twice() { + let (tx, rx) = watch::channel("hello"); + + let mut counter = 0; + let mut stream = WatchStream::new(rx).map(move |payload| { + println!("{}", payload); + if payload == "goodbye" { + counter += 1; + } + if counter >= 2 { + panic!("too many goodbyes"); + } + }); + + let task = tokio::spawn(async move { + while stream.next().await.is_some() { + } + }); + + // Send goodbye just once + tx.send("goodbye").unwrap(); + + drop(tx); + task.await.unwrap(); +}