Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix add column min/max stat bug #10137

Merged
merged 5 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -60,6 +61,7 @@ use crate::with_number_type;
use crate::BlockEntry;
use crate::Column;
use crate::FromData;
use crate::Scalar;
use crate::TypeDeserializerImpl;
use crate::Value;
use crate::ARROW_EXT_TYPE_EMPTY_ARRAY;
Expand Down Expand Up @@ -437,6 +439,50 @@ impl TableSchema {
field_column_ids
}

/// Builds a map from each leaf column id to its default value.
///
/// `default_values` holds one default `Scalar` per top-level field, in field
/// order. A tuple default is flattened depth-first, left to right, so every
/// leaf scalar is paired with the matching leaf column id produced by
/// `field_leaf_column_ids` for that field.
pub fn field_leaf_default_values(
    &self,
    default_values: &[Scalar],
) -> HashMap<ColumnId, Scalar> {
    let mut leaf_default_values = HashMap::with_capacity(self.num_fields());
    let leaf_field_column_ids = self.field_leaf_column_ids();

    for (default_value, field_column_ids) in default_values.iter().zip_eq(leaf_field_column_ids)
    {
        // Iterative depth-first walk over the (possibly nested) tuple value,
        // assigning leaf column ids in encounter order.
        let mut next_leaf = 0usize;
        let mut pending = vec![default_value];
        while let Some(value) = pending.pop() {
            match value {
                Scalar::Tuple(inner) => {
                    // Push children in reverse so the leftmost element is
                    // popped (and therefore numbered) first.
                    pending.extend(inner.iter().rev());
                }
                _ => {
                    leaf_default_values.insert(field_column_ids[next_leaf], value.to_owned());
                    next_leaf += 1;
                }
            }
        }
    }

    leaf_default_values
}

#[inline]
pub fn num_fields(&self) -> usize {
self.fields.len()
Expand Down
59 changes: 59 additions & 0 deletions src/query/expression/tests/it/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::collections::BTreeMap;
use common_exception::Result;
use common_expression::create_test_complex_schema;
use common_expression::types::NumberDataType;
use common_expression::ColumnId;
use common_expression::Scalar;
use common_expression::TableDataType;
use common_expression::TableField;
use common_expression::TableSchema;
Expand Down Expand Up @@ -180,6 +182,63 @@ fn test_schema_from_simple_type() -> Result<()> {
Ok(())
}

#[test]
fn test_field_leaf_default_values() -> Result<()> {
    use common_expression::types::number::NumberScalar;

    // Schema with five leaf columns (ids 0..=4):
    //   a: UInt64                                  -> id 0
    //   b: tuple(b1: tuple(b11: Boolean,           -> id 1
    //                      b12: String),           -> id 2
    //            b2: Int64)                        -> id 3
    //   c: UInt64                                  -> id 4
    let b1 = TableDataType::Tuple {
        fields_name: vec!["b11".to_string(), "b12".to_string()],
        fields_type: vec![TableDataType::Boolean, TableDataType::String],
    };
    let b = TableDataType::Tuple {
        fields_name: vec!["b1".to_string(), "b2".to_string()],
        fields_type: vec![b1, TableDataType::Number(NumberDataType::Int64)],
    };
    let fields = vec![
        TableField::new("a", TableDataType::Number(NumberDataType::UInt64)),
        TableField::new("b", b),
        TableField::new("c", TableDataType::Number(NumberDataType::UInt64)),
    ];
    let schema = TableSchema::new(fields);

    // One default value per top-level field; the tuple default mirrors b's shape.
    let default_values = vec![
        Scalar::Number(NumberScalar::UInt64(1)),
        Scalar::Tuple(vec![
            Scalar::Tuple(vec![Scalar::Boolean(true), Scalar::String(b"ab".to_vec())]),
            Scalar::Number(NumberScalar::Int64(2)),
        ]),
        Scalar::Number(NumberScalar::UInt64(10)),
    ];

    let leaf_default_values = schema.field_leaf_default_values(&default_values);

    // Each leaf column id must map to the flattened (depth-first) default scalar.
    let expected_leaf_default_values: Vec<(ColumnId, Scalar)> = vec![
        (0, Scalar::Number(NumberScalar::UInt64(1))),
        (1, Scalar::Boolean(true)),
        (2, Scalar::String(b"ab".to_vec())),
        (3, Scalar::Number(NumberScalar::Int64(2))),
        (4, Scalar::Number(NumberScalar::UInt64(10))),
    ];

    // Check exact contents: same size (no extra entries) and every expected pair present.
    assert_eq!(
        leaf_default_values.len(),
        expected_leaf_default_values.len()
    );
    for (col_id, default_value) in &expected_leaf_default_values {
        assert_eq!(leaf_default_values.get(col_id), Some(default_value));
    }
    Ok(())
}

#[test]
fn test_schema_from_struct() -> Result<()> {
let schema = create_test_complex_schema();
Expand Down
1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ uuid = { version = "1.1.2", features = ["serde", "v4"] }

[dev-dependencies]
common-meta-embedded = { path = "../../meta/embedded" }
ordered-float = { workspace = true }

base64 = "0.13.0"
criterion = "0.4"
Expand Down
155 changes: 117 additions & 38 deletions src/query/service/tests/it/storages/fuse/operations/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ use std::collections::HashSet;

use common_base::base::tokio;
use common_exception::Result;
use common_expression::types::Float64Type;
use common_expression::types::Int32Type;
use common_expression::types::NumberDataType;
use common_expression::types::UInt64Type;
use common_expression::Column;
use common_expression::ColumnId;
use common_expression::DataBlock;
use common_expression::FromData;
use common_expression::Scalar;
use common_expression::TableDataType;
use common_expression::TableField;
use common_expression::TableSchemaRefExt;
Expand All @@ -36,6 +39,7 @@ use databend_query::interpreters::DropTableColumnInterpreter;
use databend_query::interpreters::Interpreter;
use databend_query::interpreters::InterpreterFactory;
use futures_util::TryStreamExt;
use ordered_float::OrderedFloat;
use storages_common_cache::LoadParams;
use storages_common_table_meta::meta::SegmentInfo;
use storages_common_table_meta::meta::TableSnapshot;
Expand All @@ -46,7 +50,8 @@ use crate::storages::fuse::table_test_fixture::TestFixture;

async fn check_segment_column_ids(
fixture: &TestFixture,
expected_column_ids: Vec<ColumnId>,
expected_column_ids: Option<Vec<ColumnId>>,
expected_column_min_max: Option<Vec<(ColumnId, (Scalar, Scalar))>>,
) -> Result<()> {
let catalog = fixture.ctx().get_catalog("default")?;
// get the latest tbl
Expand All @@ -73,33 +78,45 @@ async fn check_segment_column_ids(
};

let snapshot = snapshot_reader.read(&params).await?;
let expected_column_ids =
HashSet::<ColumnId>::from_iter(expected_column_ids.clone().iter().cloned());
for (seg_loc, _) in &snapshot.segments {
let segment_reader = MetaReaders::segment_info_reader(
fuse_table.get_operator(),
TestFixture::default_table_schema(),
);
let params = LoadParams {
location: seg_loc.clone(),
len_hint: None,
ver: SegmentInfo::VERSION,
};
let segment_info = segment_reader.read(&params).await?;
segment_info.blocks.iter().for_each(|block_meta| {
assert_eq!(
HashSet::from_iter(
block_meta
.col_stats
.keys()
.cloned()
.collect::<Vec<ColumnId>>()
.iter()
.cloned()
),
expected_column_ids,
if let Some(expected_column_min_max) = expected_column_min_max {
for (column_id, (min, max)) in expected_column_min_max {
if let Some(stat) = snapshot.summary.col_stats.get(&column_id) {
assert_eq!(min, stat.min);
assert_eq!(max, stat.max);
}
}
}

if let Some(expected_column_ids) = expected_column_ids {
let expected_column_ids =
HashSet::<ColumnId>::from_iter(expected_column_ids.clone().iter().cloned());
for (seg_loc, _) in &snapshot.segments {
let segment_reader = MetaReaders::segment_info_reader(
fuse_table.get_operator(),
TestFixture::default_table_schema(),
);
});
let params = LoadParams {
location: seg_loc.clone(),
len_hint: None,
ver: SegmentInfo::VERSION,
};
let segment_info = segment_reader.read(&params).await?;

segment_info.blocks.iter().for_each(|block_meta| {
assert_eq!(
HashSet::from_iter(
block_meta
.col_stats
.keys()
.cloned()
.collect::<Vec<ColumnId>>()
.iter()
.cloned()
),
expected_column_ids,
);
});
}
}

Ok(())
Expand Down Expand Up @@ -128,7 +145,7 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
// check column ids
// the table contains two fields: id int32, t tuple(int32, int32)
let expected_leaf_column_ids = vec![0, 1, 2];
check_segment_column_ids(&fixture, expected_leaf_column_ids).await?;
check_segment_column_ids(&fixture, Some(expected_leaf_column_ids), None).await?;

// drop a column
let drop_table_column_plan = DropTableColumnPlan {
Expand All @@ -140,11 +157,17 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
let interpreter = DropTableColumnInterpreter::try_create(ctx.clone(), drop_table_column_plan)?;
interpreter.execute(ctx.clone()).await?;

// add a column
let fields = vec![TableField::new(
"b",
TableDataType::Number(NumberDataType::UInt64),
)];
// add a tuple(uint64, float64) column with default value `(1,15.0)`
let fields = vec![
TableField::new("b", TableDataType::Tuple {
fields_name: vec!["b1".to_string(), "b2".to_string()],
fields_type: vec![
TableDataType::Number(NumberDataType::UInt64),
TableDataType::Number(NumberDataType::Float64),
],
})
.with_default_expr(Some("(1,15.0)".to_string())),
];
let schema = TableSchemaRefExt::create(fields);

let add_table_column_plan = AddTableColumnPlan {
Expand All @@ -161,9 +184,14 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
// insert values for new schema
let block = {
let column0 = Int32Type::from_data(vec![1, 2]);
let column2 = UInt64Type::from_data(vec![3, 4]);
let column3 = UInt64Type::from_data(vec![3, 4]);
let column4 = Float64Type::from_data(vec![13.0, 14.0]);
let tuple_column = Column::Tuple {
fields: vec![column3, column4],
len: 2,
};

DataBlock::new_from_columns(vec![column0, column2])
DataBlock::new_from_columns(vec![column0, tuple_column])
};

// get the latest tbl
Expand All @@ -181,6 +209,33 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
.append_commit_blocks(table.clone(), vec![block], false, true)
.await?;

// verify statistics min and max value
check_segment_column_ids(
&fixture,
None,
Some(vec![
(
3,
(
Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
Scalar::Number(common_expression::types::number::NumberScalar::UInt64(4)),
),
),
(
4,
(
Scalar::Number(common_expression::types::number::NumberScalar::Float64(
OrderedFloat(13.0),
)),
Scalar::Number(common_expression::types::number::NumberScalar::Float64(
OrderedFloat(15.0),
)),
),
),
]),
)
.await?;

// do compact
let query = format!("optimize table {db_name}.{tbl_name} compact");
let mut planner = Planner::new(ctx.clone());
Expand All @@ -190,9 +245,33 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
let data_stream = interpreter.execute(ctx.clone()).await?;
let _ = data_stream.try_collect::<Vec<_>>().await;

// verify statistics has only [0,3]
let expected_column_ids = vec![0, 3];
check_segment_column_ids(&fixture, expected_column_ids).await?;
// verify statistics and min/max values
let expected_column_ids = vec![0, 3, 4];
check_segment_column_ids(
&fixture,
Some(expected_column_ids),
Some(vec![
(
3,
(
Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
Scalar::Number(common_expression::types::number::NumberScalar::UInt64(4)),
),
),
(
4,
(
Scalar::Number(common_expression::types::number::NumberScalar::Float64(
OrderedFloat(13.0),
)),
Scalar::Number(common_expression::types::number::NumberScalar::Float64(
OrderedFloat(15.0),
)),
),
),
]),
)
.await?;

Ok(())
}
Loading