diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index 2c008e19d1b0d..21e44f9782286 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -126,6 +126,7 @@ impl NetworkParams { }, listen_addresses, public_addresses: Vec::new(), + notifications_protocols: Vec::new(), node_key, node_name: node_name.to_string(), client_version: client_id.to_string(), diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 9cd57a9d1d47b..01acbe68755be 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -31,16 +31,23 @@ pub use crate::protocol::ProtocolConfig; use crate::service::ExHashT; -use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue}; -use sp_runtime::traits::{Block as BlockT}; -use libp2p::identity::{Keypair, ed25519}; -use libp2p::wasm_ext; -use libp2p::{PeerId, Multiaddr, multiaddr}; use core::{fmt, iter}; -use std::{convert::TryFrom, future::Future, pin::Pin, str::FromStr}; -use std::{error::Error, fs, io::{self, Write}, net::Ipv4Addr, path::{Path, PathBuf}, sync::Arc}; -use zeroize::Zeroize; +use libp2p::identity::{ed25519, Keypair}; +use libp2p::wasm_ext; +use libp2p::{multiaddr, Multiaddr, PeerId}; use prometheus_endpoint::Registry; +use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue}; +use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; +use std::{borrow::Cow, convert::TryFrom, future::Future, pin::Pin, str::FromStr}; +use std::{ + error::Error, + fs, + io::{self, Write}, + net::Ipv4Addr, + path::{Path, PathBuf}, + sync::Arc, +}; +use zeroize::Zeroize; /// Network initialization parameters. pub struct Params { @@ -317,6 +324,9 @@ pub struct NetworkConfiguration { pub boot_nodes: Vec, /// The node key configuration, which determines the node's network identity keypair. pub node_key: NodeKeyConfig, + /// List of notifications protocols that the node supports. Must also include a + /// `ConsensusEngineId` for backwards-compatibility. + pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, [u8]>)>, /// Maximum allowed number of incoming connections. pub in_peers: u32, /// Number of outgoing connections we're trying to maintain. @@ -349,6 +359,7 @@ impl NetworkConfiguration { public_addresses: Vec::new(), boot_nodes: Vec::new(), node_key, + notifications_protocols: Vec::new(), in_peers: 25, out_peers: 75, reserved_nodes: Vec::new(), diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 7e597f1be69f9..46b759d4580a6 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -442,6 +442,38 @@ impl ProtocolsHandler for NotifsHandler { ) -> Poll< ProtocolsHandlerEvent > { + while let Poll::Ready(ev) = self.legacy.poll(cx) { + match ev { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } => + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: protocol.map_upgrade(EitherUpgrade::B), + info: None, + }), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::Open { endpoint } + )), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::Closed { endpoint, reason } + )), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::CustomMessage { message } + )), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Clogged { messages }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::Clogged { messages } + )), + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) => + return Poll::Ready(ProtocolsHandlerEvent::Custom( + NotifsHandlerOut::ProtocolError { is_severe, error } + )), + ProtocolsHandlerEvent::Close(err) => + return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))), + } + } + for (handler_num, handler) in self.in_handlers.iter_mut().enumerate() { while let Poll::Ready(ev) = handler.poll(cx) { match ev { @@ -495,38 +527,6 @@ impl ProtocolsHandler for NotifsHandler { } } - while let Poll::Ready(ev) = self.legacy.poll(cx) { - match ev { - ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } => - return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: protocol.map_upgrade(EitherUpgrade::B), - info: None, - }), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) => - return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Open { endpoint } - )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => - return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Closed { endpoint, reason } - )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => - return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::CustomMessage { message } - )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::Clogged { messages }) => - return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Clogged { messages } - )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::ProtocolError { is_severe, error }) => - return Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::ProtocolError { is_severe, error } - )), - ProtocolsHandlerEvent::Close(err) => - return Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))), - } - } - Poll::Pending } } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index fb33901dd09c3..e9df08f3e489e 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -52,7 +52,7 @@ use sp_runtime::{ traits::{Block as BlockT, NumberFor}, ConsensusEngineId, }; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; +use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use std::{ borrow::Cow, collections::{HashMap, HashSet}, @@ -60,15 +60,20 @@ use std::{ marker::PhantomData, pin::Pin, str, - sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc}, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, task::Poll, }; +#[cfg(test)] +mod tests; + /// Minimum Requirements for a Hash within Networking pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {} -impl ExHashT for T where - T: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static +impl ExHashT for T where T: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {} /// Transaction pool interface @@ -284,7 +289,7 @@ impl NetworkWorker { )?; // Build the swarm. - let (mut swarm, bandwidth): (Swarm::, _) = { + let (mut swarm, bandwidth): (Swarm, _) = { let user_agent = format!( "{} ({})", params.network_config.client_version, @@ -296,9 +301,14 @@ impl NetworkWorker { }; let light_client_handler = { let config = protocol::light_client_handler::Config::new(¶ms.protocol_id); - protocol::LightClientHandler::new(config, params.chain, checker, peerset_handle.clone()) + protocol::LightClientHandler::new( + config, + params.chain, + checker, + peerset_handle.clone(), + ) }; - let behaviour = futures::executor::block_on(Behaviour::new( + let mut behaviour = futures::executor::block_on(Behaviour::new( protocol, params.role, user_agent, @@ -316,6 +326,9 @@ impl NetworkWorker { block_requests, light_client_handler )); + for (engine_id, protocol_name) in ¶ms.network_config.notifications_protocols { + behaviour.register_notifications_protocol(*engine_id, protocol_name.clone()); + } let (transport, bandwidth) = { let (config_mem, config_wasm, flowctrl) = match params.network_config.transport { TransportConfig::MemoryOnly => (true, None, false), @@ -531,6 +544,11 @@ impl NetworkWorker { } impl NetworkService { + /// Returns the local `PeerId`. + pub fn local_peer_id(&self) -> &PeerId { + &self.local_peer_id + } + /// Writes a message on an open notifications channel. Has no effect if the notifications /// channel with this protocol name is closed. /// diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs new file mode 100644 index 0000000000000..0e097072e6c05 --- /dev/null +++ b/client/network/src/service/tests.rs @@ -0,0 +1,270 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use crate::{config, Event, NetworkService, NetworkWorker}; + +use futures::prelude::*; +use sp_runtime::traits::{Block as BlockT, Header as _}; +use std::{sync::Arc, time::Duration}; +use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; + +type TestNetworkService = NetworkService< + substrate_test_runtime_client::runtime::Block, + substrate_test_runtime_client::runtime::Hash, +>; + +/// Builds a full node to be used for testing. Returns the node service and its associated events +/// stream. +/// +/// > **Note**: We return the events stream in order to not possibly lose events between the +/// > construction of the service and the moment the events stream is grabbed. +fn build_test_full_node(config: config::NetworkConfiguration) + -> (Arc, impl Stream) +{ + let client = Arc::new( + TestClientBuilder::with_default_backend() + .build_with_longest_chain() + .0, + ); + + #[derive(Clone)] + struct PassThroughVerifier(bool); + impl sp_consensus::import_queue::Verifier for PassThroughVerifier { + fn verify( + &mut self, + origin: sp_consensus::BlockOrigin, + header: B::Header, + justification: Option, + body: Option>, + ) -> Result< + ( + sp_consensus::BlockImportParams, + Option)>>, + ), + String, + > { + let maybe_keys = header + .digest() + .log(|l| { + l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"aura")) + .or_else(|| { + l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"babe")) + }) + }) + .map(|blob| { + vec![( + sp_blockchain::well_known_cache_keys::AUTHORITIES, + blob.to_vec(), + )] + }); + + let mut import = sp_consensus::BlockImportParams::new(origin, header); + import.body = body; + import.finalized = self.0; + import.justification = justification; + import.fork_choice = Some(sp_consensus::ForkChoiceStrategy::LongestChain); + Ok((import, maybe_keys)) + } + } + + let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new( + PassThroughVerifier(false), + Box::new(client.clone()), + None, + None, + )); + + let worker = NetworkWorker::new(config::Params { + role: config::Role::Full, + executor: None, + network_config: config, + chain: client.clone(), + finality_proof_provider: None, + finality_proof_request_builder: None, + on_demand: None, + transaction_pool: Arc::new(crate::service::EmptyTransactionPool), + protocol_id: config::ProtocolId::from(&b"/test-protocol-name"[..]), + import_queue, + block_announce_validator: Box::new( + sp_consensus::block_validation::DefaultBlockAnnounceValidator::new(client.clone()), + ), + metrics_registry: None, + }) + .unwrap(); + + let service = worker.service().clone(); + let event_stream = service.event_stream(); + + async_std::task::spawn(async move { + futures::pin_mut!(worker); + let _ = worker.await; + }); + + (service, event_stream) +} + +const ENGINE_ID: sp_runtime::ConsensusEngineId = *b"foo\0"; + +/// Builds two nodes and their associated events stream. +/// The nodes are connected together and have the `ENGINE_ID` protocol registered. +fn build_nodes_one_proto() + -> (Arc, impl Stream, Arc, impl Stream) +{ + let listen_addr = config::build_multiaddr![Memory(rand::random::())]; + + let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration { + notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], + listen_addresses: vec![listen_addr.clone()], + transport: config::TransportConfig::MemoryOnly, + .. config::NetworkConfiguration::new_local() + }); + + let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration { + notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], + reserved_nodes: vec![config::MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: node1.local_peer_id().clone(), + }], + transport: config::TransportConfig::MemoryOnly, + .. config::NetworkConfiguration::new_local() + }); + + (node1, events_stream1, node2, events_stream2) +} + +#[test] +fn notifications_state_consistent() { + // Runs two nodes and ensures that events are propagated out of the API in a consistent + // correct order, which means no notification received on a closed substream. + + let (node1, mut events_stream1, node2, mut events_stream2) = build_nodes_one_proto(); + + // Write some initial notifications that shouldn't get through. + for _ in 0..(rand::random::() % 5) { + node1.write_notification(node2.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec()); + } + for _ in 0..(rand::random::() % 5) { + node2.write_notification(node1.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec()); + } + + async_std::task::block_on(async move { + // True if we have an active substream from node1 to node2. + let mut node1_to_node2_open = false; + // True if we have an active substream from node2 to node1. + let mut node2_to_node1_open = false; + // We stop the test after a certain number of iterations. + let mut iterations = 0; + // Safe guard because we don't want the test to pass if no substream has been open. + let mut something_happened = false; + + loop { + iterations += 1; + if iterations >= 1_000 { + assert!(something_happened); + break; + } + + // Start by sending a notification from node1 to node2 and vice-versa. Part of the + // test consists in ensuring that notifications get ignored if the stream isn't open. + if rand::random::() % 5 >= 3 { + node1.write_notification(node2.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec()); + } + if rand::random::() % 5 >= 3 { + node2.write_notification(node1.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec()); + } + + // Also randomly disconnect the two nodes from time to time. + if rand::random::() % 20 == 0 { + node1.disconnect_peer(node2.local_peer_id().clone()); + } + if rand::random::() % 20 == 0 { + node2.disconnect_peer(node1.local_peer_id().clone()); + } + + // Grab next event from either `events_stream1` or `events_stream2`. + let next_event = { + let next1 = events_stream1.next(); + let next2 = events_stream2.next(); + // We also await on a small timer, otherwise it is possible for the test to wait + // forever while nothing at all happens on the network. + let continue_test = futures_timer::Delay::new(Duration::from_millis(20)); + match future::select(future::select(next1, next2), continue_test).await { + future::Either::Left((future::Either::Left((Some(ev), _)), _)) => + future::Either::Left(ev), + future::Either::Left((future::Either::Right((Some(ev), _)), _)) => + future::Either::Right(ev), + future::Either::Right(_) => continue, + _ => break, + } + }; + + match next_event { + future::Either::Left(Event::NotificationStreamOpened { remote, engine_id, .. }) => { + something_happened = true; + assert!(!node1_to_node2_open); + node1_to_node2_open = true; + assert_eq!(remote, *node2.local_peer_id()); + assert_eq!(engine_id, ENGINE_ID); + } + future::Either::Right(Event::NotificationStreamOpened { remote, engine_id, .. }) => { + something_happened = true; + assert!(!node2_to_node1_open); + node2_to_node1_open = true; + assert_eq!(remote, *node1.local_peer_id()); + assert_eq!(engine_id, ENGINE_ID); + } + future::Either::Left(Event::NotificationStreamClosed { remote, engine_id, .. }) => { + assert!(node1_to_node2_open); + node1_to_node2_open = false; + assert_eq!(remote, *node2.local_peer_id()); + assert_eq!(engine_id, ENGINE_ID); + } + future::Either::Right(Event::NotificationStreamClosed { remote, engine_id, .. }) => { + assert!(node2_to_node1_open); + node2_to_node1_open = false; + assert_eq!(remote, *node1.local_peer_id()); + assert_eq!(engine_id, ENGINE_ID); + } + future::Either::Left(Event::NotificationsReceived { remote, .. }) => { + assert!(node1_to_node2_open); + assert_eq!(remote, *node2.local_peer_id()); + if rand::random::() % 5 >= 4 { + node1.write_notification( + node2.local_peer_id().clone(), + ENGINE_ID, + b"hello world".to_vec() + ); + } + } + future::Either::Right(Event::NotificationsReceived { remote, .. }) => { + assert!(node2_to_node1_open); + assert_eq!(remote, *node1.local_peer_id()); + if rand::random::() % 5 >= 4 { + node2.write_notification( + node1.local_peer_id().clone(), + ENGINE_ID, + b"hello world".to_vec() + ); + } + } + + // Add new events here. + future::Either::Left(Event::Dht(_)) => {} + future::Either::Right(Event::Dht(_)) => {} + }; + } + }); +} diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 56070f9888081..fec4f1317b2dd 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -41,7 +41,7 @@ use sp_consensus::block_import::{BlockImport, ImportResult}; use sp_consensus::Error as ConsensusError; use sp_consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport}; use futures::prelude::*; -use sc_network::{NetworkWorker, NetworkStateInfo, NetworkService, config::ProtocolId}; +use sc_network::{NetworkWorker, NetworkService, config::ProtocolId}; use sc_network::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder}; use libp2p::PeerId; use parking_lot::Mutex; @@ -189,7 +189,7 @@ pub struct Peer { impl Peer { /// Get this peer ID. pub fn id(&self) -> PeerId { - self.network.service().local_peer_id() + self.network.service().local_peer_id().clone() } /// Returns true if we're major syncing. @@ -628,7 +628,7 @@ pub trait TestNetFactory: Sized { self.mut_peers(|peers| { for peer in peers.iter_mut() { - peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone()); + peer.network.add_known_address(network.service().local_peer_id().clone(), listen_addr.clone()); } let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse()); @@ -704,7 +704,7 @@ pub trait TestNetFactory: Sized { self.mut_peers(|peers| { for peer in peers.iter_mut() { - peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone()); + peer.network.add_known_address(network.service().local_peer_id().clone(), listen_addr.clone()); } let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse()); diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index bb5ef09562484..2811076ba38b7 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -37,7 +37,7 @@ use sc_service::{ Role, Error, }; -use sc_network::{multiaddr, Multiaddr, NetworkStateInfo}; +use sc_network::{multiaddr, Multiaddr}; use sc_network::config::{NetworkConfiguration, TransportConfig}; use sp_runtime::{generic::BlockId, traits::Block as BlockT}; use sp_transaction_pool::TransactionPool; @@ -265,7 +265,7 @@ impl TestNet where let service = SyncService::from(service); executor.spawn(service.clone().map_err(|_| ())); - let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); + let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into())); self.authority_nodes.push((self.nodes, service, user_data, addr)); self.nodes += 1; } @@ -281,7 +281,7 @@ impl TestNet where let service = SyncService::from(service); executor.spawn(service.clone().map_err(|_| ())); - let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); + let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into())); self.full_nodes.push((self.nodes, service, user_data, addr)); self.nodes += 1; } @@ -296,7 +296,7 @@ impl TestNet where let service = SyncService::from(light(node_config).expect("Error creating test node service")); executor.spawn(service.clone().map_err(|_| ())); - let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); + let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().clone().into())); self.light_nodes.push((self.nodes, service, addr)); self.nodes += 1; }