From d9a6b39f37a9be99fb36923626f80fd569e4e8e3 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 19 Sep 2019 13:11:08 +0200 Subject: [PATCH 1/2] add crate::stream::Stream bound Signed-off-by: Yoshua Wuyts --- src/result/from_stream.rs | 84 +++++++++++++++++++-------------------- src/stream/from_stream.rs | 6 +-- src/stream/into_stream.rs | 2 +- src/stream/stream/mod.rs | 18 ++++----- src/vec/from_stream.rs | 25 ++++++------ src/vec/into_stream.rs | 20 +++++----- 6 files changed, 78 insertions(+), 77 deletions(-) diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 71cf61dcd..990416ec6 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -1,47 +1,47 @@ -use crate::stream::{FromStream, IntoStream, Stream}; +// use crate::stream::{FromStream, IntoStream, Stream}; -use std::pin::Pin; +// use std::pin::Pin; -impl FromStream> for Result -where - V: FromStream, -{ - /// Takes each element in the stream: if it is an `Err`, no further - /// elements are taken, and the `Err` is returned. Should no `Err` - /// occur, a container with the values of each `Result` is returned. - #[inline] - fn from_stream<'a, S: IntoStream>>( - stream: S, - ) -> Pin + Send + 'a>> - where - ::IntoStream: Send + 'a, - { - let stream = stream.into_stream(); +// impl FromStream> for Result +// where +// V: FromStream, +// { +// /// Takes each element in the stream: if it is an `Err`, no further +// /// elements are taken, and the `Err` is returned. Should no `Err` +// /// occur, a container with the values of each `Result` is returned. +// #[inline] +// fn from_stream<'a, S: IntoStream>>( +// stream: S, +// ) -> Pin + Send + 'a>> +// where +// ::IntoStream: Send + 'a, +// { +// let stream = stream.into_stream(); - Pin::from(Box::new(async move { - pin_utils::pin_mut!(stream); +// Pin::from(Box::new(async move { +// pin_utils::pin_mut!(stream); - // Using `scan` here because it is able to stop the stream early - // if a failure occurs - let mut found_error = None; - let out: V = stream - .scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - found_error = Some(err); - // Stop processing the stream on error - None - } - } - }) - .collect() - .await; +// // Using `scan` here because it is able to stop the stream early +// // if a failure occurs +// let mut found_error = None; +// let out: V = stream +// .scan((), |_, elem| { +// match elem { +// Ok(elem) => Some(elem), +// Err(err) => { +// found_error = Some(err); +// // Stop processing the stream on error +// None +// } +// } +// }) +// .collect() +// .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), - } - })) - } -} +// match found_error { +// Some(err) => Err(err), +// None => Ok(out), +// } +// })) +// } +// } diff --git a/src/stream/from_stream.rs b/src/stream/from_stream.rs index 91d3e24bd..ebb63425d 100644 --- a/src/stream/from_stream.rs +++ b/src/stream/from_stream.rs @@ -11,7 +11,7 @@ use std::pin::Pin; /// /// [`IntoStream`]: trait.IntoStream.html #[cfg_attr(feature = "docs", doc(cfg(unstable)))] -pub trait FromStream { +pub trait FromStream { /// Creates a value from a stream. /// /// # Examples @@ -23,7 +23,7 @@ pub trait FromStream { /// /// // let _five_fives = async_std::stream::repeat(5).take(5); /// ``` - fn from_stream<'a, S: IntoStream + Send + 'a>( + fn from_stream<'a, S: IntoStream + Send + Unpin + 'a>( stream: S, - ) -> Pin + Send + 'a>>; + ) -> Pin + Send + Unpin + 'a>>; } diff --git a/src/stream/into_stream.rs b/src/stream/into_stream.rs index b2913170a..92f219a8a 100644 --- a/src/stream/into_stream.rs +++ b/src/stream/into_stream.rs @@ -1,4 +1,4 @@ -use futures_core::stream::Stream; +use crate::stream::Stream; /// Conversion into a `Stream`. /// diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 62f1b4b50..ff29f85ae 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -750,18 +750,18 @@ pub trait Stream { #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] fn collect<'a, B>(self) -> dyn_ret!('a, B) where - Self: futures_core::stream::Stream + Sized + Send + 'a, - ::Item: Send, - B: FromStream<::Item>, + Self: Stream + Sized + Send + Unpin + 'a, + ::Item: Send + Unpin, + B: FromStream<::Item>, { FromStream::from_stream(self) } } -impl Stream for T { - type Item = ::Item; +// impl Stream for T { +// type Item = ::Item; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - futures_core::stream::Stream::poll_next(self, cx) - } -} +// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +// futures_core::stream::Stream::poll_next(self, cx) +// } +// } diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index f603d0dc7..ad3661c1b 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -1,25 +1,26 @@ -use crate::stream::{FromStream, IntoStream, Stream}; +use crate::stream::{FromStream, IntoStream}; use std::pin::Pin; -impl FromStream for Vec { +impl FromStream for Vec { #[inline] fn from_stream<'a, S: IntoStream>( stream: S, - ) -> Pin + Send + 'a>> + ) -> Pin + Send + Unpin + 'a>> where ::IntoStream: Send + 'a, { - let stream = stream.into_stream(); + let _stream = stream.into_stream(); - Pin::from(Box::new(async move { - pin_utils::pin_mut!(stream); + // Box::pin(async move { + // pin_utils::pin_mut!(stream); - let mut out = vec![]; - while let Some(item) = stream.next().await { - out.push(item); - } - out - })) + // let mut out = vec![]; + // while let Some(item) = stream.next().await { + // out.push(item); + // } + // out + // }) + panic!(); } } diff --git a/src/vec/into_stream.rs b/src/vec/into_stream.rs index 42472a05c..192b5116d 100644 --- a/src/vec/into_stream.rs +++ b/src/vec/into_stream.rs @@ -7,7 +7,7 @@ pub struct IntoStream { iter: std::vec::IntoIter, } -impl crate::stream::IntoStream for Vec { +impl crate::stream::IntoStream for Vec { type Item = T; type IntoStream = IntoStream; @@ -25,30 +25,30 @@ impl crate::stream::IntoStream for Vec { /// } /// ``` #[inline] - fn into_stream(mut self) -> IntoStream { + fn into_stream(self) -> IntoStream { let iter = self.into_iter(); IntoStream { iter } } } -impl futures_core::stream::Stream for IntoStream { +impl crate::stream::Stream for IntoStream { type Item = T; fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(self.iter.next()) + Poll::Ready(Pin::new(&mut *self).iter.next()) } } /// Slice stream. #[derive(Debug)] -pub struct Stream<'a, T: 'a> { +pub struct Stream<'a, T> { iter: std::slice::Iter<'a, T>, } -impl<'a, T: Sync> futures_core::stream::Stream for Stream<'a, T> { +impl<'a, T: Sync> crate::stream::Stream for Stream<'a, T> { type Item = &'a T; - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(self.iter.next()) } } @@ -65,14 +65,14 @@ impl<'a, T: Sync> crate::stream::IntoStream for &'a Vec { /// Mutable slice stream. #[derive(Debug)] -pub struct StreamMut<'a, T: 'a> { +pub struct StreamMut<'a, T> { iter: std::slice::IterMut<'a, T>, } -impl<'a, T: Sync> futures_core::stream::Stream for StreamMut<'a, T> { +impl<'a, T: Sync> crate::stream::Stream for StreamMut<'a, T> { type Item = &'a mut T; - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(self.iter.next()) } } From f5451c2e097a0091afcb70c4d4f4b0ac4986b5ef Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 19 Sep 2019 13:15:16 +0200 Subject: [PATCH 2/2] uncomment generic Stream for Stream impl, which restores tests but breaks compilation Signed-off-by: Yoshua Wuyts --- src/stream/stream/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index ff29f85ae..1411f2013 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -758,10 +758,10 @@ pub trait Stream { } } -// impl Stream for T { -// type Item = ::Item; +impl Stream for T { + type Item = ::Item; -// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { -// futures_core::stream::Stream::poll_next(self, cx) -// } -// } + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + futures_core::stream::Stream::poll_next(self, cx) + } +}