From bd69c0112c3f224e5ef3c9a29cc740d5b5d5a080 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 18 Oct 2018 10:46:36 +0200 Subject: [PATCH 1/4] Add ProtocolsHandler trait --- core/Cargo.toml | 2 +- core/src/lib.rs | 3 +- core/src/nodes/handled_node.rs | 11 + core/src/nodes/mod.rs | 1 + core/src/nodes/protocols_handler.rs | 605 ++++++++++++++++++++++++++++ 5 files changed, 619 insertions(+), 3 deletions(-) create mode 100644 core/src/nodes/protocols_handler.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index 048c464e3c7..f979883458f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -20,6 +20,7 @@ rw-stream-sink = { path = "../misc/rw-stream-sink" } smallvec = "0.6" tokio-executor = "0.1.4" tokio-io = "0.1" +tokio-timer = "0.2" void = "1" [dev-dependencies] @@ -30,6 +31,5 @@ rand = "0.5" tokio = "0.1" tokio-codec = "0.1" tokio-current-thread = "0.1" -tokio-timer = "0.2" assert_matches = "1.3" tokio-mock-task = "0.1" diff --git a/core/src/lib.rs b/core/src/lib.rs index 798405bba50..0e813b6a220 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -182,6 +182,7 @@ extern crate rw_stream_sink; extern crate smallvec; extern crate tokio_executor; extern crate tokio_io; +extern crate tokio_timer; extern crate void; #[cfg(test)] @@ -193,8 +194,6 @@ extern crate tokio_codec; #[cfg(test)] extern crate tokio_current_thread; #[cfg(test)] -extern crate tokio_timer; -#[cfg(test)] #[macro_use] extern crate assert_matches; #[cfg(test)] diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index cc638ab6f26..9767ec25edc 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -41,6 +41,12 @@ pub trait NodeHandler { /// Sends a new substream to the handler. /// /// The handler is responsible for upgrading the substream to whatever protocol it wants. 
+ /// + /// # Panics + /// + /// Implementations are allowed to panic in the case of dialing if the `user_data` in + /// `endpoint` doesn't correspond to what was returned earlier when polling, or is used + /// multiple times. fn inject_substream(&mut self, substream: Self::Substream, endpoint: NodeHandlerEndpoint); /// Indicates to the handler that the inbound part of the muxer has been closed, and that @@ -49,6 +55,11 @@ pub trait NodeHandler { /// Indicates to the handler that an outbound substream failed to open because the outbound /// part of the muxer has been closed. + /// + /// # Panics + /// + /// Implementations are allowed to panic if `user_data` doesn't correspond to what was returned + /// earlier when polling, or is used multiple times. fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo); /// Injects an event coming from the outside into the handler. diff --git a/core/src/nodes/mod.rs b/core/src/nodes/mod.rs index 1d78ea7b3a3..97ae0711d6b 100644 --- a/core/src/nodes/mod.rs +++ b/core/src/nodes/mod.rs @@ -23,6 +23,7 @@ pub mod handled_node; pub mod handled_node_tasks; pub mod listeners; pub mod node; +pub mod protocols_handler; pub mod raw_swarm; pub use self::node::Substream; diff --git a/core/src/nodes/protocols_handler.rs b/core/src/nodes/protocols_handler.rs new file mode 100644 index 00000000000..351531599f4 --- /dev/null +++ b/core/src/nodes/protocols_handler.rs @@ -0,0 +1,605 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent}; +use std::{io, marker::PhantomData, time::Duration}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Timeout; +use upgrade::{self, apply::UpgradeApplyFuture, DeniedConnectionUpgrade}; +use void::Void; +use {ConnectionUpgrade, Endpoint}; + +/// Handler for a set of protocols for a specific connection with a remote. +/// +/// This trait should be implemented on struct that hold the state for a specific protocol +/// behaviour with a specific remote. +/// +/// # Handling a protocol +/// +/// Protocols with the remote can be opened in two different ways: +/// +/// - Dialing, which is a voluntary process. In order to do so, make `poll()` return a +/// `OutboundSubstreamRequest` variant containing the connection upgrade to use. 
+/// - Listening, which is used to determine which protocols are supported when the remote wants +/// to open a substream. The `listen_protocol()` method should return the upgrades supported when +/// listening. +/// +/// The upgrade when dialing and the upgrade when listening have to be of the same type, but you +/// are free to return for example an `OrUpgrade` enum, or an enum of yours, containing the upgrade +/// you want depending on the situation. +/// +/// # Shutting down +/// +/// Implementors of this trait should keep in mind that the connection can be closed at any time. +/// When a connection is closed (either by us or by the remote) `shutdown()` is called and the +/// handler continues to be processed until it produces `None`. Then only the handler is destroyed. +/// +/// This makes it possible for the handler to finish delivering events even after knowing that it +/// is shutting down. +/// +/// Implementors of this trait should keep in mind that when `shutdown()` is called, the connection +/// might already be closed or unresponsive. They should therefore not rely on being able to +/// deliver messages. +/// +/// # Relationship with `NodeHandler`. +/// +/// This trait is very similar to the `NodeHandler` trait. The fundamental differences are: +/// +/// - The `NodeHandler` trait gives you more control and is therefore more difficult to implement. +/// - The `NodeHandler` trait is designed to have exclusive ownership of the connection with a +/// node, while the `ProtocolsHandler` trait is designed to handle only a specific set of +/// protocols. Two or more implementations of `ProtocolsHandler` can be combined into one that +/// supports all the protocols together, which is not possible with `NodeHandler`. 
+/// +// TODO: add a "blocks connection closing" system, so that we can gracefully close a connection +// when it's no longer needed, and so that for example the periodic pinging system does not +// keep the connection alive forever +pub trait ProtocolsHandler { + /// Custom event that can be received from the outside. + type InEvent; + /// Custom event that can be produced by the handler and that will be returned to the outside. + type OutEvent; + /// The type of the substream that contains the raw data. + type Substream: AsyncRead + AsyncWrite; + /// The upgrade for the protocol or protocols handled by this handler. + type Protocol: ConnectionUpgrade; + /// Information about a substream. Can be sent to the handler through a `NodeHandlerEndpoint`, + /// and will be passed back in `inject_substream` or `inject_outbound_closed`. + type OutboundOpenInfo; + + /// Produces a `ConnectionUpgrade` for the protocol or protocols to accept when listening. + /// + /// > **Note**: You should always accept all the protocols you support, even if in a specific + /// > context you wouldn't accept one in particular (eg. only allow one substream at + /// > a time for a given protocol). The reason is that remotes are allowed to put the + /// > list of supported protocols in a cache in order to avoid spurious queries. + fn listen_protocol(&self) -> Self::Protocol; + + /// Injects a fully-negotiated substream in the handler. + /// + /// This method is called when a substream has been successfully opened and negotiated. + fn inject_fully_negotiated( + &mut self, + protocol: >::Output, + endpoint: NodeHandlerEndpoint, + ); + + /// Injects an event coming from the outside in the handler. + fn inject_event(&mut self, event: &Self::InEvent); + + /// Indicates to the handler that upgrading a substream to the given protocol has failed. 
+ fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error); + + /// Indicates to the handler that the inbound part of the muxer has been closed, and that + /// therefore no more inbound substream will be produced. + fn inject_inbound_closed(&mut self); + + /// Indicates to the node that it should shut down. After that, it is expected that `poll()` + /// returns `Ready(None)` as soon as possible. + /// + /// This method allows an implementation to perform a graceful shutdown of the substreams, and + /// send back various events. + fn shutdown(&mut self); + + /// Should behave like `Stream::poll()`. Should close if no more event can be produced and the + /// node should be closed. + // TODO: should not return a `NodeHandlerEvent` but a different enum + fn poll( + &mut self, + ) -> Poll< + Option>, + io::Error, + >; + + /// Adds a closure that turns the input event into something else. + #[inline] + fn map_in_event(self, map: TMap) -> MapInEvent + where + Self: Sized, + TMap: Fn(&TNewIn) -> Option<&Self::InEvent>, + { + MapInEvent { + inner: self, + map, + marker: PhantomData, + } + } + + /// Adds a closure that turns the output event into something else. + #[inline] + fn map_out_event(self, map: TMap) -> MapOutEvent + where + Self: Sized, + TMap: FnMut(Self::OutEvent) -> TNewOut, + { + MapOutEvent { inner: self, map } + } + + /// Builds an implementation of `NodeHandler` that handles this protocol exclusively. + #[inline] + fn into_node_handler(self) -> NodeHandlerWrapper + where + Self: Sized, + { + NodeHandlerWrapper { + handler: self, + negotiating_in: Vec::new(), + negotiating_out: Vec::new(), + in_timeout: Duration::from_secs(10), + out_timeout: Duration::from_secs(10), + queued_dial_upgrades: Vec::new(), + unique_dial_upgrade_id: 0, + } + } +} + +/// Event produced by a handler. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ProtocolsHandlerEvent { + /// Require a new outbound substream to be opened with the remote. 
+ OutboundSubstreamRequest { + /// The upgrade to apply on the substream. + upgrade: TConnectionUpgrade, + /// User-defined information, passed back when the substream is open. + info: TOutboundOpenInfo, + }, + + /// Other event. + Custom(TCustom), +} + +/// Utility methods for `ProtocolsHandlerEvent`. +impl + ProtocolsHandlerEvent +{ + /// If this is `OutboundSubstreamRequest`, maps the content to something else. + #[inline] + pub fn map_outbound_open_info( + self, + map: F, + ) -> ProtocolsHandlerEvent + where + F: FnOnce(TOutboundOpenInfo) -> I, + { + match self { + ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade, + info: map(info), + } + } + ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), + } + } + + /// If this is `Custom`, maps the content to something else. + #[inline] + pub fn map_custom( + self, + map: F, + ) -> ProtocolsHandlerEvent + where + F: FnOnce(TCustom) -> I, + { + match self { + ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } + } + ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(map(val)), + } + } +} + +/// Implementation of `ProtocolsHandler` that doesn't handle anything. 
+pub struct DummyProtocolsHandler { + shutting_down: bool, + marker: PhantomData, +} + +impl Default for DummyProtocolsHandler { + #[inline] + fn default() -> Self { + DummyProtocolsHandler { + shutting_down: false, + marker: PhantomData, + } + } +} + +impl ProtocolsHandler for DummyProtocolsHandler +where + TSubstream: AsyncRead + AsyncWrite, +{ + type InEvent = Void; + type OutEvent = Void; + type Substream = TSubstream; + type Protocol = DeniedConnectionUpgrade; + type OutboundOpenInfo = Void; + + #[inline] + fn listen_protocol(&self) -> Self::Protocol { + DeniedConnectionUpgrade + } + + #[inline] + fn inject_fully_negotiated( + &mut self, + _: >::Output, + _: NodeHandlerEndpoint, + ) { + } + + #[inline] + fn inject_event(&mut self, _: &Self::InEvent) {} + + #[inline] + fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {} + + #[inline] + fn inject_inbound_closed(&mut self) {} + + #[inline] + fn shutdown(&mut self) { + self.shutting_down = true; + } + + #[inline] + fn poll( + &mut self, + ) -> Poll< + Option>, + io::Error, + > { + if self.shutting_down { + Ok(Async::Ready(None)) + } else { + Ok(Async::NotReady) + } + } +} + +/// Wrapper around a protocol handler that turns the input event into something else. 
+pub struct MapInEvent { + inner: TProtoHandler, + map: TMap, + marker: PhantomData, +} + +impl ProtocolsHandler for MapInEvent +where + TProtoHandler: ProtocolsHandler, + TMap: Fn(&TNewIn) -> Option<&TProtoHandler::InEvent>, +{ + type InEvent = TNewIn; + type OutEvent = TProtoHandler::OutEvent; + type Substream = TProtoHandler::Substream; + type Protocol = TProtoHandler::Protocol; + type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; + + #[inline] + fn listen_protocol(&self) -> Self::Protocol { + self.inner.listen_protocol() + } + + #[inline] + fn inject_fully_negotiated( + &mut self, + protocol: >::Output, + endpoint: NodeHandlerEndpoint, + ) { + self.inner.inject_fully_negotiated(protocol, endpoint) + } + + #[inline] + fn inject_event(&mut self, event: &TNewIn) { + if let Some(event) = (self.map)(event) { + self.inner.inject_event(event); + } + } + + #[inline] + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error) { + self.inner.inject_dial_upgrade_error(info, error) + } + + #[inline] + fn inject_inbound_closed(&mut self) { + self.inner.inject_inbound_closed() + } + + #[inline] + fn shutdown(&mut self) { + self.inner.shutdown() + } + + #[inline] + fn poll( + &mut self, + ) -> Poll< + Option>, + io::Error, + > { + self.inner.poll() + } +} + +/// Wrapper around a protocol handler that turns the output event into something else. 
+pub struct MapOutEvent { + inner: TProtoHandler, + map: TMap, +} + +impl ProtocolsHandler for MapOutEvent +where + TProtoHandler: ProtocolsHandler, + TMap: FnMut(TProtoHandler::OutEvent) -> TNewOut, +{ + type InEvent = TProtoHandler::InEvent; + type OutEvent = TNewOut; + type Substream = TProtoHandler::Substream; + type Protocol = TProtoHandler::Protocol; + type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; + + #[inline] + fn listen_protocol(&self) -> Self::Protocol { + self.inner.listen_protocol() + } + + #[inline] + fn inject_fully_negotiated( + &mut self, + protocol: >::Output, + endpoint: NodeHandlerEndpoint, + ) { + self.inner.inject_fully_negotiated(protocol, endpoint) + } + + #[inline] + fn inject_event(&mut self, event: &Self::InEvent) { + self.inner.inject_event(event) + } + + #[inline] + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error) { + self.inner.inject_dial_upgrade_error(info, error) + } + + #[inline] + fn inject_inbound_closed(&mut self) { + self.inner.inject_inbound_closed() + } + + #[inline] + fn shutdown(&mut self) { + self.inner.shutdown() + } + + #[inline] + fn poll( + &mut self, + ) -> Poll< + Option>, + io::Error, + > { + Ok(self.inner.poll()?.map(|ev| { + ev.map(|ev| match ev { + ProtocolsHandlerEvent::Custom(ev) => ProtocolsHandlerEvent::Custom((self.map)(ev)), + ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } + } + }) + })) + } +} + +/// Wraps around an implementation of `ProtocolsHandler`, and implements `NodeHandler`. +// TODO: add a caching system for protocols that are supported or not +pub struct NodeHandlerWrapper +where + TProtoHandler: ProtocolsHandler, +{ + /// The underlying handler. + handler: TProtoHandler, + /// Futures that upgrade incoming substreams. + negotiating_in: + Vec>>, + /// Futures that upgrade outgoing substreams. 
The first element of the tuple is the userdata + /// to pass back once successfully opened. + negotiating_out: Vec<( + TProtoHandler::OutboundOpenInfo, + Timeout>, + )>, + /// Timeout for incoming substreams negotiation. + in_timeout: Duration, + /// Timeout for outgoing substreams negotiation. + out_timeout: Duration, + /// For each outbound substream request, how to upgrade it. The first element of the tuple + /// is the unique identifier (see `unique_dial_upgrade_id`). + queued_dial_upgrades: Vec<(u64, TProtoHandler::Protocol)>, + /// Unique identifier assigned to each queued dial upgrade. + unique_dial_upgrade_id: u64, +} + +impl NodeHandler for NodeHandlerWrapper +where + TProtoHandler: ProtocolsHandler, + >::NamesIter: Clone, +{ + type InEvent = TProtoHandler::InEvent; + type OutEvent = TProtoHandler::OutEvent; + type Substream = TProtoHandler::Substream; + // The first element of the tuple is the unique upgrade identifier + // (see `unique_dial_upgrade_id`). + type OutboundOpenInfo = (u64, TProtoHandler::OutboundOpenInfo); + + fn inject_substream( + &mut self, + substream: Self::Substream, + endpoint: NodeHandlerEndpoint, + ) { + match endpoint { + NodeHandlerEndpoint::Listener => { + let protocol = self.handler.listen_protocol(); + let upgrade = upgrade::apply(substream, protocol, Endpoint::Listener); + let with_timeout = Timeout::new(upgrade, self.in_timeout); + self.negotiating_in.push(with_timeout); + } + NodeHandlerEndpoint::Dialer((upgrade_id, user_data)) => { + let pos = match self + .queued_dial_upgrades + .iter() + .position(|(id, _)| id == &upgrade_id) + { + Some(p) => p, + None => { + debug_assert!(false, "Received an upgrade with an invalid upgrade ID"); + return; + } + }; + + let (_, proto_upgrade) = self.queued_dial_upgrades.remove(pos); + let upgrade = upgrade::apply(substream, proto_upgrade, Endpoint::Dialer); + let with_timeout = Timeout::new(upgrade, self.out_timeout); + self.negotiating_out.push((user_data, with_timeout)); + } + } + } + + 
#[inline] + fn inject_inbound_closed(&mut self) { + self.handler.inject_inbound_closed(); + } + + fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo) { + let pos = match self + .queued_dial_upgrades + .iter() + .position(|(id, _)| id == &user_data.0) + { + Some(p) => p, + None => { + debug_assert!( + false, + "Received an outbound closed error with an invalid upgrade ID" + ); + return; + } + }; + + self.queued_dial_upgrades.remove(pos); + self.handler + .inject_dial_upgrade_error(user_data.1, io::ErrorKind::ConnectionReset.into()); + } + + #[inline] + fn inject_event(&mut self, event: Self::InEvent) { + self.handler.inject_event(&event); + } + + #[inline] + fn shutdown(&mut self) { + self.handler.shutdown(); + } + + fn poll( + &mut self, + ) -> Poll>, io::Error> { + // Continue negotiation of newly-opened substreams on the listening side. + // We remove each element from `negotiating_in` one by one and add them back if not ready. + for n in (0..self.negotiating_in.len()).rev() { + let mut in_progress = self.negotiating_in.swap_remove(n); + match in_progress.poll() { + Ok(Async::Ready(upgrade)) => { + self.handler + .inject_fully_negotiated(upgrade, NodeHandlerEndpoint::Listener); + } + Ok(Async::NotReady) => { + self.negotiating_in.push(in_progress); + } + // TODO: return a diagnostic event? + Err(_err) => {} + } + } + + // Continue negotiation of newly-opened substreams. + // We remove each element from `negotiating_out` one by one and add them back if not ready. 
+ for n in (0..self.negotiating_out.len()).rev() { + let (upgr_info, mut in_progress) = self.negotiating_out.swap_remove(n); + match in_progress.poll() { + Ok(Async::Ready(upgrade)) => { + let endpoint = NodeHandlerEndpoint::Dialer(upgr_info); + self.handler.inject_fully_negotiated(upgrade, endpoint); + } + Ok(Async::NotReady) => { + self.negotiating_out.push((upgr_info, in_progress)); + } + Err(err) => { + let msg = format!("Error while upgrading: {:?}", err); + let err = io::Error::new(io::ErrorKind::Other, msg); + self.handler.inject_dial_upgrade_error(upgr_info, err); + } + } + } + + // Poll the handler at the end so that we see the consequences of the method calls on + // `self.handler`. + match self.handler.poll()? { + Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { + return Ok(Async::Ready(Some(NodeHandlerEvent::Custom(event)))); + } + Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade, + info, + })) => { + let id = self.unique_dial_upgrade_id; + self.unique_dial_upgrade_id += 1; + self.queued_dial_upgrades.push((id, upgrade)); + return Ok(Async::Ready(Some( + NodeHandlerEvent::OutboundSubstreamRequest((id, info)), + ))); + } + Async::Ready(None) => return Ok(Async::Ready(None)), + Async::NotReady => (), + }; + + Ok(Async::NotReady) + } +} From d283c01e1088af1666bb1b500838ab25aafc642e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 18 Oct 2018 15:03:05 +0200 Subject: [PATCH 2/4] Reexport symbols --- core/src/nodes/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/nodes/mod.rs b/core/src/nodes/mod.rs index 97ae0711d6b..2636de3567a 100644 --- a/core/src/nodes/mod.rs +++ b/core/src/nodes/mod.rs @@ -28,4 +28,5 @@ pub mod raw_swarm; pub use self::node::Substream; pub use self::handled_node::{NodeHandlerEvent, NodeHandlerEndpoint}; +pub use self::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent}; pub use self::raw_swarm::{ConnectedPoint, Peer, RawSwarm, RawSwarmEvent}; From 
20dae459a42652c8884138f7534d59e4aa59b2d6 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 18 Oct 2018 15:06:25 +0200 Subject: [PATCH 3/4] Add a note about shutting down --- core/src/nodes/protocols_handler.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/nodes/protocols_handler.rs b/core/src/nodes/protocols_handler.rs index 351531599f4..ff791ffc01f 100644 --- a/core/src/nodes/protocols_handler.rs +++ b/core/src/nodes/protocols_handler.rs @@ -121,7 +121,10 @@ pub trait ProtocolsHandler { /// Should behave like `Stream::poll()`. Should close if no more event can be produced and the /// node should be closed. - // TODO: should not return a `NodeHandlerEvent` but a different enum + /// + /// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns + /// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to + /// > `shutdown()` and will eventually be closed and destroyed. fn poll( &mut self, ) -> Poll< From fe7bc77769a984b786be429b13d550335922612e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 19 Oct 2018 11:58:10 +0200 Subject: [PATCH 4/4] Add a PeriodicIdentification protocol handler --- protocols/identify/src/lib.rs | 2 + protocols/identify/src/periodic_id_handler.rs | 180 ++++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 protocols/identify/src/periodic_id_handler.rs diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index 503bb55cade..26f7a1ccd58 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -81,8 +81,10 @@ extern crate tokio_timer; extern crate unsigned_varint; extern crate void; +pub use self::periodic_id_handler::{PeriodicIdentification, PeriodicIdentificationEvent}; pub use self::protocol::{IdentifyInfo, IdentifyOutput}; pub use self::protocol::{IdentifyProtocolConfig, IdentifySender}; +mod periodic_id_handler; mod protocol; mod structs_proto; diff --git 
a/protocols/identify/src/periodic_id_handler.rs b/protocols/identify/src/periodic_id_handler.rs new file mode 100644 index 00000000000..cd9e6e602a5 --- /dev/null +++ b/protocols/identify/src/periodic_id_handler.rs @@ -0,0 +1,180 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use libp2p_core::nodes::handled_node::NodeHandlerEndpoint; +use libp2p_core::nodes::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent}; +use libp2p_core::upgrade::{self, toggleable::Toggleable}; +use libp2p_core::{ConnectionUpgrade, Multiaddr}; +use std::io; +use std::marker::PhantomData; +use std::time::{Duration, Instant}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; +use void::Void; +use {IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig}; + +/// Delay between the moment we connect and the first time we identify. 
+const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500); +/// After an identification succeeded, wait this long before the next time. +const DELAY_TO_NEXT_ID: Duration = Duration::from_secs(5 * 60); +/// After we failed to identify the remote, try again after the given delay. +const TRY_AGAIN_ON_ERR: Duration = Duration::from_secs(60 * 60); + +/// Protocol handler that identifies the remote at a regular period. +pub struct PeriodicIdentification { + /// Configuration for the protocol. + config: Toggleable, + + /// If `Some`, we successfully generated a `PeriodicIdentificationEvent` and we will produce + /// it the next time `poll()` is invoked. + pending_result: Option, + + /// Future that fires when we need to identify the node again. If `None`, means that we should + /// shut down. + next_id: Option, + + /// Marker for strong typing. + marker: PhantomData, +} + +/// Event produced by the periodic identifier. +#[derive(Debug)] +pub enum PeriodicIdentificationEvent { + /// We obtained identification information from the remote. + Identified { + /// Information of the remote. + info: IdentifyInfo, + /// Address the remote observes us as. + observed_addr: Multiaddr, + }, + + /// Failed to identify the remote. + IdentificationError(io::Error), +} + +impl PeriodicIdentification { + /// Builds a new `PeriodicIdentification`. 
+ #[inline] + pub fn new() -> Self { + PeriodicIdentification { + config: upgrade::toggleable(IdentifyProtocolConfig), + pending_result: None, + next_id: Some(Delay::new(Instant::now() + DELAY_TO_FIRST_ID)), + marker: PhantomData, + } + } +} + +impl ProtocolsHandler for PeriodicIdentification +where + TSubstream: AsyncRead + AsyncWrite + Send + Sync + 'static, // TODO: remove useless bounds +{ + type InEvent = Void; + type OutEvent = PeriodicIdentificationEvent; + type Substream = TSubstream; + type Protocol = Toggleable; + type OutboundOpenInfo = (); + + #[inline] + fn listen_protocol(&self) -> Self::Protocol { + let mut upgrade = self.config.clone(); + upgrade.disable(); + upgrade + } + + fn inject_fully_negotiated( + &mut self, + protocol: >::Output, + _endpoint: NodeHandlerEndpoint, + ) { + match protocol { + IdentifyOutput::RemoteInfo { + info, + observed_addr, + } => { + self.pending_result = Some(PeriodicIdentificationEvent::Identified { + info, + observed_addr, + }); + } + IdentifyOutput::Sender { .. 
} => unreachable!( + "Sender can only be produced if we listen for the identify \ + protocol ; however we disable it in listen_protocol" + ), + } + } + + #[inline] + fn inject_event(&mut self, _: &Self::InEvent) {} + + #[inline] + fn inject_inbound_closed(&mut self) {} + + #[inline] + fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, err: io::Error) { + self.pending_result = Some(PeriodicIdentificationEvent::IdentificationError(err)); + if let Some(ref mut next_id) = self.next_id { + next_id.reset(Instant::now() + TRY_AGAIN_ON_ERR); + } + } + + #[inline] + fn shutdown(&mut self) { + self.next_id = None; + } + + fn poll( + &mut self, + ) -> Poll< + Option< + ProtocolsHandlerEvent< + Self::Protocol, + Self::OutboundOpenInfo, + PeriodicIdentificationEvent, + >, + >, + io::Error, + > { + if let Some(pending_result) = self.pending_result.take() { + return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom( + pending_result, + )))); + } + + let next_id = match self.next_id { + Some(ref mut nid) => nid, + None => return Ok(Async::Ready(None)), + }; + + // Poll the future that fires when we need to identify the node again. + match next_id.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(())) => { + next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); + let mut upgrade = self.config.clone(); + upgrade.enable(); + let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () }; + Ok(Async::Ready(Some(ev))) + } + Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), + } + } +}