Skip to content

Commit

Permalink
[stable2407] Backport #6696 (#6839)
Browse files Browse the repository at this point in the history
Backport #6696 into `stable2407` from alexggh.

See the
[documentation](https://github.com/paritytech/polkadot-sdk/blob/master/docs/BACKPORT.md)
on how to use this bot.

<!--
  # To be used by other automation, do not modify:
  original-pr-number: #${pull_number}
-->

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com>
Co-authored-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
  • Loading branch information
3 people authored Dec 11, 2024
1 parent 3fd8c60 commit f2081f6
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 34 deletions.
143 changes: 110 additions & 33 deletions polkadot/node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl Default for AggressionConfig {
fn default() -> Self {
AggressionConfig {
l1_threshold: Some(16),
l2_threshold: Some(28),
l2_threshold: Some(64),
resend_unfinalized_period: Some(8),
}
}
Expand Down Expand Up @@ -491,6 +491,8 @@ struct BlockEntry {
/// Approval entries for whole block. These also contain all approvals in the case of multiple
/// candidates being claimed by assignments.
approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>,
/// Backing off from re-sending messages to peers.
last_resent_at_block_number: Option<u32>,
}

impl BlockEntry {
Expand Down Expand Up @@ -786,6 +788,7 @@ impl State {
candidates,
session: meta.session,
approval_entries: HashMap::new(),
last_resent_at_block_number: None,
});

self.topologies.inc_session_refs(meta.session);
Expand Down Expand Up @@ -1168,6 +1171,33 @@ impl State {
self.enable_aggression(ctx, Resend::No, metrics).await;
}

// When finality is lagging as a last resort nodes start sending the messages they have
// multiples times. This means it is safe to accept duplicate messages without punishing the
// peer and reduce the reputation and can end up banning the Peer, which in turn will create
// more no-shows.
fn accept_duplicates_from_validators(
blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
topologies: &SessionGridTopologies,
aggression_config: &AggressionConfig,
entry: &BlockEntry,
peer: PeerId,
) -> bool {
let topology = topologies.get_topology(entry.session);
let min_age = blocks_by_number.iter().next().map(|(num, _)| num);
let max_age = blocks_by_number.iter().rev().next().map(|(num, _)| num);

// Return if we don't have at least 1 block.
let (min_age, max_age) = match (min_age, max_age) {
(Some(min), Some(max)) => (*min, *max),
_ => return false,
};

let age = max_age.saturating_sub(min_age);

aggression_config.should_trigger_aggression(age) &&
topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false)
}

async fn import_and_circulate_assignment<Context, R>(
&mut self,
ctx: &mut Context,
Expand Down Expand Up @@ -1239,20 +1269,29 @@ impl State {
if peer_knowledge.contains(&message_subject, message_kind) {
// wasn't included before
if !peer_knowledge.received.insert(message_subject.clone(), message_kind) {
gum::debug!(
target: LOG_TARGET,
?peer_id,
?message_subject,
"Duplicate assignment",
);

modify_reputation(
&mut self.reputation,
ctx.sender(),
if !Self::accept_duplicates_from_validators(
&self.blocks_by_number,
&self.topologies,
&self.aggression_config,
entry,
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
) {
gum::debug!(
target: LOG_TARGET,
?peer_id,
?message_subject,
"Duplicate assignment",
);

modify_reputation(
&mut self.reputation,
ctx.sender(),
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}

metrics.on_assignment_duplicate();
} else {
gum::trace!(
Expand Down Expand Up @@ -1530,6 +1569,9 @@ impl State {
assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>,
approval_knowledge_key: &(MessageSubject, MessageKind),
entry: &mut BlockEntry,
blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
topologies: &SessionGridTopologies,
aggression_config: &AggressionConfig,
reputation: &mut ReputationAggregator,
peer_id: PeerId,
metrics: &Metrics,
Expand Down Expand Up @@ -1557,20 +1599,27 @@ impl State {
.received
.insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1)
{
gum::trace!(
target: LOG_TARGET,
?peer_id,
?approval_knowledge_key,
"Duplicate approval",
);

modify_reputation(
reputation,
ctx.sender(),
if !Self::accept_duplicates_from_validators(
blocks_by_number,
topologies,
aggression_config,
entry,
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
) {
gum::trace!(
target: LOG_TARGET,
?peer_id,
?approval_knowledge_key,
"Duplicate approval",
);
modify_reputation(
reputation,
ctx.sender(),
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}
metrics.on_approval_duplicate();
}
return false
Expand Down Expand Up @@ -1669,6 +1718,9 @@ impl State {
&assignments_knowledge_keys,
&approval_knwowledge_key,
entry,
&self.blocks_by_number,
&self.topologies,
&self.aggression_config,
&mut self.reputation,
peer_id,
metrics,
Expand Down Expand Up @@ -2056,18 +2108,43 @@ impl State {
&self.topologies,
|block_entry| {
let block_age = max_age - block_entry.number;
// We want to resend only for blocks of min_age, there is no point in
// resending for blocks newer than that, because we are just going to create load
// and not gain anything.
let diff_from_min_age = block_entry.number - min_age;

// We want to back-off on resending for blocks that have been resent recently, to
// give time for nodes to process all the extra messages, if we still have not
// finalized we are going to resend again after unfinalized_period * 2 since the
// last resend.
let blocks_since_last_sent = block_entry
.last_resent_at_block_number
.map(|last_resent_at_block_number| max_age - last_resent_at_block_number);

let can_resend_at_this_age = blocks_since_last_sent
.zip(config.resend_unfinalized_period)
.map(|(blocks_since_last_sent, unfinalized_period)| {
blocks_since_last_sent >= unfinalized_period * 2
})
.unwrap_or(true);

if resend == Resend::Yes &&
config
.resend_unfinalized_period
.as_ref()
.map_or(false, |p| block_age > 0 && block_age % p == 0)
{
config.resend_unfinalized_period.as_ref().map_or(false, |p| {
block_age > 0 &&
block_age % p == 0 && diff_from_min_age == 0 &&
can_resend_at_this_age
}) {
// Retry sending to all peers.
for (_, knowledge) in block_entry.known_by.iter_mut() {
knowledge.sent = Knowledge::default();
}

block_entry.last_resent_at_block_number = Some(max_age);
gum::debug!(
target: LOG_TARGET,
block_number = ?block_entry.number,
?max_age,
"Aggression enabled with resend for block",
);
true
} else {
false
Expand Down
127 changes: 126 additions & 1 deletion polkadot/node/network/approval-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,131 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() {
});
}

#[test]
fn peer_sending_us_duplicates_while_aggression_enabled_is_ok() {
let parent_hash = Hash::repeat_byte(0xFF);
let hash = Hash::repeat_byte(0xAA);

let peers = make_peers_and_authority_ids(8);
let peer_a = peers.first().unwrap().0;

let _ = test_harness(state_without_reputation_delay(), |mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
let peer = &peer_a;
setup_peer_with_view(overseer, peer, view![], ValidationVersion::V3).await;

let peers_with_optional_peer_id = peers
.iter()
.map(|(peer_id, authority)| (Some(*peer_id), authority.clone()))
.collect_vec();
// Setup a topology where peer_a is neighbor to current node.
setup_gossip_topology(
overseer,
make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1),
)
.await;

// new block `hash` with 1 candidates
let meta = BlockApprovalMeta {
hash,
parent_hash,
number: 1,
candidates: vec![Default::default(); 1],
slot: 1.into(),
session: 1,
};
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
overseer_send(overseer, msg).await;

// import an assignment related to `hash` locally
let validator_index = ValidatorIndex(0);
let candidate_indices: CandidateBitfield = vec![0 as CandidateIndex].try_into().unwrap();
let candidate_bitfields = vec![CoreIndex(0)].try_into().unwrap();
let cert = fake_assignment_cert_v2(hash, validator_index, candidate_bitfields);
overseer_send(
overseer,
ApprovalDistributionMessage::DistributeAssignment(
cert.clone().into(),
candidate_indices.clone(),
),
)
.await;

// update peer view to include the hash
overseer_send(
overseer,
ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerViewChange(
*peer,
view![hash],
)),
)
.await;

// we should send them the assignment
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
peers,
Versioned::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
protocol_v3::ApprovalDistributionMessage::Assignments(assignments)
))
)) => {
assert_eq!(peers.len(), 1);
assert_eq!(assignments.len(), 1);
}
);

// but if someone else is sending it the same assignment
// the peer could send us it as well
let assignments = vec![(cert, candidate_indices)];
let msg = protocol_v3::ApprovalDistributionMessage::Assignments(assignments);
send_message_from_peer_v3(overseer, peer, msg.clone()).await;

assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "we should not punish the peer");

// send the assignments again
send_message_from_peer_v3(overseer, peer, msg.clone()).await;

// now we should
expect_reputation_change(overseer, peer, COST_DUPLICATE_MESSAGE).await;

// Peers will be continously punished for sending duplicates until approval-distribution
// aggression kicks, at which point they aren't anymore.
let mut parent_hash = hash;
for level in 0..16 {
// As long as the lag is bellow l1 aggression, punish peers for duplicates.
send_message_from_peer_v3(overseer, peer, msg.clone()).await;
expect_reputation_change(overseer, peer, COST_DUPLICATE_MESSAGE).await;

let number = 1 + level + 1; // first block had number 1
let hash = BlakeTwo256::hash_of(&(parent_hash, number));
let meta = BlockApprovalMeta {
hash,
parent_hash,
number,
candidates: vec![],
slot: (level as u64).into(),
session: 1,
};

let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(level + 1);
overseer_send(overseer, msg).await;

let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
overseer_send(overseer, msg).await;

parent_hash = hash;
}

// send the assignments again, we should not punish the peer because aggression is
// enabled.
send_message_from_peer_v3(overseer, peer, msg).await;

assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent");
virtual_overseer
});
}

#[test]
fn import_approval_happy_path_v1_v2_peers() {
let peers = make_peers_and_authority_ids(15);
Expand Down Expand Up @@ -3446,7 +3571,7 @@ fn resends_messages_periodically() {
// Add blocks until resend is done.
{
let mut parent_hash = hash;
for level in 0..2 {
for level in 0..4 {
number = number + 1;
let hash = BlakeTwo256::hash_of(&(parent_hash, number));
let meta = BlockApprovalMeta {
Expand Down
15 changes: 15 additions & 0 deletions prdoc/pr_6696.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Make approval-distribution aggression a bit more robust and less spammy

doc:
- audience: Node Dev
description: |
The problem with the current implementation of approval-distribution aggression is that is too spammy,
and can overload the nodes, so make it less spammy by moving back the moment we trigger L2 aggression
and make resend enable only for the latest unfinalized block.

crates:
- name: polkadot-approval-distribution
bump: minor

0 comments on commit f2081f6

Please sign in to comment.