diff --git a/common/datavalues/src/data_value.rs b/common/datavalues/src/data_value.rs index ad1eda8b20cf..f1c898cbfff2 100644 --- a/common/datavalues/src/data_value.rs +++ b/common/datavalues/src/data_value.rs @@ -15,18 +15,20 @@ // Borrow from apache/arrow/rust/datafusion/src/functions.rs // See notice.md +use std::cmp::Ordering; use std::fmt; use std::sync::Arc; use common_exception::ErrorCode; use common_exception::Result; use common_macros::MallocSizeOf; +use ordered_float::OrderedFloat; use serde_json::json; use crate::prelude::*; /// A specific value of a data type. -#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, PartialOrd)] +#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq)] pub enum DataValue { /// Base type. Null, @@ -44,6 +46,8 @@ pub enum DataValue { Variant(VariantValue), } +impl Eq for DataValue {} + #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, MallocSizeOf)] pub enum ValueType { Null, @@ -271,7 +275,71 @@ impl DataValue { } } -impl Eq for DataValue {} +impl PartialOrd for DataValue { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for DataValue { + fn cmp(&self, other: &Self) -> Ordering { + if self.value_type() == other.value_type() { + return match (self, other) { + (DataValue::Null, DataValue::Null) => Ordering::Equal, + (DataValue::Boolean(v1), DataValue::Boolean(v2)) => v1.cmp(v2), + (DataValue::UInt64(v1), DataValue::UInt64(v2)) => v1.cmp(v2), + (DataValue::Int64(v1), DataValue::Int64(v2)) => v1.cmp(v2), + (DataValue::Float64(v1), DataValue::Float64(v2)) => { + OrderedFloat::from(*v1).cmp(&OrderedFloat::from(*v2)) + } + (DataValue::String(v1), DataValue::String(v2)) => v1.cmp(v2), + (DataValue::Array(v1), DataValue::Array(v2)) => { + for (l, r) in v1.iter().zip(v2) { + let cmp = l.cmp(r); + if cmp != Ordering::Equal { + return cmp; + } + } + v1.len().cmp(&v2.len()) + } + (DataValue::Struct(v1), DataValue::Struct(v2)) => { + for (l, r) in v1.iter().zip(v2.iter()) { + let cmp = l.cmp(r); + if cmp != Ordering::Equal { + return cmp; + } + } + v1.len().cmp(&v2.len()) + } + (DataValue::Variant(v1), DataValue::Variant(v2)) => v1.cmp(v2), + _ => unreachable!(), + }; + } + + if self.is_null() { + return Ordering::Less; + } + + if other.is_null() { + return Ordering::Greater; + } + + if !self.is_numeric() || !other.is_numeric() { + panic!( + "Cannot compare different types with {:?} and {:?}", + self.value_type(), + other.value_type() + ); + } + + if self.is_float() || other.is_float() { + return OrderedFloat::from(self.as_f64().unwrap()) + .cmp(&OrderedFloat::from(other.as_f64().unwrap())); + } + + self.as_i64().unwrap().cmp(&other.as_i64().unwrap()) + } +} // Did not use std::convert:TryFrom // Because we do not need custom type error. diff --git a/common/exception/src/exception_code.rs b/common/exception/src/exception_code.rs index 8b677a92a297..1d226e6f2862 100644 --- a/common/exception/src/exception_code.rs +++ b/common/exception/src/exception_code.rs @@ -122,6 +122,7 @@ build_exceptions! { InvalidTimezone(1067), InvalidDate(1068), InvalidTimestamp(1069), + InvalidClusterKeys(1070), // Uncategorized error codes. UnexpectedResponseType(1066), diff --git a/query/src/procedures/systems/clustering_information.rs b/query/src/procedures/systems/clustering_information.rs new file mode 100644 index 000000000000..f6ba20d678d3 --- /dev/null +++ b/query/src/procedures/systems/clustering_information.rs @@ -0,0 +1,72 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_datavalues::DataSchema; +use common_exception::Result; + +use crate::procedures::Procedure; +use crate::procedures::ProcedureFeatures; +use crate::sessions::QueryContext; +use crate::storages::fuse::table_functions::get_cluster_keys; +use crate::storages::fuse::table_functions::ClusteringInformation; +use crate::storages::fuse::FuseTable; + +pub struct ClusteringInformationProcedure {} + +impl ClusteringInformationProcedure { + pub fn try_create() -> Result> { + Ok(Box::new(ClusteringInformationProcedure {})) + } +} + +#[async_trait::async_trait] +impl Procedure for ClusteringInformationProcedure { + fn name(&self) -> &str { + "CLUSTERING_INFORMATION" + } + + fn features(&self) -> ProcedureFeatures { + // Todo(zhyass): ProcedureFeatures::default().variadic_arguments(2, 3) + ProcedureFeatures::default().num_arguments(2) + } + + async fn inner_eval(&self, ctx: Arc, args: Vec) -> Result { + let database_name = args[0].clone(); + let table_name = args[1].clone(); + let tenant_id = ctx.get_tenant(); + let tbl = ctx + .get_catalog(ctx.get_current_catalog())? + .get_table( + tenant_id.as_str(), + database_name.as_str(), + table_name.as_str(), + ) + .await?; + + let tbl = FuseTable::try_from_table(tbl.as_ref())?; + let definition = if args.len() > 2 { &args[2] } else { "" }; + let cluster_keys = get_cluster_keys(tbl, definition)?; + + Ok(ClusteringInformation::new(ctx, tbl, cluster_keys) + .get_clustering_info() + .await?) + } + + fn schema(&self) -> Arc { + ClusteringInformation::schema() + } +} diff --git a/query/src/procedures/systems/fuse_segment.rs b/query/src/procedures/systems/fuse_segment.rs index 356de1b22e58..dab709561517 100644 --- a/query/src/procedures/systems/fuse_segment.rs +++ b/query/src/procedures/systems/fuse_segment.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use common_datablocks::DataBlock; use common_datavalues::DataSchema; -use common_exception::ErrorCode; use common_exception::Result; use crate::procedures::Procedure; @@ -57,12 +56,7 @@ impl Procedure for FuseSegmentProcedure { ) .await?; - let tbl = tbl.as_any().downcast_ref::().ok_or_else(|| { - ErrorCode::BadArguments(format!( - "expecting fuse table, but got table of engine type: {}", - tbl.get_table_info().meta.engine - )) - })?; + let tbl = FuseTable::try_from_table(tbl.as_ref())?; Ok(FuseSegment::new(ctx, tbl, snapshot_id) .get_segments() diff --git a/query/src/procedures/systems/fuse_snapshot.rs b/query/src/procedures/systems/fuse_snapshot.rs index 00e1fcb0ed0b..c266101a21c5 100644 --- a/query/src/procedures/systems/fuse_snapshot.rs +++ b/query/src/procedures/systems/fuse_snapshot.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use common_datablocks::DataBlock; use common_datavalues::DataSchema; -use common_exception::ErrorCode; use common_exception::Result; use crate::procedures::Procedure; @@ -57,12 +56,7 @@ impl Procedure for FuseSnapshotProcedure { ) .await?; - let tbl = tbl.as_any().downcast_ref::().ok_or_else(|| { - ErrorCode::BadArguments(format!( - "expecting fuse table, but got table of engine type: {}", - tbl.get_table_info().meta.engine - )) - })?; + let tbl = FuseTable::try_from_table(tbl.as_ref())?; Ok(FuseSnapshot::new(ctx, tbl).get_history().await?) } diff --git a/query/src/procedures/systems/mod.rs b/query/src/procedures/systems/mod.rs index b7f2d935ce71..4df9c7a912ca 100644 --- a/query/src/procedures/systems/mod.rs +++ b/query/src/procedures/systems/mod.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod clustering_information; mod fuse_segment; mod fuse_snapshot; mod system; +pub use clustering_information::ClusteringInformationProcedure; pub use fuse_segment::FuseSegmentProcedure; pub use fuse_snapshot::FuseSnapshotProcedure; pub use system::SystemProcedure; diff --git a/query/src/procedures/systems/system.rs b/query/src/procedures/systems/system.rs index 35a7275aefcb..3bd46e9a9962 100644 --- a/query/src/procedures/systems/system.rs +++ b/query/src/procedures/systems/system.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::procedures::systems::ClusteringInformationProcedure; use crate::procedures::systems::FuseSegmentProcedure; use crate::procedures::systems::FuseSnapshotProcedure; use crate::procedures::ProcedureFactory; @@ -20,6 +21,10 @@ pub struct SystemProcedure; impl SystemProcedure { pub fn register(factory: &mut ProcedureFactory) { + factory.register( + "system$clustering_information", + Box::new(ClusteringInformationProcedure::try_create), + ); factory.register( "system$fuse_snapshot", Box::new(FuseSnapshotProcedure::try_create), diff --git a/query/src/storages/fuse/fuse_table.rs b/query/src/storages/fuse/fuse_table.rs index 97e36f0254db..5ac79946433a 100644 --- a/query/src/storages/fuse/fuse_table.rs +++ b/query/src/storages/fuse/fuse_table.rs @@ -82,6 +82,10 @@ impl FuseTable { &self.meta_location_generator } + pub fn cluster_keys(&self) -> Vec { + self.order_keys.clone() + } + pub fn parse_storage_prefix(table_info: &TableInfo) -> Result { let table_id = table_info.ident.table_id; let db_id = table_info diff --git a/query/src/storages/fuse/statistics/reducers.rs b/query/src/storages/fuse/statistics/reducers.rs index 8cf70828fbb9..aeeadc488e18 100644 --- a/query/src/storages/fuse/statistics/reducers.rs +++ b/query/src/storages/fuse/statistics/reducers.rs @@ -73,14 +73,14 @@ pub fn reduce_block_stats>(stats: &[T]) -> Result>>( break; } (_, true) => break, - _ => { - if let Some(cmp) = l.partial_cmp(r) { - match cmp { - Ordering::Equal => continue, - Ordering::Less => break, - Ordering::Greater => { - min = stat.min.clone(); - break; - } - } + _ => match l.cmp(r) { + Ordering::Equal => continue, + Ordering::Less => break, + Ordering::Greater => { + min = stat.min.clone(); + break; } - } + }, } } @@ -135,18 +131,14 @@ pub fn reduce_cluster_stats>>( break; } (_, true) => break, - _ => { - if let Some(cmp) = l.partial_cmp(r) { - match cmp { - Ordering::Equal => continue, - Ordering::Less => { - max = stat.max.clone(); - break; - } - Ordering::Greater => break, - } + _ => match l.cmp(r) { + Ordering::Equal => continue, + Ordering::Less => { + max = stat.max.clone(); + break; } - } + Ordering::Greater => break, + }, } } } diff --git a/query/src/storages/fuse/table_functions/clustering_informations/clustering_information.rs b/query/src/storages/fuse/table_functions/clustering_informations/clustering_information.rs new file mode 100644 index 000000000000..6397533aca13 --- /dev/null +++ b/query/src/storages/fuse/table_functions/clustering_informations/clustering_information.rs @@ -0,0 +1,223 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_datavalues::prelude::*; +use common_exception::ErrorCode; +use common_exception::Result; +use common_planners::Expression; +use serde_json::json; + +use crate::sessions::QueryContext; +use crate::storages::fuse::io::MetaReaders; +use crate::storages::fuse::meta::BlockMeta; +use crate::storages::fuse::FuseTable; + +pub struct ClusteringInformation<'a> { + pub ctx: Arc, + pub table: &'a FuseTable, + pub cluster_keys: Vec, +} + +struct ClusteringStatistics { + total_block_count: u64, + total_constant_block_count: u64, + average_overlaps: f64, + average_depth: f64, + block_depth_histogram: VariantValue, +} + +impl<'a> ClusteringInformation<'a> { + pub fn new( + ctx: Arc, + table: &'a FuseTable, + cluster_keys: Vec, + ) -> Self { + Self { + ctx, + table, + cluster_keys, + } + } + + pub async fn get_clustering_info(&self) -> Result { + let snapshot = self.table.read_table_snapshot(self.ctx.as_ref()).await?; + + let mut blocks = Vec::new(); + if let Some(snapshot) = snapshot { + let reader = MetaReaders::segment_info_reader(self.ctx.as_ref()); + for (x, ver) in &snapshot.segments { + let res = reader.read(x, None, *ver).await?; + let mut block = res.blocks.clone(); + blocks.append(&mut block); + } + }; + + let info = self.get_clustering_stats(blocks)?; + + let names = self + .cluster_keys + .iter() + .map(|x| x.column_name()) + .collect::>() + .join(", "); + let cluster_by_keys = format!("({})", names); + + Ok(DataBlock::create(ClusteringInformation::schema(), vec![ + Series::from_data(vec![cluster_by_keys]), + Series::from_data(vec![info.total_block_count]), + Series::from_data(vec![info.total_constant_block_count]), + Series::from_data(vec![info.average_overlaps]), + Series::from_data(vec![info.average_depth]), + Series::from_data(vec![info.block_depth_histogram]), + ])) + } + + fn get_min_max_stats(&self, block: &BlockMeta) -> Result<(Vec, Vec)> { + if self.table.cluster_keys() != self.cluster_keys || block.cluster_stats.is_none() { + // Todo(zhyass): support manually specifying the cluster key. + return Err(ErrorCode::UnImplement("Unimplement error")); + } + + let cluster_stats = block.cluster_stats.clone().unwrap(); + Ok((cluster_stats.min, cluster_stats.max)) + } + + fn get_clustering_stats(&self, blocks: Vec) -> Result { + if blocks.is_empty() { + return Ok(ClusteringStatistics { + total_block_count: 0, + total_constant_block_count: 0, + average_overlaps: 0.0, + average_depth: 0.0, + block_depth_histogram: VariantValue::from(json!({})), + }); + } + + // Gather all cluster statistics points to a sorted Map. + // Key: The cluster statistics points. + // Value: 0: The block indexes with key as min value; + // 1: The block indexes with key as max value; + let mut points_map: BTreeMap, (Vec, Vec)> = BTreeMap::new(); + let mut total_constant_block_count = 0; + for (i, block) in blocks.iter().enumerate() { + let (min, max) = self.get_min_max_stats(block)?; + if min.eq(&max) { + total_constant_block_count += 1; + } + + points_map + .entry(min.clone()) + .and_modify(|v| v.0.push(i)) + .or_insert((vec![i], vec![])); + + points_map + .entry(max.clone()) + .and_modify(|v| v.1.push(i)) + .or_insert((vec![], vec![i])); + } + + // calculate overlaps and depth. + let mut statis = Vec::new(); + // key: the block index. + // value: (overlaps, depth). + let mut unfinished_parts: HashMap = HashMap::new(); + for (start, end) in points_map.values() { + let point_depth = unfinished_parts.len() + start.len(); + + for (_, val) in unfinished_parts.iter_mut() { + val.0 += start.len(); + val.1 = cmp::max(val.1, point_depth); + } + + start.iter().for_each(|&idx| { + unfinished_parts.insert(idx, (point_depth - 1, point_depth)); + }); + + end.iter().for_each(|&idx| { + let stat = unfinished_parts.remove(&idx).unwrap(); + statis.push(stat); + }); + } + assert_eq!(unfinished_parts.len(), 0); + + let mut sum_overlap = 0; + let mut sum_depth = 0; + let length = statis.len(); + let mp = statis + .into_iter() + .fold(BTreeMap::new(), |mut acc, (overlap, depth)| { + sum_overlap += overlap; + sum_depth += depth; + + let bucket = get_buckets(depth); + acc.entry(bucket).and_modify(|v| *v += 1).or_insert(1u32); + acc + }); + // round the float to 4 decimal places. + let average_depth = (10000.0 * sum_depth as f64 / length as f64).round() / 10000.0; + let average_overlaps = (10000.0 * sum_overlap as f64 / length as f64).round() / 10000.0; + + let objects = mp.iter().fold( + serde_json::Map::with_capacity(mp.len()), + |mut acc, (bucket, count)| { + acc.insert(format!("{:05}", bucket), json!(count)); + acc + }, + ); + let block_depth_histogram = VariantValue::from(serde_json::Value::Object(objects)); + + Ok(ClusteringStatistics { + total_block_count: blocks.len() as u64, + total_constant_block_count, + average_overlaps, + average_depth, + block_depth_histogram, + }) + } + + pub fn schema() -> Arc { + DataSchemaRefExt::create(vec![ + DataField::new("cluster_by_keys", Vu8::to_data_type()), + DataField::new("total_block_count", u64::to_data_type()), + DataField::new("total_constant_block_count", u64::to_data_type()), + DataField::new("average_overlaps", f64::to_data_type()), + DataField::new("average_depth", f64::to_data_type()), + DataField::new("block_depth_histogram", VariantArrayType::new_impl()), + ]) + } +} + +// The histogram contains buckets with widths: +// 1 to 16 with increments of 1. +// For buckets larger than 16, increments of twice the width of the previous bucket (e.g. 32, 64, 128, …). +// e.g. If val is 2, the bucket is 2. If val is 18, the bucket is 32. +fn get_buckets(val: usize) -> u32 { + let mut val = val as u32; + if val <= 16 || val & (val - 1) == 0 { + return val; + } + + val |= val >> 1; + val |= val >> 2; + val |= val >> 4; + val |= val >> 8; + val |= val >> 16; + val + 1 +} diff --git a/query/src/storages/fuse/table_functions/clustering_informations/clustering_information_table.rs b/query/src/storages/fuse/table_functions/clustering_informations/clustering_information_table.rs new file mode 100644 index 000000000000..36d43018d7e2 --- /dev/null +++ b/query/src/storages/fuse/table_functions/clustering_informations/clustering_information_table.rs @@ -0,0 +1,239 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use std::any::Any; +use std::future::Future; +use std::sync::Arc; + +use common_datablocks::DataBlock; +use common_exception::Result; +use common_meta_types::TableIdent; +use common_meta_types::TableInfo; +use common_meta_types::TableMeta; +use common_planners::Expression; +use common_planners::Extras; +use common_planners::Partitions; +use common_planners::ReadDataSourcePlan; +use common_planners::Statistics; +use common_streams::DataBlockStream; +use common_streams::SendableDataBlockStream; + +use super::clustering_information::ClusteringInformation; +use super::table_args::get_cluster_keys; +use super::table_args::parse_func_table_args; +use crate::catalogs::CATALOG_DEFAULT; +use crate::pipelines::new::processors::port::OutputPort; +use crate::pipelines::new::processors::processor::ProcessorPtr; +use crate::pipelines::new::processors::AsyncSource; +use crate::pipelines::new::processors::AsyncSourcer; +use crate::pipelines::new::NewPipe; +use crate::pipelines::new::NewPipeline; +use crate::sessions::QueryContext; +use crate::storages::fuse::table_functions::string_literal; +use crate::storages::fuse::FuseTable; +use crate::storages::Table; +use crate::table_functions::TableArgs; +use crate::table_functions::TableFunction; + +const FUSE_FUNC_CLUSTERING: &str = "clustering_information"; + +pub struct ClusteringInformationTable { + table_info: TableInfo, + arg_database_name: String, + arg_table_name: String, + // Todo(zhyass): support define cluster_keys. + arg_cluster_keys: String, +} + +impl ClusteringInformationTable { + pub fn create( + database_name: &str, + table_func_name: &str, + table_id: u64, + table_args: TableArgs, + ) -> Result> { + let (arg_database_name, arg_table_name) = parse_func_table_args(&table_args)?; + + let engine = FUSE_FUNC_CLUSTERING.to_owned(); + + let table_info = TableInfo { + ident: TableIdent::new(table_id, 0), + desc: format!("'{}'.'{}'", database_name, table_func_name), + name: table_func_name.to_string(), + meta: TableMeta { + schema: ClusteringInformation::schema(), + engine, + ..Default::default() + }, + }; + + Ok(Arc::new(Self { + table_info, + arg_database_name, + arg_table_name, + arg_cluster_keys: "".to_string(), + })) + } +} + +#[async_trait::async_trait] +impl Table for ClusteringInformationTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_table_info(&self) -> &TableInfo { + &self.table_info + } + + async fn read_partitions( + &self, + _ctx: Arc, + _push_downs: Option, + ) -> Result<(Statistics, Partitions)> { + Ok((Statistics::default(), vec![])) + } + + fn table_args(&self) -> Option> { + Some(vec![ + string_literal(self.arg_database_name.as_str()), + string_literal(self.arg_table_name.as_str()), + ]) + } + + async fn read( + &self, + ctx: Arc, + _plan: &ReadDataSourcePlan, + ) -> Result { + let tenant_id = ctx.get_tenant(); + let tbl = ctx + .get_catalog(CATALOG_DEFAULT)? + .get_table( + tenant_id.as_str(), + self.arg_database_name.as_str(), + self.arg_table_name.as_str(), + ) + .await?; + let tbl = FuseTable::try_from_table(tbl.as_ref())?; + + let cluster_keys = get_cluster_keys(tbl, &self.arg_cluster_keys)?; + + let blocks = vec![ + ClusteringInformation::new(ctx.clone(), tbl, cluster_keys) + .get_clustering_info() + .await?, + ]; + Ok(Box::pin(DataBlockStream::create( + ClusteringInformation::schema(), + None, + blocks, + ))) + } + + fn read2( + &self, + ctx: Arc, + _: &ReadDataSourcePlan, + pipeline: &mut NewPipeline, + ) -> Result<()> { + let output = OutputPort::create(); + pipeline.add_pipe(NewPipe::SimplePipe { + inputs_port: vec![], + outputs_port: vec![output.clone()], + processors: vec![ClusteringInformationSource::create( + ctx, + output, + self.arg_database_name.to_owned(), + self.arg_table_name.to_owned(), + self.arg_cluster_keys.to_owned(), + )?], + }); + + Ok(()) + } +} + +struct ClusteringInformationSource { + finish: bool, + ctx: Arc, + arg_database_name: String, + arg_table_name: String, + arg_cluster_keys: String, +} + +impl ClusteringInformationSource { + pub fn create( + ctx: Arc, + output: Arc, + arg_database_name: String, + arg_table_name: String, + arg_cluster_keys: String, + ) -> Result { + AsyncSourcer::create(ctx.clone(), output, ClusteringInformationSource { + ctx, + finish: false, + arg_table_name, + arg_database_name, + arg_cluster_keys, + }) + } +} + +impl AsyncSource for ClusteringInformationSource { + const NAME: &'static str = "clustering_information"; + + type BlockFuture<'a> = impl Future>> where Self: 'a; + + fn generate(&mut self) -> Self::BlockFuture<'_> { + async { + if self.finish { + return Ok(None); + } + + self.finish = true; + let tenant_id = self.ctx.get_tenant(); + let tbl = self + .ctx + .get_catalog(CATALOG_DEFAULT)? + .get_table( + tenant_id.as_str(), + self.arg_database_name.as_str(), + self.arg_table_name.as_str(), + ) + .await?; + + let tbl = FuseTable::try_from_table(tbl.as_ref())?; + let cluster_keys = get_cluster_keys(tbl, &self.arg_cluster_keys)?; + + Ok(Some( + ClusteringInformation::new(self.ctx.clone(), tbl, cluster_keys) + .get_clustering_info() + .await?, + )) + } + } +} + +impl TableFunction for ClusteringInformationTable { + fn function_name(&self) -> &str { + self.name() + } + + fn as_table<'a>(self: Arc) -> Arc + where Self: 'a { + self + } +} diff --git a/query/src/storages/fuse/table_functions/clustering_informations/mod.rs b/query/src/storages/fuse/table_functions/clustering_informations/mod.rs new file mode 100644 index 000000000000..9e13889fafaa --- /dev/null +++ b/query/src/storages/fuse/table_functions/clustering_informations/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +mod clustering_information; +mod clustering_information_table; +mod table_args; + +pub use clustering_information::ClusteringInformation; +pub use clustering_information_table::ClusteringInformationTable; +pub use table_args::*; diff --git a/query/src/storages/fuse/table_functions/clustering_informations/table_args.rs b/query/src/storages/fuse/table_functions/clustering_informations/table_args.rs new file mode 100644 index 000000000000..bc0cda711645 --- /dev/null +++ b/query/src/storages/fuse/table_functions/clustering_informations/table_args.rs @@ -0,0 +1,62 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_exception::ErrorCode; +use common_exception::Result; +use common_planners::validate_expression; +use common_planners::Expression; + +use crate::sql::PlanParser; +use crate::storages::fuse::table_functions::string_value; +use crate::storages::fuse::FuseTable; +use crate::storages::Table; +use crate::table_functions::TableArgs; + +pub fn parse_func_table_args(table_args: &TableArgs) -> Result<(String, String)> { + match table_args { + // Todo(zhyass): support 3 arguments. + Some(args) if args.len() == 2 => { + let db = string_value(&args[0])?; + let tbl = string_value(&args[1])?; + Ok((db, tbl)) + } + _ => Err(ErrorCode::BadArguments(format!( + "expecting database and table name (as two string literals), but got {:?}", + table_args + ))), + } +} + +pub fn get_cluster_keys(table: &FuseTable, definition: &str) -> Result> { + let cluster_keys = if !definition.is_empty() { + let schema = table.schema(); + let exprs = PlanParser::parse_exprs(definition)?; + for expr in exprs.iter() { + validate_expression(expr, &schema)?; + } + + exprs + } else { + table.cluster_keys() + }; + + if cluster_keys.is_empty() { + return Err(ErrorCode::InvalidClusterKeys(format!( + "Invalid clustering keys or table {} is not clustered", + table.name() + ))); + } + + Ok(cluster_keys) +} diff --git a/query/src/storages/fuse/table_functions/fuse_segments/fuse_segment_table.rs b/query/src/storages/fuse/table_functions/fuse_segments/fuse_segment_table.rs index 99d48e4f7e70..30a397d8cfff 100644 --- a/query/src/storages/fuse/table_functions/fuse_segments/fuse_segment_table.rs +++ b/query/src/storages/fuse/table_functions/fuse_segments/fuse_segment_table.rs @@ -18,7 +18,6 @@ use std::future::Future; use std::sync::Arc; use common_datablocks::DataBlock; -use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::TableIdent; use common_meta_types::TableInfo; @@ -129,12 +128,7 @@ impl Table for FuseSegmentTable { ) .await?; - let tbl = tbl.as_any().downcast_ref::().ok_or_else(|| { - ErrorCode::BadArguments(format!( - "expecting fuse table, but got table of engine type: {}", - tbl.get_table_info().meta.engine - )) - })?; + let tbl = FuseTable::try_from_table(tbl.as_ref())?; let blocks = vec![ FuseSegment::new(ctx.clone(), tbl, self.arg_snapshot_id.clone()) diff --git a/query/src/storages/fuse/table_functions/fuse_snapshots/fuse_snapshot_table.rs b/query/src/storages/fuse/table_functions/fuse_snapshots/fuse_snapshot_table.rs index 8430aa82997a..0fde8e386fa1 100644 --- a/query/src/storages/fuse/table_functions/fuse_snapshots/fuse_snapshot_table.rs +++ b/query/src/storages/fuse/table_functions/fuse_snapshots/fuse_snapshot_table.rs @@ -18,7 +18,6 @@ use std::future::Future; use std::sync::Arc; use common_datablocks::DataBlock; -use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::TableIdent; use common_meta_types::TableInfo; @@ -128,12 +127,7 @@ impl Table for FuseSnapshotTable { ) .await?; - let tbl = tbl.as_any().downcast_ref::().ok_or_else(|| { - ErrorCode::BadArguments(format!( - "expecting fuse table, but got table of engine type: {}", - tbl.get_table_info().meta.engine - )) - })?; + let tbl = FuseTable::try_from_table(tbl.as_ref())?; let blocks = vec![FuseSnapshot::new(ctx.clone(), tbl).get_history().await?]; Ok(Box::pin(DataBlockStream::create( diff --git a/query/src/storages/fuse/table_functions/mod.rs b/query/src/storages/fuse/table_functions/mod.rs index 085d304ec51b..39fc9571d38a 100644 --- a/query/src/storages/fuse/table_functions/mod.rs +++ b/query/src/storages/fuse/table_functions/mod.rs @@ -13,10 +13,14 @@ // limitations under the License. // +mod clustering_informations; mod fuse_segments; mod fuse_snapshots; mod table_args; +pub use clustering_informations::get_cluster_keys; +pub use clustering_informations::ClusteringInformation; +pub use clustering_informations::ClusteringInformationTable; pub use fuse_segments::FuseSegment; pub use fuse_segments::FuseSegmentTable; pub use fuse_snapshots::FuseSnapshot; diff --git a/query/src/storages/index/cluster_key.rs b/query/src/storages/index/cluster_key.rs new file mode 100644 index 000000000000..cc86ad85a9eb --- /dev/null +++ b/query/src/storages/index/cluster_key.rs @@ -0,0 +1,166 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::collections::HashSet; + +use common_arrow::arrow::compute::sort as arrow_sort; +use common_datablocks::DataBlock; +use common_datavalues::prelude::*; +use common_exception::ErrorCode; +use common_exception::Result; +use common_planners::Expression; +use common_planners::ExpressionMonotonicityVisitor; +use common_planners::RequireColumnsVisitor; +use itertools::Itertools; +use serde_json::json; + +use crate::storages::fuse::meta::BlockMeta; +use crate::storages::index::range_filter::check_maybe_monotonic; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct ClusterStatistics { + pub min: Vec, + pub max: Vec, +} + +#[derive(Clone)] +pub struct ClusteringInformationExecutor { + blocks: Vec, + // (start, end). + points_map: BTreeMap, (Vec, Vec)>, + const_block_count: usize, +} + +pub struct ClusteringInformation { + pub total_block_count: u64, + pub total_constant_block_count: u64, + pub average_overlaps: f64, + pub average_depth: f64, + pub block_depth_histogram: VariantValue, +} + +impl ClusteringInformationExecutor { + pub fn create_by_cluster(blocks: Vec) -> Result { + let mut points_map: BTreeMap, (Vec, Vec)> = BTreeMap::new(); + let mut const_block_count = 0; + for (i, block) in blocks.iter().enumerate() { + // Todo(zhyass): if cluster_stats is none. + let min = block.cluster_stats.clone().unwrap().min; + let max = block.cluster_stats.clone().unwrap().max; + if min.eq(&max) { + const_block_count += 1; + } + + points_map + .entry(min.clone()) + .and_modify(|v| v.0.push(i)) + .or_insert((vec![i], vec![])); + + points_map + .entry(max.clone()) + .and_modify(|v| v.1.push(i)) + .or_insert((vec![], vec![i])); + } + + Ok(ClusteringInformationExecutor { + blocks, + points_map, + const_block_count, + }) + } + + pub fn execute(&self) -> Result { + if self.blocks.is_empty() { + return Ok(ClusteringInformation { + total_block_count: 0, + total_constant_block_count: 0, + average_overlaps: 0.0, + average_depth: 0.0, + block_depth_histogram: VariantValue::from(json!(null)), + }); + } + + let mut statis = Vec::new(); + let mut unfinished_parts: HashMap = HashMap::new(); + for (key, (start, end)) in &self.points_map { + let point_depth = unfinished_parts.len() + start.len(); + + for (_, val) in unfinished_parts.iter_mut() { + val.0 += start.len(); + val.1 = cmp::max(val.1, point_depth); + } + + start.iter().for_each(|&idx| { + unfinished_parts.insert(idx, (point_depth - 1, point_depth)); + }); + + end.iter().for_each(|&idx| { + let stat = unfinished_parts.remove(&idx).unwrap(); + statis.push(stat); + }); + } + assert_eq!(unfinished_parts.len(), 0); + + let mut sum_overlap = 0; + let mut sum_depth = 0; + let length = statis.len(); + let mp = statis + .into_iter() + .fold(BTreeMap::new(), |mut acc, (overlap, depth)| { + sum_overlap += overlap; + sum_depth += depth; + + let bucket = get_buckets(depth); + acc.entry(bucket).and_modify(|v| *v += 1).or_insert(1u32); + acc + }); + // round the float to 4 decimal places. + let average_depth = (10000.0 * sum_depth as f64 / length as f64).round() / 10000.0; + let average_overlaps = (10000.0 * sum_overlap as f64 / length as f64).round() / 10000.0; + + let objects = mp.iter().fold( + serde_json::Map::with_capacity(mp.len()), + |mut acc, (bucket, count)| { + acc.insert(format!("{:05}", bucket), json!(count)); + acc + }, + ); + let block_depth_histogram = VariantValue::from(serde_json::Value::Object(objects)); + + Ok(ClusteringInformation { + total_block_count: self.blocks.len() as u64, + total_constant_block_count: self.const_block_count as u64, + average_overlaps, + average_depth, + block_depth_histogram, + }) + } +} + +fn get_buckets(val: usize) -> u32 { + let mut val = val as u32; + if val <= 16 || val & (val - 1) == 0 { + return val; + } + + val |= val >> 1; + val |= val >> 2; + val |= val >> 4; + val |= val >> 8; + val |= val >> 16; + val + 1 +} diff --git a/query/src/storages/index/range_filter.rs b/query/src/storages/index/range_filter.rs index 568e99feefd3..b39b7dafdf46 100644 --- a/query/src/storages/index/range_filter.rs +++ b/query/src/storages/index/range_filter.rs @@ -626,7 +626,7 @@ fn get_maybe_monotonic(op: &str, args: Expressions) -> Result { Ok(true) } -fn check_maybe_monotonic(expr: &Expression) -> Result { +pub fn check_maybe_monotonic(expr: &Expression) -> Result { match expr { Expression::Literal { .. } => Ok(true), Expression::Column { .. } => Ok(true), diff --git a/query/src/table_functions/table_function_factory.rs b/query/src/table_functions/table_function_factory.rs index 3f0b32cca538..b344415c4b11 100644 --- a/query/src/table_functions/table_function_factory.rs +++ b/query/src/table_functions/table_function_factory.rs @@ -24,6 +24,7 @@ use common_planners::Expression; use crate::catalogs::SYS_TBL_FUC_ID_END; use crate::catalogs::SYS_TBL_FUNC_ID_BEGIN; +use crate::storages::fuse::table_functions::ClusteringInformationTable; use crate::storages::fuse::table_functions::FuseSegmentTable; use crate::storages::fuse::table_functions::FuseSnapshotTable; use crate::table_functions::async_crash_me::AsyncCrashMeTable; @@ -105,6 +106,11 @@ impl TableFunctionFactory { (next_id(), Arc::new(FuseSegmentTable::create)), ); + creators.insert( + "clustering_information".to_string(), + (next_id(), Arc::new(ClusteringInformationTable::create)), + ); + creators.insert( "async_crash_me".to_string(), (next_id(), Arc::new(AsyncCrashMeTable::create)), diff --git a/query/tests/it/interpreters/interpreter_call.rs b/query/tests/it/interpreters/interpreter_call.rs index c475bd1d65b6..013333067bf0 100644 --- a/query/tests/it/interpreters/interpreter_call.rs +++ b/query/tests/it/interpreters/interpreter_call.rs @@ -81,7 +81,7 @@ async fn test_call_fuse_snapshot_interpreter() -> Result<()> { let res = executor.execute(None).await; assert_eq!(res.is_err(), true); let expect = - "Code: 1006, displayText = expecting fuse table, but got table of engine type: SystemTables."; + "Code: 1015, displayText = expects table of engine FUSE, but got SystemTables."; assert_eq!(expect, res.err().unwrap().to_string()); } @@ -106,6 +106,107 @@ async fn test_call_fuse_snapshot_interpreter() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_call_clustering_information_interpreter() -> Result<()> { + common_tracing::init_default_ut_tracing(); + + let ctx = crate::tests::create_query_context().await?; + + // NumberArgumentsNotMatch + { + let plan = PlanParser::parse(ctx.clone(), "call system$clustering_information()").await?; + let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?; + assert_eq!(executor.name(), "CallInterpreter"); + let res = executor.execute(None).await; + assert_eq!(res.is_err(), true); + let expect = "Code: 1028, displayText = Function `CLUSTERING_INFORMATION` expect to have 2 arguments, but got 0."; + assert_eq!(expect, res.err().unwrap().to_string()); + } + + // UnknownTable + { + let plan = PlanParser::parse( + ctx.clone(), + "call system$clustering_information(default, test)", + ) + .await?; + let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?; + assert_eq!(executor.name(), "CallInterpreter"); + let res = executor.execute(None).await; + assert_eq!(res.is_err(), true); + assert_eq!( + res.err().unwrap().code(), + ErrorCode::UnknownTable("").code() + ); + } + + // BadArguments + { + let plan = PlanParser::parse( + ctx.clone(), + "call system$clustering_information(system, tables)", + ) + .await?; + let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?; + assert_eq!(executor.name(), "CallInterpreter"); + let res = executor.execute(None).await; + assert_eq!(res.is_err(), true); + let expect = + "Code: 1015, displayText = expects table of engine FUSE, but got SystemTables."; + assert_eq!(expect, res.err().unwrap().to_string()); + } + + // Create table a + { + let query = "\ + CREATE TABLE default.a(a bigint)\ + "; + + let plan = PlanParser::parse(ctx.clone(), query).await?; + let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?; + let _ = executor.execute(None).await?; + } + + // Unclustered. + { + let plan = PlanParser::parse( + ctx.clone(), + "call system$clustering_information(default, a)", + ) + .await?; + let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?; + let res = executor.execute(None).await; + assert_eq!(res.is_err(), true); + let expect = + "Code: 1070, displayText = Invalid clustering keys or table a is not clustered."; + assert_eq!(expect, res.err().unwrap().to_string()); + } + + // Create table b + { + let query = "\ + CREATE TABLE default.b(a bigint) cluster by(a)\ + "; + + let plan = PlanParser::parse(ctx.clone(), query).await?; + let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?; + let _ = executor.execute(None).await?; + } + + // FuseHistory + { + let plan = PlanParser::parse( + ctx.clone(), + "call system$clustering_information(default, b)", + ) + .await?; + let executor = InterpreterFactory::get(ctx.clone(), plan.clone())?; + let _ = executor.execute(None).await?; + } + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_call_bootstrap_tenant_interpreter() -> Result<()> { common_tracing::init_default_ut_tracing(); diff --git a/query/tests/it/storages/fuse/table_functions/clustering_information_table.rs b/query/tests/it/storages/fuse/table_functions/clustering_information_table.rs new file mode 100644 index 000000000000..6fead086e853 --- /dev/null +++ b/query/tests/it/storages/fuse/table_functions/clustering_information_table.rs @@ -0,0 +1,104 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use common_base::base::tokio; +use common_datablocks::DataBlock; +use common_datavalues::prelude::*; +use common_exception::ErrorCode; +use common_exception::Result; +use common_planners::*; +use databend_query::interpreters::CreateTableInterpreter; +use tokio_stream::StreamExt; + +use crate::storages::fuse::table_test_fixture::TestFixture; +use crate::storages::fuse::table_test_fixture::*; + +#[tokio::test] +async fn test_clustering_information_table_read() -> Result<()> { + let fixture = TestFixture::new().await; + let db = fixture.default_db_name(); + let tbl = fixture.default_table_name(); + let ctx = fixture.ctx(); + + // test db & table + let create_table_plan = fixture.default_crate_table_plan(); + let interpreter = CreateTableInterpreter::try_create(ctx.clone(), create_table_plan)?; + interpreter.execute(None).await?; + + // func args + let arg_db = Expression::create_literal(DataValue::String(db.as_bytes().to_vec())); + let arg_tbl = Expression::create_literal(DataValue::String(tbl.as_bytes().to_vec())); + + { + let expected = vec![ + "+-----------------+-------------------+----------------------------+------------------+---------------+-----------------------+", + "| cluster_by_keys | total_block_count | total_constant_block_count | average_overlaps | average_depth | block_depth_histogram |", + "+-----------------+-------------------+----------------------------+------------------+---------------+-----------------------+", + "| (id) | 0 | 0 | 0 | 0 | {} |", + "+-----------------+-------------------+----------------------------+------------------+---------------+-----------------------+", + ]; + + expects_ok( + "empty_data_set", + test_drive_clustering_information( + Some(vec![arg_db.clone(), arg_tbl.clone()]), + ctx.clone(), + ) + .await, + expected, + ) + .await?; + } + + { + let qry = format!("insert into {}.{} values(1),(2)", db, tbl); + execute_query(ctx.clone(), qry.as_str()).await?; + let expected = vec![ + "+-----------------+-------------------+----------------------------+------------------+---------------+-----------------------+", + "| cluster_by_keys | total_block_count | total_constant_block_count | average_overlaps | average_depth | block_depth_histogram |", + "+-----------------+-------------------+----------------------------+------------------+---------------+-----------------------+", + "| (id) | 1 | 0 | 0 | 1 | {\"00001\":1} |", + "+-----------------+-------------------+----------------------------+------------------+---------------+-----------------------+", + ]; + + let qry = format!("select * from clustering_information('{}', '{}')", db, tbl); + + expects_ok( + "clustering_information", + execute_query(ctx.clone(), qry.as_str()).await, + expected, + ) + .await?; + } + + { + // incompatible table engine + let qry = format!("create table {}.in_mem (a int) engine =Memory", db); + execute_query(ctx.clone(), qry.as_str()).await?; + + let qry = format!( + "select * from clustering_information('{}', '{}')", + db, "in_mem" + ); + let output_stream = execute_query(ctx.clone(), qry.as_str()).await?; + expects_err( + "unsupported_table_engine", + ErrorCode::logical_error_code(), + output_stream.collect::>>().await, + ); + } + + Ok(()) +} diff --git a/query/tests/it/storages/fuse/table_functions/mod.rs b/query/tests/it/storages/fuse/table_functions/mod.rs index 452b1de55106..209cc8385116 100644 --- a/query/tests/it/storages/fuse/table_functions/mod.rs +++ b/query/tests/it/storages/fuse/table_functions/mod.rs @@ -13,4 +13,5 @@ // limitations under the License. // +mod clustering_information_table; mod fuse_snapshot_table; diff --git a/query/tests/it/storages/fuse/table_test_fixture.rs b/query/tests/it/storages/fuse/table_test_fixture.rs index 499d9a5bf1a2..e350d55d0f13 100644 --- a/query/tests/it/storages/fuse/table_test_fixture.rs +++ b/query/tests/it/storages/fuse/table_test_fixture.rs @@ -23,6 +23,7 @@ use common_io::prelude::StorageFsConfig; use common_io::prelude::StorageParams; use common_meta_types::DatabaseMeta; use common_meta_types::TableMeta; +use common_planners::col; use common_planners::CreateDatabasePlan; use common_planners::CreateTablePlan; use common_planners::Expression; @@ -34,6 +35,7 @@ use databend_query::interpreters::InterpreterFactory; use databend_query::sessions::QueryContext; use databend_query::sql::PlanParser; use databend_query::sql::OPT_KEY_DATABASE_ID; +use databend_query::storages::fuse::table_functions::ClusteringInformationTable; use databend_query::storages::fuse::table_functions::FuseSnapshotTable; use databend_query::storages::fuse::FUSE_TBL_BLOCK_PREFIX; use databend_query::storages::fuse::FUSE_TBL_SEGMENT_PREFIX; @@ -133,10 +135,11 @@ impl TestFixture { (OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()), ] .into(), + order_keys: Some("(id)".to_string()), ..Default::default() }, as_select: None, - order_keys: vec![], + order_keys: vec![col("id")], } } @@ -240,6 +243,20 @@ pub async fn test_drive_with_args_and_ctx( func.read(ctx, &source_plan).await } +pub async fn test_drive_clustering_information( + tbl_args: TableArgs, + ctx: std::sync::Arc, +) -> Result { + let func = ClusteringInformationTable::create("system", "clustering_information", 1, tbl_args)?; + let source_plan = func + .clone() + .as_table() + .read_plan(ctx.clone(), Some(Extras::default())) + .await?; + ctx.try_set_partitions(source_plan.parts.clone())?; + func.read(ctx, &source_plan).await +} + pub fn expects_err(case_name: &str, err_code: u16, res: Result) { if let Err(err) = res { assert_eq!( diff --git a/tests/suites/0_stateless/09_fuse_engine/09_0014_func_clustering_information_function.result b/tests/suites/0_stateless/09_fuse_engine/09_0014_func_clustering_information_function.result new file mode 100644 index 000000000000..be00f2c51816 --- /dev/null +++ b/tests/suites/0_stateless/09_fuse_engine/09_0014_func_clustering_information_function.result @@ -0,0 +1,5 @@ +1 1 +2 1 +0 3 +1 3 +4 4 diff --git a/tests/suites/0_stateless/09_fuse_engine/09_0014_func_clustering_information_function.sql b/tests/suites/0_stateless/09_fuse_engine/09_0014_func_clustering_information_function.sql new file mode 100755 index 000000000000..60b62363c47e --- /dev/null +++ b/tests/suites/0_stateless/09_fuse_engine/09_0014_func_clustering_information_function.sql @@ -0,0 +1,14 @@ +-- Create table t09_0014 +create table t09_0014(a int, b int) cluster by(b,a); + +insert into t09_0014 values(0,3),(1,1); +insert into t09_0014 values(1,3),(2,1); +insert into t09_0014 values(4,4); + +select * from t09_0014 order by b, a; + +--Bug in cluster mode: https://github.com/datafuselabs/databend/issues/5473 +--select * from clustering_information('default','t09_0014'); + +-- Drop table. +drop table t09_0014;