diff --git a/tokio-stream/src/wrappers/broadcast.rs b/tokio-stream/src/wrappers/broadcast.rs index 3bddbb741a5..c8346a68ec7 100644 --- a/tokio-stream/src/wrappers/broadcast.rs +++ b/tokio-stream/src/wrappers/broadcast.rs @@ -71,3 +71,9 @@ impl fmt::Debug for BroadcastStream { f.debug_struct("BroadcastStream").finish() } } + +impl From> for BroadcastStream { + fn from(recv: Receiver) -> Self { + Self::new(recv) + } +} diff --git a/tokio-stream/src/wrappers/mpsc_bounded.rs b/tokio-stream/src/wrappers/mpsc_bounded.rs index e4f90000985..b5362680ee9 100644 --- a/tokio-stream/src/wrappers/mpsc_bounded.rs +++ b/tokio-stream/src/wrappers/mpsc_bounded.rs @@ -57,3 +57,9 @@ impl AsMut> for ReceiverStream { &mut self.inner } } + +impl From> for ReceiverStream { + fn from(recv: Receiver) -> Self { + Self::new(recv) + } +} diff --git a/tokio-stream/src/wrappers/mpsc_unbounded.rs b/tokio-stream/src/wrappers/mpsc_unbounded.rs index bc5f40cdc9f..54597b7f6fe 100644 --- a/tokio-stream/src/wrappers/mpsc_unbounded.rs +++ b/tokio-stream/src/wrappers/mpsc_unbounded.rs @@ -51,3 +51,9 @@ impl AsMut> for UnboundedReceiverStream { &mut self.inner } } + +impl From> for UnboundedReceiverStream { + fn from(recv: UnboundedReceiver) -> Self { + Self::new(recv) + } +} diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 1daca101014..bd3a18a5831 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -59,7 +59,7 @@ async fn make_future( (result, rx) } -impl WatchStream { +impl WatchStream { /// Create a new `WatchStream`. pub fn new(rx: Receiver) -> Self { Self { @@ -94,3 +94,9 @@ impl fmt::Debug for WatchStream { f.debug_struct("WatchStream").finish() } } + +impl From> for WatchStream { + fn from(recv: Receiver) -> Self { + Self::new(recv) + } +}