diff --git a/Cargo.lock b/Cargo.lock index c4966c45be5f..345a81f72940 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5002,6 +5002,7 @@ dependencies = [ "async-trait", "chrono", "common-catalog", + "common-datasource", "common-error", "common-procedure", "common-procedure-test", diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 2eada5261b91..ffdc971e9d96 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -236,6 +236,7 @@ mod tests { checkpoint_margin = 9 gc_duration = '7s' checkpoint_on_startup = true + compress = true [logging] level = "debug" @@ -295,6 +296,7 @@ mod tests { checkpoint_margin: Some(9), gc_duration: Some(Duration::from_secs(7)), checkpoint_on_startup: true, + compress: true }, options.storage.manifest, ); diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index e0aa1ed21508..242ed5c48f32 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -170,6 +170,8 @@ pub struct RegionManifestConfig { pub gc_duration: Option, /// Whether to try creating a manifest checkpoint on region opening pub checkpoint_on_startup: bool, + /// Whether to compress manifest and checkpoint file by gzip + pub compress: bool, } impl Default for RegionManifestConfig { @@ -178,6 +180,7 @@ impl Default for RegionManifestConfig { checkpoint_margin: Some(10u16), gc_duration: Some(Duration::from_secs(30)), checkpoint_on_startup: false, + compress: false, } } } @@ -246,6 +249,7 @@ impl From<&DatanodeOptions> for SchedulerConfig { impl From<&DatanodeOptions> for StorageEngineConfig { fn from(value: &DatanodeOptions) -> Self { Self { + compress_manifest: value.storage.manifest.compress, manifest_checkpoint_on_startup: value.storage.manifest.checkpoint_on_startup, manifest_checkpoint_margin: value.storage.manifest.checkpoint_margin, manifest_gc_duration: value.storage.manifest.gc_duration, diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 3d09fa190de9..862ebffdd31f 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -116,7 +116,9 @@ impl Instance { let log_store = Arc::new(create_log_store(&opts.wal).await?); let mito_engine = Arc::new(DefaultEngine::new( - TableEngineConfig::default(), + TableEngineConfig { + compress_manifest: opts.storage.manifest.compress, + }, EngineImpl::new( StorageEngineConfig::from(opts), log_store.clone(), diff --git a/src/mito/Cargo.toml b/src/mito/Cargo.toml index 4ef34e9194a6..f0c264728524 100644 --- a/src/mito/Cargo.toml +++ b/src/mito/Cargo.toml @@ -19,6 +19,7 @@ common-error = { path = "../common/error" } common-procedure = { path = "../common/procedure" } common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } +common-datasource = { path = "../common/datasource" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } dashmap = "5.4" diff --git a/src/mito/src/config.rs b/src/mito/src/config.rs index 669ce741172f..6d7fc5db164f 100644 --- a/src/mito/src/config.rs +++ b/src/mito/src/config.rs @@ -15,4 +15,6 @@ //! Table Engine config #[derive(Debug, Clone, Default)] -pub struct EngineConfig {} +pub struct EngineConfig { + pub compress_manifest: bool, +} diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index b163cf010a4c..078b1e98cdbc 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use async_trait::async_trait; pub use common_catalog::consts::MITO_ENGINE; +use common_datasource::compression::CompressionType; use common_error::ext::BoxedError; use common_procedure::{BoxedProcedure, ProcedureManager}; use common_telemetry::{debug, logging}; @@ -29,6 +30,7 @@ use datatypes::schema::Schema; use key_lock::KeyLock; use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; +use storage::manifest::manifest_compress_type; use store_api::storage::{ ColumnDescriptorBuilder, ColumnFamilyDescriptor, ColumnFamilyDescriptorBuilder, ColumnId, EngineContext as StorageEngineContext, OpenOptions, RegionNumber, RowKeyDescriptor, @@ -247,6 +249,7 @@ pub(crate) struct MitoEngineInner { /// Writing to `tables` should also hold the `table_mutex`. tables: DashMap>>, object_store: ObjectStore, + compress_type: CompressionType, storage_engine: S, /// Table mutex is used to protect the operations such as creating/opening/closing /// a table, to avoid things like opening the same table simultaneously. @@ -638,6 +641,7 @@ impl MitoEngineInner { let manifest = MitoTable::<::Region>::build_manifest( table_dir, self.object_store.clone(), + self.compress_type, ); let Some(table_info) = MitoTable::<::Region>::recover_table_info(table_name, &manifest) @@ -691,11 +695,12 @@ async fn close_table(lock: Arc>, table: TableRef) -> TableResult } impl MitoEngineInner { - fn new(_config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self { + fn new(config: EngineConfig, storage_engine: S, object_store: ObjectStore) -> Self { Self { tables: DashMap::new(), storage_engine, object_store, + compress_type: manifest_compress_type(config.compress_manifest), table_mutex: Arc::new(KeyLock::new()), } } diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index 59d6bdf91909..7dc87e11bfb5 100644 --- a/src/mito/src/engine/procedure/create.rs +++ b/src/mito/src/engine/procedure/create.rs @@ -367,6 +367,7 @@ impl TableCreator { table_info, self.regions.clone(), self.engine_inner.object_store.clone(), + self.engine_inner.compress_type, ) .await?; diff --git a/src/mito/src/manifest.rs b/src/mito/src/manifest.rs index db1f42f77b98..8f4969213297 100644 --- a/src/mito/src/manifest.rs +++ b/src/mito/src/manifest.rs @@ -46,7 +46,7 @@ pub type TableManifest = ManifestImpl; #[cfg(test)] mod tests { - use storage::manifest::MetaActionIteratorImpl; + use storage::manifest::{manifest_compress_type, MetaActionIteratorImpl}; use store_api::manifest::action::ProtocolAction; use store_api::manifest::{Manifest, MetaActionIterator}; use table::metadata::{RawTableInfo, TableInfo}; @@ -77,10 +77,20 @@ mod tests { } #[tokio::test] - async fn test_table_manifest() { + async fn test_table_manifest_compress() { + test_table_manifest(true).await + } + + #[tokio::test] + async fn test_table_manifest_uncompress() { + test_table_manifest(false).await + } + + async fn test_table_manifest(compress: bool) { let (_dir, object_store) = test_util::new_test_object_store("test_table_manifest").await; - let manifest = TableManifest::create("manifest/", object_store); + let manifest = + TableManifest::create("manifest/", object_store, manifest_compress_type(compress)); let mut iter = manifest.scan(0, 100).await.unwrap(); assert!(iter.next_action().await.unwrap().is_none()); diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 19d85eaf7cb0..d7048607bab8 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use arc_swap::ArcSwap; use async_trait::async_trait; +use common_datasource::compression::CompressionType; use common_error::ext::BoxedError; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; @@ -445,8 +446,10 @@ impl MitoTable { table_info: TableInfo, regions: HashMap, object_store: ObjectStore, + compress_type: CompressionType, ) -> Result> { - let manifest = TableManifest::create(&table_manifest_dir(table_dir), object_store); + let manifest = + TableManifest::create(&table_manifest_dir(table_dir), object_store, compress_type); let _timer = common_telemetry::timer!(crate::metrics::MITO_CREATE_TABLE_UPDATE_MANIFEST_ELAPSED); @@ -463,8 +466,12 @@ impl MitoTable { Ok(MitoTable::new(table_info, regions, manifest)) } - pub(crate) fn build_manifest(table_dir: &str, object_store: ObjectStore) -> TableManifest { - TableManifest::create(&table_manifest_dir(table_dir), object_store) + pub(crate) fn build_manifest( + table_dir: &str, + object_store: ObjectStore, + compress_type: CompressionType, + ) -> TableManifest { + TableManifest::create(&table_manifest_dir(table_dir), object_store, compress_type) } pub(crate) async fn recover_table_info( diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs index f1bc18164acc..a983dda41c81 100644 --- a/src/storage/src/config.rs +++ b/src/storage/src/config.rs @@ -30,6 +30,7 @@ pub const DEFAULT_PICKER_SCHEDULE_INTERVAL: u32 = 5 * 60 * 1000; #[derive(Debug, Clone)] pub struct EngineConfig { pub manifest_checkpoint_on_startup: bool, + pub compress_manifest: bool, pub manifest_checkpoint_margin: Option, pub manifest_gc_duration: Option, pub max_files_in_l0: usize, @@ -49,6 +50,7 @@ impl Default for EngineConfig { fn default() -> Self { Self { manifest_checkpoint_on_startup: false, + compress_manifest: false, manifest_checkpoint_margin: Some(10), manifest_gc_duration: Some(Duration::from_secs(30)), max_files_in_l0: 8, diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 9d9e81beddb4..ddf1269751ea 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -34,6 +34,7 @@ use crate::flush::{ FlushScheduler, FlushSchedulerRef, FlushStrategyRef, PickerConfig, SizeBasedStrategy, }; use crate::manifest::region::RegionManifest; +use crate::manifest::storage::manifest_compress_type; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::metadata::RegionMetadata; use crate::region::{RegionImpl, StoreConfig}; @@ -449,6 +450,7 @@ impl EngineInner { let manifest = RegionManifest::with_checkpointer( &manifest_dir, self.object_store.clone(), + manifest_compress_type(config.compress_manifest), config.manifest_checkpoint_margin, config.manifest_gc_duration, ); diff --git a/src/storage/src/manifest.rs b/src/storage/src/manifest.rs index 9ba89de2fa19..30c49dc69b6d 100644 --- a/src/storage/src/manifest.rs +++ b/src/storage/src/manifest.rs @@ -23,3 +23,4 @@ pub(crate) mod storage; pub mod test_utils; pub use self::impl_::*; +pub use self::storage::manifest_compress_type; diff --git a/src/storage/src/manifest/impl_.rs b/src/storage/src/manifest/impl_.rs index a9c89ef2d08c..8478148265fe 100644 --- a/src/storage/src/manifest/impl_.rs +++ b/src/storage/src/manifest/impl_.rs @@ -19,6 +19,7 @@ use std::time::Duration; use arc_swap::ArcSwap; use async_trait::async_trait; +use common_datasource::compression::CompressionType; use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{debug, logging, warn}; use object_store::ObjectStore; @@ -52,11 +53,16 @@ impl, M: 'static + MetaAction, gc_duration: Option, checkpointer: Option>>, ) -> Self { - let inner = Arc::new(ManifestImplInner::new(manifest_dir, object_store)); + let inner = Arc::new(ManifestImplInner::new( + manifest_dir, + object_store, + compress_type, + )); let gc_task = if checkpointer.is_some() { // only start gc task when checkpoint is enabled. Some(Arc::new(RepeatedTask::new( @@ -79,8 +85,12 @@ impl, M: 'static + MetaAction Self { - Self::new(manifest_dir, object_store, None, None, None) + pub fn create( + manifest_dir: &str, + object_store: ObjectStore, + compress_type: CompressionType, + ) -> Self { + Self::new(manifest_dir, object_store, compress_type, None, None, None) } #[inline] @@ -275,11 +285,15 @@ impl, M: MetaAction> TaskFunction, M: MetaAction> ManifestImplInner { - fn new(manifest_dir: &str, object_store: ObjectStore) -> Self { + fn new(manifest_dir: &str, object_store: ObjectStore, compress_type: CompressionType) -> Self { let (reader_version, writer_version) = action::supported_protocol_version(); Self { - store: Arc::new(ManifestObjectStore::new(manifest_dir, object_store)), + store: Arc::new(ManifestObjectStore::new( + manifest_dir, + object_store, + compress_type, + )), version: AtomicU64::new(0), protocol: ArcSwap::new(Arc::new(ProtocolAction::new())), supported_reader_version: reader_version, diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index a0576853bd44..337b32077fef 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use common_datasource::compression::CompressionType; use common_telemetry::{info, warn}; use object_store::ObjectStore; use store_api::manifest::action::ProtocolAction; @@ -148,12 +149,14 @@ impl RegionManifest { pub fn with_checkpointer( manifest_dir: &str, object_store: ObjectStore, + compress_type: CompressionType, checkpoint_actions_margin: Option, gc_duration: Option, ) -> Self { Self::new( manifest_dir, object_store, + compress_type, checkpoint_actions_margin, gc_duration, Some(Arc::new(RegionManifestCheckpointer { @@ -186,19 +189,35 @@ mod tests { use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION}; use super::*; + use crate::manifest::manifest_compress_type; use crate::manifest::test_utils::*; use crate::metadata::RegionMetadata; use crate::sst::FileId; #[tokio::test] - async fn test_region_manifest() { + async fn test_region_manifest_compress() { + test_region_manifest(true).await + } + + #[tokio::test] + async fn test_region_manifest_uncompress() { + test_region_manifest(false).await + } + + async fn test_region_manifest(compress: bool) { common_telemetry::init_default_ut_logging(); let tmp_dir = create_temp_dir("test_region_manifest"); let mut builder = Fs::default(); builder.root(&tmp_dir.path().to_string_lossy()); let object_store = ObjectStore::new(builder).unwrap().finish(); - let manifest = RegionManifest::with_checkpointer("/manifest/", object_store, None, None); + let manifest = RegionManifest::with_checkpointer( + "/manifest/", + object_store, + manifest_compress_type(compress), + None, + None, + ); manifest.start().await.unwrap(); let region_meta = Arc::new(build_region_meta()); @@ -306,7 +325,16 @@ mod tests { } #[tokio::test] - async fn test_region_manifest_checkpoint() { + async fn test_region_manifest_checkpoint_compress() { + test_region_manifest_checkpoint(true).await + } + + #[tokio::test] + async fn test_region_manifest_checkpoint_uncompress() { + test_region_manifest_checkpoint(false).await + } + + async fn test_region_manifest_checkpoint(compress: bool) { common_telemetry::init_default_ut_logging(); let tmp_dir = create_temp_dir("test_region_manifest_checkpoint"); let mut builder = Fs::default(); @@ -316,6 +344,7 @@ mod tests { let manifest = RegionManifest::with_checkpointer( "/manifest/", object_store, + manifest_compress_type(compress), None, Some(Duration::from_millis(50)), ); diff --git a/src/storage/src/manifest/storage.rs b/src/storage/src/manifest/storage.rs index e5ce1487ce0e..0e88b9b556dc 100644 --- a/src/storage/src/manifest/storage.rs +++ b/src/storage/src/manifest/storage.rs @@ -39,11 +39,20 @@ lazy_static! { } const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint"; -const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Uncompressed; +const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip; /// Due to backward compatibility, it is possible that the user's manifest file has not been compressed. /// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing. const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed; +#[inline] +pub const fn manifest_compress_type(compress: bool) -> CompressionType { + if compress { + DEFAULT_MANIFEST_COMPRESSION_TYPE + } else { + FALL_BACK_COMPRESS_TYPE + } +} + #[inline] pub fn delta_file(version: ManifestVersion) -> String { format!("{version:020}.json") @@ -133,11 +142,10 @@ pub struct ManifestObjectStore { } impl ManifestObjectStore { - pub fn new(path: &str, object_store: ObjectStore) -> Self { + pub fn new(path: &str, object_store: ObjectStore, compress_type: CompressionType) -> Self { Self { object_store, - //TODO: make it configurable - compress_type: DEFAULT_MANIFEST_COMPRESSION_TYPE, + compress_type, path: util::normalize_dir(path), } } @@ -528,7 +536,7 @@ mod tests { let mut builder = Fs::default(); builder.root(&tmp_dir.path().to_string_lossy()); let object_store = ObjectStore::new(builder).unwrap().finish(); - ManifestObjectStore::new("/", object_store) + ManifestObjectStore::new("/", object_store, CompressionType::Uncompressed) } #[test] diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index eeb30ad8f3b6..eafc0da4d54c 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -42,6 +42,7 @@ use super::*; use crate::chunk::ChunkReaderImpl; use crate::file_purger::noop::NoopFilePurgeHandler; use crate::manifest::action::{RegionChange, RegionMetaActionList}; +use crate::manifest::manifest_compress_type; use crate::manifest::test_utils::*; use crate::memtable::DefaultMemtableBuilder; use crate::scheduler::{LocalScheduler, SchedulerConfig}; @@ -301,7 +302,16 @@ async fn test_new_region() { } #[tokio::test] -async fn test_recover_region_manifets() { +async fn test_recover_region_manifets_compress() { + test_recover_region_manifets(true).await; +} + +#[tokio::test] +async fn test_recover_region_manifets_uncompress() { + test_recover_region_manifets(false).await; +} + +async fn test_recover_region_manifets(compress: bool) { common_telemetry::init_default_ut_logging(); let tmp_dir = create_temp_dir("test_recover_region_manifets"); let memtable_builder = Arc::new(DefaultMemtableBuilder::default()) as _; @@ -310,8 +320,13 @@ async fn test_recover_region_manifets() { builder.root(&tmp_dir.path().to_string_lossy()); let object_store = ObjectStore::new(builder).unwrap().finish(); - let manifest = - RegionManifest::with_checkpointer("/manifest/", object_store.clone(), None, None); + let manifest = RegionManifest::with_checkpointer( + "/manifest/", + object_store.clone(), + manifest_compress_type(compress), + None, + None, + ); let region_meta = Arc::new(build_region_meta()); let sst_layer = Arc::new(FsAccessLayer::new("sst", object_store)) as _; diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index ae418b56ee76..f6f1803e649a 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use common_datasource::compression::CompressionType; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; use object_store::services::Fs; @@ -57,7 +58,13 @@ pub async fn new_store_config_with_object_store( let manifest_dir = engine::region_manifest_dir(parent_dir, region_name); let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone())); - let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store, None, None); + let manifest = RegionManifest::with_checkpointer( + &manifest_dir, + object_store, + CompressionType::Uncompressed, + None, + None, + ); manifest.start().await.unwrap(); let log_config = LogConfig { log_file_dir: log_store_dir(store_dir),