Skip to content

Commit

Permalink
Merge pull request #10137 from lichuang/snapshot_default_value
Browse files Browse the repository at this point in the history
fix: fix add column min/max stat bug
  • Loading branch information
mergify[bot] authored Feb 21, 2023
2 parents 5022818 + 06721b2 commit 0e1f927
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions src/query/expression/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -60,6 +61,7 @@ use crate::with_number_type;
use crate::BlockEntry;
use crate::Column;
use crate::FromData;
use crate::Scalar;
use crate::TypeDeserializerImpl;
use crate::Value;
use crate::ARROW_EXT_TYPE_EMPTY_ARRAY;
Expand Down Expand Up @@ -437,6 +439,50 @@ impl TableSchema {
field_column_ids
}

/// Map every leaf column id of this schema to its default value.
///
/// `default_values` holds one default `Scalar` per top-level field, in
/// field order. Tuple defaults are flattened depth-first so that each
/// non-tuple (leaf) scalar is paired with the corresponding leaf column id
/// reported by `field_leaf_column_ids`.
pub fn field_leaf_default_values(
    &self,
    default_values: &[Scalar],
) -> HashMap<ColumnId, Scalar> {
    // Walk a (possibly nested) default value depth-first, assigning each
    // non-tuple scalar to the next unconsumed leaf column id.
    fn visit_leaves(
        value: &Scalar,
        column_ids: &[ColumnId],
        cursor: &mut usize,
        out: &mut HashMap<ColumnId, Scalar>,
    ) {
        if let Scalar::Tuple(elements) = value {
            for element in elements {
                visit_leaves(element, column_ids, cursor, out);
            }
        } else {
            out.insert(column_ids[*cursor], value.to_owned());
            *cursor += 1;
        }
    }

    let mut leaf_default_values = HashMap::with_capacity(self.num_fields());
    // zip_eq panics on length mismatch, enforcing one default per field.
    for (default_value, field_column_ids) in
        default_values.iter().zip_eq(self.field_leaf_column_ids())
    {
        let mut cursor = 0;
        visit_leaves(
            default_value,
            &field_column_ids,
            &mut cursor,
            &mut leaf_default_values,
        );
    }

    leaf_default_values
}

#[inline]
pub fn num_fields(&self) -> usize {
self.fields.len()
Expand Down
59 changes: 59 additions & 0 deletions src/query/expression/tests/it/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::collections::BTreeMap;
use common_exception::Result;
use common_expression::create_test_complex_schema;
use common_expression::types::NumberDataType;
use common_expression::ColumnId;
use common_expression::Scalar;
use common_expression::TableDataType;
use common_expression::TableField;
use common_expression::TableSchema;
Expand Down Expand Up @@ -180,6 +182,63 @@ fn test_schema_from_simple_type() -> Result<()> {
Ok(())
}

#[test]
fn test_field_leaf_default_values() -> Result<()> {
    // Schema: a (u64), b (tuple of (tuple of (bool, string), i64)), c (u64).
    // Leaf column ids are assigned depth-first: a=0, b11=1, b12=2, b2=3, c=4.
    let b1 = TableDataType::Tuple {
        fields_name: vec!["b11".to_string(), "b12".to_string()],
        fields_type: vec![TableDataType::Boolean, TableDataType::String],
    };
    let b = TableDataType::Tuple {
        fields_name: vec!["b1".to_string(), "b2".to_string()],
        fields_type: vec![b1, TableDataType::Number(NumberDataType::Int64)],
    };
    let fields = vec![
        TableField::new("a", TableDataType::Number(NumberDataType::UInt64)),
        TableField::new("b", b),
        TableField::new("c", TableDataType::Number(NumberDataType::UInt64)),
    ];
    let schema = TableSchema::new(fields);

    // One default per top-level field; the tuple default nests to match `b`.
    let default_values = vec![
        Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
        Scalar::Tuple(vec![
            Scalar::Tuple(vec![
                Scalar::Boolean(true),
                Scalar::String(vec!['a', 'b'].iter().map(|c| *c as u8).collect::<Vec<_>>()),
            ]),
            Scalar::Number(common_expression::types::number::NumberScalar::Int64(2)),
        ]),
        Scalar::Number(common_expression::types::number::NumberScalar::UInt64(10)),
    ];

    let leaf_default_values = schema.field_leaf_default_values(&default_values);
    let expected_leaf_default_values: Vec<(ColumnId, Scalar)> = vec![
        (
            0,
            Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
        ),
        (1, Scalar::Boolean(true)),
        (
            2,
            Scalar::String(vec!['a', 'b'].iter().map(|c| *c as u8).collect::<Vec<_>>()),
        ),
        (
            3,
            Scalar::Number(common_expression::types::number::NumberScalar::Int64(2)),
        ),
        (
            4,
            Scalar::Number(common_expression::types::number::NumberScalar::UInt64(10)),
        ),
    ];

    // The map must contain EXACTLY the expected entries: the size check
    // rejects spurious extra leaves, which the per-entry lookups alone
    // would not catch.
    assert_eq!(
        leaf_default_values.len(),
        expected_leaf_default_values.len()
    );
    expected_leaf_default_values
        .iter()
        .for_each(|(col_id, default_value)| {
            assert_eq!(leaf_default_values.get(col_id).unwrap(), default_value)
        });
    Ok(())
}

#[test]
fn test_schema_from_struct() -> Result<()> {
let schema = create_test_complex_schema();
Expand Down
1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ uuid = { version = "1.1.2", features = ["serde", "v4"] }

[dev-dependencies]
common-meta-embedded = { path = "../../meta/embedded" }
ordered-float = { workspace = true }

base64 = "0.13.0"
criterion = "0.4"
Expand Down
155 changes: 117 additions & 38 deletions src/query/service/tests/it/storages/fuse/operations/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ use std::collections::HashSet;

use common_base::base::tokio;
use common_exception::Result;
use common_expression::types::Float64Type;
use common_expression::types::Int32Type;
use common_expression::types::NumberDataType;
use common_expression::types::UInt64Type;
use common_expression::Column;
use common_expression::ColumnId;
use common_expression::DataBlock;
use common_expression::FromData;
use common_expression::Scalar;
use common_expression::TableDataType;
use common_expression::TableField;
use common_expression::TableSchemaRefExt;
Expand All @@ -36,6 +39,7 @@ use databend_query::interpreters::DropTableColumnInterpreter;
use databend_query::interpreters::Interpreter;
use databend_query::interpreters::InterpreterFactory;
use futures_util::TryStreamExt;
use ordered_float::OrderedFloat;
use storages_common_cache::LoadParams;
use storages_common_table_meta::meta::SegmentInfo;
use storages_common_table_meta::meta::TableSnapshot;
Expand All @@ -46,7 +50,8 @@ use crate::storages::fuse::table_test_fixture::TestFixture;

async fn check_segment_column_ids(
fixture: &TestFixture,
expected_column_ids: Vec<ColumnId>,
expected_column_ids: Option<Vec<ColumnId>>,
expected_column_min_max: Option<Vec<(ColumnId, (Scalar, Scalar))>>,
) -> Result<()> {
let catalog = fixture.ctx().get_catalog("default")?;
// get the latest tbl
Expand All @@ -73,33 +78,45 @@ async fn check_segment_column_ids(
};

let snapshot = snapshot_reader.read(&params).await?;
let expected_column_ids =
HashSet::<ColumnId>::from_iter(expected_column_ids.clone().iter().cloned());
for (seg_loc, _) in &snapshot.segments {
let segment_reader = MetaReaders::segment_info_reader(
fuse_table.get_operator(),
TestFixture::default_table_schema(),
);
let params = LoadParams {
location: seg_loc.clone(),
len_hint: None,
ver: SegmentInfo::VERSION,
};
let segment_info = segment_reader.read(&params).await?;
segment_info.blocks.iter().for_each(|block_meta| {
assert_eq!(
HashSet::from_iter(
block_meta
.col_stats
.keys()
.cloned()
.collect::<Vec<ColumnId>>()
.iter()
.cloned()
),
expected_column_ids,
if let Some(expected_column_min_max) = expected_column_min_max {
for (column_id, (min, max)) in expected_column_min_max {
if let Some(stat) = snapshot.summary.col_stats.get(&column_id) {
assert_eq!(min, stat.min);
assert_eq!(max, stat.max);
}
}
}

if let Some(expected_column_ids) = expected_column_ids {
let expected_column_ids =
HashSet::<ColumnId>::from_iter(expected_column_ids.clone().iter().cloned());
for (seg_loc, _) in &snapshot.segments {
let segment_reader = MetaReaders::segment_info_reader(
fuse_table.get_operator(),
TestFixture::default_table_schema(),
);
});
let params = LoadParams {
location: seg_loc.clone(),
len_hint: None,
ver: SegmentInfo::VERSION,
};
let segment_info = segment_reader.read(&params).await?;

segment_info.blocks.iter().for_each(|block_meta| {
assert_eq!(
HashSet::from_iter(
block_meta
.col_stats
.keys()
.cloned()
.collect::<Vec<ColumnId>>()
.iter()
.cloned()
),
expected_column_ids,
);
});
}
}

Ok(())
Expand Down Expand Up @@ -128,7 +145,7 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
// check column ids
// the table contains two fields: id int32, t tuple(int32, int32)
let expected_leaf_column_ids = vec![0, 1, 2];
check_segment_column_ids(&fixture, expected_leaf_column_ids).await?;
check_segment_column_ids(&fixture, Some(expected_leaf_column_ids), None).await?;

// drop a column
let drop_table_column_plan = DropTableColumnPlan {
Expand All @@ -140,11 +157,17 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
let interpreter = DropTableColumnInterpreter::try_create(ctx.clone(), drop_table_column_plan)?;
interpreter.execute(ctx.clone()).await?;

// add a column
let fields = vec![TableField::new(
"b",
TableDataType::Number(NumberDataType::UInt64),
)];
// add a column of uint64 with default value `(1,15.0)`
let fields = vec![
TableField::new("b", TableDataType::Tuple {
fields_name: vec!["b1".to_string(), "b2".to_string()],
fields_type: vec![
TableDataType::Number(NumberDataType::UInt64),
TableDataType::Number(NumberDataType::Float64),
],
})
.with_default_expr(Some("(1,15.0)".to_string())),
];
let schema = TableSchemaRefExt::create(fields);

let add_table_column_plan = AddTableColumnPlan {
Expand All @@ -161,9 +184,14 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
// insert values for new schema
let block = {
let column0 = Int32Type::from_data(vec![1, 2]);
let column2 = UInt64Type::from_data(vec![3, 4]);
let column3 = UInt64Type::from_data(vec![3, 4]);
let column4 = Float64Type::from_data(vec![13.0, 14.0]);
let tuple_column = Column::Tuple {
fields: vec![column3, column4],
len: 2,
};

DataBlock::new_from_columns(vec![column0, column2])
DataBlock::new_from_columns(vec![column0, tuple_column])
};

// get the latest tbl
Expand All @@ -181,6 +209,33 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
.append_commit_blocks(table.clone(), vec![block], false, true)
.await?;

// verify statistics min and max value
check_segment_column_ids(
&fixture,
None,
Some(vec![
(
3,
(
Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
Scalar::Number(common_expression::types::number::NumberScalar::UInt64(4)),
),
),
(
4,
(
Scalar::Number(common_expression::types::number::NumberScalar::Float64(
OrderedFloat(13.0),
)),
Scalar::Number(common_expression::types::number::NumberScalar::Float64(
OrderedFloat(15.0),
)),
),
),
]),
)
.await?;

// do compact
let query = format!("optimize table {db_name}.{tbl_name} compact");
let mut planner = Planner::new(ctx.clone());
Expand All @@ -190,9 +245,33 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> {
let data_stream = interpreter.execute(ctx.clone()).await?;
let _ = data_stream.try_collect::<Vec<_>>().await;

// verify statistics has only [0,3]
let expected_column_ids = vec![0, 3];
check_segment_column_ids(&fixture, expected_column_ids).await?;
// verify statistics and min\max values
let expected_column_ids = vec![0, 3, 4];
check_segment_column_ids(
&fixture,
Some(expected_column_ids),
Some(vec![
(
3,
(
Scalar::Number(common_expression::types::number::NumberScalar::UInt64(1)),
Scalar::Number(common_expression::types::number::NumberScalar::UInt64(4)),
),
),
(
4,
(
Scalar::Number(common_expression::types::number::NumberScalar::Float64(
OrderedFloat(13.0),
)),
Scalar::Number(common_expression::types::number::NumberScalar::Float64(
OrderedFloat(15.0),
)),
),
),
]),
)
.await?;

Ok(())
}
Loading

1 comment on commit 0e1f927

@vercel
Copy link

@vercel vercel bot commented on 0e1f927 Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.rs

Please sign in to comment.