
[stable2409] Backport #6696 #6840

Merged · 2 commits · Dec 11, 2024
143 changes: 110 additions & 33 deletions polkadot/node/network/approval-distribution/src/lib.rs
@@ -316,7 +316,7 @@ impl Default for AggressionConfig {
fn default() -> Self {
AggressionConfig {
l1_threshold: Some(16),
l2_threshold: Some(28),
l2_threshold: Some(64),
resend_unfinalized_period: Some(8),
}
}
@@ -515,6 +515,8 @@ struct BlockEntry {
vrf_story: RelayVRFStory,
/// The block slot.
slot: Slot,
/// Backing off from re-sending messages to peers.
last_resent_at_block_number: Option<u32>,
}

impl BlockEntry {
@@ -890,6 +892,7 @@ impl State {
candidates_metadata: meta.candidates,
vrf_story: meta.vrf_story,
slot: meta.slot,
last_resent_at_block_number: None,
});

self.topologies.inc_session_refs(meta.session);
@@ -1330,6 +1333,33 @@ impl State {
self.enable_aggression(network_sender, Resend::No, metrics).await;
}

// When finality is lagging, as a last resort nodes start sending the messages they have
// multiple times. This means it is safe to accept duplicate messages without punishing the
// peer and reducing its reputation, since doing so could end up banning the peer, which in
// turn would create more no-shows.
fn accept_duplicates_from_validators(
blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
topologies: &SessionGridTopologies,
aggression_config: &AggressionConfig,
entry: &BlockEntry,
peer: PeerId,
) -> bool {
let topology = topologies.get_topology(entry.session);
let min_age = blocks_by_number.iter().next().map(|(num, _)| num);
let max_age = blocks_by_number.iter().rev().next().map(|(num, _)| num);

// Return false if we don't have at least one block.
let (min_age, max_age) = match (min_age, max_age) {
(Some(min), Some(max)) => (*min, *max),
_ => return false,
};

let age = max_age.saturating_sub(min_age);

aggression_config.should_trigger_aggression(age) &&
topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false)
}

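For intuition: the helper above tolerates duplicates only once the spread between the oldest and newest tracked unfinalized block is large enough to trigger aggression, and only from peers that are validators in the session topology. A minimal standalone sketch of the age arithmetic (toy types, not the node's; the `>=` semantics of `should_trigger_aggression` are an assumption consistent with the defaults in this diff):

use std::collections::BTreeMap;

struct AggressionConfig {
    l1_threshold: Option<u32>,
}

impl AggressionConfig {
    // Assumed semantics: aggression triggers once the lag reaches the threshold.
    fn should_trigger_aggression(&self, age: u32) -> bool {
        self.l1_threshold.map_or(false, |t| age >= t)
    }
}

fn main() {
    let config = AggressionConfig { l1_threshold: Some(16) };
    // Unfinalized blocks tracked by number (hashes elided for brevity).
    let mut blocks_by_number: BTreeMap<u32, Vec<&str>> = BTreeMap::new();
    for n in 100..=120 {
        blocks_by_number.insert(n, vec!["hash"]);
    }
    let min_age = *blocks_by_number.keys().next().unwrap(); // 100, the oldest
    let max_age = *blocks_by_number.keys().next_back().unwrap(); // 120, the newest
    let age = max_age.saturating_sub(min_age); // 20, which is >= 16
    assert!(config.should_trigger_aggression(age));
}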
async fn import_and_circulate_assignment<A, N, RA, R>(
&mut self,
approval_voting_sender: &mut A,
@@ -1409,20 +1439,29 @@ impl State {
if peer_knowledge.contains(&message_subject, message_kind) {
// wasn't included before
if !peer_knowledge.received.insert(message_subject.clone(), message_kind) {
gum::debug!(
target: LOG_TARGET,
?peer_id,
?message_subject,
"Duplicate assignment",
);

modify_reputation(
&mut self.reputation,
network_sender,
if !Self::accept_duplicates_from_validators(
&self.blocks_by_number,
&self.topologies,
&self.aggression_config,
entry,
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
) {
gum::debug!(
target: LOG_TARGET,
?peer_id,
?message_subject,
"Duplicate assignment",
);

modify_reputation(
&mut self.reputation,
network_sender,
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}

metrics.on_assignment_duplicate();
} else {
gum::trace!(
@@ -1738,6 +1777,9 @@ impl State {
assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>,
approval_knowledge_key: &(MessageSubject, MessageKind),
entry: &mut BlockEntry,
blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
topologies: &SessionGridTopologies,
aggression_config: &AggressionConfig,
reputation: &mut ReputationAggregator,
peer_id: PeerId,
metrics: &Metrics,
@@ -1766,20 +1808,27 @@ impl State {
.received
.insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1)
{
gum::trace!(
target: LOG_TARGET,
?peer_id,
?approval_knowledge_key,
"Duplicate approval",
);

modify_reputation(
reputation,
network_sender,
if !Self::accept_duplicates_from_validators(
blocks_by_number,
topologies,
aggression_config,
entry,
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
) {
gum::trace!(
target: LOG_TARGET,
?peer_id,
?approval_knowledge_key,
"Duplicate approval",
);
modify_reputation(
reputation,
network_sender,
peer_id,
COST_DUPLICATE_MESSAGE,
)
.await;
}
metrics.on_approval_duplicate();
}
return false
@@ -1886,6 +1935,9 @@ impl State {
&assignments_knowledge_keys,
&approval_knwowledge_key,
entry,
&self.blocks_by_number,
&self.topologies,
&self.aggression_config,
&mut self.reputation,
peer_id,
metrics,
@@ -2304,18 +2356,43 @@ impl State {
&self.topologies,
|block_entry| {
let block_age = max_age - block_entry.number;
// We want to resend only for blocks of min_age; there is no point in
// resending for blocks newer than that, because we would just create load
// and not gain anything.
let diff_from_min_age = block_entry.number - min_age;

// We want to back off on resending for blocks that have been resent recently, to
// give nodes time to process all the extra messages; if we still have not
// finalized, we are going to resend again after unfinalized_period * 2 blocks since
// the last resend.
let blocks_since_last_sent = block_entry
.last_resent_at_block_number
.map(|last_resent_at_block_number| max_age - last_resent_at_block_number);

let can_resend_at_this_age = blocks_since_last_sent
.zip(config.resend_unfinalized_period)
.map(|(blocks_since_last_sent, unfinalized_period)| {
blocks_since_last_sent >= unfinalized_period * 2
})
.unwrap_or(true);

if resend == Resend::Yes &&
config
.resend_unfinalized_period
.as_ref()
.map_or(false, |p| block_age > 0 && block_age % p == 0)
{
config.resend_unfinalized_period.as_ref().map_or(false, |p| {
block_age > 0 &&
block_age % p == 0 && diff_from_min_age == 0 &&
can_resend_at_this_age
}) {
// Retry sending to all peers.
for (_, knowledge) in block_entry.known_by.iter_mut() {
knowledge.sent = Knowledge::default();
}

block_entry.last_resent_at_block_number = Some(max_age);
gum::debug!(
target: LOG_TARGET,
block_number = ?block_entry.number,
?max_age,
"Aggression enabled with resend for block",
);
true
} else {
false
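To make the new resend gating concrete, here is a standalone sketch (a hypothetical free function, not the node's API) of the three combined conditions: resend only at ages that are multiples of `resend_unfinalized_period`, only for the oldest unfinalized block (`diff_from_min_age == 0`), and only if at least twice that period has passed since the last resend. With the default period of 8, resends can fire at ages 8, 24, 40, and so on.

fn can_resend(
    block_age: u32,
    diff_from_min_age: u32,
    last_resent_at_block_number: Option<u32>,
    max_age: u32,
    resend_unfinalized_period: Option<u32>,
) -> bool {
    // Blocks elapsed since the last resend, if there was one.
    let blocks_since_last_sent = last_resent_at_block_number.map(|last| max_age - last);
    // Back off until unfinalized_period * 2 blocks have passed since the last resend.
    let can_resend_at_this_age = blocks_since_last_sent
        .zip(resend_unfinalized_period)
        .map(|(since, period)| since >= period * 2)
        .unwrap_or(true);
    resend_unfinalized_period.map_or(false, |p| {
        block_age > 0 && block_age % p == 0 && diff_from_min_age == 0
    }) && can_resend_at_this_age
}

fn main() {
    let period = Some(8);
    // First resend: the oldest block is 8 behind the tip and was never resent.
    assert!(can_resend(8, 0, None, 8, period));
    // Age 16 is a multiple of 8, but only 8 < 16 blocks have passed since the
    // resend at tip 8, so we back off.
    assert!(!can_resend(16, 0, Some(8), 16, period));
    // At age 24, 16 blocks have passed since the last resend, so we fire again.
    assert!(can_resend(24, 0, Some(8), 24, period));
}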
137 changes: 136 additions & 1 deletion polkadot/node/network/approval-distribution/src/tests.rs
@@ -1030,6 +1030,141 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() {
);
}

#[test]
fn peer_sending_us_duplicates_while_aggression_enabled_is_ok() {
let parent_hash = Hash::repeat_byte(0xFF);
let hash = Hash::repeat_byte(0xAA);

let peers = make_peers_and_authority_ids(8);
let peer_a = peers.first().unwrap().0;

let _ = test_harness(
Arc::new(MockAssignmentCriteria { tranche: Ok(0) }),
Box::new(SystemClock {}),
state_without_reputation_delay(),
|mut virtual_overseer| async move {
let overseer = &mut virtual_overseer;
let peer = &peer_a;
setup_peer_with_view(overseer, peer, view![], ValidationVersion::V3).await;

let peers_with_optional_peer_id = peers
.iter()
.map(|(peer_id, authority)| (Some(*peer_id), authority.clone()))
.collect_vec();
// Setup a topology where peer_a is neighbor to current node.
setup_gossip_topology(
overseer,
make_gossip_topology(1, &peers_with_optional_peer_id, &[0], &[2], 1),
)
.await;

// new block `hash` with 1 candidate
let meta = BlockApprovalMeta {
hash,
parent_hash,
number: 1,
candidates: vec![Default::default(); 1],
slot: 1.into(),
session: 1,
vrf_story: RelayVRFStory(Default::default()),
};
let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
overseer_send(overseer, msg).await;

// import an assignment related to `hash` locally
let validator_index = ValidatorIndex(0);
let candidate_indices: CandidateBitfield =
vec![0 as CandidateIndex].try_into().unwrap();
let candidate_bitfields = vec![CoreIndex(0)].try_into().unwrap();
let cert = fake_assignment_cert_v2(hash, validator_index, candidate_bitfields);
overseer_send(
overseer,
ApprovalDistributionMessage::DistributeAssignment(
cert.clone().into(),
candidate_indices.clone(),
),
)
.await;

// update peer view to include the hash
overseer_send(
overseer,
ApprovalDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerViewChange(*peer, view![hash]),
),
)
.await;

// we should send them the assignment
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
peers,
Versioned::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
protocol_v3::ApprovalDistributionMessage::Assignments(assignments)
))
)) => {
assert_eq!(peers.len(), 1);
assert_eq!(assignments.len(), 1);
}
);

// but if someone else sends the peer the same assignment,
// the peer could send it to us as well
let assignments = vec![(cert, candidate_indices)];
let msg = protocol_v3::ApprovalDistributionMessage::Assignments(assignments);
send_message_from_peer_v3(overseer, peer, msg.clone()).await;

assert!(
overseer.recv().timeout(TIMEOUT).await.is_none(),
"we should not punish the peer"
);

// send the assignments again
send_message_from_peer_v3(overseer, peer, msg.clone()).await;

// now we should punish them
expect_reputation_change(overseer, peer, COST_DUPLICATE_MESSAGE).await;

// Peers will be continuously punished for sending duplicates until approval-distribution
// aggression kicks in, at which point they aren't punished anymore.
let mut parent_hash = hash;
for level in 0..16 {
// As long as the lag is below the l1 aggression threshold, punish peers for duplicates.
send_message_from_peer_v3(overseer, peer, msg.clone()).await;
expect_reputation_change(overseer, peer, COST_DUPLICATE_MESSAGE).await;

let number = 1 + level + 1; // first block had number 1
let hash = BlakeTwo256::hash_of(&(parent_hash, number));
let meta = BlockApprovalMeta {
hash,
parent_hash,
number,
candidates: vec![],
slot: (level as u64).into(),
session: 1,
vrf_story: RelayVRFStory(Default::default()),
};

let msg = ApprovalDistributionMessage::ApprovalCheckingLagUpdate(level + 1);
overseer_send(overseer, msg).await;

let msg = ApprovalDistributionMessage::NewBlocks(vec![meta]);
overseer_send(overseer, msg).await;

parent_hash = hash;
}

// send the assignments again, we should not punish the peer because aggression is
// enabled.
send_message_from_peer_v3(overseer, peer, msg).await;

assert!(overseer.recv().timeout(TIMEOUT).await.is_none(), "no message should be sent");
virtual_overseer
},
);
}

#[test]
fn import_approval_happy_path_v1_v2_peers() {
let peers = make_peers_and_authority_ids(15);
@@ -3892,7 +4027,7 @@ fn resends_messages_periodically() {
// Add blocks until resend is done.
{
let mut parent_hash = hash;
for level in 0..2 {
for level in 0..4 {
number = number + 1;
let hash = BlakeTwo256::hash_of(&(parent_hash, number));
let meta = BlockApprovalMeta {
15 changes: 15 additions & 0 deletions prdoc/pr_6696.prdoc
@@ -0,0 +1,15 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Make approval-distribution aggression a bit more robust and less spammy

doc:
- audience: Node Dev
description: |
The problem with the current implementation of approval-distribution aggression is that it is too spammy
and can overload the nodes, so make it less spammy by moving back the moment we trigger L2 aggression
and by enabling resend only for the oldest unfinalized block.

crates:
- name: polkadot-approval-distribution
bump: minor
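As a rough illustration of the first hunk in this backport (raising `l2_threshold` from 28 to 64), here is a toy sketch of how a given finality lag maps to an aggression level before and after the change; the helper and its `>=` semantics are illustrative assumptions, not the node's API:

fn aggression_level(lag: u32, l1: u32, l2: u32) -> &'static str {
    if lag >= l2 {
        "L2"
    } else if lag >= l1 {
        "L1"
    } else {
        "none"
    }
}

fn main() {
    // Old defaults: l1 = 16, l2 = 28, so L2 engaged shortly after L1.
    assert_eq!(aggression_level(30, 16, 28), "L2");
    // New defaults: l1 = 16, l2 = 64; the same lag now only triggers L1.
    assert_eq!(aggression_level(30, 16, 64), "L1");
    assert_eq!(aggression_level(70, 16, 64), "L2");
}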