From 69c48de535b65d394a90171e34b6963f53cc6076 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 3 Jul 2019 10:17:20 -0700 Subject: [PATCH] Rename `Sink::SinkError` to `Sink::Error` --- futures-channel/src/mpsc/sink_impl.rs | 30 +++++----- futures-sink/src/lib.rs | 60 +++++++++---------- futures-util/src/compat/compat01as03.rs | 12 ++-- futures-util/src/compat/compat03as01.rs | 2 +- futures-util/src/future/either.rs | 12 ++-- futures-util/src/io/into_sink.rs | 10 ++-- futures-util/src/lib.rs | 8 +-- futures-util/src/sink/buffer.rs | 12 ++-- futures-util/src/sink/close.rs | 2 +- futures-util/src/sink/drain.rs | 10 ++-- futures-util/src/sink/err_into.rs | 12 ++-- futures-util/src/sink/fanout.rs | 12 ++-- futures-util/src/sink/flush.rs | 2 +- futures-util/src/sink/map_err.rs | 12 ++-- futures-util/src/sink/mod.rs | 14 ++--- futures-util/src/sink/send.rs | 2 +- futures-util/src/sink/send_all.rs | 4 +- futures-util/src/sink/with.rs | 16 ++--- futures-util/src/sink/with_flat_map.rs | 18 +++--- futures-util/src/stream/buffer_unordered.rs | 2 +- futures-util/src/stream/buffered.rs | 2 +- futures-util/src/stream/chunks.rs | 2 +- futures-util/src/stream/enumerate.rs | 2 +- futures-util/src/stream/filter.rs | 2 +- futures-util/src/stream/filter_map.rs | 2 +- futures-util/src/stream/flatten.rs | 2 +- futures-util/src/stream/forward.rs | 12 ++-- futures-util/src/stream/fuse.rs | 2 +- futures-util/src/stream/inspect.rs | 2 +- futures-util/src/stream/map.rs | 2 +- futures-util/src/stream/mod.rs | 2 +- futures-util/src/stream/peek.rs | 2 +- futures-util/src/stream/skip.rs | 2 +- futures-util/src/stream/skip_while.rs | 2 +- futures-util/src/stream/split.rs | 10 ++-- futures-util/src/stream/take.rs | 2 +- futures-util/src/stream/take_while.rs | 16 ++--- futures-util/src/stream/then.rs | 16 ++--- futures-util/src/try_future/flatten_sink.rs | 12 ++-- .../src/try_future/flatten_stream_sink.rs | 12 ++-- futures-util/src/try_future/mod.rs | 6 +- .../src/try_future/try_flatten_stream.rs | 12 ++-- futures-util/src/try_stream/and_then.rs | 16 ++--- futures-util/src/try_stream/err_into.rs | 5 +- futures-util/src/try_stream/inspect_err.rs | 13 ++-- futures-util/src/try_stream/inspect_ok.rs | 13 ++-- futures-util/src/try_stream/into_stream.rs | 4 +- futures-util/src/try_stream/map_err.rs | 7 +-- futures-util/src/try_stream/map_ok.rs | 7 +-- futures-util/src/try_stream/or_else.rs | 16 ++--- .../src/try_stream/try_buffer_unordered.rs | 8 +-- futures-util/src/try_stream/try_filter.rs | 8 +-- futures-util/src/try_stream/try_filter_map.rs | 8 +-- futures-util/src/try_stream/try_skip_while.rs | 15 +++-- futures/tests/future_try_flatten_stream.rs | 10 ++-- futures/tests/split.rs | 10 ++-- 56 files changed, 257 insertions(+), 259 deletions(-) diff --git a/futures-channel/src/mpsc/sink_impl.rs b/futures-channel/src/mpsc/sink_impl.rs index eab12e16f9..e7f5457338 100644 --- a/futures-channel/src/mpsc/sink_impl.rs +++ b/futures-channel/src/mpsc/sink_impl.rs @@ -4,26 +4,26 @@ use futures_sink::Sink; use std::pin::Pin; impl Sink for Sender { - type SinkError = SendError; + type Error = SendError; fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { (*self).poll_ready(cx) } fn start_send( mut self: Pin<&mut Self>, msg: T, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { (*self).start_send(msg) } fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { match (*self).poll_ready(cx) { Poll::Ready(Err(ref e)) if e.is_disconnected() => { // If the receiver disconnected, we consider the sink to be flushed. @@ -36,56 +36,56 @@ impl Sink for Sender { fn poll_close( mut self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.disconnect(); Poll::Ready(Ok(())) } } impl Sink for UnboundedSender { - type SinkError = SendError; + type Error = SendError; fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { UnboundedSender::poll_ready(&*self, cx) } fn start_send( mut self: Pin<&mut Self>, msg: T, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { UnboundedSender::start_send(&mut *self, msg) } fn poll_flush( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn poll_close( mut self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.disconnect(); Poll::Ready(Ok(())) } } impl Sink for &UnboundedSender { - type SinkError = SendError; + type Error = SendError; fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { UnboundedSender::poll_ready(*self, cx) } - fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { self.unbounded_send(msg) .map_err(TrySendError::into_send_error) } @@ -93,14 +93,14 @@ impl Sink for &UnboundedSender { fn poll_flush( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn poll_close( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.close_channel(); Poll::Ready(Ok(())) } diff --git a/futures-sink/src/lib.rs b/futures-sink/src/lib.rs index 7ac7f1dc22..9d77134845 100644 --- a/futures-sink/src/lib.rs +++ b/futures-sink/src/lib.rs @@ -49,7 +49,7 @@ use core::pin::Pin; #[must_use = "sinks do nothing unless polled"] pub trait Sink { /// The type of value produced by the sink when an error occurs. - type SinkError; + type Error; /// Attempts to prepare the `Sink` to receive a value. /// @@ -63,7 +63,7 @@ pub trait Sink { /// /// In most cases, if the sink encounters an error, the sink will /// permanently be unable to receive items. - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; /// Begin the process of sending a value to the sink. /// Each call to this function must be preceded by a successful call to @@ -85,7 +85,7 @@ pub trait Sink { /// In most cases, if the sink encounters an error, the sink will /// permanently be unable to receive items. fn start_send(self: Pin<&mut Self>, item: Item) - -> Result<(), Self::SinkError>; + -> Result<(), Self::Error>; /// Flush any remaining output from this sink. /// @@ -99,7 +99,7 @@ pub trait Sink { /// /// In most cases, if the sink encounters an error, the sink will /// permanently be unable to receive items. - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; /// Flush any remaining output and close this sink, if necessary. /// @@ -112,25 +112,25 @@ pub trait Sink { /// /// If this function encounters an error, the sink should be considered to /// have failed permanently, and no more `Sink` methods should be called. - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; } impl + Unpin, Item> Sink for &mut S { - type SinkError = S::SinkError; + type Error = S::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut **self).poll_ready(cx) } - fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> { + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { Pin::new(&mut **self).start_send(item) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut **self).poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut **self).poll_close(cx) } } @@ -140,21 +140,21 @@ where P: DerefMut + Unpin, P::Target: Sink, { - type SinkError = >::SinkError; + type Error = >::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_mut().as_mut().poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { self.get_mut().as_mut().start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_mut().as_mut().poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_mut().as_mut().poll_close(cx) } } @@ -165,65 +165,65 @@ mod if_alloc { use futures_core::never::Never; impl Sink for alloc::vec::Vec { - type SinkError = Never; + type Error = Never; - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { // TODO: impl Unpin for Vec {} unsafe { self.get_unchecked_mut() }.push(item); Ok(()) } - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } impl Sink for alloc::collections::VecDeque { - type SinkError = Never; + type Error = Never; - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { // TODO: impl Unpin for Vec {} unsafe { self.get_unchecked_mut() }.push_back(item); Ok(()) } - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } impl + Unpin, Item> Sink for alloc::boxed::Box { - type SinkError = S::SinkError; + type Error = S::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut **self).poll_ready(cx) } - fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> { + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { Pin::new(&mut **self).start_send(item) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut **self).poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut **self).poll_close(cx) } } diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index a7d0489c3c..55dd4e7d5f 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -105,7 +105,7 @@ pub trait Sink01CompatExt: Sink01 { /// Converts a futures 0.1 /// [`Sink`](futures_01::sink::Sink) /// into a futures 0.3 - /// [`Sink`](futures_sink::Sink). + /// [`Sink`](futures_sink::Sink). /// /// ``` /// #![feature(async_await)] @@ -222,12 +222,12 @@ impl Sink03 for Compat01As03Sink where S: Sink01, { - type SinkError = S::SinkError; + type Error = S::SinkError; fn start_send( mut self: Pin<&mut Self>, item: SinkItem, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { debug_assert!(self.buffer.is_none()); self.buffer = Some(item); Ok(()) @@ -236,7 +236,7 @@ where fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> task03::Poll> { + ) -> task03::Poll> { match self.buffer.take() { Some(item) => match self.in_notify(cx, |f| f.start_send(item))? { AsyncSink01::Ready => task03::Poll::Ready(Ok(())), @@ -252,7 +252,7 @@ where fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> task03::Poll> { + ) -> task03::Poll> { let item = self.buffer.take(); match self.in_notify(cx, |f| match item { Some(i) => match f.start_send(i)? { @@ -274,7 +274,7 @@ where fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> task03::Poll> { + ) -> task03::Poll> { let item = self.buffer.take(); let close_started = self.close_started; diff --git a/futures-util/src/compat/compat03as01.rs b/futures-util/src/compat/compat03as01.rs index cdc3cf9fd6..2a85df2c3b 100644 --- a/futures-util/src/compat/compat03as01.rs +++ b/futures-util/src/compat/compat03as01.rs @@ -121,7 +121,7 @@ where T: Sink03 + Unpin, { type SinkItem = Item; - type SinkError = T::SinkError; + type SinkError = T::Error; fn start_send( &mut self, diff --git a/futures-util/src/future/either.rs b/futures-util/src/future/either.rs index 0b87ed9883..44bca76a64 100644 --- a/futures-util/src/future/either.rs +++ b/futures-util/src/future/either.rs @@ -111,11 +111,11 @@ where impl Sink for Either where A: Sink, - B: Sink, + B: Sink, { - type SinkError = A::SinkError; + type Error = A::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { unsafe { match self.get_unchecked_mut() { Either::Left(x) => Pin::new_unchecked(x).poll_ready(cx), @@ -124,7 +124,7 @@ where } } - fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { unsafe { match self.get_unchecked_mut() { Either::Left(x) => Pin::new_unchecked(x).start_send(item), @@ -133,7 +133,7 @@ where } } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { unsafe { match self.get_unchecked_mut() { Either::Left(x) => Pin::new_unchecked(x).poll_flush(cx), @@ -142,7 +142,7 @@ where } } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { unsafe { match self.get_unchecked_mut() { Either::Left(x) => Pin::new_unchecked(x).poll_close(cx), diff --git a/futures-util/src/io/into_sink.rs b/futures-util/src/io/into_sink.rs index 3573519406..fdb6b85b2a 100644 --- a/futures-util/src/io/into_sink.rs +++ b/futures-util/src/io/into_sink.rs @@ -64,12 +64,12 @@ impl> IntoSink { } impl> Sink for IntoSink { - type SinkError = io::Error; + type Error = io::Error; fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> + ) -> Poll> { ready!(self.as_mut().poll_flush_buffer(cx))?; Poll::Ready(Ok(())) @@ -78,7 +78,7 @@ impl> Sink for IntoSink { fn start_send( mut self: Pin<&mut Self>, item: Item, - ) -> Result<(), Self::SinkError> + ) -> Result<(), Self::Error> { debug_assert!(self.as_mut().buffer().is_none()); *self.as_mut().buffer() = Some(Block { offset: 0, bytes: item }); @@ -88,7 +88,7 @@ impl> Sink for IntoSink { fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> + ) -> Poll> { ready!(self.as_mut().poll_flush_buffer(cx))?; ready!(self.as_mut().writer().poll_flush(cx))?; @@ -98,7 +98,7 @@ impl> Sink for IntoSink { fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> + ) -> Poll> { ready!(self.as_mut().poll_flush_buffer(cx))?; ready!(self.as_mut().writer().poll_close(cx))?; diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index e234003a98..cd460eff8f 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -63,28 +63,28 @@ macro_rules! delegate_sink { fn poll_ready( self: Pin<&mut Self>, cx: &mut $crate::core_reexport::task::Context<'_>, - ) -> $crate::core_reexport::task::Poll> { + ) -> $crate::core_reexport::task::Poll> { self.$field().poll_ready(cx) } fn start_send( self: Pin<&mut Self>, item: $item, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { self.$field().start_send(item) } fn poll_flush( self: Pin<&mut Self>, cx: &mut $crate::core_reexport::task::Context<'_>, - ) -> $crate::core_reexport::task::Poll> { + ) -> $crate::core_reexport::task::Poll> { self.$field().poll_flush(cx) } fn poll_close( self: Pin<&mut Self>, cx: &mut $crate::core_reexport::task::Context<'_>, - ) -> $crate::core_reexport::task::Poll> { + ) -> $crate::core_reexport::task::Poll> { self.$field().poll_close(cx) } } diff --git a/futures-util/src/sink/buffer.rs b/futures-util/src/sink/buffer.rs index ec88f2d209..c2e7ca6d6e 100644 --- a/futures-util/src/sink/buffer.rs +++ b/futures-util/src/sink/buffer.rs @@ -57,7 +57,7 @@ impl, Item> Buffer { fn try_empty_buffer( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { ready!(self.as_mut().sink().poll_ready(cx))?; while let Some(item) = self.as_mut().buf().pop_front() { self.as_mut().sink().start_send(item)?; @@ -79,12 +79,12 @@ impl Stream for Buffer where S: Sink + Stream { } impl, Item> Sink for Buffer { - type SinkError = Si::SinkError; + type Error = Si::Error; fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { if self.capacity == 0 { return self.as_mut().sink().poll_ready(cx); } @@ -101,7 +101,7 @@ impl, Item> Sink for Buffer { fn start_send( mut self: Pin<&mut Self>, item: Item, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { if self.capacity == 0 { self.as_mut().sink().start_send(item) } else { @@ -113,7 +113,7 @@ impl, Item> Sink for Buffer { fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { ready!(self.as_mut().try_empty_buffer(cx))?; debug_assert!(self.as_mut().buf().is_empty()); self.as_mut().sink().poll_flush(cx) @@ -122,7 +122,7 @@ impl, Item> Sink for Buffer { fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { ready!(self.as_mut().try_empty_buffer(cx))?; debug_assert!(self.as_mut().buf().is_empty()); self.as_mut().sink().poll_close(cx) diff --git a/futures-util/src/sink/close.rs b/futures-util/src/sink/close.rs index d800320dab..6ce500e3b3 100644 --- a/futures-util/src/sink/close.rs +++ b/futures-util/src/sink/close.rs @@ -25,7 +25,7 @@ impl<'a, Si: Sink + Unpin + ?Sized, Item> Close<'a, Si, Item> { } impl + Unpin + ?Sized, Item> Future for Close<'_, Si, Item> { - type Output = Result<(), Si::SinkError>; + type Output = Result<(), Si::Error>; fn poll( mut self: Pin<&mut Self>, diff --git a/futures-util/src/sink/drain.rs b/futures-util/src/sink/drain.rs index 6318bb18ee..672252bf0d 100644 --- a/futures-util/src/sink/drain.rs +++ b/futures-util/src/sink/drain.rs @@ -31,33 +31,33 @@ pub fn drain() -> Drain { } impl Sink for Drain { - type SinkError = Never; + type Error = Never; fn poll_ready( self: Pin<&mut Self>, _cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn start_send( self: Pin<&mut Self>, _item: T, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { Ok(()) } fn poll_flush( self: Pin<&mut Self>, _cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } fn poll_close( self: Pin<&mut Self>, _cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/futures-util/src/sink/err_into.rs b/futures-util/src/sink/err_into.rs index ec86315fc3..2118c409f0 100644 --- a/futures-util/src/sink/err_into.rs +++ b/futures-util/src/sink/err_into.rs @@ -9,14 +9,14 @@ use pin_utils::unsafe_pinned; #[derive(Debug)] #[must_use = "sinks do nothing unless polled"] pub struct SinkErrInto, Item, E> { - sink: SinkMapErr E>, + sink: SinkMapErr E>, } impl SinkErrInto where Si: Sink, - Si::SinkError: Into, + Si::Error: Into, { - unsafe_pinned!(sink: SinkMapErr E>); + unsafe_pinned!(sink: SinkMapErr E>); pub(super) fn new(sink: Si) -> Self { SinkErrInto { @@ -50,16 +50,16 @@ impl SinkErrInto impl Sink for SinkErrInto where Si: Sink, - Si::SinkError: Into, + Si::Error: Into, { - type SinkError = E; + type Error = E; delegate_sink!(sink, Item); } impl Stream for SinkErrInto where S: Sink + Stream, - S::SinkError: Into + S::Error: Into { type Item = S::Item; diff --git a/futures-util/src/sink/fanout.rs b/futures-util/src/sink/fanout.rs index 4457d69342..8ecf4a68c7 100644 --- a/futures-util/src/sink/fanout.rs +++ b/futures-util/src/sink/fanout.rs @@ -61,14 +61,14 @@ impl Debug for Fanout { impl Sink for Fanout where Si1: Sink, Item: Clone, - Si2: Sink + Si2: Sink { - type SinkError = Si1::SinkError; + type Error = Si1::Error; fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let sink1_ready = self.as_mut().sink1().poll_ready(cx)?.is_ready(); let sink2_ready = self.as_mut().sink2().poll_ready(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; @@ -78,7 +78,7 @@ impl Sink for Fanout fn start_send( mut self: Pin<&mut Self>, item: Item, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { self.as_mut().sink1().start_send(item.clone())?; self.as_mut().sink2().start_send(item)?; Ok(()) @@ -87,7 +87,7 @@ impl Sink for Fanout fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let sink1_ready = self.as_mut().sink1().poll_flush(cx)?.is_ready(); let sink2_ready = self.as_mut().sink2().poll_flush(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; @@ -97,7 +97,7 @@ impl Sink for Fanout fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let sink1_ready = self.as_mut().sink1().poll_close(cx)?.is_ready(); let sink2_ready = self.as_mut().sink2().poll_close(cx)?.is_ready(); let ready = sink1_ready && sink2_ready; diff --git a/futures-util/src/sink/flush.rs b/futures-util/src/sink/flush.rs index bcb8d31889..2d0927455f 100644 --- a/futures-util/src/sink/flush.rs +++ b/futures-util/src/sink/flush.rs @@ -31,7 +31,7 @@ impl<'a, Si: Sink + Unpin + ?Sized, Item> Flush<'a, Si, Item> { } impl + Unpin + ?Sized, Item> Future for Flush<'_, Si, Item> { - type Output = Result<(), Si::SinkError>; + type Output = Result<(), Si::Error>; fn poll( mut self: Pin<&mut Self>, diff --git a/futures-util/src/sink/map_err.rs b/futures-util/src/sink/map_err.rs index 82ee2e23b6..1263b294ed 100644 --- a/futures-util/src/sink/map_err.rs +++ b/futures-util/src/sink/map_err.rs @@ -52,35 +52,35 @@ impl SinkMapErr { impl Sink for SinkMapErr where Si: Sink, - F: FnOnce(Si::SinkError) -> E, + F: FnOnce(Si::Error) -> E, { - type SinkError = E; + type Error = E; fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.as_mut().sink().poll_ready(cx).map_err(|e| self.as_mut().take_f()(e)) } fn start_send( mut self: Pin<&mut Self>, item: Item, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { self.as_mut().sink().start_send(item).map_err(|e| self.as_mut().take_f()(e)) } fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.as_mut().sink().poll_flush(cx).map_err(|e| self.as_mut().take_f()(e)) } fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.as_mut().sink().poll_close(cx).map_err(|e| self.as_mut().take_f()(e)) } } diff --git a/futures-util/src/sink/mod.rs b/futures-util/src/sink/mod.rs index 9ff4455931..fc4c8ec883 100644 --- a/futures-util/src/sink/mod.rs +++ b/futures-util/src/sink/mod.rs @@ -65,7 +65,7 @@ pub trait SinkExt: Sink { fn with(self, f: F) -> With where F: FnMut(U) -> Fut, Fut: Future>, - E: From, + E: From, Self: Sized { With::new(self, f) @@ -107,7 +107,7 @@ pub trait SinkExt: Sink { /// ``` fn with_flat_map(self, f: F) -> WithFlatMap where F: FnMut(U) -> St, - St: Stream>, + St: Stream>, Self: Sized { WithFlatMap::new(self, f) @@ -129,7 +129,7 @@ pub trait SinkExt: Sink { /// Transforms the error returned by the sink. fn sink_map_err(self, f: F) -> SinkMapErr - where F: FnOnce(Self::SinkError) -> E, + where F: FnOnce(Self::Error) -> E, Self: Sized, { SinkMapErr::new(self, f) @@ -140,7 +140,7 @@ pub trait SinkExt: Sink { /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`. fn sink_err_into(self) -> err_into::SinkErrInto where Self: Sized, - Self::SinkError: Into, + Self::Error: Into, { SinkErrInto::new(self) } @@ -179,7 +179,7 @@ pub trait SinkExt: Sink { fn fanout(self, other: Si) -> Fanout where Self: Sized, Item: Clone, - Si: Sink + Si: Sink { Fanout::new(self, other) } @@ -233,7 +233,7 @@ pub trait SinkExt: Sink { /// This can be used in combination with the `right_sink` method to write `if` /// statements that evaluate to different streams in different branches. fn left_sink(self) -> Either - where Si2: Sink, + where Si2: Sink, Self: Sized { Either::Left(self) @@ -245,7 +245,7 @@ pub trait SinkExt: Sink { /// This can be used in combination with the `left_sink` method to write `if` /// statements that evaluate to different streams in different branches. fn right_sink(self) -> Either - where Si1: Sink, + where Si1: Sink, Self: Sized { Either::Right(self) diff --git a/futures-util/src/sink/send.rs b/futures-util/src/sink/send.rs index e7590f0fcb..2e608d1f5f 100644 --- a/futures-util/src/sink/send.rs +++ b/futures-util/src/sink/send.rs @@ -24,7 +24,7 @@ impl<'a, Si: Sink + Unpin + ?Sized, Item> Send<'a, Si, Item> { } impl + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> { - type Output = Result<(), Si::SinkError>; + type Output = Result<(), Si::Error>; fn poll( mut self: Pin<&mut Self>, diff --git a/futures-util/src/sink/send_all.rs b/futures-util/src/sink/send_all.rs index 67a630fa33..65e5bf5236 100644 --- a/futures-util/src/sink/send_all.rs +++ b/futures-util/src/sink/send_all.rs @@ -45,7 +45,7 @@ where &mut self, cx: &mut Context<'_>, item: St::Item, - ) -> Poll> { + ) -> Poll> { debug_assert!(self.buffered.is_none()); match Pin::new(&mut self.sink).poll_ready(cx)? { Poll::Ready(()) => { @@ -64,7 +64,7 @@ where Si: Sink + Unpin + ?Sized, St: Stream + Unpin + ?Sized, { - type Output = Result<(), Si::SinkError>; + type Output = Result<(), Si::Error>; fn poll( mut self: Pin<&mut Self>, diff --git a/futures-util/src/sink/with.rs b/futures-util/src/sink/with.rs index 6991a1bf19..e5a9ee5e6f 100644 --- a/futures-util/src/sink/with.rs +++ b/futures-util/src/sink/with.rs @@ -49,7 +49,7 @@ where Si: Sink, pub(super) fn new(sink: Si, f: F) -> Self where Fut: Future>, - E: From, + E: From, { With { state: State::Empty, @@ -105,7 +105,7 @@ impl With where Si: Sink, F: FnMut(U) -> Fut, Fut: Future>, - E: From, + E: From, { /// Get a shared reference to the inner sink. pub fn get_ref(&self) -> &Si { @@ -154,21 +154,21 @@ impl Sink for With where Si: Sink, F: FnMut(U) -> Fut, Fut: Future>, - E: From, + E: From, { - type SinkError = E; + type Error = E; fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.poll(cx) } fn start_send( mut self: Pin<&mut Self>, item: U, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { let item = (self.as_mut().f())(item); self.as_mut().state().set(State::Process(item)); Ok(()) @@ -177,7 +177,7 @@ impl Sink for With fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { ready!(self.as_mut().poll(cx))?; ready!(self.as_mut().sink().poll_flush(cx))?; Poll::Ready(Ok(())) @@ -186,7 +186,7 @@ impl Sink for With fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { ready!(self.as_mut().poll(cx))?; ready!(self.as_mut().sink().poll_close(cx))?; Poll::Ready(Ok(())) diff --git a/futures-util/src/sink/with_flat_map.rs b/futures-util/src/sink/with_flat_map.rs index 4d0f0fa7eb..6a48509613 100644 --- a/futures-util/src/sink/with_flat_map.rs +++ b/futures-util/src/sink/with_flat_map.rs @@ -44,7 +44,7 @@ impl WithFlatMap where Si: Sink, F: FnMut(U) -> St, - St: Stream>, + St: Stream>, { unsafe_pinned!(sink: Si); unsafe_unpinned!(f: F); @@ -86,7 +86,7 @@ where fn try_empty_stream( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let WithFlatMap { sink, stream, buffer, .. } = unsafe { self.get_unchecked_mut() }; let mut sink = unsafe { Pin::new_unchecked(sink) }; @@ -117,7 +117,7 @@ impl Stream for WithFlatMap where S: Stream + Sink, F: FnMut(U) -> St, - St: Stream>, + St: Stream>, { type Item = S::Item; fn poll_next( @@ -132,21 +132,21 @@ impl Sink for WithFlatMap where Si: Sink, F: FnMut(U) -> St, - St: Stream>, + St: Stream>, { - type SinkError = Si::SinkError; + type Error = Si::Error; fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.try_empty_stream(cx) } fn start_send( mut self: Pin<&mut Self>, item: U, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { assert!(self.stream.is_none()); let stream = (self.as_mut().f())(item); self.stream().set(Some(stream)); @@ -156,7 +156,7 @@ where fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { ready!(self.as_mut().try_empty_stream(cx)?); self.as_mut().sink().poll_flush(cx) } @@ -164,7 +164,7 @@ where fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { ready!(self.as_mut().try_empty_stream(cx)?); self.as_mut().sink().poll_close(cx) } diff --git a/futures-util/src/stream/buffer_unordered.rs b/futures-util/src/stream/buffer_unordered.rs index 522c693ee3..48eb04c08c 100644 --- a/futures-util/src/stream/buffer_unordered.rs +++ b/futures-util/src/stream/buffer_unordered.rs @@ -144,7 +144,7 @@ where S: Stream + Sink, S::Item: Future, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/buffered.rs b/futures-util/src/stream/buffered.rs index 5b9dedcf55..6aea92d037 100644 --- a/futures-util/src/stream/buffered.rs +++ b/futures-util/src/stream/buffered.rs @@ -129,7 +129,7 @@ where S: Stream + Sink, S::Item: Future, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/chunks.rs b/futures-util/src/stream/chunks.rs index c4f0b466fb..16ad3aee0e 100644 --- a/futures-util/src/stream/chunks.rs +++ b/futures-util/src/stream/chunks.rs @@ -111,7 +111,7 @@ impl Sink for Chunks where S: Stream + Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/enumerate.rs b/futures-util/src/stream/enumerate.rs index b5a4413bc1..7c1dcafaa6 100644 --- a/futures-util/src/stream/enumerate.rs +++ b/futures-util/src/stream/enumerate.rs @@ -87,7 +87,7 @@ impl Sink for Enumerate where S: Stream + Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/filter.rs b/futures-util/src/stream/filter.rs index b447864864..229520c3f2 100644 --- a/futures-util/src/stream/filter.rs +++ b/futures-util/src/stream/filter.rs @@ -139,7 +139,7 @@ impl Sink for Filter F: FnMut(&S::Item) -> Fut, Fut: Future, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/filter_map.rs b/futures-util/src/stream/filter_map.rs index 799d4db8c2..1e7ce09925 100644 --- a/futures-util/src/stream/filter_map.rs +++ b/futures-util/src/stream/filter_map.rs @@ -125,7 +125,7 @@ impl Sink for FilterMap F: FnMut(S::Item) -> Fut, Fut: Future, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/flatten.rs b/futures-util/src/stream/flatten.rs index 428a81fe4e..865ab61e6d 100644 --- a/futures-util/src/stream/flatten.rs +++ b/futures-util/src/stream/flatten.rs @@ -101,7 +101,7 @@ impl Sink for Flatten where S: Stream + Sink, S::Item: Stream, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/forward.rs b/futures-util/src/stream/forward.rs index a79b896a86..085f1fa2a6 100644 --- a/futures-util/src/stream/forward.rs +++ b/futures-util/src/stream/forward.rs @@ -19,10 +19,10 @@ pub struct Forward> { impl + Unpin> Unpin for Forward {} -impl Forward +impl Forward where - Si: Sink, - St: TryStream + Stream, + Si: Sink, + St: TryStream + Stream, { unsafe_pinned!(sink: Option); unsafe_pinned!(stream: Fuse); @@ -40,7 +40,7 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, item: St::Ok, - ) -> Poll> { + ) -> Poll> { debug_assert!(self.buffered_item.is_none()); { let mut sink = self.as_mut().sink().as_pin_mut().unwrap(); @@ -61,10 +61,10 @@ impl + Unpin> FusedFuture for Forward { impl Future for Forward where - Si: Sink, + Si: Sink, St: Stream>, { - type Output = Result<(), Si::SinkError>; + type Output = Result<(), E>; fn poll( mut self: Pin<&mut Self>, diff --git a/futures-util/src/stream/fuse.rs b/futures-util/src/stream/fuse.rs index 45a1d8ccc5..2ce9b309ca 100644 --- a/futures-util/src/stream/fuse.rs +++ b/futures-util/src/stream/fuse.rs @@ -91,7 +91,7 @@ impl Stream for Fuse { // Forwarding impl of Sink from the underlying stream impl, Item> Sink for Fuse { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/inspect.rs b/futures-util/src/stream/inspect.rs index 474f42bafc..a1164c4986 100644 --- a/futures-util/src/stream/inspect.rs +++ b/futures-util/src/stream/inspect.rs @@ -104,7 +104,7 @@ impl Sink for Inspect where S: Stream + Sink, F: FnMut(&S::Item), { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/map.rs b/futures-util/src/stream/map.rs index be7695987e..5d5f56c741 100644 --- a/futures-util/src/stream/map.rs +++ b/futures-util/src/stream/map.rs @@ -97,7 +97,7 @@ impl Sink for Map where S: Stream + Sink, F: FnMut(S::Item) -> T, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index e8f01e7b00..442a9071ca 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -1073,7 +1073,7 @@ pub trait StreamExt: Stream { fn forward(self, sink: S) -> Forward where S: Sink<::Ok>, - Self: TryStream + Sized, + Self: TryStream + Sized, { Forward::new(self, sink) } diff --git a/futures-util/src/stream/peek.rs b/futures-util/src/stream/peek.rs index 7bc4765985..7cdda49ca5 100644 --- a/futures-util/src/stream/peek.rs +++ b/futures-util/src/stream/peek.rs @@ -109,7 +109,7 @@ impl Stream for Peekable { impl Sink for Peekable where S: Sink + Stream { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/skip.rs b/futures-util/src/stream/skip.rs index 2bd72f1843..dc5237dec4 100644 --- a/futures-util/src/stream/skip.rs +++ b/futures-util/src/stream/skip.rs @@ -87,7 +87,7 @@ impl Sink for Skip where S: Stream + Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/skip_while.rs b/futures-util/src/stream/skip_while.rs index fc7668babf..89cd6ca982 100644 --- a/futures-util/src/stream/skip_while.rs +++ b/futures-util/src/stream/skip_while.rs @@ -138,7 +138,7 @@ impl Sink for SkipWhile F: FnMut(&S::Item) -> Fut, Fut: Future, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/split.rs b/futures-util/src/stream/split.rs index 0d08395d8b..c6151fbf79 100644 --- a/futures-util/src/stream/split.rs +++ b/futures-util/src/stream/split.rs @@ -66,9 +66,9 @@ impl + Unpin, Item> SplitSink { } impl, Item> Sink for SplitSink { - type SinkError = S::SinkError; + type Error = S::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if self.slot.is_none() { return Poll::Ready(Ok(())); @@ -77,12 +77,12 @@ impl, Item> Sink for SplitSink { } } - fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::SinkError> { + fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error> { self.slot = Some(item); Ok(()) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); if this.slot.is_some() { @@ -92,7 +92,7 @@ impl, Item> Sink for SplitSink { inner.as_pin_mut().poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); if this.slot.is_some() { diff --git a/futures-util/src/stream/take.rs b/futures-util/src/stream/take.rs index 9d641900a4..6613bfa84c 100644 --- a/futures-util/src/stream/take.rs +++ b/futures-util/src/stream/take.rs @@ -84,7 +84,7 @@ impl Stream for Take impl Sink for Take where S: Stream + Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/take_while.rs b/futures-util/src/stream/take_while.rs index 940927d6b6..6e26a336dd 100644 --- a/futures-util/src/stream/take_while.rs +++ b/futures-util/src/stream/take_while.rs @@ -34,17 +34,19 @@ where } } -impl TakeWhile - where St: Stream, - F: FnMut(&St::Item) -> Fut, - Fut: Future, -{ +impl TakeWhile { unsafe_pinned!(stream: St); unsafe_unpinned!(f: F); unsafe_pinned!(pending_fut: Option); unsafe_unpinned!(pending_item: Option); unsafe_unpinned!(done_taking: bool); +} +impl TakeWhile + where St: Stream, + F: FnMut(&St::Item) -> Fut, + Fut: Future, +{ pub(super) fn new(stream: St, f: F) -> TakeWhile { TakeWhile { stream, @@ -129,10 +131,8 @@ impl Stream for TakeWhile // Forwarding impl of Sink from the underlying stream impl Sink for TakeWhile where S: Stream + Sink, - F: FnMut(&S::Item) -> Fut, - Fut: Future, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/stream/then.rs b/futures-util/src/stream/then.rs index afb80c7649..f4ffe2d417 100644 --- a/futures-util/src/stream/then.rs +++ b/futures-util/src/stream/then.rs @@ -29,14 +29,16 @@ where } } -impl Then - where St: Stream, - F: FnMut(St::Item) -> Fut, -{ +impl Then { unsafe_pinned!(stream: St); unsafe_pinned!(future: Option); unsafe_unpinned!(f: F); +} +impl Then + where St: Stream, + F: FnMut(St::Item) -> Fut, +{ pub(super) fn new(stream: St, f: F) -> Then { Then { stream, @@ -112,11 +114,9 @@ impl Stream for Then // Forwarding impl of Sink from the underlying stream impl Sink for Then - where S: Stream + Sink, - F: FnMut(S::Item) -> Fut, - Fut: Future, + where S: Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_future/flatten_sink.rs b/futures-util/src/try_future/flatten_sink.rs index 9a189bda2c..4c3df85e6d 100644 --- a/futures-util/src/try_future/flatten_sink.rs +++ b/futures-util/src/try_future/flatten_sink.rs @@ -54,23 +54,23 @@ where impl Sink for FlattenSink where Fut: TryFuture, - Si: Sink, + Si: Sink, { - type SinkError = Fut::Error; + type Error = Fut::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner().poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { self.inner().start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner().poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner().poll_close(cx) } } diff --git a/futures-util/src/try_future/flatten_stream_sink.rs b/futures-util/src/try_future/flatten_stream_sink.rs index cb46ec6171..f2813d7403 100644 --- a/futures-util/src/try_future/flatten_stream_sink.rs +++ b/futures-util/src/try_future/flatten_stream_sink.rs @@ -130,14 +130,14 @@ where impl Sink for FlattenStreamSink where Fut: TryFuture, - Fut::Ok: Sink, + Fut::Ok: Sink, { - type SinkError = Fut::Error; + type Error = Fut::Error; fn poll_ready( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { ready!(self.as_mut().state().poll_future(cx)?); match self.as_mut().state().get_pin_mut() { State::StreamOrSink(s) => s.poll_ready(cx), @@ -146,7 +146,7 @@ where } } - fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { match self.state().get_pin_mut() { State::StreamOrSink(s) => s.start_send(item), State::Future(_) => panic!("poll_ready not called first"), @@ -154,7 +154,7 @@ where } } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.state().get_pin_mut() { State::StreamOrSink(s) => s.poll_flush(cx), // if sink not yet resolved, nothing written ==> everything flushed @@ -166,7 +166,7 @@ where fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let res = match self.as_mut().state().get_pin_mut() { State::StreamOrSink(s) => s.poll_close(cx), State::Future(_) | State::Done => Poll::Ready(Ok(())), diff --git a/futures-util/src/try_future/mod.rs b/futures-util/src/try_future/mod.rs index dcd8cdd5a1..c20da7c057 100644 --- a/futures-util/src/try_future/mod.rs +++ b/futures-util/src/try_future/mod.rs @@ -95,20 +95,20 @@ pub trait TryFutureExt: TryFuture { /// # type E = SendError; /// /// fn make_sink_async() -> impl Future, + /// impl Sink, /// E, /// >> { // ... } /// # let (tx, _rx) = mpsc::unbounded::(); /// # futures::future::ready(Ok(tx)) /// # } - /// fn take_sink(sink: impl Sink) { /* ... */ } + /// fn take_sink(sink: impl Sink) { /* ... */ } /// /// let fut = make_sink_async(); /// take_sink(fut.flatten_sink()) /// ``` fn flatten_sink(self) -> FlattenSink where - Self::Ok: Sink, + Self::Ok: Sink, Self: Sized, { FlattenSink::new(self) diff --git a/futures-util/src/try_future/try_flatten_stream.rs b/futures-util/src/try_future/try_flatten_stream.rs index 32a040269b..1b3c79336e 100644 --- a/futures-util/src/try_future/try_flatten_stream.rs +++ b/futures-util/src/try_future/try_flatten_stream.rs @@ -67,23 +67,23 @@ where impl Sink for TryFlattenStream where Fut: TryFuture, - Fut::Ok: TryStream + Sink, + Fut::Ok: TryStream + Sink, { - type SinkError = Fut::Error; + type Error = Fut::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner().poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { self.inner().start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner().poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.inner().poll_close(cx) } } diff --git a/futures-util/src/try_stream/and_then.rs b/futures-util/src/try_stream/and_then.rs index 96f00d44ca..6053075949 100644 --- a/futures-util/src/try_stream/and_then.rs +++ b/futures-util/src/try_stream/and_then.rs @@ -29,15 +29,17 @@ where } } +impl AndThen { + unsafe_pinned!(stream: St); + unsafe_pinned!(future: Option); + unsafe_unpinned!(f: F); +} + impl AndThen where St: TryStream, F: FnMut(St::Ok) -> Fut, Fut: TryFuture, { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Self { Self { stream, future: None, f } } @@ -103,11 +105,9 @@ impl Stream for AndThen // Forwarding impl of Sink from the underlying stream impl Sink for AndThen - where S: TryStream + Sink, - F: FnMut(S::Ok) -> Fut, - Fut: TryFuture, + where S: Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/err_into.rs b/futures-util/src/try_stream/err_into.rs index 05f6878b37..b5959e0c02 100644 --- a/futures-util/src/try_stream/err_into.rs +++ b/futures-util/src/try_stream/err_into.rs @@ -80,10 +80,9 @@ where // Forwarding impl of Sink from the underlying stream impl Sink for ErrInto where - S: TryStream + Sink, - S::Error: Into, + S: Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/inspect_err.rs b/futures-util/src/try_stream/inspect_err.rs index 62ce4d295f..fe173ca160 100644 --- a/futures-util/src/try_stream/inspect_err.rs +++ b/futures-util/src/try_stream/inspect_err.rs @@ -26,14 +26,16 @@ where } } +impl InspectErr { + unsafe_pinned!(stream: St); + unsafe_unpinned!(f: F); +} + impl InspectErr where St: TryStream, F: FnMut(&St::Error), { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Self { Self { stream, f } } @@ -98,10 +100,9 @@ where // Forwarding impl of Sink from the underlying stream impl Sink for InspectErr where - S: TryStream + Sink, - F: FnMut(&S::Error), + S: Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/inspect_ok.rs b/futures-util/src/try_stream/inspect_ok.rs index 35e647ec9d..816dabfe51 100644 --- a/futures-util/src/try_stream/inspect_ok.rs +++ b/futures-util/src/try_stream/inspect_ok.rs @@ -26,14 +26,16 @@ where } } +impl InspectOk { + unsafe_pinned!(stream: St); + unsafe_unpinned!(f: F); +} + impl InspectOk where St: TryStream, F: FnMut(&St::Ok), { - unsafe_pinned!(stream: St); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Self { Self { stream, f } } @@ -98,10 +100,9 @@ where // Forwarding impl of Sink from the underlying stream impl Sink for InspectOk where - S: TryStream + Sink, - F: FnMut(&S::Ok), + S: Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/into_stream.rs b/futures-util/src/try_stream/into_stream.rs index a6faadfa46..0cd524e161 100644 --- a/futures-util/src/try_stream/into_stream.rs +++ b/futures-util/src/try_stream/into_stream.rs @@ -71,8 +71,8 @@ impl Stream for IntoStream { } // Forwarding impl of Sink from the underlying stream -impl, Item> Sink for IntoStream { - type SinkError = S::SinkError; +impl, Item> Sink for IntoStream { + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/map_err.rs b/futures-util/src/try_stream/map_err.rs index 9b35aaca2f..0d9c1d72e8 100644 --- a/futures-util/src/try_stream/map_err.rs +++ b/futures-util/src/try_stream/map_err.rs @@ -92,12 +92,11 @@ where } // Forwarding impl of Sink from the underlying stream -impl Sink for MapErr +impl Sink for MapErr where - S: TryStream + Sink, - F: FnMut(S::Error) -> E, + S: Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/map_ok.rs b/futures-util/src/try_stream/map_ok.rs index f5f38011c0..217b7e0e4d 100644 --- a/futures-util/src/try_stream/map_ok.rs +++ b/futures-util/src/try_stream/map_ok.rs @@ -92,12 +92,11 @@ where } // Forwarding impl of Sink from the underlying stream -impl Sink for MapOk +impl Sink for MapOk where - S: TryStream + Sink, - F: FnMut(S::Ok) -> T, + S: Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/or_else.rs b/futures-util/src/try_stream/or_else.rs index 0ea9bc9c98..d2b0279b3f 100644 --- a/futures-util/src/try_stream/or_else.rs +++ b/futures-util/src/try_stream/or_else.rs @@ -29,15 +29,17 @@ where } } +impl OrElse { + unsafe_pinned!(stream: St); + unsafe_pinned!(future: Option); + unsafe_unpinned!(f: F); +} + impl OrElse where St: TryStream, F: FnMut(St::Error) -> Fut, Fut: TryFuture, { - unsafe_pinned!(stream: St); - unsafe_pinned!(future: Option); - unsafe_unpinned!(f: F); - pub(super) fn new(stream: St, f: F) -> Self { Self { stream, future: None, f } } @@ -104,11 +106,9 @@ impl Stream for OrElse // Forwarding impl of Sink from the underlying stream impl Sink for OrElse - where S: TryStream + Sink, - F: FnMut(S::Error) -> Fut, - Fut: TryFuture, + where S: Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/try_buffer_unordered.rs b/futures-util/src/try_stream/try_buffer_unordered.rs index 0f443c71bb..2755645e1b 100644 --- a/futures-util/src/try_stream/try_buffer_unordered.rs +++ b/futures-util/src/try_stream/try_buffer_unordered.rs @@ -107,11 +107,11 @@ impl Stream for TryBufferUnordered } // Forwarding impl of Sink from the underlying stream -impl Sink for TryBufferUnordered - where S: TryStream + Sink, - S::Ok: TryFuture, +impl Sink for TryBufferUnordered + where S: TryStream + Sink, + S::Ok: TryFuture, { - type SinkError = S::SinkError; + type Error = E; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/try_filter.rs b/futures-util/src/try_stream/try_filter.rs index 12b34117b4..28320656fc 100644 --- a/futures-util/src/try_stream/try_filter.rs +++ b/futures-util/src/try_stream/try_filter.rs @@ -131,12 +131,10 @@ impl Stream for TryFilter } // Forwarding impl of Sink from the underlying stream -impl Sink for TryFilter - where S: TryStream + Sink, - Fut: Future, - F: FnMut(&S::Ok) -> Fut, +impl Sink for TryFilter + where S: TryStream + Sink, { - type SinkError = S::SinkError; + type Error = E; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/try_filter_map.rs b/futures-util/src/try_stream/try_filter_map.rs index c392b2ed9e..44ffb9c0b4 100644 --- a/futures-util/src/try_stream/try_filter_map.rs +++ b/futures-util/src/try_stream/try_filter_map.rs @@ -115,12 +115,10 @@ impl Stream for TryFilterMap } // Forwarding impl of Sink from the underlying stream -impl Sink for TryFilterMap - where S: TryStream + Sink, - Fut: TryFuture, Error = S::Error>, - F: FnMut(S::Ok) -> Fut, +impl Sink for TryFilterMap + where S: Sink, { - type SinkError = S::SinkError; + type Error = S::Error; delegate_sink!(stream, Item); } diff --git a/futures-util/src/try_stream/try_skip_while.rs b/futures-util/src/try_stream/try_skip_while.rs index 7a7204c575..26f0c24545 100644 --- a/futures-util/src/try_stream/try_skip_while.rs +++ b/futures-util/src/try_stream/try_skip_while.rs @@ -35,12 +35,17 @@ where } } +impl TrySkipWhile + where St: TryStream, +{ + unsafe_pinned!(stream: St); +} + impl TrySkipWhile where St: TryStream, F: FnMut(&St::Ok) -> Fut, Fut: TryFuture, { - unsafe_pinned!(stream: St); unsafe_unpinned!(f: F); unsafe_pinned!(pending_fut: Option); unsafe_unpinned!(pending_item: Option); @@ -128,12 +133,10 @@ impl Stream for TrySkipWhile } // Forwarding impl of Sink from the underlying stream -impl Sink for TrySkipWhile - where S: TryStream + Sink, - F: FnMut(&S::Ok) -> Fut, - Fut: TryFuture, +impl Sink for TrySkipWhile + where S: TryStream + Sink, { - type SinkError = S::SinkError; + type Error = E; delegate_sink!(stream, Item); } diff --git a/futures/tests/future_try_flatten_stream.rs b/futures/tests/future_try_flatten_stream.rs index 6897c90cbf..082c5efa9a 100644 --- a/futures/tests/future_try_flatten_stream.rs +++ b/futures/tests/future_try_flatten_stream.rs @@ -50,17 +50,17 @@ impl Stream for StreamSink { } impl Sink for StreamSink { - type SinkError = E; - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + type Error = E; + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { panic!() } - fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> { panic!() } - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { panic!() } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { panic!() } } diff --git a/futures/tests/split.rs b/futures/tests/split.rs index 5a6b38024f..db024bff13 100644 --- a/futures/tests/split.rs +++ b/futures/tests/split.rs @@ -27,33 +27,33 @@ impl Stream for Join { } impl, Item> Sink for Join { - type SinkError = U::SinkError; + type Error = U::Error; fn poll_ready( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.sink().poll_ready(cx) } fn start_send( self: Pin<&mut Self>, item: Item, - ) -> Result<(), Self::SinkError> { + ) -> Result<(), Self::Error> { self.sink().start_send(item) } fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.sink().poll_flush(cx) } fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.sink().poll_close(cx) } }