From 2cdf705f6098a09f289e1e38c134300a22c0fab5 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 14 May 2019 15:29:40 +0200 Subject: [PATCH 1/2] Some preliminary changes --- misc/multistream-select/src/dialer_select.rs | 45 ++++++++--- misc/multistream-select/src/lib.rs | 55 +------------ .../multistream-select/src/listener_select.rs | 2 +- misc/multistream-select/src/negotiated.rs | 77 +++++++++++++++++++ 4 files changed, 117 insertions(+), 62 deletions(-) create mode 100644 misc/multistream-select/src/negotiated.rs diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index bbd40c1a200..ffba658256c 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -21,7 +21,7 @@ //! Contains the `dialer_select_proto` code, which allows selecting a protocol thanks to //! `multistream-select` for the dialer. -use futures::{future::Either, prelude::*, stream::StreamFuture}; +use futures::{prelude::*, stream::StreamFuture}; use crate::protocol::{ Dialer, DialerFuture, @@ -33,10 +33,6 @@ use std::mem; use tokio_io::{AsyncRead, AsyncWrite}; use crate::{Negotiated, ProtocolChoiceError}; -/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer -/// either sequentially of by considering all protocols in parallel. -pub type DialerSelectFuture = Either, DialerSelectPar>; - /// Helps selecting a protocol amongst the ones supported. /// /// This function expects a socket and a list of protocols. It uses the `multistream-select` @@ -57,9 +53,40 @@ where let iter = protocols.into_iter(); // We choose between the "serial" and "parallel" strategies based on the number of protocols. if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - Either::A(dialer_select_proto_serial(inner, iter)) + DialerSelectFuture::Serial(dialer_select_proto_serial(inner, iter)) } else { - Either::B(dialer_select_proto_parallel(inner, iter)) + DialerSelectFuture::Parallel(dialer_select_proto_parallel(inner, iter)) + } +} + +/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer +/// either sequentially of by considering all protocols in parallel. +pub enum DialerSelectFuture +where + R: AsyncRead + AsyncWrite, + I: Iterator, + I::Item: AsRef<[u8]> +{ + /// We try protocols one by one. + Serial(DialerSelectSeq), + /// We try protocols in parallel. + Parallel(DialerSelectPar), +} + +impl Future for DialerSelectFuture +where + R: AsyncRead + AsyncWrite, + I: Iterator, + I::Item: AsRef<[u8]> + Clone +{ + type Item = (I::Item, Negotiated); + type Error = ProtocolChoiceError; + + fn poll(&mut self) -> Poll { + match self { + DialerSelectFuture::Serial(fut) => fut.poll(), + DialerSelectFuture::Parallel(fut) => fut.poll(), + } } } @@ -207,7 +234,7 @@ where ListenerToDialerMessage::ProtocolAck { ref name } if name.as_ref() == proto_name.as_ref() => { - return Ok(Async::Ready((proto_name, Negotiated(r.into_inner())))) + return Ok(Async::Ready((proto_name, Negotiated::finished(r.into_inner())))) } ListenerToDialerMessage::NotAvailable => { let proto_name = protocols.next() @@ -423,7 +450,7 @@ where Some(ListenerToDialerMessage::ProtocolAck { ref name }) if name.as_ref() == proto_name.as_ref() => { - return Ok(Async::Ready((proto_name, Negotiated(dialer.into_inner())))) + return Ok(Async::Ready((proto_name, Negotiated::finished(dialer.into_inner())))) } _ => return Err(ProtocolChoiceError::UnexpectedMessage) } diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 746f167608c..b249c7a3da7 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -70,60 +70,11 @@ mod dialer_select; mod error; mod length_delimited; mod listener_select; -mod tests; - +mod negotiated; mod protocol; - -use futures::prelude::*; -use std::io; +mod tests; pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture}; pub use self::error::ProtocolChoiceError; pub use self::listener_select::{listener_select_proto, ListenerSelectFuture}; - -/// A stream after it has been negotiated. -pub struct Negotiated(pub(crate) TInner); - -impl io::Read for Negotiated -where - TInner: io::Read -{ - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0.read(buf) - } -} - -impl tokio_io::AsyncRead for Negotiated -where - TInner: tokio_io::AsyncRead -{ - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } - - fn read_buf(&mut self, buf: &mut B) -> Poll { - self.0.read_buf(buf) - } -} - -impl io::Write for Negotiated -where - TInner: io::Write -{ - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.0.flush() - } -} - -impl tokio_io::AsyncWrite for Negotiated -where - TInner: tokio_io::AsyncWrite -{ - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.0.shutdown() - } -} +pub use self::negotiated::Negotiated; diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index 59492dc0722..15e41e5677c 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -171,7 +171,7 @@ where } }; if let Some(p) = outcome { - return Ok(Async::Ready((p, Negotiated(listener.into_inner()), protocols))) + return Ok(Async::Ready((p, Negotiated::finished(listener.into_inner()), protocols))) } else { let stream = listener.into_future(); self.inner = ListenerSelectState::Incoming { stream, protocols } diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs new file mode 100644 index 00000000000..70fd313c305 --- /dev/null +++ b/misc/multistream-select/src/negotiated.rs @@ -0,0 +1,77 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use std::io; + +/// A stream after it has been negotiated. +pub struct Negotiated(TInner); + +impl Negotiated { + /// Builds a `Negotiated` containing a stream that's already been successfully negotiated to + /// a specific protocol. + pub(crate) fn finished(inner: TInner) -> Self { + Negotiated(inner) + } +} + +impl io::Read for Negotiated +where + TInner: io::Read +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.read(buf) + } +} + +impl tokio_io::AsyncRead for Negotiated +where + TInner: tokio_io::AsyncRead +{ + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.0.prepare_uninitialized_buffer(buf) + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + self.0.read_buf(buf) + } +} + +impl io::Write for Negotiated +where + TInner: io::Write +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl tokio_io::AsyncWrite for Negotiated +where + TInner: tokio_io::AsyncWrite +{ + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.0.shutdown() + } +} From 6648acff92fb42fb028a011e2d2d4d4462b460c3 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 14 May 2019 17:25:39 +0200 Subject: [PATCH 2/2] Add dialer_select_proto_one --- misc/multistream-select/src/dialer_select.rs | 121 ++++++++++++++++- .../src/length_delimited.rs | 50 ++++--- misc/multistream-select/src/negotiated.rs | 127 ++++++++++++++++-- .../multistream-select/src/protocol/dialer.rs | 39 +++++- protocols/floodsub/src/protocol.rs | 2 +- 5 files changed, 299 insertions(+), 40 deletions(-) diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index ffba658256c..5084bee761a 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -51,8 +51,13 @@ where I::Item: AsRef<[u8]> { let iter = protocols.into_iter(); - // We choose between the "serial" and "parallel" strategies based on the number of protocols. - if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { + + // We choose which negotiation strategy to use based on the number of protocols. + if iter.size_hint().0 == 1 && iter.size_hint().1.map(|n| n == 1).unwrap_or(false) { + let protocol = iter.into_iter().next() + .expect("iterator hint returns a minimum length of 1 but its actual length is 0"); + DialerSelectFuture::One(dialer_select_proto_one(inner, protocol)) + } else if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { DialerSelectFuture::Serial(dialer_select_proto_serial(inner, iter)) } else { DialerSelectFuture::Parallel(dialer_select_proto_parallel(inner, iter)) @@ -67,6 +72,8 @@ where I: Iterator, I::Item: AsRef<[u8]> { + /// We try a single protocol. + One(DialerSelectOne), /// We try protocols one by one. Serial(DialerSelectSeq), /// We try protocols in parallel. @@ -84,12 +91,122 @@ where fn poll(&mut self) -> Poll { match self { + DialerSelectFuture::One(fut) => fut.poll(), DialerSelectFuture::Serial(fut) => fut.poll(), DialerSelectFuture::Parallel(fut) => fut.poll(), } } } +/// Tries to negotiate a specific protocol on the substream. Assumes that the negotiation will +/// succeed and immediately returns after starting the negotiation. If it turns out that the +/// negotiation did not succeed, the stream will return an error. +pub fn dialer_select_proto_one(inner: R, protocol: P) -> DialerSelectOne +where + R: AsyncRead + AsyncWrite, + P: AsRef<[u8]> +{ + DialerSelectOne { + inner: DialerSelectOneState::AwaitDialer { dialer_fut: Dialer::dial(inner), proto_name: protocol } + } +} + +/// Future returned by `dialer_select_proto_one`. +pub struct DialerSelectOne +where + R: AsyncRead + AsyncWrite, + P: AsRef<[u8]> +{ + inner: DialerSelectOneState +} + +enum DialerSelectOneState +where + R: AsyncRead + AsyncWrite, + P: AsRef<[u8]> +{ + AwaitDialer { + dialer_fut: DialerFuture, + proto_name: P + }, + SendProtocol { + dialer: Dialer, + proto_name: P, + }, + FlushProtocol { + dialer: Dialer, + proto_name: P, + }, + Undefined +} + +impl Future for DialerSelectOne +where + R: AsyncRead + AsyncWrite, + P: AsRef<[u8]> + Clone +{ + type Item = (P, Negotiated); + type Error = ProtocolChoiceError; + + fn poll(&mut self) -> Poll { + loop { + match mem::replace(&mut self.inner, DialerSelectOneState::Undefined) { + DialerSelectOneState::AwaitDialer { mut dialer_fut, proto_name } => { + let dialer = match dialer_fut.poll()? { + Async::Ready(d) => d, + Async::NotReady => { + self.inner = DialerSelectOneState::AwaitDialer { dialer_fut, proto_name }; + return Ok(Async::NotReady) + } + }; + self.inner = DialerSelectOneState::SendProtocol { + dialer, + proto_name + } + } + DialerSelectOneState::SendProtocol { mut dialer, proto_name } => { + trace!("sending {:?}", proto_name.as_ref()); + let req = DialerToListenerMessage::ProtocolRequest { + name: proto_name.clone() + }; + match dialer.start_send(req)? { + AsyncSink::Ready => { + self.inner = DialerSelectOneState::FlushProtocol { + dialer, + proto_name + } + } + AsyncSink::NotReady(_) => { + self.inner = DialerSelectOneState::SendProtocol { + dialer, + proto_name + }; + return Ok(Async::NotReady) + } + } + } + DialerSelectOneState::FlushProtocol { mut dialer, proto_name } => { + match dialer.poll_complete()? { + Async::Ready(()) => { + let stream = Negotiated::negotiating(dialer, proto_name.as_ref().to_vec()); + return Ok(Async::Ready((proto_name, stream))); + } + Async::NotReady => { + self.inner = DialerSelectOneState::FlushProtocol { + dialer, + proto_name, + }; + return Ok(Async::NotReady) + } + } + } + DialerSelectOneState::Undefined => + panic!("DialerSelectOneState::poll called after completion") + } + } + } +} + /// Helps selecting a protocol amongst the ones supported. /// /// Same as `dialer_select_proto`. Tries protocols one by one. The iterator doesn't need to produce diff --git a/misc/multistream-select/src/length_delimited.rs b/misc/multistream-select/src/length_delimited.rs index 72256b83980..970ca810b1d 100644 --- a/misc/multistream-select/src/length_delimited.rs +++ b/misc/multistream-select/src/length_delimited.rs @@ -52,22 +52,20 @@ enum State { ReadingData { frame_len: u16 }, } -impl LengthDelimited -where - R: AsyncWrite, - C: Encoder -{ - pub fn new(inner: R, codec: C) -> LengthDelimited { - LengthDelimited { - inner: FramedWrite::new(inner, codec), - internal_buffer: { - let mut v = SmallVec::new(); - v.push(0); - v - }, - internal_buffer_pos: 0, - state: State::ReadingLength - } +impl LengthDelimited { + /// Grants access to the underlying socket. + /// + /// Be extra careful when you use this method in order to not trigger logic errors. + pub fn get_ref(&self) -> &R { + self.inner.get_ref() + } + + /// Grants access to the underlying socket. + /// + /// This method is only ever intended to be used for writing. Be extra careful when you use it + /// in order to not trigger logic errors. + pub fn get_mut(&mut self) -> &mut R { + self.inner.get_mut() } /// Destroys the `LengthDelimited` and returns the underlying socket. @@ -81,7 +79,6 @@ where /// you call `poll()` manually**. Using this struct as it is intended to be used (i.e. through /// the modifiers provided by the `futures` crate) will always leave the object in a state in /// which `into_inner()` will not panic. - #[inline] pub fn into_inner(self) -> R { assert_eq!(self.state, State::ReadingLength); assert_eq!(self.internal_buffer_pos, 0); @@ -89,6 +86,25 @@ where } } +impl LengthDelimited +where + R: AsyncWrite, + C: Encoder +{ + pub fn new(inner: R, codec: C) -> LengthDelimited { + LengthDelimited { + inner: FramedWrite::new(inner, codec), + internal_buffer: { + let mut v = SmallVec::new(); + v.push(0); + v + }, + internal_buffer_pos: 0, + state: State::ReadingLength + } + } +} + impl Stream for LengthDelimited where R: AsyncRead diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index 70fd313c305..d7e6acc7f1a 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -18,26 +18,84 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::protocol::{Dialer, ListenerToDialerMessage}; use futures::prelude::*; -use std::io; +use std::{io, mem}; /// A stream after it has been negotiated. -pub struct Negotiated(TInner); +pub struct Negotiated(NegotiatedInner); + +enum NegotiatedInner { + /// We have received confirmation that the protocol is negotiated. + Finished(TInner), + + /// We are waiting for the remote to send back a confirmation that the negotiation has been + /// successful. + Negotiating { + /// The stream of data. + inner: Dialer>, + /// Expected protocol name. + expected_name: Vec, + }, + + /// Temporary state used for transitioning. Should never be observed. + Poisoned, +} impl Negotiated { /// Builds a `Negotiated` containing a stream that's already been successfully negotiated to /// a specific protocol. pub(crate) fn finished(inner: TInner) -> Self { - Negotiated(inner) + Negotiated(NegotiatedInner::Finished(inner)) + } + + /// Builds a `Negotiated` expecting a successful protocol negotiation answer. + pub(crate) fn negotiating

(inner: Dialer, expected_name: Vec) -> Self + where P: AsRef<[u8]>, + TInner: tokio_io::AsyncRead + tokio_io::AsyncWrite, + { + Negotiated(NegotiatedInner::Negotiating { + inner: inner.map_param(), + expected_name, + }) } } impl io::Read for Negotiated where - TInner: io::Read + TInner: tokio_io::AsyncRead { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0.read(buf) + loop { + match self.0 { + NegotiatedInner::Finished(ref mut s) => return s.read(buf), + NegotiatedInner::Negotiating { ref mut inner, ref expected_name } => { + let msg = match inner.poll() { + Ok(Async::Ready(Some(x))) => x, + Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()), + Err(_) | Ok(Async::Ready(None)) => return Err(io::ErrorKind::InvalidData.into()), + }; + + if let ListenerToDialerMessage::ProtocolAck { ref name } = msg { + if name.as_ref() != &expected_name[..] { + return Err(io::ErrorKind::InvalidData.into()); + } + } else { + return Err(io::ErrorKind::InvalidData.into()); + } + }, + NegotiatedInner::Poisoned => panic!("Poisonned negotiated stream"), + }; + + // If we reach here, we should transition from `Negotiating` to `Finished`. + self.0 = match mem::replace(&mut self.0, NegotiatedInner::Poisoned) { + NegotiatedInner::Negotiating { inner, .. } => { + NegotiatedInner::Finished(inner.into_inner()) + }, + NegotiatedInner::Finished(_) => unreachable!(), + NegotiatedInner::Poisoned => panic!("Poisonned negotiated stream"), + }; + } } } @@ -46,11 +104,12 @@ where TInner: tokio_io::AsyncRead { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } - - fn read_buf(&mut self, buf: &mut B) -> Poll { - self.0.read_buf(buf) + match self.0 { + NegotiatedInner::Finished(ref s) => s.prepare_uninitialized_buffer(buf), + NegotiatedInner::Negotiating { ref inner, .. } => + inner.get_ref().prepare_uninitialized_buffer(buf), + NegotiatedInner::Poisoned => panic!("Poisonned negotiated stream"), + } } } @@ -59,19 +118,59 @@ where TInner: io::Write { fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.write(buf) + match self.0 { + NegotiatedInner::Finished(ref mut s) => s.write(buf), + NegotiatedInner::Negotiating { ref mut inner, .. } => inner.get_mut().write(buf), + NegotiatedInner::Poisoned => panic!("Poisonned negotiated stream"), + } } fn flush(&mut self) -> io::Result<()> { - self.0.flush() + match self.0 { + NegotiatedInner::Finished(ref mut s) => s.flush(), + NegotiatedInner::Negotiating { ref mut inner, .. } => inner.get_mut().flush(), + NegotiatedInner::Poisoned => panic!("Poisonned negotiated stream"), + } } } impl tokio_io::AsyncWrite for Negotiated where - TInner: tokio_io::AsyncWrite + TInner: tokio_io::AsyncRead + tokio_io::AsyncWrite { fn shutdown(&mut self) -> Poll<(), io::Error> { - self.0.shutdown() + loop { + match self.0 { + NegotiatedInner::Finished(ref mut s) => return s.shutdown(), + NegotiatedInner::Negotiating { ref mut inner, ref expected_name } => { + // We need to wait for the remote to send either an approval or a refusal, + // otherwise for write-only streams we would never know whether anything has + // succeeded. + let msg = match inner.poll() { + Ok(Async::Ready(Some(x))) => x, + Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()), + Err(_) | Ok(Async::Ready(None)) => return Err(io::ErrorKind::InvalidData.into()), + }; + + if let ListenerToDialerMessage::ProtocolAck { ref name } = msg { + if name.as_ref() != &expected_name[..] { + return Err(io::ErrorKind::InvalidData.into()); + } + } else { + return Err(io::ErrorKind::InvalidData.into()); + } + } + NegotiatedInner::Poisoned => panic!("Poisonned negotiated stream"), + }; + + // If we reach here, we should transition from `Negotiating` to `Finished`. + self.0 = match mem::replace(&mut self.0, NegotiatedInner::Poisoned) { + NegotiatedInner::Negotiating { inner, .. } => { + NegotiatedInner::Finished(inner.into_inner()) + }, + NegotiatedInner::Finished(_) => unreachable!(), + NegotiatedInner::Poisoned => panic!("Poisonned negotiated stream"), + }; + } } } diff --git a/misc/multistream-select/src/protocol/dialer.rs b/misc/multistream-select/src/protocol/dialer.rs index 71c9d21077f..5a7c525e386 100644 --- a/misc/multistream-select/src/protocol/dialer.rs +++ b/misc/multistream-select/src/protocol/dialer.rs @@ -27,7 +27,7 @@ use crate::protocol::ListenerToDialerMessage; use crate::protocol::MultistreamSelectError; use crate::protocol::MULTISTREAM_PROTOCOL_WITH_LF; use futures::{prelude::*, sink, Async, StartSend, try_ready}; -use std::io; +use std::{io, marker::PhantomData}; use tokio_codec::Encoder; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::{decode, codec::Uvi}; @@ -39,6 +39,30 @@ pub struct Dialer { handshake_finished: bool } +impl Dialer { + /// Grants access to the socket. You should only ever use this if you're not going to send + /// anything anymore using the `Sink` API. + /// + /// Be extra careful when you use this method in order to not trigger logic errors. + pub fn get_ref(&self) -> &R { + self.inner.get_ref() + } + + /// Grants access to the socket. You should only ever use this if you're not going to send + /// anything anymore using the `Sink` API. + /// + /// This method is only ever intended to be used for writing. Be extra careful when you use it + /// in order to not trigger logic errors. + pub fn get_mut(&mut self) -> &mut R { + self.inner.get_mut() + } + + /// Grants back the socket. Typically used after a `ProtocolAck` has been received. + pub fn into_inner(self) -> R { + self.inner.into_inner() + } +} + impl Dialer where R: AsyncRead + AsyncWrite, @@ -52,10 +76,13 @@ where } } - /// Grants back the socket. Typically used after a `ProtocolAck` has been received. - #[inline] - pub fn into_inner(self) -> R { - self.inner.into_inner() + /// Changes the `N` type parameter. + pub fn map_param>(self) -> Dialer { + let handshake_finished = self.handshake_finished; + Dialer { + inner: LengthDelimited::new(self.into_inner(), MessageEncoder(PhantomData)), + handshake_finished, + } } } @@ -89,7 +116,7 @@ where impl Stream for Dialer where - R: AsyncRead + AsyncWrite + R: AsyncRead { type Item = ListenerToDialerMessage; type Error = MultistreamSelectError; diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 532a0f88f80..40f47f7ce65 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -164,7 +164,7 @@ impl UpgradeInfo for FloodsubRpc { impl OutboundUpgrade for FloodsubRpc where - TSocket: AsyncWrite, + TSocket: AsyncRead + AsyncWrite, { type Output = (); type Error = io::Error;