Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
have some lenience on outdated messages in statement distribution (#5150
Browse files Browse the repository at this point in the history
)

* have some lenience on outdated messages in statement distribution

* fmt
  • Loading branch information
rphmeier authored Mar 18, 2022
1 parent bbe244a commit 9361ef3
Showing 1 changed file with 51 additions and 4 deletions.
55 changes: 51 additions & 4 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use indexmap::{map::Entry as IEntry, IndexMap};
use sp_keystore::SyncCryptoStorePtr;
use util::runtime::RuntimeInfo;

use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};

use fatality::Nested;

Expand Down Expand Up @@ -153,6 +153,27 @@ impl StatementDistributionSubsystem {
}
}

#[derive(Default)]
struct RecentOutdatedHeads {
buf: VecDeque<Hash>,
}

impl RecentOutdatedHeads {
fn note_outdated(&mut self, hash: Hash) {
const MAX_BUF_LEN: usize = 10;

self.buf.push_back(hash);

while self.buf.len() > MAX_BUF_LEN {
let _ = self.buf.pop_front();
}
}

fn is_recent_outdated(&self, hash: &Hash) -> bool {
self.buf.contains(hash)
}
}

/// Tracks our impression of a single peer's view of the candidates a validator has seconded
/// for a given relay-parent.
///
Expand Down Expand Up @@ -1287,15 +1308,25 @@ async fn handle_incoming_message_and_circulate<'a>(
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
) {
let handled_incoming = match peers.get_mut(&peer) {
Some(data) =>
handle_incoming_message(peer, data, active_heads, ctx, message, req_sender, metrics)
.await,
handle_incoming_message(
peer,
data,
active_heads,
recent_outdated_heads,
ctx,
message,
req_sender,
metrics,
)
.await,
None => None,
};

Expand Down Expand Up @@ -1331,6 +1362,7 @@ async fn handle_incoming_message<'a>(
peer: PeerId,
peer_data: &mut PeerData,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
Expand All @@ -1347,7 +1379,11 @@ async fn handle_incoming_message<'a>(
%relay_parent,
"our view out-of-sync with active heads; head not found",
);
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;

if !recent_outdated_heads.is_recent_outdated(&relay_parent) {
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;
}

return None
},
};
Expand Down Expand Up @@ -1556,6 +1592,7 @@ async fn handle_network_update(
gossip_peers: &mut HashSet<PeerId>,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
req_sender: &mpsc::Sender<RequesterMessage>,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
Expand Down Expand Up @@ -1612,6 +1649,7 @@ async fn handle_network_update(
gossip_peers,
peers,
active_heads,
&*recent_outdated_heads,
ctx,
message,
req_sender,
Expand Down Expand Up @@ -1653,6 +1691,7 @@ impl StatementDistributionSubsystem {
let mut gossip_peers: HashSet<PeerId> = HashSet::new();
let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut recent_outdated_heads = RecentOutdatedHeads::default();

let mut runtime = RuntimeInfo::new(Some(self.keystore.clone()));

Expand Down Expand Up @@ -1684,6 +1723,7 @@ impl StatementDistributionSubsystem {
&mut gossip_peers,
&mut authorities,
&mut active_heads,
&mut recent_outdated_heads,
&req_sender,
result?,
)
Expand All @@ -1701,6 +1741,7 @@ impl StatementDistributionSubsystem {
&gossip_peers,
&mut peers,
&mut active_heads,
&recent_outdated_heads,
&req_sender,
result.ok_or(FatalError::RequesterReceiverFinished)?,
)
Expand Down Expand Up @@ -1767,6 +1808,7 @@ impl StatementDistributionSubsystem {
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
req_sender: &mpsc::Sender<RequesterMessage>,
message: RequesterMessage,
) -> JfyiErrorResult<()> {
Expand Down Expand Up @@ -1814,6 +1856,7 @@ impl StatementDistributionSubsystem {
gossip_peers,
peers,
active_heads,
recent_outdated_heads,
ctx,
message,
req_sender,
Expand Down Expand Up @@ -1874,6 +1917,7 @@ impl StatementDistributionSubsystem {
gossip_peers: &mut HashSet<PeerId>,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &mut RecentOutdatedHeads,
req_sender: &mpsc::Sender<RequesterMessage>,
message: FromOverseer<StatementDistributionMessage>,
) -> Result<bool> {
Expand All @@ -1893,6 +1937,8 @@ impl StatementDistributionSubsystem {
hash = ?deactivated,
"Deactivating leaf",
);

recent_outdated_heads.note_outdated(deactivated);
}
}

Expand Down Expand Up @@ -1985,6 +2031,7 @@ impl StatementDistributionSubsystem {
gossip_peers,
authorities,
active_heads,
&*recent_outdated_heads,
ctx,
req_sender,
event,
Expand Down

0 comments on commit 9361ef3

Please sign in to comment.