From 44055180457313517efd5d3cee9f8a558029c939 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sun, 4 Nov 2018 09:47:15 +0100 Subject: [PATCH] Add a PeriodicPingHandler and a PingListenHandler (#574) * Add ProtocolsHandler trait * Reexport symbols * Add a note about shutting down * Add a PeriodicPingHandler and a PingListenHandler * Fix core doctest * Add tolerating not supported * Fix concerns --- core/src/lib.rs | 2 +- protocols/ping/Cargo.toml | 3 + protocols/ping/src/dial_handler.rs | 335 ++++++++++++++++++++ protocols/ping/src/lib.rs | 414 +------------------------ protocols/ping/src/listen_handler.rs | 142 +++++++++ protocols/ping/src/protocol.rs | 443 +++++++++++++++++++++++++++ 6 files changed, 935 insertions(+), 404 deletions(-) create mode 100644 protocols/ping/src/dial_handler.rs create mode 100644 protocols/ping/src/listen_handler.rs create mode 100644 protocols/ping/src/protocol.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index 4d727bb5ab0..dbe2426a808 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -132,7 +132,7 @@ //! extern crate tokio; //! //! use futures::{Future, Stream}; -//! use libp2p_ping::{Ping, PingOutput}; +//! use libp2p_ping::protocol::{Ping, PingOutput}; //! use libp2p_core::Transport; //! use tokio::runtime::current_thread::Runtime; //! diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 9689d9c5bbb..9487c3bc054 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Parity Technologies "] license = "MIT" [dependencies] +arrayvec = "0.4" bytes = "0.4" libp2p-core = { path = "../../core" } log = "0.4.1" @@ -15,6 +16,8 @@ parking_lot = "0.6" rand = "0.5" tokio-codec = "0.1" tokio-io = "0.1" +tokio-timer = "0.2.6" +void = "1.0" [dev-dependencies] libp2p-tcp-transport = { path = "../../transports/tcp" } diff --git a/protocols/ping/src/dial_handler.rs b/protocols/ping/src/dial_handler.rs new file mode 100644 index 00000000000..c62b921d5dd --- /dev/null +++ b/protocols/ping/src/dial_handler.rs @@ -0,0 +1,335 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use libp2p_core::{ + nodes::{NodeHandlerEndpoint, ProtocolsHandler, ProtocolsHandlerEvent}, + upgrade::toggleable, + ConnectionUpgrade, +}; +use protocol::{Ping, PingDialer, PingOutput}; +use std::{ + io, mem, + time::{Duration, Instant}, +}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; +use void::Void; + +/// Protocol handler that handles pinging the remote at a regular period. +/// +/// If the remote doesn't respond, produces `Unresponsive` and closes the connection. +pub struct PeriodicPingHandler { + /// Configuration for the ping protocol. + ping_config: toggleable::Toggleable>, + + /// State of the outgoing ping. + out_state: OutState, + + /// Duration after which we consider that a ping failed. + ping_timeout: Duration, + + /// After a ping succeeded, wait this long before the next ping. + delay_to_next_ping: Duration, + + /// If true, we switch to the `Disabled` state if the remote doesn't support the ping protocol. + /// If false, we close the connection. + tolerate_unsupported: bool, +} + +/// State of the outgoing ping substream. +enum OutState { + /// We need to open a new substream. + NeedToOpen { + /// Timeout after which we decide that it's not going to work out. + /// + /// Theoretically the handler should be polled immediately after we set the state to + /// `NeedToOpen` and then we immediately transition away from it. However if the local node + /// is for some reason busy, creating the `Delay` here avoids being overly generous with + /// the ping timeout. + expires: Delay, + }, + + /// Upgrading a substream to use ping. + /// + /// We produced a substream open request, and are waiting for it to be upgraded to a full + /// ping-powered substream. + Upgrading { + /// Timeout after which we decide that it's not going to work out. + /// + /// The user of the `ProtocolsHandler` should ensure that there's a timeout when upgrading, + /// but by storing a timeout here as well we ensure that we keep track of how long the + /// ping has lasted. + expires: Delay, + }, + + /// We sent a ping and we are waiting for the pong. + WaitingForPong { + /// Substream where we should receive the pong. + substream: PingDialer, + /// Timeout after which we decide that we're not going to receive the pong. + expires: Delay, + }, + + /// We received a pong and now we have nothing to do except wait a bit before sending the + /// next ping. + Idle { + /// The substream to use to send pings. + substream: PingDialer, + /// When to send the ping next. + next_ping: Delay, + }, + + /// The ping dialer is disabled. Don't do anything. + Disabled, + + /// The dialer has been closed. + Shutdown, + + /// Something bad happened during the previous polling. + Poisoned, +} + +/// Event produced by the periodic pinger. +#[derive(Debug, Copy, Clone)] +pub enum OutEvent { + /// The node has been determined to be unresponsive. + Unresponsive, + + /// Started pinging the remote. This can be used to print a diagnostic message in the logs. + PingStart, + + /// The node has successfully responded to a ping. + PingSuccess(Duration), +} + +impl PeriodicPingHandler { + /// Builds a new `PeriodicPingHandler`. + pub fn new() -> PeriodicPingHandler { + let ping_timeout = Duration::from_secs(30); + + PeriodicPingHandler { + ping_config: toggleable::toggleable(Default::default()), + out_state: OutState::NeedToOpen { + expires: Delay::new(Instant::now() + ping_timeout), + }, + ping_timeout, + delay_to_next_ping: Duration::from_secs(15), + tolerate_unsupported: false, + } + } +} + +impl Default for PeriodicPingHandler { + #[inline] + fn default() -> Self { + PeriodicPingHandler::new() + } +} + +impl ProtocolsHandler for PeriodicPingHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + type InEvent = Void; + type OutEvent = OutEvent; + type Substream = TSubstream; + type Protocol = toggleable::Toggleable>; + type OutboundOpenInfo = (); + + #[inline] + fn listen_protocol(&self) -> Self::Protocol { + let mut config = self.ping_config; + config.disable(); + config + } + + fn inject_fully_negotiated( + &mut self, + protocol: >::Output, + _endpoint: NodeHandlerEndpoint, + ) { + match protocol { + PingOutput::Pinger(mut substream) => { + debug_assert!(_endpoint.is_dialer()); + match mem::replace(&mut self.out_state, OutState::Poisoned) { + OutState::Upgrading { expires } => { + // We always upgrade with the intent of immediately pinging. + substream.ping(Instant::now()); + self.out_state = OutState::WaitingForPong { substream, expires }; + } + _ => (), + } + } + PingOutput::Ponger(_) => { + debug_assert!(false, "Received an unexpected incoming ping substream"); + } + } + } + + fn inject_event(&mut self, _: &Self::InEvent) {} + + fn inject_inbound_closed(&mut self) {} + + #[inline] + fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) { + // In case of error while upgrading, there's not much we can do except shut down. + // TODO: we assume that the error is about ping not being supported, which is not + // necessarily the case + if self.tolerate_unsupported { + self.out_state = OutState::Disabled; + } else { + self.out_state = OutState::Shutdown; + } + } + + fn shutdown(&mut self) { + // Put `Shutdown` in `self.out_state` if we don't have any substream open. + // Otherwise, keep the state as it is but call `shutdown()` on the substream. This + // guarantees that the dialer will return `None` at some point. + match self.out_state { + OutState::WaitingForPong { + ref mut substream, .. + } => substream.shutdown(), + OutState::Idle { + ref mut substream, .. + } => substream.shutdown(), + ref mut s => *s = OutState::Shutdown, + } + } + + fn poll( + &mut self, + ) -> Poll< + Option>, + io::Error, + > { + // Shortcut for polling a `tokio_timer::Delay` + macro_rules! poll_delay { + ($delay:expr => { NotReady => $notready:expr, Ready => $ready:expr, }) => ( + match $delay.poll() { + Ok(Async::NotReady) => $notready, + Ok(Async::Ready(())) => $ready, + Err(err) => { + warn!(target: "sub-libp2p", "Ping timer errored: {:?}", err); + return Err(io::Error::new(io::ErrorKind::Other, err)); + } + } + ) + } + + loop { + match mem::replace(&mut self.out_state, OutState::Poisoned) { + OutState::Shutdown | OutState::Poisoned => { + // This shuts down the whole connection with the remote. + return Ok(Async::Ready(None)); + }, + + OutState::Disabled => { + return Ok(Async::NotReady); + } + + // Need to open an outgoing substream. + OutState::NeedToOpen { expires } => { + // Note that we ignore the expiration here, as it's pretty unlikely to happen. + // The expiration is only here to be transmitted to the `Upgrading`. + self.out_state = OutState::Upgrading { expires }; + return Ok(Async::Ready(Some( + ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade: self.ping_config, + info: (), + }, + ))); + } + + // Waiting for the upgrade to be negotiated. + OutState::Upgrading { mut expires } => poll_delay!(expires => { + NotReady => { + self.out_state = OutState::Upgrading { expires }; + return Ok(Async::NotReady); + }, + Ready => { + self.out_state = OutState::Shutdown; + let ev = OutEvent::Unresponsive; + return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev)))); + }, + }), + + // Waiting for the pong. + OutState::WaitingForPong { + mut substream, + mut expires, + } => { + // We start by dialing the substream, leaving one last chance for it to + // produce the pong even if the expiration happened. + match substream.poll()? { + Async::Ready(Some(started)) => { + self.out_state = OutState::Idle { + substream, + next_ping: Delay::new(Instant::now() + self.delay_to_next_ping), + }; + let ev = OutEvent::PingSuccess(started.elapsed()); + return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev)))); + } + Async::NotReady => {} + Async::Ready(None) => { + self.out_state = OutState::Shutdown; + return Ok(Async::Ready(None)); + } + }; + + // Check the expiration. + poll_delay!(expires => { + NotReady => { + self.out_state = OutState::WaitingForPong { substream, expires }; + // Both `substream` and `expires` and not ready, so it's fine to return + // not ready. + return Ok(Async::NotReady); + }, + Ready => { + self.out_state = OutState::Shutdown; + let ev = OutEvent::Unresponsive; + return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(ev)))); + }, + }) + } + + OutState::Idle { + mut substream, + mut next_ping, + } => { + // Poll the future that fires when we need to ping the node again. + poll_delay!(next_ping => { + NotReady => { + self.out_state = OutState::Idle { substream, next_ping }; + return Ok(Async::NotReady); + }, + Ready => { + let expires = Delay::new(Instant::now() + self.ping_timeout); + substream.ping(Instant::now()); + self.out_state = OutState::WaitingForPong { substream, expires }; + return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(OutEvent::PingStart)))); + }, + }) + } + } + } + } +} diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index e8c59a00f1a..dacab9cc926 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. +// Copyright 2017-2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -56,7 +56,7 @@ //! extern crate tokio; //! //! use futures::{Future, Stream}; -//! use libp2p_ping::{Ping, PingOutput}; +//! use libp2p_ping::protocol::{Ping, PingOutput}; //! use libp2p_core::Transport; //! use tokio::runtime::current_thread::Runtime; //! @@ -82,7 +82,9 @@ //! ``` //! +extern crate arrayvec; extern crate bytes; +#[macro_use] extern crate futures; extern crate libp2p_core; #[macro_use] @@ -92,407 +94,13 @@ extern crate parking_lot; extern crate rand; extern crate tokio_codec; extern crate tokio_io; +extern crate tokio_timer; +extern crate void; -use bytes::{BufMut, Bytes, BytesMut}; -use futures::{prelude::*, future::{FutureResult, IntoFuture}, task}; -use libp2p_core::{ConnectionUpgrade, Endpoint}; -use rand::{distributions::Standard, prelude::*, rngs::EntropyRng}; -use std::collections::VecDeque; -use std::io::Error as IoError; -use std::{iter, marker::PhantomData, mem}; -use tokio_codec::{Decoder, Encoder, Framed}; -use tokio_io::{AsyncRead, AsyncWrite}; - -/// Represents a prototype for an upgrade to handle the ping protocol. -/// -/// According to the design of libp2p, this struct would normally contain the configuration options -/// for the protocol, but in the case of `Ping` no configuration is required. -#[derive(Debug, Copy, Clone)] -pub struct Ping(PhantomData); - -impl Default for Ping { - #[inline] - fn default() -> Self { - Ping(PhantomData) - } -} - -/// Output of a `Ping` upgrade. -pub enum PingOutput { - /// We are on the dialing side. - Pinger(PingDialer), - /// We are on the listening side. - Ponger(PingListener), -} - -impl ConnectionUpgrade for Ping -where - TSocket: AsyncRead + AsyncWrite, -{ - type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; - type UpgradeIdentifier = (); - - #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once(("/ipfs/ping/1.0.0".into(), ())) - } - - type Output = PingOutput; - type Future = FutureResult; - - #[inline] - fn upgrade( - self, - socket: TSocket, - _: Self::UpgradeIdentifier, - endpoint: Endpoint, - ) -> Self::Future { - let out = match endpoint { - Endpoint::Dialer => upgrade_as_dialer(socket), - Endpoint::Listener => upgrade_as_listener(socket), - }; - - Ok(out).into_future() - } -} - -/// Upgrades a connection from the dialer side. -fn upgrade_as_dialer(socket: TSocket) -> PingOutput -where TSocket: AsyncRead + AsyncWrite, -{ - let dialer = PingDialer { - inner: Framed::new(socket, Codec), - need_writer_flush: false, - needs_close: false, - sent_pings: VecDeque::with_capacity(4), - rng: EntropyRng::default(), - pings_to_send: VecDeque::with_capacity(4), - }; - - PingOutput::Pinger(dialer) -} - -/// Upgrades a connection from the listener side. -fn upgrade_as_listener(socket: TSocket) -> PingOutput -where TSocket: AsyncRead + AsyncWrite, -{ - let listener = PingListener { - inner: Framed::new(socket, Codec), - state: PingListenerState::Listening, - }; - - PingOutput::Ponger(listener) -} - -/// Sends pings and receives the pongs. -/// -/// Implements `Stream`. The stream indicates when we receive a pong. -pub struct PingDialer { - /// The underlying socket. - inner: Framed, - /// If true, need to flush the sink. - need_writer_flush: bool, - /// If true, need to close the sink. - needs_close: bool, - /// List of pings that have been sent to the remote and that are waiting for an answer. - sent_pings: VecDeque<(Bytes, TUserData)>, - /// Random number generator for the ping payload. - rng: EntropyRng, - /// List of pings to send to the remote. - pings_to_send: VecDeque<(Bytes, TUserData)>, -} - -impl PingDialer { - /// Sends a ping to the remote. - /// - /// The stream will produce an event containing the user data when we receive the pong. - pub fn ping(&mut self, user_data: TUserData) { - let payload: [u8; 32] = self.rng.sample(Standard); - debug!("Preparing for ping with payload {:?}", payload); - self.pings_to_send.push_back((Bytes::from(payload.to_vec()), user_data)); - } -} - -impl Stream for PingDialer -where TSocket: AsyncRead + AsyncWrite, -{ - type Item = TUserData; - type Error = IoError; - - fn poll(&mut self) -> Poll, Self::Error> { - if self.needs_close { - match self.inner.close() { - Ok(Async::Ready(())) => return Ok(Async::Ready(None)), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => return Err(err), - } - } - - while let Some((ping, user_data)) = self.pings_to_send.pop_front() { - match self.inner.start_send(ping.clone()) { - Ok(AsyncSink::Ready) => self.need_writer_flush = true, - Ok(AsyncSink::NotReady(_)) => { - self.pings_to_send.push_front((ping, user_data)); - break; - }, - Err(err) => return Err(err), - } - - self.sent_pings.push_back((ping, user_data)); - } - - if self.need_writer_flush { - match self.inner.poll_complete() { - Ok(Async::Ready(())) => self.need_writer_flush = false, - Ok(Async::NotReady) => (), - Err(err) => return Err(err), - } - } - - loop { - match self.inner.poll() { - Ok(Async::Ready(Some(pong))) => { - if let Some(pos) = self.sent_pings.iter().position(|&(ref p, _)| p == &pong) { - let (_, user_data) = self.sent_pings.remove(pos) - .expect("Grabbed a valid position just above"); - return Ok(Async::Ready(Some(user_data))); - } else { - debug!("Received pong that doesn't match what we sent: {:?}", pong); - } - }, - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) => { - // Notify the current task so that we poll again. - self.needs_close = true; - task::current().notify(); - return Ok(Async::NotReady); - } - Err(err) => return Err(err), - } - } - - Ok(Async::NotReady) - } -} - -/// Listens to incoming pings and answers them. -/// -/// Implements `Future`. The future terminates when the underlying socket closes. -pub struct PingListener { - /// The underlying socket. - inner: Framed, - /// State of the listener. - state: PingListenerState, -} - -#[derive(Debug)] -enum PingListenerState { - /// We are waiting for the next ping on the socket. - Listening, - /// We are trying to send a pong. - Sending(Bytes), - /// We are flusing the underlying sink. - Flushing, - /// We are shutting down everything. - Closing, - /// A panic happened during the processing. - Poisoned, -} - -impl Future for PingListener -where TSocket: AsyncRead + AsyncWrite -{ - type Item = (); - type Error = IoError; - - fn poll(&mut self) -> Poll { - loop { - match mem::replace(&mut self.state, PingListenerState::Poisoned) { - PingListenerState::Listening => { - match self.inner.poll() { - Ok(Async::Ready(Some(payload))) => { - debug!("Received ping (payload={:?}); sending back", payload); - self.state = PingListenerState::Sending(payload.freeze()) - }, - Ok(Async::Ready(None)) => self.state = PingListenerState::Closing, - Ok(Async::NotReady) => { - self.state = PingListenerState::Listening; - return Ok(Async::NotReady); - }, - Err(err) => return Err(err), - } - }, - PingListenerState::Sending(data) => { - match self.inner.start_send(data) { - Ok(AsyncSink::Ready) => self.state = PingListenerState::Flushing, - Ok(AsyncSink::NotReady(data)) => { - self.state = PingListenerState::Sending(data); - return Ok(Async::NotReady); - }, - Err(err) => return Err(err), - } - }, - PingListenerState::Flushing => { - match self.inner.poll_complete() { - Ok(Async::Ready(())) => self.state = PingListenerState::Listening, - Ok(Async::NotReady) => { - self.state = PingListenerState::Flushing; - return Ok(Async::NotReady); - }, - Err(err) => return Err(err), - } - }, - PingListenerState::Closing => { - match self.inner.close() { - Ok(Async::Ready(())) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => { - self.state = PingListenerState::Closing; - return Ok(Async::NotReady); - }, - Err(err) => return Err(err), - } - }, - PingListenerState::Poisoned => panic!("Poisoned or errored PingListener"), - } - } - } -} - -// Implementation of the `Codec` trait of tokio-io. Splits frames into groups of 32 bytes. -#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] -struct Codec; - -impl Decoder for Codec { - type Item = BytesMut; - type Error = IoError; - - #[inline] - fn decode(&mut self, buf: &mut BytesMut) -> Result, IoError> { - if buf.len() >= 32 { - Ok(Some(buf.split_to(32))) - } else { - Ok(None) - } - } -} - -impl Encoder for Codec { - type Item = Bytes; - type Error = IoError; - - #[inline] - fn encode(&mut self, mut data: Bytes, buf: &mut BytesMut) -> Result<(), IoError> { - if !data.is_empty() { - let split = 32 * (1 + ((data.len() - 1) / 32)); - buf.reserve(split); - buf.put(data.split_to(split)); - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - extern crate tokio; - extern crate tokio_tcp; - - use self::tokio::runtime::current_thread::Runtime; - use self::tokio_tcp::TcpListener; - use self::tokio_tcp::TcpStream; - use super::{Ping, PingOutput}; - use futures::{Future, Stream}; - use libp2p_core::{ConnectionUpgrade, Endpoint}; - - // TODO: rewrite tests with the MemoryTransport - - #[test] - fn ping_pong() { - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = listener - .incoming() - .into_future() - .map_err(|(e, _)| e.into()) - .and_then(|(c, _)| { - Ping::<()>::default().upgrade( - c.unwrap(), - (), - Endpoint::Listener, - ) - }) - .and_then(|out| match out { - PingOutput::Ponger(service) => service, - _ => unreachable!(), - }); - - let client = TcpStream::connect(&listener_addr) - .map_err(|e| e.into()) - .and_then(|c| { - Ping::<()>::default().upgrade( - c, - (), - Endpoint::Dialer, - ) - }) - .and_then(|out| match out { - PingOutput::Pinger(mut pinger) => { - pinger.ping(()); - pinger.into_future().map(|_| ()).map_err(|_| panic!()) - }, - _ => unreachable!(), - }) - .map(|_| ()); - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(server.select(client).map_err(|_| panic!())).unwrap(); - } - - #[test] - fn multipings() { - // Check that we can send multiple pings in a row and it will still work. - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = listener - .incoming() - .into_future() - .map_err(|(e, _)| e.into()) - .and_then(|(c, _)| { - Ping::::default().upgrade( - c.unwrap(), - (), - Endpoint::Listener, - ) - }) - .and_then(|out| match out { - PingOutput::Ponger(service) => service, - _ => unreachable!(), - }); +pub use self::dial_handler::PeriodicPingHandler; +pub use self::listen_handler::PingListenHandler; - let client = TcpStream::connect(&listener_addr) - .map_err(|e| e.into()) - .and_then(|c| { - Ping::::default().upgrade( - c, - (), - Endpoint::Dialer, - ) - }) - .and_then(|out| match out { - PingOutput::Pinger(mut pinger) => { - for n in 0..20 { - pinger.ping(n); - } +pub mod protocol; - pinger - .take(20) - .collect() - .map(|val| { assert_eq!(val, (0..20).collect::>()); }) - .map_err(|_| panic!()) - }, - _ => unreachable!(), - }); - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(server.select(client)).unwrap_or_else(|_| panic!()); - } -} +mod dial_handler; +mod listen_handler; diff --git a/protocols/ping/src/listen_handler.rs b/protocols/ping/src/listen_handler.rs new file mode 100644 index 00000000000..5684b4ab0f3 --- /dev/null +++ b/protocols/ping/src/listen_handler.rs @@ -0,0 +1,142 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use arrayvec::ArrayVec; +use futures::prelude::*; +use libp2p_core::{ + nodes::{NodeHandlerEndpoint, ProtocolsHandler, ProtocolsHandlerEvent}, + ConnectionUpgrade, +}; +use protocol::{Ping, PingListener, PingOutput}; +use std::io; +use tokio_io::{AsyncRead, AsyncWrite}; +use void::Void; + +/// Handler for handling pings received from a remote. +pub struct PingListenHandler { + /// Configuration for the ping protocol. + ping_config: Ping<()>, + + /// The ping substreams that were opened by the remote. + /// Note that we only accept a certain number of substreams, after which we refuse new ones + /// to avoid being DDoSed. + ping_in_substreams: ArrayVec<[PingListener; 8]>, + + /// If true, we're in the shutdown process and we shouldn't accept new substreams. + shutdown: bool, +} + +impl PingListenHandler { + /// Builds a new `PingListenHandler`. + pub fn new() -> PingListenHandler { + PingListenHandler { + ping_config: Default::default(), + shutdown: false, + ping_in_substreams: ArrayVec::new(), + } + } +} + +impl Default for PingListenHandler { + #[inline] + fn default() -> Self { + PingListenHandler::new() + } +} + +impl ProtocolsHandler for PingListenHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + type InEvent = Void; + type OutEvent = Void; + type Substream = TSubstream; + type Protocol = Ping<()>; + type OutboundOpenInfo = (); + + #[inline] + fn listen_protocol(&self) -> Self::Protocol { + self.ping_config + } + + fn inject_fully_negotiated( + &mut self, + protocol: >::Output, + _endpoint: NodeHandlerEndpoint, + ) { + if self.shutdown { + return; + } + + match protocol { + PingOutput::Pinger(_) => { + debug_assert!(false, "Received an unexpected outgoing ping substream"); + } + PingOutput::Ponger(listener) => { + debug_assert!(_endpoint.is_listener()); + // Try insert the element, but don't care if the list is full. + let _ = self.ping_in_substreams.try_push(listener); + } + } + } + + #[inline] + fn inject_event(&mut self, _: &Self::InEvent) {} + + #[inline] + fn inject_inbound_closed(&mut self) {} + + #[inline] + fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {} + + #[inline] + fn shutdown(&mut self) { + for ping in self.ping_in_substreams.iter_mut() { + ping.shutdown(); + } + + self.shutdown = true; + } + + fn poll( + &mut self, + ) -> Poll< + Option>, + io::Error, + > { + // Removes each substream one by one, and pushes them back if they're not ready (which + // should be the case 99% of the time). + for n in (0..self.ping_in_substreams.len()).rev() { + let mut ping = self.ping_in_substreams.swap_remove(n); + match ping.poll() { + Ok(Async::Ready(())) => {} + Ok(Async::NotReady) => self.ping_in_substreams.push(ping), + Err(err) => warn!(target: "sub-libp2p", "Remote ping substream errored: {:?}", err), + } + } + + // Special case if shutting down. + if self.shutdown && self.ping_in_substreams.is_empty() { + return Ok(Async::Ready(None)); + } + + Ok(Async::NotReady) + } +} diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs new file mode 100644 index 00000000000..a748871c7fe --- /dev/null +++ b/protocols/ping/src/protocol.rs @@ -0,0 +1,443 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::{prelude::*, future::FutureResult, future::IntoFuture}; +use libp2p_core::{ConnectionUpgrade, Endpoint}; +use rand::{distributions::Standard, prelude::*, rngs::EntropyRng}; +use std::collections::VecDeque; +use std::io::Error as IoError; +use std::{iter, marker::PhantomData, mem}; +use tokio_codec::{Decoder, Encoder, Framed}; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Represents a prototype for an upgrade to handle the ping protocol. +/// +/// According to the design of libp2p, this struct would normally contain the configuration options +/// for the protocol, but in the case of `Ping` no configuration is required. +#[derive(Debug, Copy, Clone)] +pub struct Ping(PhantomData); + +impl Default for Ping { + #[inline] + fn default() -> Self { + Ping(PhantomData) + } +} + +/// Output of a `Ping` upgrade. +pub enum PingOutput { + /// We are on the dialing side. + Pinger(PingDialer), + /// We are on the listening side. + Ponger(PingListener), +} + +impl ConnectionUpgrade for Ping +where + TSocket: AsyncRead + AsyncWrite, +{ + type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once(("/ipfs/ping/1.0.0".into(), ())) + } + + type Output = PingOutput; + type Future = FutureResult; + + #[inline] + fn upgrade( + self, + socket: TSocket, + _: Self::UpgradeIdentifier, + endpoint: Endpoint, + ) -> Self::Future { + let out = match endpoint { + Endpoint::Dialer => upgrade_as_dialer(socket), + Endpoint::Listener => upgrade_as_listener(socket), + }; + + Ok(out).into_future() + } +} + +/// Upgrades a connection from the dialer side. +fn upgrade_as_dialer(socket: TSocket) -> PingOutput +where TSocket: AsyncRead + AsyncWrite, +{ + let dialer = PingDialer { + inner: Framed::new(socket, Codec), + need_writer_flush: false, + needs_close: false, + sent_pings: VecDeque::with_capacity(4), + rng: EntropyRng::default(), + pings_to_send: VecDeque::with_capacity(4), + }; + + PingOutput::Pinger(dialer) +} + +/// Upgrades a connection from the listener side. +fn upgrade_as_listener(socket: TSocket) -> PingOutput +where TSocket: AsyncRead + AsyncWrite, +{ + let listener = PingListener { + inner: Framed::new(socket, Codec), + state: PingListenerState::Listening, + }; + + PingOutput::Ponger(listener) +} + +/// Sends pings and receives the pongs. +/// +/// Implements `Stream`. The stream indicates when we receive a pong. +pub struct PingDialer { + /// The underlying socket. + inner: Framed, + /// If true, need to flush the sink. + need_writer_flush: bool, + /// If true, need to close the sink. + needs_close: bool, + /// List of pings that have been sent to the remote and that are waiting for an answer. + sent_pings: VecDeque<(Bytes, TUserData)>, + /// Random number generator for the ping payload. + rng: EntropyRng, + /// List of pings to send to the remote. + pings_to_send: VecDeque<(Bytes, TUserData)>, +} + +impl PingDialer { + /// Sends a ping to the remote. + /// + /// The stream will produce an event containing the user data when we receive the pong. + pub fn ping(&mut self, user_data: TUserData) { + let payload: [u8; 32] = self.rng.sample(Standard); + debug!("Preparing for ping with payload {:?}", payload); + self.pings_to_send.push_back((Bytes::from(payload.to_vec()), user_data)); + } +} + +impl PingDialer +where TSocket: AsyncRead + AsyncWrite, +{ + /// Call this when the ping dialer needs to shut down. After this, the `Stream` is guaranteed + /// to finish soon-ish. + #[inline] + pub fn shutdown(&mut self) { + self.needs_close = true; + } +} + +impl Stream for PingDialer +where TSocket: AsyncRead + AsyncWrite, +{ + type Item = TUserData; + type Error = IoError; + + fn poll(&mut self) -> Poll, Self::Error> { + if self.needs_close { + try_ready!(self.inner.close()); + return Ok(Async::Ready(None)); + } + + while let Some((ping, user_data)) = self.pings_to_send.pop_front() { + match self.inner.start_send(ping.clone()) { + Ok(AsyncSink::Ready) => self.need_writer_flush = true, + Ok(AsyncSink::NotReady(_)) => { + self.pings_to_send.push_front((ping, user_data)); + break; + }, + Err(err) => return Err(err), + } + + self.sent_pings.push_back((ping, user_data)); + } + + if self.need_writer_flush { + match self.inner.poll_complete() { + Ok(Async::Ready(())) => self.need_writer_flush = false, + Ok(Async::NotReady) => (), + Err(err) => return Err(err), + } + } + + loop { + match self.inner.poll() { + Ok(Async::Ready(Some(pong))) => { + if let Some(pos) = self.sent_pings.iter().position(|&(ref p, _)| p == &pong) { + let (_, user_data) = self.sent_pings.remove(pos) + .expect("Grabbed a valid position just above"); + return Ok(Async::Ready(Some(user_data))); + } else { + debug!("Received pong that doesn't match what we sent: {:?}", pong); + } + }, + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) => { + // Notify the current task so that we poll again. + self.needs_close = true; + try_ready!(self.inner.close()); + return Ok(Async::Ready(None)); + } + Err(err) => return Err(err), + } + } + + Ok(Async::NotReady) + } +} + +/// Listens to incoming pings and answers them. +/// +/// Implements `Future`. The future terminates when the underlying socket closes. +pub struct PingListener { + /// The underlying socket. + inner: Framed, + /// State of the listener. + state: PingListenerState, +} + +#[derive(Debug)] +enum PingListenerState { + /// We are waiting for the next ping on the socket. + Listening, + /// We are trying to send a pong. + Sending(Bytes), + /// We are flusing the underlying sink. + Flushing, + /// We are shutting down everything. + Closing, + /// A panic happened during the processing. + Poisoned, +} + +impl PingListener +where TSocket: AsyncRead + AsyncWrite +{ + /// Call this when the ping listener needs to shut down. After this, the `Future` is guaranteed + /// to finish soon-ish. + #[inline] + pub fn shutdown(&mut self) { + self.state = PingListenerState::Closing; + } +} + +impl Future for PingListener +where TSocket: AsyncRead + AsyncWrite +{ + type Item = (); + type Error = IoError; + + fn poll(&mut self) -> Poll { + loop { + match mem::replace(&mut self.state, PingListenerState::Poisoned) { + PingListenerState::Listening => { + match self.inner.poll() { + Ok(Async::Ready(Some(payload))) => { + debug!("Received ping (payload={:?}) ; sending back", payload); + self.state = PingListenerState::Sending(payload.freeze()) + }, + Ok(Async::Ready(None)) => self.state = PingListenerState::Closing, + Ok(Async::NotReady) => { + self.state = PingListenerState::Listening; + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + PingListenerState::Sending(data) => { + match self.inner.start_send(data) { + Ok(AsyncSink::Ready) => self.state = PingListenerState::Flushing, + Ok(AsyncSink::NotReady(data)) => { + self.state = PingListenerState::Sending(data); + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + PingListenerState::Flushing => { + match self.inner.poll_complete() { + Ok(Async::Ready(())) => self.state = PingListenerState::Listening, + Ok(Async::NotReady) => { + self.state = PingListenerState::Flushing; + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + PingListenerState::Closing => { + match self.inner.close() { + Ok(Async::Ready(())) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => { + self.state = PingListenerState::Closing; + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + PingListenerState::Poisoned => panic!("Poisoned or errored PingListener"), + } + } + } +} + +// Implementation of the `Codec` trait of tokio-io. Splits frames into groups of 32 bytes. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +struct Codec; + +impl Decoder for Codec { + type Item = BytesMut; + type Error = IoError; + + #[inline] + fn decode(&mut self, buf: &mut BytesMut) -> Result, IoError> { + if buf.len() >= 32 { + Ok(Some(buf.split_to(32))) + } else { + Ok(None) + } + } +} + +impl Encoder for Codec { + type Item = Bytes; + type Error = IoError; + + #[inline] + fn encode(&mut self, mut data: Bytes, buf: &mut BytesMut) -> Result<(), IoError> { + if !data.is_empty() { + let split = 32 * (1 + ((data.len() - 1) / 32)); + buf.reserve(split); + buf.put(data.split_to(split)); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + extern crate tokio; + extern crate tokio_tcp; + + use self::tokio_tcp::TcpListener; + use self::tokio_tcp::TcpStream; + use super::{Ping, PingOutput}; + use futures::{Future, Stream}; + use libp2p_core::{ConnectionUpgrade, Endpoint}; + + // TODO: rewrite tests with the MemoryTransport + + #[test] + fn ping_pong() { + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = listener + .incoming() + .into_future() + .map_err(|(e, _)| e.into()) + .and_then(|(c, _)| { + Ping::<()>::default().upgrade( + c.unwrap(), + (), + Endpoint::Listener, + ) + }) + .and_then(|out| match out { + PingOutput::Ponger(service) => service, + _ => unreachable!(), + }); + + let client = TcpStream::connect(&listener_addr) + .map_err(|e| e.into()) + .and_then(|c| { + Ping::<()>::default().upgrade( + c, + (), + Endpoint::Dialer, + ) + }) + .and_then(|out| match out { + PingOutput::Pinger(mut pinger) => { + pinger.ping(()); + pinger.into_future().map(|_| ()).map_err(|_| panic!()) + }, + _ => unreachable!(), + }) + .map(|_| ()); + + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(server.select(client).map_err(|_| panic!())).unwrap(); + } + + #[test] + fn multipings() { + // Check that we can send multiple pings in a row and it will still work. + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = listener + .incoming() + .into_future() + .map_err(|(e, _)| e.into()) + .and_then(|(c, _)| { + Ping::::default().upgrade( + c.unwrap(), + (), + Endpoint::Listener, + ) + }) + .and_then(|out| match out { + PingOutput::Ponger(service) => service, + _ => unreachable!(), + }); + + let client = TcpStream::connect(&listener_addr) + .map_err(|e| e.into()) + .and_then(|c| { + Ping::::default().upgrade( + c, + (), + Endpoint::Dialer, + ) + }) + .and_then(|out| match out { + PingOutput::Pinger(mut pinger) => { + for n in 0..20 { + pinger.ping(n); + } + + pinger + .take(20) + .collect() + .map(|val| { assert_eq!(val, (0..20).collect::>()); }) + .map_err(|_| panic!()) + }, + _ => unreachable!(), + }); + + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(server.select(client)).unwrap_or_else(|_| panic!()); + } +}