refactor: cache layer #9672

Merged · 10 commits · Jan 21, 2023
9 changes: 4 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
@@ -70,6 +70,7 @@ common-storages-system = { path = "../storages/system" }
 common-storages-view = { path = "../storages/view" }
 common-tracing = { path = "../../common/tracing" }
 common-users = { path = "../users" }
+storages-common-cache = { path = "../storages/common/cache" }

 storages-common-blocks = { path = "../storages/common/blocks" }
 storages-common-index = { path = "../storages/common/index" }
@@ -24,7 +24,7 @@ use databend_query::interpreters::AlterTableClusterKeyInterpreter;
 use databend_query::interpreters::CreateTableInterpreterV2;
 use databend_query::interpreters::DropTableClusterKeyInterpreter;
 use databend_query::interpreters::Interpreter;
-use storages_common_table_meta::caches::LoadParams;
+use storages_common_cache::LoadParams;
 use storages_common_table_meta::meta::TableSnapshot;
 use storages_common_table_meta::meta::Versioned;
 use storages_common_table_meta::table::OPT_KEY_DATABASE_ID;
@@ -91,7 +91,6 @@ async fn test_fuse_alter_table_cluster_key() -> common_exception::Result<()> {
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        schema: None,
     };

     let snapshot = reader.read(&load_params).await?;
@@ -127,7 +126,6 @@ async fn test_fuse_alter_table_cluster_key() -> common_exception::Result<()> {
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        schema: None,
     };

     let snapshot = reader.read(&params).await?;
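Aside: across these tests, the `schema` field disappears from `LoadParams` and the import moves to the new `storages-common-cache` crate. A rough sketch of the slimmed-down struct, inferred only from the call sites in this diff (the authoritative definition lives in the crate's `read` module and may carry more fields):

```rust
// Inferred from the updated call sites; not the verbatim definition.
pub struct LoadParams {
    /// Storage path of the object to load, e.g. a snapshot location.
    pub location: String,
    /// Optional size hint so the loader can skip a length lookup.
    pub len_hint: Option<u64>,
    /// Format version, e.g. TableSnapshot::VERSION, used to pick a decoder.
    pub ver: u64,
}
```

Schema now travels with the reader instead; see the `MetaReaders::segment_info_reader` change further down.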
14 changes: 10 additions & 4 deletions src/query/service/tests/it/storages/fuse/operations/gc.rs
@@ -20,7 +20,7 @@ use common_base::base::tokio;
 use common_catalog::table_context::TableContext;
 use common_exception::Result;
 use common_expression::DataBlock;
-use common_storages_fuse::io::write_meta;
+use common_storages_fuse::io::MetaWriter;
 use common_storages_fuse::io::SegmentWriter;
 use common_storages_fuse::statistics::gen_columns_statistics;
 use common_storages_fuse::FuseTable;
@@ -92,7 +92,9 @@ async fn test_fuse_purge_normal_orphan_snapshot() -> Result<()> {
         // orphan_snapshot is created by using `from_previous`, which guarantees
         // that the timestamp of snapshot returned is larger than `current_snapshot`'s.
         let orphan_snapshot = TableSnapshot::from_previous(current_snapshot.as_ref());
-        write_meta(&operator, &orphan_snapshot_location, orphan_snapshot).await?;
+        orphan_snapshot
+            .write_meta(&operator, &orphan_snapshot_location)
+            .await?;
     }

     // do_gc
@@ -247,6 +249,7 @@ mod utils {
     use chrono::DateTime;
     use chrono::Utc;
     use common_storages_factory::Table;
+    use common_storages_fuse::io::MetaWriter;
     use common_storages_fuse::FuseStorageFormat;

     use super::*;
@@ -267,7 +270,10 @@ mod utils {
             new_snapshot.timestamp = Some(ts)
         }

-        write_meta(&operator, &new_snapshot_location, &new_snapshot).await?;
+        new_snapshot
+            .write_meta(&operator, &new_snapshot_location)
+            .await?;
+
         Ok(new_snapshot_location)
     }

@@ -310,7 +316,7 @@ mod utils {
         }

         let segment_info = SegmentInfo::new(block_metas, Statistics::default());
-        let segment_writer = SegmentWriter::new(dal, fuse_table.meta_location_generator(), &None);
+        let segment_writer = SegmentWriter::new(dal, fuse_table.meta_location_generator());
         let segment_location = segment_writer.write_segment_no_cache(&segment_info).await?;
         Ok((segment_location, segment_info))
     }
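The `write_meta` free function is replaced by a method on the meta object itself. Inferred purely from the call sites above, the `MetaWriter` trait could look roughly like this (the actual definition in `common_storages_fuse::io` may differ in bounds and serialization details):

```rust
use common_exception::Result;
use opendal::Operator;

// Hypothetical sketch: `snapshot.write_meta(&operator, &location)` in the
// diff above is the only contract we can infer here.
#[async_trait::async_trait]
pub trait MetaWriter {
    /// Serialize `self` and persist it at `location` through the operator.
    async fn write_meta(&self, operator: &Operator, location: &str) -> Result<()>;
}
```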
@@ -30,7 +30,6 @@ use databend_query::storages::fuse::io::SegmentWriter;
 use databend_query::storages::fuse::io::TableMetaLocationGenerator;
 use databend_query::storages::fuse::operations::ReclusterMutator;
 use databend_query::storages::fuse::pruning::BlockPruner;
-use storages_common_table_meta::caches::CacheManager;
 use storages_common_table_meta::meta;
 use storages_common_table_meta::meta::BlockMeta;
 use storages_common_table_meta::meta::ClusterStatistics;
@@ -48,9 +47,8 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
     let ctx = fixture.ctx();
     let location_generator = TableMetaLocationGenerator::with_prefix("_prefix".to_owned());

-    let segment_info_cache = CacheManager::instance().get_table_segment_cache();
     let data_accessor = ctx.get_data_operator()?.operator();
-    let seg_writer = SegmentWriter::new(&data_accessor, &location_generator, &segment_info_cache);
+    let seg_writer = SegmentWriter::new(&data_accessor, &location_generator);

     let gen_test_seg = |cluster_stats: Option<ClusterStatistics>| async {
         let block_id = Uuid::new_v4().simple().to_string();
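This file and gc.rs show the same simplification: `SegmentWriter::new` drops its third argument, the optional segment-info cache handle, so callers no longer thread cache state through writers. A sketch of the narrowed constructor under those assumptions (the field names are guesses, not the real struct layout):

```rust
use databend_query::storages::fuse::io::TableMetaLocationGenerator;
use opendal::Operator;

// Illustrative stand-in; the real SegmentWriter lives in common_storages_fuse::io.
pub struct SegmentWriter<'a> {
    dal: &'a Operator,
    location_generator: &'a TableMetaLocationGenerator,
}

impl<'a> SegmentWriter<'a> {
    // No cache parameter: caching decisions move behind the cache layer's
    // own abstractions (see CacheAccessor below) instead of each writer.
    pub fn new(dal: &'a Operator, location_generator: &'a TableMetaLocationGenerator) -> Self {
        Self { dal, location_generator }
    }
}
```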
@@ -47,7 +47,7 @@ use databend_query::sessions::TableContext;
 use futures_util::TryStreamExt;
 use rand::thread_rng;
 use rand::Rng;
-use storages_common_table_meta::caches::LoadParams;
+use storages_common_cache::LoadParams;
 use storages_common_table_meta::meta;
 use storages_common_table_meta::meta::BlockMeta;
 use storages_common_table_meta::meta::Location;
@@ -651,7 +651,7 @@ impl CompactSegmentTestFixture {
         let location_gen = &self.location_gen;
         let block_writer = BlockWriter::new(data_accessor, location_gen);

-        let segment_writer = SegmentWriter::new(data_accessor, location_gen, &None);
+        let segment_writer = SegmentWriter::new(data_accessor, location_gen);
         let seg_acc = SegmentCompactor::new(block_per_seg, segment_writer.clone());

         let (segments, locations, blocks) =
@@ -725,7 +725,6 @@ impl CompactSegmentTestFixture {
             location: x.to_string(),
             len_hint: None,
             ver: SegmentInfo::VERSION,
-            schema: Some(TestFixture::default_table_schema()),
         };

         let seg = segment_reader.read(&load_params).await?;
@@ -756,7 +755,10 @@ impl CompactCase {
         limit: Option<usize>,
     ) -> Result<()> {
         // setup & run
-        let segment_reader = MetaReaders::segment_info_reader(ctx.get_data_operator()?.operator());
+        let segment_reader = MetaReaders::segment_info_reader(
+            ctx.get_data_operator()?.operator(),
+            TestFixture::default_table_schema(),
+        );
         let mut case_fixture = CompactSegmentTestFixture::try_new(ctx, block_per_segment)?;
         let r = case_fixture
             .run(&self.blocks_number_of_input_segments, limit)
@@ -803,7 +805,6 @@ impl CompactCase {
             location: location.0.clone(),
             len_hint: None,
             ver: location.1,
-            schema: Some(TestFixture::default_table_schema()),
         };

         let segment = segment_reader.read(&load_params).await?;
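Taken together with the removed `schema: Some(...)` fields above, the pattern is: the table schema is supplied once, when the reader is constructed, rather than on every load, which keeps `LoadParams` (and hence cache keys) schema-free. Usage as this test now reads (`seg_loc` is an illustrative placeholder):

```rust
// Build the reader once, bound to the table schema...
let segment_reader = MetaReaders::segment_info_reader(
    ctx.get_data_operator()?.operator(),
    TestFixture::default_table_schema(),
);

// ...then each read needs only location and version information.
let load_params = LoadParams {
    location: seg_loc.to_string(), // `seg_loc` stands in for a real segment path
    len_hint: None,
    ver: SegmentInfo::VERSION,
};
let segment = segment_reader.read(&load_params).await?;
```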
3 changes: 1 addition & 2 deletions src/query/service/tests/it/storages/fuse/pruning.rs
@@ -41,7 +41,7 @@ use databend_query::storages::fuse::pruning::BlockPruner;
 use databend_query::storages::fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use databend_query::storages::fuse::FUSE_OPT_KEY_ROW_PER_BLOCK;
 use opendal::Operator;
-use storages_common_table_meta::caches::LoadParams;
+use storages_common_cache::LoadParams;
 use storages_common_table_meta::meta::BlockMeta;
 use storages_common_table_meta::meta::TableSnapshot;
 use storages_common_table_meta::meta::Versioned;
@@ -163,7 +163,6 @@ async fn test_block_pruner() -> Result<()> {
         location: snapshot_loc.clone(),
         len_hint: None,
         ver: TableSnapshot::VERSION,
-        schema: None,
     };

     let snapshot = reader.read(&load_params).await?;
22 changes: 15 additions & 7 deletions src/query/storages/common/cache/src/cache.rs
@@ -12,14 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::borrow::Borrow;
+use std::hash::Hash;
 use std::sync::Arc;

-use common_exception::Result;
-use opendal::Object;
-
-#[async_trait::async_trait]
-pub trait ObjectCacheProvider<T> {
-    async fn read_object(&self, object: &Object, start: u64, end: u64) -> Result<Arc<T>>;
-    async fn write_object(&self, object: &Object, v: Arc<T>) -> Result<()>;
-    async fn remove_object(&self, object: &Object) -> Result<()>;
+/// The minimum interface that cache providers should implement
+pub trait StorageCache<K, V> {
+    type Meter;
+    fn put(&mut self, key: K, value: Arc<V>);
+
+    fn get<Q>(&mut self, k: &Q) -> Option<&Arc<V>>
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized;
+
+    fn evict<Q>(&mut self, k: &Q) -> bool
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized;
 }
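For orientation, here is one way a provider could satisfy the new trait. This `NaiveCache` is purely illustrative (not part of the PR); a real provider such as an LRU would use `Meter` to account for capacity in items or bytes:

```rust
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;

// Toy, unbounded provider demonstrating the StorageCache contract.
struct NaiveCache<V> {
    map: HashMap<String, Arc<V>>,
}

impl<V> StorageCache<String, V> for NaiveCache<V> {
    type Meter = (); // no capacity accounting in this sketch

    fn put(&mut self, key: String, value: Arc<V>) {
        self.map.insert(key, value);
    }

    // `get` takes `&mut self` so providers like an LRU can update recency
    // bookkeeping on every hit; a plain HashMap has no such state to touch.
    fn get<Q>(&mut self, k: &Q) -> Option<&Arc<V>>
    where
        String: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.map.get(k)
    }

    fn evict<Q>(&mut self, k: &Q) -> bool
    where
        String: Borrow<Q>,
        Q: Hash + Eq + ?Sized,
    {
        self.map.remove(k).is_some()
    }
}
```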
107 changes: 95 additions & 12 deletions src/query/storages/common/cache/src/lib.rs
@@ -13,16 +13,99 @@
 // limitations under the License.

 mod cache;
-mod object;
+mod metrics;
 mod providers;
-mod settings;
+mod read;

-pub use cache::ObjectCacheProvider;
-pub use object::CachedObject;
-pub use object::CachedObjectAccessor;
-pub use providers::metrics::metrics_reset;
-pub use providers::ByPassCache;
-pub use providers::FileCache;
-pub use providers::MemoryBytesCache;
-pub use providers::MemoryItemsCache;
-pub use settings::CacheSettings;
+use std::borrow::Borrow;
+use std::hash::Hash;
+use std::sync::Arc;
+
+pub use providers::DiskCache;
+pub use providers::InMemoryBytesCacheHolder;
+pub use providers::InMemoryCacheBuilder;
+pub use providers::InMemoryItemCacheHolder;
+pub use read::CacheKey;
+pub use read::DiskCacheReader;
+pub use read::InMemoryBytesCacheReader;
+pub use read::InMemoryItemCacheReader;
+pub use read::LoadParams;
+pub use read::Loader;
+pub use read::LoaderWithCacheKey;
+
+pub trait CacheAccessor<K, V> {
+    fn get<Q>(&self, k: &Q) -> Option<Arc<V>>
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized;
+
+    fn put(&self, key: K, value: Arc<V>);
+    fn evict<Q>(&self, k: &Q) -> bool
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq + ?Sized;
+}
+
+mod impls {
+    use parking_lot::RwLock;
+
+    use super::*;
+    use crate::cache::StorageCache;
+
+    impl<V, C> CacheAccessor<String, V> for Arc<RwLock<C>>
+    where C: StorageCache<String, V>
+    {
+        fn get<Q>(&self, k: &Q) -> Option<Arc<V>>
+        where
+            String: Borrow<Q>,
+            Q: Hash + Eq + ?Sized,
+        {
+            let mut guard = self.write();
+            guard.get(k).cloned()
+        }
+
+        fn put(&self, k: String, v: Arc<V>) {
+            let mut guard = self.write();
+            guard.put(k, v);
+        }
+
+        fn evict<Q>(&self, k: &Q) -> bool
+        where
+            String: Borrow<Q>,
+            Q: Hash + Eq + ?Sized,
+        {
+            let mut guard = self.write();
+            guard.evict(k)
+        }
+    }
+
+    impl<V, C> CacheAccessor<String, V> for Option<Arc<RwLock<C>>>
+    where C: StorageCache<String, V>
+    {
+        fn get<Q>(&self, k: &Q) -> Option<Arc<V>>
+        where
+            String: Borrow<Q>,
+            Q: Hash + Eq + ?Sized,
+        {
+            self.as_ref().and_then(|cache| cache.get(k))
+        }
+
+        fn put(&self, k: String, v: Arc<V>) {
+            if let Some(cache) = self {
+                cache.put(k, v);
+            }
+        }
+
+        fn evict<Q>(&self, k: &Q) -> bool
+        where
+            String: Borrow<Q>,
+            Q: Hash + Eq + ?Sized,
+        {
+            if let Some(cache) = self {
+                cache.evict(k)
+            } else {
+                false
+            }
+        }
+    }
+}
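The blanket impl for `Option<Arc<RwLock<C>>>` is what lets a configurable (possibly disabled) cache be used without `if let Some(...)` at every call site. A small usage sketch, reusing the hypothetical `NaiveCache` provider from above:

```rust
use std::sync::Arc;

use parking_lot::RwLock;
use storages_common_cache::CacheAccessor; // bring the trait methods into scope

fn demo(cache: Option<Arc<RwLock<NaiveCache<Vec<u8>>>>>) {
    // Each call behaves identically whether the cache is enabled or None.
    cache.put("seg_1".to_string(), Arc::new(vec![1u8, 2, 3]));
    if let Some(bytes) = cache.get("seg_1") {
        println!("hit: {} bytes", bytes.len());
    }
    cache.evict("seg_1"); // returns false when the cache is disabled
}
```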
38 changes: 38 additions & 0 deletions src/query/storages/common/cache/src/metrics.rs
@@ -0,0 +1,38 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use metrics::increment_gauge;
+
+fn key_str(cache_name: &str, action: &str) -> String {
+    format!("cache_{cache_name}_{action}")
+}
+
+pub fn metrics_inc_cache_access_count(c: u64, cache_name: &str) {
+    increment_gauge!(key_str(cache_name, "access_count"), c as f64);
+}
+
+pub fn metrics_inc_cache_miss_count(c: u64, cache_name: &str) {
+    // increment_gauge!(key!("memory_miss_count"), c as f64);
+    increment_gauge!(key_str(cache_name, "miss_count"), c as f64);
+}
+
+// When cache miss, load time cost.
+pub fn metrics_inc_cache_miss_load_millisecond(c: u64, cache_name: &str) {
+    increment_gauge!(key_str(cache_name, "miss_load_millisecond"), c as f64);
+}
+
+pub fn metrics_inc_cache_hit_count(c: u64, cache_name: &str) {
+    increment_gauge!(key_str(cache_name, "memory_hit_count"), c as f64);
+}
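All of these helpers expand to a gauge named `cache_{cache_name}_{action}` (note that `metrics_inc_cache_hit_count` records under `memory_hit_count` rather than a plain `hit_count`). A hypothetical call path showing how a cached read would feed them; the wrapper function is illustrative, not part of the PR:

```rust
use std::time::Instant;

// Illustrative wiring of the counters into a cache lookup.
fn record_lookup(cache_name: &str, hit: bool) {
    metrics_inc_cache_access_count(1, cache_name);
    if hit {
        metrics_inc_cache_hit_count(1, cache_name);
    } else {
        metrics_inc_cache_miss_count(1, cache_name);
        let start = Instant::now();
        // ... load the object from backing storage on a miss ...
        metrics_inc_cache_miss_load_millisecond(start.elapsed().as_millis() as u64, cache_name);
    }
}
```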