diff --git a/Cargo.lock b/Cargo.lock index 1ac3f274d456..dd5e4134a183 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -338,6 +338,12 @@ dependencies = [ "syn", ] +[[package]] +name = "async-task" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524" + [[package]] name = "async-trait" version = "0.1.60" @@ -369,6 +375,12 @@ dependencies = [ "critical-section", ] +[[package]] +name = "atomic-waker" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" + [[package]] name = "atty" version = "0.2.14" @@ -491,12 +503,6 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" -[[package]] -name = "base64" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" - [[package]] name = "base64" version = "0.21.0" @@ -629,6 +635,20 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c67b173a56acffd6d2326fb7ab938ba0b00a71480e14902b2591c87bc5741e8" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", +] + [[package]] name = "borsh" version = "0.9.3" @@ -4105,12 +4125,14 @@ dependencies = [ [[package]] name = "hdrs" -version = "0.1.7" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "949814fc49979183bd43fcd899671f706f7246586738beb3f42fdafa528ec7f3" +checksum = "70a8d6dab41a27fd3825810f6413f16f15733c84114504e41955516e4156676c" dependencies = [ + "async-lock", + "blocking", "errno", - "futures-io", + "futures", "hdfs-sys", "libc", "log", @@ -5540,9 +5562,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.24.4" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9ea6283b48c0365596bf784bf3785866f05c8c88e3289a0ef7bc165c5d27651" +checksum = "f98a4e81f8e9979209a54098150c2deac5a8c57160b5781819d8a77142a5be53" dependencies = [ "anyhow", "async-compat", @@ -5556,6 +5578,7 @@ dependencies = [ "futures", "hdrs", "http", + "hyper", "log", "md-5", "metrics", @@ -5574,6 +5597,7 @@ dependencies = [ "time 0.3.17", "tokio", "tracing", + "trust-dns-resolver", "ureq", "uuid", ] @@ -6741,13 +6765,13 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.7.5" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64f8427141aacf5f9011c2f83ebc109e57764587d562f91e387ec1ffcc54a5a2" +checksum = "3f446438814fde3785305a59a85a6d1b361ce2c9d29e58dd87c9103a242c40b6" dependencies = [ "anyhow", "backon", - "base64 0.20.0", + "base64 0.21.0", "bytes", "dirs", "form_urlencoded", @@ -7771,8 +7795,7 @@ checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" [[package]] name = "strawboat" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbad34588af0e413a53579cbc362700a5bb20d2fbcdc89cf738e5c099e86118a" +source = "git+https://github.com/sundy-li/strawboat?rev=92a8e4f#92a8e4fcf499827145ac7f32315b2228552507e5" dependencies = [ "arrow2", "bytemuck", diff --git a/Cargo.toml b/Cargo.toml index a0da5d923fac..4019bec86255 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ members = [ # databend maintains: openraft = { git = "https://github.com/drmingdrmer/openraft", tag = "v0.7.4-alpha.3" } sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1", default-features = false } -opendal = "0.24" +opendal = { version = "0.25" } ordered-float = { version = "3.4.0", default-features = false } # error diff --git a/src/common/arrow/Cargo.toml b/src/common/arrow/Cargo.toml index 9c6f96559607..76ffe4d76ac8 100644 --- a/src/common/arrow/Cargo.toml +++ b/src/common/arrow/Cargo.toml @@ -42,7 +42,7 @@ arrow = { package = "arrow2", version = "0.15.0", default-features = false, feat arrow-format = { version = "0.8.0", features = ["flight-data", "flight-service", "ipc"] } futures = "0.3.24" -native = { package = "strawboat", version = "0.1.0" } +native = { package = "strawboat", git = "https://github.com/sundy-li/strawboat", rev = "92a8e4f" } parquet2 = { version = "0.17.0", default_features = false, features = ["serde_types"] } [dev-dependencies] diff --git a/src/common/hashtable/src/hashtable.rs b/src/common/hashtable/src/hashtable.rs index 8641b8c6cf42..93b69cb1742e 100644 --- a/src/common/hashtable/src/hashtable.rs +++ b/src/common/hashtable/src/hashtable.rs @@ -189,13 +189,7 @@ where _alignment: [0; 0], })); } - while (self.table.len() + other.table.len()) * 2 > self.table.capacity() { - if (self.table.entries.len() >> 22) == 0 { - self.table.grow(2); - } else { - self.table.grow(1); - } - } + unsafe { self.table.set_merge(&other.table); } diff --git a/src/common/io/src/position.rs b/src/common/io/src/position.rs index 5a8c36f6f301..268508a92562 100644 --- a/src/common/io/src/position.rs +++ b/src/common/io/src/position.rs @@ -383,12 +383,11 @@ fn position_sse42< if _mm_cmpestrc::<0>(chars_set, chars_count, bytes, 16) > 0 { return index + _mm_cmpestri::<0>(chars_set, chars_count, bytes, 16) as usize; } - } else { - if _mm_cmpestrc::<_SIDD_NEGATIVE_POLARITY>(chars_set, chars_count, bytes, 16) > 0 { - return index - + _mm_cmpestri::<_SIDD_NEGATIVE_POLARITY>(chars_set, chars_count, bytes, 16) - as usize; - } + } else if _mm_cmpestrc::<_SIDD_NEGATIVE_POLARITY>(chars_set, chars_count, bytes, 16) > 0 + { + return index + + _mm_cmpestri::<_SIDD_NEGATIVE_POLARITY>(chars_set, chars_count, bytes, 16) + as usize; } index += 16; diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs index f489a2cdb740..faa5d78cfad5 100644 --- a/src/common/storage/src/operator.rs +++ b/src/common/storage/src/operator.rs @@ -250,7 +250,7 @@ fn init_s3_operator(cfg: &StorageS3Config) -> Result { // Disable credential loader if cfg.disable_credential_loader { - builder.disable_credential_loader(); + builder.disable_config_load(); } // Enable virtual host style diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index 06f7776cc8ce..704a3dc53b6d 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -329,6 +329,24 @@ impl DataBlock { Ok(DataBlock::new(cols, arrow_chunk.len())) } + + pub fn from_arrow_chunk_with_types>( + arrow_chunk: &ArrowChunk, + data_types: &[DataType], + ) -> Result { + let cols = data_types + .iter() + .zip(arrow_chunk.arrays()) + .map(|(data_type, col)| { + Ok(BlockEntry { + data_type: data_type.clone(), + value: Value::Column(Column::from_arrow(col.as_ref(), data_type)), + }) + }) + .collect::>()?; + + Ok(DataBlock::new(cols, arrow_chunk.len())) + } } impl TryFrom for ArrowChunk { diff --git a/src/query/expression/src/types/string.rs b/src/query/expression/src/types/string.rs index 848d3ef3bafc..288ec3d2396e 100644 --- a/src/query/expression/src/types/string.rs +++ b/src/query/expression/src/types/string.rs @@ -212,7 +212,7 @@ impl StringColumn { } pub struct StringIterator<'a> { - data: &'a Buffer, + data: &'a [u8], offsets: std::slice::Windows<'a, u64>, } diff --git a/src/query/functions/src/aggregates/aggregator_common.rs b/src/query/functions/src/aggregates/aggregator_common.rs index 510f9572a11a..94e706cecb1c 100644 --- a/src/query/functions/src/aggregates/aggregator_common.rs +++ b/src/query/functions/src/aggregates/aggregator_common.rs @@ -111,12 +111,11 @@ pub fn eval_aggr( name: &str, params: Vec, columns: &[Column], - types: &[DataType], rows: usize, ) -> Result<(Column, DataType)> { let factory = AggregateFunctionFactory::instance(); - let arguments = types.to_owned(); let cols: Vec = columns.to_owned(); + let arguments = columns.iter().map(|x| x.data_type()).collect(); let func = factory.get(name, params, arguments)?; let data_type = func.return_type()?; diff --git a/src/query/functions/src/scalars/string.rs b/src/query/functions/src/scalars/string.rs index b33d212472e3..7d332eeefc4e 100644 --- a/src/query/functions/src/scalars/string.rs +++ b/src/query/functions/src/scalars/string.rs @@ -100,11 +100,23 @@ pub fn register(registry: &mut FunctionRegistry) { |val, _| 8 * val.len() as u64, ); - registry.register_1_arg::, _, _>( + registry.register_passthrough_nullable_1_arg::, _, _>( "length", FunctionProperty::default(), |_| FunctionDomain::Full, - |val, _| val.len() as u64, + |val, _| match val { + ValueRef::Scalar(s) => Value::Scalar(s.len() as u64), + ValueRef::Column(c) => { + let diffs = c + .offsets + .iter() + .zip(c.offsets.iter().skip(1)) + .map(|(a, b)| b - a) + .collect::>(); + + Value::Column(diffs.into()) + } + }, ); registry.register_passthrough_nullable_1_arg::, _, _>( diff --git a/src/query/functions/tests/it/aggregates/mod.rs b/src/query/functions/tests/it/aggregates/mod.rs index 194a4a8998ce..c3ddc8d22e4a 100644 --- a/src/query/functions/tests/it/aggregates/mod.rs +++ b/src/query/functions/tests/it/aggregates/mod.rs @@ -37,14 +37,8 @@ use itertools::Itertools; use super::scalars::parser; -pub trait AggregationSimulator = Fn( - &str, - Vec, - &[Column], - &[DataType], - usize, - ) -> common_exception::Result<(Column, DataType)> - + Copy; +pub trait AggregationSimulator = + Fn(&str, Vec, &[Column], usize) -> common_exception::Result<(Column, DataType)> + Copy; /// run ast which is agg expr pub fn run_agg_ast( @@ -97,7 +91,6 @@ pub fn run_agg_ast( .map(|p| Scalar::Number(NumberScalar::UInt64(*p as u64))) .collect(); - let arg_types: Vec = args.iter().map(|(_, ty)| ty.clone()).collect(); let arg_columns: Vec = args .iter() .map(|(arg, ty)| match arg { @@ -109,13 +102,7 @@ pub fn run_agg_ast( }) .collect(); - simulator( - name.as_str(), - params, - &arg_columns, - &arg_types, - block.num_rows(), - )? + simulator(name.as_str(), params, &arg_columns, block.num_rows())? } _ => unimplemented!(), } @@ -187,11 +174,10 @@ pub fn simulate_two_groups_group_by( name: &str, params: Vec, columns: &[Column], - types: &[DataType], rows: usize, ) -> common_exception::Result<(Column, DataType)> { let factory = AggregateFunctionFactory::instance(); - let arguments = types.to_owned(); + let arguments: Vec = columns.iter().map(|c| c.data_type()).collect(); let cols: Vec = columns.to_owned(); let func = factory.get(name, params, arguments)?; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 8d5a3c3058d4..790e80948d3e 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -345,7 +345,10 @@ impl PipelineBuilder { &aggregate.agg_funcs, )?; - if self.ctx.get_cluster().is_empty() + // this has bugs now, so we disable it for now, cc @winter + #[allow(clippy::overly_complex_bool_expr)] + if 1 == 2 + && self.ctx.get_cluster().is_empty() && !params.group_columns.is_empty() && self.main_pipeline.output_len() > 1 { diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index 4881480f7a86..ba7b359e9596 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -78,6 +78,7 @@ async fn test_recluster_mutator_block_select() -> Result<()> { min: vec![Scalar::from(1i64)], max: vec![Scalar::from(3i64)], level: 0, + pages: None, })) .await?; test_segment_locations.push(segment_location); @@ -88,6 +89,7 @@ async fn test_recluster_mutator_block_select() -> Result<()> { min: vec![Scalar::from(2i64)], max: vec![Scalar::from(4i64)], level: 0, + pages: None, })) .await?; test_segment_locations.push(segment_location); @@ -98,6 +100,7 @@ async fn test_recluster_mutator_block_select() -> Result<()> { min: vec![Scalar::from(4i64)], max: vec![Scalar::from(5i64)], level: 0, + pages: None, })) .await?; test_segment_locations.push(segment_location); diff --git a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs index bac2ebf168de..4390472f7ba6 100644 --- a/src/query/service/tests/it/storages/fuse/operations/read_plan.rs +++ b/src/query/service/tests/it/storages/fuse/operations/read_plan.rs @@ -98,7 +98,7 @@ fn test_to_partitions() -> Result<()> { let blocks_metas = (0..num_of_block) .into_iter() - .map(|_| block_meta.clone()) + .map(|_| (None, block_meta.clone())) .collect::>(); let column_nodes = (0..num_of_col) diff --git a/src/query/service/tests/it/storages/fuse/statistics.rs b/src/query/service/tests/it/storages/fuse/statistics.rs index 6839e206c9ec..0c6a35ec7486 100644 --- a/src/query/service/tests/it/storages/fuse/statistics.rs +++ b/src/query/service/tests/it/storages/fuse/statistics.rs @@ -245,6 +245,7 @@ async fn test_ft_cluster_stats_with_stats() -> common_exception::Result<()> { min: vec![Scalar::Number(NumberScalar::Int32(1))], max: vec![Scalar::Number(NumberScalar::Int32(5))], level: 0, + pages: None, }); let block_compactor = BlockCompactThresholds::new(1_000_000, 800_000, 100 * 1024 * 1024); @@ -252,6 +253,7 @@ async fn test_ft_cluster_stats_with_stats() -> common_exception::Result<()> { 0, vec![0], 0, + None, 0, block_compactor, vec![], @@ -290,6 +292,7 @@ async fn test_ft_cluster_stats_with_stats() -> common_exception::Result<()> { 0, vec![1], 0, + None, 0, block_compactor, operators, @@ -307,6 +310,7 @@ async fn test_ft_cluster_stats_with_stats() -> common_exception::Result<()> { 1, vec![0], 0, + None, 0, block_compactor, vec![], @@ -406,14 +410,8 @@ fn test_ft_stats_block_stats_string_columns_trimming_using_eval() -> common_exce ); let block = DataBlock::new_from_columns(vec![data_col.clone()]); - let min_col = eval_aggr( - "min", - vec![], - &[data_col.clone()], - &[DataType::String], - rows, - )?; - let max_col = eval_aggr("max", vec![], &[data_col], &[DataType::String], rows)?; + let min_col = eval_aggr("min", vec![], &[data_col.clone()], rows)?; + let max_col = eval_aggr("max", vec![], &[data_col], rows)?; let min_expr = min_col.0.index(0).unwrap(); let max_expr = max_col.0.index(0).unwrap(); diff --git a/src/query/sharing/Cargo.toml b/src/query/sharing/Cargo.toml index 57536fc8c2d3..c8c22e3e011c 100644 --- a/src/query/sharing/Cargo.toml +++ b/src/query/sharing/Cargo.toml @@ -21,7 +21,7 @@ common-config = { path = "../config" } http = "0.2" log = "0.4" moka = "0.9" -opendal = "0.24" +opendal = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/src/query/sharing/src/layer.rs b/src/query/sharing/src/layer.rs index 345e64d18b63..1180eb907af7 100644 --- a/src/query/sharing/src/layer.rs +++ b/src/query/sharing/src/layer.rs @@ -59,7 +59,7 @@ pub fn create_share_table_operator( share_tenant_id: &str, share_name: &str, table_name: &str, -) -> Operator { +) -> Result { let op = match share_endpoint_address { Some(share_endpoint_address) => { let signer = SharedSigner::new( @@ -68,16 +68,15 @@ pub fn create_share_table_operator( share_endpoint_address, share_tenant_id, share_name, table_name ), share_endpoint_token, + HttpClient::new()?, ); - Operator::new(apply_wrapper(SharedAccessor { - signer, - client: HttpClient::new(), - })) + let client = HttpClient::new()?; + Operator::new(apply_wrapper(SharedAccessor { signer, client })) } None => Operator::new(DummySharedAccessor {}), }; - op + Ok(op // Add retry .layer(RetryLayer::new(ExponentialBackoff::default().with_jitter())) // Add metrics @@ -85,7 +84,7 @@ pub fn create_share_table_operator( // Add logging .layer(LoggingLayer::default()) // Add tracing - .layer(TracingLayer) + .layer(TracingLayer)) } #[derive(Debug)] diff --git a/src/query/sharing/src/signer.rs b/src/query/sharing/src/signer.rs index 1f529de25bc9..cc1a2186f732 100644 --- a/src/query/sharing/src/signer.rs +++ b/src/query/sharing/src/signer.rs @@ -58,7 +58,7 @@ impl Debug for SharedSigner { impl SharedSigner { /// Create a new SharedSigner. - pub fn new(endpoint: &str, token: RefreshableToken) -> Self { + pub fn new(endpoint: &str, token: RefreshableToken, client: HttpClient) -> Self { let cache = Cache::builder() // Databend Cloud Presign will expire after 3600s (1 hour). // We will expire them 10 minutes before to avoid edge cases. @@ -68,7 +68,7 @@ impl SharedSigner { Self { endpoint: endpoint.to_string(), cache, - client: HttpClient::new(), + client, token, } } diff --git a/src/query/storages/common/index/src/lib.rs b/src/query/storages/common/index/src/lib.rs index 21a2498666c8..8669ebde14d3 100644 --- a/src/query/storages/common/index/src/lib.rs +++ b/src/query/storages/common/index/src/lib.rs @@ -17,9 +17,11 @@ mod bloom_index; pub mod filters; mod index; +mod page_index; mod range_index; pub use bloom_index::BloomIndex; pub use bloom_index::FilterEvalResult; pub use index::Index; +pub use page_index::PageIndex; pub use range_index::RangeIndex; diff --git a/src/query/storages/common/index/src/page_index.rs b/src/query/storages/common/index/src/page_index.rs new file mode 100644 index 000000000000..32e5b779f157 --- /dev/null +++ b/src/query/storages/common/index/src/page_index.rs @@ -0,0 +1,208 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::ops::Range; +use std::sync::Arc; + +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::type_check::check_function; +use common_expression::ConstantFolder; +use common_expression::DataField; +use common_expression::DataSchemaRef; +use common_expression::Domain; +use common_expression::Expr; +use common_expression::FunctionContext; +use common_expression::Scalar; +use common_expression::TableSchemaRef; +use common_functions::scalars::BUILTIN_FUNCTIONS; +use storages_common_table_meta::meta::ClusterStatistics; +use storages_common_table_meta::meta::ColumnStatistics; + +use crate::range_index::statistics_to_domain; + +#[derive(Clone)] +pub struct PageIndex { + expr: Expr, + func_ctx: FunctionContext, + cluster_key_id: u32, + + // index of the cluster key inside the schema + cluster_key_fields: Vec, +} + +impl PageIndex { + pub fn try_create( + func_ctx: FunctionContext, + cluster_key_id: u32, + cluster_keys: Vec, + exprs: &[Expr], + schema: TableSchemaRef, + ) -> Result { + let conjunction = exprs + .iter() + .cloned() + .reduce(|lhs, rhs| { + check_function(None, "and", &[], &[lhs, rhs], &BUILTIN_FUNCTIONS).unwrap() + }) + .unwrap(); + + let (new_expr, _) = ConstantFolder::fold(&conjunction, func_ctx, &BUILTIN_FUNCTIONS); + let data_schema: DataSchemaRef = Arc::new((&schema).into()); + let cluster_key_fields = cluster_keys + .iter() + .map(|name| data_schema.field_with_name(name.as_str()).unwrap().clone()) + .collect::>(); + + Ok(Self { + expr: new_expr, + cluster_key_fields, + cluster_key_id, + func_ctx, + }) + } + + pub fn try_apply_const(&self) -> Result { + // if the exprs did not contains the first cluster key, we should return true + if self.cluster_key_fields.is_empty() + || !self + .expr + .column_refs() + .iter() + .any(|c| c.0 == self.cluster_key_fields[0].name()) + { + return Ok(true); + } + + // Only return false, which means to skip this block, when the expression is folded to a constant false. + Ok(!matches!(self.expr, Expr::Constant { + scalar: Scalar::Boolean(false), + .. + })) + } + + #[tracing::instrument(level = "debug", name = "page_index_eval", skip_all)] + pub fn apply(&self, stats: &Option) -> Result<(bool, Option>)> { + let stats = match stats { + Some(stats) => stats, + None => return Ok((true, None)), + }; + let min_values = match stats.pages { + Some(ref pages) => pages, + None => return Ok((true, None)), + }; + + let max_value = Scalar::Tuple(stats.max.clone()); + + if self.cluster_key_id != stats.cluster_key_id { + return Ok((true, None)); + } + + let pages = min_values.len(); + let mut start = 0; + let mut end = pages - 1; + + while start <= end { + let min_value = &min_values[start]; + let max_value = if start + 1 < pages { + &min_values[start + 1] + } else { + &max_value + }; + + if self.eval_single_page(min_value, max_value)? { + break; + } + start += 1; + } + + while end >= start { + let min_value = &min_values[end]; + let max_value = if end + 1 < pages { + &min_values[end + 1] + } else { + &max_value + }; + + if self.eval_single_page(min_value, max_value)? { + break; + } + end -= 1; + } + if end - start + 1 < pages { + tracing::debug!("range cut from {pages} to : {:?} --> {:?}", start, end); + } + + // no page is pruned + if start + pages == end + 1 { + return Ok((true, None)); + } + + if start > end { + Ok((false, None)) + } else { + Ok((true, Some(start..end + 1))) + } + } + + fn eval_single_page(&self, min_value: &Scalar, max_value: &Scalar) -> Result { + let min_value = min_value + .as_tuple() + .ok_or_else(|| ErrorCode::StorageOther("cluster stats must be tuple scalar"))?; + let max_value = max_value + .as_tuple() + .ok_or_else(|| ErrorCode::StorageOther("cluster stats must be tuple scalar"))?; + + let mut input_domains = HashMap::with_capacity(self.cluster_key_fields.len()); + for (idx, (min, max)) in min_value.iter().zip(max_value.iter()).enumerate() { + let f = &self.cluster_key_fields[idx]; + + let stats = ColumnStatistics { + min: min.clone(), + max: max.clone(), + null_count: 1, + in_memory_size: 0, + distinct_of_values: None, + }; + let domain = statistics_to_domain(Some(&stats), f.data_type()); + input_domains.insert(f.name().clone(), domain); + + // For Tuple scalars, if the first element is not equal, then the monotonically increasing property is broken. + if min != max { + break; + } + } + + // Fill missing stats to be full domain + for (name, ty) in self.expr.column_refs().into_iter() { + if !input_domains.contains_key(name.as_str()) { + input_domains.insert(name.clone(), Domain::full(&ty)); + } + } + + let (new_expr, _) = ConstantFolder::fold_with_domain( + &self.expr, + input_domains, + self.func_ctx, + &BUILTIN_FUNCTIONS, + ); + + // Only return false, which means to skip this block, when the expression is folded to a constant false. + Ok(!matches!(new_expr, Expr::Constant { + scalar: Scalar::Boolean(false), + .. + })) + } +} diff --git a/src/query/storages/common/index/src/range_index.rs b/src/query/storages/common/index/src/range_index.rs index 4f07e2f0b565..761cdd05a73b 100644 --- a/src/query/storages/common/index/src/range_index.rs +++ b/src/query/storages/common/index/src/range_index.rs @@ -114,7 +114,7 @@ impl RangeIndex { } } -fn statistics_to_domain(stat: Option<&ColumnStatistics>, data_type: &DataType) -> Domain { +pub fn statistics_to_domain(stat: Option<&ColumnStatistics>, data_type: &DataType) -> Domain { if stat.is_none() { return Domain::full(data_type); } diff --git a/src/query/storages/common/pruner/src/lib.rs b/src/query/storages/common/pruner/src/lib.rs index 6c2b5f23ef67..5fb5afc828c7 100644 --- a/src/query/storages/common/pruner/src/lib.rs +++ b/src/query/storages/common/pruner/src/lib.rs @@ -16,11 +16,14 @@ #![deny(unused_crate_dependencies)] mod limiter_pruner; +mod page_pruner; mod range_pruner; mod topn_pruner; pub use limiter_pruner::LimiterPruner; pub use limiter_pruner::LimiterPrunerCreator; +pub use page_pruner::PagePruner; +pub use page_pruner::PagePrunerCreator; pub use range_pruner::RangePruner; pub use range_pruner::RangePrunerCreator; pub use topn_pruner::BlockMetaIndex; diff --git a/src/query/storages/common/pruner/src/page_pruner.rs b/src/query/storages/common/pruner/src/page_pruner.rs new file mode 100644 index 000000000000..b1358556c6a1 --- /dev/null +++ b/src/query/storages/common/pruner/src/page_pruner.rs @@ -0,0 +1,116 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Range; +use std::sync::Arc; + +use common_exception::Result; +use common_expression::Expr; +use common_expression::FunctionContext; +use common_expression::RemoteExpr; +use common_expression::TableSchemaRef; +use storages_common_index::PageIndex; +use storages_common_table_meta::meta::ClusterKey; +use storages_common_table_meta::meta::ClusterStatistics; + +pub trait PagePruner { + // returns ture, if target should NOT be pruned (false positive allowed) + fn should_keep(&self, _stats: &Option) -> (bool, Option>); +} + +struct KeepTrue; + +impl PagePruner for KeepTrue { + fn should_keep(&self, _stats: &Option) -> (bool, Option>) { + (true, None) + } +} + +struct KeepFalse; + +impl PagePruner for KeepFalse { + fn should_keep(&self, _stats: &Option) -> (bool, Option>) { + (false, None) + } +} + +impl PagePruner for PageIndex { + fn should_keep(&self, stats: &Option) -> (bool, Option>) { + match self.apply(stats) { + Ok(r) => r, + Err(e) => { + // swallow exceptions intentionally, corrupted index should not prevent execution + tracing::warn!("failed to page filter, returning ture. {}", e); + (true, None) + } + } + } +} + +pub struct PagePrunerCreator; + +impl PagePrunerCreator { + /// Create a new [`PagePruner`] from expression and schema. + /// + /// Note: the schema should be the schema of the table, not the schema of the input. + pub fn try_create<'a>( + func_ctx: FunctionContext, + cluster_key_meta: Option, + cluster_keys: Vec>, + filter_expr: Option<&'a [Expr]>, + schema: &'a TableSchemaRef, + ) -> Result> { + if cluster_key_meta.is_none() + || cluster_keys.is_empty() + || cluster_keys + .iter() + .any(|expr| !matches!(expr, RemoteExpr::ColumnRef { .. })) + { + return Ok(Arc::new(KeepTrue)); + } + + let cluster_key_meta = cluster_key_meta.unwrap(); + + Ok(match filter_expr { + Some(exprs) if !exprs.is_empty() => { + let cluster_keys = cluster_keys + .iter() + .map(|expr| match expr { + RemoteExpr::ColumnRef { id, .. } => id.to_string(), + _ => unreachable!(), + }) + .collect::>(); + + let page_filter = PageIndex::try_create( + func_ctx, + cluster_key_meta.0, + cluster_keys, + exprs, + schema.clone(), + )?; + match page_filter.try_apply_const() { + Ok(v) => { + if v { + Arc::new(page_filter) + } else { + Arc::new(KeepFalse) + } + } + Err(_) => Arc::new(page_filter), + } + } + _ => Arc::new(KeepTrue), + }) + } +} diff --git a/src/query/storages/common/pruner/src/topn_pruner.rs b/src/query/storages/common/pruner/src/topn_pruner.rs index b29712ca1c05..f9838149dd0d 100644 --- a/src/query/storages/common/pruner/src/topn_pruner.rs +++ b/src/query/storages/common/pruner/src/topn_pruner.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Range; use std::sync::Arc; use common_exception::ErrorCode; @@ -26,6 +27,7 @@ use storages_common_table_meta::meta::ColumnStatistics; pub struct BlockMetaIndex { pub segment_idx: usize, pub block_idx: usize, + pub range: Option>, } /// TopN prunner. diff --git a/src/query/storages/common/table-meta/src/meta/statistics.rs b/src/query/storages/common/table-meta/src/meta/statistics.rs index d43fcaa9e09f..7a4e2d6a36c2 100644 --- a/src/query/storages/common/table-meta/src/meta/statistics.rs +++ b/src/query/storages/common/table-meta/src/meta/statistics.rs @@ -44,6 +44,9 @@ pub struct ClusterStatistics { pub min: Vec, pub max: Vec, pub level: i32, + + // currently it's only used in native engine + pub pages: Option>, } #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Default)] @@ -97,6 +100,7 @@ impl ClusterStatistics { .map(|s| from_scalar(&s, &data_type)) .collect(), level: v0.level, + pages: None, } } } diff --git a/src/query/storages/fuse/src/fuse_part.rs b/src/query/storages/fuse/src/fuse_part.rs index 8c18eba1dd35..e29411fca363 100644 --- a/src/query/storages/fuse/src/fuse_part.rs +++ b/src/query/storages/fuse/src/fuse_part.rs @@ -17,6 +17,7 @@ use std::collections::hash_map::DefaultHasher; use std::collections::HashMap; use std::hash::Hash; use std::hash::Hasher; +use std::ops::Range; use std::sync::Arc; use common_catalog::plan::PartInfo; @@ -35,6 +36,9 @@ pub struct FusePartInfo { pub nums_rows: usize, pub columns_meta: HashMap, pub compression: Compression, + + /// page range in the file + pub range: Option>, } #[typetag::serde(name = "fuse")] @@ -64,6 +68,7 @@ impl FusePartInfo { rows_count: u64, columns_meta: HashMap, compression: Compression, + range: Option>, ) -> Arc> { Arc::new(Box::new(FusePartInfo { location, @@ -71,6 +76,7 @@ impl FusePartInfo { columns_meta, nums_rows: rows_count as usize, compression, + range, })) } diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 31dc18d52087..834c310cec99 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -118,11 +118,11 @@ impl FuseTable { DatabaseType::NormalDB => { let storage_params = table_info.meta.storage_params.clone(); match storage_params { - Some(sp) => init_operator(&sp)?, - None => DataOperator::instance().operator(), + Some(sp) => Ok(init_operator(&sp)?), + None => Ok(DataOperator::instance().operator()), } } - }; + }?; let data_metrics = Arc::new(StorageMetrics::default()); operator = operator.layer(StorageMetricsLayer::new(data_metrics.clone())); @@ -168,6 +168,10 @@ impl FuseTable { } } + pub fn is_native(&self) -> bool { + matches!(self.storage_format, FuseStorageFormat::Native) + } + pub fn meta_location_generator(&self) -> &TableMetaLocationGenerator { &self.meta_location_generator } @@ -608,6 +612,10 @@ impl Table for FuseTable { ) -> Result<()> { self.do_revert_to(ctx.as_ref(), point).await } + + fn support_prewhere(&self) -> bool { + matches!(self.storage_format, FuseStorageFormat::Native) + } } #[derive(Clone, Copy)] diff --git a/src/query/storages/fuse/src/io/mod.rs b/src/query/storages/fuse/src/io/mod.rs index 39391b68ac3f..1da3a148093c 100644 --- a/src/query/storages/fuse/src/io/mod.rs +++ b/src/query/storages/fuse/src/io/mod.rs @@ -26,6 +26,7 @@ pub use read::BlockFilterReader; pub use read::BlockReader; pub use read::MergeIOReadResult; pub use read::MetaReaders; +pub use read::NativeReaderExt; pub use read::ReadSettings; pub use read::SegmentInfoReader; pub use read::SnapshotHistoryReader; diff --git a/src/query/storages/fuse/src/io/read/block_reader.rs b/src/query/storages/fuse/src/io/read/block_reader.rs index c7ffeac2f21b..99e285cfe965 100644 --- a/src/query/storages/fuse/src/io/read/block_reader.rs +++ b/src/query/storages/fuse/src/io/read/block_reader.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; @@ -26,8 +27,10 @@ use common_catalog::plan::PartInfoPtr; use common_catalog::plan::Projection; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::types::DataType; use common_expression::DataField; use common_expression::DataSchema; +use common_expression::TableField; use common_expression::TableSchemaRef; use common_storage::ColumnNode; use common_storage::ColumnNodes; @@ -46,6 +49,7 @@ pub struct BlockReader { pub(crate) operator: Operator, pub(crate) projection: Projection, pub(crate) projected_schema: TableSchemaRef, + pub(crate) project_indices: BTreeMap, pub(crate) column_nodes: ColumnNodes, pub(crate) parquet_schema_descriptor: SchemaDescriptor, } @@ -126,6 +130,12 @@ impl BlockReader { let arrow_schema = schema.to_arrow(); let parquet_schema_descriptor = to_parquet_schema(&arrow_schema)?; let column_nodes = ColumnNodes::new_from_schema(&arrow_schema); + let project_column_nodes: Vec = projection + .project_column_nodes(&column_nodes)? + .iter() + .map(|c| (*c).clone()) + .collect(); + let project_indices = Self::build_projection_indices(&project_column_nodes); Ok(Arc::new(BlockReader { operator, @@ -133,6 +143,7 @@ impl BlockReader { projected_schema, parquet_schema_descriptor, column_nodes, + project_indices, })) } @@ -277,11 +288,8 @@ impl BlockReader { metrics_inc_remote_io_read_parts(1); } - let columns = self.projection.project_column_nodes(&self.column_nodes)?; - let indices = Self::build_projection_indices(&columns); - let mut ranges = vec![]; - for index in indices.keys() { + for index in self.project_indices.keys() { let column_meta = &columns_meta[index]; let (offset, len) = column_meta.offset_length(); ranges.push((*index, offset..(offset + len))); @@ -304,11 +312,9 @@ impl BlockReader { part: PartInfoPtr, ) -> Result { let part = FusePartInfo::from_part(&part)?; - let columns = self.projection.project_column_nodes(&self.column_nodes)?; - let indices = Self::build_projection_indices(&columns); let mut ranges = vec![]; - for index in indices.keys() { + for index in self.project_indices.keys() { let column_meta = &part.columns_meta[index]; let (offset, len) = column_meta.offset_length(); ranges.push((*index, offset..(offset + len))); @@ -320,12 +326,14 @@ impl BlockReader { // Build non duplicate leaf_ids to avoid repeated read column from parquet pub(crate) fn build_projection_indices( - column_nodes: &Vec<&ColumnNode>, - ) -> HashMap { - let mut indices = HashMap::with_capacity(column_nodes.len()); - for column_node in column_nodes { - for index in &column_node.leaf_ids { - indices.insert(*index, column_node.field.clone()); + columns: &[ColumnNode], + ) -> BTreeMap { + let mut indices = BTreeMap::new(); + for column in columns { + for index in &column.leaf_ids { + let f: TableField = (&column.field).into(); + let data_type: DataType = f.data_type().into(); + indices.insert(*index, (column.field.clone(), data_type)); } } indices diff --git a/src/query/storages/fuse/src/io/read/block_reader_native.rs b/src/query/storages/fuse/src/io/read/block_reader_native.rs index 0a6f34471a83..9ed904ef64e8 100644 --- a/src/query/storages/fuse/src/io/read/block_reader_native.rs +++ b/src/query/storages/fuse/src/io/read/block_reader_native.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::io::BufReader; +use std::ops::Range; use std::time::Instant; use common_arrow::arrow::array::Array; -use common_arrow::arrow::chunk::Chunk; use common_arrow::native::read::reader::NativeReader; use common_arrow::native::read::NativeReadBuf; use common_catalog::plan::PartInfoPtr; use common_exception::Result; +use common_expression::BlockEntry; +use common_expression::Column; use common_expression::DataBlock; +use common_expression::Value; use opendal::Object; use storages_common_table_meta::meta::ColumnMeta; @@ -35,7 +37,10 @@ use crate::metrics::metrics_inc_remote_io_seeks; // Native storage format -pub type Reader = Box; +pub trait NativeReaderExt: NativeReadBuf + std::io::Seek + Send + Sync {} +impl NativeReaderExt for T {} + +pub type Reader = Box; impl BlockReader { pub async fn async_read_native_columns_data( @@ -48,16 +53,16 @@ impl BlockReader { } let part = FusePartInfo::from_part(&part)?; - let columns = self.projection.project_column_nodes(&self.column_nodes)?; - let indices = Self::build_projection_indices(&columns); - let mut join_handlers = Vec::with_capacity(indices.len()); + let mut join_handlers = Vec::with_capacity(self.project_indices.len()); + + for (index, (field, _)) in self.project_indices.iter() { + let column_meta = &part.columns_meta[index]; - for (index, field) in indices { - let column_meta = &part.columns_meta[&index]; join_handlers.push(Self::read_native_columns_data( self.operator.object(&part.location), - index, + *index, column_meta, + &part.range, field.data_type().clone(), )); @@ -84,13 +89,19 @@ impl BlockReader { o: Object, index: usize, meta: &ColumnMeta, + range: &Option>, data_type: common_arrow::arrow::datatypes::DataType, ) -> Result<(usize, NativeReader)> { use backon::ExponentialBackoff; use backon::Retryable; let (offset, length) = meta.offset_length(); - let meta = meta.as_native().unwrap(); + let mut meta = meta.as_native().unwrap().clone(); + + if let Some(range) = range { + meta = meta.slice(range.start, range.end); + } + let reader = { || async { o.range_read(offset..offset + length).await } } .retry(ExponentialBackoff::default()) .when(|err| err.is_temporary()) @@ -107,20 +118,18 @@ impl BlockReader { ) -> Result)>> { let part = FusePartInfo::from_part(&part)?; - let columns = self.projection.project_column_nodes(&self.column_nodes)?; - let indices = Self::build_projection_indices(&columns); - let mut results = Vec::with_capacity(indices.len()); - - for (index, field) in indices { - let column_meta = &part.columns_meta[&index]; + let mut results = Vec::with_capacity(self.project_indices.len()); + for (index, (field, _)) in self.project_indices.iter() { + let column_meta = &part.columns_meta[index]; let op = self.operator.clone(); let location = part.location.clone(); let result = Self::sync_read_native_column( op.object(&location), - index, + *index, column_meta, + &part.range, field.data_type().clone(), ); @@ -133,33 +142,39 @@ impl BlockReader { pub fn sync_read_native_column( o: Object, index: usize, - meta: &ColumnMeta, + column_meta: &ColumnMeta, + range: &Option>, data_type: common_arrow::arrow::datatypes::DataType, ) -> Result<(usize, NativeReader)> { - let (offset, length) = meta.offset_length(); + let mut column_meta = column_meta.as_native().unwrap().clone(); + + if let Some(range) = range { + column_meta = column_meta.slice(range.start, range.end); + } + let (offset, length) = ( + column_meta.offset, + column_meta.pages.iter().map(|p| p.length).sum::(), + ); let reader = o.blocking_range_reader(offset..offset + length)?; let reader: Reader = Box::new(BufReader::new(reader)); - - let page_metas = meta.as_native().unwrap().pages.clone(); - let fuse_reader = NativeReader::new(reader, data_type, page_metas, vec![]); + let fuse_reader = NativeReader::new(reader, data_type, column_meta.pages, vec![]); Ok((index, fuse_reader)) } pub fn build_block(&self, chunks: Vec<(usize, Box)>) -> Result { - let mut results = Vec::with_capacity(chunks.len()); - let mut chunk_map: HashMap> = chunks.into_iter().collect(); - let columns = self.projection.project_column_nodes(&self.column_nodes)?; - for column in &columns { - let indices = &column.leaf_ids; - - for index in indices { - if let Some(array) = chunk_map.remove(index) { - results.push(array); - break; - } + let mut entries = Vec::with_capacity(chunks.len()); + // they are already the leaf columns without inner + // TODO support tuple in native storage + let mut rows = 0; + for (id, (_, f)) in self.project_indices.iter() { + if let Some(array) = chunks.iter().find(|c| c.0 == *id).map(|c| c.1.clone()) { + entries.push(BlockEntry { + data_type: f.clone(), + value: Value::Column(Column::from_arrow(array.as_ref(), f)), + }); + rows = array.len(); } } - let chunk = Chunk::new(results); - DataBlock::from_arrow_chunk(&chunk, &self.data_schema()) + Ok(DataBlock::new(entries, rows)) } } diff --git a/src/query/storages/fuse/src/io/read/bloom_index_reader.rs b/src/query/storages/fuse/src/io/read/bloom_index_reader.rs index ea7fb54491c9..67c8cf69daaf 100644 --- a/src/query/storages/fuse/src/io/read/bloom_index_reader.rs +++ b/src/query/storages/fuse/src/io/read/bloom_index_reader.rs @@ -247,6 +247,8 @@ mod util_v1 { } } + metrics::increment_gauge!("cache_bloom_byte_miss_count", 1.0f64); + // missing cache let bytes = Arc::new( // As suggested by Winter, execute task of loading data in storage runtime diff --git a/src/query/storages/fuse/src/io/read/mod.rs b/src/query/storages/fuse/src/io/read/mod.rs index 0da7b690881f..4acb224b1f63 100644 --- a/src/query/storages/fuse/src/io/read/mod.rs +++ b/src/query/storages/fuse/src/io/read/mod.rs @@ -24,6 +24,7 @@ mod versioned_reader; pub use block_reader::BlockReader; pub use block_reader::MergeIOReadResult; +pub use block_reader_native::NativeReaderExt; pub use bloom_index_reader::load_bloom_filter_by_columns; pub use bloom_index_reader::BlockFilterReader; pub use decompressor::UncompressedBuffer; diff --git a/src/query/storages/fuse/src/metrics/fuse_metrics.rs b/src/query/storages/fuse/src/metrics/fuse_metrics.rs index 167873d877b4..0a7127234d5f 100644 --- a/src/query/storages/fuse/src/metrics/fuse_metrics.rs +++ b/src/query/storages/fuse/src/metrics/fuse_metrics.rs @@ -137,6 +137,10 @@ pub fn metrics_inc_pruning_before_block_nums(c: u64) { increment_gauge!(key!("pruning_before_block_nums"), c as f64); } +pub fn metrics_inc_pruning_prewhere_nums(c: u64) { + increment_gauge!(key!("pruning_prewhere_nums"), c as f64); +} + pub fn metrics_inc_pruning_after_block_nums(c: u64) { increment_gauge!(key!("pruning_after_block_nums"), c as f64); } @@ -178,6 +182,7 @@ pub fn metrics_reset() { // Pruning metrics. gauge!(key!("pruning_before_block_nums"), c); + gauge!(key!("pruning_prewhere_nums"), c); gauge!(key!("pruning_after_block_nums"), c); gauge!(key!("pruning_milliseconds"), c); } diff --git a/src/query/storages/fuse/src/operations/append.rs b/src/query/storages/fuse/src/operations/append.rs index 1e4431543342..f77d2a1865ff 100644 --- a/src/query/storages/fuse/src/operations/append.rs +++ b/src/query/storages/fuse/src/operations/append.rs @@ -71,8 +71,18 @@ impl FuseTable { } } - let cluster_stats_gen = - self.get_cluster_stats_gen(ctx.clone(), pipeline, 0, block_compact_thresholds)?; + let max_page_size = if self.is_native() { + Some(write_settings.max_page_size) + } else { + None + }; + let cluster_stats_gen = self.get_cluster_stats_gen( + ctx.clone(), + max_page_size, + pipeline, + 0, + block_compact_thresholds, + )?; let cluster_keys = &cluster_stats_gen.cluster_key_index; if !cluster_keys.is_empty() { @@ -130,6 +140,7 @@ impl FuseTable { pub fn get_cluster_stats_gen( &self, ctx: Arc, + max_page_size: Option, pipeline: &mut Pipeline, level: i32, block_compactor: BlockCompactThresholds, @@ -183,6 +194,7 @@ impl FuseTable { self.cluster_key_meta.as_ref().unwrap().0, cluster_key_index, extra_key_num, + max_page_size, level, block_compactor, vec![], diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index ae657540b098..2af7f64667b9 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -265,27 +265,26 @@ impl FuseTable { ) .await?; - let mut index_stats = Vec::with_capacity(block_metas.len()); - let mut metas = Vec::with_capacity(block_metas.len()); - for (index, block_meta) in block_metas.into_iter() { - index_stats.push((index, block_meta.cluster_stats.clone())); - metas.push(block_meta); - } + let range_block_metas = block_metas + .clone() + .into_iter() + .map(|(a, b)| (a.range, b)) + .collect::>(); let (_, inner_parts) = self.read_partitions_with_metas( ctx.clone(), self.table_info.schema(), None, - metas, + &range_block_metas, base_snapshot.summary.block_count as usize, )?; let parts = Partitions::create( PartitionsShuffleKind::Mod, - index_stats + block_metas .into_iter() .zip(inner_parts.partitions.into_iter()) - .map(|((a, b), c)| MutationPartInfo::create(a, b, c)) + .map(|(a, c)| MutationPartInfo::create(a.0, a.1.cluster_stats.clone(), c)) .collect(), ); ctx.try_set_partitions(parts) @@ -362,10 +361,17 @@ impl FuseTable { cluster_key_index.push(index); } + let max_page_size = if self.is_native() { + Some(self.get_write_settings().max_page_size) + } else { + None + }; + Ok(ClusterStatsGenerator::new( self.cluster_key_meta.as_ref().unwrap().0, cluster_key_index, extra_key_num, + max_page_size, 0, self.get_block_compact_thresholds(), operators, diff --git a/src/query/storages/fuse/src/operations/fuse_native_source.rs b/src/query/storages/fuse/src/operations/fuse_native_source.rs deleted file mode 100644 index 4eb159826850..000000000000 --- a/src/query/storages/fuse/src/operations/fuse_native_source.rs +++ /dev/null @@ -1,245 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::sync::Arc; - -use common_arrow::native::read::reader::NativeReader; -use common_arrow::native::read::NativeReadBuf; -use common_base::base::Progress; -use common_base::base::ProgressValues; -use common_catalog::plan::PartInfoPtr; -use common_catalog::table_context::TableContext; -use common_exception::ErrorCode; -use common_exception::Result; -use common_expression::DataBlock; -use common_expression::DataSchema; -use common_expression::Evaluator; -use common_expression::Expr; -use common_functions::scalars::BUILTIN_FUNCTIONS; -use common_pipeline_core::processors::port::OutputPort; -use common_pipeline_core::processors::processor::Event; -use common_pipeline_core::processors::processor::ProcessorPtr; -use common_pipeline_core::processors::Processor; - -use crate::io::BlockReader; - -type DataChunks = Vec<(usize, NativeReader>)>; - -enum State { - ReadData(Option), - Deserialize(DataChunks), - Generated(DataBlock, DataChunks), - Finish, -} - -pub struct FuseNativeSource { - state: State, - ctx: Arc, - scan_progress: Arc, - output: Arc, - output_reader: Arc, - - prewhere_reader: Arc, - prewhere_filter: Arc>, - remain_reader: Arc>, - - support_blocking: bool, -} - -impl FuseNativeSource { - pub fn create( - ctx: Arc, - output: Arc, - output_reader: Arc, - prewhere_reader: Arc, - prewhere_filter: Arc>, - remain_reader: Arc>, - ) -> Result { - let scan_progress = ctx.get_scan_progress(); - let support_blocking = prewhere_reader.support_blocking_api(); - Ok(ProcessorPtr::create(Box::new(FuseNativeSource { - ctx, - output, - scan_progress, - state: State::ReadData(None), - output_reader, - prewhere_reader, - prewhere_filter, - remain_reader, - support_blocking, - }))) - } - - fn generate_one_block( - &mut self, - src_schema: &DataSchema, - block: DataBlock, - chunks: DataChunks, - ) -> Result<()> { - // resort and prune columns - let dest_schema = self.output_reader.data_schema(); - let block = block.resort(src_schema, &dest_schema)?; - self.state = State::Generated(block, chunks); - Ok(()) - } -} - -#[async_trait::async_trait] -impl Processor for FuseNativeSource { - fn name(&self) -> String { - "FuseEngineSource".to_string() - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if matches!(self.state, State::ReadData(None)) { - self.state = match self.ctx.try_get_part() { - None => State::Finish, - Some(part) => State::ReadData(Some(part)), - } - } - - if matches!(self.state, State::Finish) { - self.output.finish(); - return Ok(Event::Finished); - } - - if self.output.is_finished() { - return Ok(Event::Finished); - } - - if !self.output.can_push() { - return Ok(Event::NeedConsume); - } - - if matches!(self.state, State::Generated(_, _)) { - if let State::Generated(data_block, chunks) = - std::mem::replace(&mut self.state, State::Finish) - { - self.state = State::Deserialize(chunks); - self.output.push_data(Ok(data_block)); - return Ok(Event::NeedConsume); - } - } - - match self.state { - State::Finish => Ok(Event::Finished), - State::ReadData(_) => { - if self.support_blocking { - Ok(Event::Sync) - } else { - Ok(Event::Async) - } - } - State::Deserialize(_) => Ok(Event::Sync), - State::Generated(_, _) => Err(ErrorCode::Internal("It's a bug.")), - } - } - - fn process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Finish) { - State::Deserialize(mut chunks) => { - let prewhere_num = self.prewhere_reader.schema().num_fields(); - let mut prewhere_chunks = Vec::with_capacity(prewhere_num); - for (index, chunk) in chunks.iter_mut().take(prewhere_num) { - // No data anymore - if !chunk.has_next() { - self.state = State::ReadData(None); - return Ok(()); - } - prewhere_chunks.push((*index, chunk.next_array()?)); - } - - let mut data_block = self.prewhere_reader.build_block(prewhere_chunks)?; - let mut fields = self.prewhere_reader.data_fields(); - - if let Some(remain_reader) = self.remain_reader.as_ref() { - let mut remain_fields = remain_reader.data_fields(); - fields.append(&mut remain_fields); - - let remain_num = remain_reader.schema().num_fields(); - let mut remain_chunks = Vec::with_capacity(remain_num); - for (index, chunk) in chunks.iter_mut().skip(prewhere_num) { - assert!(chunk.has_next()); - remain_chunks.push((*index, chunk.next_array()?)); - } - let remain_block = remain_reader.build_block(remain_chunks)?; - for col in remain_block.columns() { - data_block.add_column(col.clone()); - } - } - - if let Some(filter) = self.prewhere_filter.as_ref() { - // do filter - let func_ctx = self.ctx.try_get_function_context()?; - let evaluator = Evaluator::new(&data_block, func_ctx, &BUILTIN_FUNCTIONS); - let predicate = evaluator.run(filter).map_err(|(_, e)| { - ErrorCode::Internal(format!("eval prewhere filter failed: {}.", e)) - })?; - data_block = data_block.filter(&predicate)?; - } - - // the last step of prewhere - let progress_values = ProgressValues { - rows: data_block.num_rows(), - bytes: data_block.memory_size(), - }; - self.scan_progress.incr(&progress_values); - - let src_schema = DataSchema::new(fields); - self.generate_one_block(&src_schema, data_block, chunks)?; - Ok(()) - } - - State::ReadData(Some(part)) => { - let mut chunks = self - .prewhere_reader - .sync_read_native_columns_data(part.clone())?; - - if let Some(r) = self.remain_reader.as_ref() { - let cs = r.sync_read_native_columns_data(part.clone())?; - chunks.extend(cs) - } - - self.state = State::Deserialize(chunks); - Ok(()) - } - _ => Err(ErrorCode::Internal("It's a bug.")), - } - } - - async fn async_process(&mut self) -> Result<()> { - match std::mem::replace(&mut self.state, State::Finish) { - State::ReadData(Some(part)) => { - let mut chunks = self - .prewhere_reader - .async_read_native_columns_data(part.clone()) - .await?; - - if let Some(r) = self.remain_reader.as_ref() { - let cs = r.async_read_native_columns_data(part.clone()).await?; - chunks.extend(cs) - } - - self.state = State::Deserialize(chunks); - Ok(()) - } - _ => Err(ErrorCode::Internal("It's a bug.")), - } - } -} diff --git a/src/query/storages/fuse/src/operations/fuse_source.rs b/src/query/storages/fuse/src/operations/fuse_source.rs index ff9b7407b944..9e05b31f9548 100644 --- a/src/query/storages/fuse/src/operations/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/fuse_source.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use common_catalog::plan::DataSourcePlan; use common_catalog::table_context::TableContext; use common_exception::Result; use common_pipeline_core::Pipeline; @@ -28,6 +29,7 @@ pub fn build_fuse_source_pipeline( pipeline: &mut Pipeline, storage_format: FuseStorageFormat, block_reader: Arc, + plan: &DataSourcePlan, max_io_requests: usize, ) -> Result<()> { let max_threads = ctx.get_settings().get_max_threads()? as usize; @@ -38,6 +40,7 @@ pub fn build_fuse_source_pipeline( pipeline, block_reader, max_threads, + plan, max_io_requests, ), FuseStorageFormat::Parquet => build_fuse_parquet_source_pipeline( diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs index 31261f50ea8f..800d287790b4 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use common_catalog::plan::DataSourcePlan; use common_catalog::table_context::TableContext; use common_exception::Result; use common_pipeline_core::Pipeline; @@ -30,6 +31,7 @@ pub fn build_fuse_native_source_pipeline( pipeline: &mut Pipeline, block_reader: Arc, max_threads: usize, + plan: &DataSourcePlan, max_io_requests: usize, ) -> Result<()> { match block_reader.support_blocking_api() { @@ -64,6 +66,7 @@ pub fn build_fuse_native_source_pipeline( NativeDeserializeDataTransform::create( ctx.clone(), block_reader.clone(), + plan, transform_input, transform_output, ) diff --git a/src/query/storages/fuse/src/operations/read/native_data_source.rs b/src/query/storages/fuse/src/operations/read/native_data_source.rs index 166bd27983fa..28c5bc23ee6d 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source.rs @@ -17,14 +17,15 @@ use std::fmt::Debug; use std::fmt::Formatter; use common_arrow::native::read::reader::NativeReader; -use common_arrow::native::read::NativeReadBuf; use common_catalog::plan::PartInfoPtr; use common_expression::BlockMetaInfo; use common_expression::BlockMetaInfoPtr; use serde::Deserializer; use serde::Serializer; -pub type DataChunks = Vec<(usize, NativeReader>)>; +use crate::io::NativeReaderExt; + +pub type DataChunks = Vec<(usize, NativeReader>)>; pub struct NativeDataSourceMeta { pub part: Vec, diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index 11b6f68c0559..2a8ce966b88e 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -17,10 +17,20 @@ use std::sync::Arc; use common_base::base::Progress; use common_base::base::ProgressValues; +use common_catalog::plan::DataSourcePlan; use common_catalog::plan::PartInfoPtr; +use common_catalog::plan::PushDownInfo; use common_catalog::table_context::TableContext; +use common_exception::ErrorCode; use common_exception::Result; +use common_expression::ConstantFolder; use common_expression::DataBlock; +use common_expression::DataSchema; +use common_expression::Evaluator; +use common_expression::Expr; +use common_expression::FunctionContext; +use common_expression::Value; +use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; @@ -28,10 +38,12 @@ use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; use crate::io::BlockReader; +use crate::metrics::metrics_inc_pruning_prewhere_nums; use crate::operations::read::native_data_source::DataChunks; use crate::operations::read::native_data_source::NativeDataSourceMeta; pub struct NativeDeserializeDataTransform { + func_ctx: FunctionContext, scan_progress: Arc, block_reader: Arc, @@ -40,18 +52,65 @@ pub struct NativeDeserializeDataTransform { output_data: Option, parts: Vec, chunks: Vec, + + prewhere_columns: Vec, + remain_columns: Vec, + + src_schema: DataSchema, + output_schema: DataSchema, + prewhere_filter: Arc>, + + prewhere_skipped: usize, } impl NativeDeserializeDataTransform { pub fn create( ctx: Arc, block_reader: Arc, + plan: &DataSourcePlan, input: Arc, output: Arc, ) -> Result { let scan_progress = ctx.get_scan_progress(); + let src_schema: DataSchema = (block_reader.schema().as_ref()).into(); + + let prewhere_columns: Vec = + match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) { + None => (0..src_schema.num_fields()).collect(), + Some(v) => { + let projected_arrow_schema = v + .prewhere_columns + .project_schema(plan.source_info.schema().as_ref()); + projected_arrow_schema + .fields() + .iter() + .map(|f| src_schema.index_of(f.name()).unwrap()) + .collect() + } + }; + + let remain_columns: Vec = (0..src_schema.num_fields()) + .filter(|i| !prewhere_columns.contains(i)) + .collect(); + + let output_schema: DataSchema = match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) + { + None => src_schema.clone(), + Some(v) => { + let projected = v + .output_columns + .project_schema(plan.source_info.schema().as_ref()); + (&projected).into() + } + }; + + let func_ctx = ctx.try_get_function_context()?; + let prewhere_schema = src_schema.project(&prewhere_columns); + let prewhere_filter = Self::build_prewhere_filter_expr(plan, func_ctx, &prewhere_schema)?; + Ok(ProcessorPtr::create(Box::new( NativeDeserializeDataTransform { + func_ctx, scan_progress, block_reader, input, @@ -59,12 +118,37 @@ impl NativeDeserializeDataTransform { output_data: None, parts: vec![], chunks: vec![], + + prewhere_columns, + remain_columns, + src_schema, + output_schema, + prewhere_filter, + prewhere_skipped: 0, }, ))) } + + fn build_prewhere_filter_expr( + plan: &DataSourcePlan, + ctx: FunctionContext, + schema: &DataSchema, + ) -> Result>> { + Ok( + match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) { + None => Arc::new(None), + Some(v) => { + let expr = v.filter.as_expr(&BUILTIN_FUNCTIONS); + let expr = + expr.project_column_ref(|name| schema.column_with_name(name).unwrap().0); + let (expr, _) = ConstantFolder::fold(&expr, ctx, &BUILTIN_FUNCTIONS); + Arc::new(Some(expr)) + } + }, + ) + } } -#[async_trait::async_trait] impl Processor for NativeDeserializeDataTransform { fn name(&self) -> String { String::from("NativeDeserializeDataTransform") @@ -115,6 +199,7 @@ impl Processor for NativeDeserializeDataTransform { } if self.input.is_finished() { + metrics_inc_pruning_prewhere_nums(self.prewhere_skipped as u64); self.output.finish(); return Ok(Event::Finished); } @@ -127,24 +212,63 @@ impl Processor for NativeDeserializeDataTransform { if let Some(chunks) = self.chunks.last_mut() { let mut arrays = Vec::with_capacity(chunks.len()); - for (index, chunk) in chunks.iter_mut() { - if !chunk.has_next() { + for index in self.prewhere_columns.iter() { + let chunk = chunks.get_mut(*index).unwrap(); + if !chunk.1.has_next() { // No data anymore let _ = self.chunks.pop(); return Ok(()); } - - arrays.push((*index, chunk.next_array()?)); + arrays.push((chunk.0, chunk.1.next_array()?)); } - let data_block = self.block_reader.build_block(arrays)?; + let data_block = match self.prewhere_filter.as_ref() { + Some(filter) => { + let prewhere_block = self.block_reader.build_block(arrays.clone())?; + let evaluator = + Evaluator::new(&prewhere_block, self.func_ctx, &BUILTIN_FUNCTIONS); + let result = evaluator.run(filter).map_err(|(_, e)| { + ErrorCode::Internal(format!("eval prewhere filter failed: {}.", e)) + })?; + let filter = DataBlock::cast_to_nonull_boolean(&result).unwrap(); + + let all_filtered = match &filter { + Value::Scalar(v) => !v, + Value::Column(bitmap) => bitmap.unset_bits() == bitmap.len(), + }; + + if all_filtered { + self.prewhere_skipped += 1; + for index in self.remain_columns.iter() { + let chunk = chunks.get_mut(*index).unwrap(); + chunk.1.skip_page()?; + } + return Ok(()); + } + + for index in self.remain_columns.iter() { + let chunk = chunks.get_mut(*index).unwrap(); + arrays.push((chunk.0, chunk.1.next_array()?)); + } + + let block = self.block_reader.build_block(arrays)?; + let block = block.resort(&self.src_schema, &self.output_schema)?; + block.filter(&result) + } + None => { + for index in self.remain_columns.iter() { + let chunk = chunks.get_mut(*index).unwrap(); + arrays.push((chunk.0, chunk.1.next_array()?)); + } + self.block_reader.build_block(arrays) + } + }?; let progress_values = ProgressValues { rows: data_block.num_rows(), bytes: data_block.memory_size(), }; self.scan_progress.incr(&progress_values); - self.output_data = Some(data_block); } diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 9f06ee9c9f1b..7028441efbf9 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -100,13 +100,6 @@ impl FuseTable { }); } - assert!( - plan.push_downs - .as_ref() - .and_then(|s| s.prewhere.as_ref()) - .is_none() - ); - let block_reader = self.build_block_reader(plan)?; let max_io_requests = self.adjust_io_request(&ctx)?; @@ -115,6 +108,7 @@ impl FuseTable { pipeline, self.storage_format, block_reader, + plan, max_io_requests, ) } diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 30bde109e8b4..0c3672a606c6 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::ops::Range; use std::sync::Arc; use std::time::Instant; @@ -22,6 +23,7 @@ use common_catalog::plan::Partitions; use common_catalog::plan::PartitionsShuffleKind; use common_catalog::plan::Projection; use common_catalog::plan::PushDownInfo; +use common_catalog::table::Table; use common_catalog::table_context::TableContext; use common_exception::Result; use common_expression::TableSchemaRef; @@ -107,17 +109,28 @@ impl FuseTable { segments_location.len() ); - let block_metas = BlockPruner::prune( - &ctx, - dal, - table_info.schema(), - &push_downs, - segments_location, - ) - .await? - .into_iter() - .map(|(_, v)| v) - .collect::>(); + let block_metas = if !self.is_native() || self.cluster_key_meta.is_none() { + BlockPruner::prune( + &ctx, + dal, + table_info.schema(), + &push_downs, + segments_location, + ) + .await? + } else { + let cluster_keys = self.cluster_keys(ctx.clone()); + BlockPruner::prune_with_pages( + &ctx, + dal, + table_info.schema(), + &push_downs, + self.cluster_key_meta.clone(), + cluster_keys, + segments_location, + ) + .await? + }; info!( "prune snapshot block end, final block numbers:{}, cost:{}", @@ -125,7 +138,11 @@ impl FuseTable { start.elapsed().as_secs() ); - self.read_partitions_with_metas(ctx, table_info.schema(), push_downs, block_metas, summary) + let block_metas = block_metas + .into_iter() + .map(|(block_meta_index, block_meta)| (block_meta_index.range, block_meta)) + .collect::>(); + self.read_partitions_with_metas(ctx, table_info.schema(), push_downs, &block_metas, summary) } pub fn read_partitions_with_metas( @@ -133,7 +150,7 @@ impl FuseTable { _: Arc, schema: TableSchemaRef, push_downs: Option, - block_metas: Vec>, + block_metas: &[(Option>, Arc)], partitions_total: usize, ) -> Result<(PartStatistics, Partitions)> { let arrow_schema = schema.to_arrow(); @@ -141,7 +158,7 @@ impl FuseTable { let partitions_scanned = block_metas.len(); - let (mut statistics, parts) = Self::to_partitions(&block_metas, &column_nodes, push_downs); + let (mut statistics, parts) = Self::to_partitions(block_metas, &column_nodes, push_downs); // Update planner statistics. statistics.partitions_total = partitions_total; @@ -157,7 +174,7 @@ impl FuseTable { } pub fn to_partitions( - blocks_metas: &[Arc], + block_metas: &[(Option>, Arc)], column_nodes: &ColumnNodes, push_down: Option, ) -> (PartStatistics, Partitions) { @@ -168,11 +185,11 @@ impl FuseTable { .unwrap_or(usize::MAX); let (mut statistics, partitions) = match &push_down { - None => Self::all_columns_partitions(blocks_metas, limit), + None => Self::all_columns_partitions(block_metas, limit), Some(extras) => match &extras.projection { - None => Self::all_columns_partitions(blocks_metas, limit), + None => Self::all_columns_partitions(block_metas, limit), Some(projection) => { - Self::projection_partitions(blocks_metas, column_nodes, projection, limit) + Self::projection_partitions(block_metas, column_nodes, projection, limit) } }, }; @@ -189,7 +206,7 @@ impl FuseTable { } pub fn all_columns_partitions( - metas: &[Arc], + block_metas: &[(Option>, Arc)], limit: usize, ) -> (PartStatistics, Partitions) { let mut statistics = PartStatistics::default_exact(); @@ -201,11 +218,11 @@ impl FuseTable { let mut remaining = limit; - for block_meta in metas { + for (range, block_meta) in block_metas.iter() { let rows = block_meta.row_count as usize; partitions .partitions - .push(Self::all_columns_part(block_meta)); + .push(Self::all_columns_part(range.clone(), block_meta)); statistics.read_rows += rows; statistics.read_bytes += block_meta.block_size as usize; @@ -224,7 +241,7 @@ impl FuseTable { } fn projection_partitions( - metas: &[Arc], + block_metas: &[(Option>, Arc)], column_nodes: &ColumnNodes, projection: &Projection, limit: usize, @@ -238,10 +255,13 @@ impl FuseTable { let mut remaining = limit; - for block_meta in metas { - partitions - .partitions - .push(Self::projection_part(block_meta, column_nodes, projection)); + for (range, block_meta) in block_metas { + partitions.partitions.push(Self::projection_part( + block_meta, + range.clone(), + column_nodes, + projection, + )); let rows = block_meta.row_count as usize; statistics.read_rows += rows; @@ -268,7 +288,7 @@ impl FuseTable { (statistics, partitions) } - pub fn all_columns_part(meta: &BlockMeta) -> PartInfoPtr { + pub fn all_columns_part(range: Option>, meta: &BlockMeta) -> PartInfoPtr { let mut columns_meta = HashMap::with_capacity(meta.col_metas.len()); for (idx, column_meta) in &meta.col_metas { @@ -284,11 +304,13 @@ impl FuseTable { rows_count, columns_meta, meta.compression(), + range, ) } fn projection_part( meta: &BlockMeta, + range: Option>, column_nodes: &ColumnNodes, projection: &Projection, ) -> PartInfoPtr { @@ -316,6 +338,7 @@ impl FuseTable { rows_count, columns_meta, meta.compression(), + range, ) } diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 687d671fb45d..5b1100d090d0 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -112,11 +112,17 @@ impl FuseTable { } let partitions_total = mutator.partitions_total(); + + let block_metas: Vec<_> = mutator + .selected_blocks() + .iter() + .map(|meta| (None, meta.clone())) + .collect(); let (statistics, parts) = self.read_partitions_with_metas( ctx.clone(), self.table_info.schema(), None, - mutator.selected_blocks(), + &block_metas, partitions_total, )?; let table_info = self.get_table_info(); @@ -137,8 +143,15 @@ impl FuseTable { // ReadDataKind to avoid OOM. self.do_read_data(ctx.clone(), &plan, pipeline)?; + let max_page_size = if self.is_native() { + Some(self.get_write_settings().max_page_size) + } else { + None + }; + let cluster_stats_gen = self.get_cluster_stats_gen( ctx.clone(), + max_page_size, pipeline, mutator.level() + 1, block_compact_thresholds, diff --git a/src/query/storages/fuse/src/pruning/pruning_executor.rs b/src/query/storages/fuse/src/pruning/pruning_executor.rs index 9032b03f0a01..b41cd0261344 100644 --- a/src/query/storages/fuse/src/pruning/pruning_executor.rs +++ b/src/query/storages/fuse/src/pruning/pruning_executor.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::future::Future; +use std::ops::Range; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; @@ -25,6 +26,7 @@ use common_catalog::plan::PushDownInfo; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; +use common_expression::RemoteExpr; use common_expression::TableSchemaRef; use common_functions::scalars::BUILTIN_FUNCTIONS; use futures::future; @@ -32,11 +34,14 @@ use opendal::Operator; use storages_common_pruner::BlockMetaIndex; use storages_common_pruner::LimiterPruner; use storages_common_pruner::LimiterPrunerCreator; +use storages_common_pruner::PagePruner; +use storages_common_pruner::PagePrunerCreator; use storages_common_pruner::RangePruner; use storages_common_pruner::RangePrunerCreator; use storages_common_pruner::TopNPrunner; use storages_common_table_meta::caches::LoadParams; use storages_common_table_meta::meta::BlockMeta; +use storages_common_table_meta::meta::ClusterKey; use storages_common_table_meta::meta::Location; use storages_common_table_meta::meta::SegmentInfo; use tracing::warn; @@ -53,19 +58,34 @@ struct PruningContext { limiter: LimiterPruner, range_pruner: Arc, filter_pruner: Option>, + page_pruner: Arc, rt: Arc, semaphore: Arc, } pub struct BlockPruner; impl BlockPruner { + pub async fn prune( + ctx: &Arc, + dal: Operator, + schema: TableSchemaRef, + push_down: &Option, + segment_locs: Vec, + ) -> Result)>> { + Self::prune_with_pages(ctx, dal, schema, push_down, None, vec![], segment_locs).await + } + // prune blocks by utilizing min_max index and filter, according to the pushdowns #[tracing::instrument(level = "debug", skip(schema, ctx), fields(ctx.id = ctx.get_id().as_str()))] - pub async fn prune( + pub async fn prune_with_pages( ctx: &Arc, dal: Operator, schema: TableSchemaRef, push_down: &Option, + + cluster_key_meta: Option, + cluster_keys: Vec>, + segment_locs: Vec, ) -> Result)>> { if segment_locs.is_empty() { @@ -91,19 +111,26 @@ impl BlockPruner { // prepare the limiter. in case that limit is none, an unlimited limiter will be returned let limiter = LimiterPrunerCreator::create(limit); + let func_context = ctx.try_get_function_context()?; // prepare the range filter. // if filter_expression is none, an dummy pruner will be returned, which prunes nothing - let range_pruner = RangePrunerCreator::try_create( - ctx.try_get_function_context()?, - filter_exprs.as_deref(), - &schema, - )?; + let range_pruner = + RangePrunerCreator::try_create(func_context, filter_exprs.as_deref(), &schema)?; // prepare the filter. // None will be returned, if filter is not applicable (e.g. unsuitable filter expression, index not available, etc.) let filter_pruner = pruner::new_filter_pruner(ctx, filter_exprs.as_deref(), &schema, dal.clone())?; + // prepare the page pruner, this is used in native format + let page_pruner = PagePrunerCreator::try_create( + func_context, + cluster_key_meta, + cluster_keys, + filter_exprs.as_deref(), + &schema, + )?; + // 2. constraint the degree of parallelism let max_threads = ctx.get_settings().get_max_threads()? as usize; let max_concurrency = { @@ -131,6 +158,7 @@ impl BlockPruner { limiter: limiter.clone(), range_pruner: range_pruner.clone(), filter_pruner, + page_pruner, rt: pruning_runtime.clone(), semaphore: semaphore.clone(), }); @@ -249,7 +277,8 @@ impl BlockPruner { if pruning_ctx.limiter.exceeded() { return None; } - type BlockPruningFutureReturn = Pin + Send>>; + type BlockPruningFutureReturn = + Pin>)> + Send>>; type BlockPruningFuture = Box BlockPruningFutureReturn + Send + 'static>; blocks.next().map(|(block_idx, block_meta)| { @@ -258,14 +287,23 @@ impl BlockPruner { // not pruned by block zone map index, let ctx = pruning_ctx.clone(); let filter_pruner = filter_pruner.clone(); + let page_pruner = ctx.page_pruner.clone(); let index_location = block_meta.bloom_filter_index_location.clone(); let index_size = block_meta.bloom_filter_index_size; + + let cluster_stats = block_meta.cluster_stats.clone(); let v: BlockPruningFuture = Box::new(move |permit: OwnedSemaphorePermit| { Box::pin(async move { let _permit = permit; let keep = filter_pruner.should_keep(&index_location, index_size).await && ctx.limiter.within_limit(row_count); - (block_idx, keep) + + if keep { + let (keep, range) = page_pruner.should_keep(&cluster_stats); + (block_idx, keep, range) + } else { + (block_idx, keep, None) + } }) }); v @@ -273,7 +311,7 @@ impl BlockPruner { let v: BlockPruningFuture = Box::new(move |permit: OwnedSemaphorePermit| { Box::pin(async move { let _permit = permit; - (block_idx, false) + (block_idx, false, None) }) }); v @@ -293,13 +331,15 @@ impl BlockPruner { let mut result = Vec::with_capacity(segment_info.blocks.len()); for item in joint { - let (block_idx, keep) = item; + let (block_idx, keep, range) = item; if keep { let block = segment_info.blocks[block_idx].clone(); + result.push(( BlockMetaIndex { segment_idx, block_idx, + range, }, block, )) @@ -333,13 +373,19 @@ impl BlockPruner { if pruning_ctx.range_pruner.should_keep(&block_meta.col_stats) && pruning_ctx.limiter.within_limit(row_count) { - result.push(( - BlockMetaIndex { - segment_idx, - block_idx, - }, - block_meta.clone(), - )) + let (keep, range) = pruning_ctx + .page_pruner + .should_keep(&block_meta.cluster_stats); + if keep { + result.push(( + BlockMetaIndex { + segment_idx, + block_idx, + range, + }, + block_meta.clone(), + )) + } } } diff --git a/src/query/storages/fuse/src/statistics/cluster_statistics.rs b/src/query/storages/fuse/src/statistics/cluster_statistics.rs index 7481618b08b8..66473de8ea78 100644 --- a/src/query/storages/fuse/src/statistics/cluster_statistics.rs +++ b/src/query/storages/fuse/src/statistics/cluster_statistics.rs @@ -13,12 +13,11 @@ // limitations under the License. use common_exception::Result; -use common_expression::types::DataType; use common_expression::BlockCompactThresholds; use common_expression::DataBlock; use common_expression::DataField; use common_expression::FunctionContext; -use common_expression::ScalarRef; +use common_expression::Scalar; use common_sql::evaluator::BlockOperator; use storages_common_table_meta::meta::ClusterStatistics; @@ -29,6 +28,8 @@ pub struct ClusterStatsGenerator { pub(crate) cluster_key_index: Vec, pub(crate) extra_key_num: usize, + max_page_size: Option, + level: i32, block_compact_thresholds: BlockCompactThresholds, operators: Vec, @@ -42,6 +43,7 @@ impl ClusterStatsGenerator { cluster_key_id: u32, cluster_key_index: Vec, extra_key_num: usize, + max_page_size: Option, level: i32, block_compact_thresholds: BlockCompactThresholds, operators: Vec, @@ -52,6 +54,7 @@ impl ClusterStatsGenerator { cluster_key_id, cluster_key_index, extra_key_num, + max_page_size, level, block_compact_thresholds, operators, @@ -120,31 +123,16 @@ impl ClusterStatsGenerator { if self.cluster_key_index.is_empty() { return Ok(None); } - let mut min = Vec::with_capacity(self.cluster_key_index.len()); let mut max = Vec::with_capacity(self.cluster_key_index.len()); for key in self.cluster_key_index.iter() { let val = data_block.get_by_offset(*key); let val_ref = val.value.as_ref(); - let mut left = unsafe { val_ref.index_unchecked(0) }; - // To avoid high cardinality, for the string column, - // cluster statistics uses only the first 5 bytes. - if val.data_type == DataType::String { - let v = left.into_string().unwrap(); - let l = v.len(); - let e = if l < 5 { l } else { 5 }; - left = ScalarRef::String(&v[0..e]); - } + let left = unsafe { val_ref.index_unchecked(0) }; min.push(left.to_owned()); - let mut right = unsafe { val_ref.index_unchecked(val_ref.len() - 1) }; - if val.data_type == DataType::String { - let v = right.into_string().unwrap(); - let l = v.len(); - let e = if l < 5 { l } else { 5 }; - right = ScalarRef::String(&v[0..e]); - } + let right = unsafe { val_ref.index_unchecked(val_ref.len() - 1) }; max.push(right.to_owned()); } @@ -158,11 +146,29 @@ impl ClusterStatsGenerator { level }; + let pages = if let Some(max_page_size) = self.max_page_size { + let mut values = Vec::with_capacity(data_block.num_rows() / max_page_size + 1); + for start in (0..data_block.num_rows()).step_by(max_page_size) { + let mut tuple_values = Vec::with_capacity(self.cluster_key_index.len()); + for key in self.cluster_key_index.iter() { + let val = data_block.get_by_offset(*key); + let val_ref = val.value.as_ref(); + let left = unsafe { val_ref.index_unchecked(start) }; + tuple_values.push(left.to_owned()); + } + values.push(Scalar::Tuple(tuple_values)); + } + Some(values) + } else { + None + }; + Ok(Some(ClusterStatistics { cluster_key_id: self.cluster_key_id, min, max, level, + pages, })) } } diff --git a/src/query/storages/fuse/src/statistics/column_statistic.rs b/src/query/storages/fuse/src/statistics/column_statistic.rs index e29dcaf44b47..2d7e70d23e69 100644 --- a/src/query/storages/fuse/src/statistics/column_statistic.rs +++ b/src/query/storages/fuse/src/statistics/column_statistic.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use common_exception::Result; -use common_expression::types::DataType; use common_expression::types::NumberType; use common_expression::types::ValueType; use common_expression::Column; @@ -27,18 +26,8 @@ use storages_common_index::RangeIndex; use storages_common_table_meta::meta::ColumnStatistics; use storages_common_table_meta::meta::StatisticsOfColumns; -pub fn calc_column_distinct_of_values( - column: &Column, - data_type: &DataType, - rows: usize, -) -> Result { - let distinct_values = eval_aggr( - "approx_count_distinct", - vec![], - &[column.clone()], - &[data_type.clone()], - rows, - )?; +pub fn calc_column_distinct_of_values(column: &Column, rows: usize) -> Result { + let distinct_values = eval_aggr("approx_count_distinct", vec![], &[column.clone()], rows)?; let col = NumberType::::try_downcast_column(&distinct_values.0).unwrap(); Ok(col[0]) } @@ -71,8 +60,8 @@ pub fn gen_columns_statistics( let mut min = Scalar::Null; let mut max = Scalar::Null; - let (mins, _) = eval_aggr("min", vec![], &[col.clone()], &[data_type.clone()], rows)?; - let (maxs, _) = eval_aggr("max", vec![], &[col.clone()], &[data_type.clone()], rows)?; + let (mins, _) = eval_aggr("min", vec![], &[col.clone()], rows)?; + let (maxs, _) = eval_aggr("max", vec![], &[col.clone()], rows)?; if mins.len() > 0 { min = if let Some(v) = mins.index(0) { @@ -116,10 +105,10 @@ pub fn gen_columns_statistics( *value as u64 } } else { - calc_column_distinct_of_values(col, data_type, rows)? + calc_column_distinct_of_values(col, rows)? } } - (_, _) => calc_column_distinct_of_values(col, data_type, rows)?, + (_, _) => calc_column_distinct_of_values(col, rows)?, }; let in_memory_size = col.memory_size() as u64; diff --git a/src/query/storages/fuse/src/statistics/reducers.rs b/src/query/storages/fuse/src/statistics/reducers.rs index 5b82484be6aa..22ad714a221d 100644 --- a/src/query/storages/fuse/src/statistics/reducers.rs +++ b/src/query/storages/fuse/src/statistics/reducers.rs @@ -95,7 +95,7 @@ pub fn reduce_block_statistics>( Some(data_block) => { if let Some(col) = leaves.as_ref().unwrap().get(*id as usize) { if let Some(column) = &col.1 { - calc_column_distinct_of_values(column, &col.2, data_block.num_rows())? + calc_column_distinct_of_values(column, data_block.num_rows())? } else { 0 }