Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.6.2](https://github.com/pmcgleenon/heavykeeper-rs/compare/v0.6.1...v0.6.2) - 2025-11-19

### Other

- Improve TopKQueue::iter perf + add list benchmark
- Fix decay probability scaling and add test

## [0.6.1](https://github.com/pmcgleenon/heavykeeper-rs/compare/v0.6.0...v0.6.1) - 2025-09-02

### Other

- added Sync to rng
- Merge pull request #52 from pmcgleenon/dependabot/cargo/criterion-0.7.0

## [0.6.0](https://github.com/pmcgleenon/heavykeeper-rs/compare/v0.5.1...v0.6.0) - 2025-07-27

### Other
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "heavykeeper"
version = "0.6.0"
version = "0.6.2"
edition = "2021"
authors = [ "Patrick McGleenon"]
description = "HeavyKeeper is for finding Top-K elephant flows with high precision and low memory footprint."
Expand Down Expand Up @@ -37,3 +37,7 @@ memmap2 = "0.9.5"
[[bench]]
name = "topk_add"
harness = false

[[bench]]
name = "topk_list"
harness = false
37 changes: 37 additions & 0 deletions benches/topk_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use criterion::{criterion_group, criterion_main, Criterion};
use std::hint::black_box;
use heavykeeper::TopK;
use rand::prelude::*;

// Benchmark TopK::list() to exercise TopKQueue::iter() with large k.
// Benchmark TopK::list() so that TopKQueue::iter() is exercised with a large k.
fn benchmark_topk_list(c: &mut Criterion) {
    // Sketch parameters: a large k so list()/iter() walk O(k) entries.
    let (k, width, depth, decay) = (5_000, 10_000, 4, 0.95);

    let mut topk = TopK::new(k, width, depth, decay);

    // Insert twice as many distinct random keys as k so the priority
    // queue is saturated before any timing starts.
    let mut rng = rand::rng();
    for _ in 0..2 * k {
        let key: u64 = rng.random();
        topk.add(&key, 1);
    }

    let mut group = c.benchmark_group("TopK_list");
    group.sample_size(40);
    group.bench_function("list_k_5000", |b| {
        b.iter(|| {
            // list() goes through TopKQueue::iter(); black_box keeps the
            // optimizer from discarding the result and eliding the call.
            black_box(topk.list());
        });
    });
    group.finish();
}

criterion_group!(benches, benchmark_topk_list);
criterion_main!(benches);
75 changes: 70 additions & 5 deletions src/heavykeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct TopK<T: Ord + Clone + Hash + Debug> {
buckets: Vec<Vec<Bucket>>,
priority_queue: TopKQueue<T>,
hasher: RandomState,
random: Box<dyn RngCore + Send>,
random: Box<dyn RngCore + Send + Sync>,
}

pub struct Builder<T> {
Expand All @@ -85,15 +85,16 @@ pub struct Builder<T> {
decay: Option<f64>,
seed: Option<u64>,
hasher: Option<RandomState>,
rng: Option<Box<dyn RngCore + Send>>,
rng: Option<Box<dyn RngCore + Send + Sync>>,
_phantom: std::marker::PhantomData<T>,
}

fn precompute_decay_thresholds(decay: f64, num_entries: usize) -> Vec<u64> {
let mut thresholds = Vec::with_capacity(num_entries);
for count in 0..num_entries {
let decay_factor = decay.powf(count as f64);
let threshold = (decay_factor * (1u64 << 63) as f64) as u64;
// Use full u64 range so decay_factor = 1.0 gives probability ~= 1.0 (not 0.5)
let threshold = (decay_factor * u64::MAX as f64) as u64;
thresholds.push(threshold);
}
thresholds
Expand Down Expand Up @@ -133,6 +134,7 @@ impl<T: Ord + Clone + Hash + Debug> TopK<T> {
)
}

<<<<<<< HEAD
fn with_components(
k: usize,
width: usize,
Expand All @@ -141,6 +143,9 @@ impl<T: Ord + Clone + Hash + Debug> TopK<T> {
hasher: RandomState,
rng: Box<dyn RngCore + Send>,
) -> Self {
=======
fn with_components(k: usize, width: usize, depth: usize, decay: f64, hasher: RandomState, rng: Box<dyn RngCore + Send + Sync>) -> Self {
>>>>>>> origin/main
// Pre-allocate with capacity to avoid resizing
let mut buckets = Vec::with_capacity(depth);
for _ in 0..depth {
Expand Down Expand Up @@ -270,15 +275,25 @@ impl<T: Ord + Clone + Hash + Debug> TopK<T> {
self.decay_thresholds[count_idx]
} else {
let lookup_size = self.decay_thresholds.len();
<<<<<<< HEAD
let base_threshold =
self.decay_thresholds[lookup_size - 1] as f64 / (1u64 << 63) as f64;
let divisor = lookup_size - 1;
let quotient = count_idx / divisor;
let remainder = count_idx % divisor;
let remainder_threshold =
self.decay_thresholds[remainder] as f64 / (1u64 << 63) as f64;
=======
// Keep extrapolation normalized to the same [0,1] scale as precomputed thresholds.
let base_threshold = self.decay_thresholds[lookup_size - 1] as f64 / u64::MAX as f64;
let divisor = lookup_size - 1;
let quotient = count_idx / divisor;
let remainder = count_idx % divisor;
let remainder_threshold = self.decay_thresholds[remainder] as f64 / u64::MAX as f64;
>>>>>>> origin/main
let decay = base_threshold.powi(quotient as i32) * remainder_threshold;
(decay * (1u64 << 63) as f64) as u64
// Re-scale back into u64 range consistent with precomputed thresholds.
(decay * u64::MAX as f64) as u64
};
let rand = self.random.next_u64();
if rand < decay_threshold {
Expand Down Expand Up @@ -430,6 +445,12 @@ impl<T: Ord + Clone + Hash + Debug> Default for Builder<T> {
}
}

// NOTE(review): this `Default` impl appears to duplicate the impl whose
// closing braces are visible immediately above in this hunk — two `Default`
// impls for `Builder<T>` are conflicting implementations and will not
// compile. Confirm that only one copy survives the merge.
impl<T: Ord + Clone + Hash + Debug> Default for Builder<T> {
// Delegates to `Builder::new()` so `Builder::default()` and `Builder::new()`
// produce identical (all-`None`) builders.
fn default() -> Self {
Self::new()
}
}

impl<T: Ord + Clone + Hash + Debug> Builder<T> {
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -474,7 +495,7 @@ impl<T: Ord + Clone + Hash + Debug> Builder<T> {
self
}

pub fn rng<R: RngCore + Send + 'static>(mut self, rng: R) -> Self {
pub fn rng<R: RngCore + Send + Sync + 'static>(mut self, rng: R) -> Self {
self.rng = Some(Box::new(rng));
self
}
Expand Down Expand Up @@ -1368,4 +1389,48 @@ mod tests {
assert!(topk.query(item));
assert_eq!(topk.count(item), 1);
}

/// Verifies that decay-probability thresholds are scaled against the
/// full u64 range.
///
/// Historically thresholds were scaled by 2^63, so with decay = 1.0 an
/// RNG value of exactly 2^63 failed the `rand < threshold` comparison
/// and the bucket never decayed. With u64::MAX scaling the same
/// comparison succeeds, so a decay factor of 1.0 behaves as probability
/// ~1.0 and the colliding bucket is always decayed and replaced.
#[test]
fn test_decay_probability_scaling_fix() {
    // Pin the RNG to the boundary value 2^63 — the exact point where
    // the old and new scalings disagree.
    let mut rng = MockRngCoreTrait::new();
    rng.expect_next_u64()
        .times(1..) // decay may consult the RNG more than once
        .return_const(1u64 << 63);

    // k = width = depth = 1 guarantees every key lands in one bucket.
    let mut topk = TopK::<Vec<u8>>::builder()
        .k(1)
        .width(1)
        .depth(1)
        .decay(1.0)
        .rng(rng)
        .build()
        .unwrap();

    let first = b"item1".to_vec();
    let second = b"item2".to_vec();

    // Seed the single bucket: it now holds (fp(first), count = 1).
    topk.add(&first, 1);
    assert_eq!(topk.bucket_count(&first), 1);
    assert_eq!(topk.bucket_count(&second), 0);

    // A colliding insert must *always* trigger decay when the decay
    // factor is 1.0, evicting `first` in favour of `second`.
    topk.add(&second, 1);
    assert_eq!(topk.bucket_count(&first), 0);
    assert_eq!(topk.bucket_count(&second), 1);
}
}
22 changes: 21 additions & 1 deletion src/priority_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl<T: Ord + Clone + Hash + PartialEq> TopKQueue<T> {
}

pub(crate) fn iter(&self) -> impl Iterator<Item = (&T, u64)> {
<<<<<<< HEAD
let mut items: Vec<_> = self.items.iter().map(|(k, v)| (k, v.0)).collect();
// Sort by count descending, then by sequence ascending
items.sort_unstable_by(|(k1, v1), (k2, v2)| {
Expand All @@ -129,8 +130,27 @@ impl<T: Ord + Clone + Hash + PartialEq> TopKQueue<T> {
}
other => other,
}
=======
// Materialize (key, count, sequence) using stored heap index so
// per-comparison work is O(1) instead of scanning the heap.
let mut items: Vec<_> = self
.items
.iter()
.map(|(k, (count, heap_idx))| {
let seq = self.heap[*heap_idx].1;
(k, *count, seq)
})
.collect();

// Sort by count descending, then by sequence ascending.
items.sort_unstable_by(|(_, c1, s1), (_, c2, s2)| match c2.cmp(c1) {
std::cmp::Ordering::Equal => s1.cmp(s2),
other => other,
>>>>>>> origin/main
});
items.into_iter()

// Return an iterator over (&T, count), preserving sorted order.
items.into_iter().map(|(k, count, _)| (k, count))
}

// Binary heap helper methods using Eytzinger layout (0-based indexing)
Expand Down