This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Rework priority groups, take 2 #7700

Merged (32 commits) on Jan 7, 2021
Changes from 6 commits

Commits
9c4b449
Rework priority groups
tomaka Dec 9, 2020
d673ab4
Broken tests fix
tomaka Dec 9, 2020
126ba30
Fix warning causing CI to fail
tomaka Dec 9, 2020
0e91d3d
[Hack] Try restore backwards-compatibility
tomaka Dec 9, 2020
7d26639
Fix peerset bug
tomaka Dec 9, 2020
c69bf7a
Doc fixes and clean up
tomaka Dec 9, 2020
a7f1a65
Error on state mismatch
tomaka Dec 14, 2020
62c4ad5
Try debug CI
tomaka Dec 14, 2020
d2c3d6f
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Jan 4, 2021
9011f1f
CI debugging
tomaka Jan 4, 2021
4b7cf7c
[CI debug] Can I please see this line
tomaka Jan 4, 2021
f147315
Revert "[CI debug] Can I please see this line"
tomaka Jan 4, 2021
a34d538
Revert "CI debugging"
tomaka Jan 4, 2021
384b50b
Fix error! which isn't actually an error
tomaka Jan 4, 2021
55976c4
Fix Ok() returned when actually Err()
tomaka Jan 4, 2021
b997453
Tweaks and fixes
tomaka Jan 4, 2021
e19878d
Fix build
tomaka Jan 4, 2021
c8591bd
Peerset bugfix
tomaka Jan 4, 2021
d175b92
[Debug] Try outbound GrandPa slots
tomaka Jan 4, 2021
16ace71
Another bugfix
tomaka Jan 4, 2021
ffad0a6
Revert "[Debug] Try outbound GrandPa slots"
tomaka Jan 4, 2021
65cf06e
[Debug] Try outbound GrandPa slots
tomaka Jan 4, 2021
31b9a0b
Apply suggestions from code review
tomaka Jan 5, 2021
ebaeaab
Use consts for hardcoded peersets
tomaka Jan 5, 2021
9517536
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Jan 6, 2021
2fb69a7
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Jan 6, 2021
30f6573
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Jan 6, 2021
8535c90
Revert "Try debug CI"
tomaka Jan 7, 2021
14b3b22
Renames
tomaka Jan 7, 2021
6c709db
Merge remote-tracking branch 'upstream/master' into rework-priority-g…
tomaka Jan 7, 2021
1117b56
Line widths
tomaka Jan 7, 2021
9e702d6
Add doc
tomaka Jan 7, 2021
5 changes: 3 additions & 2 deletions bin/node-template/node/src/service.rs
@@ -107,7 +107,8 @@ pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError>
}
};
}
config.network.notifications_protocols.push(sc_finality_grandpa::GRANDPA_PROTOCOL_NAME.into());

config.network.extra_sets.push(sc_finality_grandpa::grandpa_peers_set_config());

let (network, network_status_sinks, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
@@ -244,7 +245,7 @@ pub fn new_light(mut config: Configuration) -> Result<TaskManager, ServiceError>
let (client, backend, keystore_container, mut task_manager, on_demand) =
sc_service::new_light_parts::<Block, RuntimeApi, Executor>(&config)?;

config.network.notifications_protocols.push(sc_finality_grandpa::GRANDPA_PROTOCOL_NAME.into());
config.network.extra_sets.push(sc_finality_grandpa::grandpa_peers_set_config());

let select_chain = sc_consensus::LongestChain::new(backend.clone());

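
The one-line change in this file captures the core API shift of the PR: instead of pushing a bare protocol name into `notifications_protocols`, the service now hands the network a whole peer-set description via `extra_sets`. A minimal, self-contained sketch of that idea follows; the struct and field names are illustrative stand-ins, not Substrate's actual `sc_network` types, and the protocol string is a placeholder.

// Illustrative only: stand-in for what a peer-set description might carry.
struct PeerSetConfig {
    // Notifications protocol served on this set.
    protocol_name: String,
    // Target number of inbound and outbound peers for the set.
    in_peers: u32,
    out_peers: u32,
    // Peers that should always stay connected on this set.
    reserved_peers: Vec<String>,
}

struct NetworkConfiguration {
    // Extra peer sets registered by higher-level components such as GRANDPA.
    extra_sets: Vec<PeerSetConfig>,
}

fn grandpa_peers_set_config() -> PeerSetConfig {
    PeerSetConfig {
        protocol_name: "/grandpa/1".to_string(), // placeholder protocol id
        in_peers: 25,
        out_peers: 25,
        reserved_peers: Vec::new(),
    }
}

fn main() {
    let mut network = NetworkConfiguration { extra_sets: Vec::new() };
    // Instead of pushing a protocol name, the component registers a full
    // peer-set configuration, which the network can later size and fill.
    network.extra_sets.push(grandpa_peers_set_config());
    assert!(network.extra_sets[0].protocol_name.starts_with('/'));
}
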
4 changes: 2 additions & 2 deletions bin/node/cli/src/service.rs
@@ -178,7 +178,7 @@ pub fn new_full_base(

let shared_voter_state = rpc_setup;

config.network.notifications_protocols.push(grandpa::GRANDPA_PROTOCOL_NAME.into());
config.network.extra_sets.push(grandpa::grandpa_peers_set_config());

let (network, network_status_sinks, system_rpc_tx, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
@@ -346,7 +346,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<(
let (client, backend, keystore_container, mut task_manager, on_demand) =
sc_service::new_light_parts::<Block, RuntimeApi, Executor>(&config)?;

config.network.notifications_protocols.push(grandpa::GRANDPA_PROTOCOL_NAME.into());
config.network.extra_sets.push(grandpa::grandpa_peers_set_config());

let select_chain = sc_consensus::LongestChain::new(backend.clone());

2 changes: 0 additions & 2 deletions client/authority-discovery/src/error.rs
@@ -36,8 +36,6 @@ pub enum Error {
CallingRuntime(sp_blockchain::Error),
/// Received a dht record with a key that does not match any in-flight awaited keys.
ReceivingUnexpectedRecord,
/// Failed to set the authority discovery peerset priority group in the peerset module.
SettingPeersetPriorityGroup(String),
/// Failed to encode a protobuf payload.
EncodingProto(prost::EncodeError),
/// Failed to decode a protobuf payload.
78 changes: 0 additions & 78 deletions client/authority-discovery/src/worker.rs
@@ -55,10 +55,6 @@ pub mod tests;

const LOG_TARGET: &'static str = "sub-authority-discovery";

/// Name of the Substrate peerset priority group for authorities discovered through the authority
/// discovery module.
const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";

/// Maximum number of addresses cached per authority. Additional addresses are discarded.
const MAX_ADDRESSES_PER_AUTHORITY: usize = 10;

@@ -113,9 +109,6 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
publish_interval: ExpIncInterval,
/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
query_interval: ExpIncInterval,
/// Interval on which to set the peerset priority group to a new random
/// set of addresses.
priority_group_set_interval: ExpIncInterval,

/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,
@@ -164,13 +157,6 @@
Duration::from_secs(2),
config.max_query_interval,
);
let priority_group_set_interval = ExpIncInterval::new(
Duration::from_secs(2),
// Trade-off between node connection churn and connectivity. Using half of
// [`crate::WorkerConfig::max_query_interval`] to update priority group once at the
// beginning and once in the middle of each query interval.
config.max_query_interval / 2,
);

let addr_cache = AddrCache::new();

@@ -194,7 +180,6 @@
dht_event_rx,
publish_interval,
query_interval,
priority_group_set_interval,
pending_lookups: Vec::new(),
in_flight_lookups: HashMap::new(),
addr_cache,
@@ -224,15 +209,6 @@
msg = self.from_service.select_next_some() => {
self.process_message_from_service(msg);
},
// Set peerset priority group to a new random set of addresses.
_ = self.priority_group_set_interval.next().fuse() => {
if let Err(e) = self.set_priority_group().await {
error!(
target: LOG_TARGET,
"Failed to set priority group: {:?}", e,
);
}
},
// Publish own addresses.
_ = self.publish_interval.next().fuse() => {
if let Err(e) = self.publish_ext_addresses().await {
@@ -580,52 +556,13 @@

Ok(intersection)
}

/// Set the peer set 'authority' priority group to a new random set of
/// [`Multiaddr`]s.
async fn set_priority_group(&self) -> Result<()> {
let addresses = self.addr_cache.get_random_subset();

if addresses.is_empty() {
debug!(
target: LOG_TARGET,
"Got no addresses in cache for peerset priority group.",
);
return Ok(());
}

if let Some(metrics) = &self.metrics {
metrics.priority_group_size.set(addresses.len().try_into().unwrap_or(std::u64::MAX));
}

debug!(
target: LOG_TARGET,
"Applying priority group {:?} to peerset.", addresses,
);

self.network
.set_priority_group(
AUTHORITIES_PRIORITY_GROUP_NAME.to_string(),
addresses.into_iter().collect(),
).await
.map_err(Error::SettingPeersetPriorityGroup)?;

Ok(())
}
}

/// NetworkProvider provides [`Worker`] with all necessary hooks into the
/// underlying Substrate networking. Using this trait abstraction instead of [`NetworkService`]
/// directly is necessary to unit test [`Worker`].
#[async_trait]
pub trait NetworkProvider: NetworkStateInfo {
/// Modify a peerset priority group.
async fn set_priority_group(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
) -> std::result::Result<(), String>;

/// Start putting a value in the Dht.
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>);

@@ -639,13 +576,6 @@
B: BlockT + 'static,
H: ExHashT,
{
async fn set_priority_group(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
) -> std::result::Result<(), String> {
self.set_priority_group(group_id, peers).await
}
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>) {
self.put_value(key, value)
}
@@ -668,7 +598,6 @@ pub(crate) struct Metrics {
dht_event_received: CounterVec<U64>,
handle_value_found_event_failure: Counter<U64>,
known_authorities_count: Gauge<U64>,
priority_group_size: Gauge<U64>,
}

impl Metrics {
@@ -728,13 +657,6 @@
)?,
registry,
)?,
priority_group_size: register(
Gauge::new(
"authority_discovery_priority_group_size",
"Number of addresses passed to the peer set as a priority group."
)?,
registry,
)?,
})
}
}
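
Taken together, the removals in worker.rs delete one mechanism: a timer that periodically handed a bounded, randomly chosen subset of cached authority addresses to the peerset as an "authorities" priority group, along with its error variant, metric, and the `set_priority_group` hook on `NetworkProvider`. A condensed, dependency-free sketch of what that removed loop did, simplified from the deleted code (the real version was async, interval-driven, and used `Multiaddr` values):

use std::collections::HashSet;

// Simplified stand-in for the `set_priority_group` hook removed from the
// `NetworkProvider` trait by this PR.
trait Network {
    fn set_priority_group(
        &self,
        group_id: String,
        peers: HashSet<String>,
    ) -> Result<(), String>;
}

// Condensed version of the deleted `Worker::set_priority_group`: on every
// tick of `priority_group_set_interval`, install a bounded subset of the
// cached authority addresses as the "authorities" priority group.
fn update_priority_group<N: Network>(
    network: &N,
    cached_addresses: &[String],
) -> Result<(), String> {
    if cached_addresses.is_empty() {
        // The removed code logged a debug message and returned Ok(()) here.
        return Ok(());
    }

    // The deleted code picked up to MAX_NUM_AUTHORITY_CONN (10) addresses at
    // random via `rand`; a plain prefix keeps this sketch dependency-free.
    let peers: HashSet<String> = cached_addresses.iter().take(10).cloned().collect();

    network.set_priority_group("authorities".to_string(), peers)
}

struct NoopNetwork;

impl Network for NoopNetwork {
    fn set_priority_group(
        &self,
        group_id: String,
        peers: HashSet<String>,
    ) -> Result<(), String> {
        println!("priority group {:?} set to {} peers", group_id, peers.len());
        Ok(())
    }
}

fn main() {
    let cached = vec!["/dns/validator-1.example/tcp/30333".to_string()];
    update_priority_group(&NoopNetwork, &cached).unwrap();
}
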
41 changes: 0 additions & 41 deletions client/authority-discovery/src/worker/addr_cache.rs
@@ -15,17 +15,11 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use libp2p::core::multiaddr::{Multiaddr, Protocol};
use rand::seq::SliceRandom;
use std::collections::HashMap;

use sp_authority_discovery::AuthorityId;
use sc_network::PeerId;

/// The maximum number of authority connections initialized through the authority discovery module.
///
/// In other words the maximum size of the `authority` peerset priority group.
const MAX_NUM_AUTHORITY_CONN: usize = 10;

/// Cache for [`AuthorityId`] -> [`Vec<Multiaddr>`] and [`PeerId`] -> [`AuthorityId`] mappings.
pub(super) struct AddrCache {
authority_id_to_addresses: HashMap<AuthorityId, Vec<Multiaddr>>,
@@ -75,30 +69,6 @@ impl AddrCache {
self.peer_id_to_authority_id.get(peer_id)
}

/// Returns a single address for a random subset (maximum of [`MAX_NUM_AUTHORITY_CONN`]) of all
/// known authorities.
pub fn get_random_subset(&self) -> Vec<Multiaddr> {
let mut rng = rand::thread_rng();

let mut addresses = self
.authority_id_to_addresses
.iter()
.filter_map(|(_authority_id, addresses)| {
debug_assert!(!addresses.is_empty());
addresses
.choose(&mut rng)
})
.collect::<Vec<&Multiaddr>>();

addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
addresses.dedup();

addresses
.choose_multiple(&mut rng, MAX_NUM_AUTHORITY_CONN)
.map(|a| (**a).clone())
.collect()
}

/// Removes all [`PeerId`]s and [`Multiaddr`]s from the cache that are not related to the given
/// [`AuthorityId`]s.
pub fn retain_ids(&mut self, authority_ids: &Vec<AuthorityId>) {
@@ -190,11 +160,6 @@ mod tests {
cache.insert(second.0.clone(), vec![second.1.clone()]);
cache.insert(third.0.clone(), vec![third.1.clone()]);

let subset = cache.get_random_subset();
assert!(
subset.contains(&first.1) && subset.contains(&second.1) && subset.contains(&third.1),
"Expect initial subset to contain all authorities.",
);
assert_eq!(
Some(&vec![third.1.clone()]),
cache.get_addresses_by_authority_id(&third.0),
@@ -208,12 +173,6 @@

cache.retain_ids(&vec![first.0, second.0]);

let subset = cache.get_random_subset();
assert!(
subset.contains(&first.1) || subset.contains(&second.1),
"Expected both first and second authority."
);
assert!(!subset.contains(&third.1), "Did not expect address from third authority");
assert_eq!(
None, cache.get_addresses_by_authority_id(&third.0),
"Expect `get_addresses_by_authority_id` to not return `None` for third authority."
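
With `get_random_subset` and its tests gone, the cache's remaining surface, as far as this diff shows, is insertion, per-authority lookup, reverse lookup by `PeerId`, and pruning via `retain_ids`. A small usage sketch of that reduced API under those assumptions, with string stand-ins replacing `AuthorityId` and `Multiaddr`:

use std::collections::HashMap;

// Illustrative stand-ins for `AuthorityId` and `Multiaddr`.
type AuthorityId = String;
type Multiaddr = String;

// Minimal model of what `AddrCache` still does after this PR: map authorities
// to addresses and drop entries that no longer belong to the authority set.
struct AddrCache {
    authority_id_to_addresses: HashMap<AuthorityId, Vec<Multiaddr>>,
}

impl AddrCache {
    fn new() -> Self {
        AddrCache { authority_id_to_addresses: HashMap::new() }
    }

    fn insert(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
        self.authority_id_to_addresses.insert(authority, addresses);
    }

    fn get_addresses_by_authority_id(&self, authority: &AuthorityId) -> Option<&Vec<Multiaddr>> {
        self.authority_id_to_addresses.get(authority)
    }

    fn retain_ids(&mut self, authority_ids: &[AuthorityId]) {
        self.authority_id_to_addresses
            .retain(|id, _| authority_ids.contains(id));
    }
}

fn main() {
    let mut cache = AddrCache::new();
    cache.insert("alice".into(), vec!["/dns/a.example/tcp/30333".into()]);
    cache.insert("bob".into(), vec!["/dns/b.example/tcp/30333".into()]);

    // Prune to the current authority set; only "alice" survives.
    cache.retain_ids(&["alice".to_string()]);
    assert!(cache.get_addresses_by_authority_id(&"bob".to_string()).is_none());
    assert!(cache.get_addresses_by_authority_id(&"alice".to_string()).is_some());
}
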
44 changes: 1 addition & 43 deletions client/authority-discovery/src/worker/tests.rs
@@ -18,7 +18,7 @@

use crate::worker::schema;

use std::{iter::FromIterator, sync::{Arc, Mutex}, task::Poll};
use std::{sync::{Arc, Mutex}, task::Poll};

use async_trait::async_trait;
use futures::channel::mpsc::{self, channel};
@@ -112,10 +112,6 @@
pub enum TestNetworkEvent {
GetCalled(kad::record::Key),
PutCalled(kad::record::Key, Vec<u8>),
SetPriorityGroupCalled {
group_id: String,
peers: HashSet<Multiaddr>
},
}

pub struct TestNetwork {
@@ -125,7 +121,6 @@
// vectors below.
pub put_value_call: Arc<Mutex<Vec<(kad::record::Key, Vec<u8>)>>>,
pub get_value_call: Arc<Mutex<Vec<kad::record::Key>>>,
pub set_priority_group_call: Arc<Mutex<Vec<(String, HashSet<Multiaddr>)>>>,
event_sender: mpsc::UnboundedSender<TestNetworkEvent>,
event_receiver: Option<mpsc::UnboundedReceiver<TestNetworkEvent>>,
}
@@ -147,7 +142,6 @@ impl Default for TestNetwork {
],
put_value_call: Default::default(),
get_value_call: Default::default(),
set_priority_group_call: Default::default(),
event_sender: tx,
event_receiver: Some(rx),
}
@@ -156,21 +150,6 @@

#[async_trait]
impl NetworkProvider for TestNetwork {
async fn set_priority_group(
&self,
group_id: String,
peers: HashSet<Multiaddr>,
) -> std::result::Result<(), String> {
self.set_priority_group_call
.lock()
.unwrap()
.push((group_id.clone(), peers.clone()));
self.event_sender.clone().unbounded_send(TestNetworkEvent::SetPriorityGroupCalled {
group_id,
peers,
}).unwrap();
Ok(())
}
fn put_value(&self, key: kad::record::Key, value: Vec<u8>) {
self.put_value_call.lock().unwrap().push((key.clone(), value.clone()));
self.event_sender.clone().unbounded_send(TestNetworkEvent::PutCalled(key, value)).unwrap();
@@ -296,14 +275,6 @@ fn publish_discover_cycle() {
let (_dht_event_tx, dht_event_rx) = channel(1000);

let network: Arc<TestNetwork> = Arc::new(Default::default());
let node_a_multiaddr = {
let peer_id = network.local_peer_id();
let address = network.external_addresses().pop().unwrap();

address.with(multiaddr::Protocol::P2p(
peer_id.into(),
))
};

let key_store = KeyStore::new();

@@ -365,19 +336,6 @@

// Make authority discovery handle the event.
worker.handle_dht_event(dht_event).await;

worker.set_priority_group().await.unwrap();

// Expect authority discovery to set the priority set.
assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1);

assert_eq!(
network.set_priority_group_call.lock().unwrap()[0],
(
"authorities".to_string(),
HashSet::from_iter(vec![node_a_multiaddr.clone()].into_iter())
)
);
}.boxed_local().into());

pool.run();