Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

have some lenience on outdated messages in statement distribution #5150

Merged
merged 2 commits into from
Mar 18, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions node/network/statement-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use indexmap::{map::Entry as IEntry, IndexMap};
use sp_keystore::SyncCryptoStorePtr;
use util::runtime::RuntimeInfo;

use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};

use fatality::Nested;

Expand Down Expand Up @@ -153,6 +153,27 @@ impl StatementDistributionSubsystem {
}
}

#[derive(Default)]
struct RecentOutdatedHeads {
buf: VecDeque<Hash>,
}

impl RecentOutdatedHeads {
fn note_outdated(&mut self, hash: Hash) {
const MAX_BUF_LEN: usize = 10;

self.buf.push_back(hash);

while self.buf.len() > MAX_BUF_LEN {
let _ = self.buf.pop_front();
}
}

fn is_recent_outdated(&self, hash: &Hash) -> bool {
self.buf.contains(hash)
}
}

/// Tracks our impression of a single peer's view of the candidates a validator has seconded
/// for a given relay-parent.
///
Expand Down Expand Up @@ -1267,15 +1288,25 @@ async fn handle_incoming_message_and_circulate<'a>(
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
metrics: &Metrics,
) {
let handled_incoming = match peers.get_mut(&peer) {
Some(data) =>
handle_incoming_message(peer, data, active_heads, ctx, message, req_sender, metrics)
.await,
handle_incoming_message(
peer,
data,
active_heads,
recent_outdated_heads,
ctx,
message,
req_sender,
metrics,
)
.await,
None => None,
};

Expand All @@ -1302,6 +1333,7 @@ async fn handle_incoming_message<'a>(
peer: PeerId,
peer_data: &mut PeerData,
active_heads: &'a mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
message: protocol_v1::StatementDistributionMessage,
req_sender: &mpsc::Sender<RequesterMessage>,
Expand All @@ -1317,7 +1349,11 @@ async fn handle_incoming_message<'a>(
%relay_parent,
"our view out-of-sync with active heads; head not found",
);
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;

if !recent_outdated_heads.is_recent_outdated(&relay_parent) {
report_peer(ctx, peer, COST_UNEXPECTED_STATEMENT).await;
}

return None
},
};
Expand Down Expand Up @@ -1526,6 +1562,7 @@ async fn handle_network_update(
gossip_peers: &mut HashSet<PeerId>,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
ctx: &mut (impl SubsystemContext + overseer::SubsystemContext),
req_sender: &mpsc::Sender<RequesterMessage>,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
Expand Down Expand Up @@ -1581,6 +1618,7 @@ async fn handle_network_update(
gossip_peers,
peers,
active_heads,
&*recent_outdated_heads,
ctx,
message,
req_sender,
Expand Down Expand Up @@ -1621,6 +1659,7 @@ impl StatementDistributionSubsystem {
let mut gossip_peers: HashSet<PeerId> = HashSet::new();
let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut recent_outdated_heads = RecentOutdatedHeads::default();

let mut runtime = RuntimeInfo::new(Some(self.keystore.clone()));

Expand Down Expand Up @@ -1652,6 +1691,7 @@ impl StatementDistributionSubsystem {
&mut gossip_peers,
&mut authorities,
&mut active_heads,
&mut recent_outdated_heads,
&req_sender,
result?,
)
Expand All @@ -1669,6 +1709,7 @@ impl StatementDistributionSubsystem {
&gossip_peers,
&mut peers,
&mut active_heads,
&recent_outdated_heads,
&req_sender,
result.ok_or(FatalError::RequesterReceiverFinished)?,
)
Expand Down Expand Up @@ -1735,6 +1776,7 @@ impl StatementDistributionSubsystem {
gossip_peers: &HashSet<PeerId>,
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &RecentOutdatedHeads,
req_sender: &mpsc::Sender<RequesterMessage>,
message: RequesterMessage,
) -> JfyiErrorResult<()> {
Expand Down Expand Up @@ -1782,6 +1824,7 @@ impl StatementDistributionSubsystem {
gossip_peers,
peers,
active_heads,
recent_outdated_heads,
ctx,
message,
req_sender,
Expand Down Expand Up @@ -1842,6 +1885,7 @@ impl StatementDistributionSubsystem {
gossip_peers: &mut HashSet<PeerId>,
authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
recent_outdated_heads: &mut RecentOutdatedHeads,
req_sender: &mpsc::Sender<RequesterMessage>,
message: FromOverseer<StatementDistributionMessage>,
) -> Result<bool> {
Expand All @@ -1861,6 +1905,8 @@ impl StatementDistributionSubsystem {
hash = ?deactivated,
"Deactivating leaf",
);

recent_outdated_heads.note_outdated(deactivated);
}
}

Expand Down Expand Up @@ -1955,6 +2001,7 @@ impl StatementDistributionSubsystem {
gossip_peers,
authorities,
active_heads,
&*recent_outdated_heads,
ctx,
req_sender,
event,
Expand Down