Skip to content

Commit

Permalink
Add some Send implementations to Futures (#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka authored and gnunicorn committed Aug 14, 2018
1 parent e5afab1 commit bd73f60
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 95 deletions.
8 changes: 4 additions & 4 deletions core/src/transport/denied.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ pub struct DeniedTransport;
impl Transport for DeniedTransport {
// TODO: could use `!` for associated types once stable
type Output = Cursor<Vec<u8>>;
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = io::Error>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error>>;
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error> + Send + Sync>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = io::Error> + Send + Sync>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error> + Send + Sync>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = io::Error> + Send + Sync>;

#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Expand Down
39 changes: 16 additions & 23 deletions core/src/upgrade/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use bytes::Bytes;
use futures::prelude::*;
use futures::{prelude::*, future};
use multistream_select;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use tokio_io::{AsyncRead, AsyncWrite};
Expand All @@ -29,69 +29,62 @@ use upgrade::{ConnectionUpgrade, Endpoint};
///
/// Returns a `Future` that returns the outcome of the connection upgrade.
#[inline]
pub fn apply<'a, C, U, Maf>(
pub fn apply<C, U, Maf>(
connection: C,
upgrade: U,
endpoint: Endpoint,
remote_addr: Maf,
) -> Box<Future<Item = (U::Output, U::MultiaddrFuture), Error = IoError> + 'a>
) -> impl Future<Item = (U::Output, U::MultiaddrFuture), Error = IoError>
where
U: ConnectionUpgrade<C, Maf> + 'a,
U: ConnectionUpgrade<C, Maf>,
U::NamesIter: Clone, // TODO: not elegant
C: AsyncRead + AsyncWrite + 'a,
Maf: 'a,
C: AsyncRead + AsyncWrite,
{
let future = negotiate(connection, &upgrade, endpoint)
negotiate(connection, &upgrade, endpoint)
.and_then(move |(upgrade_id, connection)| {
upgrade.upgrade(connection, upgrade_id, endpoint, remote_addr)
})
.into_future()
.then(|val| {
match val {
Ok(_) => debug!("Successfully applied negotiated protocol"),
Err(_) => debug!("Failed to apply negotiated protocol"),
Err(ref err) => debug!("Failed to apply negotiated protocol: {:?}", err),
}
val
});

Box::new(future)
})
}

/// Negotiates a protocol on a stream.
///
/// Returns a `Future` that returns the negotiated protocol and the stream.
#[inline]
pub fn negotiate<'a, C, I, U, Maf>(
pub fn negotiate<C, I, U, Maf>(
connection: C,
upgrade: &U,
endpoint: Endpoint,
) -> Box<Future<Item = (U::UpgradeIdentifier, C), Error = IoError> + 'a>
) -> impl Future<Item = (U::UpgradeIdentifier, C), Error = IoError>
where
U: ConnectionUpgrade<I, Maf> + 'a,
U: ConnectionUpgrade<I, Maf>,
U::NamesIter: Clone, // TODO: not elegant
C: AsyncRead + AsyncWrite + 'a,
Maf: 'a,
I: 'a,
C: AsyncRead + AsyncWrite,
{
let iter = upgrade
.protocol_names()
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
debug!("Starting protocol negotiation");

let negotiation = match endpoint {
Endpoint::Listener => multistream_select::listener_select_proto(connection, iter),
Endpoint::Dialer => multistream_select::dialer_select_proto(connection, iter),
Endpoint::Listener => future::Either::A(multistream_select::listener_select_proto(connection, iter)),
Endpoint::Dialer => future::Either::B(multistream_select::dialer_select_proto(connection, iter)),
};

let future = negotiation
negotiation
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.then(move |negotiated| {
match negotiated {
Ok(_) => debug!("Successfully negotiated protocol upgrade"),
Err(ref err) => debug!("Error while negotiated protocol upgrade: {:?}", err),
};
negotiated
});

Box::new(future)
})
}
4 changes: 2 additions & 2 deletions core/src/upgrade/denied.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ where
type NamesIter = iter::Empty<(Bytes, ())>;
type UpgradeIdentifier = (); // TODO: could use `!`
type Output = (); // TODO: could use `!`
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error>>; // TODO: could use `!`
type Future = Box<Future<Item = ((), Self::MultiaddrFuture), Error = io::Error>>; // TODO: could use `!`
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = io::Error> + Send + Sync>; // TODO: could use `!`
type Future = Box<Future<Item = ((), Self::MultiaddrFuture), Error = io::Error> + Send + Sync>; // TODO: could use `!`

#[inline]
fn protocol_names(&self) -> Self::NamesIter {
Expand Down
56 changes: 23 additions & 33 deletions multistream-select/src/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! `multistream-select` for the dialer.
use bytes::Bytes;
use futures::future::{loop_fn, result, Loop};
use futures::future::{loop_fn, result, Loop, Either};
use futures::{Future, Sink, Stream};
use ProtocolChoiceError;

Expand All @@ -42,41 +42,39 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// remote, and the protocol name that we passed (so that you don't have to clone the name). On
/// success, the function returns the identifier (of type `P`), plus the socket which now uses that
/// chosen protocol.
// TODO: remove the Box once -> impl Trait lands
#[inline]
pub fn dialer_select_proto<'a, R, I, M, P>(
pub fn dialer_select_proto<R, I, M, P>(
inner: R,
protocols: I,
) -> Box<Future<Item = (P, R), Error = ProtocolChoiceError> + 'a>
) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
where
R: AsyncRead + AsyncWrite + 'a,
I: Iterator<Item = (Bytes, M, P)> + 'a,
M: FnMut(&Bytes, &Bytes) -> bool + 'a,
P: 'a,
R: AsyncRead + AsyncWrite,
I: Iterator<Item = (Bytes, M, P)>,
M: FnMut(&Bytes, &Bytes) -> bool,
{
// We choose between the "serial" and "parallel" strategies based on the number of protocols.
if protocols.size_hint().1.map(|n| n <= 3).unwrap_or(false) {
dialer_select_proto_serial(inner, protocols.map(|(n, _, id)| (n, id)))
let fut = dialer_select_proto_serial(inner, protocols.map(|(n, _, id)| (n, id)));
Either::A(fut)
} else {
dialer_select_proto_parallel(inner, protocols)
let fut = dialer_select_proto_parallel(inner, protocols);
Either::B(fut)
}
}

/// Helps selecting a protocol amongst the ones supported.
///
/// Same as `dialer_select_proto`. Tries protocols one by one. The iterator doesn't need to produce
/// match functions, because it's not needed.
// TODO: remove the Box once -> impl Trait lands
pub fn dialer_select_proto_serial<'a, R, I, P>(
pub fn dialer_select_proto_serial<R, I, P>(
inner: R,
mut protocols: I,
) -> Box<Future<Item = (P, R), Error = ProtocolChoiceError> + 'a>
) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
where
R: AsyncRead + AsyncWrite + 'a,
I: Iterator<Item = (Bytes, P)> + 'a,
P: 'a,
R: AsyncRead + AsyncWrite,
I: Iterator<Item = (Bytes, P)>,
{
let future = Dialer::new(inner).from_err().and_then(move |dialer| {
Dialer::new(inner).from_err().and_then(move |dialer| {
// Similar to a `loop` keyword.
loop_fn(dialer, move |dialer| {
result(protocols.next().ok_or(ProtocolChoiceError::NoProtocolFound))
Expand Down Expand Up @@ -116,28 +114,23 @@ where
}
})
})
});

// The "Rust doesn't have impl Trait yet" tax.
Box::new(future)
})
}

/// Helps selecting a protocol amongst the ones supported.
///
/// Same as `dialer_select_proto`. Queries the list of supported protocols from the remote, then
/// chooses the most appropriate one.
// TODO: remove the Box once -> impl Trait lands
pub fn dialer_select_proto_parallel<'a, R, I, M, P>(
pub fn dialer_select_proto_parallel<R, I, M, P>(
inner: R,
protocols: I,
) -> Box<Future<Item = (P, R), Error = ProtocolChoiceError> + 'a>
) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
where
R: AsyncRead + AsyncWrite + 'a,
I: Iterator<Item = (Bytes, M, P)> + 'a,
M: FnMut(&Bytes, &Bytes) -> bool + 'a,
P: 'a,
R: AsyncRead + AsyncWrite,
I: Iterator<Item = (Bytes, M, P)>,
M: FnMut(&Bytes, &Bytes) -> bool,
{
let future = Dialer::new(inner)
Dialer::new(inner)
.from_err()
.and_then(move |dialer| {
trace!("requesting protocols list");
Expand Down Expand Up @@ -193,8 +186,5 @@ where
}
_ => Err(ProtocolChoiceError::UnexpectedMessage),
}
});

// The "Rust doesn't have impl Trait yet" tax.
Box::new(future)
})
}
27 changes: 11 additions & 16 deletions multistream-select/src/listener_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! `multistream-select` for the listener.
use bytes::Bytes;
use futures::future::{err, loop_fn, Loop};
use futures::future::{err, loop_fn, Loop, Either};
use futures::{Future, Sink, Stream};
use ProtocolChoiceError;

Expand All @@ -45,18 +45,16 @@ use tokio_io::{AsyncRead, AsyncWrite};
///
/// On success, returns the socket and the identifier of the chosen protocol (of type `P`). The
/// socket now uses this protocol.
// TODO: remove the Box once -> impl Trait lands
pub fn listener_select_proto<'a, R, I, M, P>(
pub fn listener_select_proto<R, I, M, P>(
inner: R,
protocols: I,
) -> Box<Future<Item = (P, R), Error = ProtocolChoiceError> + 'a>
) -> impl Future<Item = (P, R), Error = ProtocolChoiceError>
where
R: AsyncRead + AsyncWrite + 'a,
I: Iterator<Item = (Bytes, M, P)> + Clone + 'a,
M: FnMut(&Bytes, &Bytes) -> bool + 'a,
P: 'a,
R: AsyncRead + AsyncWrite,
I: Iterator<Item = (Bytes, M, P)> + Clone,
M: FnMut(&Bytes, &Bytes) -> bool,
{
let future = Listener::new(inner).from_err().and_then(move |listener| {
Listener::new(inner).from_err().and_then(move |listener| {
loop_fn(listener, move |listener| {
let protocols = protocols.clone();

Expand All @@ -73,7 +71,7 @@ where
.send(msg)
.from_err()
.map(move |listener| (None, listener));
Box::new(fut) as Box<Future<Item = _, Error = ProtocolChoiceError>>
Either::A(Either::A(fut))
}
Some(DialerToListenerMessage::ProtocolRequest { name }) => {
let mut outcome = None;
Expand All @@ -91,20 +89,17 @@ where
.send(send_back)
.from_err()
.map(move |listener| (outcome, listener));
Box::new(fut) as Box<Future<Item = _, Error = ProtocolChoiceError>>
Either::A(Either::B(fut))
}
None => {
debug!("no protocol request received");
Box::new(err(ProtocolChoiceError::NoProtocolFound)) as Box<_>
Either::B(err(ProtocolChoiceError::NoProtocolFound))
}
})
.map(|(outcome, listener): (_, Listener<R>)| match outcome {
Some(outcome) => Loop::Break((outcome, listener.into_inner())),
None => Loop::Continue(listener),
})
})
});

// The "Rust doesn't have impl Trait yet" tax.
Box::new(future)
})
}
10 changes: 3 additions & 7 deletions multistream-select/src/protocol/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,19 @@ where
{
/// Takes ownership of a socket and starts the handshake. If the handshake succeeds, the
/// future returns a `Dialer`.
pub fn new<'a>(inner: R) -> Box<Future<Item = Dialer<R>, Error = MultistreamSelectError> + 'a>
where
R: 'a,
{
pub fn new(inner: R) -> impl Future<Item = Dialer<R>, Error = MultistreamSelectError> {
let write = LengthDelimitedBuilder::new()
.length_field_length(1)
.new_write(inner);
let inner = LengthDelimitedFramedRead::new(write);

let future = inner
inner
.send(BytesMut::from(MULTISTREAM_PROTOCOL_WITH_LF))
.from_err()
.map(|inner| Dialer {
inner,
handshake_finished: false,
});
Box::new(future)
})
}

/// Grants back the socket. Typically used after a `ProtocolAck` has been received.
Expand Down
11 changes: 3 additions & 8 deletions multistream-select/src/protocol/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,13 @@ where
{
/// Takes ownership of a socket and starts the handshake. If the handshake succeeds, the
/// future returns a `Listener`.
pub fn new<'a>(inner: R) -> Box<Future<Item = Listener<R>, Error = MultistreamSelectError> + 'a>
where
R: 'a,
{
pub fn new(inner: R) -> impl Future<Item = Listener<R>, Error = MultistreamSelectError> {
let write = LengthDelimitedBuilder::new()
.length_field_length(1)
.new_write(inner);
let inner = LengthDelimitedFramedRead::<Bytes, _>::new(write);

let future = inner
inner
.into_future()
.map_err(|(e, _)| e.into())
.and_then(|(msg, rest)| {
Expand All @@ -69,9 +66,7 @@ where
.send(BytesMut::from(MULTISTREAM_PROTOCOL_WITH_LF))
.from_err()
})
.map(|inner| Listener { inner });

Box::new(future)
.map(|inner| Listener { inner })
}

/// Grants back the socket. Typically used after a `ProtocolRequest` has been received and a
Expand Down
4 changes: 2 additions & 2 deletions uds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ impl UdsConfig {

impl Transport for UdsConfig {
type Output = UnixStream;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send + Sync>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
type Dial = Box<Future<Item = (UnixStream, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (UnixStream, Self::MultiaddrFuture), Error = IoError> + Send + Sync>;

fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
if let Ok(path) = multiaddr_to_path(&addr) {
Expand Down

0 comments on commit bd73f60

Please sign in to comment.