From 1f42ad1c8ea1c2004a4a4cf92eb31eb532d326b9 Mon Sep 17 00:00:00 2001
From: lichuang
Date: Mon, 20 Feb 2023 17:42:46 +0800
Subject: [PATCH 1/3] fix: fix add column min/max stat bug

---
 Cargo.lock                                         |   1 +
 src/query/expression/src/schema.rs                 |  40 +++++
 src/query/expression/tests/it/schema.rs            |  59 +++++++
 src/query/service/Cargo.toml                       |   1 +
 .../storages/fuse/operations/alter_table.rs        | 155 +++++++++++++-----
 .../storages/fuse/src/operations/commit.rs         |  37 ++++-
 6 files changed, 253 insertions(+), 40 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1994b890a03aa..b5ac3dff81ec3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3136,6 +3136,7 @@ dependencies = [
  "once_cell",
  "opendal",
  "opensrv-mysql",
+ "ordered-float 3.4.0",
  "p256",
  "parking_lot 0.12.1",
  "petgraph",
diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs
index be12f44b1719f..e511bd4b5b39c 100644
--- a/src/query/expression/src/schema.rs
+++ b/src/query/expression/src/schema.rs
@@ -15,6 +15,7 @@
 use std::borrow::Cow;
 use std::collections::BTreeMap;
 use std::collections::BTreeSet;
+use std::collections::HashMap;
 use std::collections::HashSet;
 use std::sync::Arc;
 
@@ -60,6 +61,7 @@ use crate::with_number_type;
 use crate::BlockEntry;
 use crate::Column;
 use crate::FromData;
+use crate::Scalar;
 use crate::TypeDeserializerImpl;
 use crate::Value;
 use crate::ARROW_EXT_TYPE_EMPTY_ARRAY;
@@ -437,6 +439,44 @@ impl TableSchema {
         field_column_ids
     }
 
+    pub fn field_leaf_default_values(
+        &self,
+        default_values: &[Scalar],
+    ) -> HashMap<ColumnId, Scalar> {
+        fn collect_leaf_default_values(
+            default_value: &Scalar,
+            leaf_default_values: &mut Vec<Scalar>,
+        ) {
+            match default_value {
+                Scalar::Tuple(s) => {
+                    s.iter().for_each(|default_val| {
+                        collect_leaf_default_values(default_val, leaf_default_values)
+                    });
+                }
+                _ => {
+                    leaf_default_values.push(default_value.to_owned());
+                }
+            }
+        }
+
+        let mut leaf_default_value_map = HashMap::with_capacity(self.num_fields());
+        let leaf_field_column_ids = self.field_leaf_column_ids();
+        for (default_value, field_column_ids) in default_values.iter().zip_eq(leaf_field_column_ids)
+        {
+            let mut leaf_default_values = Vec::with_capacity(field_column_ids.len());
+            collect_leaf_default_values(default_value, &mut leaf_default_values);
+
+            field_column_ids
+                .iter()
+                .zip_eq(leaf_default_values)
+                .for_each(|(col_id, default_value)| {
+                    leaf_default_value_map.insert(*col_id, default_value);
+                });
+        }
+
+        leaf_default_value_map
+    }
+
     #[inline]
     pub fn num_fields(&self) -> usize {
         self.fields.len()
diff --git a/src/query/expression/tests/it/schema.rs b/src/query/expression/tests/it/schema.rs
index b53279fb20d55..4a7406d55c285 100644
--- a/src/query/expression/tests/it/schema.rs
+++ b/src/query/expression/tests/it/schema.rs
@@ -17,6 +17,8 @@ use std::collections::BTreeMap;
 use common_exception::Result;
 use common_expression::create_test_complex_schema;
 use common_expression::types::NumberDataType;
+use common_expression::ColumnId;
+use common_expression::Scalar;
 use common_expression::TableDataType;
 use common_expression::TableField;
 use common_expression::TableSchema;
@@ -180,6 +182,63 @@ fn test_schema_from_simple_type() -> Result<()> {
     Ok(())
 }
 
+#[test]
+fn test_field_leaf_default_values() -> Result<()> {
+    let b1 = TableDataType::Tuple {
+        fields_name: vec!["b11".to_string(), "b12".to_string()],
+        fields_type: vec![TableDataType::Boolean, TableDataType::String],
+    };
+    let b = TableDataType::Tuple {
+        fields_name: vec!["b1".to_string(), "b2".to_string()],
+        fields_type: vec![b1, TableDataType::Number(NumberDataType::Int64)],
+    };
+    let fields = vec![
+        TableField::new("a", TableDataType::Number(NumberDataType::UInt64)),
+        TableField::new("b", b),
+        TableField::new("c", TableDataType::Number(NumberDataType::UInt64)),
+    ];
+    let schema = TableSchema::new(fields);
+
+    let default_values = vec![
+        Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
+        Scalar::Tuple(vec![
+            Scalar::Tuple(vec![
+                Scalar::Boolean(true),
+                Scalar::String(vec!['a', 'b'].iter().map(|c| *c as u8).collect::<Vec<u8>>()),
+            ]),
+            Scalar::Number(common_expression::types::number::NumberScalar::Int64(2)),
+        ]),
+        Scalar::Number(common_expression::types::number::NumberScalar::UInt64(10)),
+    ];
+
+    let leaf_default_values = schema.field_leaf_default_values(&default_values);
+    let expected_leaf_default_values: Vec<(ColumnId, Scalar)> = vec![
+        (
+            0,
+            Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
+        ),
+        (1, Scalar::Boolean(true)),
+        (
+            2,
+            Scalar::String(vec!['a', 'b'].iter().map(|c| *c as u8).collect::<Vec<u8>>()),
+        ),
+        (
+            3,
+            Scalar::Number(common_expression::types::number::NumberScalar::Int64(2)),
+        ),
+        (
+            4,
+            Scalar::Number(common_expression::types::number::NumberScalar::UInt64(10)),
+        ),
+    ];
+    expected_leaf_default_values
+        .iter()
+        .for_each(|(col_id, default_value)| {
+            assert_eq!(leaf_default_values.get(col_id).unwrap(), default_value)
+        });
+    Ok(())
+}
+
 #[test]
 fn test_schema_from_struct() -> Result<()> {
     let schema = create_test_complex_schema();
diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml
index 8598c33a5bc3a..a2faa063f61c7 100644
--- a/src/query/service/Cargo.toml
+++ b/src/query/service/Cargo.toml
@@ -142,6 +142,7 @@ uuid = { version = "1.1.2", features = ["serde", "v4"] }
 
 [dev-dependencies]
 common-meta-embedded = { path = "../../meta/embedded" }
+ordered-float = { workspace = true }
 
 base64 = "0.13.0"
 criterion = "0.4"
diff --git a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs
index 00120c793fc64..658674ee0f92e 100644
--- a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs
@@ -16,12 +16,15 @@ use std::collections::HashSet;
 
 use common_base::base::tokio;
 use common_exception::Result;
+use common_expression::types::Float64Type;
 use common_expression::types::Int32Type;
 use common_expression::types::NumberDataType;
 use common_expression::types::UInt64Type;
+use common_expression::Column;
 use common_expression::ColumnId;
 use common_expression::DataBlock;
 use common_expression::FromData;
+use common_expression::Scalar;
 use common_expression::TableDataType;
 use common_expression::TableField;
 use common_expression::TableSchemaRefExt;
@@ -36,6 +39,7 @@ use databend_query::interpreters::DropTableColumnInterpreter;
 use databend_query::interpreters::Interpreter;
 use databend_query::interpreters::InterpreterFactory;
 use futures_util::TryStreamExt;
+use ordered_float::OrderedFloat;
 use storages_common_cache::LoadParams;
 use storages_common_table_meta::meta::SegmentInfo;
 use storages_common_table_meta::meta::TableSnapshot;
@@ -46,7 +50,8 @@ use crate::storages::fuse::table_test_fixture::TestFixture;
 
 async fn check_segment_column_ids(
     fixture: &TestFixture,
-    expected_column_ids: Vec<ColumnId>,
+    expected_column_ids: Option<Vec<ColumnId>>,
+    expected_column_min_max: Option<Vec<(ColumnId, (Scalar, Scalar))>>,
 ) -> Result<()> {
     let catalog = fixture.ctx().get_catalog("default")?;
     // get the latest tbl
@@ -73,33 +78,45 @@ async fn check_segment_column_ids(
     };
 
     let snapshot = snapshot_reader.read(&params).await?;
-    let expected_column_ids =
-        HashSet::<ColumnId>::from_iter(expected_column_ids.clone().iter().cloned());
-    for (seg_loc, _) in &snapshot.segments {
-        let segment_reader = MetaReaders::segment_info_reader(
-            fuse_table.get_operator(),
-            TestFixture::default_table_schema(),
-        );
-        let params = LoadParams {
-            location: seg_loc.clone(),
-            len_hint: None,
-            ver: SegmentInfo::VERSION,
-        };
-        let segment_info = segment_reader.read(&params).await?;
-        segment_info.blocks.iter().for_each(|block_meta| {
-            assert_eq!(
-                HashSet::from_iter(
-                    block_meta
-                        .col_stats
-                        .keys()
-                        .cloned()
-                        .collect::<Vec<ColumnId>>()
-                        .iter()
-                        .cloned()
-                ),
-                expected_column_ids,
+    if let Some(expected_column_min_max) = expected_column_min_max {
+        for (column_id, (min, max)) in expected_column_min_max {
+            if let Some(stat) = snapshot.summary.col_stats.get(&column_id) {
+                assert_eq!(min, stat.min);
+                assert_eq!(max, stat.max);
+            }
+        }
+    }
+
+    if let Some(expected_column_ids) = expected_column_ids {
+        let expected_column_ids =
+            HashSet::<ColumnId>::from_iter(expected_column_ids.clone().iter().cloned());
+        for (seg_loc, _) in &snapshot.segments {
+            let segment_reader = MetaReaders::segment_info_reader(
+                fuse_table.get_operator(),
+                TestFixture::default_table_schema(),
             );
-        });
+            let params = LoadParams {
+                location: seg_loc.clone(),
+                len_hint: None,
+                ver: SegmentInfo::VERSION,
+            };
+            let segment_info = segment_reader.read(&params).await?;
+
+            segment_info.blocks.iter().for_each(|block_meta| {
+                assert_eq!(
+                    HashSet::from_iter(
+                        block_meta
+                            .col_stats
+                            .keys()
+                            .cloned()
+                            .collect::<Vec<ColumnId>>()
+                            .iter()
+                            .cloned()
+                    ),
+                    expected_column_ids,
+                );
+            });
+        }
     }
 
     Ok(())
@@ -128,7 +145,7 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
     // check column ids
     // the table contains two fields: id int32, t tuple(int32, int32)
     let expected_leaf_column_ids = vec![0, 1, 2];
-    check_segment_column_ids(&fixture, expected_leaf_column_ids).await?;
+    check_segment_column_ids(&fixture, Some(expected_leaf_column_ids), None).await?;
 
     // drop a column
     let drop_table_column_plan = DropTableColumnPlan {
@@ -140,11 +157,17 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
     let interpreter = DropTableColumnInterpreter::try_create(ctx.clone(), drop_table_column_plan)?;
     interpreter.execute(ctx.clone()).await?;
 
-    // add a column
-    let fields = vec![TableField::new(
-        "b",
-        TableDataType::Number(NumberDataType::UInt64),
-    )];
+    // add a tuple column of (uint64, float64) with default value `(1,15.0)`
+    let fields = vec![
+        TableField::new("b", TableDataType::Tuple {
+            fields_name: vec!["b1".to_string(), "b2".to_string()],
+            fields_type: vec![
+                TableDataType::Number(NumberDataType::UInt64),
+                TableDataType::Number(NumberDataType::Float64),
+            ],
+        })
+        .with_default_expr(Some("(1,15.0)".to_string())),
+    ];
     let schema = TableSchemaRefExt::create(fields);
 
     let add_table_column_plan = AddTableColumnPlan {
@@ -161,9 +184,14 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
     // insert values for new schema
     let block = {
         let column0 = Int32Type::from_data(vec![1, 2]);
-        let column2 = UInt64Type::from_data(vec![3, 4]);
+        let column3 = UInt64Type::from_data(vec![3, 4]);
+        let column4 = Float64Type::from_data(vec![13.0, 14.0]);
+        let tuple_column = Column::Tuple {
+            fields: vec![column3, column4],
+            len: 2,
+        };
 
-        DataBlock::new_from_columns(vec![column0, column2])
+        DataBlock::new_from_columns(vec![column0, tuple_column])
     };
 
     // get the latest tbl
@@ -181,6 +209,33 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
         .append_commit_blocks(table.clone(), vec![block], false, true)
         .await?;
 
+    // verify min and max values in the statistics
+    check_segment_column_ids(
+        &fixture,
+        None,
+        Some(vec![
+            (
+                3,
+                (
+                    Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
+                    Scalar::Number(common_expression::types::number::NumberScalar::UInt64(4)),
+                ),
+            ),
+            (
+                4,
+                (
+                    Scalar::Number(common_expression::types::number::NumberScalar::Float64(
+                        OrderedFloat(13.0),
+                    )),
+                    Scalar::Number(common_expression::types::number::NumberScalar::Float64(
+                        OrderedFloat(15.0),
+                    )),
+                ),
+            ),
+        ]),
+    )
+    .await?;
+
     // do compact
     let query = format!("optimize table {db_name}.{tbl_name} compact");
     let mut planner = Planner::new(ctx.clone());
@@ -190,9 +245,33 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
     let data_stream = interpreter.execute(ctx.clone()).await?;
     let _ = data_stream.try_collect::<Vec<DataBlock>>().await;
 
-    // verify statistics has only [0,3]
-    let expected_column_ids = vec![0, 3];
-    check_segment_column_ids(&fixture, expected_column_ids).await?;
+    // verify statistics and min/max values
+    let expected_column_ids = vec![0, 3, 4];
+    check_segment_column_ids(
+        &fixture,
+        Some(expected_column_ids),
+        Some(vec![
+            (
+                3,
+                (
+                    Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
+                    Scalar::Number(common_expression::types::number::NumberScalar::UInt64(4)),
+                ),
+            ),
+            (
+                4,
+                (
+                    Scalar::Number(common_expression::types::number::NumberScalar::Float64(
+                        OrderedFloat(13.0),
+                    )),
+                    Scalar::Number(common_expression::types::number::NumberScalar::Float64(
+                        OrderedFloat(15.0),
+                    )),
+                ),
+            ),
+        ]),
+    )
+    .await?;
 
     Ok(())
 }
diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs
index 96ac4b731c4f8..f25ae4e64bbae 100644
--- a/src/query/storages/fuse/src/operations/commit.rs
+++ b/src/query/storages/fuse/src/operations/commit.rs
@@ -32,10 +32,12 @@ use common_meta_app::schema::TableInfo;
 use common_meta_app::schema::TableStatistics;
 use common_meta_app::schema::UpdateTableMetaReq;
 use common_meta_types::MatchSeq;
+use common_sql::field_default_value;
 use opendal::Operator;
 use storages_common_cache::CacheAccessor;
 use storages_common_cache_manager::CachedObject;
 use storages_common_table_meta::meta::ClusterKey;
+use storages_common_table_meta::meta::ColumnStatistics;
 use storages_common_table_meta::meta::Location;
 use storages_common_table_meta::meta::SegmentInfo;
 use storages_common_table_meta::meta::Statistics;
@@ -231,6 +233,7 @@ impl FuseTable {
         } else {
             Self::merge_table_operations(
                 self.table_info.meta.schema.as_ref(),
+                ctx.clone(),
                 prev,
                 prev_version,
                 segments,
@@ -261,6 +264,7 @@ impl FuseTable {
 
     fn merge_table_operations(
         schema: &TableSchema,
+        ctx: Arc<dyn TableContext>,
         previous: Option<Arc<TableSnapshot>>,
         prev_version: u64,
         mut new_segments: Vec<Location>,
@@ -269,8 +273,37 @@ impl FuseTable {
     ) -> Result<TableSnapshot> {
         // 1. merge stats with previous snapshot, if any
         let stats = if let Some(snapshot) = &previous {
-            let summary = &snapshot.summary;
-            merge_statistics(&statistics, summary)?
+            let mut summary = snapshot.summary.clone();
+            let mut fill_default_values = false;
+            for column_id in statistics.col_stats.keys() {
+                if !summary.col_stats.contains_key(column_id) {
+                    fill_default_values = true;
+                    break;
+                }
+            }
+            if fill_default_values {
+                let mut default_values = Vec::with_capacity(schema.num_fields());
+                for field in schema.fields() {
+                    default_values.push(field_default_value(ctx.clone(), field)?);
+                }
+                let leaf_default_values = schema.field_leaf_default_values(&default_values);
+                leaf_default_values
+                    .iter()
+                    .for_each(|(col_id, default_value)| {
+                        if !summary.col_stats.contains_key(col_id) {
+                            let col_stat = ColumnStatistics {
+                                min: default_value.to_owned(),
+                                max: default_value.to_owned(),
+                                null_count: summary.row_count,
+                                in_memory_size: 0,
+                                distinct_of_values: None,
+                            };
+                            summary.col_stats.insert(*col_id, col_stat);
+                        }
+                    });
+            }
+
+            merge_statistics(&statistics, &summary)?
         } else {
             statistics
         };

From 89a6053cd39d3d08f0179f53b6a611e49614ceb3 Mon Sep 17 00:00:00 2001
From: lichuang
Date: Tue, 21 Feb 2023 09:21:19 +0800
Subject: [PATCH 2/3] fix: refactor based on review comments

---
 src/query/storages/fuse/src/operations/commit.rs | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs
index f25ae4e64bbae..340f14f43d829 100644
--- a/src/query/storages/fuse/src/operations/commit.rs
+++ b/src/query/storages/fuse/src/operations/commit.rs
@@ -275,6 +275,7 @@ impl FuseTable {
         let stats = if let Some(snapshot) = &previous {
             let mut summary = snapshot.summary.clone();
             let mut fill_default_values = false;
+            // check whether default values need to be filled into the statistics
             for column_id in statistics.col_stats.keys() {
                 if !summary.col_stats.contains_key(column_id) {
                     fill_default_values = true;
@@ -291,12 +292,17 @@ impl FuseTable {
                     .for_each(|(col_id, default_value)| {
                         if !summary.col_stats.contains_key(col_id) {
+                            let (null_count, distinct_of_values) = if default_value.is_null() {
+                                (summary.row_count, Some(0))
+                            } else {
+                                (0, Some(1))
+                            };
                             let col_stat = ColumnStatistics {
                                 min: default_value.to_owned(),
                                 max: default_value.to_owned(),
-                                null_count: summary.row_count,
+                                null_count,
                                 in_memory_size: 0,
-                                distinct_of_values: None,
+                                distinct_of_values,
                             };
                             summary.col_stats.insert(*col_id, col_stat);
                         }
                     });

From 06721b2f25a03bf45ce0d808279a6a64f4e461da Mon Sep 17 00:00:00 2001
From: lichuang
Date: Tue, 21 Feb 2023 09:44:56 +0800
Subject: [PATCH 3/3] refactor: refactor field_leaf_default_values function

---
 src/query/expression/src/schema.rs | 34 ++++++++++++++++++------------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs
index e511bd4b5b39c..091ba8bb8b0cc 100644
--- a/src/query/expression/src/schema.rs
+++ b/src/query/expression/src/schema.rs
@@ -445,36 +445,42 @@ impl TableSchema {
     ) -> HashMap<ColumnId, Scalar> {
         fn collect_leaf_default_values(
             default_value: &Scalar,
-            leaf_default_values: &mut Vec<Scalar>,
+            column_ids: &[ColumnId],
+            index: &mut usize,
+            leaf_default_values: &mut HashMap<ColumnId, Scalar>,
         ) {
             match default_value {
                 Scalar::Tuple(s) => {
                     s.iter().for_each(|default_val| {
-                        collect_leaf_default_values(default_val, leaf_default_values)
+                        collect_leaf_default_values(
+                            default_val,
+                            column_ids,
+                            index,
+                            leaf_default_values,
+                        )
                     });
                 }
                 _ => {
-                    leaf_default_values.push(default_value.to_owned());
+                    leaf_default_values.insert(column_ids[*index], default_value.to_owned());
+                    *index += 1;
                 }
             }
         }
 
-        let mut leaf_default_value_map = HashMap::with_capacity(self.num_fields());
+        let mut leaf_default_values = HashMap::with_capacity(self.num_fields());
         let leaf_field_column_ids = self.field_leaf_column_ids();
         for (default_value, field_column_ids) in default_values.iter().zip_eq(leaf_field_column_ids)
        {
-            let mut leaf_default_values = Vec::with_capacity(field_column_ids.len());
-            collect_leaf_default_values(default_value, &mut leaf_default_values);
-
-            field_column_ids
-                .iter()
-                .zip_eq(leaf_default_values)
-                .for_each(|(col_id, default_value)| {
-                    leaf_default_value_map.insert(*col_id, default_value);
-                });
+            let mut index = 0;
+            collect_leaf_default_values(
+                default_value,
+                &field_column_ids,
+                &mut index,
+                &mut leaf_default_values,
+            );
         }
 
-        leaf_default_value_map
+        leaf_default_values
     }
 
     #[inline]
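
A standalone sketch of the idea behind the series, for readers not inside the Databend codebase: when blocks written before an ALTER TABLE ... ADD COLUMN are committed together with new blocks, the previous snapshot summary has no statistics entry for the new leaf columns, so the patches backfill one whose min and max are the column's default value before merging. The Rust below restates that backfill with simplified stand-in types; ColumnStats and the plain u32 column ids are illustrative placeholders, not the crate's real ColumnStatistics/ColumnId, and this is not the patches' actual code.

use std::collections::HashMap;

// Simplified stand-in for the real ColumnStatistics struct (illustrative only).
#[derive(Clone, Debug, PartialEq)]
struct ColumnStats {
    min: i64,
    max: i64,
    null_count: u64,
    distinct_of_values: Option<u64>,
}

// Backfill an entry for every leaf column that the freshly written statistics
// know about but the previous snapshot summary does not, using the column's
// default value as both min and max. (The real patch additionally treats a NULL
// default as `null_count = row_count` with zero distinct values.)
fn backfill_missing_column_stats(
    prev_summary: &mut HashMap<u32, ColumnStats>,
    new_stats: &HashMap<u32, ColumnStats>,
    leaf_defaults: &HashMap<u32, i64>,
) {
    for col_id in new_stats.keys() {
        if prev_summary.contains_key(col_id) {
            continue;
        }
        if let Some(&default) = leaf_defaults.get(col_id) {
            prev_summary.insert(*col_id, ColumnStats {
                min: default,
                max: default,
                null_count: 0,
                distinct_of_values: Some(1),
            });
        }
    }
}

fn main() {
    // Column 3 was added by ALTER TABLE with default value 1; blocks written
    // before the ALTER carry no statistics for it.
    let mut prev_summary = HashMap::from([(0u32, ColumnStats {
        min: 1,
        max: 2,
        null_count: 0,
        distinct_of_values: Some(2),
    })]);
    let new_stats = HashMap::from([
        (0u32, ColumnStats { min: 1, max: 2, null_count: 0, distinct_of_values: Some(2) }),
        (3u32, ColumnStats { min: 3, max: 4, null_count: 0, distinct_of_values: Some(2) }),
    ]);
    let leaf_defaults = HashMap::from([(0u32, 0i64), (3u32, 1i64)]);

    backfill_missing_column_stats(&mut prev_summary, &new_stats, &leaf_defaults);

    // The previous summary now reports min == max == 1 for column 3, so merging
    // it with the new statistics yields the overall range [1, 4] instead of a
    // range that ignores the defaulted rows.
    assert_eq!(prev_summary[&3].min, 1);
    assert_eq!(prev_summary[&3].max, 1);
}

The actual series does the same thing over Scalar values and real ColumnStatistics inside FuseTable::merge_table_operations, using TableSchema::field_leaf_default_values (added in PATCH 1 and refactored in PATCH 3) to map each leaf ColumnId to its default Scalar, which is what the new min/max assertions for columns 3 and 4 in the alter_table test exercise.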