Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Rate limit improvements #6315

Merged
merged 5 commits into from
Nov 23, 2022
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ impl DisputeSender {
runtime: &mut RuntimeInfo,
msg: DisputeMessage,
) -> Result<()> {
self.rate_limit.limit().await;

let req: DisputeRequest = msg.into();
let candidate_hash = req.0.candidate_receipt.hash();
match self.disputes.entry(candidate_hash) {
Expand All @@ -118,6 +116,8 @@ impl DisputeSender {
return Ok(())
},
Entry::Vacant(vacant) => {
self.rate_limit.limit("in start_sender", candidate_hash).await;

let send_task = SendTask::new(
ctx,
runtime,
Expand Down Expand Up @@ -169,10 +169,12 @@ impl DisputeSender {

// Iterates in order of insertion:
let mut should_rate_limit = true;
for dispute in self.disputes.values_mut() {
for (candidate_hash, dispute) in self.disputes.iter_mut() {
if have_new_sessions || dispute.has_failed_sends() {
if should_rate_limit {
self.rate_limit.limit().await;
self.rate_limit
.limit("in going through new sessions/failed sends", *candidate_hash)
eskimor marked this conversation as resolved.
Show resolved Hide resolved
.await;
}
let sends_happened = dispute
.refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics)
Expand All @@ -193,7 +195,7 @@ impl DisputeSender {
// recovered at startup will be relatively "old" anyway and we assume that no more than a
// third of the validators will go offline at any point in time anyway.
for dispute in unknown_disputes {
self.rate_limit.limit().await;
self.rate_limit.limit("while going through unknown disputes", dispute.1).await;
self.start_send_for_dispute(ctx, runtime, dispute).await?;
}
Ok(())
Expand Down Expand Up @@ -383,14 +385,18 @@ impl RateLimit {
}

/// Wait until ready and prepare for next call.
async fn limit(&mut self) {
///
/// String given as occasion and candidate hash are logged in case the rate limit hit.
async fn limit(&mut self, occasion: &'static str, candidate_hash: CandidateHash) {
// Wait for rate limit and add some logging:
poll_fn(|cx| {
let old_limit = Pin::new(&mut self.limit);
match old_limit.poll(cx) {
Poll::Pending => {
gum::debug!(
target: LOG_TARGET,
?occasion,
?candidate_hash,
"Sending rate limit hit, slowing down requests"
);
Poll::Pending
Expand Down