diff --git a/Cargo.lock b/Cargo.lock index 311caafcb2fe..e57a6542afbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8883,6 +8883,7 @@ dependencies = [ "lz4_flex 0.11.3", "moka", "pin-project", + "prometheus", "serde", "serde_json", "sha2", diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 7d977a328ca1..7018b039d62e 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -32,6 +32,7 @@ use moka::notification::RemovalCause; use moka::sync::Cache; use parquet::column::page::Page; use parquet::file::metadata::ParquetMetaData; +use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef}; use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector}; use crate::cache::cache_size::parquet_meta_size; @@ -68,6 +69,8 @@ pub struct CacheManager { write_cache: Option, /// Cache for inverted index. index_cache: Option, + /// Puffin metadata cache. + puffin_metadata_cache: Option, /// Cache for time series selectors. selector_result_cache: Option, } @@ -217,6 +220,10 @@ impl CacheManager { pub(crate) fn index_cache(&self) -> Option<&InvertedIndexCacheRef> { self.index_cache.as_ref() } + + pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> { + self.puffin_metadata_cache.as_ref() + } } /// Increases selector cache miss metrics. @@ -237,6 +244,7 @@ pub struct CacheManagerBuilder { page_cache_size: u64, index_metadata_size: u64, index_content_size: u64, + puffin_metadata_size: u64, write_cache: Option, selector_result_cache_size: u64, } @@ -278,6 +286,12 @@ impl CacheManagerBuilder { self } + /// Sets cache size for puffin metadata. + pub fn puffin_metadata_size(mut self, bytes: u64) -> Self { + self.puffin_metadata_size = bytes; + self + } + /// Sets selector result cache size. pub fn selector_result_cache_size(mut self, bytes: u64) -> Self { self.selector_result_cache_size = bytes; @@ -340,6 +354,8 @@ impl CacheManagerBuilder { }); let inverted_index_cache = InvertedIndexCache::new(self.index_metadata_size, self.index_content_size); + let puffin_metadata_cache = + PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES); let selector_result_cache = (self.selector_result_cache_size != 0).then(|| { Cache::builder() .max_capacity(self.selector_result_cache_size) @@ -361,6 +377,7 @@ impl CacheManagerBuilder { page_cache, write_cache: self.write_cache, index_cache: Some(Arc::new(inverted_index_cache)), + puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)), selector_result_cache, } } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 9b113027a41b..dda3f4271059 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -304,6 +304,9 @@ pub struct IndexConfig { /// Write buffer size for creating the index. pub write_buffer_size: ReadableSize, + + /// Cache size for metadata of puffin files. Setting it to 0 to disable the cache. + pub metadata_cache_size: ReadableSize, } impl Default for IndexConfig { @@ -312,6 +315,7 @@ impl Default for IndexConfig { aux_path: String::new(), staging_size: ReadableSize::gb(2), write_buffer_size: ReadableSize::mb(8), + metadata_cache_size: ReadableSize::mb(64), } } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 19324f119f3e..32b8c90cda02 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -413,11 +413,15 @@ impl ScanRegion { .and_then(|c| c.index_cache()) .cloned(); + let puffin_metadata_cache = self + .cache_manager + .as_ref() + .and_then(|c| c.puffin_metadata_cache()) + .cloned(); + InvertedIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), self.access_layer.object_store().clone(), - file_cache, - index_cache, self.version.metadata.as_ref(), self.version.metadata.inverted_indexed_column_ids( self.version @@ -429,6 +433,9 @@ impl ScanRegion { ), self.access_layer.puffin_manager_factory().clone(), ) + .with_file_cache(file_cache) + .with_index_cache(index_cache) + .with_puffin_metadata_cache(puffin_metadata_cache) .build(&self.request.filters) .inspect_err(|err| warn!(err; "Failed to build invereted index applier")) .ok() diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 451ec44f1cd2..4353ae55e3e9 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -149,6 +149,7 @@ impl FileMeta { pub fn inverted_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::InvertedIndex) } + pub fn fulltext_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::FulltextIndex) } diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index cac3ffedd74c..bf5206ef44be 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -22,6 +22,7 @@ use index::inverted_index::search::index_apply::{ ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext, }; use object_store::ObjectStore; +use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; use snafu::ResultExt; use store_api::storage::RegionId; @@ -60,6 +61,9 @@ pub(crate) struct InvertedIndexApplier { /// In-memory cache for inverted index. inverted_index_cache: Option, + + /// Puffin metadata cache. + puffin_metadata_cache: Option, } pub(crate) type InvertedIndexApplierRef = Arc; @@ -70,8 +74,6 @@ impl InvertedIndexApplier { region_dir: String, region_id: RegionId, store: ObjectStore, - file_cache: Option, - index_cache: Option, index_applier: Box, puffin_manager_factory: PuffinManagerFactory, ) -> Self { @@ -81,13 +83,35 @@ impl InvertedIndexApplier { region_dir, region_id, store, - file_cache, + file_cache: None, index_applier, puffin_manager_factory, - inverted_index_cache: index_cache, + inverted_index_cache: None, + puffin_metadata_cache: None, } } + /// Sets the file cache. + pub fn with_file_cache(mut self, file_cache: Option) -> Self { + self.file_cache = file_cache; + self + } + + /// Sets the index cache. + pub fn with_index_cache(mut self, index_cache: Option) -> Self { + self.inverted_index_cache = index_cache; + self + } + + /// Sets the puffin metadata cache. + pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } + /// Applies predicates to the provided SST file id and returns the relevant row group ids pub async fn apply(&self, file_id: FileId) -> Result { let _timer = INDEX_APPLY_ELAPSED @@ -105,6 +129,7 @@ impl InvertedIndexApplier { if let Err(err) = other { warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") } + self.remote_blob_reader(file_id).await? } }; @@ -157,7 +182,10 @@ impl InvertedIndexApplier { /// Creates a blob reader from the remote index file. async fn remote_blob_reader(&self, file_id: FileId) -> Result { - let puffin_manager = self.puffin_manager_factory.build(self.store.clone()); + let puffin_manager = self + .puffin_manager_factory + .build(self.store.clone()) + .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); let file_path = location::index_file_path(&self.region_dir, file_id); puffin_manager .reader(&file_path) @@ -219,8 +247,6 @@ mod tests { region_dir.clone(), RegionId::new(0, 0), object_store, - None, - None, Box::new(mock_index_applier), puffin_manager_factory, ); @@ -261,8 +287,6 @@ mod tests { region_dir.clone(), RegionId::new(0, 0), object_store, - None, - None, Box::new(mock_index_applier), puffin_manager_factory, ); diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index 603cf5aa23fd..653679b9fca8 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -28,6 +28,7 @@ use datatypes::value::Value; use index::inverted_index::search::index_apply::PredicatesIndexApplier; use index::inverted_index::search::predicate::Predicate; use object_store::ObjectStore; +use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::ColumnId; @@ -65,6 +66,9 @@ pub(crate) struct InvertedIndexApplierBuilder<'a> { /// Cache for inverted index. index_cache: Option, + + /// Cache for puffin metadata. + puffin_metadata_cache: Option, } impl<'a> InvertedIndexApplierBuilder<'a> { @@ -72,8 +76,6 @@ impl<'a> InvertedIndexApplierBuilder<'a> { pub fn new( region_dir: String, object_store: ObjectStore, - file_cache: Option, - index_cache: Option, metadata: &'a RegionMetadata, indexed_column_ids: HashSet, puffin_manager_factory: PuffinManagerFactory, @@ -81,15 +83,37 @@ impl<'a> InvertedIndexApplierBuilder<'a> { Self { region_dir, object_store, - file_cache, metadata, indexed_column_ids, output: HashMap::default(), - index_cache, puffin_manager_factory, + file_cache: None, + index_cache: None, + puffin_metadata_cache: None, } } + /// Sets the file cache. + pub fn with_file_cache(mut self, file_cache: Option) -> Self { + self.file_cache = file_cache; + self + } + + /// Sets the puffin metadata cache. + pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } + + /// Sets the index cache. + pub fn with_index_cache(mut self, index_cache: Option) -> Self { + self.index_cache = index_cache; + self + } + /// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on /// the expressions provided. If no predicates match, returns `None`. pub fn build(mut self, exprs: &[Expr]) -> Result> { @@ -108,15 +132,18 @@ impl<'a> InvertedIndexApplierBuilder<'a> { .collect(); let applier = PredicatesIndexApplier::try_from(predicates); - Ok(Some(InvertedIndexApplier::new( - self.region_dir, - self.metadata.region_id, - self.object_store, - self.file_cache, - self.index_cache, - Box::new(applier.context(BuildIndexApplierSnafu)?), - self.puffin_manager_factory, - ))) + Ok(Some( + InvertedIndexApplier::new( + self.region_dir, + self.metadata.region_id, + self.object_store, + Box::new(applier.context(BuildIndexApplierSnafu)?), + self.puffin_manager_factory, + ) + .with_file_cache(self.file_cache) + .with_puffin_metadata_cache(self.puffin_metadata_cache) + .with_index_cache(self.index_cache), + )) } /// Recursively traverses expressions to collect predicates. @@ -322,8 +349,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs index 0a196e6f1ac6..51f7f001e25b 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs @@ -75,8 +75,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -118,8 +116,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -144,8 +140,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -187,8 +181,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -214,8 +206,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs index cdaec9f94e95..138b15b82eb9 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs @@ -231,8 +231,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -260,8 +258,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -280,8 +276,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -315,8 +309,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs index 1d07cca48724..35a5caad56a6 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs @@ -137,8 +137,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -175,8 +173,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -204,8 +200,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -224,8 +218,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -244,8 +236,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -303,8 +293,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -341,8 +329,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs index 6a520ba401d3..224e10c452ff 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs @@ -68,8 +68,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -101,8 +99,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -126,8 +122,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -159,8 +153,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -186,8 +178,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs index 7fdf7f3de55c..7148986e6d11 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs @@ -62,8 +62,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -91,8 +89,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -120,8 +116,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, @@ -142,8 +136,6 @@ mod tests { let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), - None, - None, &metadata, HashSet::from_iter([1, 2, 3]), facotry, diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 6db1ef6e0b7b..029a0da8484f 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -310,12 +310,14 @@ mod tests { use futures::future::BoxFuture; use object_store::services::Memory; use object_store::ObjectStore; + use puffin::puffin_manager::cache::PuffinMetadataCache; use puffin::puffin_manager::PuffinManager; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; use super::*; use crate::cache::index::InvertedIndexCache; + use crate::metrics::CACHE_BYTES; use crate::read::BatchColumn; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; @@ -447,15 +449,16 @@ mod tests { move |expr| { let _d = &d; let cache = Arc::new(InvertedIndexCache::new(10, 10)); + let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES)); let applier = InvertedIndexApplierBuilder::new( region_dir.clone(), object_store.clone(), - None, - Some(cache), ®ion_metadata, indexed_column_ids.clone(), factory.clone(), ) + .with_index_cache(Some(cache)) + .with_puffin_metadata_cache(Some(puffin_metadata_cache)) .build(&[expr]) .unwrap() .unwrap(); diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 33d26c8196df..f8ab9c3f4edb 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -170,6 +170,7 @@ impl WorkerGroup { .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) .index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes()) .index_content_size(config.inverted_index.content_cache_size.as_bytes()) + .puffin_metadata_size(config.index.metadata_cache_size.as_bytes()) .write_cache(write_cache) .build(), ); diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml index e4e6c74a5c9b..31c92ba4f972 100644 --- a/src/puffin/Cargo.toml +++ b/src/puffin/Cargo.toml @@ -25,6 +25,7 @@ futures.workspace = true lz4_flex = "0.11" moka = { workspace = true, features = ["future", "sync"] } pin-project.workspace = true +prometheus.workspace = true serde.workspace = true serde_json.workspace = true sha2 = "0.10.8" diff --git a/src/puffin/src/blob_metadata.rs b/src/puffin/src/blob_metadata.rs index bb2475bfa336..67eb62c5ff1b 100644 --- a/src/puffin/src/blob_metadata.rs +++ b/src/puffin/src/blob_metadata.rs @@ -68,6 +68,20 @@ pub struct BlobMetadata { pub properties: HashMap, } +impl BlobMetadata { + /// Calculates the memory usage of the blob metadata in bytes. + pub fn memory_usage(&self) -> usize { + self.blob_type.len() + + self.input_fields.len() * std::mem::size_of::() + + self + .properties + .iter() + .map(|(k, v)| k.len() + v.len()) + .sum::() + + std::mem::size_of::() + } +} + /// Compression codec used to compress the blob #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs index 31e8e10bc4d5..9ed40a7f181e 100644 --- a/src/puffin/src/file_format/reader/file.rs +++ b/src/puffin/src/file_format/reader/file.rs @@ -46,6 +46,11 @@ impl PuffinFileReader { } } + pub fn with_metadata(mut self, metadata: Option) -> Self { + self.metadata = metadata; + self + } + fn validate_file_size(file_size: u64) -> Result<()> { ensure!( file_size >= MIN_FILE_SIZE, diff --git a/src/puffin/src/file_metadata.rs b/src/puffin/src/file_metadata.rs index 74eea3aa08f3..4804c65be495 100644 --- a/src/puffin/src/file_metadata.rs +++ b/src/puffin/src/file_metadata.rs @@ -33,6 +33,22 @@ pub struct FileMetadata { pub properties: HashMap, } +impl FileMetadata { + /// Calculates the memory usage of the file metadata in bytes. + pub fn memory_usage(&self) -> usize { + self.blobs + .iter() + .map(|blob| blob.memory_usage()) + .sum::() + + self + .properties + .iter() + .map(|(k, v)| k.len() + v.len()) + .sum::() + + std::mem::size_of::() + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 7bd5e9039d03..17101b1662e8 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod cache; pub mod file_accessor; pub mod fs_puffin_manager; pub mod stager; diff --git a/src/puffin/src/puffin_manager/cache.rs b/src/puffin/src/puffin_manager/cache.rs new file mode 100644 index 000000000000..66fcb36bf9c2 --- /dev/null +++ b/src/puffin/src/puffin_manager/cache.rs @@ -0,0 +1,60 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use prometheus::IntGaugeVec; + +use crate::file_metadata::FileMetadata; +/// Metrics for index metadata. +const PUFFIN_METADATA_TYPE: &str = "puffin_metadata"; + +pub type PuffinMetadataCacheRef = Arc; + +/// A cache for storing the metadata of the index files. +pub struct PuffinMetadataCache { + cache: moka::sync::Cache>, +} + +fn puffin_metadata_weight(k: &String, v: &Arc) -> u32 { + (k.as_bytes().len() + v.memory_usage()) as u32 +} + +impl PuffinMetadataCache { + pub fn new(capacity: u64, cache_bytes: &'static IntGaugeVec) -> Self { + common_telemetry::debug!("Building PuffinMetadataCache with capacity: {capacity}"); + Self { + cache: moka::sync::CacheBuilder::new(capacity) + .name("puffin_metadata") + .weigher(puffin_metadata_weight) + .eviction_listener(|k, v, _cause| { + let size = puffin_metadata_weight(&k, &v); + cache_bytes + .with_label_values(&[PUFFIN_METADATA_TYPE]) + .sub(size.into()); + }) + .build(), + } + } + + /// Gets the metadata from the cache. + pub fn get_metadata(&self, file_id: &str) -> Option> { + self.cache.get(file_id) + } + + /// Puts the metadata into the cache. + pub fn put_metadata(&self, file_id: String, metadata: Arc) { + self.cache.insert(file_id, metadata); + } +} diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs index 976eb239979a..52190f92fb28 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs @@ -21,6 +21,7 @@ pub use reader::FsPuffinReader; pub use writer::FsPuffinWriter; use crate::error::Result; +use crate::puffin_manager::cache::PuffinMetadataCacheRef; use crate::puffin_manager::file_accessor::PuffinFileAccessor; use crate::puffin_manager::stager::Stager; use crate::puffin_manager::PuffinManager; @@ -31,16 +32,29 @@ pub struct FsPuffinManager { stager: S, /// The puffin file accessor. puffin_file_accessor: F, + /// The puffin metadata cache. + puffin_metadata_cache: Option, } impl FsPuffinManager { - /// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`. + /// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`, + /// and optionally with a `puffin_metadata_cache`. pub fn new(stager: S, puffin_file_accessor: F) -> Self { Self { stager, puffin_file_accessor, + puffin_metadata_cache: None, } } + + /// Sets the puffin metadata cache. + pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } } #[async_trait] @@ -57,6 +71,7 @@ where puffin_file_name.to_string(), self.stager.clone(), self.puffin_file_accessor.clone(), + self.puffin_metadata_cache.clone(), )) } diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 3de27fdb77b0..2e1ae594adc6 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -14,6 +14,7 @@ use std::io; use std::ops::Range; +use std::sync::Arc; use async_compression::futures::bufread::ZstdDecoder; use async_trait::async_trait; @@ -23,12 +24,14 @@ use futures::io::BufReader; use futures::{AsyncRead, AsyncWrite}; use snafu::{ensure, OptionExt, ResultExt}; +use super::PuffinMetadataCacheRef; use crate::blob_metadata::{BlobMetadata, CompressionCodec}; use crate::error::{ BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu, MetadataSnafu, ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu, }; use crate::file_format::reader::{AsyncReader, PuffinFileReader}; +use crate::file_metadata::FileMetadata; use crate::partial_reader::PartialReader; use crate::puffin_manager::file_accessor::PuffinFileAccessor; use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata; @@ -45,14 +48,23 @@ pub struct FsPuffinReader { /// The puffin file accessor. puffin_file_accessor: F, + + /// The puffin file metadata cache. + puffin_file_metadata_cache: Option, } impl FsPuffinReader { - pub(crate) fn new(puffin_file_name: String, stager: S, puffin_file_accessor: F) -> Self { + pub(crate) fn new( + puffin_file_name: String, + stager: S, + puffin_file_accessor: F, + puffin_file_metadata_cache: Option, + ) -> Self { Self { puffin_file_name, stager, puffin_file_accessor, + puffin_file_metadata_cache, } } } @@ -73,13 +85,13 @@ where .await?; let mut file = PuffinFileReader::new(reader); - // TODO(zhongzc): cache the metadata. - let metadata = file.metadata().await?; + let metadata = self.get_puffin_file_metadata(&mut file).await?; let blob_metadata = metadata .blobs - .into_iter() + .iter() .find(|m| m.blob_type == key) - .context(BlobNotFoundSnafu { blob: key })?; + .context(BlobNotFoundSnafu { blob: key })? + .clone(); let blob = if blob_metadata.compression_codec.is_none() { // If the blob is not compressed, we can directly read it from the puffin file. @@ -133,6 +145,23 @@ where S: Stager, F: PuffinFileAccessor + Clone, { + async fn get_puffin_file_metadata( + &self, + reader: &mut PuffinFileReader, + ) -> Result> { + if let Some(cache) = self.puffin_file_metadata_cache.as_ref() { + if let Some(metadata) = cache.get_metadata(&self.puffin_file_name) { + return Ok(metadata); + } + } + + let metadata = Arc::new(reader.metadata().await?); + if let Some(cache) = self.puffin_file_metadata_cache.as_ref() { + cache.put_metadata(self.puffin_file_name.to_string(), metadata.clone()); + } + Ok(metadata) + } + async fn init_blob_to_stager( reader: PuffinFileReader, blob_metadata: BlobMetadata,