diff --git a/Cargo.toml b/Cargo.toml index f768f8183..80817fb8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,8 +21,8 @@ features = ["docs"] rustdoc-args = ["--cfg", "feature=\"docs\""] [features] -docs = [] -unstable = [] +docs = ["futures-channel-preview"] +unstable = ["futures-channel-preview"] [dependencies] async-macros = "1.0.0" @@ -30,6 +30,7 @@ async-task = "1.0.0" cfg-if = "0.1.9" crossbeam-channel = "0.3.9" crossbeam-deque = "0.7.1" +futures-channel-preview = { version = "0.3.0-alpha.18", optional = true } futures-core-preview = "0.3.0-alpha.18" futures-io-preview = "0.3.0-alpha.18" futures-timer = "0.4.0" diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 65992eb82..e41f4683e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -71,6 +71,9 @@ use cfg_if::cfg_if; cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { + mod unzip; + use unzip::UnzipSendFuture; + use crate::stream::FromStream; } } @@ -929,6 +932,47 @@ pub trait Stream { { FromStream::from_stream(self) } + + /// Converts a stream of pairs into a pair of containers. + /// + /// ## Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque<(usize, usize)> = vec![(1, 4), (2, 5), (3, 6)].into_iter().collect(); + /// let (a, b): (Vec<_>, Vec<_>) = s.unzip().await; + /// + /// assert_eq!(a, vec![1, 2, 3]); + /// assert_eq!(b, vec![4, 5, 6]); + /// # + /// # }) } + /// ``` + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[cfg(any(feature = "unstable", feature = "docs"))] + fn unzip<'a, A, B, FromA, FromB>(self) -> dyn_ret!('a, (FromA, FromB)) + where + Self: 'a + Stream + Send + Sized, + FromA: 'a + FromStream + Send, + FromB: 'a + FromStream + Send, + A: 'a + Send, + B: 'a + Send, + { + let (tx_a, rx_a) = futures_channel::mpsc::channel(0); + let (tx_b, rx_b) = futures_channel::mpsc::channel(0); + + let a_fut = FromA::from_stream(rx_a); + let b_fut = FromB::from_stream(rx_b); + let send_fut = UnzipSendFuture::new(self, tx_a, tx_b); + + Box::pin(async move { + let (a, b, _): (_, _, ()) = async_macros::join!(a_fut, b_fut, send_fut).await; + (a, b) + }) + } } impl Stream for T { diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs new file mode 100644 index 000000000..3c8eaf0fa --- /dev/null +++ b/src/stream/stream/unzip.rs @@ -0,0 +1,66 @@ +use std::pin::Pin; + +use futures_channel::mpsc::Sender; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pub(crate) struct UnzipSendFuture { + stream: S, + a: (Option, Sender), + b: (Option, Sender), +} + +impl UnzipSendFuture { + pub(crate) fn new(stream: S, tx_a: Sender, tx_b: Sender) -> Self { + UnzipSendFuture { + stream, + a: (None, tx_a), + b: (None, tx_b), + } + } + + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(a: (Option, Sender)); + pin_utils::unsafe_unpinned!(b: (Option, Sender)); +} + +impl crate::future::Future for UnzipSendFuture +where + S: Stream, +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + // hand-written send-join + let (item, tx) = self.as_mut().a(); + if item.is_some() { + let poll_result = tx.poll_ready(cx); + if poll_result.is_ready() { + let item = item.take().unwrap(); + let _ = tx.start_send(item); + } + } + let (item, tx) = self.as_mut().b(); + if item.is_some() { + let poll_result = tx.poll_ready(cx); + if poll_result.is_ready() { + let item = item.take().unwrap(); + let _ = tx.start_send(item); + } + } + if self.as_mut().a().0.is_some() || self.as_mut().b().0.is_some() { + return Poll::Pending; + } + + let item = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + if let Some((a, b)) = item { + self.as_mut().a().0 = Some(a); + self.as_mut().b().0 = Some(b); + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(()) + } + } +}