This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fix wrong rate limit + add a few logs. #6440

Merged 5 commits on Dec 15, 2022
5 changes: 5 additions & 0 deletions node/core/approval-voting/src/lib.rs
@@ -1327,6 +1327,11 @@ async fn get_approval_signatures_for_candidate<Context>(

// No need to block subsystem on this (also required to break cycle).
// We should not be sending this message frequently - caller must make sure this is bounded.
gum::trace!(
target: LOG_TARGET,
?candidate_hash,
"Spawning task for fetching sinatures from approval-distribution"
);
ctx.spawn("get-approval-signatures", Box::pin(get_approvals))
}
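The hunk above spawns the signature fetch instead of awaiting it, so the approval-voting subsystem never blocks on approval-distribution and the request/response cycle between the two subsystems cannot deadlock. A minimal sketch of the same fire-and-forget pattern, assuming plain tokio and a hypothetical fetch_signatures stand-in rather than the actual subsystem Context and message types:

```rust
use tokio::sync::oneshot;

async fn fetch_signatures() -> Vec<String> {
    // Stand-in for the actual request to approval-distribution.
    vec!["signature".to_string()]
}

fn spawn_get_signatures() -> oneshot::Receiver<Vec<String>> {
    let (tx, rx) = oneshot::channel();
    // Fire-and-forget: the caller's main loop keeps running while the fetch completes.
    tokio::spawn(async move {
        let sigs = fetch_signatures().await;
        // The receiver may already be gone; that is fine for a fire-and-forget task.
        let _ = tx.send(sigs);
    });
    rx
}

#[tokio::main]
async fn main() {
    let rx = spawn_get_signatures();
    // The caller can process other messages and await the result later.
    let sigs = rx.await.expect("sender task finished");
    println!("got {} signature(s)", sigs.len());
}
```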

9 changes: 8 additions & 1 deletion node/core/dispute-coordinator/src/initialized.rs
@@ -807,7 +807,14 @@ impl Initialized {
);
intermediate_result
},
Ok(votes) => intermediate_result.import_approval_votes(&env, votes, now),
Ok(votes) => {
gum::trace!(
target: LOG_TARGET,
count = votes.len(),
"Successfully received approval votes."
);
intermediate_result.import_approval_votes(&env, votes, now)
},
}
} else {
gum::trace!(
5 changes: 4 additions & 1 deletion node/network/dispute-distribution/src/sender/mod.rs
@@ -195,7 +195,7 @@ impl DisputeSender {
// recovered at startup will be relatively "old" anyway and we assume that no more than a
// third of the validators will go offline at any point in time anyway.
for dispute in unknown_disputes {
self.rate_limit.limit("while going through unknown disputes", dispute.1).await;
// Rate limiting handled inside `start_send_for_dispute` (calls `start_sender`).
self.start_send_for_dispute(ctx, runtime, dispute).await?;
}
Ok(())
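The removed line is the actual rate-limit fix: the loop over unknown disputes no longer waits on the limiter itself, because start_send_for_dispute (via start_sender) already does, so each dispute would otherwise have waited on the limiter twice. A minimal sketch of such a rate limiter, assuming a plain tokio timer and illustrative names rather than the real dispute-distribution types:

```rust
use tokio::time::{sleep_until, Duration, Instant};

// Enforces a minimum interval between sends. Awaiting `limit` both in the
// loop and inside the per-dispute send path would make every item wait
// twice, roughly halving the effective send rate.
struct RateLimit {
    next_allowed: Instant,
    interval: Duration,
}

impl RateLimit {
    fn new(interval: Duration) -> Self {
        Self { next_allowed: Instant::now(), interval }
    }

    async fn limit(&mut self) {
        sleep_until(self.next_allowed).await;
        self.next_allowed = Instant::now() + self.interval;
    }
}

#[tokio::main]
async fn main() {
    let mut rate_limit = RateLimit::new(Duration::from_millis(100));
    for dispute in 0..3 {
        // One await per item: items are spaced roughly 100ms apart.
        rate_limit.limit().await;
        println!("sending dispute {dispute}");
    }
}
```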
@@ -389,6 +389,7 @@ impl RateLimit {
/// String given as occasion and candidate hash are logged in case the rate limit is hit.
async fn limit(&mut self, occasion: &'static str, candidate_hash: CandidateHash) {
// Wait for rate limit and add some logging:
let mut num_wakes: u32 = 0;
poll_fn(|cx| {
let old_limit = Pin::new(&mut self.limit);
match old_limit.poll(cx) {
@@ -397,8 +398,10 @@
target: LOG_TARGET,
?occasion,
?candidate_hash,
?num_wakes,
"Sending rate limit hit, slowing down requests"
);
num_wakes += 1;
Contributor:

nit: Shouldn't this be incremented before the log? Otherwise the first log will say 0 wakes, which could be misleading.

Member (Author):

I would argue it is correct: the very first time we call into poll there has been no wake yet, so 0 is what I would expect here.

I added this to see whether limit is really called multiple times on the same candidate or whether we get woken by the executor multiple times with result Pending. I already checked the logs and it is indeed the latter (num_wakes is 0 and then 1 the next time). Not sure where this redundant wake is coming from, but that is what is happening and it should be harmless (except for the wasted cycles).

Poll::Pending
},
Poll::Ready(()) => Poll::Ready(()),
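The num_wakes counter added above, and the review thread around it, are about distinguishing repeated limit calls for the same candidate from the executor waking the same poll_fn future more than once while the timer is still pending. A minimal, self-contained sketch of that measurement, assuming plain std poll_fn and a tokio sleep instead of the subsystem's rate-limit timer:

```rust
use std::future::{poll_fn, Future};
use std::task::Poll;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let mut timer = Box::pin(sleep(Duration::from_millis(50)));
    let mut num_wakes: u32 = 0;

    poll_fn(|cx| match timer.as_mut().poll(cx) {
        Poll::Pending => {
            // Not ready yet: count this (possibly spurious) wake-up.
            num_wakes += 1;
            Poll::Pending
        }
        Poll::Ready(()) => Poll::Ready(()),
    })
    .await;

    // A count above what the timer alone would explain points at redundant
    // wake-ups from the executor rather than repeated calls to the limiter.
    println!("timer fired after {num_wakes} pending poll(s)");
}
```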
19 changes: 18 additions & 1 deletion node/network/dispute-distribution/src/sender/send_task.rs
@@ -140,21 +140,38 @@ impl SendTask {
let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?;

// Note this will also contain all authorities for which sending failed previously:
let add_authorities = new_authorities
let add_authorities: Vec<_> = new_authorities
.iter()
.filter(|a| !self.deliveries.contains_key(a))
.map(Clone::clone)
.collect();

// Get rid of dead/irrelevant tasks/statuses:
gum::trace!(
target: LOG_TARGET,
already_running_deliveries = ?self.deliveries.len(),
"Cleaning up deliveries"
);
self.deliveries.retain(|k, _| new_authorities.contains(k));

// Start any new tasks that are needed:
gum::trace!(
target: LOG_TARGET,
new_and_failed_authorities = ?add_authorities.len(),
overall_authority_set_size = ?new_authorities.len(),
already_running_deliveries = ?self.deliveries.len(),
"Starting new send requests for authorities."
);
let new_statuses =
send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone(), metrics)
.await?;

let was_empty = new_statuses.is_empty();
gum::trace!(
target: LOG_TARGET,
sent_requests = ?new_statuses.len(),
"Requests dispatched."
);

self.has_failed_sends = false;
self.deliveries.extend(new_statuses.into_iter());
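The hunk above refreshes the per-authority send tasks: authorities that are relevant now but have no in-flight delivery (including those whose previous send failed) get a new request, deliveries for no-longer-relevant authorities are dropped, and the newly started ones are recorded. A minimal sketch of the same bookkeeping with plain std collections and placeholder Authority and Status types standing in for the real ones:

```rust
use std::collections::{HashMap, HashSet};

type Authority = String;
struct Status; // placeholder for a per-authority delivery status

fn refresh(
    deliveries: &mut HashMap<Authority, Status>,
    new_authorities: &HashSet<Authority>,
) -> Vec<Authority> {
    // Authorities needing a (re)send: relevant now, but with no running delivery.
    // This also picks up authorities whose previous send failed and was removed.
    let add_authorities: Vec<_> = new_authorities
        .iter()
        .filter(|a| !deliveries.contains_key(*a))
        .cloned()
        .collect();

    // Get rid of dead/irrelevant deliveries.
    deliveries.retain(|k, _| new_authorities.contains(k));

    // Pretend each new send started successfully and record its status.
    deliveries.extend(add_authorities.iter().cloned().map(|a| (a, Status)));

    add_authorities
}

fn main() {
    let mut deliveries = HashMap::new();
    deliveries.insert("old".to_string(), Status);
    let relevant: HashSet<_> = ["alice".to_string(), "bob".to_string()].into_iter().collect();
    let started = refresh(&mut deliveries, &relevant);
    println!("started sends to {started:?}, tracking {} deliveries", deliveries.len());
}
```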