Skip to content

Commit

Permalink
feat: adds ObjectCache, to cache Manifests and ManifestLists
Browse files Browse the repository at this point in the history
  • Loading branch information
sdd committed Aug 9, 2024
1 parent 80c1399 commit 0c1c40c
Show file tree
Hide file tree
Showing 11 changed files with 357 additions and 67 deletions.
12 changes: 4 additions & 8 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,12 @@ impl Catalog for GlueCatalog {

builder.send().await.map_err(from_aws_sdk_error)?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
.build();

Ok(table)
.build()
}

/// Loads a table from the Glue Catalog and constructs a `Table` object
Expand Down Expand Up @@ -432,17 +430,15 @@ impl Catalog for GlueCatalog {
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
NamespaceIdent::new(db_name),
table_name.to_owned(),
))
.build();

Ok(table)
.build()
}
}
}
Expand Down
12 changes: 4 additions & 8 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,14 +369,12 @@ impl Catalog for HmsCatalog {
.await
.map_err(from_thrift_error)?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
.build();

Ok(table)
.build()
}

/// Loads a table from the Hive Metastore and constructs a `Table` object
Expand Down Expand Up @@ -407,17 +405,15 @@ impl Catalog for HmsCatalog {
let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(TableIdent::new(
NamespaceIdent::new(db_name),
table.name.clone(),
))
.build();

Ok(table)
.build()
}

/// Asynchronously drops a table from the database.
Expand Down
13 changes: 5 additions & 8 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,12 @@ impl Catalog for MemoryCatalog {

root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;

let table = Table::builder()
Table::builder()
.file_io(self.file_io.clone())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(table_ident)
.build();

Ok(table)
.build()
}

/// Load table from the catalog.
Expand All @@ -227,14 +225,13 @@ impl Catalog for MemoryCatalog {
let input_file = self.file_io.new_input(metadata_location)?;
let metadata_content = input_file.read().await?;
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
let table = Table::builder()

Table::builder()
.file_io(self.file_io.clone())
.metadata_location(metadata_location.clone())
.metadata(metadata)
.identifier(table_ident.clone())
.build();

Ok(table)
.build()
}

/// Drop a table from the catalog.
Expand Down
16 changes: 8 additions & 8 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ impl Catalog for RestCatalog {
.load_file_io(resp.metadata_location.as_deref(), resp.config)
.await?;

let table = Table::builder()
Table::builder()
.identifier(table_ident)
.file_io(file_io)
.metadata(resp.metadata)
Expand All @@ -526,9 +526,7 @@ impl Catalog for RestCatalog {
"Metadata location missing in create table response!",
)
})?)
.build();

Ok(table)
.build()
}

/// Load table from the catalog.
Expand Down Expand Up @@ -560,9 +558,9 @@ impl Catalog for RestCatalog {
.metadata(resp.metadata);

if let Some(metadata_location) = resp.metadata_location {
Ok(table_builder.metadata_location(metadata_location).build())
table_builder.metadata_location(metadata_location).build()
} else {
Ok(table_builder.build())
table_builder.build()
}
}

Expand Down Expand Up @@ -661,12 +659,12 @@ impl Catalog for RestCatalog {
let file_io = self
.load_file_io(Some(&resp.metadata_location), None)
.await?;
Ok(Table::builder()
Table::builder()
.identifier(commit.identifier().clone())
.file_io(file_io)
.metadata(resp.metadata)
.metadata_location(resp.metadata_location)
.build())
.build()
}
}

Expand Down Expand Up @@ -1661,6 +1659,7 @@ mod tests {
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
.build()
.unwrap()
};

let table = Transaction::new(&table1)
Expand Down Expand Up @@ -1785,6 +1784,7 @@ mod tests {
.identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
.file_io(FileIO::from_path("/tmp").unwrap().build().unwrap())
.build()
.unwrap()
};

let table_result = Transaction::new(&table1)
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ derive_builder = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
moka = { version = "0.12.8", features = ["future"] }
murmur3 = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ use storage_memory::*;
mod storage_s3;
#[cfg(feature = "storage-s3")]
pub use storage_s3::*;
pub(crate) mod object_cache;
#[cfg(feature = "storage-fs")]
mod storage_fs;

#[cfg(feature = "storage-fs")]
use storage_fs::*;
161 changes: 161 additions & 0 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use crate::io::FileIO;
use crate::spec::{
FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
};
use crate::{Error, ErrorKind, Result};

const DEFAULT_CACHE_SIZE_BYTES: u64 = 2 ^ 15; // 32MB

#[derive(Clone, Debug)]
pub(crate) enum CachedItem {
ManifestList(Arc<ManifestList>),
Manifest(Arc<Manifest>),
}

#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub(crate) enum CachedObjectKey {
ManifestList((String, FormatVersion, SchemaId)),
Manifest(String),
}

/// Caches metadata objects deserialized from immutable files
#[derive(Clone, Debug)]
pub struct ObjectCache {
cache: moka::future::Cache<CachedObjectKey, CachedItem>,
file_io: FileIO,
cache_disabled: bool,
}

impl ObjectCache {
/// Creates a new [`ObjectCache`]
/// with the default cache size
pub(crate) fn new(file_io: FileIO) -> Self {
Self::new_with_cache_size(file_io, DEFAULT_CACHE_SIZE_BYTES)
}

/// Creates a new [`ObjectCache`]
/// with a specific cache size
pub(crate) fn new_with_cache_size(file_io: FileIO, cache_size_bytes: u64) -> Self {
if cache_size_bytes == 0 {
Self::with_disabled_cache(file_io)
} else {
Self {
cache: moka::future::Cache::new(cache_size_bytes),
file_io,
cache_disabled: false,
}
}
}

/// Creates a new [`ObjectCache`]
/// with caching disabled
pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self {
Self {
cache: moka::future::Cache::new(0),
file_io,
cache_disabled: true,
}
}

/// Retrieves an Arc [`Manifest`] from the cache
/// or retrieves one from FileIO and parses it if not present
pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
if self.cache_disabled {
return manifest_file
.load_manifest(&self.file_io)
.await
.map(Arc::new);
}

let key = CachedObjectKey::Manifest(manifest_file.manifest_path.clone());

let cache_entry = self
.cache
.entry_by_ref(&key)
.or_try_insert_with(self.fetch_and_parse_manifest(manifest_file))
.await
.map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))?
.into_value();

match cache_entry {
CachedItem::Manifest(arc_manifest) => Ok(arc_manifest),
_ => Err(Error::new(
ErrorKind::Unexpected,
format!("cached object for key '{:?}' is not a Manifest", key),
)),
}
}

/// Retrieves an Arc [`ManifestList`] from the cache
/// or retrieves one from FileIO and parses it if not present
pub(crate) async fn get_manifest_list(
&self,
snapshot: &SnapshotRef,
table_metadata: &TableMetadataRef,
) -> Result<Arc<ManifestList>> {
if self.cache_disabled {
return snapshot
.load_manifest_list(&self.file_io, table_metadata)
.await
.map(Arc::new);
}

let key = CachedObjectKey::ManifestList((
snapshot.manifest_list().to_string(),
table_metadata.format_version,
snapshot.schema_id().unwrap(),
));
let cache_entry = self
.cache
.entry_by_ref(&key)
.or_try_insert_with(self.fetch_and_parse_manifest_list(snapshot, table_metadata))
.await
.map_err(|err| Error::new(ErrorKind::Unexpected, err.as_ref().message()))?
.into_value();

match cache_entry {
CachedItem::ManifestList(arc_manifest_list) => Ok(arc_manifest_list),
_ => Err(Error::new(
ErrorKind::Unexpected,
format!("cached object for path '{:?}' is not a Manifest", key),
)),
}
}

async fn fetch_and_parse_manifest(&self, manifest_file: &ManifestFile) -> Result<CachedItem> {
let manifest = manifest_file.load_manifest(&self.file_io).await?;

Ok(CachedItem::Manifest(Arc::new(manifest)))
}

async fn fetch_and_parse_manifest_list(
&self,
snapshot: &SnapshotRef,
table_metadata: &TableMetadataRef,
) -> Result<CachedItem> {
let manifest_list = snapshot
.load_manifest_list(&self.file_io, table_metadata)
.await?;

Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
}
}
Loading

0 comments on commit 0c1c40c

Please sign in to comment.