diff --git a/Cargo.lock b/Cargo.lock index 0803ee4b084a..574c2772d12a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1548,6 +1548,20 @@ dependencies = [ "cipher 0.4.4", ] +[[package]] +name = "cuckoofilter" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b810a8449931679f64cd7eef1bbd0fa315801b6d5d9cdc1ace2804d6529eee18" +dependencies = [ + "byteorder", + "fnv", + "rand 0.7.3", + "serde", + "serde_bytes", + "serde_derive", +] + [[package]] name = "curve25519-dalek" version = "4.0.0" @@ -4117,9 +4131,11 @@ dependencies = [ "bincode", "bloomfilter", "bytes", + "cuckoofilter", "memmap2 0.7.1", "ph", "serde", + "tempfile", "thiserror", "zstd", ] @@ -6767,6 +6783,15 @@ dependencies = [ "smallvec 0.6.14", ] +[[package]] +name = "serde_bytes" +version = "0.11.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab33ec92f677585af6d88c65593ae2375adde54efdbf16d597f2cbc7a6d368ff" +dependencies = [ + "serde", +] + [[package]] name = "serde_derive" version = "1.0.188" diff --git a/crates/storage/nippy-jar/Cargo.toml b/crates/storage/nippy-jar/Cargo.toml index f5750ed7bc08..58a954100167 100644 --- a/crates/storage/nippy-jar/Cargo.toml +++ b/crates/storage/nippy-jar/Cargo.toml @@ -20,6 +20,8 @@ thiserror = "1.0" bincode = "1.3" serde = { version = "1.0", features = ["derive"] } bytes = "1.5" +cuckoofilter = { version = "0.5.0", features = ["serde_support", "serde_bytes"] } +tempfile = "3.4" [features] default = [] \ No newline at end of file diff --git a/crates/storage/nippy-jar/src/compression/mod.rs b/crates/storage/nippy-jar/src/compression/mod.rs index a4ee8708346d..5fbf0148449b 100644 --- a/crates/storage/nippy-jar/src/compression/mod.rs +++ b/crates/storage/nippy-jar/src/compression/mod.rs @@ -29,7 +29,7 @@ pub trait Compression { #[derive(Debug, PartialEq, Serialize, Deserialize)] pub enum Compressors { Zstd(Zstd), - // Acoids irrefutable let errors. Remove this after adding another one. + // Avoids irrefutable let errors. Remove this after adding another one. Unused, } diff --git a/crates/storage/nippy-jar/src/filter/cuckoo.rs b/crates/storage/nippy-jar/src/filter/cuckoo.rs new file mode 100644 index 000000000000..10cd6aee6f9c --- /dev/null +++ b/crates/storage/nippy-jar/src/filter/cuckoo.rs @@ -0,0 +1,87 @@ +use super::Filter; +use crate::NippyJarError; +use cuckoofilter::{self, CuckooFilter, ExportedCuckooFilter}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::collections::hash_map::DefaultHasher; + +/// [CuckooFilter](https://www.cs.cmu.edu/~dga/papers/cuckoo-conext2014.pdf). It builds and provides an approximated set-membership filter to answer queries such as "Does this element belong to this set?". Has a theoretical 3% false positive rate. +pub struct Cuckoo { + /// Remaining number of elements that can be added. + /// + /// This is necessary because the inner implementation will fail on adding an element past capacity, **but it will still add it and remove other**: [source](https://github.com/axiomhq/rust-cuckoofilter/tree/624da891bed1dd5d002c8fa92ce0dcd301975561#notes--todos) + remaining: usize, + + /// CuckooFilter. + filter: CuckooFilter, // TODO does it need an actual hasher? +} + +impl Cuckoo { + pub fn new(max_capacity: usize) -> Self { + Cuckoo { remaining: max_capacity, filter: CuckooFilter::with_capacity(max_capacity) } + } +} + +impl Filter for Cuckoo { + fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> { + if self.remaining == 0 { + return Err(NippyJarError::FilterMaxCapacity) + } + + self.remaining -= 1; + + Ok(self.filter.add(element)?) + } + + fn contains(&self, element: &[u8]) -> Result { + Ok(self.filter.contains(element)) + } +} + +impl std::fmt::Debug for Cuckoo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Cuckoo") + .field("remaining", &self.remaining) + .field("filter_size", &self.filter.memory_usage()) + .finish_non_exhaustive() + } +} + +impl PartialEq for Cuckoo { + fn eq(&self, _other: &Self) -> bool { + self.remaining == _other.remaining && + { + #[cfg(not(test))] + { + unimplemented!("No way to figure it out without exporting (expensive), so only allow direct comparison on a test") + } + #[cfg(test)] + { + let f1 = self.filter.export(); + let f2 = _other.filter.export(); + return f1.length == f2.length && f1.values == f2.values + } + } + } +} + +impl<'de> Deserialize<'de> for Cuckoo { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let (remaining, exported): (usize, ExportedCuckooFilter) = + Deserialize::deserialize(deserializer)?; + + Ok(Cuckoo { remaining, filter: exported.into() }) + } +} + +impl Serialize for Cuckoo { + /// Potentially expensive, but should be used only when creating the file. + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + (self.remaining, self.filter.export()).serialize(serializer) + } +} diff --git a/crates/storage/nippy-jar/src/filter/mod.rs b/crates/storage/nippy-jar/src/filter/mod.rs new file mode 100644 index 000000000000..d457784afe71 --- /dev/null +++ b/crates/storage/nippy-jar/src/filter/mod.rs @@ -0,0 +1,36 @@ +use crate::NippyJarError; +use serde::{Deserialize, Serialize}; + +mod cuckoo; +pub use cuckoo::Cuckoo; + +pub trait Filter { + /// Add element to the inclusion list. + fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError>; + + /// Checks if the element belongs to the inclusion list. There might be false positives. + fn contains(&self, element: &[u8]) -> Result; +} + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +pub enum Filters { + Cuckoo(Cuckoo), + // Avoids irrefutable let errors. Remove this after adding another one. + Unused, +} + +impl Filter for Filters { + fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> { + match self { + Filters::Cuckoo(c) => c.add(element), + Filters::Unused => todo!(), + } + } + + fn contains(&self, element: &[u8]) -> Result { + match self { + Filters::Cuckoo(c) => c.contains(element), + Filters::Unused => todo!(), + } + } +} diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index a4322f56ea41..f02337e661e5 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -8,6 +8,9 @@ use std::{ use thiserror::Error; use zstd::bulk::Decompressor; +pub mod filter; +use filter::{Cuckoo, Filter, Filters}; + pub mod compression; use compression::{Compression, Compressors}; @@ -21,24 +24,24 @@ pub struct NippyJar { columns: usize, /// Compressor if required compressor: Option, + /// Filter + filter: Option, #[serde(skip)] /// Data path for file. Index file will be `{path}.idx` path: Option, /// soon - bloom_filter: bool, - /// soon phf: bool, } impl NippyJar { /// Creates new [`NippyJar`]. - pub fn new(columns: usize, bloom_filter: bool, phf: bool, path: &Path) -> Self { + pub fn new(columns: usize, phf: bool, path: &Path) -> Self { NippyJar { version: NIPPY_JAR_VERSION, columns, - bloom_filter, phf, compressor: None, + filter: None, path: Some(path.to_path_buf()), } } @@ -50,12 +53,19 @@ impl NippyJar { self } + /// Adds [`filter::Cuckoo`] filter. + pub fn with_cuckoo_filter(mut self, max_capacity: usize) -> Self { + self.filter = Some(Filters::Cuckoo(Cuckoo::new(max_capacity))); + self + } + /// Loads the file configuration and returns [`Self`]. pub fn load(path: &Path) -> Result { let mut file = File::open(path)?; let mut obj: Self = bincode::deserialize_from(&mut file)?; obj.path = Some(path.to_path_buf()); + Ok(obj) } @@ -184,13 +194,23 @@ impl NippyJar { } /// Writes all necessary configuration to file. - fn freeze_config(&self, handle: &mut File) -> Result<(), NippyJarError> { + fn freeze_config(&mut self, handle: &mut File) -> Result<(), NippyJarError> { // TODO Split Dictionaries and Bloomfilters Configuration so we dont have to load everything // at once Ok(bincode::serialize_into(handle, &self)?) } } +impl Filter for NippyJar { + fn add(&mut self, element: &[u8]) -> Result<(), NippyJarError> { + self.filter.as_mut().ok_or(NippyJarError::FilterMissing)?.add(element) + } + + fn contains(&self, element: &[u8]) -> Result { + self.filter.as_ref().ok_or(NippyJarError::FilterMissing)?.contains(element) + } +} + #[derive(Debug, Error)] pub enum NippyJarError { #[error("err")] @@ -207,6 +227,14 @@ pub enum NippyJarError { ColumnLenMismatch(usize, usize), #[error("UnexpectedMissingValue row: {0} col:{1}")] UnexpectedMissingValue(u64, u64), + #[error("err")] + FilterError(#[from] cuckoofilter::CuckooError), + #[error("NippyJar initialized without filter.")] + FilterMissing, + #[error("Filter has reached max capacity.")] + FilterMaxCapacity, + #[error("Cuckoo was not properly initialized after loaded.")] + FilterCuckooNotLoaded, } pub struct NippyJarCursor<'a> { @@ -308,53 +336,64 @@ impl<'a> NippyJarCursor<'a> { mod tests { use super::*; - const TEST_FILE_NAME: &str = "nippyjar.nj"; + fn test_data() -> (Vec>, Vec>) { + ((0..10u8).map(|a| vec![2, a]).collect(), (10..20u8).map(|a| vec![3, a]).collect()) + } + + #[test] + fn filter() { + let (col1, col2) = test_data(); + let _num_columns = 2; + let num_rows = col1.len() as u64; + let file_path = tempfile::NamedTempFile::new().unwrap(); + + let mut nippy = NippyJar::new(_num_columns, false, file_path.path()); + + assert!(matches!(Filter::add(&mut nippy, &col1[0]), Err(NippyJarError::FilterMissing))); + + nippy = nippy.with_cuckoo_filter(4); + + // Add col1[0] + assert!(!Filter::contains(&nippy, &col1[0]).unwrap()); + assert!(Filter::add(&mut nippy, &col1[0]).is_ok()); + assert!(Filter::contains(&nippy, &col1[0]).unwrap()); + + // Add col1[1] + assert!(!Filter::contains(&nippy, &col1[1]).unwrap()); + assert!(Filter::add(&mut nippy, &col1[1]).is_ok()); + assert!(Filter::contains(&nippy, &col1[1]).unwrap()); + + // // Add more columns until max_capacity + assert!(Filter::add(&mut nippy, &col1[2]).is_ok()); + assert!(Filter::add(&mut nippy, &col1[3]).is_ok()); + assert!(matches!(Filter::add(&mut nippy, &col1[4]), Err(NippyJarError::FilterMaxCapacity))); + + nippy.freeze(vec![col1.clone(), col2.clone()], num_rows).unwrap(); + let loaded_nippy = NippyJar::load(file_path.path()).unwrap(); + + assert_eq!(nippy, loaded_nippy); + + assert!(Filter::contains(&loaded_nippy, &col1[0]).unwrap()); + assert!(Filter::contains(&loaded_nippy, &col1[1]).unwrap()); + assert!(Filter::contains(&loaded_nippy, &col1[2]).unwrap()); + assert!(Filter::contains(&loaded_nippy, &col1[3]).unwrap()); + assert!(!Filter::contains(&loaded_nippy, &col1[4]).unwrap()); + } #[test] fn zstd() { - let col1 = vec![ - vec![3, 0], - vec![3, 1], - vec![3, 2], - vec![3, 3], - vec![3, 4], - vec![3, 5], - vec![3, 6], - vec![3, 7], - vec![3, 8], - ]; - let col2 = vec![ - vec![3, 10], - vec![3, 11], - vec![3, 12], - vec![3, 13], - vec![3, 14], - vec![3, 15], - vec![3, 16], - vec![3, 17], - vec![3, 18], - ]; + let (col1, col2) = test_data(); let num_rows = col1.len() as u64; let _num_columns = 2; + let file_path = tempfile::NamedTempFile::new().unwrap(); - let data_file = NippyJar::new( - _num_columns, - false, - false, - &std::env::temp_dir().as_path().join(TEST_FILE_NAME), - ); - - assert_eq!(data_file.compressor, None); + let nippy = NippyJar::new(_num_columns, false, file_path.path()); + assert!(nippy.compressor.is_none()); - let mut data_file = NippyJar::new( - _num_columns, - false, - false, - &std::env::temp_dir().as_path().join(TEST_FILE_NAME), - ) - .with_zstd(true, 5000); + let mut nippy = NippyJar::new(_num_columns, false, file_path.path()).with_zstd(true, 5000); + assert!(nippy.compressor.is_some()); - if let Some(Compressors::Zstd(zstd)) = &mut data_file.compressor { + if let Some(Compressors::Zstd(zstd)) = &mut nippy.compressor { assert!(matches!(zstd.generate_compressors(), Err(NippyJarError::CompressorNotReady))); // Make sure the number of column iterators match the initial set up ones. @@ -362,8 +401,6 @@ mod tests { zstd.prepare_compression(vec![col1.clone(), col2.clone(), col2.clone()]), Err(NippyJarError::ColumnLenMismatch(_num_columns, 3)) )); - } else { - panic!("Expected ZSTD compressor"); } let data = vec![col1, col2]; @@ -371,28 +408,27 @@ mod tests { // If ZSTD is enabled, do not write to the file unless the column dictionaries have been // calculated. assert!(matches!( - data_file.freeze(data.clone(), num_rows), + nippy.freeze(data.clone(), num_rows), Err(NippyJarError::CompressorNotReady) )); - data_file.prepare(data.clone()).unwrap(); + nippy.prepare(data.clone()).unwrap(); - if let Some(Compressors::Zstd(zstd)) = &data_file.compressor { + if let Some(Compressors::Zstd(zstd)) = &nippy.compressor { assert!(matches!( (&zstd.state, zstd.raw_dictionaries.as_ref().map(|dict| dict.len())), (compression::ZstdState::Ready, Some(_num_columns)) )); } - data_file.freeze(data.clone(), num_rows).unwrap(); + nippy.freeze(data.clone(), num_rows).unwrap(); - let written_data = - NippyJar::load(&std::env::temp_dir().as_path().join(TEST_FILE_NAME)).unwrap(); - assert_eq!(data_file, written_data); + let loaded_nippy = NippyJar::load(file_path.path()).unwrap(); + assert_eq!(nippy, loaded_nippy); - if let Some(Compressors::Zstd(zstd)) = &data_file.compressor { + if let Some(Compressors::Zstd(zstd)) = &nippy.compressor { let mut cursor = NippyJarCursor::new( - &written_data, + &loaded_nippy, Some(Mutex::new(zstd.generate_decompressors().unwrap())), ) .unwrap();