Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
client/network: Make NetworkService::set_priority_group async (#7352)
Browse files Browse the repository at this point in the history
As done with `NetworkService::{add_to,remove_from}_priority_group`, make
`NetworkService::set_priority_group` async as well. This future-proofs
the API should we ever decide to use a bounded channel between
`NetworkService` and `NetworkWorker`.
  • Loading branch information
mxinden authored Oct 22, 2020
1 parent 4279f33 commit 575cbf8
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/authority-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ targets = ["x86_64-unknown-linux-gnu"]
prost-build = "0.6.1"

[dependencies]
async-trait = "0.1"
bytes = "0.5.0"
codec = { package = "parity-scale-codec", default-features = false, version = "1.3.4" }
derive_more = "0.99.2"
Expand Down
15 changes: 9 additions & 6 deletions client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use futures::{FutureExt, Stream, StreamExt, stream::Fuse};
use futures_timer::Delay;

use addr_cache::AddrCache;
use async_trait::async_trait;
use codec::Decode;
use either::Either;
use libp2p::{core::multiaddr, multihash::Multihash};
Expand Down Expand Up @@ -267,7 +268,7 @@ where
},
// Set peerset priority group to a new random set of addresses.
_ = self.priority_group_set_interval.next().fuse() => {
if let Err(e) = self.set_priority_group() {
if let Err(e) = self.set_priority_group().await {
error!(
target: LOG_TARGET,
"Failed to set priority group: {:?}", e,
Expand Down Expand Up @@ -629,7 +630,7 @@ where

/// Set the peer set 'authority' priority group to a new random set of
/// [`Multiaddr`]s.
fn set_priority_group(&self) -> Result<()> {
async fn set_priority_group(&self) -> Result<()> {
let addresses = self.addr_cache.get_random_subset();

if addresses.is_empty() {
Expand All @@ -653,7 +654,7 @@ where
.set_priority_group(
AUTHORITIES_PRIORITY_GROUP_NAME.to_string(),
addresses.into_iter().collect(),
)
).await
.map_err(Error::SettingPeersetPriorityGroup)?;

Ok(())
Expand All @@ -663,9 +664,10 @@ where
/// NetworkProvider provides [`Worker`] with all necessary hooks into the
/// underlying Substrate networking. Using this trait abstraction instead of [`NetworkService`]
/// directly is necessary to unit test [`Worker`].
#[async_trait]
pub trait NetworkProvider: NetworkStateInfo {
/// Modify a peerset priority group.
fn set_priority_group(
async fn set_priority_group(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
Expand All @@ -678,17 +680,18 @@ pub trait NetworkProvider: NetworkStateInfo {
fn get_value(&self, key: &libp2p::kad::record::Key);
}

#[async_trait::async_trait]
impl<B, H> NetworkProvider for sc_network::NetworkService<B, H>
where
B: BlockT + 'static,
H: ExHashT,
{
fn set_priority_group(
async fn set_priority_group(
&self,
group_id: String,
peers: HashSet<libp2p::Multiaddr>,
) -> std::result::Result<(), String> {
self.set_priority_group(group_id, peers)
self.set_priority_group(group_id, peers).await
}
fn put_value(&self, key: libp2p::kad::record::Key, value: Vec<u8>) {
self.put_value(key, value)
Expand Down
8 changes: 5 additions & 3 deletions client/authority-discovery/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::worker::schema;

use std::{iter::FromIterator, sync::{Arc, Mutex}, task::Poll};

use async_trait::async_trait;
use futures::channel::mpsc::{self, channel};
use futures::executor::{block_on, LocalPool};
use futures::future::FutureExt;
Expand Down Expand Up @@ -213,8 +214,9 @@ impl Default for TestNetwork {
}
}

#[async_trait]
impl NetworkProvider for TestNetwork {
fn set_priority_group(
async fn set_priority_group(
&self,
group_id: String,
peers: HashSet<Multiaddr>,
Expand Down Expand Up @@ -424,7 +426,7 @@ fn publish_discover_cycle() {
// Make authority discovery handle the event.
worker.handle_dht_event(dht_event).await;

worker.set_priority_group().unwrap();
worker.set_priority_group().await.unwrap();

// Expect authority discovery to set the priority set.
assert_eq!(network.set_priority_group_call.lock().unwrap().len(), 1);
Expand Down Expand Up @@ -623,7 +625,7 @@ fn never_add_own_address_to_priority_group() {
sentry_worker.start_new_lookups();

sentry_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
sentry_worker.set_priority_group().unwrap();
block_on(sentry_worker.set_priority_group()).unwrap();

assert_eq!(
sentry_network.set_priority_group_call.lock().unwrap().len(), 1,
Expand Down
5 changes: 4 additions & 1 deletion client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
///
/// Returns an `Err` if one of the given addresses is invalid or contains an
/// invalid peer ID (which includes the local peer ID).
pub fn set_priority_group(&self, group_id: String, peers: HashSet<Multiaddr>) -> Result<(), String> {
//
// NOTE: even though this function is currently sync, it's marked as async for
// future-proofing, see https://github.com/paritytech/substrate/pull/7247#discussion_r502263451.
pub async fn set_priority_group(&self, group_id: String, peers: HashSet<Multiaddr>) -> Result<(), String> {
let peers = self.split_multiaddr_and_peer_id(peers)?;

let peer_ids = peers.iter().map(|(peer_id, _addr)| peer_id.clone()).collect();
Expand Down

0 comments on commit 575cbf8

Please sign in to comment.