Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

[WIP do not merge] Extract sc-network-sync crate #10514

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,9 @@ pub(crate) mod beefy_protocol_name {
pub fn beefy_peers_set_config(
protocol_name: std::borrow::Cow<'static, str>,
) -> sc_network::config::NonDefaultSetConfig {
let mut cfg = sc_network::config::NonDefaultSetConfig::new(protocol_name, 1024 * 1024);

cfg.allow_non_reserved(25, 25);
cfg.add_fallback_names(beefy_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect());
cfg
sc_network::config::NonDefaultSetConfig::new(protocol_name, 1024 * 1024)
.allow_non_reserved(25, 25)
.add_fallback_names(beefy_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect())
}

/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client
Expand Down
15 changes: 3 additions & 12 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,18 +722,9 @@ pub fn grandpa_peers_set_config(
protocol_name: std::borrow::Cow<'static, str>,
) -> sc_network::config::NonDefaultSetConfig {
use communication::grandpa_protocol_name;
sc_network::config::NonDefaultSetConfig {
notifications_protocol: protocol_name,
fallback_names: grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect(),
// Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot.
max_notification_size: 1024 * 1024,
set_config: sc_network::config::SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny,
},
}
// Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot.
sc_network::config::NonDefaultSetConfig::new(protocol_name, 1024 * 1024)
.add_fallback_names(grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect())
}

/// Run a GRANDPA voter as a task. Provide configuration and a link to a
Expand Down
85 changes: 77 additions & 8 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use std::{
borrow::Cow,
collections::HashMap,
convert::TryFrom,
error::Error,
fs,
future::Future,
io::{self, Write},
Expand Down Expand Up @@ -551,11 +550,12 @@ impl Default for SetConfig {
}
}

#[derive(Clone, Debug)]
struct Trap;

/// Extension to [`SetConfig`] for sets that aren't the default set.
///
/// > **Note**: As new fields might be added in the future, please consider using the `new` method
/// > and modifiers instead of creating this struct manually.
#[derive(Clone, Debug)]
#[must_use = "Why would you build a config if you do not use it afterwards?"]
pub struct NonDefaultSetConfig {
/// Name of the notifications protocols of this set. A substream on this set will be
/// considered established once this protocol is open.
Expand All @@ -574,6 +574,9 @@ pub struct NonDefaultSetConfig {
pub max_notification_size: u64,
/// Base configuration.
pub set_config: SetConfig,

/// You must use [`NonDefaultSetConfig::new`] to create this structure.
_use_new_to_create: Trap,
}

impl NonDefaultSetConfig {
Expand All @@ -589,26 +592,58 @@ impl NonDefaultSetConfig {
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
},
_use_new_to_create: Trap,
}
}

/// Modifies the configuration to allow non-reserved nodes.
pub fn allow_non_reserved(&mut self, in_peers: u32, out_peers: u32) {
pub fn allow_non_reserved(mut self, in_peers: u32, out_peers: u32) -> Self {
self.set_config.in_peers = in_peers;
self.set_config.out_peers = out_peers;
self.set_config.non_reserved_mode = NonReservedPeerMode::Accept;
self
}

/// Add a node to the list of reserved nodes.
pub fn add_reserved(&mut self, peer: MultiaddrWithPeerId) {
pub fn add_reserved(mut self, peer: MultiaddrWithPeerId) -> Self {
self.set_config.reserved_nodes.push(peer);
self
}

/// Add a list of protocol names used for backward compatibility.
///
/// See the explanations in [`NonDefaultSetConfig::fallback_names`].
pub fn add_fallback_names(&mut self, fallback_names: Vec<Cow<'static, str>>) {
pub fn add_fallback_names(mut self, fallback_names: Vec<Cow<'static, str>>) -> Self {
self.fallback_names.extend(fallback_names);
self
}

/// Replace the peer set configuration
pub fn with_config(mut self, config: SetConfig) -> Self {
self.set_config = config;
self
}
}

/// A trait that enables handling different types with a single relevant multiaddress in a generic
/// way
pub trait AsMultiAddr {
/// Extract the multiaddress from the type.
///
/// NOTE: Do not implement this trait for types where it is ambiguous which address should be
/// extracted here.
fn as_multiaddr(&self) -> &Multiaddr;
}

impl AsMultiAddr for Multiaddr {
fn as_multiaddr(&self) -> &Multiaddr {
self
}
}

impl AsMultiAddr for MultiaddrWithPeerId {
fn as_multiaddr(&self) -> &Multiaddr {
&self.multiaddr
}
}

Expand All @@ -632,6 +667,40 @@ pub enum TransportConfig {
MemoryOnly,
}

impl TransportConfig {
/// Checks every address and gives an error if any of them do not match the specified transport
pub fn check_addresses<'a, T>(
&self,
addresses: impl IntoIterator<Item = &'a T>,
) -> Result<(), crate::error::Error>
where
T: AsMultiAddr + 'a,
{
let memory_only = matches!(self, TransportConfig::MemoryOnly);
let invalid_addresses: Vec<_> = addresses
.into_iter()
.map(|x| x.as_multiaddr())
.filter(|x| {
x.iter().any(|y| {
// MemoryOnly can only have `/memory/...` protocols, while
// Normal must not have any
memory_only != matches!(y, libp2p::core::multiaddr::Protocol::Memory(_))
})
})
.cloned()
.collect();

if invalid_addresses.is_empty() {
Ok(())
} else {
Err(crate::error::Error::AddressesForAnotherTransport {
transport: self.clone(),
addresses: invalid_addresses,
})
}
}
}

/// The policy for connections to non-reserved peers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
Expand Down Expand Up @@ -742,7 +811,7 @@ where
P: AsRef<Path>,
F: for<'r> FnOnce(&'r mut [u8]) -> Result<K, E>,
G: FnOnce() -> K,
E: Error + Send + Sync + 'static,
E: std::error::Error + Send + Sync + 'static,
W: Fn(&K) -> Vec<u8>,
{
std::fs::read(&file)
Expand Down
74 changes: 11 additions & 63 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,34 +142,17 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
/// for the network processing to advance. From it, you can extract a `NetworkService` using
/// `worker.service()`. The `NetworkService` can be shared through the codebase.
pub fn new(mut params: Params<B, H>) -> Result<Self, Error> {
// Ensure the listen addresses are consistent with the transport.
ensure_addresses_consistent_with_transport(
params.network_config.listen_addresses.iter(),
&params.network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
params.network_config.boot_nodes.iter().map(|x| &x.multiaddr),
&params.network_config.transport,
)?;
ensure_addresses_consistent_with_transport(
params
.network_config
.default_peers_set
.reserved_nodes
.iter()
.map(|x| &x.multiaddr),
&params.network_config.transport,
)?;
let transport_config = &params.network_config.transport;

// Ensure all configured addresses are consistent with the transport.
transport_config.check_addresses(&params.network_config.listen_addresses)?;
transport_config.check_addresses(&params.network_config.boot_nodes)?;
transport_config
.check_addresses(&params.network_config.default_peers_set.reserved_nodes)?;
for extra_set in &params.network_config.extra_sets {
ensure_addresses_consistent_with_transport(
extra_set.set_config.reserved_nodes.iter().map(|x| &x.multiaddr),
&params.network_config.transport,
)?;
transport_config.check_addresses(&extra_set.set_config.reserved_nodes)?;
}
ensure_addresses_consistent_with_transport(
params.network_config.public_addresses.iter(),
&params.network_config.transport,
)?;
transport_config.check_addresses(&params.network_config.public_addresses)?;

let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker");

Expand Down Expand Up @@ -212,8 +195,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
&params.network_config,
iter::once(Vec::new())
.chain(
(0..params.network_config.extra_sets.len() - 1)
.map(|_| default_notif_handshake_message.clone()),
iter::repeat(default_notif_handshake_message)
.take(params.network_config.extra_sets.len() - 1),
)
.collect(),
params.block_announce_validator,
Expand Down Expand Up @@ -2163,38 +2146,3 @@ impl<'a, B: BlockT> Link<B> for NetworkLink<'a, B> {
.request_justification(hash, number)
}
}

fn ensure_addresses_consistent_with_transport<'a>(
addresses: impl Iterator<Item = &'a Multiaddr>,
transport: &TransportConfig,
) -> Result<(), Error> {
if matches!(transport, TransportConfig::MemoryOnly) {
let addresses: Vec<_> = addresses
.filter(|x| {
x.iter().any(|y| !matches!(y, libp2p::core::multiaddr::Protocol::Memory(_)))
})
.cloned()
.collect();

if !addresses.is_empty() {
return Err(Error::AddressesForAnotherTransport {
transport: transport.clone(),
addresses,
})
}
} else {
let addresses: Vec<_> = addresses
.filter(|x| x.iter().any(|y| matches!(y, libp2p::core::multiaddr::Protocol::Memory(_))))
.cloned()
.collect();

if !addresses.is_empty() {
return Err(Error::AddressesForAnotherTransport {
transport: transport.clone(),
addresses,
})
}
}

Ok(())
}
53 changes: 16 additions & 37 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,30 +154,23 @@ fn build_nodes_one_proto() -> (
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];

let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: Default::default(),
}],
extra_sets: vec![config::NonDefaultSetConfig::new(PROTOCOL_NAME, 1024 * 1024)
.with_config(Default::default())],
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});

let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
extra_sets: vec![config::NonDefaultSetConfig::new(PROTOCOL_NAME, 1024 * 1024).with_config(
config::SetConfig {
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr,
peer_id: node1.local_peer_id().clone(),
}],
..Default::default()
},
}],
)],
listen_addresses: vec![],
transport: config::TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
Expand Down Expand Up @@ -345,12 +338,8 @@ fn lots_of_incoming_peers_works() {

let (main_node, _) = build_test_full_node(config::NetworkConfiguration {
listen_addresses: vec![listen_addr.clone()],
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: config::SetConfig { in_peers: u32::MAX, ..Default::default() },
}],
extra_sets: vec![config::NonDefaultSetConfig::new(PROTOCOL_NAME, 1024 * 1024)
.with_config(config::SetConfig { in_peers: u32::MAX, ..Default::default() })],
transport: config::TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
Expand All @@ -364,18 +353,14 @@ fn lots_of_incoming_peers_works() {
for _ in 0..32 {
let (_dialing_node, event_stream) = build_test_full_node(config::NetworkConfiguration {
listen_addresses: vec![],
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
extra_sets: vec![config::NonDefaultSetConfig::new(PROTOCOL_NAME, 1024 * 1024)
.with_config(config::SetConfig {
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr.clone(),
peer_id: main_node_peer_id,
}],
..Default::default()
},
}],
})],
transport: config::TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
Expand Down Expand Up @@ -477,30 +462,24 @@ fn fallback_name_working() {
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];

let (node1, mut events_stream1) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: NEW_PROTOCOL_NAME.clone(),
fallback_names: vec![PROTOCOL_NAME],
max_notification_size: 1024 * 1024,
set_config: Default::default(),
}],
extra_sets: vec![config::NonDefaultSetConfig::new(NEW_PROTOCOL_NAME.clone(), 1024 * 1024)
.add_fallback_names(vec![PROTOCOL_NAME])
.with_config(Default::default())],
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});

let (_, mut events_stream2) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
extra_sets: vec![config::NonDefaultSetConfig::new(PROTOCOL_NAME, 1024 * 1024).with_config(
config::SetConfig {
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr,
peer_id: node1.local_peer_id().clone(),
}],
..Default::default()
},
}],
)],
listen_addresses: vec![],
transport: config::TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
Expand Down
Loading