Skip to content

Commit

Permalink
Add a IdentifyTransport (libp2p#569)
Browse files Browse the repository at this point in the history
* Add a IdentifyTransport

* Retreiver -> Retriever

* Move the muxer in the IdRetrieverState
  • Loading branch information
tomaka authored Nov 2, 2018
1 parent 437a8c0 commit 4225d26
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 0 deletions.
229 changes: 229 additions & 0 deletions protocols/identify/src/id_transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Contains the `IdentifyTransport` type.

use futures::prelude::*;
use libp2p_core::{Endpoint, Multiaddr, PeerId, PublicKey, Transport, muxing, upgrade::apply};
use protocol::{IdentifyOutput, IdentifyProtocolConfig};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem;
use std::sync::Arc;

/// Wraps around an implementation of `Transport` that yields a muxer. Will use the muxer to
/// open a substream with the remote and retreive its peer id. Then yields a
/// `(PeerId, impl StreamMuxer)`.
///
/// This transport can be used if you don't use any encryption layer, or if you want to make
/// encryption optional, in which case you have no other way to know the `PeerId` of the remote
/// than to ask for it.
///
/// > **Note**: If you use this transport, keep in mind that the `PeerId` returned by the remote
/// > can be anything and shouldn't necessarily be trusted.
#[derive(Debug, Clone)]
pub struct IdentifyTransport<TTrans> {
/// The underlying transport we wrap around.
transport: TTrans,
}

impl<TTrans> IdentifyTransport<TTrans> {
/// Creates an `IdentifyTransport` that wraps around the given transport.
#[inline]
pub fn new(transport: TTrans) -> Self {
IdentifyTransport {
transport,
}
}
}

// TODO: don't use boxes
impl<TTrans, TMuxer> Transport for IdentifyTransport<TTrans>
where
TTrans: Transport<Output = TMuxer>,
TMuxer: muxing::StreamMuxer + Send + Sync + 'static, // TODO: remove unnecessary bounds
TMuxer::Substream: Send + Sync + 'static, // TODO: remove unnecessary bounds
TMuxer::OutboundSubstream: Send + 'static, // TODO: remove unnecessary bounds
TTrans::Dial: Send + Sync + 'static,
TTrans::Listener: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
{
type Output = (PeerId, TMuxer);
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError> + Send>;
type ListenerUpgrade = Box<Future<Item = Self::Output, Error = IoError> + Send>;
type Dial = Box<Future<Item = Self::Output, Error = IoError> + Send>;

#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (listener, new_addr) = match self.transport.listen_on(addr) {
Ok((l, a)) => (l, a),
Err((inner, addr)) => {
let id = IdentifyTransport {
transport: inner,
};
return Err((id, addr));
}
};

let listener = listener
.map(move |(upgrade, remote_addr)| {
let upgr = upgrade
.and_then(move |muxer| {
IdRetriever::new(muxer, IdentifyProtocolConfig, Endpoint::Listener)
});
(Box::new(upgr) as Box<Future<Item = _, Error = _> + Send>, remote_addr)
});

Ok((Box::new(listener) as Box<_>, new_addr))
}

#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
// We dial a first time the node.
let dial = match self.transport.dial(addr.clone()) {
Ok(d) => d,
Err((transport, addr)) => {
let id = IdentifyTransport {
transport,
};
return Err((id, addr));
}
};

let dial = dial.and_then(move |muxer| {
IdRetriever::new(muxer, IdentifyProtocolConfig, Endpoint::Dialer)
});

Ok(Box::new(dial) as Box<_>)
}

#[inline]
fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option<Multiaddr> {
self.transport.nat_traversal(a, b)
}
}

/// Implementation of `Future` that asks the remote of its `PeerId`.
// TODO: remove unneeded bounds
struct IdRetriever<TMuxer>
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
TMuxer::Substream: Send,
{
/// Internal state.
state: IdRetrieverState<TMuxer>,
/// Whether we're dialing or listening.
endpoint: Endpoint,
}

enum IdRetrieverState<TMuxer>
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
TMuxer::Substream: Send,
{
/// We are in the process of opening a substream with the remote.
OpeningSubstream(Arc<TMuxer>, muxing::OutboundSubstreamRefWrapFuture<Arc<TMuxer>>, IdentifyProtocolConfig),
/// We opened the substream and are currently negotiating the identify protocol.
NegotiatingIdentify(Arc<TMuxer>, apply::UpgradeApplyFuture<muxing::SubstreamRef<Arc<TMuxer>>, IdentifyProtocolConfig>),
/// We retreived the remote's public key and are ready to yield it when polled again.
Finishing(Arc<TMuxer>, PublicKey),
/// Something bad happend, or the `Future` is finished, and shouldn't be polled again.
Poisoned,
}

impl<TMuxer> IdRetriever<TMuxer>
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
TMuxer::Substream: Send,
{
/// Creates a new `IdRetriever` ready to be polled.
fn new(muxer: TMuxer, config: IdentifyProtocolConfig, endpoint: Endpoint) -> Self {
let muxer = Arc::new(muxer);
let opening = muxing::outbound_from_ref_and_wrap(muxer.clone());

IdRetriever {
state: IdRetrieverState::OpeningSubstream(muxer, opening, config),
endpoint,
}
}
}

impl<TMuxer> Future for IdRetriever<TMuxer>
where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
TMuxer::Substream: Send,
{
type Item = (PeerId, TMuxer);
type Error = IoError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// This loop is here so that we can continue polling until we're ready.
loop {
// In order to satisfy the borrow checker, we extract the state and temporarily put
// `Poisoned` instead.
match mem::replace(&mut self.state, IdRetrieverState::Poisoned) {
IdRetrieverState::OpeningSubstream(muxer, mut opening, config) => {
match opening.poll() {
Ok(Async::Ready(Some(substream))) => {
let upgrade = apply::apply(substream, config, self.endpoint);
self.state = IdRetrieverState::NegotiatingIdentify(muxer, upgrade);
},
Ok(Async::Ready(None)) => {
return Err(IoError::new(IoErrorKind::Other, "remote refused our identify attempt"));
},
Ok(Async::NotReady) => {
self.state = IdRetrieverState::OpeningSubstream(muxer, opening, config);
return Ok(Async::NotReady);
},
Err(err) => return Err(err),
}
},
IdRetrieverState::NegotiatingIdentify(muxer, mut nego) => {
match nego.poll() {
Ok(Async::Ready(IdentifyOutput::RemoteInfo { info, .. })) => {
self.state = IdRetrieverState::Finishing(muxer, info.public_key);
},
Ok(Async::Ready(IdentifyOutput::Sender { .. })) => {
unreachable!("IdentifyOutput::Sender can never be the output from \
the dialing side");
},
Ok(Async::NotReady) => {
self.state = IdRetrieverState::NegotiatingIdentify(muxer, nego);
return Ok(Async::NotReady);
},
Err(err) => return Err(err),
}
},
IdRetrieverState::Finishing(muxer, public_key) => {
// Here is a tricky part: we need to get back the muxer in order to return
// it, but it is in an `Arc`.
let unwrapped = Arc::try_unwrap(muxer).unwrap_or_else(|_| {
panic!("we clone the Arc only to put it into substreams ; once in the \
Finishing state, no substream or upgrade exists anymore ; \
therefore there exists only one instance of the Arc ; qed")
});

// We leave `Poisoned` as the state when returning.
return Ok(Async::Ready((public_key.into(), unwrapped)));
},
IdRetrieverState::Poisoned => {
panic!("Future state panicked inside poll() or is finished")
},
}
}
}
}

// TODO: write basic working test
2 changes: 2 additions & 0 deletions protocols/identify/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ extern crate tokio_timer;
extern crate unsigned_varint;
extern crate void;

pub use self::id_transport::IdentifyTransport;
pub use self::periodic_id_handler::{PeriodicIdentification, PeriodicIdentificationEvent};
pub use self::protocol::{IdentifyInfo, IdentifyOutput};
pub use self::protocol::{IdentifyProtocolConfig, IdentifySender};

mod id_transport;
mod periodic_id_handler;
mod protocol;
mod structs_proto;

0 comments on commit 4225d26

Please sign in to comment.