From 29ba58f2930384885453a31a4928ccf8102d5bf1 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 25 Aug 2022 23:13:57 +0800 Subject: [PATCH 1/7] add new table_option: storage format --- analytic_engine/src/table_options.rs | 15 ++++- common_types/src/schema.rs | 94 +++++++++++++++++++++------- 2 files changed, 85 insertions(+), 24 deletions(-) diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs index 500250c8dd..6ed813cc8e 100644 --- a/analytic_engine/src/table_options.rs +++ b/analytic_engine/src/table_options.rs @@ -5,7 +5,7 @@ use std::{collections::HashMap, string::ToString, time::Duration}; use arrow_deps::datafusion::parquet::basic::Compression as ParquetCompression; -use common_types::time::Timestamp; +use common_types::{schema::StorageFormat, time::Timestamp}; use common_util::{ config::{ReadableDuration, ReadableSize}, define_result, @@ -32,6 +32,7 @@ pub const COMPACTION_STRATEGY: &str = "compaction_strategy"; pub const NUM_ROWS_PER_ROW_GROUP: &str = "num_rows_per_row_group"; pub const UPDATE_MODE: &str = "update_mode"; pub const COMPRESSION: &str = "compression"; +pub const STORAGE_FORMAT: &str = "storage_format"; const UPDATE_MODE_OVERWRITE: &str = "OVERWRITE"; const UPDATE_MODE_APPEND: &str = "APPEND"; @@ -40,6 +41,8 @@ const COMPRESSION_LZ4: &str = "LZ4"; const COMPRESSION_SNAPPY: &str = "SNAPPY"; const COMPRESSION_ZSTD: &str = "ZSTD"; const AT_LEAST_OPTIONS_NUM: usize = 9; +const UPDATE_MODE_COLUMNAR: &str = "COLUMNAR"; +const UPDATE_MODE_HYBRID: &str = "HYBRID"; /// Default bucket duration (1d) const BUCKET_DURATION_1D: Duration = Duration::from_secs(24 * 60 * 60); @@ -97,6 +100,8 @@ pub enum Error { backtrace ))] ParseCompressionName { name: String, backtrace: Backtrace }, + #[snafu(display("Failed to parse storage format, err:{}\n", source))] + ParseStorageFormat { source: common_types::schema::Error }, } define_result!(Error); @@ -224,6 +229,8 @@ pub struct TableOptions { pub num_rows_per_row_group: usize, /// Table Compression pub compression: Compression, + /// Storage format + pub storage_format: StorageFormat, } impl TableOptions { @@ -267,6 +274,7 @@ impl TableOptions { format!("{}", self.num_rows_per_row_group), ); m.insert(COMPRESSION.to_string(), self.compression.to_string()); + m.insert(STORAGE_FORMAT.to_string(), self.storage_format.to_string()); assert!(m.len() >= AT_LEAST_OPTIONS_NUM); @@ -460,6 +468,7 @@ impl From for TableOptions { update_mode, write_buffer_size: opts.write_buffer_size, compression: opts.compression.into(), + storage_format: StorageFormat::default(), // FIXME: update pb } } } @@ -476,6 +485,7 @@ impl Default for TableOptions { update_mode: UpdateMode::Overwrite, write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, compression: Compression::Zstd, + storage_format: StorageFormat::default(), } } } @@ -534,6 +544,9 @@ fn merge_table_options( if let Some(v) = options.get(COMPRESSION) { table_opts.compression = Compression::parse_from(v)?; } + if let Some(v) = options.get(STORAGE_FORMAT) { + table_opts.storage_format = v.try_into().context(ParseStorageFormat {})?; + } Ok(table_opts) } diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 05d313be64..67e293682e 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -150,6 +150,9 @@ pub enum Error { key: ArrowSchemaMetaKey, backtrace: Backtrace, }, + + #[snafu(display("Unkown storage format. 
value:{:?}.\nBacktrace:\n{}", value, backtrace))] + UnknownStorageFormat { value: String, backtrace: Backtrace }, } pub type SchemaId = u32; @@ -182,21 +185,12 @@ pub struct ArrowSchemaMeta { timestamp_index: usize, enable_tsid_primary_key: bool, version: u32, + storage_format: StorageFormat, } impl ArrowSchemaMeta { pub fn storage_format(&self) -> StorageFormat { - // TODO: parse it from table options - match std::env::var("CERESDB_TABLE_FORMAT") { - Ok(format) => { - if format == "HYBRID" { - StorageFormat::Hybrid - } else { - StorageFormat::Columnar - } - } - Err(_) => StorageFormat::Columnar, - } + self.storage_format } pub fn enable_tsid_primary_key(&self) -> bool { @@ -220,6 +214,18 @@ impl ArrowSchemaMeta { } } +impl Default for ArrowSchemaMeta { + fn default() -> Self { + Self { + num_key_columns: 0, + timestamp_index: 0, + enable_tsid_primary_key: false, + version: 0, + storage_format: StorageFormat::default(), + } + } +} + /// Parse the necessary meta information from the arrow schema's meta data. impl TryFrom<&HashMap> for ArrowSchemaMeta { type Error = Error; @@ -239,6 +245,12 @@ impl TryFrom<&HashMap> for ArrowSchemaMeta { ArrowSchemaMetaKey::EnableTsidPrimaryKey, )?, version: Self::parse_arrow_schema_meta_value(meta, ArrowSchemaMetaKey::Version)?, + storage_format: Self::parse_arrow_schema_meta_value::( + meta, + ArrowSchemaMetaKey::StorageFormat, + )? + .as_str() + .try_into()?, }) } } @@ -249,15 +261,17 @@ pub enum ArrowSchemaMetaKey { TimestampIndex, EnableTsidPrimaryKey, Version, + StorageFormat, } impl ArrowSchemaMetaKey { fn as_str(&self) -> &str { match self { - ArrowSchemaMetaKey::NumKeyColumns => "schema:num_key_columns", - ArrowSchemaMetaKey::TimestampIndex => "schema::timestamp_index", - ArrowSchemaMetaKey::EnableTsidPrimaryKey => "schema::enable_tsid_primary_key", - ArrowSchemaMetaKey::Version => "schema::version", + Self::NumKeyColumns => "schema:num_key_columns", + Self::TimestampIndex => "schema::timestamp_index", + Self::EnableTsidPrimaryKey => "schema::enable_tsid_primary_key", + Self::Version => "schema::version", + Self::StorageFormat => "schema::storage_format", } } } @@ -507,7 +521,7 @@ pub fn compare_row( } /// StorageFormat specify how records are saved in persistent storage -#[derive(Debug)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum StorageFormat { /// Traditional columnar format, every column is saved in one exact one /// column, for example: @@ -545,6 +559,35 @@ pub enum StorageFormat { Hybrid, } +impl TryFrom<&str> for StorageFormat { + type Error = Error; + + fn try_from(s: &str) -> Result { + let format = match s.to_lowercase().as_str() { + "columnar" => Self::Columnar, + "hybrid" => Self::Hybrid, + _ => return UnknownStorageFormat { value: s }.fail(), + }; + Ok(format) + } +} + +impl ToString for StorageFormat { + fn to_string(&self) -> String { + match self { + Self::Columnar => "columnar", + Self::Hybrid => "hybrid", + } + .to_string() + } +} + +impl Default for StorageFormat { + fn default() -> Self { + Self::Columnar + } +} + // TODO(yingwen): Maybe rename to TableSchema. /// Schema of a table /// @@ -575,6 +618,8 @@ pub struct Schema { column_schemas: Arc, /// Version of the schema, schemas with same version should be identical. version: Version, + /// How columns is perisisted in underlying storage. 
+ storage_format: StorageFormat, } impl fmt::Debug for Schema { @@ -1097,6 +1142,7 @@ impl Builder { timestamp_index, enable_tsid_primary_key, version, + storage_format, } = Self::parse_arrow_schema_meta_or_default(arrow_schema.metadata())?; let tsid_index = Self::find_tsid_index(enable_tsid_primary_key, &columns)?; @@ -1110,6 +1156,7 @@ impl Builder { enable_tsid_primary_key, column_schemas, version, + storage_format, }) } @@ -1119,12 +1166,7 @@ impl Builder { ) -> Result { match ArrowSchemaMeta::try_from(meta) { Ok(v) => Ok(v), - Err(Error::ArrowSchemaMetaKeyNotFound { .. }) => Ok(ArrowSchemaMeta { - num_key_columns: 0, - timestamp_index: 0, - enable_tsid_primary_key: false, - version: 0, - }), + Err(Error::ArrowSchemaMetaKeyNotFound { .. }) => Ok(ArrowSchemaMeta::default()), Err(e) => Err(e), } } @@ -1133,7 +1175,7 @@ impl Builder { /// /// Requires: the timestamp index is not None. fn build_arrow_schema_meta(&self) -> HashMap { - let mut meta = HashMap::with_capacity(4); + let mut meta = HashMap::with_capacity(5); meta.insert( ArrowSchemaMetaKey::NumKeyColumns.to_string(), self.num_key_columns.to_string(), @@ -1150,6 +1192,11 @@ impl Builder { ArrowSchemaMetaKey::EnableTsidPrimaryKey.to_string(), self.enable_tsid_primary_key.to_string(), ); + // FIXME + meta.insert( + ArrowSchemaMetaKey::StorageFormat.to_string(), + StorageFormat::Columnar.to_string(), + ); meta } @@ -1199,6 +1246,7 @@ impl Builder { enable_tsid_primary_key: self.enable_tsid_primary_key, column_schemas: Arc::new(ColumnSchemas::new(self.columns)), version: self.version, + storage_format: StorageFormat::Columnar, // FIXME }) } } From e53275b8a94edfc91d5da338664b0876b6e46808 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 26 Aug 2022 09:54:53 +0800 Subject: [PATCH 2/7] fix clippy --- analytic_engine/src/table_options.rs | 4 +--- common_types/src/schema.rs | 25 +++++++++---------------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs index 6ed813cc8e..8ac89f063e 100644 --- a/analytic_engine/src/table_options.rs +++ b/analytic_engine/src/table_options.rs @@ -41,8 +41,6 @@ const COMPRESSION_LZ4: &str = "LZ4"; const COMPRESSION_SNAPPY: &str = "SNAPPY"; const COMPRESSION_ZSTD: &str = "ZSTD"; const AT_LEAST_OPTIONS_NUM: usize = 9; -const UPDATE_MODE_COLUMNAR: &str = "COLUMNAR"; -const UPDATE_MODE_HYBRID: &str = "HYBRID"; /// Default bucket duration (1d) const BUCKET_DURATION_1D: Duration = Duration::from_secs(24 * 60 * 60); @@ -545,7 +543,7 @@ fn merge_table_options( table_opts.compression = Compression::parse_from(v)?; } if let Some(v) = options.get(STORAGE_FORMAT) { - table_opts.storage_format = v.try_into().context(ParseStorageFormat {})?; + table_opts.storage_format = v.as_str().try_into().context(ParseStorageFormat {})?; } Ok(table_opts) } diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 67e293682e..34caa8cd2d 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -19,6 +19,7 @@ pub use arrow_deps::arrow::datatypes::{ DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use proto::common as common_pb; +use serde::Deserialize; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use crate::{ @@ -180,6 +181,7 @@ pub enum CompatError { } /// Meta data of the arrow schema +#[derive(Default)] pub struct ArrowSchemaMeta { num_key_columns: usize, timestamp_index: usize, @@ -214,18 +216,6 @@ impl ArrowSchemaMeta { } } -impl Default for ArrowSchemaMeta { - fn 
default() -> Self { - Self { - num_key_columns: 0, - timestamp_index: 0, - enable_tsid_primary_key: false, - version: 0, - storage_format: StorageFormat::default(), - } - } -} - /// Parse the necessary meta information from the arrow schema's meta data. impl TryFrom<&HashMap> for ArrowSchemaMeta { type Error = Error; @@ -521,7 +511,7 @@ pub fn compare_row( } /// StorageFormat specify how records are saved in persistent storage -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] pub enum StorageFormat { /// Traditional columnar format, every column is saved in one exact one /// column, for example: @@ -559,13 +549,16 @@ pub enum StorageFormat { Hybrid, } +const STORAGE_FORMAT_COLUMNAR: &str = "COLUMNAR"; +const STORAGE_FORMAT_HYBRID: &str = "HYBRID"; + impl TryFrom<&str> for StorageFormat { type Error = Error; fn try_from(s: &str) -> Result { - let format = match s.to_lowercase().as_str() { - "columnar" => Self::Columnar, - "hybrid" => Self::Hybrid, + let format = match s.to_uppercase().as_str() { + STORAGE_FORMAT_COLUMNAR => Self::Columnar, + STORAGE_FORMAT_HYBRID => Self::Hybrid, _ => return UnknownStorageFormat { value: s }.fail(), }; Ok(format) From 2965118201f5758b88ea4e441b5e0fc7fd43f2ed Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 26 Aug 2022 11:30:56 +0800 Subject: [PATCH 3/7] add integration tests --- analytic_engine/src/table_options.rs | 67 ++++++----- common_types/src/schema.rs | 109 +++++++++++------- proto/protos/analytic_common.proto | 6 + tests/cases/local/05_ddl/create_tables.result | 58 +++++++++- tests/cases/local/05_ddl/create_tables.sql | 18 ++- .../local/06_show/show_create_table.result | 6 +- 6 files changed, 188 insertions(+), 76 deletions(-) diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs index 8ac89f063e..b71b37de6f 100644 --- a/analytic_engine/src/table_options.rs +++ b/analytic_engine/src/table_options.rs @@ -13,7 +13,8 @@ use common_util::{ }; use proto::analytic_common::{ CompactionOptions as CompactionOptionsPb, CompactionStrategy as CompactionStrategyPb, - Compression as CompressionPb, TableOptions as TableOptionsPb, UpdateMode as UpdateModePb, + Compression as CompressionPb, StorageFormat as StorageFormatPb, TableOptions as TableOptionsPb, + UpdateMode as UpdateModePb, }; use serde_derive::Deserialize; use snafu::{Backtrace, GenerateBacktrace, ResultExt, Snafu}; @@ -40,7 +41,6 @@ const COMPRESSION_UNCOMPRESSED: &str = "UNCOMPRESSED"; const COMPRESSION_LZ4: &str = "LZ4"; const COMPRESSION_SNAPPY: &str = "SNAPPY"; const COMPRESSION_ZSTD: &str = "ZSTD"; -const AT_LEAST_OPTIONS_NUM: usize = 9; /// Default bucket duration (1d) const BUCKET_DURATION_1D: Duration = Duration::from_secs(24 * 60 * 60); @@ -211,6 +211,8 @@ pub struct TableOptions { pub segment_duration: Option, /// Table update mode, now support Overwrite(Default) and Append pub update_mode: UpdateMode, + /// Column's format in underlying storage + pub storage_format: StorageFormat, // The following options can be altered. 
/// Enable ttl @@ -227,8 +229,6 @@ pub struct TableOptions { pub num_rows_per_row_group: usize, /// Table Compression pub compression: Compression, - /// Storage format - pub storage_format: StorageFormat, } impl TableOptions { @@ -248,33 +248,34 @@ impl TableOptions { // for show create table pub fn to_raw_map(&self) -> HashMap { - let mut m = HashMap::with_capacity(AT_LEAST_OPTIONS_NUM); - m.insert( - SEGMENT_DURATION.to_string(), - self.segment_duration - .map(|v| v.to_string()) - .unwrap_or_else(String::new), - ); - m.insert(UPDATE_MODE.to_string(), self.update_mode.to_string()); - m.insert(ENABLE_TTL.to_string(), self.enable_ttl.to_string()); - m.insert(TTL.to_string(), format!("{}", self.ttl)); - m.insert( - ARENA_BLOCK_SIZE.to_string(), - format!("{}", self.arena_block_size), - ); - m.insert( - WRITE_BUFFER_SIZE.to_string(), - format!("{}", self.write_buffer_size), - ); + let mut m = [ + ( + SEGMENT_DURATION.to_string(), + self.segment_duration + .map(|v| v.to_string()) + .unwrap_or_else(String::new), + ), + (UPDATE_MODE.to_string(), self.update_mode.to_string()), + (ENABLE_TTL.to_string(), self.enable_ttl.to_string()), + (TTL.to_string(), format!("{}", self.ttl)), + ( + ARENA_BLOCK_SIZE.to_string(), + format!("{}", self.arena_block_size), + ), + ( + WRITE_BUFFER_SIZE.to_string(), + format!("{}", self.write_buffer_size), + ), + ( + NUM_ROWS_PER_ROW_GROUP.to_string(), + format!("{}", self.num_rows_per_row_group), + ), + (COMPRESSION.to_string(), self.compression.to_string()), + (STORAGE_FORMAT.to_string(), self.storage_format.to_string()), + ] + .into_iter() + .collect(); self.compaction_strategy.fill_raw_map(&mut m); - m.insert( - NUM_ROWS_PER_ROW_GROUP.to_string(), - format!("{}", self.num_rows_per_row_group), - ); - m.insert(COMPRESSION.to_string(), self.compression.to_string()); - m.insert(STORAGE_FORMAT.to_string(), self.storage_format.to_string()); - - assert!(m.len() >= AT_LEAST_OPTIONS_NUM); m } @@ -455,6 +456,10 @@ impl From for TableOptions { } else { Some(Duration::from_millis(opts.segment_duration).into()) }; + let storage_format = match opts.storage_format { + StorageFormatPb::Columnar => StorageFormat::Columnar, + StorageFormatPb::Hybrid => StorageFormat::Hybrid, + }; Self { segment_duration, @@ -466,7 +471,7 @@ impl From for TableOptions { update_mode, write_buffer_size: opts.write_buffer_size, compression: opts.compression.into(), - storage_format: StorageFormat::default(), // FIXME: update pb + storage_format, } } } diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index 34caa8cd2d..abd66d32ce 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -19,7 +19,7 @@ pub use arrow_deps::arrow::datatypes::{ DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use proto::common as common_pb; -use serde::Deserialize; +use serde_derive::Deserialize; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use crate::{ @@ -152,7 +152,11 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Unkown storage format. value:{:?}.\nBacktrace:\n{}", value, backtrace))] + #[snafu(display( + "Unknown storage format. 
value:{:?}.\nBacktrace:\n{}", + value, + backtrace + ))] UnknownStorageFormat { value: String, backtrace: Backtrace }, } @@ -904,19 +908,9 @@ impl Schema { self.column_schemas.string_buffer_offset } - /// Data format in storage + /// Column's format in storage pub fn storage_format(&self) -> StorageFormat { - // TODO: parse it from table options - match std::env::var("CERESDB_TABLE_FORMAT") { - Ok(format) => { - if format == "HYBRID" { - StorageFormat::Hybrid - } else { - StorageFormat::Columnar - } - } - Err(_) => StorageFormat::Columnar, - } + self.storage_format } } @@ -978,6 +972,7 @@ pub struct Builder { auto_increment_column_id: bool, max_column_id: ColumnId, enable_tsid_primary_key: bool, + storage_format: StorageFormat, } impl Default for Builder { @@ -1004,6 +999,7 @@ impl Builder { auto_increment_column_id: false, max_column_id: column_schema::COLUMN_ID_UNINIT, enable_tsid_primary_key: false, + storage_format: StorageFormat::default(), } } @@ -1064,6 +1060,12 @@ impl Builder { self } + /// Set version of the schema + pub fn storage_format(mut self, format: StorageFormat) -> Self { + self.storage_format = format; + self + } + fn may_alloc_column_id(&mut self, column: &mut ColumnSchema) { // Assign this column an id if self.auto_increment_column_id && column.id == column_schema::COLUMN_ID_UNINIT { @@ -1168,30 +1170,30 @@ impl Builder { /// /// Requires: the timestamp index is not None. fn build_arrow_schema_meta(&self) -> HashMap { - let mut meta = HashMap::with_capacity(5); - meta.insert( - ArrowSchemaMetaKey::NumKeyColumns.to_string(), - self.num_key_columns.to_string(), - ); - meta.insert( - ArrowSchemaMetaKey::TimestampIndex.to_string(), - self.timestamp_index.unwrap().to_string(), - ); - meta.insert( - ArrowSchemaMetaKey::Version.to_string(), - self.version.to_string(), - ); - meta.insert( - ArrowSchemaMetaKey::EnableTsidPrimaryKey.to_string(), - self.enable_tsid_primary_key.to_string(), - ); - // FIXME - meta.insert( - ArrowSchemaMetaKey::StorageFormat.to_string(), - StorageFormat::Columnar.to_string(), - ); - - meta + [ + ( + ArrowSchemaMetaKey::NumKeyColumns.to_string(), + self.num_key_columns.to_string(), + ), + ( + ArrowSchemaMetaKey::TimestampIndex.to_string(), + self.timestamp_index.unwrap().to_string(), + ), + ( + ArrowSchemaMetaKey::Version.to_string(), + self.version.to_string(), + ), + ( + ArrowSchemaMetaKey::EnableTsidPrimaryKey.to_string(), + self.enable_tsid_primary_key.to_string(), + ), + ( + ArrowSchemaMetaKey::StorageFormat.to_string(), + self.storage_format.to_string(), + ), + ] + .into_iter() + .collect() } fn find_tsid_index( @@ -1239,7 +1241,7 @@ impl Builder { enable_tsid_primary_key: self.enable_tsid_primary_key, column_schemas: Arc::new(ColumnSchemas::new(self.columns)), version: self.version, - storage_format: StorageFormat::Columnar, // FIXME + storage_format: self.storage_format, }) } } @@ -1459,6 +1461,35 @@ mod tests { .unwrap(); } + #[test] + fn test_with_storage_format() { + let schema = Builder::new() + .auto_increment_column_id(true) + .add_key_column( + column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary) + .build() + .expect("should succeed build column schema"), + ) + .unwrap() + .build() + .unwrap(); + // default is columnar + assert_eq!(schema.storage_format, StorageFormat::Columnar); + + let schema = Builder::new() + .auto_increment_column_id(true) + .storage_format(StorageFormat::Hybrid) + .add_key_column( + column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary) + .build() + .expect("should succeed build 
column schema"), + ) + .unwrap() + .build() + .unwrap(); + assert_eq!(schema.storage_format, StorageFormat::Hybrid); + } + #[test] fn test_missing_timestamp_key() { let builder = Builder::new() diff --git a/proto/protos/analytic_common.proto b/proto/protos/analytic_common.proto index c418296f99..ae47f020c8 100644 --- a/proto/protos/analytic_common.proto +++ b/proto/protos/analytic_common.proto @@ -20,6 +20,7 @@ message TableOptions { // If sampling_segment_duration is true, then the segment duration // is still unknown. bool sampling_segment_duration = 11; + StorageFormat storage_format = 12; } enum UpdateMode { @@ -27,6 +28,11 @@ enum UpdateMode { Append = 1; } +enum StorageFormat { + Columnar = 0; + Hybrid = 1; +} + message CompactionOptions { // Options for STCS float bucket_low = 1; diff --git a/tests/cases/local/05_ddl/create_tables.result b/tests/cases/local/05_ddl/create_tables.result index b00ebd7611..b85c7535bb 100644 --- a/tests/cases/local/05_ddl/create_tables.result +++ b/tests/cases/local/05_ddl/create_tables.result @@ -22,6 +22,10 @@ DROP TABLE IF EXISTS `05_create_tables_t6`; affected_rows: 0 +DROP TABLE IF EXISTS `05_create_tables_t7`; + +affected_rows: 0 + CREATE TABLE `05_create_tables_t`(c1 int) ENGINE = Analytic; Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: CREATE TABLE `05_create_tables_t`(c1 int) ENGINE = Analytic;. Caused by: Failed to create plan, err:Table must contain timestamp constraint" }) @@ -84,7 +88,7 @@ String(StringBytes(b"a")),String(StringBytes(b"int")),Boolean(false),Boolean(tru show create table `05_create_tables_t4`; Table,Create Table, -String(StringBytes(b"05_create_tables_t4")),String(StringBytes(b"CREATE TABLE `05_create_tables_t4` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t4")),String(StringBytes(b"CREATE TABLE `05_create_tables_t4` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `05_create_tables_t5`(c1 int, t timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; @@ -102,13 +106,59 @@ String(StringBytes(b"c1")),String(StringBytes(b"int")),Boolean(false),Boolean(tr show create table `05_create_tables_t5`; Table,Create Table, -String(StringBytes(b"05_create_tables_t5")),String(StringBytes(b"CREATE TABLE `05_create_tables_t5` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t5")),String(StringBytes(b"CREATE TABLE `05_create_tables_t5` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', 
compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `05_create_tables_t6`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY, t2 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: CREATE TABLE `05_create_tables_t6`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY, t2 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic;. Caused by: Failed to create plan, err:Table must contain only one timestamp key and it's data type must be TIMESTAMP" }) +CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; + +affected_rows: 0 + +show create table `05_create_tables_t7`; + +Table,Create Table, +String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), + + +drop table `05_create_tables_t7`; + +affected_rows: 0 + +CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'columnar'); + +affected_rows: 0 + +show create table `05_create_tables_t7`; + +Table,Create Table, +String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), + + +drop table `05_create_tables_t7`; + +affected_rows: 0 + +CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); + +affected_rows: 0 + +show create table `05_create_tables_t7`; + +Table,Create Table, +String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='hybrid', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), + + +drop table `05_create_tables_t7`; + +affected_rows: 0 + +CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown'); + +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, query: CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown');. 
Caused by: Failed to execute create table, err:Failed to create table, name:05_create_tables_t7, err:Failed to create table, err:Invalid arguments, err:Invalid options, space_id:2, table:05_create_tables_t7, table_id:2199023255650, err:Failed to parse storage format, err:Unknown storage format. value:\"unknown\"." }) + DROP TABLE IF EXISTS `05_create_tables_t`; affected_rows: 0 @@ -133,3 +183,7 @@ DROP TABLE IF EXISTS `05_create_tables_t6`; affected_rows: 0 +DROP TABLE IF EXISTS `05_create_tables_t7`; + +affected_rows: 0 + diff --git a/tests/cases/local/05_ddl/create_tables.sql b/tests/cases/local/05_ddl/create_tables.sql index 34a10b6d24..e6577e9f07 100644 --- a/tests/cases/local/05_ddl/create_tables.sql +++ b/tests/cases/local/05_ddl/create_tables.sql @@ -4,6 +4,7 @@ DROP TABLE IF EXISTS `05_create_tables_t3`; DROP TABLE IF EXISTS `05_create_tables_t4`; DROP TABLE IF EXISTS `05_create_tables_t5`; DROP TABLE IF EXISTS `05_create_tables_t6`; +DROP TABLE IF EXISTS `05_create_tables_t7`; -- no TIMESTAMP column CREATE TABLE `05_create_tables_t`(c1 int) ENGINE = Analytic; @@ -41,10 +42,25 @@ show create table `05_create_tables_t5`; -- Multiple TIMESTAMP KEYs CREATE TABLE `05_create_tables_t6`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY, t2 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; +-- StorageFormat +CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; +show create table `05_create_tables_t7`; +drop table `05_create_tables_t7`; + +CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'columnar'); +show create table `05_create_tables_t7`; +drop table `05_create_tables_t7`; + +CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'hybrid'); +show create table `05_create_tables_t7`; +drop table `05_create_tables_t7`; + +CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown'); + DROP TABLE IF EXISTS `05_create_tables_t`; DROP TABLE IF EXISTS `05_create_tables_t2`; DROP TABLE IF EXISTS `05_create_tables_t3`; DROP TABLE IF EXISTS `05_create_tables_t4`; DROP TABLE IF EXISTS `05_create_tables_t5`; DROP TABLE IF EXISTS `05_create_tables_t6`; - +DROP TABLE IF EXISTS `05_create_tables_t7`; diff --git a/tests/cases/local/06_show/show_create_table.result b/tests/cases/local/06_show/show_create_table.result index 528974ed93..7adfd6d8ae 100644 --- a/tests/cases/local/06_show/show_create_table.result +++ b/tests/cases/local/06_show/show_create_table.result @@ -17,7 +17,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_a`; Table,Create Table, -String(StringBytes(b"06_show_a")),String(StringBytes(b"CREATE TABLE `06_show_a` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_a")),String(StringBytes(b"CREATE TABLE `06_show_a` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', 
segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `06_show_b` (a bigint, b int null default null, c string, d smallint null, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; @@ -27,7 +27,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_b`; Table,Create Table, -String(StringBytes(b"06_show_b")),String(StringBytes(b"CREATE TABLE `06_show_b` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_b")),String(StringBytes(b"CREATE TABLE `06_show_b` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `06_show_c` (a int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; @@ -37,7 +37,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_c`; Table,Create Table, -String(StringBytes(b"06_show_c")),String(StringBytes(b"CREATE TABLE `06_show_c` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_c")),String(StringBytes(b"CREATE TABLE `06_show_c` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), DROP TABLE `06_show_a`; From ae4f08bd5a21631855262e8a6e75f26aa5b21370 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 26 Aug 2022 12:24:45 +0800 Subject: [PATCH 4/7] add storage format in table schema proto --- analytic_engine/src/sst/parquet/encoding.rs | 15 ++------- analytic_engine/src/table_options.rs | 9 ++--- common_types/src/schema.rs | 37 ++++++++++++++++----- proto/protos/analytic_common.proto | 8 ++--- proto/protos/common.proto | 7 ++++ 5 files changed, 42 insertions(+), 34 deletions(-) diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index 85e6dca733..afd36466d3 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -450,13 +450,7 @@ impl ParquetEncoder { .set_max_row_group_size(num_rows_per_row_group) .set_compression(compression) .build(); - let mut format = meta_data.schema.storage_format(); - - // TODO: remove this overwrite when we can set format via table options - if matches!(format, StorageFormat::Hybrid) && meta_data.schema.index_of_tsid().is_none() { - format = StorageFormat::Columnar; - } - + let format = meta_data.schema.storage_format(); let record_encoder: Box = match 
format { StorageFormat::Hybrid => Box::new(HybridRecordEncoder::try_new( write_props, @@ -727,12 +721,7 @@ impl ParquetDecoder { let arrow_schema_meta = ArrowSchemaMeta::try_from(arrow_schema.metadata()) .map_err(|e| Box::new(e) as _) .context(DecodeRecordBatch)?; - let mut format = arrow_schema_meta.storage_format(); - // TODO: remove this overwrite when we can set format via table options - if matches!(format, StorageFormat::Hybrid) && !arrow_schema_meta.enable_tsid_primary_key() { - format = StorageFormat::Columnar; - } - + let format = arrow_schema_meta.storage_format(); let record_decoder: Box = match format { StorageFormat::Hybrid => Box::new(HybridRecordDecoder {}), StorageFormat::Columnar => Box::new(ColumnarRecordDecoder {}), diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs index b71b37de6f..f8a7484ef4 100644 --- a/analytic_engine/src/table_options.rs +++ b/analytic_engine/src/table_options.rs @@ -13,8 +13,7 @@ use common_util::{ }; use proto::analytic_common::{ CompactionOptions as CompactionOptionsPb, CompactionStrategy as CompactionStrategyPb, - Compression as CompressionPb, StorageFormat as StorageFormatPb, TableOptions as TableOptionsPb, - UpdateMode as UpdateModePb, + Compression as CompressionPb, TableOptions as TableOptionsPb, UpdateMode as UpdateModePb, }; use serde_derive::Deserialize; use snafu::{Backtrace, GenerateBacktrace, ResultExt, Snafu}; @@ -456,10 +455,6 @@ impl From for TableOptions { } else { Some(Duration::from_millis(opts.segment_duration).into()) }; - let storage_format = match opts.storage_format { - StorageFormatPb::Columnar => StorageFormat::Columnar, - StorageFormatPb::Hybrid => StorageFormat::Hybrid, - }; Self { segment_duration, @@ -471,7 +466,7 @@ impl From for TableOptions { update_mode, write_buffer_size: opts.write_buffer_size, compression: opts.compression.into(), - storage_format, + storage_format: opts.storage_format.into(), } } } diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index abd66d32ce..774c56576c 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -553,17 +553,35 @@ pub enum StorageFormat { Hybrid, } -const STORAGE_FORMAT_COLUMNAR: &str = "COLUMNAR"; -const STORAGE_FORMAT_HYBRID: &str = "HYBRID"; +const STORAGE_FORMAT_COLUMNAR: &str = "columnar"; +const STORAGE_FORMAT_HYBRID: &str = "hybrid"; + +impl From for common_pb::StorageFormat { + fn from(format: StorageFormat) -> Self { + match format { + StorageFormat::Columnar => Self::Columnar, + StorageFormat::Hybrid => Self::Hybrid, + } + } +} + +impl From for StorageFormat { + fn from(format: common_pb::StorageFormat) -> Self { + match format { + common_pb::StorageFormat::Columnar => Self::Columnar, + common_pb::StorageFormat::Hybrid => Self::Hybrid, + } + } +} impl TryFrom<&str> for StorageFormat { type Error = Error; - fn try_from(s: &str) -> Result { - let format = match s.to_uppercase().as_str() { + fn try_from(value: &str) -> Result { + let format = match value.to_lowercase().as_str() { STORAGE_FORMAT_COLUMNAR => Self::Columnar, STORAGE_FORMAT_HYBRID => Self::Hybrid, - _ => return UnknownStorageFormat { value: s }.fail(), + _ => return UnknownStorageFormat { value }.fail(), }; Ok(format) } @@ -572,8 +590,8 @@ impl TryFrom<&str> for StorageFormat { impl ToString for StorageFormat { fn to_string(&self) -> String { match self { - Self::Columnar => "columnar", - Self::Hybrid => "hybrid", + Self::Columnar => STORAGE_FORMAT_COLUMNAR, + Self::Hybrid => STORAGE_FORMAT_HYBRID, } .to_string() } @@ -629,6 
+647,7 @@ impl fmt::Debug for Schema { .field("enable_tsid_primary_key", &self.enable_tsid_primary_key) .field("column_schemas", &self.column_schemas) .field("version", &self.version) + .field("storage_format", &self.storage_format.to_string()) .finish() } } @@ -920,7 +939,8 @@ impl TryFrom for Schema { fn try_from(schema: common_pb::TableSchema) -> Result { let mut builder = Builder::with_capacity(schema.columns.len()) .version(schema.version) - .enable_tsid_primary_key(schema.enable_tsid_primary_key); + .enable_tsid_primary_key(schema.enable_tsid_primary_key) + .storage_format(schema.storage_format.into()); for (i, column_schema_pb) in schema.columns.into_iter().enumerate() { let column = ColumnSchema::from(column_schema_pb); @@ -950,6 +970,7 @@ impl From for common_pb::TableSchema { table_schema.timestamp_index = schema.timestamp_index as u32; table_schema.enable_tsid_primary_key = schema.enable_tsid_primary_key; table_schema.version = schema.version; + table_schema.storage_format = schema.storage_format.into(); table_schema } diff --git a/proto/protos/analytic_common.proto b/proto/protos/analytic_common.proto index ae47f020c8..ff120e2513 100644 --- a/proto/protos/analytic_common.proto +++ b/proto/protos/analytic_common.proto @@ -3,6 +3,7 @@ // Common protos of analytic engine syntax = "proto3"; package analytic_common; +import "common.proto"; // Options of a table that need to persist message TableOptions { @@ -20,7 +21,7 @@ message TableOptions { // If sampling_segment_duration is true, then the segment duration // is still unknown. bool sampling_segment_duration = 11; - StorageFormat storage_format = 12; + common.StorageFormat storage_format = 12; } enum UpdateMode { @@ -28,11 +29,6 @@ enum UpdateMode { Append = 1; } -enum StorageFormat { - Columnar = 0; - Hybrid = 1; -} - message CompactionOptions { // Options for STCS float bucket_low = 1; diff --git a/proto/protos/common.proto b/proto/protos/common.proto index dc917685a7..0b39259090 100644 --- a/proto/protos/common.proto +++ b/proto/protos/common.proto @@ -52,6 +52,13 @@ message TableSchema { uint32 timestamp_index = 4; // Enable auto generated tsid as primary key bool enable_tsid_primary_key = 5; + // Column's format in underlying storage + StorageFormat storage_format = 12; +} + +enum StorageFormat { + Columnar = 0; + Hybrid = 1; } // Time range of [start, end) From 5abf45665ed77d3f96d61a92b6cbe9d38f4661b0 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 26 Aug 2022 15:16:43 +0800 Subject: [PATCH 5/7] move storage format to table options module --- .../src/instance/flush_compaction.rs | 2 + analytic_engine/src/sst/file.rs | 15 +- analytic_engine/src/sst/parquet/builder.rs | 2 + analytic_engine/src/sst/parquet/encoding.rs | 27 ++- analytic_engine/src/sst/parquet/reader.rs | 24 +-- analytic_engine/src/table/data.rs | 5 + analytic_engine/src/table/version_edit.rs | 1 + analytic_engine/src/table_options.rs | 105 ++++++++++- common_types/src/schema.rs | 166 +----------------- proto/protos/analytic_common.proto | 8 +- proto/protos/common.proto | 7 - proto/protos/meta_update.proto | 1 + proto/protos/sst.proto | 2 + tests/cases/local/05_ddl/create_tables.result | 12 +- .../local/06_show/show_create_table.result | 6 +- 15 files changed, 168 insertions(+), 215 deletions(-) diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index b56a46aa72..0f0944ac70 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ 
-607,6 +607,7 @@ impl Instance { schema: table_data.schema(), size: 0, row_num: 0, + storage_format: table_data.table_options().storage_format, }; let store = self.space_store.clone(); @@ -711,6 +712,7 @@ impl Instance { schema: table_data.schema(), size: 0, row_num: 0, + storage_format: table_data.storage_format(), }; // Alloc file id for next sst file diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs index 865e693a14..5b87ad9cae 100644 --- a/analytic_engine/src/sst/file.rs +++ b/analytic_engine/src/sst/file.rs @@ -37,7 +37,7 @@ use tokio::sync::{ Mutex, }; -use crate::{space::SpaceId, sst::manager::FileId, table::sst_util}; +use crate::{space::SpaceId, sst::manager::FileId, table::sst_util, table_options::StorageFormat}; /// Error of sst file. #[derive(Debug, Snafu)] @@ -220,6 +220,11 @@ impl FileHandle { pub fn set_being_compacted(&self, value: bool) { self.inner.being_compacted.store(value, Ordering::Relaxed); } + + #[inline] + pub fn storage_format(&self) -> StorageFormat { + self.inner.meta.meta.storage_format + } } impl fmt::Debug for FileHandle { @@ -420,6 +425,7 @@ pub struct SstMetaData { pub size: u64, // total row number pub row_num: u64, + pub storage_format: StorageFormat, } impl From for SstMetaDataPb { @@ -433,6 +439,7 @@ impl From for SstMetaDataPb { target.set_schema(src.schema.into()); target.set_size(src.size); target.set_row_num(src.row_num); + target.set_storage_format(src.storage_format.into()); target } @@ -452,6 +459,7 @@ impl TryFrom for SstMetaData { schema, size: src.size, row_num: src.row_num, + storage_format: src.storage_format.into(), }) } } @@ -610,6 +618,9 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData { let mut time_range_start = files[0].time_range().inclusive_start(); let mut time_range_end = files[0].time_range().exclusive_end(); let mut max_sequence = files[0].max_sequence(); + // TODO: what if format of different file is different? + // pick first now + let storage_format = files[0].storage_format(); if files.len() > 1 { for file in &files[1..] 
{ @@ -630,6 +641,7 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData { // we don't know file size and total row number yet size: 0, row_num: 0, + storage_format, } } @@ -685,6 +697,7 @@ pub mod tests { schema: self.schema.clone(), size: 0, row_num: 0, + storage_format: StorageFormat::default(), } } } diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs index 3347a5d375..110f7b1d4c 100644 --- a/analytic_engine/src/sst/parquet/builder.rs +++ b/analytic_engine/src/sst/parquet/builder.rs @@ -159,6 +159,7 @@ mod tests { use common_types::{ bytes::Bytes, projected_schema::ProjectedSchema, + schema::StorageFormat, tests::{build_row, build_schema}, time::{TimeRange, Timestamp}, }; @@ -217,6 +218,7 @@ mod tests { schema: schema.clone(), size: 10, row_num: 2, + storage_format: StorageFormat::default(), }; let mut counter = 10; diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index afd36466d3..523f7dbdd1 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -22,9 +22,7 @@ use arrow_deps::{ use common_types::{ bytes::{BytesMut, MemBufMut, Writer}, datum::DatumKind, - schema::{ - ArrowSchema, ArrowSchemaMeta, ArrowSchemaRef, DataType, Field, Schema, StorageFormat, - }, + schema::{ArrowSchema, ArrowSchemaRef, DataType, Field, Schema}, }; use common_util::define_result; use log::trace; @@ -32,9 +30,12 @@ use proto::sst::SstMetaData as SstMetaDataPb; use protobuf::Message; use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; -use crate::sst::{ - file::SstMetaData, - parquet::hybrid::{self, IndexedType}, +use crate::{ + sst::{ + file::SstMetaData, + parquet::hybrid::{self, IndexedType}, + }, + table_options::StorageFormat, }; // TODO: Only support i32 offset now, consider i64 here? 
@@ -450,8 +451,8 @@ impl ParquetEncoder { .set_max_row_group_size(num_rows_per_row_group) .set_compression(compression) .build(); - let format = meta_data.schema.storage_format(); - let record_encoder: Box = match format { + + let record_encoder: Box = match meta_data.storage_format { StorageFormat::Hybrid => Box::new(HybridRecordEncoder::try_new( write_props, &meta_data.schema, @@ -717,17 +718,13 @@ pub struct ParquetDecoder { } impl ParquetDecoder { - pub fn try_new(arrow_schema: ArrowSchemaRef) -> Result { - let arrow_schema_meta = ArrowSchemaMeta::try_from(arrow_schema.metadata()) - .map_err(|e| Box::new(e) as _) - .context(DecodeRecordBatch)?; - let format = arrow_schema_meta.storage_format(); - let record_decoder: Box = match format { + pub fn new(storage_format: StorageFormat) -> Self { + let record_decoder: Box = match storage_format { StorageFormat::Hybrid => Box::new(HybridRecordDecoder {}), StorageFormat::Columnar => Box::new(ColumnarRecordDecoder {}), }; - Ok(Self { record_decoder }) + Self { record_decoder } } pub fn decode_record_batch( diff --git a/analytic_engine/src/sst/parquet/reader.rs b/analytic_engine/src/sst/parquet/reader.rs index 4960a95d38..adfce855d1 100644 --- a/analytic_engine/src/sst/parquet/reader.rs +++ b/analytic_engine/src/sst/parquet/reader.rs @@ -36,11 +36,14 @@ use snafu::{ensure, OptionExt, ResultExt}; use table_engine::predicate::PredicateRef; use tokio::sync::mpsc::{self, Receiver, Sender}; -use crate::sst::{ - factory::SstReaderOptions, - file::SstMetaData, - parquet::encoding::{self, ParquetDecoder}, - reader::{error::*, SstReader}, +use crate::{ + sst::{ + factory::SstReaderOptions, + file::SstMetaData, + parquet::encoding::{self, ParquetDecoder}, + reader::{error::*, SstReader}, + }, + table_options::StorageFormat, }; const DEFAULT_CHANNEL_CAP: usize = 1000; @@ -164,9 +167,9 @@ impl<'a> ParquetSstReader<'a> { let file_reader = self.file_reader.take().unwrap(); let batch_size = self.batch_size; - let schema = { + let (schema, storage_format) = { let meta_data = self.meta_data.as_ref().unwrap(); - meta_data.schema.clone() + (meta_data.schema.clone(), meta_data.storage_format) }; let projected_schema = self.projected_schema.clone(); let row_projector = projected_schema @@ -202,6 +205,7 @@ impl<'a> ParquetSstReader<'a> { predicate, batch_size, reverse, + storage_format, }; let start_fetch = Instant::now(); @@ -248,6 +252,7 @@ struct ProjectAndFilterReader { predicate: PredicateRef, batch_size: usize, reverse: bool, + storage_format: StorageFormat, } impl ProjectAndFilterReader { @@ -319,10 +324,7 @@ impl ProjectAndFilterReader { let reader = self.project_and_filter_reader()?; let arrow_record_batch_projector = ArrowRecordBatchProjector::from(self.row_projector); - let arrow_schema = self.projected_schema.to_projected_arrow_schema(); - let parquet_decoder = ParquetDecoder::try_new(arrow_schema) - .map_err(|e| Box::new(e) as _) - .context(DecodeRecordBatch)?; + let parquet_decoder = ParquetDecoder::new(self.storage_format); let mut row_num = 0; for record_batch in reader { trace!( diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index 74e88dc96b..969e5e1080 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -42,6 +42,7 @@ use crate::{ sst_util, version::{MemTableForWrite, MemTableState, SamplingMemTable, TableVersion}, }, + table_options::StorageFormat, TableOptions, }; @@ -466,6 +467,10 @@ impl TableData { pub fn is_expired(&self, timestamp: Timestamp) -> bool { 
        self.table_options().is_expired(timestamp)
     }
+
+    pub fn storage_format(&self) -> StorageFormat {
+        self.table_options().storage_format
+    }
 }
 
 /// Table data reference
diff --git a/analytic_engine/src/table/version_edit.rs b/analytic_engine/src/table/version_edit.rs
index 8e73539499..4a8d2e8d26 100644
--- a/analytic_engine/src/table/version_edit.rs
+++ b/analytic_engine/src/table/version_edit.rs
@@ -83,6 +83,7 @@ impl TryFrom for AddFile {
                     schema,
                     size: src.size,
                     row_num: src.row_num,
+                    storage_format: src.storage_format.into(),
                 },
             },
         })
diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs
index f8a7484ef4..9c04b2d131 100644
--- a/analytic_engine/src/table_options.rs
+++ b/analytic_engine/src/table_options.rs
@@ -5,7 +5,7 @@ use std::{collections::HashMap, string::ToString, time::Duration};
 
 use arrow_deps::datafusion::parquet::basic::Compression as ParquetCompression;
-use common_types::{schema::StorageFormat, time::Timestamp};
+use common_types::time::Timestamp;
 use common_util::{
     config::{ReadableDuration, ReadableSize},
     define_result,
@@ -13,7 +13,8 @@ use common_util::{
 };
 use proto::analytic_common::{
     CompactionOptions as CompactionOptionsPb, CompactionStrategy as CompactionStrategyPb,
-    Compression as CompressionPb, TableOptions as TableOptionsPb, UpdateMode as UpdateModePb,
+    Compression as CompressionPb, StorageFormat as StorageFormatPb, TableOptions as TableOptionsPb,
+    UpdateMode as UpdateModePb,
 };
 use serde_derive::Deserialize;
 use snafu::{Backtrace, GenerateBacktrace, ResultExt, Snafu};
@@ -40,6 +41,8 @@ const COMPRESSION_UNCOMPRESSED: &str = "UNCOMPRESSED";
 const COMPRESSION_LZ4: &str = "LZ4";
 const COMPRESSION_SNAPPY: &str = "SNAPPY";
 const COMPRESSION_ZSTD: &str = "ZSTD";
+const STORAGE_FORMAT_COLUMNAR: &str = "COLUMNAR";
+const STORAGE_FORMAT_HYBRID: &str = "HYBRID";
 
 /// Default bucket duration (1d)
 const BUCKET_DURATION_1D: Duration = Duration::from_secs(24 * 60 * 60);
@@ -97,8 +100,13 @@ pub enum Error {
         backtrace
     ))]
     ParseCompressionName { name: String, backtrace: Backtrace },
-    #[snafu(display("Failed to parse storage format, err:{}\n", source))]
-    ParseStorageFormat { source: common_types::schema::Error },
+
+    #[snafu(display(
+        "Unknown storage format. value:{:?}.\nBacktrace:\n{}",
+        value,
+        backtrace
+    ))]
+    UnknownStorageFormat { value: String, backtrace: Backtrace },
 }
 
 define_result!(Error);
@@ -198,6 +206,92 @@ impl From for ParquetCompression {
     }
 }
 
+/// StorageFormat specifies how records are saved in persistent storage
+#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
+pub enum StorageFormat {
+    /// Traditional columnar format, every column is saved in exactly one
+    /// column, for example:
+    ///
+    ///```plaintext
+    /// | Timestamp | Device ID | Status Code | Tag 1 | Tag 2 |
+    /// | --------- |---------- | ----------- | ----- | ----- |
+    /// | 12:01 | A | 0 | v1 | v1 |
+    /// | 12:01 | B | 0 | v2 | v2 |
+    /// | 12:02 | A | 0 | v1 | v1 |
+    /// | 12:02 | B | 1 | v2 | v2 |
+    /// | 12:03 | A | 0 | v1 | v1 |
+    /// | 12:03 | B | 0 | v2 | v2 |
+    /// | ..... | | | | |
+    /// ```
+    Columnar,
+
+    /// Designed for time-series data.
+    /// Collapsible columns within the same primary key are collapsed
+    /// into a list; other columns keep the same format as in `Columnar`.
+    ///
+    /// Whether a column is collapsible is decided by
+    /// `Schema::is_collapsible_column`
+    ///
+    /// Note: minTime/maxTime are optional and not implemented yet, mainly used
+    /// for time-range pushdown filters
+    ///
+    ///```plaintext
+    /// | Device ID | Timestamp | Status Code | Tag 1 | Tag 2 | minTime | maxTime |
+    /// |-----------|---------------------|-------------|-------|-------|---------|---------|
+    /// | A | [12:01,12:02,12:03] | [0,0,0] | v1 | v1 | 12:01 | 12:03 |
+    /// | B | [12:01,12:02,12:03] | [0,1,0] | v2 | v2 | 12:01 | 12:03 |
+    /// | ... | | | | | | |
+    /// ```
+    Hybrid,
+}
+
+impl From for StorageFormatPb {
+    fn from(format: StorageFormat) -> Self {
+        match format {
+            StorageFormat::Columnar => Self::Columnar,
+            StorageFormat::Hybrid => Self::Hybrid,
+        }
+    }
+}
+
+impl From for StorageFormat {
+    fn from(format: StorageFormatPb) -> Self {
+        match format {
+            StorageFormatPb::Columnar => Self::Columnar,
+            StorageFormatPb::Hybrid => Self::Hybrid,
+        }
+    }
+}
+
+impl TryFrom<&str> for StorageFormat {
+    type Error = Error;
+
+    fn try_from(value: &str) -> Result {
+        let format = match value.to_uppercase().as_str() {
+            STORAGE_FORMAT_COLUMNAR => Self::Columnar,
+            STORAGE_FORMAT_HYBRID => Self::Hybrid,
+            _ => return UnknownStorageFormat { value }.fail(),
+        };
+        Ok(format)
+    }
+}
+
+impl ToString for StorageFormat {
+    fn to_string(&self) -> String {
+        match self {
+            Self::Columnar => STORAGE_FORMAT_COLUMNAR,
+            Self::Hybrid => STORAGE_FORMAT_HYBRID,
+        }
+        .to_string()
+    }
+}
+
+impl Default for StorageFormat {
+    fn default() -> Self {
+        Self::Columnar
+    }
+}
+
 /// Options for a table.
 #[derive(Debug, Clone, Deserialize, PartialEq)]
 #[serde(default)]
@@ -415,6 +509,7 @@ impl From for TableOptionsPb {
         target.set_write_buffer_size(opts.write_buffer_size);
         target.set_compression(opts.compression.into());
+        target.set_storage_format(opts.storage_format.into());
 
         target
     }
@@ -543,7 +638,7 @@ fn merge_table_options(
         table_opts.compression = Compression::parse_from(v)?;
     }
     if let Some(v) = options.get(STORAGE_FORMAT) {
-        table_opts.storage_format = v.as_str().try_into().context(ParseStorageFormat {})?;
+        table_opts.storage_format = v.as_str().try_into()?;
     }
     Ok(table_opts)
 }
diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs
index 774c56576c..abb3cc74c0 100644
--- a/common_types/src/schema.rs
+++ b/common_types/src/schema.rs
@@ -19,7 +19,6 @@ pub use arrow_deps::arrow::datatypes::{
     DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
 };
 use proto::common as common_pb;
-use serde_derive::Deserialize;
 use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
 
 use crate::{
@@ -151,13 +150,6 @@ pub enum Error {
         key: ArrowSchemaMetaKey,
         backtrace: Backtrace,
     },
-
-    #[snafu(display(
-        "Unknown storage format.
value:{:?}.\nBacktrace:\n{}", - value, - backtrace - ))] - UnknownStorageFormat { value: String, backtrace: Backtrace }, } pub type SchemaId = u32; @@ -191,14 +183,9 @@ pub struct ArrowSchemaMeta { timestamp_index: usize, enable_tsid_primary_key: bool, version: u32, - storage_format: StorageFormat, } impl ArrowSchemaMeta { - pub fn storage_format(&self) -> StorageFormat { - self.storage_format - } - pub fn enable_tsid_primary_key(&self) -> bool { self.enable_tsid_primary_key } @@ -239,12 +226,6 @@ impl TryFrom<&HashMap> for ArrowSchemaMeta { ArrowSchemaMetaKey::EnableTsidPrimaryKey, )?, version: Self::parse_arrow_schema_meta_value(meta, ArrowSchemaMetaKey::Version)?, - storage_format: Self::parse_arrow_schema_meta_value::( - meta, - ArrowSchemaMetaKey::StorageFormat, - )? - .as_str() - .try_into()?, }) } } @@ -255,7 +236,6 @@ pub enum ArrowSchemaMetaKey { TimestampIndex, EnableTsidPrimaryKey, Version, - StorageFormat, } impl ArrowSchemaMetaKey { @@ -265,7 +245,6 @@ impl ArrowSchemaMetaKey { Self::TimestampIndex => "schema::timestamp_index", Self::EnableTsidPrimaryKey => "schema::enable_tsid_primary_key", Self::Version => "schema::version", - Self::StorageFormat => "schema::storage_format", } } } @@ -514,95 +493,6 @@ pub fn compare_row( Ordering::Equal } -/// StorageFormat specify how records are saved in persistent storage -#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] -pub enum StorageFormat { - /// Traditional columnar format, every column is saved in one exact one - /// column, for example: - /// - ///```plaintext - /// | Timestamp | Device ID | Status Code | Tag 1 | Tag 2 | - /// | --------- |---------- | ----------- | ----- | ----- | - /// | 12:01 | A | 0 | v1 | v1 | - /// | 12:01 | B | 0 | v2 | v2 | - /// | 12:02 | A | 0 | v1 | v1 | - /// | 12:02 | B | 1 | v2 | v2 | - /// | 12:03 | A | 0 | v1 | v1 | - /// | 12:03 | B | 0 | v2 | v2 | - /// | ..... | | | | | - /// ``` - Columnar, - - /// Design for time-series data - /// Collapsible Columns within same primary key are collapsed - /// into list, other columns are the same format with columar's. - /// - /// Wether a column is collapsible is decided by - /// `Schema::is_collapsible_column` - /// - /// Note: minTime/maxTime is optional and not implemented yet, mainly used - /// for time-range pushdown filter - /// - ///```plaintext - /// | Device ID | Timestamp | Status Code | Tag 1 | Tag 2 | minTime | maxTime | - /// |-----------|---------------------|-------------|-------|-------|---------|---------| - /// | A | [12:01,12:02,12:03] | [0,0,0] | v1 | v1 | 12:01 | 12:03 | - /// | B | [12:01,12:02,12:03] | [0,1,0] | v2 | v2 | 12:01 | 12:03 | - /// | ... 
| | | | | | | - /// ``` - Hybrid, -} - -const STORAGE_FORMAT_COLUMNAR: &str = "columnar"; -const STORAGE_FORMAT_HYBRID: &str = "hybrid"; - -impl From for common_pb::StorageFormat { - fn from(format: StorageFormat) -> Self { - match format { - StorageFormat::Columnar => Self::Columnar, - StorageFormat::Hybrid => Self::Hybrid, - } - } -} - -impl From for StorageFormat { - fn from(format: common_pb::StorageFormat) -> Self { - match format { - common_pb::StorageFormat::Columnar => Self::Columnar, - common_pb::StorageFormat::Hybrid => Self::Hybrid, - } - } -} - -impl TryFrom<&str> for StorageFormat { - type Error = Error; - - fn try_from(value: &str) -> Result { - let format = match value.to_lowercase().as_str() { - STORAGE_FORMAT_COLUMNAR => Self::Columnar, - STORAGE_FORMAT_HYBRID => Self::Hybrid, - _ => return UnknownStorageFormat { value }.fail(), - }; - Ok(format) - } -} - -impl ToString for StorageFormat { - fn to_string(&self) -> String { - match self { - Self::Columnar => STORAGE_FORMAT_COLUMNAR, - Self::Hybrid => STORAGE_FORMAT_HYBRID, - } - .to_string() - } -} - -impl Default for StorageFormat { - fn default() -> Self { - Self::Columnar - } -} - // TODO(yingwen): Maybe rename to TableSchema. /// Schema of a table /// @@ -633,8 +523,6 @@ pub struct Schema { column_schemas: Arc, /// Version of the schema, schemas with same version should be identical. version: Version, - /// How columns is perisisted in underlying storage. - storage_format: StorageFormat, } impl fmt::Debug for Schema { @@ -647,7 +535,6 @@ impl fmt::Debug for Schema { .field("enable_tsid_primary_key", &self.enable_tsid_primary_key) .field("column_schemas", &self.column_schemas) .field("version", &self.version) - .field("storage_format", &self.storage_format.to_string()) .finish() } } @@ -926,11 +813,6 @@ impl Schema { pub fn string_buffer_offset(&self) -> usize { self.column_schemas.string_buffer_offset } - - /// Column's format in storage - pub fn storage_format(&self) -> StorageFormat { - self.storage_format - } } impl TryFrom for Schema { @@ -939,8 +821,7 @@ impl TryFrom for Schema { fn try_from(schema: common_pb::TableSchema) -> Result { let mut builder = Builder::with_capacity(schema.columns.len()) .version(schema.version) - .enable_tsid_primary_key(schema.enable_tsid_primary_key) - .storage_format(schema.storage_format.into()); + .enable_tsid_primary_key(schema.enable_tsid_primary_key); for (i, column_schema_pb) in schema.columns.into_iter().enumerate() { let column = ColumnSchema::from(column_schema_pb); @@ -970,7 +851,6 @@ impl From for common_pb::TableSchema { table_schema.timestamp_index = schema.timestamp_index as u32; table_schema.enable_tsid_primary_key = schema.enable_tsid_primary_key; table_schema.version = schema.version; - table_schema.storage_format = schema.storage_format.into(); table_schema } @@ -993,7 +873,6 @@ pub struct Builder { auto_increment_column_id: bool, max_column_id: ColumnId, enable_tsid_primary_key: bool, - storage_format: StorageFormat, } impl Default for Builder { @@ -1020,7 +899,6 @@ impl Builder { auto_increment_column_id: false, max_column_id: column_schema::COLUMN_ID_UNINIT, enable_tsid_primary_key: false, - storage_format: StorageFormat::default(), } } @@ -1081,12 +959,6 @@ impl Builder { self } - /// Set version of the schema - pub fn storage_format(mut self, format: StorageFormat) -> Self { - self.storage_format = format; - self - } - fn may_alloc_column_id(&mut self, column: &mut ColumnSchema) { // Assign this column an id if self.auto_increment_column_id && column.id == 
column_schema::COLUMN_ID_UNINIT { @@ -1158,7 +1030,6 @@ impl Builder { timestamp_index, enable_tsid_primary_key, version, - storage_format, } = Self::parse_arrow_schema_meta_or_default(arrow_schema.metadata())?; let tsid_index = Self::find_tsid_index(enable_tsid_primary_key, &columns)?; @@ -1172,7 +1043,6 @@ impl Builder { enable_tsid_primary_key, column_schemas, version, - storage_format, }) } @@ -1208,10 +1078,6 @@ impl Builder { ArrowSchemaMetaKey::EnableTsidPrimaryKey.to_string(), self.enable_tsid_primary_key.to_string(), ), - ( - ArrowSchemaMetaKey::StorageFormat.to_string(), - self.storage_format.to_string(), - ), ] .into_iter() .collect() @@ -1262,7 +1128,6 @@ impl Builder { enable_tsid_primary_key: self.enable_tsid_primary_key, column_schemas: Arc::new(ColumnSchemas::new(self.columns)), version: self.version, - storage_format: self.storage_format, }) } } @@ -1482,35 +1347,6 @@ mod tests { .unwrap(); } - #[test] - fn test_with_storage_format() { - let schema = Builder::new() - .auto_increment_column_id(true) - .add_key_column( - column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .build() - .unwrap(); - // default is columnar - assert_eq!(schema.storage_format, StorageFormat::Columnar); - - let schema = Builder::new() - .auto_increment_column_id(true) - .storage_format(StorageFormat::Hybrid) - .add_key_column( - column_schema::Builder::new("key1".to_string(), DatumKind::Varbinary) - .build() - .expect("should succeed build column schema"), - ) - .unwrap() - .build() - .unwrap(); - assert_eq!(schema.storage_format, StorageFormat::Hybrid); - } - #[test] fn test_missing_timestamp_key() { let builder = Builder::new() diff --git a/proto/protos/analytic_common.proto b/proto/protos/analytic_common.proto index ff120e2513..ae47f020c8 100644 --- a/proto/protos/analytic_common.proto +++ b/proto/protos/analytic_common.proto @@ -3,7 +3,6 @@ // Common protos of analytic engine syntax = "proto3"; package analytic_common; -import "common.proto"; // Options of a table that need to persist message TableOptions { @@ -21,7 +20,7 @@ message TableOptions { // If sampling_segment_duration is true, then the segment duration // is still unknown. 
bool sampling_segment_duration = 11; - common.StorageFormat storage_format = 12; + StorageFormat storage_format = 12; } enum UpdateMode { @@ -29,6 +28,11 @@ enum UpdateMode { Append = 1; } +enum StorageFormat { + Columnar = 0; + Hybrid = 1; +} + message CompactionOptions { // Options for STCS float bucket_low = 1; diff --git a/proto/protos/common.proto b/proto/protos/common.proto index 0b39259090..dc917685a7 100644 --- a/proto/protos/common.proto +++ b/proto/protos/common.proto @@ -52,13 +52,6 @@ message TableSchema { uint32 timestamp_index = 4; // Enable auto generated tsid as primary key bool enable_tsid_primary_key = 5; - // Column's format in underlying storage - StorageFormat storage_format = 12; -} - -enum StorageFormat { - Columnar = 0; - Hybrid = 1; } // Time range of [start, end) diff --git a/proto/protos/meta_update.proto b/proto/protos/meta_update.proto index e014e76055..da0466c399 100644 --- a/proto/protos/meta_update.proto +++ b/proto/protos/meta_update.proto @@ -44,6 +44,7 @@ message AddFileMeta { common.TableSchema schema = 7; uint64 size = 8; uint64 row_num = 9; + analytic_common.StorageFormat storage_format = 10; } // Meta data of the file to delete diff --git a/proto/protos/sst.proto b/proto/protos/sst.proto index a1ab16e9a7..ea8b765560 100644 --- a/proto/protos/sst.proto +++ b/proto/protos/sst.proto @@ -5,6 +5,7 @@ syntax = "proto3"; package sst; import "common.proto"; +import "analytic_common.proto"; message SstMetaData { // Min key in the sst @@ -18,4 +19,5 @@ message SstMetaData { common.TableSchema schema = 5; uint64 size = 6; uint64 row_num = 7; + analytic_common.StorageFormat storage_format = 8; } diff --git a/tests/cases/local/05_ddl/create_tables.result b/tests/cases/local/05_ddl/create_tables.result index b85c7535bb..89a6f8da0d 100644 --- a/tests/cases/local/05_ddl/create_tables.result +++ b/tests/cases/local/05_ddl/create_tables.result @@ -88,7 +88,7 @@ String(StringBytes(b"a")),String(StringBytes(b"int")),Boolean(false),Boolean(tru show create table `05_create_tables_t4`; Table,Create Table, -String(StringBytes(b"05_create_tables_t4")),String(StringBytes(b"CREATE TABLE `05_create_tables_t4` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t4")),String(StringBytes(b"CREATE TABLE `05_create_tables_t4` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `05_create_tables_t5`(c1 int, t timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; @@ -106,7 +106,7 @@ String(StringBytes(b"c1")),String(StringBytes(b"int")),Boolean(false),Boolean(tr show create table `05_create_tables_t5`; Table,Create Table, -String(StringBytes(b"05_create_tables_t5")),String(StringBytes(b"CREATE TABLE `05_create_tables_t5` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', 
enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t5")),String(StringBytes(b"CREATE TABLE `05_create_tables_t5` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `05_create_tables_t6`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY, t2 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic; @@ -120,7 +120,7 @@ affected_rows: 0 show create table `05_create_tables_t7`; Table,Create Table, -String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), drop table `05_create_tables_t7`; @@ -134,7 +134,7 @@ affected_rows: 0 show create table `05_create_tables_t7`; Table,Create Table, -String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), drop table `05_create_tables_t7`; @@ -148,7 +148,7 @@ affected_rows: 0 show create table `05_create_tables_t7`; Table,Create Table, -String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE `05_create_tables_t7` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='hybrid', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"05_create_tables_t7")),String(StringBytes(b"CREATE TABLE 
`05_create_tables_t7` (`t1` timestamp NOT NULL, `tsid` uint64 NOT NULL, `c1` int, PRIMARY KEY(t1,tsid), TIMESTAMP KEY(t1)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='HYBRID', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), drop table `05_create_tables_t7`; @@ -157,7 +157,7 @@ affected_rows: 0 CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown'); -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, query: CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown');. Caused by: Failed to execute create table, err:Failed to create table, name:05_create_tables_t7, err:Failed to create table, err:Invalid arguments, err:Invalid options, space_id:2, table:05_create_tables_t7, table_id:2199023255650, err:Failed to parse storage format, err:Unknown storage format. value:\"unknown\"." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, query: CREATE TABLE `05_create_tables_t7`(c1 int, t1 timestamp NOT NULL TIMESTAMP KEY) ENGINE = Analytic with (storage_format= 'unknown');. Caused by: Failed to execute create table, err:Failed to create table, name:05_create_tables_t7, err:Failed to create table, err:Invalid arguments, err:Invalid options, space_id:2, table:05_create_tables_t7, table_id:2199023255698, err:Unknown storage format. value:\"unknown\"." }) DROP TABLE IF EXISTS `05_create_tables_t`; diff --git a/tests/cases/local/06_show/show_create_table.result b/tests/cases/local/06_show/show_create_table.result index 7adfd6d8ae..e1770783b5 100644 --- a/tests/cases/local/06_show/show_create_table.result +++ b/tests/cases/local/06_show/show_create_table.result @@ -17,7 +17,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_a`; Table,Create Table, -String(StringBytes(b"06_show_a")),String(StringBytes(b"CREATE TABLE `06_show_a` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_a")),String(StringBytes(b"CREATE TABLE `06_show_a` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `06_show_b` (a bigint, b int null default null, c string, d smallint null, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; @@ -27,7 +27,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_b`; Table,Create Table, -String(StringBytes(b"06_show_b")),String(StringBytes(b"CREATE TABLE `06_show_b` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', 
compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_b")),String(StringBytes(b"CREATE TABLE `06_show_b` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` bigint, `b` int, `c` string, `d` smallint, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), CREATE TABLE `06_show_c` (a int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; @@ -37,7 +37,7 @@ affected_rows: 0 SHOW CREATE TABLE `06_show_c`; Table,Create Table, -String(StringBytes(b"06_show_c")),String(StringBytes(b"CREATE TABLE `06_show_c` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='columnar', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), +String(StringBytes(b"06_show_c")),String(StringBytes(b"CREATE TABLE `06_show_c` (`t` timestamp NOT NULL, `tsid` uint64 NOT NULL, `a` int, PRIMARY KEY(t,tsid), TIMESTAMP KEY(t)) ENGINE=Analytic WITH(arena_block_size='2097152', compaction_strategy='default', compression='ZSTD', enable_ttl='true', num_rows_per_row_group='8192', segment_duration='', storage_format='COLUMNAR', ttl='7d', update_mode='OVERWRITE', write_buffer_size='33554432')")), DROP TABLE `06_show_a`; From 3d9c89ee871c0c560a00345b969c906cec4044fb Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 26 Aug 2022 15:35:47 +0800 Subject: [PATCH 6/7] fix clippy --- analytic_engine/src/compaction/picker.rs | 2 ++ analytic_engine/src/sst/parquet/builder.rs | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index 5cc9f2afc9..83883328a6 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -594,6 +594,7 @@ mod tests { file::SstMetaData, manager::{tests::LevelsControllerMockBuilder, LevelsController}, }, + table_options::StorageFormat, }; fn build_sst_meta_data(time_range: TimeRange, size: u64) -> SstMetaData { @@ -605,6 +606,7 @@ mod tests { schema: build_schema(), size, row_num: 2, + storage_format: StorageFormat::default(), } } diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs index 110f7b1d4c..3a7a8e519b 100644 --- a/analytic_engine/src/sst/parquet/builder.rs +++ b/analytic_engine/src/sst/parquet/builder.rs @@ -159,7 +159,6 @@ mod tests { use common_types::{ bytes::Bytes, projected_schema::ProjectedSchema, - schema::StorageFormat, tests::{build_row, build_schema}, time::{TimeRange, Timestamp}, }; @@ -177,7 +176,7 @@ mod tests { parquet::reader::ParquetSstReader, reader::{tests::check_stream, SstReader}, }, - table_options, + table_options::{self, StorageFormat}, }; // TODO(xikai): add test for reverse reader From ddf0c068724f4310b047e84cf1790e765bf7be16 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Fri, 26 Aug 2022 16:03:16 +0800 Subject: [PATCH 7/7] fix CR comments --- analytic_engine/src/sst/file.rs | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-)

diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs
index 5b87ad9cae..77b879b6de 100644
--- a/analytic_engine/src/sst/file.rs
+++ b/analytic_engine/src/sst/file.rs
@@ -618,7 +618,7 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData {
     let mut time_range_start = files[0].time_range().inclusive_start();
     let mut time_range_end = files[0].time_range().exclusive_end();
     let mut max_sequence = files[0].max_sequence();
-    // TODO: what if format of different file is different?
+    // TODO(jiacai2050): what if different files have different storage formats?
     // pick first now
     let storage_format = files[0].storage_format();
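
For reference, below is a minimal, self-contained sketch of the two behaviors this patch series introduces around `storage_format`: case-insensitive parsing of the table option value ("columnar" / "hybrid", anything else rejected), and the "pick the first file's format" policy mentioned in the TODO above. The function names `parse_storage_format` and `pick_merged_format` are illustrative stand-ins for this sketch, not the engine's actual APIs.

```rust
// Illustrative sketch only: `parse_storage_format` and `pick_merged_format`
// are simplified stand-ins for the behavior added in this patch series,
// not the engine's real APIs.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StorageFormat {
    Columnar,
    Hybrid,
}

impl Default for StorageFormat {
    fn default() -> Self {
        Self::Columnar
    }
}

/// Case-insensitive parsing of the `storage_format` table option value;
/// unknown values are rejected with an error message.
fn parse_storage_format(value: &str) -> Result<StorageFormat, String> {
    match value.to_uppercase().as_str() {
        "COLUMNAR" => Ok(StorageFormat::Columnar),
        "HYBRID" => Ok(StorageFormat::Hybrid),
        _ => Err(format!("Unknown storage format. value:{:?}", value)),
    }
}

/// The "pick first" policy referred to by the TODO in `merge_sst_meta`:
/// when SSTs with potentially different formats are merged, the first
/// file's format is used for now.
fn pick_merged_format(formats: &[StorageFormat]) -> StorageFormat {
    formats.first().copied().unwrap_or_default()
}

fn main() {
    assert_eq!(parse_storage_format("hybrid"), Ok(StorageFormat::Hybrid));
    assert!(parse_storage_format("unknown").is_err());

    let merged = pick_merged_format(&[StorageFormat::Hybrid, StorageFormat::Columnar]);
    assert_eq!(merged, StorageFormat::Hybrid);
}
```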