From f37181669e2a751c8789c51ba25bc8a0beedd005 Mon Sep 17 00:00:00 2001 From: Isabelle Atkins Date: Mon, 17 Apr 2023 14:44:43 +0100 Subject: [PATCH 1/4] fix panic in DelayQueue --- tokio-util/CHANGELOG.md | 5 +++++ tokio-util/src/time/wheel/level.rs | 28 ++++++++++++++++++++++++---- tokio-util/src/time/wheel/mod.rs | 7 +++++-- tokio-util/tests/time_delay_queue.rs | 18 ++++++++++++++++++ 4 files changed, 52 insertions(+), 6 deletions(-) diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index 0c11b2144eb..51d597e529f 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,3 +1,8 @@ + +# [Unreleased] +### Fixed +- fix panic after ~2 years and 2 months in DelayQueue. ([#5536]) + # 0.7.7 (February 12, 2023) This release reverts the removal of the `Encoder` bound on the `FramedParts` diff --git a/tokio-util/src/time/wheel/level.rs b/tokio-util/src/time/wheel/level.rs index 8ea30af30fd..c24ef85272a 100644 --- a/tokio-util/src/time/wheel/level.rs +++ b/tokio-util/src/time/wheel/level.rs @@ -140,11 +140,31 @@ impl Level { // TODO: This can probably be simplified w/ power of 2 math let level_start = now - (now % level_range); - let deadline = level_start + slot as u64 * slot_range; - + let mut deadline = level_start + slot as u64 * slot_range; + if deadline < now { + // A timer is in a slot "prior" to the current time. This can occur + // because we do not have an infinite hierarchy of timer levels, and + // eventually a timer scheduled for a very distant time might end up + // being placed in a slot that is beyond the end of all of the + // arrays. + // + // To deal with this, we first limit timers to being scheduled no + // more than MAX_DURATION ticks in the future; that is, they're at + // most one rotation of the top level away. Then, we force timers + // that logically would go into the top+1 level, to instead go into + // the top level's slots. + // + // What this means is that the top level's slots act as a + // pseudo-ring buffer, and we rotate around them indefinitely. If we + // compute a deadline before now, and it's the top level, it + // therefore means we're actually looking at a slot in the future. + debug_assert_eq!(self.level, super::NUM_LEVELS - 1); + + deadline += level_range; + } debug_assert!( deadline >= now, - "deadline={}; now={}; level={}; slot={}; occupied={:b}", + "deadline={:016X}; now={:016X}; level={}; slot={}; occupied={:b}", deadline, now, self.level, @@ -163,7 +183,7 @@ impl Level { if self.occupied == 0 { return None; } - + // Get the slot for now using Maths let now_slot = (now / slot_range(self.level)) as usize; let occupied = self.occupied.rotate_right(now_slot as u32); diff --git a/tokio-util/src/time/wheel/mod.rs b/tokio-util/src/time/wheel/mod.rs index ffa05ab71bf..c5a03fd4b36 100644 --- a/tokio-util/src/time/wheel/mod.rs +++ b/tokio-util/src/time/wheel/mod.rs @@ -254,8 +254,11 @@ fn level_for(elapsed: u64, when: u64) -> usize { // Mask in the trailing bits ignored by the level calculation in order to cap // the possible leading zeros - let masked = elapsed ^ when | SLOT_MASK; - + let mut masked = elapsed ^ when | SLOT_MASK; + if masked >= MAX_DURATION { + // Fudge the timer into the top level + masked = MAX_DURATION - 1; + } let leading_zeros = masked.leading_zeros() as usize; let significant = 63 - leading_zeros; significant / 6 diff --git a/tokio-util/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs index c8b74d1ecd3..3c056e9a430 100644 --- a/tokio-util/tests/time_delay_queue.rs +++ b/tokio-util/tests/time_delay_queue.rs @@ -5,6 +5,7 @@ use tokio::time::{self, sleep, sleep_until, Duration, Instant}; use tokio_test::{assert_pending, assert_ready, task}; use tokio_util::time::DelayQueue; +use futures::StreamExt; macro_rules! poll { ($queue:ident) => { @@ -786,6 +787,23 @@ async fn compact_change_deadline() { assert!(entry.is_none()); } + +#[tokio::test(start_paused = true)] +async fn item_expiry_greater_than_wheel() { + // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted. + let mut queue = DelayQueue::new(); + for _ in 0..2 { + tokio::time::advance(Duration::from_millis(1 << 35)).await; + queue.insert(0, Duration::from_millis(0)); + queue.next().await; + } + // This should not panic + let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + queue.insert(1, Duration::from_millis(1)); + })); + assert!(no_panic.is_ok()); +} + #[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")] #[tokio::test(start_paused = true)] async fn remove_after_compact() { From 09f637d7929f24046cbe02e71b9661db24f8b54d Mon Sep 17 00:00:00 2001 From: Isabelle Atkins Date: Mon, 17 Apr 2023 15:07:22 +0100 Subject: [PATCH 2/4] cargo fmt --- tokio-util/tests/time_delay_queue.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tokio-util/tests/time_delay_queue.rs b/tokio-util/tests/time_delay_queue.rs index 3c056e9a430..369857a51ef 100644 --- a/tokio-util/tests/time_delay_queue.rs +++ b/tokio-util/tests/time_delay_queue.rs @@ -2,10 +2,10 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use futures::StreamExt; use tokio::time::{self, sleep, sleep_until, Duration, Instant}; use tokio_test::{assert_pending, assert_ready, task}; use tokio_util::time::DelayQueue; -use futures::StreamExt; macro_rules! poll { ($queue:ident) => { @@ -787,7 +787,6 @@ async fn compact_change_deadline() { assert!(entry.is_none()); } - #[tokio::test(start_paused = true)] async fn item_expiry_greater_than_wheel() { // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted. @@ -799,7 +798,7 @@ async fn item_expiry_greater_than_wheel() { } // This should not panic let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - queue.insert(1, Duration::from_millis(1)); + queue.insert(1, Duration::from_millis(1)); })); assert!(no_panic.is_ok()); } From 7070ca3dccd7aeb58f28e270f0d3165d41c07c5e Mon Sep 17 00:00:00 2001 From: Isabelle Atkins Date: Mon, 17 Apr 2023 15:11:59 +0100 Subject: [PATCH 3/4] rust fmt again --- tokio-util/src/time/wheel/level.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/time/wheel/level.rs b/tokio-util/src/time/wheel/level.rs index c24ef85272a..151e0948b69 100644 --- a/tokio-util/src/time/wheel/level.rs +++ b/tokio-util/src/time/wheel/level.rs @@ -183,7 +183,7 @@ impl Level { if self.occupied == 0 { return None; } - + // Get the slot for now using Maths let now_slot = (now / slot_range(self.level)) as usize; let occupied = self.occupied.rotate_right(now_slot as u32); From 8f5739a64e8d6de54491a7525d890c75a4836218 Mon Sep 17 00:00:00 2001 From: isabelleatkins <73671620+isabelleatkins@users.noreply.github.com> Date: Mon, 17 Apr 2023 21:19:50 +0100 Subject: [PATCH 4/4] Remove entry added to Changelog --- tokio-util/CHANGELOG.md | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index 51d597e529f..0c11b2144eb 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,8 +1,3 @@ - -# [Unreleased] -### Fixed -- fix panic after ~2 years and 2 months in DelayQueue. ([#5536]) - # 0.7.7 (February 12, 2023) This release reverts the removal of the `Encoder` bound on the `FramedParts`