From bd73f6035850522bbee2105242a405ba2a7df29d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 14 Aug 2018 15:23:30 +0200 Subject: [PATCH] Add some Send implementations to Futures (#412) --- core/src/transport/denied.rs | 8 +-- core/src/upgrade/apply.rs | 39 ++++++-------- core/src/upgrade/denied.rs | 4 +- multistream-select/src/dialer_select.rs | 56 +++++++++------------ multistream-select/src/listener_select.rs | 27 ++++------ multistream-select/src/protocol/dialer.rs | 10 ++-- multistream-select/src/protocol/listener.rs | 11 ++-- uds/src/lib.rs | 4 +- 8 files changed, 64 insertions(+), 95 deletions(-) diff --git a/core/src/transport/denied.rs b/core/src/transport/denied.rs index 46cc706cddc..2f77c79ae22 100644 --- a/core/src/transport/denied.rs +++ b/core/src/transport/denied.rs @@ -32,10 +32,10 @@ pub struct DeniedTransport; impl Transport for DeniedTransport { // TODO: could use `!` for associated types once stable type Output = Cursor>; - type MultiaddrFuture = Box>; - type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type MultiaddrFuture = Box + Send + Sync>; + type Listener = Box + Send + Sync>; + type ListenerUpgrade = Box + Send + Sync>; + type Dial = Box + Send + Sync>; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index 9ac188f0baf..e6ff6732f8c 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use bytes::Bytes; -use futures::prelude::*; +use futures::{prelude::*, future}; use multistream_select; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -29,19 +29,18 @@ use upgrade::{ConnectionUpgrade, Endpoint}; /// /// Returns a `Future` that returns the outcome of the connection upgrade. #[inline] -pub fn apply<'a, C, U, Maf>( +pub fn apply( connection: C, upgrade: U, endpoint: Endpoint, remote_addr: Maf, -) -> Box + 'a> +) -> impl Future where - U: ConnectionUpgrade + 'a, + U: ConnectionUpgrade, U::NamesIter: Clone, // TODO: not elegant - C: AsyncRead + AsyncWrite + 'a, - Maf: 'a, + C: AsyncRead + AsyncWrite, { - let future = negotiate(connection, &upgrade, endpoint) + negotiate(connection, &upgrade, endpoint) .and_then(move |(upgrade_id, connection)| { upgrade.upgrade(connection, upgrade_id, endpoint, remote_addr) }) @@ -49,29 +48,25 @@ where .then(|val| { match val { Ok(_) => debug!("Successfully applied negotiated protocol"), - Err(_) => debug!("Failed to apply negotiated protocol"), + Err(ref err) => debug!("Failed to apply negotiated protocol: {:?}", err), } val - }); - - Box::new(future) + }) } /// Negotiates a protocol on a stream. /// /// Returns a `Future` that returns the negotiated protocol and the stream. #[inline] -pub fn negotiate<'a, C, I, U, Maf>( +pub fn negotiate( connection: C, upgrade: &U, endpoint: Endpoint, -) -> Box + 'a> +) -> impl Future where - U: ConnectionUpgrade + 'a, + U: ConnectionUpgrade, U::NamesIter: Clone, // TODO: not elegant - C: AsyncRead + AsyncWrite + 'a, - Maf: 'a, - I: 'a, + C: AsyncRead + AsyncWrite, { let iter = upgrade .protocol_names() @@ -79,11 +74,11 @@ where debug!("Starting protocol negotiation"); let negotiation = match endpoint { - Endpoint::Listener => multistream_select::listener_select_proto(connection, iter), - Endpoint::Dialer => multistream_select::dialer_select_proto(connection, iter), + Endpoint::Listener => future::Either::A(multistream_select::listener_select_proto(connection, iter)), + Endpoint::Dialer => future::Either::B(multistream_select::dialer_select_proto(connection, iter)), }; - let future = negotiation + negotiation .map_err(|err| IoError::new(IoErrorKind::Other, err)) .then(move |negotiated| { match negotiated { @@ -91,7 +86,5 @@ where Err(ref err) => debug!("Error while negotiated protocol upgrade: {:?}", err), }; negotiated - }); - - Box::new(future) + }) } diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs index 51a84f0ef9b..3cb9b836ccf 100644 --- a/core/src/upgrade/denied.rs +++ b/core/src/upgrade/denied.rs @@ -36,8 +36,8 @@ where type NamesIter = iter::Empty<(Bytes, ())>; type UpgradeIdentifier = (); // TODO: could use `!` type Output = (); // TODO: could use `!` - type MultiaddrFuture = Box>; // TODO: could use `!` - type Future = Box>; // TODO: could use `!` + type MultiaddrFuture = Box + Send + Sync>; // TODO: could use `!` + type Future = Box + Send + Sync>; // TODO: could use `!` #[inline] fn protocol_names(&self) -> Self::NamesIter { diff --git a/multistream-select/src/dialer_select.rs b/multistream-select/src/dialer_select.rs index 460de22abea..986c1d3803a 100644 --- a/multistream-select/src/dialer_select.rs +++ b/multistream-select/src/dialer_select.rs @@ -22,7 +22,7 @@ //! `multistream-select` for the dialer. use bytes::Bytes; -use futures::future::{loop_fn, result, Loop}; +use futures::future::{loop_fn, result, Loop, Either}; use futures::{Future, Sink, Stream}; use ProtocolChoiceError; @@ -42,23 +42,23 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// remote, and the protocol name that we passed (so that you don't have to clone the name). On /// success, the function returns the identifier (of type `P`), plus the socket which now uses that /// chosen protocol. -// TODO: remove the Box once -> impl Trait lands #[inline] -pub fn dialer_select_proto<'a, R, I, M, P>( +pub fn dialer_select_proto( inner: R, protocols: I, -) -> Box + 'a> +) -> impl Future where - R: AsyncRead + AsyncWrite + 'a, - I: Iterator + 'a, - M: FnMut(&Bytes, &Bytes) -> bool + 'a, - P: 'a, + R: AsyncRead + AsyncWrite, + I: Iterator, + M: FnMut(&Bytes, &Bytes) -> bool, { // We choose between the "serial" and "parallel" strategies based on the number of protocols. if protocols.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - dialer_select_proto_serial(inner, protocols.map(|(n, _, id)| (n, id))) + let fut = dialer_select_proto_serial(inner, protocols.map(|(n, _, id)| (n, id))); + Either::A(fut) } else { - dialer_select_proto_parallel(inner, protocols) + let fut = dialer_select_proto_parallel(inner, protocols); + Either::B(fut) } } @@ -66,17 +66,15 @@ where /// /// Same as `dialer_select_proto`. Tries protocols one by one. The iterator doesn't need to produce /// match functions, because it's not needed. -// TODO: remove the Box once -> impl Trait lands -pub fn dialer_select_proto_serial<'a, R, I, P>( +pub fn dialer_select_proto_serial( inner: R, mut protocols: I, -) -> Box + 'a> +) -> impl Future where - R: AsyncRead + AsyncWrite + 'a, - I: Iterator + 'a, - P: 'a, + R: AsyncRead + AsyncWrite, + I: Iterator, { - let future = Dialer::new(inner).from_err().and_then(move |dialer| { + Dialer::new(inner).from_err().and_then(move |dialer| { // Similar to a `loop` keyword. loop_fn(dialer, move |dialer| { result(protocols.next().ok_or(ProtocolChoiceError::NoProtocolFound)) @@ -116,28 +114,23 @@ where } }) }) - }); - - // The "Rust doesn't have impl Trait yet" tax. - Box::new(future) + }) } /// Helps selecting a protocol amongst the ones supported. /// /// Same as `dialer_select_proto`. Queries the list of supported protocols from the remote, then /// chooses the most appropriate one. -// TODO: remove the Box once -> impl Trait lands -pub fn dialer_select_proto_parallel<'a, R, I, M, P>( +pub fn dialer_select_proto_parallel( inner: R, protocols: I, -) -> Box + 'a> +) -> impl Future where - R: AsyncRead + AsyncWrite + 'a, - I: Iterator + 'a, - M: FnMut(&Bytes, &Bytes) -> bool + 'a, - P: 'a, + R: AsyncRead + AsyncWrite, + I: Iterator, + M: FnMut(&Bytes, &Bytes) -> bool, { - let future = Dialer::new(inner) + Dialer::new(inner) .from_err() .and_then(move |dialer| { trace!("requesting protocols list"); @@ -193,8 +186,5 @@ where } _ => Err(ProtocolChoiceError::UnexpectedMessage), } - }); - - // The "Rust doesn't have impl Trait yet" tax. - Box::new(future) + }) } diff --git a/multistream-select/src/listener_select.rs b/multistream-select/src/listener_select.rs index ed36905a82e..e97674b4b5e 100644 --- a/multistream-select/src/listener_select.rs +++ b/multistream-select/src/listener_select.rs @@ -22,7 +22,7 @@ //! `multistream-select` for the listener. use bytes::Bytes; -use futures::future::{err, loop_fn, Loop}; +use futures::future::{err, loop_fn, Loop, Either}; use futures::{Future, Sink, Stream}; use ProtocolChoiceError; @@ -45,18 +45,16 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// /// On success, returns the socket and the identifier of the chosen protocol (of type `P`). The /// socket now uses this protocol. -// TODO: remove the Box once -> impl Trait lands -pub fn listener_select_proto<'a, R, I, M, P>( +pub fn listener_select_proto( inner: R, protocols: I, -) -> Box + 'a> +) -> impl Future where - R: AsyncRead + AsyncWrite + 'a, - I: Iterator + Clone + 'a, - M: FnMut(&Bytes, &Bytes) -> bool + 'a, - P: 'a, + R: AsyncRead + AsyncWrite, + I: Iterator + Clone, + M: FnMut(&Bytes, &Bytes) -> bool, { - let future = Listener::new(inner).from_err().and_then(move |listener| { + Listener::new(inner).from_err().and_then(move |listener| { loop_fn(listener, move |listener| { let protocols = protocols.clone(); @@ -73,7 +71,7 @@ where .send(msg) .from_err() .map(move |listener| (None, listener)); - Box::new(fut) as Box> + Either::A(Either::A(fut)) } Some(DialerToListenerMessage::ProtocolRequest { name }) => { let mut outcome = None; @@ -91,11 +89,11 @@ where .send(send_back) .from_err() .map(move |listener| (outcome, listener)); - Box::new(fut) as Box> + Either::A(Either::B(fut)) } None => { debug!("no protocol request received"); - Box::new(err(ProtocolChoiceError::NoProtocolFound)) as Box<_> + Either::B(err(ProtocolChoiceError::NoProtocolFound)) } }) .map(|(outcome, listener): (_, Listener)| match outcome { @@ -103,8 +101,5 @@ where None => Loop::Continue(listener), }) }) - }); - - // The "Rust doesn't have impl Trait yet" tax. - Box::new(future) + }) } diff --git a/multistream-select/src/protocol/dialer.rs b/multistream-select/src/protocol/dialer.rs index 06d7d4021ee..550247b4bf6 100644 --- a/multistream-select/src/protocol/dialer.rs +++ b/multistream-select/src/protocol/dialer.rs @@ -46,23 +46,19 @@ where { /// Takes ownership of a socket and starts the handshake. If the handshake succeeds, the /// future returns a `Dialer`. - pub fn new<'a>(inner: R) -> Box, Error = MultistreamSelectError> + 'a> - where - R: 'a, - { + pub fn new(inner: R) -> impl Future, Error = MultistreamSelectError> { let write = LengthDelimitedBuilder::new() .length_field_length(1) .new_write(inner); let inner = LengthDelimitedFramedRead::new(write); - let future = inner + inner .send(BytesMut::from(MULTISTREAM_PROTOCOL_WITH_LF)) .from_err() .map(|inner| Dialer { inner, handshake_finished: false, - }); - Box::new(future) + }) } /// Grants back the socket. Typically used after a `ProtocolAck` has been received. diff --git a/multistream-select/src/protocol/listener.rs b/multistream-select/src/protocol/listener.rs index d7b7fbf2cc6..3f99b02648d 100644 --- a/multistream-select/src/protocol/listener.rs +++ b/multistream-select/src/protocol/listener.rs @@ -44,16 +44,13 @@ where { /// Takes ownership of a socket and starts the handshake. If the handshake succeeds, the /// future returns a `Listener`. - pub fn new<'a>(inner: R) -> Box, Error = MultistreamSelectError> + 'a> - where - R: 'a, - { + pub fn new(inner: R) -> impl Future, Error = MultistreamSelectError> { let write = LengthDelimitedBuilder::new() .length_field_length(1) .new_write(inner); let inner = LengthDelimitedFramedRead::::new(write); - let future = inner + inner .into_future() .map_err(|(e, _)| e.into()) .and_then(|(msg, rest)| { @@ -69,9 +66,7 @@ where .send(BytesMut::from(MULTISTREAM_PROTOCOL_WITH_LF)) .from_err() }) - .map(|inner| Listener { inner }); - - Box::new(future) + .map(|inner| Listener { inner }) } /// Grants back the socket. Typically used after a `ProtocolRequest` has been received and a diff --git a/uds/src/lib.rs b/uds/src/lib.rs index 1c475b2422d..4a754f28809 100644 --- a/uds/src/lib.rs +++ b/uds/src/lib.rs @@ -86,10 +86,10 @@ impl UdsConfig { impl Transport for UdsConfig { type Output = UnixStream; - type Listener = Box>; + type Listener = Box + Send + Sync>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; type MultiaddrFuture = FutureResult; - type Dial = Box>; + type Dial = Box + Send + Sync>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { if let Ok(path) = multiaddr_to_path(&addr) {