From 02a7e3100f47cf16aa6c245ed529a6978be20fbd Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Fri, 26 Aug 2022 16:20:26 +0800 Subject: [PATCH] feat: add new table_option storage format (#218) * add new table_option: storage format * fix clippy * add integration tests * add storage format in table schema proto * move storage format to table options module * fix clippy * fix CR comments --- analytic_engine/src/compaction/picker.rs | 2 + .../src/instance/flush_compaction.rs | 2 + analytic_engine/src/sst/file.rs | 15 +- analytic_engine/src/sst/parquet/builder.rs | 3 +- analytic_engine/src/sst/parquet/encoding.rs | 36 ++-- analytic_engine/src/sst/parquet/reader.rs | 24 +-- analytic_engine/src/table/data.rs | 5 + analytic_engine/src/table/version_edit.rs | 1 + analytic_engine/src/table_options.rs | 160 +++++++++++++++--- common_types/src/schema.rs | 123 +++----------- proto/protos/analytic_common.proto | 6 + proto/protos/meta_update.proto | 1 + proto/protos/sst.proto | 2 + tests/cases/local/05_ddl/create_tables.result | 60 ++++++- tests/cases/local/05_ddl/create_tables.sql | 17 ++ .../local/06_show/show_create_table.result | 6 +- 16 files changed, 295 insertions(+), 168 deletions(-) diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index 5cc9f2afc9..83883328a6 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -594,6 +594,7 @@ mod tests { file::SstMetaData, manager::{tests::LevelsControllerMockBuilder, LevelsController}, }, + table_options::StorageFormat, }; fn build_sst_meta_data(time_range: TimeRange, size: u64) -> SstMetaData { @@ -605,6 +606,7 @@ mod tests { schema: build_schema(), size, row_num: 2, + storage_format: StorageFormat::default(), } } diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index b56a46aa72..0f0944ac70 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -607,6 +607,7 @@ impl Instance { schema: table_data.schema(), size: 0, row_num: 0, + storage_format: table_data.table_options().storage_format, }; let store = self.space_store.clone(); @@ -711,6 +712,7 @@ impl Instance { schema: table_data.schema(), size: 0, row_num: 0, + storage_format: table_data.storage_format(), }; // Alloc file id for next sst file diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs index 865e693a14..77b879b6de 100644 --- a/analytic_engine/src/sst/file.rs +++ b/analytic_engine/src/sst/file.rs @@ -37,7 +37,7 @@ use tokio::sync::{ Mutex, }; -use crate::{space::SpaceId, sst::manager::FileId, table::sst_util}; +use crate::{space::SpaceId, sst::manager::FileId, table::sst_util, table_options::StorageFormat}; /// Error of sst file. 
#[derive(Debug, Snafu)] @@ -220,6 +220,11 @@ impl FileHandle { pub fn set_being_compacted(&self, value: bool) { self.inner.being_compacted.store(value, Ordering::Relaxed); } + + #[inline] + pub fn storage_format(&self) -> StorageFormat { + self.inner.meta.meta.storage_format + } } impl fmt::Debug for FileHandle { @@ -420,6 +425,7 @@ pub struct SstMetaData { pub size: u64, // total row number pub row_num: u64, + pub storage_format: StorageFormat, } impl From for SstMetaDataPb { @@ -433,6 +439,7 @@ impl From for SstMetaDataPb { target.set_schema(src.schema.into()); target.set_size(src.size); target.set_row_num(src.row_num); + target.set_storage_format(src.storage_format.into()); target } @@ -452,6 +459,7 @@ impl TryFrom for SstMetaData { schema, size: src.size, row_num: src.row_num, + storage_format: src.storage_format.into(), }) } } @@ -610,6 +618,9 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData { let mut time_range_start = files[0].time_range().inclusive_start(); let mut time_range_end = files[0].time_range().exclusive_end(); let mut max_sequence = files[0].max_sequence(); + // TODO(jiacai2050): what if format of different file is different? + // pick first now + let storage_format = files[0].storage_format(); if files.len() > 1 { for file in &files[1..] { @@ -630,6 +641,7 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData { // we don't know file size and total row number yet size: 0, row_num: 0, + storage_format, } } @@ -685,6 +697,7 @@ pub mod tests { schema: self.schema.clone(), size: 0, row_num: 0, + storage_format: StorageFormat::default(), } } } diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs index 3347a5d375..3a7a8e519b 100644 --- a/analytic_engine/src/sst/parquet/builder.rs +++ b/analytic_engine/src/sst/parquet/builder.rs @@ -176,7 +176,7 @@ mod tests { parquet::reader::ParquetSstReader, reader::{tests::check_stream, SstReader}, }, - table_options, + table_options::{self, StorageFormat}, }; // TODO(xikai): add test for reverse reader @@ -217,6 +217,7 @@ mod tests { schema: schema.clone(), size: 10, row_num: 2, + storage_format: StorageFormat::default(), }; let mut counter = 10; diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index 85e6dca733..523f7dbdd1 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -22,9 +22,7 @@ use arrow_deps::{ use common_types::{ bytes::{BytesMut, MemBufMut, Writer}, datum::DatumKind, - schema::{ - ArrowSchema, ArrowSchemaMeta, ArrowSchemaRef, DataType, Field, Schema, StorageFormat, - }, + schema::{ArrowSchema, ArrowSchemaRef, DataType, Field, Schema}, }; use common_util::define_result; use log::trace; @@ -32,9 +30,12 @@ use proto::sst::SstMetaData as SstMetaDataPb; use protobuf::Message; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; -use crate::sst::{ - file::SstMetaData, - parquet::hybrid::{self, IndexedType}, +use crate::{ + sst::{ + file::SstMetaData, + parquet::hybrid::{self, IndexedType}, + }, + table_options::StorageFormat, }; // TODO: Only support i32 offset now, consider i64 here? 
@@ -450,14 +451,8 @@ impl ParquetEncoder { .set_max_row_group_size(num_rows_per_row_group) .set_compression(compression) .build(); - let mut format = meta_data.schema.storage_format(); - - // TODO: remove this overwrite when we can set format via table options - if matches!(format, StorageFormat::Hybrid) && meta_data.schema.index_of_tsid().is_none() { - format = StorageFormat::Columnar; - } - let record_encoder: Box = match format { + let record_encoder: Box = match meta_data.storage_format { StorageFormat::Hybrid => Box::new(HybridRecordEncoder::try_new( write_props, &meta_data.schema, @@ -723,22 +718,13 @@ pub struct ParquetDecoder { } impl ParquetDecoder { - pub fn try_new(arrow_schema: ArrowSchemaRef) -> Result { - let arrow_schema_meta = ArrowSchemaMeta::try_from(arrow_schema.metadata()) - .map_err(|e| Box::new(e) as _) - .context(DecodeRecordBatch)?; - let mut format = arrow_schema_meta.storage_format(); - // TODO: remove this overwrite when we can set format via table options - if matches!(format, StorageFormat::Hybrid) && !arrow_schema_meta.enable_tsid_primary_key() { - format = StorageFormat::Columnar; - } - - let record_decoder: Box = match format { + pub fn new(storage_format: StorageFormat) -> Self { + let record_decoder: Box = match storage_format { StorageFormat::Hybrid => Box::new(HybridRecordDecoder {}), StorageFormat::Columnar => Box::new(ColumnarRecordDecoder {}), }; - Ok(Self { record_decoder }) + Self { record_decoder } } pub fn decode_record_batch( diff --git a/analytic_engine/src/sst/parquet/reader.rs b/analytic_engine/src/sst/parquet/reader.rs index 4960a95d38..adfce855d1 100644 --- a/analytic_engine/src/sst/parquet/reader.rs +++ b/analytic_engine/src/sst/parquet/reader.rs @@ -36,11 +36,14 @@ use snafu::{ensure, OptionExt, ResultExt}; use table_engine::predicate::PredicateRef; use tokio::sync::mpsc::{self, Receiver, Sender}; -use crate::sst::{ - factory::SstReaderOptions, - file::SstMetaData, - parquet::encoding::{self, ParquetDecoder}, - reader::{error::*, SstReader}, +use crate::{ + sst::{ + factory::SstReaderOptions, + file::SstMetaData, + parquet::encoding::{self, ParquetDecoder}, + reader::{error::*, SstReader}, + }, + table_options::StorageFormat, }; const DEFAULT_CHANNEL_CAP: usize = 1000; @@ -164,9 +167,9 @@ impl<'a> ParquetSstReader<'a> { let file_reader = self.file_reader.take().unwrap(); let batch_size = self.batch_size; - let schema = { + let (schema, storage_format) = { let meta_data = self.meta_data.as_ref().unwrap(); - meta_data.schema.clone() + (meta_data.schema.clone(), meta_data.storage_format) }; let projected_schema = self.projected_schema.clone(); let row_projector = projected_schema @@ -202,6 +205,7 @@ impl<'a> ParquetSstReader<'a> { predicate, batch_size, reverse, + storage_format, }; let start_fetch = Instant::now(); @@ -248,6 +252,7 @@ struct ProjectAndFilterReader { predicate: PredicateRef, batch_size: usize, reverse: bool, + storage_format: StorageFormat, } impl ProjectAndFilterReader { @@ -319,10 +324,7 @@ impl ProjectAndFilterReader { let reader = self.project_and_filter_reader()?; let arrow_record_batch_projector = ArrowRecordBatchProjector::from(self.row_projector); - let arrow_schema = self.projected_schema.to_projected_arrow_schema(); - let parquet_decoder = ParquetDecoder::try_new(arrow_schema) - .map_err(|e| Box::new(e) as _) - .context(DecodeRecordBatch)?; + let parquet_decoder = ParquetDecoder::new(self.storage_format); let mut row_num = 0; for record_batch in reader { trace!( diff --git a/analytic_engine/src/table/data.rs 
b/analytic_engine/src/table/data.rs
index 74e88dc96b..969e5e1080 100644
--- a/analytic_engine/src/table/data.rs
+++ b/analytic_engine/src/table/data.rs
@@ -42,6 +42,7 @@ use crate::{
         sst_util,
         version::{MemTableForWrite, MemTableState, SamplingMemTable, TableVersion},
     },
+    table_options::StorageFormat,
     TableOptions,
 };
 
@@ -466,6 +467,10 @@ impl TableData {
     pub fn is_expired(&self, timestamp: Timestamp) -> bool {
         self.table_options().is_expired(timestamp)
     }
+
+    pub fn storage_format(&self) -> StorageFormat {
+        self.table_options().storage_format
+    }
 }
 
 /// Table data reference
diff --git a/analytic_engine/src/table/version_edit.rs b/analytic_engine/src/table/version_edit.rs
index 8e73539499..4a8d2e8d26 100644
--- a/analytic_engine/src/table/version_edit.rs
+++ b/analytic_engine/src/table/version_edit.rs
@@ -83,6 +83,7 @@ impl TryFrom for AddFile {
                     schema,
                     size: src.size,
                     row_num: src.row_num,
+                    storage_format: src.storage_format.into(),
                 },
             },
         })
diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs
index 500250c8dd..9c04b2d131 100644
--- a/analytic_engine/src/table_options.rs
+++ b/analytic_engine/src/table_options.rs
@@ -13,7 +13,8 @@ use common_util::{
 };
 use proto::analytic_common::{
     CompactionOptions as CompactionOptionsPb, CompactionStrategy as CompactionStrategyPb,
-    Compression as CompressionPb, TableOptions as TableOptionsPb, UpdateMode as UpdateModePb,
+    Compression as CompressionPb, StorageFormat as StorageFormatPb, TableOptions as TableOptionsPb,
+    UpdateMode as UpdateModePb,
 };
 use serde_derive::Deserialize;
 use snafu::{Backtrace, GenerateBacktrace, ResultExt, Snafu};
@@ -32,6 +33,7 @@ pub const COMPACTION_STRATEGY: &str = "compaction_strategy";
 pub const NUM_ROWS_PER_ROW_GROUP: &str = "num_rows_per_row_group";
 pub const UPDATE_MODE: &str = "update_mode";
 pub const COMPRESSION: &str = "compression";
+pub const STORAGE_FORMAT: &str = "storage_format";
 
 const UPDATE_MODE_OVERWRITE: &str = "OVERWRITE";
 const UPDATE_MODE_APPEND: &str = "APPEND";
@@ -39,7 +41,8 @@ const COMPRESSION_UNCOMPRESSED: &str = "UNCOMPRESSED";
 const COMPRESSION_LZ4: &str = "LZ4";
 const COMPRESSION_SNAPPY: &str = "SNAPPY";
 const COMPRESSION_ZSTD: &str = "ZSTD";
-const AT_LEAST_OPTIONS_NUM: usize = 9;
+const STORAGE_FORMAT_COLUMNAR: &str = "COLUMNAR";
+const STORAGE_FORMAT_HYBRID: &str = "HYBRID";
 
 /// Default bucket duration (1d)
 const BUCKET_DURATION_1D: Duration = Duration::from_secs(24 * 60 * 60);
@@ -97,6 +100,13 @@ pub enum Error {
         backtrace
     ))]
     ParseCompressionName { name: String, backtrace: Backtrace },
+
+    #[snafu(display(
+        "Unknown storage format. value:{:?}.\nBacktrace:\n{}",
+        value,
+        backtrace
+    ))]
+    UnknownStorageFormat { value: String, backtrace: Backtrace },
 }
 
 define_result!(Error);
@@ -196,6 +206,92 @@ impl From for ParquetCompression {
     }
 }
 
+/// StorageFormat specifies how records are saved in persistent storage
+#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
+pub enum StorageFormat {
+    /// Traditional columnar format, every column is saved in exactly one
+    /// column, for example:
+    ///
+    ///```plaintext
+    /// | Timestamp | Device ID | Status Code | Tag 1 | Tag 2 |
+    /// | --------- |---------- | ----------- | ----- | ----- |
+    /// | 12:01 | A | 0 | v1 | v1 |
+    /// | 12:01 | B | 0 | v2 | v2 |
+    /// | 12:02 | A | 0 | v1 | v1 |
+    /// | 12:02 | B | 1 | v2 | v2 |
+    /// | 12:03 | A | 0 | v1 | v1 |
+    /// | 12:03 | B | 0 | v2 | v2 |
+    /// | ..... | | | | |
+    /// ```
+    Columnar,
+
+    /// Designed for time-series data.
+    /// Collapsible columns within the same primary key are collapsed
+    /// into a list; other columns keep the same format as in the columnar layout.
+    ///
+    /// Whether a column is collapsible is decided by
+    /// `Schema::is_collapsible_column`
+    ///
+    /// Note: minTime/maxTime are optional and not implemented yet; they are
+    /// mainly used for the time-range pushdown filter
+    ///
+    ///```plaintext
+    /// | Device ID | Timestamp | Status Code | Tag 1 | Tag 2 | minTime | maxTime |
+    /// |-----------|---------------------|-------------|-------|-------|---------|---------|
+    /// | A | [12:01,12:02,12:03] | [0,0,0] | v1 | v1 | 12:01 | 12:03 |
+    /// | B | [12:01,12:02,12:03] | [0,1,0] | v2 | v2 | 12:01 | 12:03 |
+    /// | ... | | | | | | |
+    /// ```
+    Hybrid,
+}
+
+impl From<StorageFormat> for StorageFormatPb {
+    fn from(format: StorageFormat) -> Self {
+        match format {
+            StorageFormat::Columnar => Self::Columnar,
+            StorageFormat::Hybrid => Self::Hybrid,
+        }
+    }
+}
+
+impl From<StorageFormatPb> for StorageFormat {
+    fn from(format: StorageFormatPb) -> Self {
+        match format {
+            StorageFormatPb::Columnar => Self::Columnar,
+            StorageFormatPb::Hybrid => Self::Hybrid,
+        }
+    }
+}
+
+impl TryFrom<&str> for StorageFormat {
+    type Error = Error;
+
+    fn try_from(value: &str) -> Result {
+        let format = match value.to_uppercase().as_str() {
+            STORAGE_FORMAT_COLUMNAR => Self::Columnar,
+            STORAGE_FORMAT_HYBRID => Self::Hybrid,
+            _ => return UnknownStorageFormat { value }.fail(),
+        };
+        Ok(format)
+    }
+}
+
+impl ToString for StorageFormat {
+    fn to_string(&self) -> String {
+        match self {
+            Self::Columnar => STORAGE_FORMAT_COLUMNAR,
+            Self::Hybrid => STORAGE_FORMAT_HYBRID,
+        }
+        .to_string()
+    }
+}
+
+impl Default for StorageFormat {
+    fn default() -> Self {
+        Self::Columnar
+    }
+}
+
 /// Options for a table.
 #[derive(Debug, Clone, Deserialize, PartialEq)]
 #[serde(default)]
@@ -208,6 +304,8 @@ pub struct TableOptions {
     pub segment_duration: Option,
     /// Table update mode, now support Overwrite(Default) and Append
     pub update_mode: UpdateMode,
+    /// Format of columns in the underlying storage
+    pub storage_format: StorageFormat,
 
     // The following options can be altered.
/// Enable ttl @@ -243,32 +341,34 @@ impl TableOptions { // for show create table pub fn to_raw_map(&self) -> HashMap { - let mut m = HashMap::with_capacity(AT_LEAST_OPTIONS_NUM); - m.insert( - SEGMENT_DURATION.to_string(), - self.segment_duration - .map(|v| v.to_string()) - .unwrap_or_else(String::new), - ); - m.insert(UPDATE_MODE.to_string(), self.update_mode.to_string()); - m.insert(ENABLE_TTL.to_string(), self.enable_ttl.to_string()); - m.insert(TTL.to_string(), format!("{}", self.ttl)); - m.insert( - ARENA_BLOCK_SIZE.to_string(), - format!("{}", self.arena_block_size), - ); - m.insert( - WRITE_BUFFER_SIZE.to_string(), - format!("{}", self.write_buffer_size), - ); + let mut m = [ + ( + SEGMENT_DURATION.to_string(), + self.segment_duration + .map(|v| v.to_string()) + .unwrap_or_else(String::new), + ), + (UPDATE_MODE.to_string(), self.update_mode.to_string()), + (ENABLE_TTL.to_string(), self.enable_ttl.to_string()), + (TTL.to_string(), format!("{}", self.ttl)), + ( + ARENA_BLOCK_SIZE.to_string(), + format!("{}", self.arena_block_size), + ), + ( + WRITE_BUFFER_SIZE.to_string(), + format!("{}", self.write_buffer_size), + ), + ( + NUM_ROWS_PER_ROW_GROUP.to_string(), + format!("{}", self.num_rows_per_row_group), + ), + (COMPRESSION.to_string(), self.compression.to_string()), + (STORAGE_FORMAT.to_string(), self.storage_format.to_string()), + ] + .into_iter() + .collect(); self.compaction_strategy.fill_raw_map(&mut m); - m.insert( - NUM_ROWS_PER_ROW_GROUP.to_string(), - format!("{}", self.num_rows_per_row_group), - ); - m.insert(COMPRESSION.to_string(), self.compression.to_string()); - - assert!(m.len() >= AT_LEAST_OPTIONS_NUM); m } @@ -409,6 +509,7 @@ impl From for TableOptionsPb { target.set_write_buffer_size(opts.write_buffer_size); target.set_compression(opts.compression.into()); + target.set_storage_format(opts.storage_format.into()); target } @@ -460,6 +561,7 @@ impl From for TableOptions { update_mode, write_buffer_size: opts.write_buffer_size, compression: opts.compression.into(), + storage_format: opts.storage_format.into(), } } } @@ -476,6 +578,7 @@ impl Default for TableOptions { update_mode: UpdateMode::Overwrite, write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, compression: Compression::Zstd, + storage_format: StorageFormat::default(), } } } @@ -534,6 +637,9 @@ fn merge_table_options( if let Some(v) = options.get(COMPRESSION) { table_opts.compression = Compression::parse_from(v)?; } + if let Some(v) = options.get(STORAGE_FORMAT) { + table_opts.storage_format = v.as_str().try_into()?; + } Ok(table_opts) } diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 05d313be64..abb3cc74c0 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -177,6 +177,7 @@ pub enum CompatError { } /// Meta data of the arrow schema +#[derive(Default)] pub struct ArrowSchemaMeta { num_key_columns: usize, timestamp_index: usize, @@ -185,20 +186,6 @@ pub struct ArrowSchemaMeta { } impl ArrowSchemaMeta { - pub fn storage_format(&self) -> StorageFormat { - // TODO: parse it from table options - match std::env::var("CERESDB_TABLE_FORMAT") { - Ok(format) => { - if format == "HYBRID" { - StorageFormat::Hybrid - } else { - StorageFormat::Columnar - } - } - Err(_) => StorageFormat::Columnar, - } - } - pub fn enable_tsid_primary_key(&self) -> bool { self.enable_tsid_primary_key } @@ -254,10 +241,10 @@ pub enum ArrowSchemaMetaKey { impl ArrowSchemaMetaKey { fn as_str(&self) -> &str { match self { - ArrowSchemaMetaKey::NumKeyColumns => "schema:num_key_columns", - 
ArrowSchemaMetaKey::TimestampIndex => "schema::timestamp_index", - ArrowSchemaMetaKey::EnableTsidPrimaryKey => "schema::enable_tsid_primary_key", - ArrowSchemaMetaKey::Version => "schema::version", + Self::NumKeyColumns => "schema:num_key_columns", + Self::TimestampIndex => "schema::timestamp_index", + Self::EnableTsidPrimaryKey => "schema::enable_tsid_primary_key", + Self::Version => "schema::version", } } } @@ -506,45 +493,6 @@ pub fn compare_row( Ordering::Equal } -/// StorageFormat specify how records are saved in persistent storage -#[derive(Debug)] -pub enum StorageFormat { - /// Traditional columnar format, every column is saved in one exact one - /// column, for example: - /// - ///```plaintext - /// | Timestamp | Device ID | Status Code | Tag 1 | Tag 2 | - /// | --------- |---------- | ----------- | ----- | ----- | - /// | 12:01 | A | 0 | v1 | v1 | - /// | 12:01 | B | 0 | v2 | v2 | - /// | 12:02 | A | 0 | v1 | v1 | - /// | 12:02 | B | 1 | v2 | v2 | - /// | 12:03 | A | 0 | v1 | v1 | - /// | 12:03 | B | 0 | v2 | v2 | - /// | ..... | | | | | - /// ``` - Columnar, - - /// Design for time-series data - /// Collapsible Columns within same primary key are collapsed - /// into list, other columns are the same format with columar's. - /// - /// Wether a column is collapsible is decided by - /// `Schema::is_collapsible_column` - /// - /// Note: minTime/maxTime is optional and not implemented yet, mainly used - /// for time-range pushdown filter - /// - ///```plaintext - /// | Device ID | Timestamp | Status Code | Tag 1 | Tag 2 | minTime | maxTime | - /// |-----------|---------------------|-------------|-------|-------|---------|---------| - /// | A | [12:01,12:02,12:03] | [0,0,0] | v1 | v1 | 12:01 | 12:03 | - /// | B | [12:01,12:02,12:03] | [0,1,0] | v2 | v2 | 12:01 | 12:03 | - /// | ... | | | | | | | - /// ``` - Hybrid, -} - // TODO(yingwen): Maybe rename to TableSchema. /// Schema of a table /// @@ -865,21 +813,6 @@ impl Schema { pub fn string_buffer_offset(&self) -> usize { self.column_schemas.string_buffer_offset } - - /// Data format in storage - pub fn storage_format(&self) -> StorageFormat { - // TODO: parse it from table options - match std::env::var("CERESDB_TABLE_FORMAT") { - Ok(format) => { - if format == "HYBRID" { - StorageFormat::Hybrid - } else { - StorageFormat::Columnar - } - } - Err(_) => StorageFormat::Columnar, - } - } } impl TryFrom for Schema { @@ -1119,12 +1052,7 @@ impl Builder { ) -> Result { match ArrowSchemaMeta::try_from(meta) { Ok(v) => Ok(v), - Err(Error::ArrowSchemaMetaKeyNotFound { .. }) => Ok(ArrowSchemaMeta { - num_key_columns: 0, - timestamp_index: 0, - enable_tsid_primary_key: false, - version: 0, - }), + Err(Error::ArrowSchemaMetaKeyNotFound { .. }) => Ok(ArrowSchemaMeta::default()), Err(e) => Err(e), } } @@ -1133,25 +1061,26 @@ impl Builder { /// /// Requires: the timestamp index is not None. 
fn build_arrow_schema_meta(&self) -> HashMap { - let mut meta = HashMap::with_capacity(4); - meta.insert( - ArrowSchemaMetaKey::NumKeyColumns.to_string(), - self.num_key_columns.to_string(), - ); - meta.insert( - ArrowSchemaMetaKey::TimestampIndex.to_string(), - self.timestamp_index.unwrap().to_string(), - ); - meta.insert( - ArrowSchemaMetaKey::Version.to_string(), - self.version.to_string(), - ); - meta.insert( - ArrowSchemaMetaKey::EnableTsidPrimaryKey.to_string(), - self.enable_tsid_primary_key.to_string(), - ); - - meta + [ + ( + ArrowSchemaMetaKey::NumKeyColumns.to_string(), + self.num_key_columns.to_string(), + ), + ( + ArrowSchemaMetaKey::TimestampIndex.to_string(), + self.timestamp_index.unwrap().to_string(), + ), + ( + ArrowSchemaMetaKey::Version.to_string(), + self.version.to_string(), + ), + ( + ArrowSchemaMetaKey::EnableTsidPrimaryKey.to_string(), + self.enable_tsid_primary_key.to_string(), + ), + ] + .into_iter() + .collect() } fn find_tsid_index( diff --git a/proto/protos/analytic_common.proto b/proto/protos/analytic_common.proto index c418296f99..ae47f020c8 100644 --- a/proto/protos/analytic_common.proto +++ b/proto/protos/analytic_common.proto @@ -20,6 +20,7 @@ message TableOptions { // If sampling_segment_duration is true, then the segment duration // is still unknown. bool sampling_segment_duration = 11; + StorageFormat storage_format = 12; } enum UpdateMode { @@ -27,6 +28,11 @@ enum UpdateMode { Append = 1; } +enum StorageFormat { + Columnar = 0; + Hybrid = 1; +} + message CompactionOptions { // Options for STCS float bucket_low = 1; diff --git a/proto/protos/meta_update.proto b/proto/protos/meta_update.proto index e014e76055..da0466c399 100644 --- a/proto/protos/meta_update.proto +++ b/proto/protos/meta_update.proto @@ -44,6 +44,7 @@ message AddFileMeta { common.TableSchema schema = 7; uint64 size = 8; uint64 row_num = 9; + analytic_common.StorageFormat storage_format = 10; } // Meta data of the file to delete diff --git a/proto/protos/sst.proto b/proto/protos/sst.proto index a1ab16e9a7..ea8b765560 100644 --- a/proto/protos/sst.proto +++ b/proto/protos/sst.proto @@ -5,6 +5,7 @@ syntax = "proto3"; package sst; import "common.proto"; +import "analytic_common.proto"; message SstMetaData { // Min key in the sst @@ -18,4 +19,5 @@ message SstMetaData { common.TableSchema schema = 5; uint64 size = 6; uint64 row_num = 7; + analytic_common.StorageFormat storage_format = 8; } diff --git a/tests/cases/local/05_ddl/create_tables.result b/tests/cases/local/05_ddl/create_tables.result index b3ed0c1355..872f3650a8 100644 --- a/tests/cases/local/05_ddl/create_tables.result +++ b/tests/cases/local/05_ddl/create_tables.result @@ -26,6 +26,10 @@ DROP TABLE IF EXISTS `05_create_tables_t7`; affected_rows: 0 +DROP TABLE IF EXISTS `05_create_tables_t8`; + +affected_rows: 0 + CREATE TABLE `05_create_tables_t`(c1 int) ENGINE = Analytic; Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: CREATE TABLE `05_create_tables_t`(c1 int) ENGINE = Analytic;. 
Caused by: Failed to create plan, err:Table must contain timestamp constraint" }) @@ -88,7 +92,7 @@ String(StringBytes(b"a")),String(StringBytes(b"int")),Boolean(false),Boolean(tru show create table `05_create_tables_t4`; Table,Create Table, -String(StringBytes(b"05_create_tables_t4")),String(StringBytes(b"CREATE TABLE `05_create_tables_t4` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t4")),String(StringBytes(b"CREATE TABLE `05_create_tables_t4` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `05_create_tables_t5`(c1 int, t timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; @@ -106,7 +110,7 @@ String(StringBytes(b"c1")),String(StringBytes(b"int")),Boolean(false),Boolean(tr show create table `05_create_tables_t5`; Table,Create Table, -String(StringBytes(b"05_create_tables_t5")),String(StringBytes(b"CREATE TABLE `05_create_tables_t5` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t5")),String(StringBytes(b"CREATE TABLE `05_create_tables_t5` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `05_create_tables_t6`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY, t2 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; @@ -128,8 +132,54 @@ String(StringBytes(b"c1")),String(StringBytes(b"int")),Boolean(false),Boolean(tr show create table `05_create_tables_t7`; Table,Create Table, -String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int COMMENT 'id', PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int COMMENT 'id', PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), + + +CREATE 
TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; + +affected_rows: 0 + +show create table `05_create_tables_t8`; + +Table,Create Table, +String(StringBytes(b"05_create_tables_t8")),String(StringBytes(b"CREATE TABLE `05_create_tables_t8` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), + + +drop table `05_create_tables_t8`; + +affected_rows: 0 + +CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'columnar'); + +affected_rows: 0 + +show create table `05_create_tables_t8`; + +Table,Create Table, +String(StringBytes(b"05_create_tables_t8")),String(StringBytes(b"CREATE TABLE `05_create_tables_t8` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), + + +drop table `05_create_tables_t8`; + +affected_rows: 0 +CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); + +affected_rows: 0 + +show create table `05_create_tables_t8`; + +Table,Create Table, +String(StringBytes(b"05_create_tables_t8")),String(StringBytes(b"CREATE TABLE `05_create_tables_t8` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='HYBRID', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), + + +drop table `05_create_tables_t8`; + +affected_rows: 0 + +CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown'); + +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, query: CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown');. Caused by: Failed to execute create table, err:Failed to create table, name:05_create_tables_t8, err:Failed to create table, err:Invalid arguments, err:Invalid options, space_id:2, table:05_create_tables_t8, table_id:2199023255725, err:Unknown storage format. value:\"unknown\"." 
}) DROP TABLE IF EXISTS `05_create_tables_t`; @@ -159,3 +209,7 @@ DROP TABLE IF EXISTS `05_create_tables_t7`; affected_rows: 0 +DROP TABLE IF EXISTS `05_create_tables_t8`; + +affected_rows: 0 + diff --git a/tests/cases/local/05_ddl/create_tables.sql b/tests/cases/local/05_ddl/create_tables.sql index f0120d41e2..3d7082a672 100644 --- a/tests/cases/local/05_ddl/create_tables.sql +++ b/tests/cases/local/05_ddl/create_tables.sql @@ -5,6 +5,7 @@ DROP TABLE IF EXISTS `05_create_tables_t4`; DROP TABLE IF EXISTS `05_create_tables_t5`; DROP TABLE IF EXISTS `05_create_tables_t6`; DROP TABLE IF EXISTS `05_create_tables_t7`; +DROP TABLE IF EXISTS `05_create_tables_t8`; -- no TIMESTAMP column CREATE TABLE `05_create_tables_t`(c1 int) ENGINE = Analytic; @@ -47,6 +48,21 @@ CREATE TABLE `05_create_tables_t7`(c1 int COMMENT 'id', t timestamp NOT NULL, TI describe table `05_create_tables_t7`; show create table `05_create_tables_t7`; +-- StorageFormat +CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; +show create table `05_create_tables_t8`; +drop table `05_create_tables_t8`; + +CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'columnar'); +show create table `05_create_tables_t8`; +drop table `05_create_tables_t8`; + +CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); +show create table `05_create_tables_t8`; +drop table `05_create_tables_t8`; + +CREATE TABLE `05_create_tables_t8`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown'); + DROP TABLE IF EXISTS `05_create_tables_t`; DROP TABLE IF EXISTS `05_create_tables_t2`; DROP TABLE IF EXISTS `05_create_tables_t3`; @@ -54,3 +70,4 @@ DROP TABLE IF EXISTS `05_create_tables_t4`; DROP TABLE IF EXISTS `05_create_tables_t5`; DROP TABLE IF EXISTS `05_create_tables_t6`; DROP TABLE IF EXISTS `05_create_tables_t7`; +DROP TABLE IF EXISTS `05_create_tables_t8`; diff --git a/tests/cases/local/06_show/show_create_table.result b/tests/cases/local/06_show/show_create_table.result index 528974ed93..e1770783b5 100644 --- a/tests/cases/local/06_show/show_create_table.result +++ b/tests/cases/local/06_show/show_create_table.result @@ -17,7 +17,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_a`; Table,Create Table, -String(StringBytes(b"06_show_a")),String(StringBytes(b"CREATE TABLE `06_show_a` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_a")),String(StringBytes(b"CREATE TABLE `06_show_a` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `06_show_b` (a bigint, b int null default null, c string, d smallint null, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; @@ -27,7 +27,7 @@ affected_rows: 0 SHOW CREATE TABLE 
`06_show_b`; Table,Create Table, -String(StringBytes(b"06_show_b")),String(StringBytes(b"CREATE TABLE `06_show_b` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_b")),String(StringBytes(b"CREATE TABLE `06_show_b` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `06_show_c` (a int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; @@ -37,7 +37,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_c`; Table,Create Table, -String(StringBytes(b"06_show_c")),String(StringBytes(b"CREATE TABLE `06_show_c` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_c")),String(StringBytes(b"CREATE TABLE `06_show_c` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), DROP TABLE `06_show_a`;
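For quick reference, a minimal usage sketch of the option added by this patch, mirroring the integration tests above (the table name is illustrative, not part of the patch): `storage_format` accepts 'columnar' (the default) or 'hybrid', matched case-insensitively, and any other value is rejected with the UnknownStorageFormat error.

-- Create a table that stores its SSTs in the hybrid layout.
CREATE TABLE `demo_table`(c1 int, t timestamp NOT NULL TIMESTAMP KEY)
    ENGINE = Analytic WITH (storage_format = 'hybrid');

-- SHOW CREATE TABLE reports the option back in upper case, e.g. storage_format='HYBRID'.
SHOW CREATE TABLE `demo_table`;

DROP TABLE `demo_table`;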