diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index dad12c1c6bc91..1b0e9ca2ba67e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -709,6 +709,13 @@ config_namespace! { /// reduce the number of rows decoded. This optimization is sometimes called "late materialization". pub pushdown_filters: bool, default = false + /// (reading) If true, dynamic filter expressions (from operators like + /// TopK, Join, Aggregate) are included when building parquet row-level + /// filters (RowFilter / late materialization). If false, dynamic filters + /// are only used for pruning (file, row group, page level) and are + /// excluded before building the RowFilter. + pub dynamic_filter_pushdown: bool, default = false + /// (reading) If true, filter expressions evaluated during the parquet decoding operation /// will be reordered heuristically to minimize the cost of evaluation. If false, /// the filters are applied in the same order as written in the query diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index f6608d16c1022..1760fce7136f1 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -209,6 +209,7 @@ impl ParquetOptions { coerce_int96: _, // not used for writer props skip_arrow_metadata: _, max_predicate_cache_size: _, + dynamic_filter_pushdown: _, } = self; let mut builder = WriterProperties::builder() @@ -460,6 +461,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + dynamic_filter_pushdown: defaults.dynamic_filter_pushdown, } } @@ -574,6 +576,7 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + dynamic_filter_pushdown: 
global_options_defaults.dynamic_filter_pushdown, }, column_specific_options, key_value_metadata, diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index b3ed8d9653fe1..a3c5e40c45768 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -1730,6 +1730,7 @@ async fn test_topk_dynamic_filter_pushdown_integration() { let store = Arc::new(InMemory::new()) as Arc; let mut cfg = SessionConfig::new(); cfg.options_mut().execution.parquet.pushdown_filters = true; + cfg.options_mut().execution.parquet.dynamic_filter_pushdown = true; cfg.options_mut().execution.parquet.max_row_group_size = 128; let ctx = SessionContext::new_with_config(cfg); ctx.register_object_store( diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index f87a30265a17b..e8eabf0d4bb0a 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -92,6 +92,9 @@ pub(super) struct ParquetOpener { /// Should the filters be evaluated during the parquet scan using /// [`DataFusionArrowPredicate`](row_filter::DatafusionArrowPredicate)? pub pushdown_filters: bool, + /// Should dynamic filter expressions be included in the row-level filter? + /// If false, dynamic filters are stripped before building the RowFilter. + pub dynamic_filter_pushdown: bool, /// Should the filters be reordered to optimize the scan? 
pub reorder_filters: bool, /// Should we force the reader to use RowSelections for filtering @@ -262,6 +265,7 @@ impl FileOpener for ParquetOpener { let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; + let dynamic_filter_pushdown = self.dynamic_filter_pushdown; let force_filter_selections = self.force_filter_selections; let coerce_int96 = self.coerce_int96; let enable_bloom_filter = self.enable_bloom_filter; @@ -461,25 +465,36 @@ impl FileOpener for ParquetOpener { // Filter pushdown: evaluate predicates during scan if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() { - let row_filter = row_filter::build_row_filter( - &predicate, - &physical_file_schema, - builder.metadata(), - reorder_predicates, - &file_metrics, - ); - - match row_filter { - Ok(Some(filter)) => { - builder = builder.with_row_filter(filter); - } - Ok(None) => {} - Err(e) => { - debug!( - "Ignoring error building row filter for '{predicate:?}': {e}" - ); - } + // If dynamic filter row-level filtering is disabled, strip + // dynamic filter expressions before building the RowFilter. + // Dynamic filters will still be used for pruning (file, row + // group, page level) above. + let predicate = if !dynamic_filter_pushdown { + strip_dynamic_filters(&predicate) + } else { + Some(predicate) }; + if let Some(predicate) = predicate { + let row_filter = row_filter::build_row_filter( + &predicate, + &physical_file_schema, + builder.metadata(), + reorder_predicates, + &file_metrics, + ); + + match row_filter { + Ok(Some(filter)) => { + builder = builder.with_row_filter(filter); + } + Ok(None) => {} + Err(e) => { + debug!( + "Ignoring error building row filter for '{predicate:?}': {e}" + ); + } + }; + } }; if force_filter_selections { builder = @@ -1000,6 +1015,43 @@ async fn load_page_index( } } +/// Strips dynamic filter expressions from a predicate, replacing them with +/// `lit(true)`, then simplifying the result. 
+/// Returns `None` if the entire predicate becomes trivially true after stripping.
+///
+/// Stripping is done per top-level `AND` conjunct: a conjunct that contains a
+/// dynamic filter anywhere inside it is dropped as a whole, which is what
+/// replacing it with `lit(true)` and simplifying `x AND true` to `x` yields.
+/// Nested dynamic filters are not replaced in place, because substituting `true`
+/// below a negation would turn `NOT <dynamic>` into `false` and drop every row.
+fn strip_dynamic_filters(
+    predicate: &Arc<dyn PhysicalExpr>,
+) -> Option<Arc<dyn PhysicalExpr>> {
+    use datafusion_common::tree_node::TreeNode;
+    use datafusion_physical_expr::utils::{conjunction, split_conjunction};
+
+    // A node is dynamic when it reports a non-zero snapshot generation. The
+    // closure cannot fail; if the traversal ever errors, treat the conjunct as
+    // dynamic so that it is excluded from the RowFilter rather than panicking.
+    let is_dynamic = |expr: &Arc<dyn PhysicalExpr>| {
+        expr.exists(|node| Ok(node.snapshot_generation() != 0))
+            .unwrap_or(true)
+    };
+
+    let conjuncts = split_conjunction(predicate);
+    let total = conjuncts.len();
+    let kept: Vec<_> = conjuncts.into_iter().filter(|&c| !is_dynamic(c)).collect();
+    if kept.len() == total {
+        // No dynamic filters found, return the predicate as-is
+        return Some(Arc::clone(predicate));
+    }
+    if kept.is_empty() {
+        // Only dynamic filters were present: nothing is left to evaluate per row
+        return None;
+    }
+    Some(conjunction(kept.into_iter().map(Arc::clone)))
+}
+
 fn should_enable_page_index( enable_page_index: bool, page_pruning_predicate: &Option>, @@ -1055,6 +1107,7 @@ mod test { metadata_size_hint: Option, metrics: ExecutionPlanMetricsSet, pushdown_filters: bool, + dynamic_filter_pushdown: bool, reorder_filters: bool, force_filter_selections: bool, enable_page_index: bool, @@ -1081,6 +1134,7 @@ mod test { metadata_size_hint: None, metrics: ExecutionPlanMetricsSet::new(), pushdown_filters: false, + dynamic_filter_pushdown: false, reorder_filters: false, force_filter_selections: false, enable_page_index: false, @@ -1184,6 +1238,7 @@ mod test { DefaultParquetFileReaderFactory::new(store), ), pushdown_filters: self.pushdown_filters, + dynamic_filter_pushdown: self.dynamic_filter_pushdown, reorder_filters: self.reorder_filters, force_filter_selections: self.force_filter_selections, enable_page_index: self.enable_page_index, diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 75d87a4cd16fc..9cdd2ea373bd9 100644 --- 
a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -398,6 +398,11 @@ impl ParquetSource { self.table_parquet_options.global.pushdown_filters } + /// Return the value of `dynamic_filter_pushdown` + fn dynamic_filter_pushdown(&self) -> bool { + self.table_parquet_options.global.dynamic_filter_pushdown + } + /// If true, the `RowFilter` made by `pushdown_filters` may try to /// minimize the cost of filter evaluation by reordering the /// predicate [`Expr`]s. If false, the predicates are applied in @@ -555,6 +560,7 @@ impl FileSource for ParquetSource { metrics: self.metrics().clone(), parquet_file_reader_factory, pushdown_filters: self.pushdown_filters(), + dynamic_filter_pushdown: self.dynamic_filter_pushdown(), reorder_filters: self.reorder_filters(), force_filter_selections: self.force_filter_selections(), enable_page_index: self.enable_page_index(), diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..067a1e71ca996 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -535,6 +535,7 @@ message ParquetOptions { bool pruning = 2; // default = true bool skip_metadata = 3; // default = true bool pushdown_filters = 5; // default = false + bool dynamic_filter_pushdown = 35; // default = false bool reorder_filters = 6; // default = false bool force_filter_selections = 34; // default = false uint64 data_pagesize_limit = 7; // default = 1024 * 1024 diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..885156e7407e8 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1024,6 +1024,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { }) .unwrap_or(None), pushdown_filters: value.pushdown_filters, + 
dynamic_filter_pushdown: value.dynamic_filter_pushdown, reorder_filters: value.reorder_filters, force_filter_selections: value.force_filter_selections, data_pagesize_limit: value.data_pagesize_limit as usize, diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..72b668959d455 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5644,6 +5644,9 @@ impl serde::Serialize for ParquetOptions { if self.pushdown_filters { len += 1; } + if self.dynamic_filter_pushdown { + len += 1; + } if self.reorder_filters { len += 1; } @@ -5741,6 +5744,9 @@ impl serde::Serialize for ParquetOptions { if self.pushdown_filters { struct_ser.serialize_field("pushdownFilters", &self.pushdown_filters)?; } + if self.dynamic_filter_pushdown { + struct_ser.serialize_field("dynamicFilterPushdown", &self.dynamic_filter_pushdown)?; + } if self.reorder_filters { struct_ser.serialize_field("reorderFilters", &self.reorder_filters)?; } @@ -5910,6 +5916,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "skipMetadata", "pushdown_filters", "pushdownFilters", + "dynamic_filter_pushdown", + "dynamicFilterPushdown", "reorder_filters", "reorderFilters", "force_filter_selections", @@ -5972,6 +5980,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { Pruning, SkipMetadata, PushdownFilters, + DynamicFilterPushdown, ReorderFilters, ForceFilterSelections, DataPagesizeLimit, @@ -6025,6 +6034,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "pruning" => Ok(GeneratedField::Pruning), "skipMetadata" | "skip_metadata" => Ok(GeneratedField::SkipMetadata), "pushdownFilters" | "pushdown_filters" => Ok(GeneratedField::PushdownFilters), + "dynamicFilterPushdown" | "dynamic_filter_pushdown" => Ok(GeneratedField::DynamicFilterPushdown), "reorderFilters" | "reorder_filters" => Ok(GeneratedField::ReorderFilters), "forceFilterSelections" | 
"force_filter_selections" => Ok(GeneratedField::ForceFilterSelections), "dataPagesizeLimit" | "data_pagesize_limit" => Ok(GeneratedField::DataPagesizeLimit), @@ -6076,6 +6086,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut pruning__ = None; let mut skip_metadata__ = None; let mut pushdown_filters__ = None; + let mut dynamic_filter_pushdown__ = None; let mut reorder_filters__ = None; let mut force_filter_selections__ = None; let mut data_pagesize_limit__ = None; @@ -6130,6 +6141,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } pushdown_filters__ = Some(map_.next_value()?); } + GeneratedField::DynamicFilterPushdown => { + if dynamic_filter_pushdown__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilterPushdown")); + } + dynamic_filter_pushdown__ = Some(map_.next_value()?); + } GeneratedField::ReorderFilters => { if reorder_filters__.is_some() { return Err(serde::de::Error::duplicate_field("reorderFilters")); @@ -6319,6 +6336,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { pruning: pruning__.unwrap_or_default(), skip_metadata: skip_metadata__.unwrap_or_default(), pushdown_filters: pushdown_filters__.unwrap_or_default(), + dynamic_filter_pushdown: dynamic_filter_pushdown__.unwrap_or_default(), reorder_filters: reorder_filters__.unwrap_or_default(), force_filter_selections: force_filter_selections__.unwrap_or_default(), data_pagesize_limit: data_pagesize_limit__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..006a685e2af92 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -790,6 +790,9 @@ pub struct ParquetOptions { #[prost(bool, tag = "5")] pub pushdown_filters: bool, /// default = false + #[prost(bool, tag = "35")] + pub dynamic_filter_pushdown: bool, + /// default = false #[prost(bool, tag = "6")] pub reorder_filters: bool, /// default 
= false diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..118475eda4de3 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,6 +904,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + dynamic_filter_pushdown: value.dynamic_filter_pushdown, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..006a685e2af92 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -790,6 +790,9 @@ pub struct ParquetOptions { #[prost(bool, tag = "5")] pub pushdown_filters: bool, /// default = false + #[prost(bool, tag = "35")] + pub dynamic_filter_pushdown: bool, + /// default = false #[prost(bool, tag = "6")] pub reorder_filters: bool, /// default = false diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..5abdfa501e747 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -383,6 +383,7 @@ mod parquet { force_filter_selections: global_options.global.force_filter_selections, data_pagesize_limit: global_options.global.data_pagesize_limit as u64, write_batch_size: global_options.global.write_batch_size as u64, + dynamic_filter_pushdown: global_options.global.dynamic_filter_pushdown, writer_version: global_options.global.writer_version.to_string(), compression_opt: 
global_options.global.compression.map(|compression| { parquet_options::CompressionOpt::Compression(compression) @@ -475,6 +476,7 @@ mod parquet { parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize, }), pushdown_filters: proto.pushdown_filters, + dynamic_filter_pushdown: proto.dynamic_filter_pushdown, reorder_filters: proto.reorder_filters, force_filter_selections: proto.force_filter_selections, data_pagesize_limit: proto.data_pagesize_limit as usize, diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b61ceecb24fc0..8286a4f59fbd5 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -242,6 +242,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 datafusion.execution.parquet.data_pagesize_limit 1048576 datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 +datafusion.execution.parquet.dynamic_filter_pushdown false datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL datafusion.execution.parquet.force_filter_selections false @@ -380,6 +381,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes +datafusion.execution.parquet.dynamic_filter_pushdown false (reading) If true, dynamic filter expressions (from operators like TopK, Join, Aggregate) are included when building parquet row-level filters (RowFilter / late materialization). 
If false, dynamic filters are only used for pruning (file, row group, page level) and are excluded before building the RowFilter. datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index e48f0a7c92276..7334e3ac48d8e 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -85,6 +85,7 @@ The following configuration settings are available: | datafusion.execution.parquet.skip_metadata | true | (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | | datafusion.execution.parquet.metadata_size_hint | 524288 | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. 
If the metadata is larger than the hint, two reads will still be performed. | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | +| datafusion.execution.parquet.dynamic_filter_pushdown | false | (reading) If true, dynamic filter expressions (from operators like TopK, Join, Aggregate) are included when building parquet row-level filters (RowFilter / late materialization). If false, dynamic filters are only used for pruning (file, row group, page level) and are excluded before building the RowFilter. | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | | datafusion.execution.parquet.force_filter_selections | false | (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. | | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |