Skip to content

Commit

Permalink
feat: support user config manifest compression (GreptimeTeam#1579)
Browse files Browse the repository at this point in the history
* feat: support user config manifest compression

* chore: change style

* chore: enhance test
  • Loading branch information
Taylor-lagrange committed May 16, 2023
1 parent 856ab5b commit fb1ac0c
Show file tree
Hide file tree
Showing 18 changed files with 139 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ mod tests {
checkpoint_margin = 9
gc_duration = '7s'
checkpoint_on_startup = true
compress = true
[logging]
level = "debug"
Expand Down Expand Up @@ -295,6 +296,7 @@ mod tests {
checkpoint_margin: Some(9),
gc_duration: Some(Duration::from_secs(7)),
checkpoint_on_startup: true,
compress: true
},
options.storage.manifest,
);
Expand Down
4 changes: 4 additions & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ pub struct RegionManifestConfig {
pub gc_duration: Option<Duration>,
/// Whether to try creating a manifest checkpoint on region opening
pub checkpoint_on_startup: bool,
/// Whether to compress manifest and checkpoint file by gzip
pub compress: bool,
}

impl Default for RegionManifestConfig {
Expand All @@ -178,6 +180,7 @@ impl Default for RegionManifestConfig {
checkpoint_margin: Some(10u16),
gc_duration: Some(Duration::from_secs(30)),
checkpoint_on_startup: false,
compress: false,
}
}
}
Expand Down Expand Up @@ -246,6 +249,7 @@ impl From<&DatanodeOptions> for SchedulerConfig {
impl From<&DatanodeOptions> for StorageEngineConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
compress_manifest: value.storage.manifest.compress,
manifest_checkpoint_on_startup: value.storage.manifest.checkpoint_on_startup,
manifest_checkpoint_margin: value.storage.manifest.checkpoint_margin,
manifest_gc_duration: value.storage.manifest.gc_duration,
Expand Down
4 changes: 3 additions & 1 deletion src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ impl Instance {
let log_store = Arc::new(create_log_store(&opts.wal).await?);

let mito_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
TableEngineConfig {
compress_manifest: opts.storage.manifest.compress,
},
EngineImpl::new(
StorageEngineConfig::from(opts),
log_store.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/mito/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ common-error = { path = "../common/error" }
common-procedure = { path = "../common/procedure" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-datasource = { path = "../common/datasource" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
dashmap = "5.4"
Expand Down
4 changes: 3 additions & 1 deletion src/mito/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@
//! Table Engine config

#[derive(Debug, Clone, Default)]
pub struct EngineConfig {}
pub struct EngineConfig {
pub compress_manifest: bool,
}
7 changes: 6 additions & 1 deletion src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use async_trait::async_trait;
pub use common_catalog::consts::MITO_ENGINE;
use common_datasource::compression::CompressionType;
use common_error::ext::BoxedError;
use common_procedure::{BoxedProcedure, ProcedureManager};
use common_telemetry::{debug, logging};
Expand All @@ -29,6 +30,7 @@ use datatypes::schema::Schema;
use key_lock::KeyLock;
use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use storage::manifest::manifest_compress_type;
use store_api::storage::{
ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId,
EngineContext as StorageEngineContext, OpenOptions, RegionNumber, RowKeyDescriptor,
Expand Down Expand Up @@ -247,6 +249,7 @@ pub(crate) struct MitoEngineInner<S: StorageEngine> {
/// Writing to `tables` should also hold the `table_mutex`.
tables: DashMap<String, Arc<MitoTable<S::Region>>>,
object_store: ObjectStore,
compress_type: CompressionType,
storage_engine: S,
/// Table mutex is used to protect the operations such as creating/opening/closing
/// a table, to avoid things like opening the same table simultaneously.
Expand Down Expand Up @@ -638,6 +641,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let manifest = MitoTable::<<S as StorageEngine>::Region>::build_manifest(
table_dir,
self.object_store.clone(),
self.compress_type,
);
let Some(table_info) =
MitoTable::<<S as StorageEngine>::Region>::recover_table_info(table_name, &manifest)
Expand Down Expand Up @@ -691,11 +695,12 @@ async fn close_table(lock: Arc<KeyLock<String>>, table: TableRef) -> TableResult
}

impl<S: StorageEngine> MitoEngineInner<S> {
fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self {
fn new(config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self {
Self {
tables: DashMap::new(),
storage_engine,
object_store,
compress_type: manifest_compress_type(config.compress_manifest),
table_mutex: Arc::new(KeyLock::new()),
}
}
Expand Down
1 change: 1 addition & 0 deletions src/mito/src/engine/procedure/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ impl<S: StorageEngine> TableCreator<S> {
table_info,
self.regions.clone(),
self.engine_inner.object_store.clone(),
self.engine_inner.compress_type,
)
.await?;

Expand Down
16 changes: 13 additions & 3 deletions src/mito/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub type TableManifest = ManifestImpl<NoopCheckpoint, TableMetaActionList>;

#[cfg(test)]
mod tests {
use storage::manifest::MetaActionIteratorImpl;
use storage::manifest::{manifest_compress_type, MetaActionIteratorImpl};
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{Manifest, MetaActionIterator};
use table::metadata::{RawTableInfo, TableInfo};
Expand Down Expand Up @@ -77,10 +77,20 @@ mod tests {
}

#[tokio::test]
async fn test_table_manifest() {
async fn test_table_manifest_compress() {
test_table_manifest(true).await
}

#[tokio::test]
async fn test_table_manifest_uncompress() {
test_table_manifest(false).await
}

async fn test_table_manifest(compress: bool) {
let (_dir, object_store) = test_util::new_test_object_store("test_table_manifest").await;

let manifest = TableManifest::create("manifest/", object_store);
let manifest =
TableManifest::create("manifest/", object_store, manifest_compress_type(compress));

let mut iter = manifest.scan(0, 100).await.unwrap();
assert!(iter.next_action().await.unwrap().is_none());
Expand Down
13 changes: 10 additions & 3 deletions src/mito/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::Arc;

use arc_swap::ArcSwap;
use async_trait::async_trait;
use common_datasource::compression::CompressionType;
use common_error::ext::BoxedError;
use common_query::logical_plan::Expr;
use common_query::physical_plan::PhysicalPlanRef;
Expand Down Expand Up @@ -445,8 +446,10 @@ impl<R: Region> MitoTable<R> {
table_info: TableInfo,
regions: HashMap<RegionNumber, R>,
object_store: ObjectStore,
compress_type: CompressionType,
) -> Result<MitoTable<R>> {
let manifest = TableManifest::create(&table_manifest_dir(table_dir), object_store);
let manifest =
TableManifest::create(&table_manifest_dir(table_dir), object_store, compress_type);

let _timer =
common_telemetry::timer!(crate::metrics::MITO_CREATE_TABLE_UPDATE_MANIFEST_ELAPSED);
Expand All @@ -463,8 +466,12 @@ impl<R: Region> MitoTable<R> {
Ok(MitoTable::new(table_info, regions, manifest))
}

pub(crate) fn build_manifest(table_dir: &str, object_store: ObjectStore) -> TableManifest {
TableManifest::create(&table_manifest_dir(table_dir), object_store)
pub(crate) fn build_manifest(
table_dir: &str,
object_store: ObjectStore,
compress_type: CompressionType,
) -> TableManifest {
TableManifest::create(&table_manifest_dir(table_dir), object_store, compress_type)
}

pub(crate) async fn recover_table_info(
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub const DEFAULT_PICKER_SCHEDULE_INTERVAL: u32 = 5 * 60 * 1000;
#[derive(Debug, Clone)]
pub struct EngineConfig {
pub manifest_checkpoint_on_startup: bool,
pub compress_manifest: bool,
pub manifest_checkpoint_margin: Option<u16>,
pub manifest_gc_duration: Option<Duration>,
pub max_files_in_l0: usize,
Expand All @@ -49,6 +50,7 @@ impl Default for EngineConfig {
fn default() -> Self {
Self {
manifest_checkpoint_on_startup: false,
compress_manifest: false,
manifest_checkpoint_margin: Some(10),
manifest_gc_duration: Some(Duration::from_secs(30)),
max_files_in_l0: 8,
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::flush::{
FlushScheduler, FlushSchedulerRef, FlushStrategyRef, PickerConfig, SizeBasedStrategy,
};
use crate::manifest::region::RegionManifest;
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::metadata::RegionMetadata;
use crate::region::{RegionImpl, StoreConfig};
Expand Down Expand Up @@ -449,6 +450,7 @@ impl<S: LogStore> EngineInner<S> {
let manifest = RegionManifest::with_checkpointer(
&manifest_dir,
self.object_store.clone(),
manifest_compress_type(config.compress_manifest),
config.manifest_checkpoint_margin,
config.manifest_gc_duration,
);
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ pub(crate) mod storage;
pub mod test_utils;

pub use self::impl_::*;
pub use self::storage::manifest_compress_type;
24 changes: 19 additions & 5 deletions src/storage/src/manifest/impl_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::Duration;

use arc_swap::ArcSwap;
use async_trait::async_trait;
use common_datasource::compression::CompressionType;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::{debug, logging, warn};
use object_store::ObjectStore;
Expand Down Expand Up @@ -52,11 +53,16 @@ impl<S: 'static + Checkpoint<Error = Error>, M: 'static + MetaAction<Error = Err
pub fn new(
manifest_dir: &str,
object_store: ObjectStore,
compress_type: CompressionType,
checkpoint_actions_margin: Option<u16>,
gc_duration: Option<Duration>,
checkpointer: Option<Arc<dyn Checkpointer<Checkpoint = S, MetaAction = M>>>,
) -> Self {
let inner = Arc::new(ManifestImplInner::new(manifest_dir, object_store));
let inner = Arc::new(ManifestImplInner::new(
manifest_dir,
object_store,
compress_type,
));
let gc_task = if checkpointer.is_some() {
// only start gc task when checkpoint is enabled.
Some(Arc::new(RepeatedTask::new(
Expand All @@ -79,8 +85,12 @@ impl<S: 'static + Checkpoint<Error = Error>, M: 'static + MetaAction<Error = Err
}
}

pub fn create(manifest_dir: &str, object_store: ObjectStore) -> Self {
Self::new(manifest_dir, object_store, None, None, None)
pub fn create(
manifest_dir: &str,
object_store: ObjectStore,
compress_type: CompressionType,
) -> Self {
Self::new(manifest_dir, object_store, compress_type, None, None, None)
}

#[inline]
Expand Down Expand Up @@ -275,11 +285,15 @@ impl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> TaskFunction<Er
}

impl<S: Checkpoint<Error = Error>, M: MetaAction<Error = Error>> ManifestImplInner<S, M> {
fn new(manifest_dir: &str, object_store: ObjectStore) -> Self {
fn new(manifest_dir: &str, object_store: ObjectStore, compress_type: CompressionType) -> Self {
let (reader_version, writer_version) = action::supported_protocol_version();

Self {
store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)),
store: Arc::new(ManifestObjectStore::new(
manifest_dir,
object_store,
compress_type,
)),
version: AtomicU64::new(0),
protocol: ArcSwap::new(Arc::new(ProtocolAction::new())),
supported_reader_version: reader_version,
Expand Down
35 changes: 32 additions & 3 deletions src/storage/src/manifest/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use common_datasource::compression::CompressionType;
use common_telemetry::{info, warn};
use object_store::ObjectStore;
use store_api::manifest::action::ProtocolAction;
Expand Down Expand Up @@ -148,12 +149,14 @@ impl RegionManifest {
pub fn with_checkpointer(
manifest_dir: &str,
object_store: ObjectStore,
compress_type: CompressionType,
checkpoint_actions_margin: Option<u16>,
gc_duration: Option<Duration>,
) -> Self {
Self::new(
manifest_dir,
object_store,
compress_type,
checkpoint_actions_margin,
gc_duration,
Some(Arc::new(RegionManifestCheckpointer {
Expand Down Expand Up @@ -186,19 +189,35 @@ mod tests {
use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION};

use super::*;
use crate::manifest::manifest_compress_type;
use crate::manifest::test_utils::*;
use crate::metadata::RegionMetadata;
use crate::sst::FileId;

#[tokio::test]
async fn test_region_manifest() {
async fn test_region_manifest_compress() {
test_region_manifest(true).await
}

#[tokio::test]
async fn test_region_manifest_uncompress() {
test_region_manifest(false).await
}

async fn test_region_manifest(compress: bool) {
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_region_manifest");
let mut builder = Fs::default();
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();

let manifest = RegionManifest::with_checkpointer("/manifest/", object_store, None, None);
let manifest = RegionManifest::with_checkpointer(
"/manifest/",
object_store,
manifest_compress_type(compress),
None,
None,
);
manifest.start().await.unwrap();

let region_meta = Arc::new(build_region_meta());
Expand Down Expand Up @@ -306,7 +325,16 @@ mod tests {
}

#[tokio::test]
async fn test_region_manifest_checkpoint() {
async fn test_region_manifest_checkpoint_compress() {
test_region_manifest_checkpoint(true).await
}

#[tokio::test]
async fn test_region_manifest_checkpoint_uncompress() {
test_region_manifest_checkpoint(false).await
}

async fn test_region_manifest_checkpoint(compress: bool) {
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_region_manifest_checkpoint");
let mut builder = Fs::default();
Expand All @@ -316,6 +344,7 @@ mod tests {
let manifest = RegionManifest::with_checkpointer(
"/manifest/",
object_store,
manifest_compress_type(compress),
None,
Some(Duration::from_millis(50)),
);
Expand Down
Loading

0 comments on commit fb1ac0c

Please sign in to comment.