Skip to content

Commit

Permalink
fix(gateway-queue): don't panic after rebalancing (twilight-rs#2315)
Browse files Browse the repository at this point in the history
- **fixup(gateway-queue): don't panic after rebalancing**
- **refactor(gateway-queue): use more precise nouns & verbs**
  • Loading branch information
vilgotf authored Feb 21, 2024
1 parent e6c124c commit eae9607
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions twilight-gateway-queue/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ async fn runner(
if queues.is_empty() {
_ = tx.send(());
} else {
queues[shard as usize % queues.len()]
.push_back((shard, tx));
let key = shard as usize % queues.len();
queues[key].push_back((shard, tx));
}
}
Some(Message::Update(update)) => {
Expand All @@ -94,45 +94,46 @@ async fn runner(
.take(max_concurrency.into())
.collect();
for (shard, tx) in unbalanced {
queues[(shard % u32::from(max_concurrency)) as usize]
.push_back((shard, tx));
let key = (shard % u32::from(max_concurrency)) as usize;
queues[key].push_back((shard, tx));
}
}
}
None => break,
}
}
_ = &mut interval, if queues.iter().any(|queue| !queue.is_empty()) => {
let span = tracing::info_span!("bucket", capacity = %queues.len());
let now = Instant::now();
let span = tracing::info_span!("bucket", moment = ?now);

interval.as_mut().reset(now + IDENTIFY_DELAY);

if remaining == total {
reset_at.as_mut().reset(now + LIMIT_PERIOD);
}

for (ratelimit_key, queue) in queues.iter_mut().enumerate() {
for (key, queue) in queues.iter_mut().enumerate() {
if remaining == 0 {
let duration = reset_at.deadline().saturating_duration_since(now);
tracing::debug!(?duration, "sleeping until remaining count refills");
tracing::debug!(
refill_delay = ?reset_at.deadline().saturating_duration_since(now),
"exhausted available permits"
);
(&mut reset_at).await;
remaining = total;

break;
}

while let Some((id, tx)) = queue.pop_front() {
let calculated_ratelimit_key = (id % u32::from(max_concurrency)) as usize;
debug_assert_eq!(ratelimit_key, calculated_ratelimit_key);

while let Some((shard, tx)) = queue.pop_front() {
if tx.send(()).is_err() {
continue;
}
tracing::debug!(parent: &span, ratelimit_key, "allowing shard {id}");
remaining -= 1;

tracing::debug!(parent: &span, key, shard);
remaining -= 1;
// Reschedule behind shard for ordering correctness.
yield_now().await;

break;
}
}
Expand Down

0 comments on commit eae9607

Please sign in to comment.