From 556d18fc9aa74aac53462dd77e805acb94729238 Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Fri, 15 Dec 2023 18:17:15 +0000 Subject: [PATCH] ConcurrentBloomInterval --- bloom/src/bloom.rs | 41 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/bloom/src/bloom.rs b/bloom/src/bloom.rs index 58065bf6b275bb..0163b1b8a652bf 100644 --- a/bloom/src/bloom.rs +++ b/bloom/src/bloom.rs @@ -1,14 +1,19 @@ //! Simple Bloom Filter + use { bv::BitVec, fnv::FnvHasher, rand::{self, Rng}, serde::{Deserialize, Serialize}, - solana_sdk::sanitize::{Sanitize, SanitizeError}, + solana_sdk::{ + sanitize::{Sanitize, SanitizeError}, + timing::AtomicInterval, + }, std::{ cmp, fmt, hash::Hasher, marker::PhantomData, + ops::Deref, sync::atomic::{AtomicU64, Ordering}, }, }; @@ -228,6 +233,40 @@ impl From> for Bloom { } } +/// Wrapper around `ConcurrentBloom` and `AtomicInterval` so the bloom filter +/// can be cleared periodically. +pub struct ConcurrentBloomInterval { + interval: AtomicInterval, + bloom: ConcurrentBloom, +} + +// Directly allow all methods of `AtomicBloom` to be called on `AtomicBloomInterval`. +impl Deref for ConcurrentBloomInterval { + type Target = ConcurrentBloom; + fn deref(&self) -> &Self::Target { + &self.bloom + } +} + +impl ConcurrentBloomInterval { + /// Create a new filter with the given parameters. + /// See `Bloom::random` for details. + pub fn new(num_items: usize, false_positive_rate: f64, max_bits: usize) -> Self { + let bloom = Bloom::random(num_items, false_positive_rate, max_bits); + Self { + interval: AtomicInterval::default(), + bloom: ConcurrentBloom::from(bloom), + } + } + + /// Reset the filter if the reset interval has elapsed. + pub fn maybe_reset(&self, reset_interval_ms: u64) { + if self.interval.should_update(reset_interval_ms) { + self.bloom.clear(); + } + } +} + #[cfg(test)] mod test { use {