Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
grandpa: progressively increase target gossip peers (#4050)
Browse files Browse the repository at this point in the history
* grandpa: stricter gossip message filtering

* gossip: remove filtered message on send_message

* gossip: add test for tracking of broadcast attempts

* grandpa: only restrict gossip if we're connected to more than 5 authorities

* grandpa: add test for progressive gossip

* grandpa: add test for gossip filtering on local non-authority node

* grandpa: fix doc

* gossip, grandpa: tabify

* grandpa: relax filtering logic for global messages
  • Loading branch information
andresilva authored and gavofyork committed Nov 8, 2019
1 parent 9cda7fa commit 3fea329
Show file tree
Hide file tree
Showing 2 changed files with 469 additions and 18 deletions.
304 changes: 300 additions & 4 deletions core/finality-grandpa/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ use substrate_telemetry::{telemetry, CONSENSUS_DEBUG};
use log::{trace, debug, warn};
use futures::prelude::*;
use futures::sync::mpsc;
use rand::Rng;

use crate::{environment, CatchUp, CompactCommit, SignedMessage};
use super::{cost, benefit, Round, SetId};
Expand Down Expand Up @@ -483,6 +484,14 @@ impl<N: Ord> Peers<N> {
fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo<N>> {
self.inner.get(who)
}

fn authorities(&self) -> usize {
self.inner.iter().filter(|(_, info)| info.roles.is_authority()).count()
}

fn non_authorities(&self) -> usize {
self.inner.iter().filter(|(_, info)| !info.roles.is_authority()).count()
}
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -980,6 +989,122 @@ impl<Block: BlockT> Inner<Block> {

(true, report)
}

/// The initial logic for filtering round messages follows the given state
/// transitions:
///
/// - State 0: not allowed to anyone (only if our local node is not an authority)
/// - State 1: allowed to random `sqrt(authorities)`
/// - State 2: allowed to all authorities
/// - State 3: allowed to random `sqrt(non-authorities)`
/// - State 4: allowed to all non-authorities
///
/// Transitions will be triggered on repropagation attempts by the
/// underlying gossip layer, which should happen every 30 seconds.
fn round_message_allowed<N>(&self, peer: &PeerInfo<N>, mut previous_attempts: usize) -> bool {
const MIN_AUTHORITIES: usize = 5;

if !self.config.is_authority && previous_attempts == 0 {
// non-authority nodes don't gossip any messages right away. we
// assume that authorities (and sentries) are strongly connected, so
// it should be unnecessary for non-authorities to gossip all
// messages right away.
return false;
}

if !self.config.is_authority {
// since the node is not an authority we skipped the initial attempt
// to gossip the message, therefore we decrement `previous_attempts`
// so that the state machine below works the same way it does for
// authority nodes.
previous_attempts -= 1;
}

if peer.roles.is_authority() {
let authorities = self.peers.authorities();

// the target node is an authority, on the first attempt we start by
// sending the message to only `sqrt(authorities)` (if we're
// connected to at least `MIN_AUTHORITIES`).
if previous_attempts == 0 && authorities > MIN_AUTHORITIES {
let authorities = authorities as f64;
let p = (authorities.sqrt()).max(MIN_AUTHORITIES as f64) / authorities;
rand::thread_rng().gen_bool(p)
} else {
// otherwise we already went through the step above, so
// we won't filter the message and send it to all
// authorities for whom it is polite to do so
true
}
} else {
// the node is not an authority so we apply stricter filters
if previous_attempts >= 3 {
// if we previously tried to send this message 3 (or more)
// times, then it is allowed to be sent to all peers.
true
} else if previous_attempts == 2 {
// otherwise we only send it to `sqrt(non-authorities)`.
let non_authorities = self.peers.non_authorities() as f64;
let p = non_authorities.sqrt() / non_authorities;
rand::thread_rng().gen_bool(p)
} else {
false
}
}
}

/// The initial logic for filtering global messages follows the given state
/// transitions:
///
/// - State 0: send to `sqrt(authorities)` ++ `sqrt(non-authorities)`.
/// - State 1: send to all authorities
/// - State 2: send to all non-authorities
///
/// We are more lenient with global messages since there should be a lot
/// less global messages than round messages (just commits), and we want
/// these to propagate to non-authorities fast enough so that they can
/// observe finality.
///
/// Transitions will be triggered on repropagation attempts by the
/// underlying gossip layer, which should happen every 30 seconds.
fn global_message_allowed<N>(&self, peer: &PeerInfo<N>, previous_attempts: usize) -> bool {
const MIN_PEERS: usize = 5;

if peer.roles.is_authority() {
let authorities = self.peers.authorities();

// the target node is an authority, on the first attempt we start by
// sending the message to only `sqrt(authorities)` (if we're
// connected to at least `MIN_PEERS`).
if previous_attempts == 0 && authorities > MIN_PEERS {
let authorities = authorities as f64;
let p = (authorities.sqrt()).max(MIN_PEERS as f64) / authorities;
rand::thread_rng().gen_bool(p)
} else {
// otherwise we already went through the step above, so
// we won't filter the message and send it to all
// authorities for whom it is polite to do so
true
}
} else {
let non_authorities = self.peers.non_authorities();

// the target node is not an authority, on the first and second
// attempt we start by sending the message to only
// `sqrt(non_authorities)` (if we're connected to at least
// `MIN_PEERS`).
if previous_attempts <= 1 && non_authorities > MIN_PEERS {
let non_authorities = non_authorities as f64;
let p = (non_authorities.sqrt()).max(MIN_PEERS as f64) / non_authorities ;
rand::thread_rng().gen_bool(p)
} else {
// otherwise we already went through the step above, so
// we won't filter the message and send it to all
// non-authorities for whom it is polite to do so
true
}
}
}
}

/// A validator for GRANDPA gossip messages.
Expand Down Expand Up @@ -1190,6 +1315,20 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
Some(x) => x,
};

if let MessageIntent::Broadcast { previous_attempts } = intent {
if maybe_round.is_some() {
if !inner.round_message_allowed(peer, previous_attempts) {
// early return if the vote message isn't allowed at this stage.
return false;
}
} else {
if !inner.global_message_allowed(peer, previous_attempts) {
// early return if the global message isn't allowed at this stage.
return false;
}
}
}

// if the topic is not something the peer accepts, discard.
if let Some(round) = maybe_round {
return peer.view.consider_vote(round, set_id) == Consider::Accept
Expand All @@ -1209,8 +1348,8 @@ impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block>
Ok(GossipMessage::Commit(full)) => {
// we only broadcast our best commit and only if it's
// better than last received by peer.
Some(full.message.target_number) == our_best_commit
&& Some(full.message.target_number) > peer_best_commit
Some(full.message.target_number) == our_best_commit &&
Some(full.message.target_number) > peer_best_commit
}
Ok(GossipMessage::Neighbor(_)) => false,
Ok(GossipMessage::CatchUpRequest(_)) => false,
Expand Down Expand Up @@ -1311,7 +1450,7 @@ mod tests {
use super::environment::SharedVoterSetState;
use network_gossip::Validator as GossipValidatorT;
use network::test::Block;
use primitives::crypto::Public;
use primitives::{crypto::Public, H256};

// some random config (not really needed)
fn config() -> crate::Config {
Expand All @@ -1329,7 +1468,6 @@ mod tests {
fn voter_set_state() -> SharedVoterSetState<Block> {
use crate::authorities::AuthoritySet;
use crate::environment::VoterSetState;
use primitives::H256;

let base = (H256::zero(), 0);
let voters = AuthoritySet::genesis(Vec::new());
Expand Down Expand Up @@ -1991,4 +2129,162 @@ mod tests {
)
}
}

#[test]
fn progressively_gossips_to_more_peers() {
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
);

// the validator start at set id 0
val.note_set(SetId(0), Vec::new(), |_, _| {});

// add 60 peers, 30 authorities and 30 full nodes
let mut authorities = Vec::new();
authorities.resize_with(30, || PeerId::random());

let mut full_nodes = Vec::new();
full_nodes.resize_with(30, || PeerId::random());

for i in 0..30 {
val.inner.write().peers.new_peer(authorities[i].clone(), Roles::AUTHORITY);
val.inner.write().peers.new_peer(full_nodes[i].clone(), Roles::FULL);
}

let test = |previous_attempts, peers| {
let mut message_allowed = val.message_allowed();

move || {
let mut allowed = 0;
for peer in peers {
if message_allowed(
peer,
MessageIntent::Broadcast { previous_attempts },
&crate::communication::round_topic::<Block>(1, 0),
&[],
) {
allowed += 1;
}
}
allowed
}
};

fn trial<F: FnMut() -> usize>(mut test: F) -> usize {
let mut results = Vec::new();
let n = 1000;

for _ in 0..n {
results.push(test());
}

let n = results.len();
let sum: usize = results.iter().sum();

sum / n
}

// on the first attempt we will only gossip to `sqrt(authorities)`,
// which should average out to 5 peers after a couple of trials
assert_eq!(trial(test(0, &authorities)), 5);

// on the second (and subsequent attempts) we should gossip to all
// authorities we're connected to.
assert_eq!(trial(test(1, &authorities)), 30);
assert_eq!(trial(test(2, &authorities)), 30);

// we should only gossip to non-authorities after the third attempt
assert_eq!(trial(test(0, &full_nodes)), 0);
assert_eq!(trial(test(1, &full_nodes)), 0);

// and only to `sqrt(non-authorities)`
assert_eq!(trial(test(2, &full_nodes)), 5);

// only on the fourth attempt should we gossip to all non-authorities
assert_eq!(trial(test(3, &full_nodes)), 30);
}

#[test]
fn only_restricts_gossip_to_authorities_after_a_minimum_threshold() {
let (val, _) = GossipValidator::<Block>::new(
config(),
voter_set_state(),
);

// the validator start at set id 0
val.note_set(SetId(0), Vec::new(), |_, _| {});

let mut authorities = Vec::new();
for _ in 0..5 {
let peer_id = PeerId::random();
val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY);
authorities.push(peer_id);
}

let mut message_allowed = val.message_allowed();

// since we're only connected to 5 authorities, we should never restrict
// sending of gossip messages, and instead just allow them to all
// non-authorities on the first attempt.
for authority in &authorities {
assert!(
message_allowed(
authority,
MessageIntent::Broadcast { previous_attempts: 0 },
&crate::communication::round_topic::<Block>(1, 0),
&[],
)
);
}
}

#[test]
fn non_authorities_never_gossip_messages_on_first_attempt() {
let mut config = config();
config.is_authority = false;

let (val, _) = GossipValidator::<Block>::new(
config,
voter_set_state(),
);

// the validator start at set id 0
val.note_set(SetId(0), Vec::new(), |_, _| {});

let mut authorities = Vec::new();
for _ in 0..100 {
let peer_id = PeerId::random();
val.inner.write().peers.new_peer(peer_id.clone(), Roles::AUTHORITY);
authorities.push(peer_id);
}

let mut message_allowed = val.message_allowed();

// since our node is not an authority we should **never** gossip any
// messages on the first attempt.
for authority in &authorities {
assert!(
!message_allowed(
authority,
MessageIntent::Broadcast { previous_attempts: 0 },
&crate::communication::round_topic::<Block>(1, 0),
&[],
)
);
}

// on the third attempt we should allow messages to authorities
// (on the second attempt we would do `sqrt(authorities)`)
for authority in &authorities {
assert!(
message_allowed(
authority,
MessageIntent::Broadcast { previous_attempts: 2 },
&crate::communication::round_topic::<Block>(1, 0),
&[],
)
);
}
}
}
Loading

0 comments on commit 3fea329

Please sign in to comment.