Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman S. Borschel committed Sep 5, 2019
1 parent 1d31ae6 commit 14faf43
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 108 deletions.
49 changes: 24 additions & 25 deletions core/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ pub trait Transport {
where
Self: Sized;

/// Turns the transport into an abstract boxed (i.e. heap-allocated) transport.
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
where Self: Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
{
boxed::boxed(self)
}

/// Applies a function on the connections created by the transport.
fn map<F, O>(self, f: F) -> map::Map<Self, F>
where
Expand All @@ -135,12 +145,25 @@ pub trait Transport {
fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
where
Self: Sized,
E: Error + 'static,
F: FnOnce(Self::Error) -> E + Clone
{
map_err::MapErr::new(self, f)
}

/// Adds a fallback transport that is used when encountering errors
/// while establishing inbound or outbound connections.
///
/// The returned transport will act like `self`, except that if `listen_on` or `dial`
/// return an error then `other` will be tried.
fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
where
Self: Sized,
U: Transport,
<U as Transport>::Error: 'static
{
OrTransport::new(self, other)
}

/// Applies a function producing an asynchronous result to every connection
/// created by this transport.
///
Expand All @@ -158,30 +181,6 @@ pub trait Transport {
and_then::AndThen::new(self, f)
}

/// Turns the transport into an abstract boxed (i.e. heap-allocated) transport.
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
where Self: Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
{
boxed::boxed(self)
}

/// Adds a fallback transport that is used when encountering errors
/// while establishing inbound or outbound connections.
///
/// The returned transport will act like `self`, except that if `listen_on` or `dial`
/// return an error then `other` will be tried.
fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
where
Self: Sized,
U: Transport,
<U as Transport>::Error: 'static
{
OrTransport::new(self, other)
}

/// Adds a timeout to the connection setup (including upgrades) for all
/// inbound and outbound connections established through the transport.
fn timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
Expand Down
136 changes: 55 additions & 81 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Configuration of transport protocol upgrades.
use crate::{
ConnectedPoint,
ConnectionInfo,
Expand Down Expand Up @@ -72,7 +74,7 @@ pub struct Builder<T> {
impl<T> Builder<T>
where
T: Transport,
<T as Transport>::Error: 'static,
T::Error: 'static,
{
/// Creates a `Builder` over the given (base) `Transport`.
pub fn new(transport: T) -> Builder<T> {
Expand All @@ -92,18 +94,16 @@ where
///
/// * I/O upgrade: `C -> (I, D)`.
/// * Transport output: `C -> (I, D)`
pub fn authenticate<C, D, U, I>(self, upgrade: U) -> Builder<
pub fn authenticate<C, D, U, I, E>(self, upgrade: U) -> Builder<
AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>
> where
T: Transport<Output = C>,
I: ConnectionInfo,
C: AsyncRead + AsyncWrite,
D: AsyncRead + AsyncWrite,
U: InboundUpgrade<C, Output = (I, D)> + OutboundUpgrade<C,
Output = <U as InboundUpgrade<C>>::Output,
Error = <U as InboundUpgrade<C>>::Error
> + Clone,
<U as InboundUpgrade<C>>::Error: Error + 'static,
U: InboundUpgrade<C, Output = (I, D), Error = E>,
U: OutboundUpgrade<C, Output = (I, D), Error = E> + Clone,
E: Error + 'static,
{
Builder::new(self.inner.and_then(move |conn, endpoint| {
Authenticate {
Expand All @@ -112,7 +112,7 @@ where
}))
}

/// Applies an arbitrary [`Upgrade`] on an authenticated, non-multiplexed
/// Applies an arbitrary upgrade on an authenticated, non-multiplexed
/// transport.
///
/// The upgrade receives the I/O resource (i.e. connection) `C` and
Expand All @@ -123,17 +123,15 @@ where
///
/// * I/O upgrade: `C -> D`.
/// * Transport output: `(I, C) -> (I, D)`.
pub fn apply<C, D, U, I>(self, upgrade: U) -> Builder<Upgrade<T, U>>
pub fn apply<C, D, U, I, E>(self, upgrade: U) -> Builder<Upgrade<T, U>>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite,
D: AsyncRead + AsyncWrite,
I: ConnectionInfo,
U: InboundUpgrade<C, Output = D> + OutboundUpgrade<C,
Output = <U as InboundUpgrade<C>>::Output,
Error = <U as InboundUpgrade<C>>::Error
> + Clone,
<U as InboundUpgrade<C>>::Error: Error + 'static,
U: InboundUpgrade<C, Output = D, Error = E>,
U: OutboundUpgrade<C, Output = D, Error = E> + Clone,
E: Error + 'static,
{
Builder::new(Upgrade::new(self.inner, upgrade))
}
Expand All @@ -149,18 +147,16 @@ where
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex<C, M, U, I>(self, upgrade: U)
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I, M> + Clone>
pub fn multiplex<C, M, U, I, E>(self, upgrade: U)
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite,
I: ConnectionInfo,
M: StreamMuxer,
T: Transport<Output = (I, C)>,
U: InboundUpgrade<C, Output = M> + OutboundUpgrade<C,
Output = <U as InboundUpgrade<C>>::Output,
Error = <U as InboundUpgrade<C>>::Error,
> + Clone,
<U as InboundUpgrade<C>>::Error: Error + 'static,
I: ConnectionInfo,
U: InboundUpgrade<C, Output = M, Error = E>,
U: OutboundUpgrade<C, Output = M, Error = E> + Clone,
E: Error + 'static,
{
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint);
Expand All @@ -173,8 +169,6 @@ where
/// in the context of negotiating a secure channel.
///
/// Configured through [`Builder::authenticate`].
/// This is the first mandatory step in the construction of a
/// transport, followed by [`Multiplex`].
pub struct Authenticate<C, U>
where
C: AsyncRead + AsyncWrite,
Expand Down Expand Up @@ -203,34 +197,23 @@ where
/// top of an authenticated transport.
///
/// Configured through [`Builder::multiplex`].
/// This is the second mandatory step in the construction of a
/// transport, preceded by [`Authenticate`].
pub struct Multiplex<C, U, I, M>
pub struct Multiplex<C, U, I>
where
C: AsyncRead + AsyncWrite,
I: ConnectionInfo,
M: StreamMuxer,
U: InboundUpgrade<C, Output = M> + OutboundUpgrade<C,
Output = <U as InboundUpgrade<C>>::Output,
Error = <U as InboundUpgrade<C>>::Error
>
U: InboundUpgrade<C> + OutboundUpgrade<C>,
{
info: Option<I>,
upgrade: EitherUpgrade<C, U>,
}

impl<C, U, I, M> Future for Multiplex<C, U, I, M>
impl<C, U, I, M, E> Future for Multiplex<C, U, I>
where
C: AsyncRead + AsyncWrite,
I: ConnectionInfo,
M: StreamMuxer,
U: InboundUpgrade<C, Output = M> + OutboundUpgrade<C,
Output = <U as InboundUpgrade<C>>::Output,
Error = <U as InboundUpgrade<C>>::Error
>,
U: InboundUpgrade<C, Output = M, Error = E>,
U: OutboundUpgrade<C, Output = M, Error = E>
{
type Item = (I, M);
type Error = UpgradeError<<U as InboundUpgrade<C>>::Error>;
type Error = UpgradeError<E>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let m = try_ready!(self.upgrade.poll());
Expand All @@ -254,13 +237,11 @@ impl<T, U> Upgrade<T, U> {
}
}

impl<T, C, D, I, U, E> Transport for Upgrade<T, U>
impl<T, C, D, U, I, E> Transport for Upgrade<T, U>
where
T: Transport<Output = (I, C)>,
T::Error: 'static,
C: AsyncRead + AsyncWrite,
D: AsyncRead + AsyncWrite,
I: ConnectionInfo,
U: InboundUpgrade<C, Output = D, Error = E>,
U: OutboundUpgrade<C, Output = D, Error = E> + Clone,
E: Error + 'static
Expand All @@ -272,18 +253,21 @@ where
type Dial = DialUpgradeFuture<T::Dial, U, I, C>;

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let outbound = self.inner.dial(addr.clone())
let future = self.inner.dial(addr.clone())
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(DialUpgradeFuture {
future: outbound,
future,
upgrade: future::Either::A(Some(self.upgrade))
})
}

fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let inbound = self.inner.listen_on(addr)
let stream = self.inner.listen_on(addr)
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(ListenerStream { stream: inbound, upgrade: self.upgrade })
Ok(ListenerStream {
stream,
upgrade: self.upgrade
})
}
}

Expand Down Expand Up @@ -323,32 +307,28 @@ where
}

/// The [`Transport::Dial`] future of an [`Upgrade`]d transport.
pub struct DialUpgradeFuture<T, U, I, C>
pub struct DialUpgradeFuture<F, U, I, C>
where
I: ConnectionInfo,
U: OutboundUpgrade<C>,
C: AsyncRead + AsyncWrite,
T: Future<Item = (I, C)>,
U: OutboundUpgrade<C>
{
future: T,
future: F,
upgrade: future::Either<Option<U>, (Option<I>, OutboundUpgradeApply<C, U>)>
}

impl<T, U, I, C, D> Future for DialUpgradeFuture<T, U, I, C>
impl<F, U, I, C, D> Future for DialUpgradeFuture<F, U, I, C>
where
I: ConnectionInfo,
F: Future<Item = (I, C)>,
C: AsyncRead + AsyncWrite,
D: AsyncRead + AsyncWrite,
T: Future<Item = (I, C)>,
U: OutboundUpgrade<C, Output = D>,
U::Error: Error
{
type Item = (I, D);
type Error = TransportUpgradeError<T::Error, U::Error>;
type Error = TransportUpgradeError<F::Error, U::Error>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.upgrade {
self.upgrade = match self.upgrade {
future::Either::A(ref mut up) => {
let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some).");
Expand All @@ -359,36 +339,33 @@ where
let i = i.take().expect("DialUpgradeFuture polled after completion.");
return Ok(Async::Ready((i, d)))
}
};
self.upgrade = next
}
}
}
}

/// The [`Transport::Listener`] stream of an [`Upgrade`]d transport.
pub struct ListenerStream<T, U> {
stream: T,
pub struct ListenerStream<S, U> {
stream: S,
upgrade: U
}

impl<T, U, F, I, C, D> Stream for ListenerStream<T, U>
impl<S, U, F, I, C, D> Stream for ListenerStream<S, U>
where
T: Stream<Item = ListenerEvent<F>>,
I: ConnectionInfo,
S: Stream<Item = ListenerEvent<F>>,
F: Future<Item = (I, C)>,
C: AsyncRead + AsyncWrite,
D: AsyncRead + AsyncWrite,
U: InboundUpgrade<C, Output = D> + Clone
{
type Item = ListenerEvent<ListenerUpgradeFuture<F, U, I, C>>;
type Error = TransportUpgradeError<T::Error, U::Error>;
type Error = TransportUpgradeError<S::Error, U::Error>;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.stream.poll().map_err(TransportUpgradeError::Transport)) {
Some(event) => {
let event = event.map(move |x| {
let event = event.map(move |future| {
ListenerUpgradeFuture {
future: x,
future,
upgrade: future::Either::A(Some(self.upgrade.clone()))
}
});
Expand All @@ -400,30 +377,28 @@ where
}

/// The [`Transport::ListenerUpgrade`] future of an [`Upgrade`]d transport.
pub struct ListenerUpgradeFuture<T, U, I, C>
pub struct ListenerUpgradeFuture<F, U, I, C>
where
T: Future<Item = (I, C)>,
C: AsyncRead + AsyncWrite,
U: InboundUpgrade<C>
{
future: T,
future: F,
upgrade: future::Either<Option<U>, (Option<I>, InboundUpgradeApply<C, U>)>
}

impl<T, U, I, C, D> Future for ListenerUpgradeFuture<T, U, I, C>
impl<F, U, I, C, D> Future for ListenerUpgradeFuture<F, U, I, C>
where
T: Future<Item = (I, C)>,
F: Future<Item = (I, C)>,
C: AsyncRead + AsyncWrite,
D: AsyncRead + AsyncWrite,
U: InboundUpgrade<C, Output = D>,
U::Error: Error
{
type Item = (I, D);
type Error = TransportUpgradeError<T::Error, U::Error>;
type Error = TransportUpgradeError<F::Error, U::Error>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.upgrade {
self.upgrade = match self.upgrade {
future::Either::A(ref mut up) => {
let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::A(Some).");
Expand All @@ -434,8 +409,7 @@ where
let i = i.take().expect("ListenerUpgradeFuture polled after completion.");
return Ok(Async::Ready((i, d)))
}
};
self.upgrade = next
}
}
}
}
Expand Down
Loading

0 comments on commit 14faf43

Please sign in to comment.