diff --git a/protocols/identify/src/id_transport.rs b/protocols/identify/src/id_transport.rs new file mode 100644 index 00000000000..b83318ca4ae --- /dev/null +++ b/protocols/identify/src/id_transport.rs @@ -0,0 +1,229 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Contains the `IdentifyTransport` type. + +use futures::prelude::*; +use libp2p_core::{Endpoint, Multiaddr, PeerId, PublicKey, Transport, muxing, upgrade::apply}; +use protocol::{IdentifyOutput, IdentifyProtocolConfig}; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::mem; +use std::sync::Arc; + +/// Wraps around an implementation of `Transport` that yields a muxer. Will use the muxer to +/// open a substream with the remote and retreive its peer id. Then yields a +/// `(PeerId, impl StreamMuxer)`. +/// +/// This transport can be used if you don't use any encryption layer, or if you want to make +/// encryption optional, in which case you have no other way to know the `PeerId` of the remote +/// than to ask for it. +/// +/// > **Note**: If you use this transport, keep in mind that the `PeerId` returned by the remote +/// > can be anything and shouldn't necessarily be trusted. +#[derive(Debug, Clone)] +pub struct IdentifyTransport { + /// The underlying transport we wrap around. + transport: TTrans, +} + +impl IdentifyTransport { + /// Creates an `IdentifyTransport` that wraps around the given transport. + #[inline] + pub fn new(transport: TTrans) -> Self { + IdentifyTransport { + transport, + } + } +} + +// TODO: don't use boxes +impl Transport for IdentifyTransport +where + TTrans: Transport, + TMuxer: muxing::StreamMuxer + Send + Sync + 'static, // TODO: remove unnecessary bounds + TMuxer::Substream: Send + Sync + 'static, // TODO: remove unnecessary bounds + TMuxer::OutboundSubstream: Send + 'static, // TODO: remove unnecessary bounds + TTrans::Dial: Send + Sync + 'static, + TTrans::Listener: Send + 'static, + TTrans::ListenerUpgrade: Send + 'static, +{ + type Output = (PeerId, TMuxer); + type Listener = Box + Send>; + type ListenerUpgrade = Box + Send>; + type Dial = Box + Send>; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + let (listener, new_addr) = match self.transport.listen_on(addr) { + Ok((l, a)) => (l, a), + Err((inner, addr)) => { + let id = IdentifyTransport { + transport: inner, + }; + return Err((id, addr)); + } + }; + + let listener = listener + .map(move |(upgrade, remote_addr)| { + let upgr = upgrade + .and_then(move |muxer| { + IdRetriever::new(muxer, IdentifyProtocolConfig, Endpoint::Listener) + }); + (Box::new(upgr) as Box + Send>, remote_addr) + }); + + Ok((Box::new(listener) as Box<_>, new_addr)) + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result { + // We dial a first time the node. + let dial = match self.transport.dial(addr.clone()) { + Ok(d) => d, + Err((transport, addr)) => { + let id = IdentifyTransport { + transport, + }; + return Err((id, addr)); + } + }; + + let dial = dial.and_then(move |muxer| { + IdRetriever::new(muxer, IdentifyProtocolConfig, Endpoint::Dialer) + }); + + Ok(Box::new(dial) as Box<_>) + } + + #[inline] + fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option { + self.transport.nat_traversal(a, b) + } +} + +/// Implementation of `Future` that asks the remote of its `PeerId`. +// TODO: remove unneeded bounds +struct IdRetriever +where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, + TMuxer::Substream: Send, +{ + /// Internal state. + state: IdRetrieverState, + /// Whether we're dialing or listening. + endpoint: Endpoint, +} + +enum IdRetrieverState +where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, + TMuxer::Substream: Send, +{ + /// We are in the process of opening a substream with the remote. + OpeningSubstream(Arc, muxing::OutboundSubstreamRefWrapFuture>, IdentifyProtocolConfig), + /// We opened the substream and are currently negotiating the identify protocol. + NegotiatingIdentify(Arc, apply::UpgradeApplyFuture>, IdentifyProtocolConfig>), + /// We retreived the remote's public key and are ready to yield it when polled again. + Finishing(Arc, PublicKey), + /// Something bad happend, or the `Future` is finished, and shouldn't be polled again. + Poisoned, +} + +impl IdRetriever +where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, + TMuxer::Substream: Send, +{ + /// Creates a new `IdRetriever` ready to be polled. + fn new(muxer: TMuxer, config: IdentifyProtocolConfig, endpoint: Endpoint) -> Self { + let muxer = Arc::new(muxer); + let opening = muxing::outbound_from_ref_and_wrap(muxer.clone()); + + IdRetriever { + state: IdRetrieverState::OpeningSubstream(muxer, opening, config), + endpoint, + } + } +} + +impl Future for IdRetriever +where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, + TMuxer::Substream: Send, +{ + type Item = (PeerId, TMuxer); + type Error = IoError; + + fn poll(&mut self) -> Poll { + // This loop is here so that we can continue polling until we're ready. + loop { + // In order to satisfy the borrow checker, we extract the state and temporarily put + // `Poisoned` instead. + match mem::replace(&mut self.state, IdRetrieverState::Poisoned) { + IdRetrieverState::OpeningSubstream(muxer, mut opening, config) => { + match opening.poll() { + Ok(Async::Ready(Some(substream))) => { + let upgrade = apply::apply(substream, config, self.endpoint); + self.state = IdRetrieverState::NegotiatingIdentify(muxer, upgrade); + }, + Ok(Async::Ready(None)) => { + return Err(IoError::new(IoErrorKind::Other, "remote refused our identify attempt")); + }, + Ok(Async::NotReady) => { + self.state = IdRetrieverState::OpeningSubstream(muxer, opening, config); + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + IdRetrieverState::NegotiatingIdentify(muxer, mut nego) => { + match nego.poll() { + Ok(Async::Ready(IdentifyOutput::RemoteInfo { info, .. })) => { + self.state = IdRetrieverState::Finishing(muxer, info.public_key); + }, + Ok(Async::Ready(IdentifyOutput::Sender { .. })) => { + unreachable!("IdentifyOutput::Sender can never be the output from \ + the dialing side"); + }, + Ok(Async::NotReady) => { + self.state = IdRetrieverState::NegotiatingIdentify(muxer, nego); + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + IdRetrieverState::Finishing(muxer, public_key) => { + // Here is a tricky part: we need to get back the muxer in order to return + // it, but it is in an `Arc`. + let unwrapped = Arc::try_unwrap(muxer).unwrap_or_else(|_| { + panic!("we clone the Arc only to put it into substreams ; once in the \ + Finishing state, no substream or upgrade exists anymore ; \ + therefore there exists only one instance of the Arc ; qed") + }); + + // We leave `Poisoned` as the state when returning. + return Ok(Async::Ready((public_key.into(), unwrapped))); + }, + IdRetrieverState::Poisoned => { + panic!("Future state panicked inside poll() or is finished") + }, + } + } + } +} + +// TODO: write basic working test diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index 26f7a1ccd58..3684177f71f 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -81,10 +81,12 @@ extern crate tokio_timer; extern crate unsigned_varint; extern crate void; +pub use self::id_transport::IdentifyTransport; pub use self::periodic_id_handler::{PeriodicIdentification, PeriodicIdentificationEvent}; pub use self::protocol::{IdentifyInfo, IdentifyOutput}; pub use self::protocol::{IdentifyProtocolConfig, IdentifySender}; +mod id_transport; mod periodic_id_handler; mod protocol; mod structs_proto;