Skip to content

Commit

Permalink
feat: introduce PuffinMetadataCache (#5148)
Browse files Browse the repository at this point in the history
* feat: introduce `PuffinMetadataCache`

* refactor: remove too_many_arguments

* chore: fmt toml
  • Loading branch information
WenyXu authored Dec 12, 2024
1 parent 8c1959c commit d53fbcb
Show file tree
Hide file tree
Showing 22 changed files with 258 additions and 84 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use moka::notification::RemovalCause;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};

use crate::cache::cache_size::parquet_meta_size;
Expand Down Expand Up @@ -68,6 +69,8 @@ pub struct CacheManager {
write_cache: Option<WriteCacheRef>,
/// Cache for inverted index.
index_cache: Option<InvertedIndexCacheRef>,
/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Cache for time series selectors.
selector_result_cache: Option<SelectorResultCache>,
}
Expand Down Expand Up @@ -217,6 +220,10 @@ impl CacheManager {
pub(crate) fn index_cache(&self) -> Option<&InvertedIndexCacheRef> {
self.index_cache.as_ref()
}

pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
self.puffin_metadata_cache.as_ref()
}
}

/// Increases selector cache miss metrics.
Expand All @@ -237,6 +244,7 @@ pub struct CacheManagerBuilder {
page_cache_size: u64,
index_metadata_size: u64,
index_content_size: u64,
puffin_metadata_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
}
Expand Down Expand Up @@ -278,6 +286,12 @@ impl CacheManagerBuilder {
self
}

/// Sets cache size for puffin metadata.
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
self.puffin_metadata_size = bytes;
self
}

/// Sets selector result cache size.
pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
self.selector_result_cache_size = bytes;
Expand Down Expand Up @@ -340,6 +354,8 @@ impl CacheManagerBuilder {
});
let inverted_index_cache =
InvertedIndexCache::new(self.index_metadata_size, self.index_content_size);
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.selector_result_cache_size)
Expand All @@ -361,6 +377,7 @@ impl CacheManagerBuilder {
page_cache,
write_cache: self.write_cache,
index_cache: Some(Arc::new(inverted_index_cache)),
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ pub struct IndexConfig {

/// Write buffer size for creating the index.
pub write_buffer_size: ReadableSize,

/// Cache size for metadata of puffin files. Setting it to 0 to disable the cache.
pub metadata_cache_size: ReadableSize,
}

impl Default for IndexConfig {
Expand All @@ -312,6 +315,7 @@ impl Default for IndexConfig {
aux_path: String::new(),
staging_size: ReadableSize::gb(2),
write_buffer_size: ReadableSize::mb(8),
metadata_cache_size: ReadableSize::mb(64),
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,15 @@ impl ScanRegion {
.and_then(|c| c.index_cache())
.cloned();

let puffin_metadata_cache = self
.cache_manager
.as_ref()
.and_then(|c| c.puffin_metadata_cache())
.cloned();

InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
file_cache,
index_cache,
self.version.metadata.as_ref(),
self.version.metadata.inverted_indexed_column_ids(
self.version
Expand All @@ -429,6 +433,9 @@ impl ScanRegion {
),
self.access_layer.puffin_manager_factory().clone(),
)
.with_file_cache(file_cache)
.with_index_cache(index_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
.ok()
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl FileMeta {
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
}

pub fn fulltext_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::FulltextIndex)
}
Expand Down
42 changes: 33 additions & 9 deletions src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -60,6 +61,9 @@ pub(crate) struct InvertedIndexApplier {

/// In-memory cache for inverted index.
inverted_index_cache: Option<InvertedIndexCacheRef>,

/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
Expand All @@ -70,8 +74,6 @@ impl InvertedIndexApplier {
region_dir: String,
region_id: RegionId,
store: ObjectStore,
file_cache: Option<FileCacheRef>,
index_cache: Option<InvertedIndexCacheRef>,
index_applier: Box<dyn IndexApplier>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Expand All @@ -81,13 +83,35 @@ impl InvertedIndexApplier {
region_dir,
region_id,
store,
file_cache,
file_cache: None,
index_applier,
puffin_manager_factory,
inverted_index_cache: index_cache,
inverted_index_cache: None,
puffin_metadata_cache: None,
}
}

/// Sets the file cache.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}

/// Sets the index cache.
pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
self.inverted_index_cache = index_cache;
self
}

/// Sets the puffin metadata cache.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}

/// Applies predicates to the provided SST file id and returns the relevant row group ids
pub async fn apply(&self, file_id: FileId) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED
Expand All @@ -105,6 +129,7 @@ impl InvertedIndexApplier {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}

self.remote_blob_reader(file_id).await?
}
};
Expand Down Expand Up @@ -157,7 +182,10 @@ impl InvertedIndexApplier {

/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(&self, file_id: FileId) -> Result<BlobReader> {
let puffin_manager = self.puffin_manager_factory.build(self.store.clone());
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone())
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
Expand Down Expand Up @@ -219,8 +247,6 @@ mod tests {
region_dir.clone(),
RegionId::new(0, 0),
object_store,
None,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);
Expand Down Expand Up @@ -261,8 +287,6 @@ mod tests {
region_dir.clone(),
RegionId::new(0, 0),
object_store,
None,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);
Expand Down
55 changes: 40 additions & 15 deletions src/mito2/src/sst/index/inverted_index/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
use index::inverted_index::search::predicate::Predicate;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::ColumnId;
Expand Down Expand Up @@ -65,31 +66,54 @@ pub(crate) struct InvertedIndexApplierBuilder<'a> {

/// Cache for inverted index.
index_cache: Option<InvertedIndexCacheRef>,

/// Cache for puffin metadata.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

impl<'a> InvertedIndexApplierBuilder<'a> {
/// Creates a new [`InvertedIndexApplierBuilder`].
pub fn new(
region_dir: String,
object_store: ObjectStore,
file_cache: Option<FileCacheRef>,
index_cache: Option<InvertedIndexCacheRef>,
metadata: &'a RegionMetadata,
indexed_column_ids: HashSet<ColumnId>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
object_store,
file_cache,
metadata,
indexed_column_ids,
output: HashMap::default(),
index_cache,
puffin_manager_factory,
file_cache: None,
index_cache: None,
puffin_metadata_cache: None,
}
}

/// Sets the file cache.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}

/// Sets the puffin metadata cache.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}

/// Sets the index cache.
pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
self.index_cache = index_cache;
self
}

/// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on
/// the expressions provided. If no predicates match, returns `None`.
pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
Expand All @@ -108,15 +132,18 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
.collect();
let applier = PredicatesIndexApplier::try_from(predicates);

Ok(Some(InvertedIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
self.file_cache,
self.index_cache,
Box::new(applier.context(BuildIndexApplierSnafu)?),
self.puffin_manager_factory,
)))
Ok(Some(
InvertedIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
Box::new(applier.context(BuildIndexApplierSnafu)?),
self.puffin_manager_factory,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
.with_index_cache(self.index_cache),
))
}

/// Recursively traverses expressions to collect predicates.
Expand Down Expand Up @@ -322,8 +349,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand Down
10 changes: 0 additions & 10 deletions src/mito2/src/sst/index/inverted_index/applier/builder/between.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand Down Expand Up @@ -118,8 +116,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand All @@ -144,8 +140,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand Down Expand Up @@ -187,8 +181,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand All @@ -214,8 +206,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand Down
Loading

0 comments on commit d53fbcb

Please sign in to comment.