diff --git a/Cargo.lock b/Cargo.lock index 62c09ecca5e6e..c5f9c3b7955f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,8 +261,8 @@ dependencies = [ "memchr", "pin-project-lite", "xz2", - "zstd", - "zstd-safe", + "zstd 0.11.2+zstd.1.5.2", + "zstd-safe 5.0.2+zstd.1.5.2", ] [[package]] @@ -2227,6 +2227,7 @@ dependencies = [ "common-functions", "common-meta-app", "common-pipeline-core", + "common-sql", "common-storage", "glob", "opendal", @@ -5852,7 +5853,7 @@ dependencies = [ [[package]] name = "parquet2" version = "0.17.0" -source = "git+https://github.com/jorgecarleitao/parquet2?rev=fb08b72#fb08b725aa15d700d2a67c19ef65889b032ae371" +source = "git+https://github.com/jorgecarleitao/parquet2?rev=ed0e1ff#ed0e1ffb3a673f9f79ffc0e0b114e3d9f30144d7" dependencies = [ "async-stream", "brotli", @@ -5861,9 +5862,10 @@ dependencies = [ "lz4", "parquet-format-safe", "seq-macro", + "serde", "snap", "streaming-decompression", - "zstd", + "zstd 0.12.1+zstd.1.5.2", ] [[package]] @@ -7708,7 +7710,6 @@ dependencies = [ name = "storages-common-pruner" version = "0.1.0" dependencies = [ - "common-catalog", "common-exception", "common-expression", "serde", @@ -7756,7 +7757,7 @@ dependencies = [ "serde", "serde_json", "snap", - "zstd", + "zstd 0.11.2+zstd.1.5.2", ] [[package]] @@ -9129,7 +9130,16 @@ version = "0.11.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" dependencies = [ - "zstd-safe", + "zstd-safe 5.0.2+zstd.1.5.2", +] + +[[package]] +name = "zstd" +version = "0.12.1+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c947d2adc84ff9a59f2e3c03b81aa4128acf28d6ad7d56273f7e8af14e47bea" +dependencies = [ + "zstd-safe 6.0.2+zstd.1.5.2", ] [[package]] @@ -9142,6 +9152,16 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "zstd-safe" +version = "6.0.2+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cf39f730b440bab43da8fb5faf5f254574462f73f260f85f7987f32154ff17" +dependencies = [ + "libc", + "zstd-sys", +] + [[package]] name = "zstd-sys" version = "2.0.4+zstd.1.5.2" diff --git a/Cargo.toml b/Cargo.toml index 512d65779f2d1..225774d1d4bab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,7 +158,8 @@ rpath = false # If there are dependencies that need patching, they can be listed below. # For example: # arrow-format = { git = "https://github.com/datafuse-extras/arrow-format", rev = "78dacc1" } + arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "211be21" } -parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "fb08b72" } +parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "ed0e1ff" } limits-rs = { git = "https://github.com/datafuse-extras/limits-rs", rev = "abfcf7b" } metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "bc49d03" } diff --git a/src/common/arrow/Cargo.toml b/src/common/arrow/Cargo.toml index 533d77277da95..9c6f965596072 100644 --- a/src/common/arrow/Cargo.toml +++ b/src/common/arrow/Cargo.toml @@ -43,6 +43,6 @@ arrow = { package = "arrow2", version = "0.15.0", default-features = false, feat arrow-format = { version = "0.8.0", features = ["flight-data", "flight-service", "ipc"] } futures = "0.3.24" native = { package = "strawboat", version = "0.1.0" } -parquet2 = { version = "0.17.0", default_features = false } +parquet2 = { version = "0.17.0", default_features = false, features = ["serde_types"] } [dev-dependencies] diff --git a/src/query/storages/common/index/src/range_filter.rs b/src/query/storages/common/index/src/range_filter.rs index 63375f1953170..dc1e64db53602 100644 --- a/src/query/storages/common/index/src/range_filter.rs +++ b/src/query/storages/common/index/src/range_filter.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use common_catalog::table_context::TableContext; use common_exception::Result; use common_expression::type_check::check_function; use common_expression::types::nullable::NullableDomain; @@ -46,7 +43,7 @@ pub struct RangeFilter { } impl RangeFilter { pub fn try_create( - ctx: Arc, + func_ctx: FunctionContext, exprs: &[Expr], schema: TableSchemaRef, ) -> Result { @@ -58,8 +55,6 @@ impl RangeFilter { }) .unwrap(); - let func_ctx = ctx.try_get_function_context()?; - let (new_expr, _) = ConstantFolder::fold(&conjunction, func_ctx, &BUILTIN_FUNCTIONS); Ok(Self { diff --git a/src/query/storages/common/pruner/Cargo.toml b/src/query/storages/common/pruner/Cargo.toml index cdfd7fbf9d4ad..ffbdaa6e7d80e 100644 --- a/src/query/storages/common/pruner/Cargo.toml +++ b/src/query/storages/common/pruner/Cargo.toml @@ -12,7 +12,6 @@ doctest = false test = false [dependencies] -common-catalog = { path = "../../../../query/catalog" } common-exception = { path = "../../../../common/exception" } common-expression = { path = "../../../expression" } diff --git a/src/query/storages/common/pruner/src/range_pruner.rs b/src/query/storages/common/pruner/src/range_pruner.rs index 9abc65c02008b..23c91e5a16321 100644 --- a/src/query/storages/common/pruner/src/range_pruner.rs +++ b/src/query/storages/common/pruner/src/range_pruner.rs @@ -14,9 +14,9 @@ use std::sync::Arc; -use common_catalog::table_context::TableContext; use common_exception::Result; use common_expression::Expr; +use common_expression::FunctionContext; use common_expression::TableSchemaRef; use storages_common_index::RangeFilter; use storages_common_table_meta::meta::StatisticsOfColumns; @@ -62,13 +62,13 @@ impl RangePrunerCreator { /// /// Note: the schema should be the schema of the table, not the schema of the input. pub fn try_create<'a>( - ctx: &Arc, + func_ctx: FunctionContext, filter_expr: Option<&'a [Expr]>, schema: &'a TableSchemaRef, ) -> Result> { Ok(match filter_expr { Some(exprs) if !exprs.is_empty() => { - let range_filter = RangeFilter::try_create(ctx.clone(), exprs, schema.clone())?; + let range_filter = RangeFilter::try_create(func_ctx, exprs, schema.clone())?; match range_filter.try_eval_const() { Ok(v) => { if v { diff --git a/src/query/storages/fuse/src/pruning/pruning_executor.rs b/src/query/storages/fuse/src/pruning/pruning_executor.rs index 9ead863827834..d2ce9dca3ca45 100644 --- a/src/query/storages/fuse/src/pruning/pruning_executor.rs +++ b/src/query/storages/fuse/src/pruning/pruning_executor.rs @@ -93,7 +93,11 @@ impl BlockPruner { // prepare the range filter. // if filter_expression is none, an dummy pruner will be returned, which prunes nothing - let range_pruner = RangePrunerCreator::try_create(ctx, filter_exprs.as_deref(), &schema)?; + let range_pruner = RangePrunerCreator::try_create( + ctx.try_get_function_context()?, + filter_exprs.as_deref(), + &schema, + )?; // prepare the filter. // None will be returned, if filter is not applicable (e.g. unsuitable filter expression, index not available, etc.) diff --git a/src/query/storages/hive/hive/src/hive_partition_pruner.rs b/src/query/storages/hive/hive/src/hive_partition_pruner.rs index 5a79a5e44583f..cc6fa5f80975c 100644 --- a/src/query/storages/hive/hive/src/hive_partition_pruner.rs +++ b/src/query/storages/hive/hive/src/hive_partition_pruner.rs @@ -74,8 +74,11 @@ impl HivePartitionPruner { } pub fn prune(&self, partitions: Vec) -> Result> { - let range_filter = - RangeFilter::try_create(self.ctx.clone(), &self.filters, self.full_schema.clone())?; + let range_filter = RangeFilter::try_create( + self.ctx.try_get_function_context()?, + &self.filters, + self.full_schema.clone(), + )?; let column_stats = self.get_column_stats(&partitions)?; let mut filted_partitions = vec![]; for (idx, stats) in column_stats.into_iter().enumerate() { diff --git a/src/query/storages/hive/hive/src/hive_table.rs b/src/query/storages/hive/hive/src/hive_table.rs index 11d2a0e0e7278..49791f5774f61 100644 --- a/src/query/storages/hive/hive/src/hive_table.rs +++ b/src/query/storages/hive/hive/src/hive_table.rs @@ -116,7 +116,7 @@ impl HiveTable { }); let range_filter = match filter_expressions { Some(exprs) if !exprs.is_empty() => Some(RangeFilter::try_create( - ctx.clone(), + ctx.try_get_function_context()?, &exprs, self.table_info.schema(), )?), diff --git a/src/query/storages/parquet/Cargo.toml b/src/query/storages/parquet/Cargo.toml index aa61660cf6afc..a0f4775d0c1e7 100644 --- a/src/query/storages/parquet/Cargo.toml +++ b/src/query/storages/parquet/Cargo.toml @@ -32,3 +32,6 @@ glob = "0.3.0" opendal = { workspace = true } serde = { workspace = true } typetag = "0.2.3" + +[dev-dependencies] +common-sql = { path = "../../sql" } diff --git a/src/query/storages/parquet/src/lib.rs b/src/query/storages/parquet/src/lib.rs index 3eeaae307f894..321ec6687fdac 100644 --- a/src/query/storages/parquet/src/lib.rs +++ b/src/query/storages/parquet/src/lib.rs @@ -15,13 +15,13 @@ #![allow(clippy::uninlined_format_args)] #![deny(unused_crate_dependencies)] -mod parquet_column; mod parquet_part; mod parquet_reader; mod parquet_source; +mod pruning; +mod read_options; +mod statistics; mod table_function; -pub use parquet_part::ParquetLocationPart; -pub use parquet_reader::ParquetReader; -pub use parquet_source::ParquetSource; +pub use read_options::ReadOptions; pub use table_function::ParquetTable; diff --git a/src/query/storages/parquet/src/parquet_column.rs b/src/query/storages/parquet/src/parquet_column.rs deleted file mode 100644 index c16d6706c6e82..0000000000000 --- a/src/query/storages/parquet/src/parquet_column.rs +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_arrow::parquet::compression::Compression as ParquetCompression; - -#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy, serde::Deserialize, serde::Serialize)] -pub enum Compression { - Uncompressed, - Snappy, - Gzip, - Lzo, - Brotli, - Lz4, - Zstd, - Lz4Raw, -} - -impl From for ParquetCompression { - fn from(value: Compression) -> ParquetCompression { - match value { - Compression::Uncompressed => ParquetCompression::Uncompressed, - Compression::Snappy => ParquetCompression::Snappy, - Compression::Gzip => ParquetCompression::Gzip, - Compression::Lzo => ParquetCompression::Lzo, - Compression::Brotli => ParquetCompression::Brotli, - Compression::Lz4 => ParquetCompression::Lz4, - Compression::Zstd => ParquetCompression::Zstd, - Compression::Lz4Raw => ParquetCompression::Lz4Raw, - } - } -} - -impl From for Compression { - fn from(value: ParquetCompression) -> Self { - match value { - ParquetCompression::Uncompressed => Compression::Uncompressed, - ParquetCompression::Snappy => Compression::Snappy, - ParquetCompression::Gzip => Compression::Gzip, - ParquetCompression::Lzo => Compression::Lzo, - ParquetCompression::Brotli => Compression::Brotli, - ParquetCompression::Lz4 => Compression::Lz4, - ParquetCompression::Zstd => Compression::Zstd, - ParquetCompression::Lz4Raw => Compression::Lz4Raw, - } - } -} diff --git a/src/query/storages/parquet/src/parquet_part.rs b/src/query/storages/parquet/src/parquet_part.rs index ab0aa15b38a22..5a42c0fd3b1a7 100644 --- a/src/query/storages/parquet/src/parquet_part.rs +++ b/src/query/storages/parquet/src/parquet_part.rs @@ -19,7 +19,8 @@ use std::hash::Hash; use std::hash::Hasher; use std::sync::Arc; -use common_arrow::parquet::compression::Compression as ParquetCompression; +use common_arrow::parquet::compression::Compression; +use common_arrow::parquet::indexes::Interval; use common_catalog::plan::PartInfo; use common_catalog::plan::PartInfoPtr; use common_exception::ErrorCode; @@ -65,48 +66,6 @@ impl ParquetLocationPart { } } -#[derive(serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq, Hash, Clone, Copy)] -pub enum Compression { - Uncompressed, - Snappy, - Gzip, - Lzo, - Brotli, - Lz4, - Zstd, - Lz4Raw, -} - -impl From for ParquetCompression { - fn from(value: Compression) -> Self { - match value { - Compression::Uncompressed => ParquetCompression::Uncompressed, - Compression::Snappy => ParquetCompression::Snappy, - Compression::Gzip => ParquetCompression::Gzip, - Compression::Lzo => ParquetCompression::Lzo, - Compression::Brotli => ParquetCompression::Brotli, - Compression::Lz4 => ParquetCompression::Lz4, - Compression::Zstd => ParquetCompression::Zstd, - Compression::Lz4Raw => ParquetCompression::Lz4Raw, - } - } -} - -impl From for Compression { - fn from(value: ParquetCompression) -> Self { - match value { - ParquetCompression::Uncompressed => Compression::Uncompressed, - ParquetCompression::Snappy => Compression::Snappy, - ParquetCompression::Gzip => Compression::Gzip, - ParquetCompression::Lzo => Compression::Lzo, - ParquetCompression::Brotli => Compression::Brotli, - ParquetCompression::Lz4 => Compression::Lz4, - ParquetCompression::Zstd => Compression::Zstd, - ParquetCompression::Lz4Raw => Compression::Lz4Raw, - } - } -} - #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct ColumnMeta { pub offset: u64, @@ -119,6 +78,7 @@ pub struct ParquetRowGroupPart { pub location: String, pub num_rows: usize, pub column_metas: HashMap, + pub row_selection: Option>, } #[typetag::serde(name = "parquet_row_group")] @@ -146,11 +106,13 @@ impl ParquetRowGroupPart { location: String, num_rows: usize, column_metas: HashMap, + row_selection: Option>, ) -> Arc> { Arc::new(Box::new(ParquetRowGroupPart { location, num_rows, column_metas, + row_selection, })) } diff --git a/src/query/storages/parquet/src/parquet_reader/deserialize.rs b/src/query/storages/parquet/src/parquet_reader/deserialize.rs index 2417a9d816a94..3c7035006937e 100644 --- a/src/query/storages/parquet/src/parquet_reader/deserialize.rs +++ b/src/query/storages/parquet/src/parquet_reader/deserialize.rs @@ -36,7 +36,7 @@ use common_storage::ColumnLeaf; use super::filter::FilterState; use crate::parquet_part::ColumnMeta; use crate::parquet_part::ParquetRowGroupPart; -use crate::ParquetReader; +use crate::parquet_reader::ParquetReader; impl ParquetReader { pub fn deserialize( @@ -148,8 +148,8 @@ impl ParquetReader { std::io::Cursor::new(chunk), PageMetaData { column_start: meta.offset, - num_values: meta.length as i64, - compression: meta.compression.into(), + num_values: rows as i64, + compression: meta.compression, descriptor: descriptor.descriptor.clone(), }, Arc::new(|_, _| true), @@ -191,8 +191,8 @@ impl ParquetReader { std::io::Cursor::new(chunk), PageMetaData { column_start: meta.offset, - num_values: meta.length as i64, - compression: meta.compression.into(), + num_values: rows as i64, + compression: meta.compression, descriptor: descriptor.descriptor.clone(), }, Arc::new(move |_, header| { diff --git a/src/query/storages/parquet/src/parquet_reader/mod.rs b/src/query/storages/parquet/src/parquet_reader/mod.rs index 8ca64608505fc..1f39e096e13d7 100644 --- a/src/query/storages/parquet/src/parquet_reader/mod.rs +++ b/src/query/storages/parquet/src/parquet_reader/mod.rs @@ -14,7 +14,6 @@ mod deserialize; mod filter; -mod meta; mod reader; pub use reader::IndexedChunk; diff --git a/src/query/storages/parquet/src/parquet_source.rs b/src/query/storages/parquet/src/parquet_source.rs index 5ced4aca438f0..da92bfa677d5e 100644 --- a/src/query/storages/parquet/src/parquet_source.rs +++ b/src/query/storages/parquet/src/parquet_source.rs @@ -15,6 +15,9 @@ use std::any::Any; use std::sync::Arc; +use common_arrow::arrow::bitmap::Bitmap; +use common_arrow::arrow::bitmap::MutableBitmap; +use common_arrow::parquet::indexes::Interval; use common_base::base::Progress; use common_base::base::ProgressValues; use common_catalog::plan::PartInfoPtr; @@ -38,6 +41,7 @@ use crate::parquet_part::ParquetRowGroupPart; use crate::parquet_reader::IndexedChunk; use crate::parquet_reader::ParquetReader; use crate::parquet_source::State::Generated; +use crate::ReadOptions; struct PrewhereData { data_block: DataBlock, @@ -47,9 +51,14 @@ struct PrewhereData { /// The states for [`ParquetSource`]. The states will recycle for each row group of a parquet file. enum State { ReadDataPrewhere(Option), - ReadDataRemain(PartInfoPtr, PrewhereData), - PrewhereFilter(PartInfoPtr, Vec), - Deserialize(PartInfoPtr, Vec, Option), + ReadDataRemain(PartInfoPtr, PrewhereData, Option), + PrewhereFilter(PartInfoPtr, Vec, Option), + Deserialize( + PartInfoPtr, + Vec, + Option, + Option, + ), Generated(Option, DataBlock), Finish, } @@ -67,6 +76,8 @@ pub struct ParquetSource { prewhere_reader: Arc, prewhere_filter: Arc>, remain_reader: Arc>, + + read_options: ReadOptions, } impl ParquetSource { @@ -77,6 +88,7 @@ impl ParquetSource { prewhere_reader: Arc, prewhere_filter: Arc>, remain_reader: Arc>, + read_options: ReadOptions, ) -> Result { let scan_progress = ctx.get_scan_progress(); let mut src_fields = prewhere_reader.output_schema().fields().clone(); @@ -96,6 +108,7 @@ impl ParquetSource { prewhere_reader, prewhere_filter, remain_reader, + read_options, }))) } @@ -103,12 +116,18 @@ impl ParquetSource { &mut self, part: PartInfoPtr, raw_chunks: Vec, + row_selection: Option, ) -> Result<()> { let rg_part = ParquetRowGroupPart::from_part(&part)?; // deserialize prewhere data block first - let data_block = self - .prewhere_reader - .deserialize(rg_part, raw_chunks, None)?; + let data_block = if let Some(row_selection) = &row_selection { + self.prewhere_reader + .deserialize(rg_part, raw_chunks, Some(row_selection.clone()))? + } else { + self.prewhere_reader + .deserialize(rg_part, raw_chunks, None)? + }; + if let Some(filter) = self.prewhere_filter.as_ref() { // do filter let func_ctx = self.ctx.try_get_function_context()?; @@ -165,10 +184,14 @@ impl ParquetSource { filtered_block.resort(self.src_schema.as_ref(), self.output_schema.as_ref())?; self.state = Generated(self.ctx.try_get_part(), block); } else { - self.state = State::ReadDataRemain(part, PrewhereData { - data_block: filtered_block, - filter, - }); + self.state = State::ReadDataRemain( + part, + PrewhereData { + data_block: filtered_block, + filter, + }, + row_selection, + ); } Ok(()) } else { @@ -183,6 +206,7 @@ impl ParquetSource { part: PartInfoPtr, raw_chunks: Vec, prewhere_data: Option, + row_selection: Option, ) -> Result<()> { let rg_part = ParquetRowGroupPart::from_part(&part)?; let output_block = if let Some(PrewhereData { @@ -193,6 +217,7 @@ impl ParquetSource { let block = if raw_chunks.is_empty() { prewhere_block } else if let Some(remain_reader) = self.remain_reader.as_ref() { + // If reach in this branch, it means `read_options.do_prewhere = true` let remain_block = match filter { Value::Scalar(_) => { // The case of all filtered is already covered in `do_prewhere_filter`. @@ -200,9 +225,17 @@ impl ParquetSource { remain_reader.deserialize(rg_part, raw_chunks, None)? } Value::Column(bitmap) => { - if bitmap.unset_bits() == 0 { - // don't need filter - remain_reader.deserialize(rg_part, raw_chunks, None)? + if !self.read_options.push_down_bitmap() || bitmap.unset_bits() == 0 { + let block = if let Some(row_selection) = &row_selection { + remain_reader.deserialize( + rg_part, + raw_chunks, + Some(row_selection.clone()), + )? + } else { + remain_reader.deserialize(rg_part, raw_chunks, None)? + }; + DataBlock::filter_with_bitmap(block, &bitmap)? } else { remain_reader.deserialize(rg_part, raw_chunks, Some(bitmap))? } @@ -290,9 +323,9 @@ impl Processor for ParquetSource { match self.state { State::Finish => Ok(Event::Finished), State::ReadDataPrewhere(_) - | State::ReadDataRemain(_, _) - | State::PrewhereFilter(_, _) - | State::Deserialize(_, _, _) => Ok(Event::Sync), + | State::ReadDataRemain(_, _, _) + | State::PrewhereFilter(_, _, _) + | State::Deserialize(_, _, _, _) => Ok(Event::Sync), State::Generated(_, _) => Err(ErrorCode::Internal("It's a bug.")), } } @@ -301,32 +334,59 @@ impl Processor for ParquetSource { match std::mem::replace(&mut self.state, State::Finish) { State::ReadDataPrewhere(Some(part)) => { let rg_part = ParquetRowGroupPart::from_part(&part)?; + let row_selection = rg_part + .row_selection + .as_ref() + .map(|sel| intervals_to_bitmap(sel, rg_part.num_rows)); let chunks = self.prewhere_reader.sync_read_columns(rg_part)?; if self.prewhere_filter.is_some() { - self.state = State::PrewhereFilter(part, chunks); + self.state = State::PrewhereFilter(part, chunks, row_selection); } else { // If there is no prewhere filter, it means there is only the prewhere reader. assert!(self.remain_reader.is_none()); // So all the needed columns are read. - self.state = State::Deserialize(part, chunks, None) + self.state = State::Deserialize(part, chunks, None, row_selection) } Ok(()) } - State::ReadDataRemain(part, prewhere_data) => { + State::ReadDataRemain(part, prewhere_data, row_selection) => { if let Some(remain_reader) = self.remain_reader.as_ref() { let rg_part = ParquetRowGroupPart::from_part(&part)?; let chunks = remain_reader.sync_read_columns(rg_part)?; - self.state = State::Deserialize(part, chunks, Some(prewhere_data)); + self.state = + State::Deserialize(part, chunks, Some(prewhere_data), row_selection); Ok(()) } else { Err(ErrorCode::Internal("It's a bug. No remain reader")) } } - State::PrewhereFilter(part, chunks) => self.do_prewhere_filter(part, chunks), - State::Deserialize(part, chunks, prewhere_data) => { - self.do_deserialize(part, chunks, prewhere_data) + State::PrewhereFilter(part, chunks, row_selection) => { + self.do_prewhere_filter(part, chunks, row_selection) + } + State::Deserialize(part, chunks, prewhere_data, row_selection) => { + self.do_deserialize(part, chunks, prewhere_data, row_selection) } _ => Err(ErrorCode::Internal("It's a bug.")), } } } + +/// Convert intervals to a bitmap. The `intervals` represents the row selection across `num_rows`. +fn intervals_to_bitmap(interval: &[Interval], num_rows: usize) -> Bitmap { + debug_assert!( + interval.is_empty() + || interval.last().unwrap().start + interval.last().unwrap().length < num_rows + ); + + let mut bitmap = MutableBitmap::with_capacity(num_rows); + let mut offset = 0; + + for intv in interval { + bitmap.extend_constant(intv.start - offset, false); + bitmap.extend_constant(intv.length, true); + offset = intv.start + intv.length; + } + bitmap.extend_constant(num_rows - offset, false); + + bitmap.into() +} diff --git a/src/query/storages/parquet/src/pruning.rs b/src/query/storages/parquet/src/pruning.rs new file mode 100644 index 0000000000000..bb658b4d66131 --- /dev/null +++ b/src/query/storages/parquet/src/pruning.rs @@ -0,0 +1,800 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::collections::HashSet; +use std::fs::File; +use std::io::Read; +use std::io::Seek; +use std::sync::Arc; + +use common_arrow::arrow::datatypes::Field as ArrowField; +use common_arrow::arrow::io::parquet::read as pread; +use common_arrow::arrow::io::parquet::read::get_field_pages; +use common_arrow::arrow::io::parquet::read::indexes::compute_page_row_intervals; +use common_arrow::arrow::io::parquet::read::indexes::read_columns_indexes; +use common_arrow::arrow::io::parquet::read::indexes::FieldPageStatistics; +use common_arrow::parquet::indexes::Interval; +use common_arrow::parquet::metadata::RowGroupMetaData; +use common_arrow::parquet::read::read_pages_locations; +use common_catalog::plan::Partitions; +use common_catalog::plan::PartitionsShuffleKind; +use common_catalog::table_context::TableContext; +use common_exception::ErrorCode; +use common_exception::Result; +use common_expression::Expr; +use common_expression::FunctionContext; +use common_expression::TableSchemaRef; +use common_storage::ColumnLeaves; +use storages_common_pruner::RangePruner; +use storages_common_pruner::RangePrunerCreator; + +use crate::parquet_part::ColumnMeta; +use crate::parquet_part::ParquetRowGroupPart; +use crate::read_options::ReadOptions; +use crate::statistics::collect_row_group_stats; +use crate::statistics::BatchStatistics; + +/// Try to prune parquet files and gernerate the final row group partitions. +/// +/// `ctx`: the table context. +/// +/// `locations`: the parquet file locations. +/// +/// `schema`: the projected table schema. +/// +/// `filters`: the pushed-down filters. +/// +/// `columns_to_read`: the projected column indices. +/// +/// `column_leaves`: the projected column leaves. +/// +/// `skip_pruning`: whether to skip pruning. +/// +/// `read_options`: more information can be found in [`ReadOptions`]. +#[allow(clippy::too_many_arguments)] +pub fn prune_and_set_partitions( + ctx: &Arc, + locations: &[String], + schema: &TableSchemaRef, + filters: &Option<&[Expr]>, + columns_to_read: &HashSet, + column_leaves: &ColumnLeaves, + skip_pruning: bool, + read_options: ReadOptions, +) -> Result<()> { + let mut partitions = Vec::with_capacity(locations.len()); + let func_ctx = ctx.try_get_function_context()?; + + let row_group_pruner = if read_options.prune_row_groups() { + Some(RangePrunerCreator::try_create(func_ctx, *filters, schema)?) + } else { + None + }; + + let page_pruners = if read_options.prune_pages() && filters.is_some() { + let filters = filters.unwrap(); + Some(build_column_page_pruners(func_ctx, schema, filters)?) + } else { + None + }; + + for location in locations { + let mut file = File::open(location).map_err(|e| { + ErrorCode::Internal(format!("Failed to open file '{}': {}", location, e)) + })?; + let file_meta = pread::read_metadata(&mut file).map_err(|e| { + ErrorCode::Internal(format!( + "Read parquet file '{}''s meta error: {}", + location, e + )) + })?; + let mut row_group_pruned = vec![false; file_meta.row_groups.len()]; + + let no_stats = file_meta.row_groups.iter().any(|r| { + r.columns() + .iter() + .any(|c| c.metadata().statistics.is_none()) + }); + + if read_options.prune_row_groups() && !skip_pruning && !no_stats { + let pruner = row_group_pruner.as_ref().unwrap(); + // If collecting stats fails or `should_keep` is true, we still read the row group. + // Otherwise, the row group will be pruned. + if let Ok(row_group_stats) = + collect_row_group_stats(column_leaves, &file_meta.row_groups) + { + for (idx, (stats, _rg)) in row_group_stats + .iter() + .zip(file_meta.row_groups.iter()) + .enumerate() + { + row_group_pruned[idx] = !pruner.should_keep(stats); + } + } + } + + for (idx, rg) in file_meta.row_groups.iter().enumerate() { + if row_group_pruned[idx] { + continue; + } + + let row_selection = if read_options.prune_pages() { + page_pruners + .as_ref() + .map(|pruners| filter_pages(&mut file, schema, rg, pruners)) + .transpose() + .unwrap_or(None) + } else { + None + }; + + let mut column_metas = HashMap::with_capacity(columns_to_read.len()); + for index in columns_to_read { + let c = &rg.columns()[*index]; + let (offset, length) = c.byte_range(); + column_metas.insert(*index, ColumnMeta { + offset, + length, + compression: c.compression(), + }); + } + + partitions.push(ParquetRowGroupPart::create( + location.clone(), + rg.num_rows(), + column_metas, + row_selection, + )) + } + } + ctx.try_set_partitions(Partitions::create(PartitionsShuffleKind::Mod, partitions))?; + Ok(()) +} + +/// [`RangePruner`]s for each column +type ColumnRangePruners = Vec<(usize, Arc)>; + +/// Build page pruner of each column. +/// Only one column expression can be used to build the page pruner. +fn build_column_page_pruners( + func_ctx: FunctionContext, + schema: &TableSchemaRef, + filters: &[Expr], +) -> Result { + let mut pruner_per_col: HashMap>> = HashMap::new(); + for expr in filters { + let columns = expr.column_refs(); + if columns.len() != 1 { + continue; + } + let (col_name, _) = columns.iter().next().unwrap(); + pruner_per_col + .entry(col_name.to_string()) + .and_modify(|f| f.push(expr.clone())) + .or_insert_with(|| vec![expr.clone()]); + } + pruner_per_col + .iter() + .map(|(k, v)| { + let filter = RangePrunerCreator::try_create(func_ctx, Some(v), schema)?; + let col_idx = schema.index_of(k)?; + Ok((col_idx, filter)) + }) + .collect() +} + +/// Filter pages by filter expression. +/// +/// Returns the final selection of rows. +fn filter_pages( + reader: &mut R, + schema: &TableSchemaRef, + row_group: &RowGroupMetaData, + pruners: &ColumnRangePruners, +) -> Result> { + let mut fields = Vec::with_capacity(pruners.len()); + for (col_idx, _) in pruners { + let field: ArrowField = schema.field(*col_idx).into(); + fields.push(field); + } + + let num_rows = row_group.num_rows(); + + // one vec per column + let locations = read_pages_locations(reader, row_group.columns())?; + // one Vec> per field (non-nested contain a single entry on the first column) + let locations = fields + .iter() + .map(|field| get_field_pages(row_group.columns(), &locations, &field.name)) + .collect::>(); + + // one ColumnPageStatistics per field + let page_stats = read_columns_indexes(reader, row_group.columns(), &fields)?; + + let intervals = locations + .iter() + .map(|locations| { + locations + .iter() + .map(|locations| Ok(compute_page_row_intervals(locations, num_rows)?)) + .collect::>>() + }) + .collect::>>()?; + + // Currently, only non-nested types are supported. + let mut row_selections = Vec::with_capacity(pruners.len()); + for (i, (col_offset, pruner)) in pruners.iter().enumerate() { + let stat = &page_stats[i]; + let page_intervals = &intervals[i][0]; + let data_type = schema.field(*col_offset).data_type(); + + let mut row_selection = vec![]; + match stat { + FieldPageStatistics::Single(stats) => { + let stats = BatchStatistics::from_column_statistics(stats, &data_type.into())?; + for (page_num, intv) in page_intervals.iter().enumerate() { + let stat = stats.get(page_num); + if pruner.should_keep(&HashMap::from([(*col_offset as u32, stat)])) { + row_selection.push(*intv); + } + } + } + _ => { + return Err(ErrorCode::Internal( + "Only non-nested types are supported in page filter.", + )); + } + } + row_selections.push(row_selection); + } + + Ok(combine_intervals(row_selections)) +} + +/// Combine row selection of each column into a final selection of the whole row group. +fn combine_intervals(row_selections: Vec>) -> Vec { + if row_selections.is_empty() { + return vec![]; + } + let mut selection = row_selections[0].clone(); + for sel in row_selections.iter().skip(1) { + selection = and_intervals(&selection, sel); + } + + // Merge intervals if they are consecutive + let mut res = vec![]; + for sel in selection { + if res.is_empty() { + res.push(sel); + continue; + } + let back = res.last_mut().unwrap(); + if back.start + back.length == sel.start { + back.length += sel.length; + } else { + res.push(sel); + } + } + + res +} + +/// Do "and" operation on two row selections. +/// Select the rows which both `sel1` and `sel2` select. +fn and_intervals(sel1: &[Interval], sel2: &[Interval]) -> Vec { + let mut res = vec![]; + + for sel in sel1 { + res.extend(is_in(*sel, sel2)); + } + res +} + +/// If `probe` overlaps with `intervals`, +/// return the overlapping part of `probe` in `intervals`. +/// Otherwise, return an empty vector. +fn is_in(probe: Interval, intervals: &[Interval]) -> Vec { + intervals + .iter() + .filter_map(|interval| { + let interval_end = interval.start + interval.length; + let probe_end = probe.start + probe.length; + let overlaps = (probe.start < interval_end) && (probe_end > interval.start); + if overlaps { + let start = interval.start.max(probe.start); + let end = interval_end.min(probe_end); + Some(Interval::new(start, end - start)) + } else { + None + } + }) + .collect() +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use common_arrow::parquet::compression::CompressionOptions; + use common_arrow::parquet::encoding::hybrid_rle::encode_bool; + use common_arrow::parquet::encoding::Encoding; + use common_arrow::parquet::indexes::Interval; + use common_arrow::parquet::metadata::Descriptor; + use common_arrow::parquet::metadata::SchemaDescriptor; + use common_arrow::parquet::page::DataPage; + use common_arrow::parquet::page::DataPageHeader; + use common_arrow::parquet::page::DataPageHeaderV1; + use common_arrow::parquet::page::Page; + use common_arrow::parquet::read::read_metadata; + use common_arrow::parquet::schema::types::ParquetType; + use common_arrow::parquet::schema::types::PhysicalType; + use common_arrow::parquet::statistics::serialize_statistics; + use common_arrow::parquet::statistics::PrimitiveStatistics; + use common_arrow::parquet::statistics::Statistics; + use common_arrow::parquet::types::NativeType; + use common_arrow::parquet::write::Compressor; + use common_arrow::parquet::write::DynIter; + use common_arrow::parquet::write::DynStreamingIterator; + use common_arrow::parquet::write::FileWriter; + use common_arrow::parquet::write::Version; + use common_arrow::parquet::write::WriteOptions; + use common_exception::Result; + use common_expression::types::DataType; + use common_expression::types::NumberDataType; + use common_expression::FunctionContext; + use common_expression::Literal; + use common_expression::TableDataType; + use common_expression::TableField; + use common_expression::TableSchemaRef; + use common_expression::TableSchemaRefExt; + use common_sql::plans::BoundColumnRef; + use common_sql::plans::ConstantExpr; + use common_sql::plans::FunctionCall; + use common_sql::plans::Scalar; + use common_sql::ColumnBinding; + use common_sql::Visibility; + use common_storage::ColumnLeaves; + use storages_common_pruner::RangePrunerCreator; + + use crate::pruning::and_intervals; + use crate::pruning::build_column_page_pruners; + use crate::pruning::combine_intervals; + use crate::pruning::filter_pages; + use crate::statistics::collect_row_group_stats; + + #[test] + fn test_and_intervals() { + // [12, 35), [38, 43) + let sel1 = vec![Interval::new(12, 23), Interval::new(38, 5)]; + // [0, 5), [9, 24), [30, 40) + let sel2 = vec![ + Interval::new(0, 5), + Interval::new(9, 15), + Interval::new(30, 10), + ]; + + // [12, 24), [30, 35), [38, 40) + let expected = vec![ + Interval::new(12, 12), + Interval::new(30, 5), + Interval::new(38, 2), + ]; + let actual = and_intervals(&sel1, &sel2); + + assert_eq!(expected, actual); + } + + #[test] + fn test_combine_intervals() { + { + // sel1: [12, 35), [38, 43) + // sel2: [0, 5), [9, 24), [30, 40) + // sel3: [1,2), [4, 31), [30, 41) + let intervals = vec![ + vec![Interval::new(12, 23), Interval::new(38, 5)], + vec![ + Interval::new(0, 5), + Interval::new(9, 15), + Interval::new(30, 10), + ], + vec![ + Interval::new(1, 1), + Interval::new(4, 27), + Interval::new(39, 2), + ], + ]; + + // [12, 24), [30, 31), [39, 40) + let expected = vec![ + Interval::new(12, 12), + Interval::new(30, 1), + Interval::new(39, 1), + ]; + + let actual = combine_intervals(intervals); + + assert_eq!(expected, actual); + } + + { + // sel1: [1,2), [2, 4), [4, 7) + let intervals = vec![vec![ + Interval::new(1, 1), + Interval::new(2, 2), + Interval::new(4, 3), + ]]; + + // [12, 24), [30, 31), [39, 40) + let expected = vec![Interval::new(1, 6)]; + + let actual = combine_intervals(intervals); + + assert_eq!(expected, actual); + } + } + + fn unzip_option( + array: &[Option], + ) -> common_arrow::parquet::error::Result<(Vec, Vec)> { + // leave the first 4 bytes anouncing the length of the def level + // this will be overwritten at the end, once the length is known. + // This is unknown at this point because of the uleb128 encoding, + // whose length is variable. + let mut validity = std::io::Cursor::new(vec![0; 4]); + validity.set_position(4); + + let mut values = vec![]; + let iter = array.iter().map(|value| { + if let Some(item) = value { + values.extend_from_slice(item.to_le_bytes().as_ref()); + true + } else { + false + } + }); + encode_bool(&mut validity, iter)?; + + // write the length, now that it is known + let mut validity = validity.into_inner(); + let length = validity.len() - 4; + // todo: pay this small debt (loop?) + let length = length.to_le_bytes(); + validity[0] = length[0]; + validity[1] = length[1]; + validity[2] = length[2]; + validity[3] = length[3]; + + Ok((values, validity)) + } + + pub fn array_to_page_v1( + array: &[Option], + options: &WriteOptions, + descriptor: &Descriptor, + ) -> common_arrow::parquet::error::Result { + let (values, mut buffer) = unzip_option(array)?; + + buffer.extend_from_slice(&values); + + let statistics = if options.write_statistics { + let statistics = &PrimitiveStatistics { + primitive_type: descriptor.primitive_type.clone(), + null_count: Some((array.len() - array.iter().flatten().count()) as i64), + distinct_count: None, + max_value: array.iter().flatten().max_by(|x, y| x.ord(y)).copied(), + min_value: array.iter().flatten().min_by(|x, y| x.ord(y)).copied(), + } as &dyn Statistics; + Some(serialize_statistics(statistics)) + } else { + None + }; + + let header = DataPageHeaderV1 { + num_values: array.len() as i32, + encoding: Encoding::Plain.into(), + definition_level_encoding: Encoding::Rle.into(), + repetition_level_encoding: Encoding::Rle.into(), + statistics, + }; + + Ok(Page::Data(DataPage::new( + DataPageHeader::V1(header), + buffer, + descriptor.clone(), + Some(array.len()), + ))) + } + + fn write_test_parquet() -> Result<(TableSchemaRef, Vec)> { + let page1 = vec![Some(0), Some(1), None, Some(3), Some(4), Some(5), Some(6)]; + let page2 = vec![Some(10), Some(11)]; + + let options = WriteOptions { + write_statistics: true, + version: Version::V1, + }; + + let schema = SchemaDescriptor::new("schema".to_string(), vec![ParquetType::from_physical( + "col1".to_string(), + PhysicalType::Int32, + )]); + + let pages = vec![ + array_to_page_v1::(&page1, &options, &schema.columns()[0].descriptor), + array_to_page_v1::(&page2, &options, &schema.columns()[0].descriptor), + ]; + + let pages = DynStreamingIterator::new(Compressor::new( + DynIter::new(pages.into_iter()), + CompressionOptions::Uncompressed, + vec![], + )); + let columns = std::iter::once(Ok(pages)); + + let writer = Cursor::new(vec![]); + let mut writer = FileWriter::new(writer, schema, options, None); + + writer.write(DynIter::new(columns))?; + writer.end(None)?; + + Ok(( + TableSchemaRefExt::create(vec![TableField::new( + "col1", + TableDataType::Number(NumberDataType::Int32), + )]), + writer.into_inner().into_inner(), + )) + } + + #[test] + fn test_prune_row_group() -> Result<()> { + let (schema, data) = write_test_parquet()?; + let mut reader = Cursor::new(data); + let metadata = read_metadata(&mut reader)?; + let rgs = metadata.row_groups; + let arrow_schema = schema.to_arrow(); + let column_leaves = ColumnLeaves::new_from_schema(&arrow_schema); + + let row_group_stats = collect_row_group_stats(&column_leaves, &rgs)?; + + // col1 > 12 + { + let filter = Scalar::FunctionCall(FunctionCall { + params: vec![], + arguments: vec![ + Scalar::BoundColumnRef(BoundColumnRef { + column: ColumnBinding { + database_name: None, + table_name: None, + column_name: "col1".to_string(), + index: 0, + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + visibility: Visibility::Visible, + }, + }), + Scalar::ConstantExpr(ConstantExpr { + value: Literal::Int32(12), + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + }), + ], + func_name: "gt".to_string(), + return_type: Box::new(DataType::Boolean), + }); + let filters = vec![filter.as_expr()?]; + let pruner = RangePrunerCreator::try_create( + FunctionContext::default(), + Some(&filters), + &schema, + )?; + assert!(!pruner.should_keep(&row_group_stats[0])); + } + + // col1 < 0 + { + let filter = Scalar::FunctionCall(FunctionCall { + params: vec![], + arguments: vec![ + Scalar::BoundColumnRef(BoundColumnRef { + column: ColumnBinding { + database_name: None, + table_name: None, + column_name: "col1".to_string(), + index: 0, + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + visibility: Visibility::Visible, + }, + }), + Scalar::ConstantExpr(ConstantExpr { + value: Literal::Int32(0), + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + }), + ], + func_name: "lt".to_string(), + return_type: Box::new(DataType::Boolean), + }); + let filters = vec![filter.as_expr()?]; + let pruner = RangePrunerCreator::try_create( + FunctionContext::default(), + Some(&filters), + &schema, + )?; + assert!(!pruner.should_keep(&row_group_stats[0])); + } + + // col1 <= 5 + { + let filter = Scalar::FunctionCall(FunctionCall { + params: vec![], + arguments: vec![ + Scalar::BoundColumnRef(BoundColumnRef { + column: ColumnBinding { + database_name: None, + table_name: None, + column_name: "col1".to_string(), + index: 0, + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + visibility: Visibility::Visible, + }, + }), + Scalar::ConstantExpr(ConstantExpr { + value: Literal::Int32(5), + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + }), + ], + func_name: "lte".to_string(), + return_type: Box::new(DataType::Boolean), + }); + let filters = vec![filter.as_expr()?]; + let pruner = RangePrunerCreator::try_create( + FunctionContext::default(), + Some(&filters), + &schema, + )?; + assert!(pruner.should_keep(&row_group_stats[0])); + } + + Ok(()) + } + + #[test] + fn test_filter_pages() -> Result<()> { + let (schema, data) = write_test_parquet()?; + let mut reader = Cursor::new(data); + let metadata = read_metadata(&mut reader)?; + let rg = &metadata.row_groups[0]; + + // col1 > 12 + { + let filter = Scalar::FunctionCall(FunctionCall { + params: vec![], + arguments: vec![ + Scalar::BoundColumnRef(BoundColumnRef { + column: ColumnBinding { + database_name: None, + table_name: None, + column_name: "col1".to_string(), + index: 0, + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + visibility: Visibility::Visible, + }, + }), + Scalar::ConstantExpr(ConstantExpr { + value: Literal::Int32(12), + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + }), + ], + func_name: "gt".to_string(), + return_type: Box::new(DataType::Boolean), + }); + let filters = vec![filter.as_expr()?]; + let pruners = build_column_page_pruners(FunctionContext::default(), &schema, &filters)?; + let row_selection = filter_pages(&mut reader, &schema, rg, &pruners)?; + + assert_eq!(Vec::::new(), row_selection); + } + + // col1 <= 5 + { + let filter = Scalar::FunctionCall(FunctionCall { + params: vec![], + arguments: vec![ + Scalar::BoundColumnRef(BoundColumnRef { + column: ColumnBinding { + database_name: None, + table_name: None, + column_name: "col1".to_string(), + index: 0, + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + visibility: Visibility::Visible, + }, + }), + Scalar::ConstantExpr(ConstantExpr { + value: Literal::Int32(5), + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + }), + ], + func_name: "lte".to_string(), + return_type: Box::new(DataType::Boolean), + }); + let filters = vec![filter.as_expr()?]; + let pruners = build_column_page_pruners(FunctionContext::default(), &schema, &filters)?; + let row_selection = filter_pages(&mut reader, &schema, rg, &pruners)?; + + assert_eq!(vec![Interval::new(0, 7)], row_selection); + } + + // col1 > 10 + { + let filter = Scalar::FunctionCall(FunctionCall { + params: vec![], + arguments: vec![ + Scalar::BoundColumnRef(BoundColumnRef { + column: ColumnBinding { + database_name: None, + table_name: None, + column_name: "col1".to_string(), + index: 0, + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + visibility: Visibility::Visible, + }, + }), + Scalar::ConstantExpr(ConstantExpr { + value: Literal::Int32(10), + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + }), + ], + func_name: "gt".to_string(), + return_type: Box::new(DataType::Boolean), + }); + let filters = vec![filter.as_expr()?]; + let pruners = build_column_page_pruners(FunctionContext::default(), &schema, &filters)?; + let row_selection = filter_pages(&mut reader, &schema, rg, &pruners)?; + + assert_eq!(vec![Interval::new(7, 2)], row_selection); + } + + // col1 <= 10 + { + let filter = Scalar::FunctionCall(FunctionCall { + params: vec![], + arguments: vec![ + Scalar::BoundColumnRef(BoundColumnRef { + column: ColumnBinding { + database_name: None, + table_name: None, + column_name: "col1".to_string(), + index: 0, + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + visibility: Visibility::Visible, + }, + }), + Scalar::ConstantExpr(ConstantExpr { + value: Literal::Int32(10), + data_type: Box::new(DataType::Number(NumberDataType::Int32)), + }), + ], + func_name: "lte".to_string(), + return_type: Box::new(DataType::Boolean), + }); + let filters = vec![filter.as_expr()?]; + let pruners = build_column_page_pruners(FunctionContext::default(), &schema, &filters)?; + let row_selection = filter_pages(&mut reader, &schema, rg, &pruners)?; + + assert_eq!(vec![Interval::new(0, 9)], row_selection); + } + + Ok(()) + } +} diff --git a/src/query/storages/parquet/src/read_options.rs b/src/query/storages/parquet/src/read_options.rs new file mode 100644 index 0000000000000..fe537e9605121 --- /dev/null +++ b/src/query/storages/parquet/src/read_options.rs @@ -0,0 +1,87 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[derive(Copy, Clone, Debug, Default)] +pub struct ReadOptions { + /// Prune row groups before reading. Require Chunk level statistics. + /// Filter row groups don't need to read. + prune_row_groups: bool, + /// Prune pages before reading. Require Page level statistics. + /// Filter rows don't need to read. + prune_pages: bool, + /// If use prewhere filter. + do_prewhere: bool, + /// If push down bitmap generated by prewhere reader to remain reader. + /// If true, when remain reader deserializing, + /// it will skip part of decompression and decoding according to the bitmap. + /// + /// Notice: + /// + /// - `push_down_bitmap` and `prune_pages` are exclusive. (`push_down_bitmap && prune_pages == false`) + /// - If `do_prewhere` is disabled, `push_down_bitmap` is useless. + push_down_bitmap: bool, +} + +impl ReadOptions { + #[inline] + pub fn new() -> Self { + ReadOptions::default() + } + + #[inline] + pub fn with_prune_row_groups(mut self) -> Self { + self.prune_row_groups = true; + self + } + + #[inline] + pub fn with_prune_pages(mut self) -> Self { + self.prune_pages = true; + self.push_down_bitmap = false; + self + } + + #[inline] + pub fn with_push_down_bitmap(mut self) -> Self { + self.push_down_bitmap = true; + self.prune_pages = false; + self + } + + #[inline] + pub fn with_do_prewhere(mut self) -> Self { + self.do_prewhere = true; + self + } + + #[inline] + pub fn prune_row_groups(&self) -> bool { + self.prune_row_groups + } + + #[inline] + pub fn prune_pages(&self) -> bool { + self.prune_pages + } + + #[inline] + pub fn push_down_bitmap(&self) -> bool { + self.push_down_bitmap + } + + #[inline] + pub fn do_prewhere(&self) -> bool { + self.do_prewhere + } +} diff --git a/src/query/storages/parquet/src/parquet_reader/meta.rs b/src/query/storages/parquet/src/statistics.rs similarity index 50% rename from src/query/storages/parquet/src/parquet_reader/meta.rs rename to src/query/storages/parquet/src/statistics.rs index 78d39a178299c..93b765b42089a 100644 --- a/src/query/storages/parquet/src/parquet_reader/meta.rs +++ b/src/query/storages/parquet/src/statistics.rs @@ -1,4 +1,4 @@ -// Copyright 2022 Datafuse Labs. +// Copyright 2023 Datafuse Labs. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,12 +13,10 @@ // limitations under the License. use std::collections::HashMap; -use std::fs::File; use common_arrow::arrow::array::UInt64Array; use common_arrow::arrow::buffer::Buffer; use common_arrow::arrow::io::parquet::read as pread; -use common_arrow::parquet::metadata::FileMetaData; use common_arrow::parquet::metadata::RowGroupMetaData; use common_exception::ErrorCode; use common_exception::Result; @@ -29,57 +27,41 @@ use common_storage::ColumnLeaves; use storages_common_table_meta::meta::ColumnStatistics; use storages_common_table_meta::meta::StatisticsOfColumns; -use crate::ParquetReader; +/// Collect statistics of a batch of row groups of the specified columns. +/// +/// The retuened vector's length is the same as `rgs`. +pub fn collect_row_group_stats( + column_leaves: &ColumnLeaves, + rgs: &[RowGroupMetaData], +) -> Result> { + let mut stats = Vec::with_capacity(rgs.len()); + let mut stats_of_row_groups = HashMap::with_capacity(rgs.len()); -impl ParquetReader { - pub fn read_meta(location: &str) -> Result { - let mut file = File::open(location).map_err(|e| { - ErrorCode::Internal(format!("Failed to open file '{}': {}", location, e)) - })?; - pread::read_metadata(&mut file).map_err(|e| { - ErrorCode::Internal(format!( - "Read parquet file '{}''s meta error: {}", - location, e - )) - }) + // Each row_group_stat is a `HashMap` holding key-value pairs. + // The first element of the pair is the offset in the schema, + // and the second element is the statistics of the column (according to the offset) + // `column_leaves` is parallel to the schema, so we can iterate `column_leaves` directly. + for (index, column_leaf) in column_leaves.column_leaves.iter().enumerate() { + let field = &column_leaf.field; + let table_type: TableDataType = field.into(); + let data_type = (&table_type).into(); + let column_stats = pread::statistics::deserialize(field, rgs)?; + stats_of_row_groups.insert( + index, + BatchStatistics::from_statistics(&column_stats, &data_type)?, + ); } - /// Collect statistics of a batch of row groups of the specified columns. - /// - /// The retuened vector's length is the same as `rgs`. - pub fn collect_row_group_stats( - column_leaves: &ColumnLeaves, - rgs: &[RowGroupMetaData], - ) -> Result> { - let mut stats = Vec::with_capacity(rgs.len()); - let mut stats_of_row_groups = HashMap::with_capacity(rgs.len()); - - // Each row_group_stat is a `HashMap` holding key-value pairs. - // The first element of the pair is the offset in the schema, - // and the second element is the statistics of the column (according to the offset) - // `column_leaves` is parallel to the schema, so we can iterate `column_leaves` directly. - for (index, column_leaf) in column_leaves.column_leaves.iter().enumerate() { - let field = &column_leaf.field; - let table_type: TableDataType = field.into(); - let data_type = (&table_type).into(); - let column_stats = pread::statistics::deserialize(field, rgs)?; - stats_of_row_groups.insert( - index, - BatchStatistics::from_statistics(column_stats, &data_type)?, - ); + for (rg_idx, _) in rgs.iter().enumerate() { + let mut cols_stats = HashMap::with_capacity(stats.capacity()); + for index in 0..column_leaves.column_leaves.len() { + let col_stats = stats_of_row_groups[&index].get(rg_idx); + cols_stats.insert(index as u32, col_stats); } - - for (rg_idx, _) in rgs.iter().enumerate() { - let mut cols_stats = HashMap::with_capacity(stats.capacity()); - for index in 0..column_leaves.column_leaves.len() { - let col_stats = stats_of_row_groups[&index].get(rg_idx); - cols_stats.insert(index as u32, col_stats); - } - stats.push(cols_stats); - } - - Ok(stats) + stats.push(cols_stats); } + + Ok(stats) } /// A temporary struct to present [`pread::statistics::Statistics`]. @@ -87,7 +69,7 @@ impl ParquetReader { /// Convert the inner fields into Databend data structures. pub struct BatchStatistics { pub null_count: Buffer, - pub distinct_count: Buffer, + pub distinct_count: Option>, pub min_values: Column, pub max_values: Column, } @@ -99,12 +81,12 @@ impl BatchStatistics { max: unsafe { self.max_values.index_unchecked(index).to_owned() }, null_count: self.null_count[index], in_memory_size: 0, // this field is not used. - distinct_of_values: Some(self.distinct_count[index]), + distinct_of_values: self.distinct_count.as_ref().map(|d| d[index]), } } pub fn from_statistics( - stats: pread::statistics::Statistics, + stats: &pread::statistics::Statistics, data_type: &DataType, ) -> Result { let null_count = stats @@ -123,14 +105,8 @@ impl BatchStatistics { .distinct_count .as_any() .downcast_ref::() - .ok_or_else(|| { - ErrorCode::Internal(format!( - "distinct_count should be UInt64Array, but is {:?}", - stats.distinct_count.data_type() - )) - })? - .values() - .clone(); + .map(|d| d.values()) + .cloned(); let min_values = Column::from_arrow(&*stats.min_value, data_type); let max_values = Column::from_arrow(&*stats.max_value, data_type); Ok(Self { @@ -140,4 +116,19 @@ impl BatchStatistics { max_values, }) } + + pub fn from_column_statistics( + stats: &pread::indexes::ColumnPageStatistics, + data_type: &DataType, + ) -> Result { + let null_count = stats.null_count.values().clone(); + let min_values = Column::from_arrow(&*stats.min, data_type); + let max_values = Column::from_arrow(&*stats.max, data_type); + Ok(Self { + null_count, + distinct_count: None, + min_values, + max_values, + }) + } } diff --git a/src/query/storages/parquet/src/table_function/read.rs b/src/query/storages/parquet/src/table_function/read.rs index ffc42d0be86e6..9faae7b90759a 100644 --- a/src/query/storages/parquet/src/table_function/read.rs +++ b/src/query/storages/parquet/src/table_function/read.rs @@ -12,29 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::Arc; use common_catalog::plan::DataSourcePlan; -use common_catalog::plan::Partitions; -use common_catalog::plan::PartitionsShuffleKind; use common_catalog::plan::Projection; use common_catalog::plan::PushDownInfo; use common_exception::Result; +use common_expression::ConstantFolder; use common_expression::DataSchema; use common_expression::Expr; +use common_expression::FunctionContext; use common_functions::scalars::BUILTIN_FUNCTIONS; use common_pipeline_core::Pipeline; -use storages_common_pruner::RangePrunerCreator; use super::table::arrow_to_table_schema; use super::ParquetTable; use super::TableContext; -use crate::parquet_part::ColumnMeta; -use crate::parquet_part::ParquetRowGroupPart; -use crate::ParquetLocationPart; -use crate::ParquetReader; -use crate::ParquetSource; +use crate::parquet_part::ParquetLocationPart; +use crate::parquet_reader::ParquetReader; +use crate::parquet_source::ParquetSource; +use crate::pruning::prune_and_set_partitions; impl ParquetTable { pub fn create_reader(&self, projection: Projection) -> Result> { @@ -56,7 +53,7 @@ impl ParquetTable { // Build the prewhere filter expression. fn build_prewhere_filter_expr( &self, - _ctx: Arc, + ctx: FunctionContext, plan: &DataSourcePlan, schema: &DataSchema, ) -> Result>> { @@ -64,7 +61,10 @@ impl ParquetTable { match PushDownInfo::prewhere_of_push_downs(&plan.push_downs) { None => Arc::new(None), Some(v) => Arc::new(v.filter.as_expr(&BUILTIN_FUNCTIONS).map(|expr| { - expr.project_column_ref(|name| schema.column_with_name(name).unwrap().0) + let expr = + expr.project_column_ref(|name| schema.column_with_name(name).unwrap().0); + let (expr, _) = ConstantFolder::fold(&expr, ctx, &BUILTIN_FUNCTIONS); + expr })), }, ) @@ -127,7 +127,7 @@ impl ParquetTable { // `projected_column_leaves` contains the smallest column set that is needed for the query. // Use `projected_arrow_schema` to create `row_group_pruner` (`RangePruner`). // - // During evaluation, + // During pruning evaluation, // `RangePruner` will use field name to find the offset in the schema, // and use the offset to find the column stat from `StatisticsOfColumns` (HashMap). // @@ -135,73 +135,27 @@ impl ParquetTable { let (projected_arrow_schema, projected_column_leaves, _, columns_to_read) = ParquetReader::do_projection(&self.arrow_schema, &columns_to_read)?; let schema = Arc::new(arrow_to_table_schema(projected_arrow_schema)); + let filters = push_downs.as_ref().map(|extra| { + extra + .filters + .iter() + .map(|f| f.as_expr(&BUILTIN_FUNCTIONS).unwrap()) + .collect::>() + }); - pipeline.set_on_init(move || { - let mut partitions = Vec::with_capacity(locations.len()); - - // build row group pruner. - let filter_exprs = push_downs.as_ref().map(|extra| { - extra - .filters - .iter() - .map(|f| f.as_expr(&BUILTIN_FUNCTIONS).unwrap()) - .collect::>() - }); - let row_group_pruner = - RangePrunerCreator::try_create(&ctx_ref, filter_exprs.as_deref(), &schema)?; - - for location in &locations { - let file_meta = ParquetReader::read_meta(location)?; - let mut row_group_pruned = vec![false; file_meta.row_groups.len()]; - - let no_stats = file_meta.row_groups.iter().any(|r| { - r.columns() - .iter() - .any(|c| c.metadata().statistics.is_none()) - }); - - if !skip_pruning && !no_stats { - // If collecting stats fails or `should_keep` is true, we still read the row group. - // Otherwise, the row group will be pruned. - if let Ok(row_group_stats) = ParquetReader::collect_row_group_stats( - &projected_column_leaves, - &file_meta.row_groups, - ) { - for (idx, (stats, _rg)) in row_group_stats - .iter() - .zip(file_meta.row_groups.iter()) - .enumerate() - { - row_group_pruned[idx] = !row_group_pruner.should_keep(stats); - } - } - } + let read_options = self.read_options; - for (idx, rg) in file_meta.row_groups.iter().enumerate() { - if row_group_pruned[idx] { - continue; - } - let mut column_metas = HashMap::with_capacity(columns_to_read.len()); - for index in &columns_to_read { - let c = &rg.columns()[*index]; - let (offset, length) = c.byte_range(); - column_metas.insert(*index, ColumnMeta { - offset, - length, - compression: c.compression().into(), - }); - } - - partitions.push(ParquetRowGroupPart::create( - location.clone(), - rg.num_rows(), - column_metas, - )) - } - } - ctx_ref - .try_set_partitions(Partitions::create(PartitionsShuffleKind::Mod, partitions))?; - Ok(()) + pipeline.set_on_init(move || { + prune_and_set_partitions( + &ctx_ref, + &locations, + &schema, + &filters.as_deref(), + &columns_to_read, + &projected_column_leaves, + skip_pruning, + read_options, + ) }); // If there is a `PrewhereInfo`, the final output should be `PrehwereInfo.output_columns`. @@ -217,8 +171,11 @@ impl ParquetTable { )); let prewhere_reader = self.build_prewhere_reader(plan)?; - let prewhere_filter = - self.build_prewhere_filter_expr(ctx.clone(), plan, prewhere_reader.output_schema())?; + let prewhere_filter = self.build_prewhere_filter_expr( + ctx.try_get_function_context()?, + plan, + prewhere_reader.output_schema(), + )?; let remain_reader = self.build_remain_reader(plan)?; // Add source pipe. @@ -231,6 +188,7 @@ impl ParquetTable { prewhere_reader.clone(), prewhere_filter.clone(), remain_reader.clone(), + self.read_options, ) }, max_io_requests, diff --git a/src/query/storages/parquet/src/table_function/table.rs b/src/query/storages/parquet/src/table_function/table.rs index f150e6835fcc5..42c7eed4db319 100644 --- a/src/query/storages/parquet/src/table_function/table.rs +++ b/src/query/storages/parquet/src/table_function/table.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::fs::File; use std::sync::Arc; use chrono::NaiveDateTime; @@ -42,8 +43,8 @@ use common_pipeline_core::Pipeline; use opendal::Operator; use super::TableContext; -use crate::ParquetLocationPart; -use crate::ParquetReader; +use crate::parquet_part::ParquetLocationPart; +use crate::ReadOptions; pub struct ParquetTable { table_args: Vec, @@ -52,6 +53,7 @@ pub struct ParquetTable { pub(super) table_info: TableInfo, pub(super) arrow_schema: ArrowSchema, pub(super) operator: Operator, + pub(super) read_options: ReadOptions, } impl ParquetTable { @@ -110,7 +112,16 @@ impl ParquetTable { // Infer schema from the first parquet file. // Assume all parquet files have the same schema. // If not, throw error during reading. - let first_meta = ParquetReader::read_meta(&file_locations[0])?; + let location = &file_locations[0]; + let mut file = File::open(location).map_err(|e| { + ErrorCode::Internal(format!("Failed to open file '{}': {}", location, e)) + })?; + let first_meta = pread::read_metadata(&mut file).map_err(|e| { + ErrorCode::Internal(format!( + "Read parquet file '{}''s meta error: {}", + location, e + )) + })?; let arrow_schema = pread::infer_schema(&first_meta)?; let table_info = TableInfo { @@ -141,6 +152,10 @@ impl ParquetTable { table_info, arrow_schema, operator, + read_options: ReadOptions::new() + .with_prune_row_groups() + .with_prune_pages() + .with_do_prewhere(), // Now, `read_options` is hard-coded. })) } } @@ -160,7 +175,7 @@ impl Table for ParquetTable { } fn support_prewhere(&self) -> bool { - true + self.read_options.do_prewhere() } fn has_exact_total_row_count(&self) -> bool {