Skip to content

Commit

Permalink
sc-consensus-beefy: pump gossip engine while waiting for initializati…
Browse files Browse the repository at this point in the history
…on conditions (#3435)

As part of BEEFY worker/voter initialization the task waits for certain
chain and backend conditions to be fulfilled:
- BEEFY consensus enabled on-chain & GRANDPA best finalized higher than
on-chain BEEFY genesis block,
- backend has synced headers for BEEFY mandatory blocks between best
BEEFY and best GRANDPA.

During this waiting time, any messages gossiped on the BEEFY topic for
current chain get enqueued in the gossip engine, leading to RAM bloating
and output warning/error messages when the wait time is non-negligible
(like during a clean sync).

This PR adds logic to pump the gossip engine while waiting for other
things to make sure gossiped messages get consumed (practically
discarded until worker is fully initialized).

Also raises the warning threshold for enqueued messages from 10k to
100k. This is in line with the other gossip protocols on the node.

Fixes #3390

---------

Signed-off-by: Adrian Catangiu <adrian@parity.io>
  • Loading branch information
acatangiu authored and EgorPopelyaev committed Feb 22, 2024
1 parent 815d767 commit 43b7542
Show file tree
Hide file tree
Showing 8 changed files with 520 additions and 505 deletions.
16 changes: 16 additions & 0 deletions prdoc/pr_3435.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Fix BEEFY-related gossip messages error logs

doc:
- audience: Node Operator
description: |
Added logic to pump the gossip engine while waiting for other things
to make sure gossiped messages get consumed (practically discarded
until worker is fully initialized).
This fixes an issue where node operators saw error logs, and fixes
potential RAM bloat when BEEFY initialization takes a long time
(i.e. during clean sync).
crates:
- name: sc-consensus-beefy
35 changes: 20 additions & 15 deletions substrate/client/consensus/beefy/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub(super) enum Action<H> {
Keep(H, ReputationChange),
// discard, applying cost/benefit to originator.
Discard(ReputationChange),
// ignore, no cost/benefit applied to originator.
DiscardNoReport,
}

/// An outcome of examining a message.
Expand All @@ -68,7 +70,7 @@ enum Consider {
/// Message is from the future. Reject.
RejectFuture,
/// Message cannot be evaluated. Reject.
RejectOutOfScope,
CannotEvaluate,
}

/// BEEFY gossip message type that gets encoded and sent on the network.
Expand Down Expand Up @@ -168,18 +170,14 @@ impl<B: Block> Filter<B> {
.as_ref()
.map(|f|
// only from current set and only [filter.start, filter.end]
if set_id < f.validator_set.id() {
if set_id < f.validator_set.id() || round < f.start {
Consider::RejectPast
} else if set_id > f.validator_set.id() {
Consider::RejectFuture
} else if round < f.start {
Consider::RejectPast
} else if round > f.end {
} else if set_id > f.validator_set.id() || round > f.end {
Consider::RejectFuture
} else {
Consider::Accept
})
.unwrap_or(Consider::RejectOutOfScope)
.unwrap_or(Consider::CannotEvaluate)
}

/// Return true if `round` is >= than `max(session_start, best_beefy)`,
Expand All @@ -199,7 +197,7 @@ impl<B: Block> Filter<B> {
Consider::Accept
}
)
.unwrap_or(Consider::RejectOutOfScope)
.unwrap_or(Consider::CannotEvaluate)
}

/// Add new _known_ `round` to the set of seen valid justifications.
Expand Down Expand Up @@ -244,7 +242,7 @@ where
pub(crate) fn new(
known_peers: Arc<Mutex<KnownPeers<B>>>,
) -> (GossipValidator<B>, TracingUnboundedReceiver<PeerReport>) {
let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 10_000);
let (tx, rx) = tracing_unbounded("mpsc_beefy_gossip_validator", 100_000);
let val = GossipValidator {
votes_topic: votes_topic::<B>(),
justifs_topic: proofs_topic::<B>(),
Expand Down Expand Up @@ -289,7 +287,9 @@ where
match filter.consider_vote(round, set_id) {
Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
// When we can't evaluate, it's our fault (e.g. filter not initialized yet), we
// discard the vote without punishing or rewarding the sending peer.
Consider::CannotEvaluate => return Action::DiscardNoReport,
Consider::Accept => {},
}

Expand Down Expand Up @@ -330,7 +330,9 @@ where
match guard.consider_finality_proof(round, set_id) {
Consider::RejectPast => return Action::Discard(cost::OUTDATED_MESSAGE),
Consider::RejectFuture => return Action::Discard(cost::FUTURE_MESSAGE),
Consider::RejectOutOfScope => return Action::Discard(cost::OUT_OF_SCOPE_MESSAGE),
// When we can't evaluate, it's our fault (e.g. filter not initialized yet), we
// discard the proof without punishing or rewarding the sending peer.
Consider::CannotEvaluate => return Action::DiscardNoReport,
Consider::Accept => {},
}

Expand All @@ -357,7 +359,9 @@ where
Action::Keep(self.justifs_topic, benefit::VALIDATED_PROOF)
}
})
.unwrap_or(Action::Discard(cost::OUT_OF_SCOPE_MESSAGE))
// When we can't evaluate, it's our fault (e.g. filter not initialized yet), we
// discard the proof without punishing or rewarding the sending peer.
.unwrap_or(Action::DiscardNoReport)
};
if matches!(action, Action::Keep(_, _)) {
self.gossip_filter.write().mark_round_as_proven(round);
Expand Down Expand Up @@ -404,6 +408,7 @@ where
self.report(*sender, cb);
ValidationResult::Discard
},
Action::DiscardNoReport => ValidationResult::Discard,
}
}

Expand Down Expand Up @@ -579,8 +584,8 @@ pub(crate) mod tests {
// filter not initialized
let res = gv.validate(&mut context, &sender, &encoded);
assert!(matches!(res, ValidationResult::Discard));
expected_report.cost_benefit = cost::OUT_OF_SCOPE_MESSAGE;
assert_eq!(report_stream.try_recv().unwrap(), expected_report);
// nothing reported
assert!(report_stream.try_recv().is_err());

gv.update_filter(GossipFilterCfg { start: 0, end: 10, validator_set: &validator_set });
// nothing in cache first time
Expand Down
2 changes: 0 additions & 2 deletions substrate/client/consensus/beefy/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ mod cost {
pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "BEEFY: Bad signature");
// Message received with vote from voter not in validator set.
pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "BEEFY: Unknown voter");
// A message received that cannot be evaluated relative to our current state.
pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "BEEFY: Out-of-scope message");
// Message containing invalid proof.
pub(super) const INVALID_PROOF: Rep = Rep::new(-5000, "BEEFY: Invalid commit");
// Reputation cost per signature checked for invalid proof.
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/consensus/beefy/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where
// The proof is valid and the block is imported and final, we can import.
debug!(
target: LOG_TARGET,
"🥩 import justif {:?} for block number {:?}.", proof, number
"🥩 import justif {} for block number {:?}.", proof, number
);
// Send the justification to the BEEFY voter for processing.
self.justification_sender
Expand Down
Loading

0 comments on commit 43b7542

Please sign in to comment.