From e865d401550c400a2411893cacd52b0a4e708f35 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sun, 15 May 2022 21:43:13 +1000 Subject: [PATCH] Fix `StreamMuxer::Error` to io::Error We are already enforcing that the associated type must convert to `io::Error`. We might as well just make all functions return an `io::Error` directly. --- core/src/either.rs | 53 ++++++++--------- core/src/muxing.rs | 96 +++++++++++++------------------ core/src/muxing/singleton.rs | 7 +-- core/src/transport/upgrade.rs | 3 +- core/tests/util.rs | 3 +- muxers/mplex/src/lib.rs | 2 - muxers/yamux/src/lib.rs | 67 +++++++++------------ swarm/src/connection/substream.rs | 6 +- 8 files changed, 100 insertions(+), 137 deletions(-) diff --git a/core/src/either.rs b/core/src/either.rs index df7caf600bd..8bc29bce1eb 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -28,7 +28,7 @@ use futures::{ prelude::*, }; use pin_project::pin_project; -use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll}; +use std::{fmt, io, io::Error as IoError, pin::Pin, task::Context, task::Poll}; #[derive(Debug, Copy, Clone)] pub enum EitherError { @@ -203,15 +203,14 @@ where { type Substream = EitherOutput; type OutboundSubstream = EitherOutbound; - type Error = IoError; fn poll_event( &self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + ) -> Poll>> { match self { EitherOutput::First(inner) => inner.poll_event(cx).map(|result| { - result.map_err(|e| e.into()).map(|event| match event { + result.map(|event| match event { StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), StreamMuxerEvent::InboundSubstream(substream) => { StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream)) @@ -219,7 +218,7 @@ where }) }), EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| { - result.map_err(|e| e.into()).map(|event| match event { + result.map(|event| match event { StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), StreamMuxerEvent::InboundSubstream(substream) => { StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream)) @@ -240,16 +239,14 @@ where &self, cx: &mut Context<'_>, substream: &mut Self::OutboundSubstream, - ) -> Poll> { + ) -> Poll> { match (self, substream) { (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::First)) - .map_err(|e| e.into()), + .map(|p| p.map(EitherOutput::First)), (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::Second)) - .map_err(|e| e.into()), + .map(|p| p.map(EitherOutput::Second)), _ => panic!("Wrong API usage"), } } @@ -272,13 +269,13 @@ where cx: &mut Context<'_>, sub: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.read_substream(cx, sub, buf).map_err(|e| e.into()) + inner.read_substream(cx, sub, buf) } (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.read_substream(cx, sub, buf).map_err(|e| e.into()) + inner.read_substream(cx, sub, buf) } _ => panic!("Wrong API usage"), } @@ -289,13 +286,13 @@ where cx: &mut Context<'_>, sub: &mut Self::Substream, buf: &[u8], - ) -> Poll> { + ) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.write_substream(cx, sub, buf).map_err(|e| e.into()) + inner.write_substream(cx, sub, buf) } (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.write_substream(cx, sub, buf).map_err(|e| e.into()) + inner.write_substream(cx, sub, buf) } _ => panic!("Wrong API usage"), } @@ -305,13 +302,13 @@ where &self, cx: &mut Context<'_>, sub: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.flush_substream(cx, sub).map_err(|e| e.into()) + inner.flush_substream(cx, sub) } (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.flush_substream(cx, sub).map_err(|e| e.into()) + inner.flush_substream(cx, sub) } _ => panic!("Wrong API usage"), } @@ -321,13 +318,13 @@ where &self, cx: &mut Context<'_>, sub: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.shutdown_substream(cx, sub).map_err(|e| e.into()) + inner.shutdown_substream(cx, sub) } (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.shutdown_substream(cx, sub).map_err(|e| e.into()) + inner.shutdown_substream(cx, sub) } _ => panic!("Wrong API usage"), } @@ -346,17 +343,17 @@ where } } - fn close(&self, cx: &mut Context<'_>) -> Poll> { + fn close(&self, cx: &mut Context<'_>) -> Poll> { match self { - EitherOutput::First(inner) => inner.close(cx).map_err(|e| e.into()), - EitherOutput::Second(inner) => inner.close(cx).map_err(|e| e.into()), + EitherOutput::First(inner) => inner.close(cx), + EitherOutput::Second(inner) => inner.close(cx), } } - fn flush_all(&self, cx: &mut Context<'_>) -> Poll> { + fn flush_all(&self, cx: &mut Context<'_>) -> Poll> { match self { - EitherOutput::First(inner) => inner.flush_all(cx).map_err(|e| e.into()), - EitherOutput::Second(inner) => inner.flush_all(cx).map_err(|e| e.into()), + EitherOutput::First(inner) => inner.flush_all(cx), + EitherOutput::Second(inner) => inner.flush_all(cx), } } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 12beb51d9dd..79e4cb73c0b 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -85,9 +85,6 @@ pub trait StreamMuxer { /// Future that will be resolved when the outgoing substream is open. type OutboundSubstream; - /// Error type of the muxer - type Error: Into; - /// Polls for a connection-wide event. /// /// This function behaves the same as a `Stream`. @@ -103,7 +100,7 @@ pub trait StreamMuxer { fn poll_event( &self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; + ) -> Poll>>; /// Opens a new outgoing substream, and produces the equivalent to a future that will be /// resolved when it becomes available. @@ -125,7 +122,7 @@ pub trait StreamMuxer { &self, cx: &mut Context<'_>, s: &mut Self::OutboundSubstream, - ) -> Poll>; + ) -> Poll>; /// Destroys an outbound substream future. Use this after the outbound substream has finished, /// or if you want to interrupt it. @@ -147,7 +144,7 @@ pub trait StreamMuxer { cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8], - ) -> Poll>; + ) -> Poll>; /// Write data to a substream. The behaviour is the same as `futures::AsyncWrite::poll_write`. /// @@ -165,7 +162,7 @@ pub trait StreamMuxer { cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8], - ) -> Poll>; + ) -> Poll>; /// Flushes a substream. The behaviour is the same as `futures::AsyncWrite::poll_flush`. /// @@ -181,7 +178,7 @@ pub trait StreamMuxer { &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll>; + ) -> Poll>; /// Attempts to shut down the writing side of a substream. The behaviour is similar to /// `AsyncWrite::poll_close`. @@ -198,7 +195,7 @@ pub trait StreamMuxer { &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll>; + ) -> Poll>; /// Destroys a substream. fn destroy_substream(&self, s: Self::Substream); @@ -226,14 +223,14 @@ pub trait StreamMuxer { /// > that the remote is properly informed of the shutdown. However, apart from /// > properly informing the remote, there is no difference between this and /// > immediately dropping the muxer. - fn close(&self, cx: &mut Context<'_>) -> Poll>; + fn close(&self, cx: &mut Context<'_>) -> Poll>; /// Flush this `StreamMuxer`. /// /// This drains any write buffers of substreams and delivers any pending shutdown notifications /// due to `shutdown_substream` or `close`. One may thus shutdown groups of substreams /// followed by a final `flush_all` instead of having to do `flush_substream` for each. - fn flush_all(&self, cx: &mut Context<'_>) -> Poll>; + fn flush_all(&self, cx: &mut Context<'_>) -> Poll>; } /// Event about a connection, reported by an implementation of [`StreamMuxer`]. @@ -265,7 +262,7 @@ impl StreamMuxerEvent { /// object that implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`. pub fn event_from_ref_and_wrap

( muxer: P, -) -> impl Future>, ::Error>> +) -> impl Future>>> where P: Deref + Clone, P::Target: StreamMuxer, @@ -304,7 +301,7 @@ where P: Deref + Clone, P::Target: StreamMuxer, { - type Output = Result, ::Error>; + type Output = io::Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match Future::poll(Pin::new(&mut self.inner), cx) { @@ -353,7 +350,7 @@ where P: Deref, P::Target: StreamMuxer, { - type Output = Result<::Substream, ::Error>; + type Output = io::Result<::Substream>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // We use a `this` because the compiler isn't smart enough to allow mutably borrowing @@ -442,7 +439,7 @@ where let this = &mut *self; let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.read_substream(cx, s, buf).map_err(|e| e.into()) + this.muxer.read_substream(cx, s, buf) } } @@ -461,7 +458,7 @@ where let this = &mut *self; let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.write_substream(cx, s, buf).map_err(|e| e.into()) + this.muxer.write_substream(cx, s, buf) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -474,12 +471,12 @@ where match this.shutdown_state { ShutdownState::Shutdown => match this.muxer.shutdown_substream(cx, s) { Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => return Poll::Pending, }, ShutdownState::Flush => match this.muxer.flush_substream(cx, s) { Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => return Poll::Pending, }, ShutdownState::Done => { @@ -495,7 +492,7 @@ where let this = &mut *self; let s = this.substream.as_mut().expect("substream was empty"); - this.muxer.flush_substream(cx, s).map_err(|e| e.into()) + this.muxer.flush_substream(cx, s) } } @@ -512,11 +509,7 @@ where /// Abstract `StreamMuxer`. pub struct StreamMuxerBox { - inner: Box< - dyn StreamMuxer - + Send - + Sync, - >, + inner: Box + Send + Sync>, } impl StreamMuxerBox { @@ -544,13 +537,12 @@ impl StreamMuxerBox { impl StreamMuxer for StreamMuxerBox { type Substream = usize; // TODO: use a newtype type OutboundSubstream = usize; // TODO: use a newtype - type Error = io::Error; #[inline] fn poll_event( &self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + ) -> Poll>> { self.inner.poll_event(cx) } @@ -564,7 +556,7 @@ impl StreamMuxer for StreamMuxerBox { &self, cx: &mut Context<'_>, s: &mut Self::OutboundSubstream, - ) -> Poll> { + ) -> Poll> { self.inner.poll_outbound(cx, s) } @@ -579,7 +571,7 @@ impl StreamMuxer for StreamMuxerBox { cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { self.inner.read_substream(cx, s, buf) } @@ -589,7 +581,7 @@ impl StreamMuxer for StreamMuxerBox { cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8], - ) -> Poll> { + ) -> Poll> { self.inner.write_substream(cx, s, buf) } @@ -598,7 +590,7 @@ impl StreamMuxer for StreamMuxerBox { &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { self.inner.flush_substream(cx, s) } @@ -607,7 +599,7 @@ impl StreamMuxer for StreamMuxerBox { &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { self.inner.shutdown_substream(cx, s) } @@ -617,12 +609,12 @@ impl StreamMuxer for StreamMuxerBox { } #[inline] - fn close(&self, cx: &mut Context<'_>) -> Poll> { + fn close(&self, cx: &mut Context<'_>) -> Poll> { self.inner.close(cx) } #[inline] - fn flush_all(&self, cx: &mut Context<'_>) -> Poll> { + fn flush_all(&self, cx: &mut Context<'_>) -> Poll> { self.inner.flush_all(cx) } } @@ -644,20 +636,19 @@ where { type Substream = usize; // TODO: use a newtype type OutboundSubstream = usize; // TODO: use a newtype - type Error = io::Error; #[inline] fn poll_event( &self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + ) -> Poll>> { let substream = match self.inner.poll_event(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => { return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) } Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), }; let id = self.next_substream.fetch_add(1, Ordering::Relaxed); @@ -678,7 +669,7 @@ where &self, cx: &mut Context<'_>, substream: &mut Self::OutboundSubstream, - ) -> Poll> { + ) -> Poll> { let mut list = self.outbound.lock(); let substream = match self .inner @@ -686,7 +677,7 @@ where { Poll::Pending => return Poll::Pending, Poll::Ready(Ok(s)) => s, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), }; let id = self.next_substream.fetch_add(1, Ordering::Relaxed); self.substreams.lock().insert(id, substream); @@ -706,11 +697,9 @@ where cx: &mut Context<'_>, s: &mut Self::Substream, buf: &mut [u8], - ) -> Poll> { + ) -> Poll> { let mut list = self.substreams.lock(); - self.inner - .read_substream(cx, list.get_mut(s).unwrap(), buf) - .map_err(|e| e.into()) + self.inner.read_substream(cx, list.get_mut(s).unwrap(), buf) } #[inline] @@ -719,11 +708,10 @@ where cx: &mut Context<'_>, s: &mut Self::Substream, buf: &[u8], - ) -> Poll> { + ) -> Poll> { let mut list = self.substreams.lock(); self.inner .write_substream(cx, list.get_mut(s).unwrap(), buf) - .map_err(|e| e.into()) } #[inline] @@ -731,11 +719,9 @@ where &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { let mut list = self.substreams.lock(); - self.inner - .flush_substream(cx, list.get_mut(s).unwrap()) - .map_err(|e| e.into()) + self.inner.flush_substream(cx, list.get_mut(s).unwrap()) } #[inline] @@ -743,11 +729,9 @@ where &self, cx: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { + ) -> Poll> { let mut list = self.substreams.lock(); - self.inner - .shutdown_substream(cx, list.get_mut(s).unwrap()) - .map_err(|e| e.into()) + self.inner.shutdown_substream(cx, list.get_mut(s).unwrap()) } #[inline] @@ -758,12 +742,12 @@ where } #[inline] - fn close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.close(cx).map_err(|e| e.into()) + fn close(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.close(cx) } #[inline] - fn flush_all(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.flush_all(cx).map_err(|e| e.into()) + fn flush_all(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.flush_all(cx) } } diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index 749e9cd673e..288f14bc81e 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -72,12 +72,11 @@ where { type Substream = Substream; type OutboundSubstream = OutboundSubstream; - type Error = io::Error; fn poll_event( &self, _: &mut Context<'_>, - ) -> Poll, io::Error>> { + ) -> Poll>> { match self.endpoint { Endpoint::Dialer => return Poll::Pending, Endpoint::Listener => {} @@ -149,12 +148,12 @@ where fn destroy_substream(&self, _: Self::Substream) {} - fn close(&self, cx: &mut Context<'_>) -> Poll> { + fn close(&self, cx: &mut Context<'_>) -> Poll> { // The `StreamMuxer` trait requires that `close()` implies `flush_all()`. self.flush_all(cx) } - fn flush_all(&self, cx: &mut Context<'_>) -> Poll> { + fn flush_all(&self, cx: &mut Context<'_>) -> Poll> { AsyncWrite::poll_flush(Pin::new(&mut *self.inner.lock()), cx) } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index c72859eb57d..69bf22fc805 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -22,9 +22,10 @@ pub use crate::upgrade::Version; +use crate::muxing::StreamMuxerBox; use crate::{ connection::ConnectedPoint, - muxing::{StreamMuxer, StreamMuxerBox}, + muxing::StreamMuxer, transport::{ and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerEvent, Transport, TransportError, diff --git a/core/tests/util.rs b/core/tests/util.rs index 64e366ecd99..e940d7ede88 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -24,9 +24,8 @@ pub enum CloseMuxerState { impl Future for CloseMuxer where M: StreamMuxer, - M::Error: From, { - type Output = Result; + type Output = std::io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 0f8598c5eef..ccc0afa467d 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -88,8 +88,6 @@ where { type Substream = Substream; type OutboundSubstream = OutboundSubstream; - type Error = io::Error; - fn poll_event( &self, cx: &mut Context<'_>, diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 2ce0b065345..21594bca96d 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -35,7 +35,6 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use thiserror::Error; /// A Yamux connection. pub struct Yamux(Mutex>); @@ -95,26 +94,23 @@ where } } -pub type YamuxResult = Result; - /// > **Note**: This implementation never emits [`StreamMuxerEvent::AddressChange`] events. impl StreamMuxer for Yamux where - S: Stream> + Unpin, + S: Stream> + Unpin, { type Substream = yamux::Stream; type OutboundSubstream = OpenSubstreamToken; - type Error = YamuxError; fn poll_event( &self, c: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { let mut inner = self.0.lock(); match ready!(inner.incoming.poll_next_unpin(c)) { Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))), - Some(Err(e)) => Poll::Ready(Err(e)), - None => Poll::Ready(Err(yamux::ConnectionError::Closed.into())), + Some(Err(e)) => Poll::Ready(Err(to_io_error(e))), + None => Poll::Ready(Err(to_io_error(yamux::ConnectionError::Closed))), } } @@ -126,11 +122,11 @@ where &self, c: &mut Context<'_>, _: &mut OpenSubstreamToken, - ) -> Poll> { + ) -> Poll> { let mut inner = self.0.lock(); Pin::new(&mut inner.control) .poll_open_stream(c) - .map_err(YamuxError) + .map_err(to_io_error) } fn destroy_outbound(&self, _: Self::OutboundSubstream) { @@ -142,10 +138,8 @@ where c: &mut Context<'_>, s: &mut Self::Substream, b: &mut [u8], - ) -> Poll> { - Pin::new(s) - .poll_read(c, b) - .map_err(|e| YamuxError(e.into())) + ) -> Poll> { + Pin::new(s).poll_read(c, b) } fn write_substream( @@ -153,46 +147,44 @@ where c: &mut Context<'_>, s: &mut Self::Substream, b: &[u8], - ) -> Poll> { - Pin::new(s) - .poll_write(c, b) - .map_err(|e| YamuxError(e.into())) + ) -> Poll> { + Pin::new(s).poll_write(c, b) } fn flush_substream( &self, c: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_flush(c).map_err(|e| YamuxError(e.into())) + ) -> Poll> { + Pin::new(s).poll_flush(c) } fn shutdown_substream( &self, c: &mut Context<'_>, s: &mut Self::Substream, - ) -> Poll> { - Pin::new(s).poll_close(c).map_err(|e| YamuxError(e.into())) + ) -> Poll> { + Pin::new(s).poll_close(c) } fn destroy_substream(&self, _: Self::Substream) {} - fn close(&self, c: &mut Context<'_>) -> Poll> { + fn close(&self, c: &mut Context<'_>) -> Poll> { let mut inner = self.0.lock(); if let std::task::Poll::Ready(x) = Pin::new(&mut inner.control).poll_close(c) { - return Poll::Ready(x.map_err(YamuxError)); + return Poll::Ready(x.map_err(to_io_error)); } while let std::task::Poll::Ready(x) = inner.incoming.poll_next_unpin(c) { match x { Some(Ok(_)) => {} // drop inbound stream - Some(Err(e)) => return Poll::Ready(Err(e)), + Some(Err(e)) => return Poll::Ready(Err(to_io_error(e))), None => return Poll::Ready(Ok(())), } } Poll::Pending } - fn flush_all(&self, _: &mut Context<'_>) -> Poll> { + fn flush_all(&self, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } @@ -385,23 +377,16 @@ where } } -/// The Yamux [`StreamMuxer`] error type. -#[derive(Debug, Error)] -#[error("yamux error: {0}")] -pub struct YamuxError(#[from] yamux::ConnectionError); - -impl From for io::Error { - fn from(err: YamuxError) -> Self { - match err.0 { - yamux::ConnectionError::Io(e) => e, - e => io::Error::new(io::ErrorKind::Other, e), - } +fn to_io_error(e: yamux::ConnectionError) -> io::Error { + match e { + yamux::ConnectionError::Io(e) => e, + e => io::Error::new(io::ErrorKind::Other, e), } } /// The [`futures::stream::Stream`] of incoming substreams. pub struct Incoming { - stream: BoxStream<'static, Result>, + stream: BoxStream<'static, Result>, _marker: std::marker::PhantomData, } @@ -413,7 +398,7 @@ impl fmt::Debug for Incoming { /// The [`futures::stream::Stream`] of incoming substreams (`!Send`). pub struct LocalIncoming { - stream: LocalBoxStream<'static, Result>, + stream: LocalBoxStream<'static, Result>, _marker: std::marker::PhantomData, } @@ -424,7 +409,7 @@ impl fmt::Debug for LocalIncoming { } impl Stream for Incoming { - type Item = Result; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, @@ -441,7 +426,7 @@ impl Stream for Incoming { impl Unpin for Incoming {} impl Stream for LocalIncoming { - type Item = Result; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs index 426f64d9f60..1bcc9add1b6 100644 --- a/swarm/src/connection/substream.rs +++ b/swarm/src/connection/substream.rs @@ -147,7 +147,7 @@ where Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => {} } @@ -169,7 +169,7 @@ where } Poll::Ready(Err(err)) => { self.inner.destroy_outbound(outbound); - return Poll::Ready(Err(err.into())); + return Poll::Ready(Err(err)); } } } @@ -214,7 +214,7 @@ where match self.muxer.close(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), } } }