Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
approval-distribution: a fix for out-of-view messages (#4908)
Browse files Browse the repository at this point in the history
* approval-distribution: a fix for out-of-view messages

* approval-distribution: trace logs

* adjust the guide slightly

* comments and nits
  • Loading branch information
ordian authored Feb 22, 2022
1 parent c793509 commit 16bb323
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 42 deletions.
152 changes: 114 additions & 38 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ struct State {
gossip_peers: HashSet<PeerId>,
}

/// A short description of a validator's assignment or approval.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum MessageFingerprint {
Assignment(Hash, CandidateIndex, ValidatorIndex),
Expand All @@ -113,6 +114,11 @@ impl Knowledge {
}
}

/// The difference of our knowledge and peer's knowledge
/// that is used to send the missing information.
type MissingKnowledge = HashSet<MessageFingerprint>;

/// Information that has been circulated to and from a peer.
#[derive(Debug, Clone, Default)]
struct PeerKnowledge {
/// The knowledge we've sent to the peer.
Expand Down Expand Up @@ -970,21 +976,9 @@ impl State {
peer_id: PeerId,
view: View,
) {
let is_gossip_peer = gossip_peers.contains(&peer_id);
let lucky = is_gossip_peer ||
util::gen_ratio(
util::MIN_GOSSIP_PEERS.saturating_sub(gossip_peers.len()),
util::MIN_GOSSIP_PEERS,
);

if !lucky {
tracing::trace!(target: LOG_TARGET, ?peer_id, "Unlucky peer");
return
}

metrics.on_unify_with_peer();
let _timer = metrics.time_unify_with_peer();
let mut to_send: Vec<Hash> = Vec::new();
let mut to_send: Vec<(Hash, MissingKnowledge)> = Vec::new();

let view_finalized_number = view.finalized_number;
for head in view.into_iter() {
Expand All @@ -995,43 +989,75 @@ impl State {
Some(entry) if entry.number > view_finalized_number => entry,
_ => return None,
};
let interesting_block = match entry.known_by.entry(peer_id.clone()) {
// step 3.
hash_map::Entry::Occupied(_) => return None,
let missing_knowledge = match entry.known_by.entry(peer_id.clone()) {
hash_map::Entry::Occupied(e) => {
let missing: MissingKnowledge = entry
.knowledge
.known_messages
.iter()
.filter(|m| !e.get().contains(m))
.cloned()
.collect();
// step 3.
// We assume if peer's knowledge is complete for block N,
// this is also true for its ancestors.
// This safeguard is needed primarily in case of long finality stalls
// so we don't waste time in a loop for every peer.
if missing.is_empty() {
tracing::trace!(
target: LOG_TARGET,
?block,
?peer_id,
"Stopping at this block, because peer knows all",
);
return None
}
missing
},
// step 4.
hash_map::Entry::Vacant(vacant) => {
let knowledge = PeerKnowledge {
sent: entry.knowledge.clone(),
received: Default::default(),
};
let knowledge = PeerKnowledge::default();
vacant.insert(knowledge);
block
entry.knowledge.known_messages.clone()
},
};
// step 5.
let interesting_block = block;
block = entry.parent_hash.clone();
Some(interesting_block)
Some((interesting_block, missing_knowledge))
});
to_send.extend(interesting_blocks);
}

let is_gossip_peer = gossip_peers.contains(&peer_id);
let lucky = is_gossip_peer ||
util::gen_ratio(
util::MIN_GOSSIP_PEERS.saturating_sub(gossip_peers.len()),
util::MIN_GOSSIP_PEERS,
);
if !lucky {
tracing::trace!(target: LOG_TARGET, ?peer_id, "Unlucky peer");
return
}

// step 6.
// send all assignments and approvals for all candidates in those blocks to the peer
Self::send_gossip_messages_to_peer(entries, ctx, peer_id, to_send).await;
}

async fn send_gossip_messages_to_peer(
entries: &HashMap<Hash, BlockEntry>,
entries: &mut HashMap<Hash, BlockEntry>,
ctx: &mut (impl SubsystemContext<Message = ApprovalDistributionMessage>
+ overseer::SubsystemContext<Message = ApprovalDistributionMessage>),
peer_id: PeerId,
blocks: Vec<Hash>,
blocks: Vec<(Hash, MissingKnowledge)>,
) {
let mut assignments = Vec::new();
let mut approvals = Vec::new();
let num_blocks = blocks.len();

for block in blocks.into_iter() {
let entry = match entries.get(&block) {
for (block, missing) in blocks.into_iter() {
let entry = match entries.get_mut(&block) {
Some(entry) => entry,
None => continue, // should be unreachable
};
Expand All @@ -1048,8 +1074,27 @@ impl State {
for (validator_index, (approval_state, _is_local)) in
candidate_entry.approvals.iter()
{
let assignment_fingerprint = MessageFingerprint::Assignment(
block.clone(),
candidate_index,
validator_index.clone(),
);

match approval_state {
ApprovalState::Assigned(cert) => {
if !missing.contains(&assignment_fingerprint) {
tracing::trace!(
target: LOG_TARGET,
?block,
?validator_index,
?candidate_index,
"Skipping sending known assignment",
);
continue
}
if let Some(p) = entry.known_by.get_mut(&peer_id) {
p.sent.insert(assignment_fingerprint);
}
assignments.push((
IndirectAssignmentCert {
block_hash: block.clone(),
Expand All @@ -1060,20 +1105,51 @@ impl State {
));
},
ApprovalState::Approved(assignment_cert, signature) => {
assignments.push((
IndirectAssignmentCert {
let fingerprint = MessageFingerprint::Approval(
block.clone(),
candidate_index,
validator_index.clone(),
);
if missing.contains(&assignment_fingerprint) {
if let Some(p) = entry.known_by.get_mut(&peer_id) {
p.sent.insert(assignment_fingerprint);
}
assignments.push((
IndirectAssignmentCert {
block_hash: block.clone(),
validator: validator_index.clone(),
cert: assignment_cert.clone(),
},
candidate_index.clone(),
));
} else {
tracing::trace!(
target: LOG_TARGET,
?block,
?validator_index,
?candidate_index,
"Skipping sending known assignment",
);
}
if missing.contains(&fingerprint) {
if let Some(p) = entry.known_by.get_mut(&peer_id) {
p.sent.insert(fingerprint);
}
approvals.push(IndirectSignedApprovalVote {
block_hash: block.clone(),
validator: validator_index.clone(),
cert: assignment_cert.clone(),
},
candidate_index.clone(),
));
approvals.push(IndirectSignedApprovalVote {
block_hash: block.clone(),
validator: validator_index.clone(),
candidate_index: candidate_index.clone(),
signature: signature.clone(),
});
candidate_index: candidate_index.clone(),
signature: signature.clone(),
});
} else {
tracing::trace!(
target: LOG_TARGET,
?block,
?validator_index,
?candidate_index,
"Skipping sending known approval",
);
}
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ Imports an approval signature referenced by block hash and candidate index:

#### `unify_with_peer(peer: PeerId, view)`:

1. Initialize a set `fresh_blocks = {}`
1. Initialize a set `missing_knowledge = {}`

For each block in the view:
2. Load the `BlockEntry` for the block. If the block is unknown, or the number is less than or equal to the view's finalized number go to step 6.
3. Inspect the `known_by` set of the `BlockEntry`. If the peer is already present, go to step 6.
4. Add the peer to `known_by` with a cloned version of `block_entry.knowledge`. and add the hash of the block to `fresh_blocks`.
3. Inspect the `known_by` set of the `BlockEntry`. If the peer already knows all assignments/approvals, go to step 6.
4. Add the peer to `known_by` and add the hash and missing knowledge of the block to `missing_knowledge`.
5. Return to step 2 with the ancestor of the block.

6. For each block in `fresh_blocks`, send all assignments and approvals for all candidates in those blocks to the peer.
6. For each block in `missing_knowledge`, send all assignments and approvals for all candidates in those blocks to the peer.

0 comments on commit 16bb323

Please sign in to comment.