From 064d61876423e857e6d47b3894da1ff262c4f7ce Mon Sep 17 00:00:00 2001 From: marvin-j97 Date: Sat, 25 May 2024 18:26:53 +0200 Subject: [PATCH] #29 #37 --- Cargo.toml | 30 +++++++++------- README.md | 2 +- benches/tree.rs | 55 ++++++++++++++++------------- src/compaction/fifo.rs | 2 +- src/compaction/levelled.rs | 2 +- src/compaction/maintenance.rs | 2 +- src/compaction/tiered.rs | 2 +- src/compaction/worker.rs | 2 +- src/config.rs | 11 ++++-- src/error.rs | 10 ++---- src/levels/mod.rs | 2 +- src/segment/block/header.rs | 2 +- src/segment/block/mod.rs | 24 +++++++++---- src/segment/meta/compression.rs | 30 +++++++++++++++- src/segment/meta/mod.rs | 4 +-- src/segment/multi_writer.rs | 6 ++-- src/segment/prefix.rs | 6 ++-- src/segment/range.rs | 8 ++--- src/segment/value_block.rs | 2 +- src/segment/value_block_consumer.rs | 2 +- src/segment/writer.rs | 4 +-- 21 files changed, 127 insertions(+), 81 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f8c5be32..1c2adb31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,12 @@ name = "lsm_tree" path = "src/lib.rs" [features] -default = [] +default = ["lz4"] +lz4 = ["dep:lz4_flex"] +miniz = ["dep:miniz_oxide"] bloom = ["dep:seahash"] segment_history = ["dep:serde", "dep:serde_json"] +miniz_oxide = ["dep:miniz_oxide"] [dependencies] byteorder = "1.5.0" @@ -27,7 +30,8 @@ crc32fast = "1.4.0" crossbeam-skiplist = "0.1.3" double-ended-peekable = "0.1.0" log = "0.4.21" -lz4_flex = "0.11.3" +lz4_flex = { version = "0.11.3", optional = true } +miniz_oxide = { version = "0.7.3", optional = true } path-absolutize = "3.1.1" quick_cache = { version = "0.5.1", default-features = false, features = [] } seahash = { version = "4.1.0", optional = true } @@ -42,20 +46,20 @@ fs_extra = "1.3.0" nanoid = "0.4.0" test-log = "0.2.16" -[[bench]] -name = "bloom" -harness = false -path = "benches/bloom.rs" -required-features = ["bloom"] +# [[bench]] +# name = "bloom" +# harness = false +# path = "benches/bloom.rs" +# required-features = ["bloom"] [[bench]] name = "tree" harness = false path = "benches/tree.rs" -required-features = [] +required-features = ["lz4", "miniz"] -[[bench]] -name = "level_manifest" -harness = false -path = "benches/level_manifest.rs" -required-features = [] +# [[bench]] +# name = "level_manifest" +# harness = false +# path = "benches/level_manifest.rs" +# required-features = [] diff --git a/README.md b/README.md index 1b3c5a31..838b0cb2 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ All contributions are to be licensed as MIT OR Apache-2.0. ### Run benchmarks ```bash -cargo bench --features bloom +cargo bench --features bloom --features lz4 --features miniz ``` ## Footnotes diff --git a/benches/tree.rs b/benches/tree.rs index a413e9ee..2aa218ac 100644 --- a/benches/tree.rs +++ b/benches/tree.rs @@ -363,8 +363,12 @@ fn index_block_find_handle(c: &mut Criterion) { fn load_block_from_disk(c: &mut Criterion) { let mut group = c.benchmark_group("Load block from disk"); - for block_size in [1, 4, 8, 16, 32, 64] { - group.bench_function(format!("{block_size} KiB"), |b| { + for comp_type in [ + CompressionType::None, + CompressionType::Lz4, + CompressionType::Miniz, + ] { + for block_size in [1, 4, 8, 16, 32, 64] { let block_size = block_size * 1_024; let mut size = 0; @@ -388,32 +392,33 @@ fn load_block_from_disk(c: &mut Criterion) { } } - let mut block = ValueBlock { - items: items.clone().into_boxed_slice(), - header: BlockHeader { - compression: CompressionType::Lz4, - crc: 0, - data_length: 0, - previous_block_offset: 0, - }, - }; + group.bench_function(format!("{block_size} KiB [{comp_type}]"), |b| { + let mut block = ValueBlock { + items: items.clone().into_boxed_slice(), + header: BlockHeader { + compression: comp_type, + crc: 0, + data_length: 0, + previous_block_offset: 0, + }, + }; + + // Serialize block + block.header.crc = ValueBlock::create_crc(&block.items).unwrap(); + let (header, data) = ValueBlock::to_bytes_compressed(&items, 0, comp_type).unwrap(); + + let mut file = tempfile::tempfile().unwrap(); + header.serialize(&mut file).unwrap(); + file.write_all(&data).unwrap(); - // Serialize block - block.header.crc = ValueBlock::create_crc(&block.items).unwrap(); - let (header, data) = - ValueBlock::to_bytes_compressed(&items, 0, CompressionType::Lz4).unwrap(); - - let mut file = tempfile::tempfile().unwrap(); - header.serialize(&mut file).unwrap(); - file.write_all(&data).unwrap(); - - b.iter(|| { - let loaded_block = ValueBlock::from_file_compressed(&mut file, 0).unwrap(); + b.iter(|| { + let loaded_block = ValueBlock::from_file_compressed(&mut file, 0).unwrap(); - assert_eq!(loaded_block.items.len(), block.items.len()); - assert_eq!(loaded_block.header.crc, block.header.crc); + assert_eq!(loaded_block.items.len(), block.items.len()); + assert_eq!(loaded_block.header.crc, block.header.crc); + }); }); - }); + } } } diff --git a/src/compaction/fifo.rs b/src/compaction/fifo.rs index 60ea523e..5b4001d7 100644 --- a/src/compaction/fifo.rs +++ b/src/compaction/fifo.rs @@ -145,7 +145,7 @@ mod tests { created_at, id, file_size: 1, - compression: crate::segment::meta::CompressionType::Lz4, + compression: crate::segment::meta::CompressionType::None, table_type: crate::segment::meta::TableType::Block, item_count: 0, key_count: 0, diff --git a/src/compaction/levelled.rs b/src/compaction/levelled.rs index c7f87317..2d856240 100644 --- a/src/compaction/levelled.rs +++ b/src/compaction/levelled.rs @@ -239,7 +239,7 @@ mod tests { created_at: unix_timestamp().as_nanos(), id, file_size: size, - compression: crate::segment::meta::CompressionType::Lz4, + compression: crate::segment::meta::CompressionType::None, table_type: crate::segment::meta::TableType::Block, item_count: 0, key_count: 0, diff --git a/src/compaction/maintenance.rs b/src/compaction/maintenance.rs index 96592104..4fed8302 100644 --- a/src/compaction/maintenance.rs +++ b/src/compaction/maintenance.rs @@ -113,7 +113,7 @@ mod tests { created_at, id, file_size: 1, - compression: crate::segment::meta::CompressionType::Lz4, + compression: crate::segment::meta::CompressionType::None, table_type: crate::segment::meta::TableType::Block, item_count: 0, key_count: 0, diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index abe1acd9..91e62c18 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -147,7 +147,7 @@ mod tests { created_at: 0, id, file_size: size_mib * 1_024 * 1_024, - compression: crate::segment::meta::CompressionType::Lz4, + compression: crate::segment::meta::CompressionType::None, table_type: crate::segment::meta::TableType::Block, item_count: 0, key_count: 0, diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index a1aebfd2..4f374abf 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -199,7 +199,7 @@ fn merge_segments( block_size: opts.config.inner.block_size, evict_tombstones: should_evict_tombstones, folder: segments_base_folder.clone(), - compression: CompressionType::Lz4, + compression: CompressionType::None, #[cfg(feature = "bloom")] bloom_fp_rate, diff --git a/src/config.rs b/src/config.rs index 11ac8e07..d04d73ba 100644 --- a/src/config.rs +++ b/src/config.rs @@ -74,8 +74,13 @@ impl Default for PersistedConfig { block_size: 4_096, level_count: 7, r#type: TreeType::Standard, - compression: CompressionType::Lz4, table_type: TableType::Block, + + #[cfg(not(feature = "lz4"))] + compression: CompressionType::None, + + #[cfg(feature = "lz4")] + compression: CompressionType::Lz4, } } } @@ -279,7 +284,7 @@ mod tests { fn tree_config_raw() -> crate::Result<()> { let config = PersistedConfig { r#type: TreeType::Standard, - compression: CompressionType::Lz4, + compression: CompressionType::None, table_type: TableType::Block, block_size: 4_096, level_count: 7, @@ -318,7 +323,7 @@ mod tests { fn tree_config_serde_round_trip() -> crate::Result<()> { let config = PersistedConfig { r#type: TreeType::Standard, - compression: CompressionType::Lz4, + compression: CompressionType::None, table_type: TableType::Block, block_size: 4_096, level_count: 7, diff --git a/src/error.rs b/src/error.rs index 8aa5724a..48b8f3cf 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,8 +1,8 @@ use crate::{ serde::{DeserializeError, SerializeError}, version::Version, + CompressionType, }; -use lz4_flex::block::DecompressError; /// Represents errors that can occur in the LSM-tree #[derive(Debug)] @@ -17,7 +17,7 @@ pub enum Error { Deserialize(DeserializeError), /// Decompression failed - Decompress(DecompressError), + Decompress(CompressionType), /// Invalid or unparseable data format version InvalidVersion(Option), @@ -49,11 +49,5 @@ impl From for Error { } } -impl From for Error { - fn from(value: DecompressError) -> Self { - Self::Decompress(value) - } -} - /// Tree result pub type Result = std::result::Result; diff --git a/src/levels/mod.rs b/src/levels/mod.rs index 0d8761b4..8fd56292 100644 --- a/src/levels/mod.rs +++ b/src/levels/mod.rs @@ -516,7 +516,7 @@ mod tests { created_at: 0, id, file_size: 0, - compression: crate::segment::meta::CompressionType::Lz4, + compression: crate::segment::meta::CompressionType::None, table_type: crate::segment::meta::TableType::Block, item_count: 0, key_count: 0, diff --git a/src/segment/block/header.rs b/src/segment/block/header.rs index 8d5c9fc5..caea893b 100644 --- a/src/segment/block/header.rs +++ b/src/segment/block/header.rs @@ -98,7 +98,7 @@ mod tests { #[test] fn block_header_raw() -> crate::Result<()> { let header = Header { - compression: CompressionType::Lz4, + compression: CompressionType::None, crc: 4, previous_block_offset: 2, data_length: 15, diff --git a/src/segment/block/mod.rs b/src/segment/block/mod.rs index eaae1a36..78a8ff0c 100644 --- a/src/segment/block/mod.rs +++ b/src/segment/block/mod.rs @@ -1,13 +1,11 @@ pub mod header; +use super::meta::CompressionType; use crate::serde::{Deserializable, Serializable}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use header::Header as BlockHeader; -use lz4_flex::{compress_prepend_size, decompress_size_prepended}; use std::io::{Cursor, Read}; -use super::meta::CompressionType; - /// A disk-based block /// /// A block is split into its header and a blob of data. @@ -32,7 +30,14 @@ impl Block { let bytes = match header.compression { super::meta::CompressionType::None => bytes, - super::meta::CompressionType::Lz4 => decompress_size_prepended(&bytes)?, + + #[cfg(feature = "lz4")] + super::meta::CompressionType::Lz4 => lz4_flex::decompress_size_prepended(&bytes) + .map_err(|_| crate::Error::Decompress(header.compression))?, + + #[cfg(feature = "miniz")] + super::meta::CompressionType::Miniz => miniz_oxide::inflate::decompress_to_vec(&bytes) + .map_err(|_| crate::Error::Decompress(header.compression))?, }; let mut bytes = Cursor::new(bytes); @@ -113,7 +118,12 @@ impl Block { Ok(match compression { CompressionType::None => buf, - CompressionType::Lz4 => compress_prepend_size(&buf), + + #[cfg(feature = "lz4")] + CompressionType::Lz4 => lz4_flex::compress_prepend_size(&buf), + + #[cfg(feature = "miniz")] + CompressionType::Miniz => miniz_oxide::deflate::compress_to_vec(&buf, 10), }) } } @@ -136,7 +146,7 @@ mod tests { // Serialize to bytes let mut serialized = Vec::new(); - let (header, data) = ValueBlock::to_bytes_compressed(&items, 0, CompressionType::Lz4)?; + let (header, data) = ValueBlock::to_bytes_compressed(&items, 0, CompressionType::None)?; header.serialize(&mut serialized)?; serialized.write_all(&data)?; @@ -165,7 +175,7 @@ mod tests { // Serialize to bytes let mut serialized = Vec::new(); - let (header, data) = ValueBlock::to_bytes_compressed(&items, 0, CompressionType::Lz4)?; + let (header, data) = ValueBlock::to_bytes_compressed(&items, 0, CompressionType::None)?; header.serialize(&mut serialized)?; serialized.write_all(&data)?; diff --git a/src/segment/meta/compression.rs b/src/segment/meta/compression.rs index b4aef388..68f93610 100644 --- a/src/segment/meta/compression.rs +++ b/src/segment/meta/compression.rs @@ -6,14 +6,24 @@ #[allow(clippy::module_name_repetitions)] pub enum CompressionType { None, + + #[cfg(feature = "lz4")] Lz4, + + #[cfg(feature = "miniz")] + Miniz, } impl From for u8 { fn from(val: CompressionType) -> Self { match val { CompressionType::None => 0, + + #[cfg(feature = "lz4")] CompressionType::Lz4 => 1, + + #[cfg(feature = "miniz")] + CompressionType::Miniz => 2, } } } @@ -24,7 +34,13 @@ impl TryFrom for CompressionType { fn try_from(value: u8) -> Result { match value { 0 => Ok(Self::None), + + #[cfg(feature = "lz4")] 1 => Ok(Self::Lz4), + + #[cfg(feature = "miniz")] + 2 => Ok(Self::Miniz), + _ => Err(()), } } @@ -32,6 +48,18 @@ impl TryFrom for CompressionType { impl std::fmt::Display for CompressionType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "lz4") + write!( + f, + "{}", + match self { + Self::None => "no compression", + + #[cfg(feature = "lz4")] + Self::Lz4 => "lz4", + + #[cfg(feature = "miniz")] + Self::Miniz => "miniz", + } + ) } } diff --git a/src/segment/meta/mod.rs b/src/segment/meta/mod.rs index b822b03f..12cf9aaa 100644 --- a/src/segment/meta/mod.rs +++ b/src/segment/meta/mod.rs @@ -181,7 +181,7 @@ impl Metadata { created_at: unix_timestamp().as_micros(), file_size: writer.file_pos, - compression: CompressionType::Lz4, + compression: CompressionType::None, table_type: TableType::Block, item_count: writer.item_count as u64, key_count: writer.key_count as u64, @@ -227,7 +227,7 @@ mod tests { created_at: 5, id: 632_632, file_size: 1, - compression: CompressionType::Lz4, + compression: CompressionType::None, table_type: TableType::Block, item_count: 0, key_count: 0, diff --git a/src/segment/multi_writer.rs b/src/segment/multi_writer.rs index a6a4d11c..50a0cd50 100644 --- a/src/segment/multi_writer.rs +++ b/src/segment/multi_writer.rs @@ -2,7 +2,7 @@ use super::{ trailer::SegmentFileTrailer, writer::{Options, Writer}, }; -use crate::{segment::meta::CompressionType, Value}; +use crate::Value; use std::sync::{atomic::AtomicU64, Arc}; /// Like `Writer` but will rotate to a new segment, once a segment grows larger than `target_size` @@ -40,7 +40,7 @@ impl MultiWriter { folder: opts.folder.clone(), evict_tombstones: opts.evict_tombstones, block_size: opts.block_size, - compression: CompressionType::Lz4, + compression: opts.compression, #[cfg(feature = "bloom")] bloom_fp_rate: opts.bloom_fp_rate, @@ -75,7 +75,7 @@ impl MultiWriter { folder: self.opts.folder.clone(), evict_tombstones: self.opts.evict_tombstones, block_size: self.opts.block_size, - compression: CompressionType::Lz4, + compression: self.opts.compression, #[cfg(feature = "bloom")] bloom_fp_rate: self.opts.bloom_fp_rate, diff --git a/src/segment/prefix.rs b/src/segment/prefix.rs index 411740b9..2dcd73f5 100644 --- a/src/segment/prefix.rs +++ b/src/segment/prefix.rs @@ -179,7 +179,7 @@ mod tests { folder: folder.clone(), evict_tombstones: false, block_size: 4096, - compression: CompressionType::Lz4, + compression: CompressionType::None, #[cfg(feature = "bloom")] bloom_fp_rate: 0.01, @@ -289,7 +289,7 @@ mod tests { folder: folder.clone(), evict_tombstones: false, block_size: 4096, - compression: CompressionType::Lz4, + compression: CompressionType::None, #[cfg(feature = "bloom")] bloom_fp_rate: 0.01, @@ -389,7 +389,7 @@ mod tests { folder: folder.clone(), evict_tombstones: false, block_size: 4096, - compression: CompressionType::Lz4, + compression: CompressionType::None, #[cfg(feature = "bloom")] bloom_fp_rate: 0.01, diff --git a/src/segment/range.rs b/src/segment/range.rs index efd30390..2404e103 100644 --- a/src/segment/range.rs +++ b/src/segment/range.rs @@ -255,7 +255,7 @@ mod tests { folder: folder.clone(), evict_tombstones: false, block_size: 1000, // NOTE: Block size 1 to for each item to be its own block - compression: CompressionType::Lz4, + compression: CompressionType::None, #[cfg(feature = "bloom")] bloom_fp_rate: 0.01, @@ -358,7 +358,7 @@ mod tests { folder: folder.clone(), evict_tombstones: false, block_size: 4096, - compression: CompressionType::Lz4, + compression: CompressionType::None, #[cfg(feature = "bloom")] bloom_fp_rate: 0.01, @@ -562,7 +562,7 @@ mod tests { folder: folder.clone(), evict_tombstones: false, block_size, - compression: CompressionType::Lz4, + compression: CompressionType::None, #[cfg(feature = "bloom")] bloom_fp_rate: 0.01, @@ -669,7 +669,7 @@ mod tests { folder: folder.clone(), evict_tombstones: false, block_size: 250, - compression: CompressionType::Lz4, + compression: CompressionType::None, #[cfg(feature = "bloom")] bloom_fp_rate: 0.01, diff --git a/src/segment/value_block.rs b/src/segment/value_block.rs index 404cd5b8..5d2d19ca 100644 --- a/src/segment/value_block.rs +++ b/src/segment/value_block.rs @@ -92,7 +92,7 @@ mod tests { let block = ValueBlock { items: items.into_boxed_slice(), header: BlockHeader { - compression: CompressionType::Lz4, + compression: CompressionType::None, crc: 0, data_length: 0, previous_block_offset: 0, diff --git a/src/segment/value_block_consumer.rs b/src/segment/value_block_consumer.rs index 406b6ded..cf412598 100644 --- a/src/segment/value_block_consumer.rs +++ b/src/segment/value_block_consumer.rs @@ -102,7 +102,7 @@ mod tests { fn block(items: Vec) -> ValueBlock { ValueBlock { header: Header { - compression: crate::segment::meta::CompressionType::Lz4, + compression: crate::segment::meta::CompressionType::None, crc: 0, data_length: 0, previous_block_offset: 0, diff --git a/src/segment/writer.rs b/src/segment/writer.rs index 2c875f68..0fd20e18 100644 --- a/src/segment/writer.rs +++ b/src/segment/writer.rs @@ -335,7 +335,7 @@ mod tests { folder: folder.clone(), evict_tombstones: false, block_size: 4096, - compression: CompressionType::Lz4, + compression: CompressionType::None, segment_id, @@ -395,7 +395,7 @@ mod tests { folder: folder.clone(), evict_tombstones: false, block_size: 4096, - compression: CompressionType::Lz4, + compression: CompressionType::None, segment_id,