Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,13 @@ config_namespace! {
/// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
pub pushdown_filters: bool, default = false

/// (reading) If true, dynamic filter expressions (from operators like
/// TopK, Join, Aggregate) are included when building parquet row-level
/// filters (RowFilter / late materialization). If false, dynamic filters
/// are only used for pruning (file, row group, page level) and are
/// excluded before building the RowFilter.
pub dynamic_filter_pushdown: bool, default = false

/// (reading) If true, filter expressions evaluated during the parquet decoding operation
/// will be reordered heuristically to minimize the cost of evaluation. If false,
/// the filters are applied in the same order as written in the query
Expand Down
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ impl ParquetOptions {
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
max_predicate_cache_size: _,
dynamic_filter_pushdown: _,
} = self;

let mut builder = WriterProperties::builder()
Expand Down Expand Up @@ -460,6 +461,7 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
dynamic_filter_pushdown: defaults.dynamic_filter_pushdown,
}
}

Expand Down Expand Up @@ -574,6 +576,7 @@ mod tests {
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
dynamic_filter_pushdown: global_options_defaults.dynamic_filter_pushdown,
},
column_specific_options,
key_value_metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1730,6 +1730,7 @@ async fn test_topk_dynamic_filter_pushdown_integration() {
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
let mut cfg = SessionConfig::new();
cfg.options_mut().execution.parquet.pushdown_filters = true;
cfg.options_mut().execution.parquet.dynamic_filter_pushdown = true;
cfg.options_mut().execution.parquet.max_row_group_size = 128;
let ctx = SessionContext::new_with_config(cfg);
ctx.register_object_store(
Expand Down
91 changes: 73 additions & 18 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ pub(super) struct ParquetOpener {
/// Should the filters be evaluated during the parquet scan using
/// [`DataFusionArrowPredicate`](row_filter::DatafusionArrowPredicate)?
pub pushdown_filters: bool,
/// Should dynamic filter expressions be included in the row-level filter?
/// If false, dynamic filters are stripped before building the RowFilter.
pub dynamic_filter_pushdown: bool,
/// Should the filters be reordered to optimize the scan?
pub reorder_filters: bool,
/// Should we force the reader to use RowSelections for filtering
Expand Down Expand Up @@ -262,6 +265,7 @@ impl FileOpener for ParquetOpener {

let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
let dynamic_filter_pushdown = self.dynamic_filter_pushdown;
let force_filter_selections = self.force_filter_selections;
let coerce_int96 = self.coerce_int96;
let enable_bloom_filter = self.enable_bloom_filter;
Expand Down Expand Up @@ -461,25 +465,36 @@ impl FileOpener for ParquetOpener {

// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
);

match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for '{predicate:?}': {e}"
);
}
// If dynamic filter row-level filtering is disabled, strip
// dynamic filter expressions before building the RowFilter.
// Dynamic filters will still be used for pruning (file, row
// group, page level) above.
let predicate = if !dynamic_filter_pushdown {
strip_dynamic_filters(&predicate)
} else {
Some(predicate)
};
if let Some(predicate) = predicate {
let row_filter = row_filter::build_row_filter(
&predicate,
&physical_file_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
);

match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for '{predicate:?}': {e}"
);
}
};
}
};
if force_filter_selections {
builder =
Expand Down Expand Up @@ -1000,6 +1015,43 @@ async fn load_page_index<T: AsyncFileReader>(
}
}

/// Removes dynamic filter expressions from `predicate` so the remainder can be
/// used to build a parquet `RowFilter`.
///
/// The predicate is split into its top-level `AND` conjuncts, and every
/// conjunct that contains a dynamic node (a node whose
/// `snapshot_generation()` is non-zero) is dropped as a whole. Dropping a
/// conjunct only ever makes the filter less selective, so the result never
/// rejects a row the original predicate would have accepted. Substituting
/// `true` for the dynamic node instead would be unsound under a negation
/// (`NOT(dynamic)` would become `NOT(true)` and reject every row) and would
/// leave unsimplified residue such as `a AND true` or `true AND true`.
///
/// # Returns
///
/// * `Some` holding a clone of the input `Arc` when no conjunct is dynamic.
/// * `None` when every conjunct is dynamic, i.e. nothing static is left to
///   evaluate at row level.
/// * `Some` holding the conjunction of the remaining static conjuncts
///   otherwise.
fn strip_dynamic_filters(
    predicate: &Arc<dyn PhysicalExpr>,
) -> Option<Arc<dyn PhysicalExpr>> {
    use datafusion_common::tree_node::TreeNode;
    use datafusion_physical_expr::utils::{conjunction, split_conjunction};

    let conjuncts = split_conjunction(predicate);

    let static_conjuncts: Vec<Arc<dyn PhysicalExpr>> = conjuncts
        .iter()
        .copied()
        .filter(|conjunct| {
            // The visitor closure never returns an error; if the traversal
            // were ever to fail, conservatively treat the conjunct as dynamic
            // so it is dropped rather than pushed down.
            let has_dynamic_node = conjunct
                .exists(|node| Ok(node.snapshot_generation() != 0))
                .unwrap_or(true);
            !has_dynamic_node
        })
        .map(Arc::clone)
        .collect();

    if static_conjuncts.len() == conjuncts.len() {
        // No dynamic filters found, return the predicate as-is
        return Some(Arc::clone(predicate));
    }

    if static_conjuncts.is_empty() {
        // Everything was dynamic: there is no row-level filter to build
        return None;
    }

    Some(conjunction(static_conjuncts))
}

fn should_enable_page_index(
enable_page_index: bool,
page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
Expand Down Expand Up @@ -1055,6 +1107,7 @@ mod test {
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
pushdown_filters: bool,
dynamic_filter_pushdown: bool,
reorder_filters: bool,
force_filter_selections: bool,
enable_page_index: bool,
Expand All @@ -1081,6 +1134,7 @@ mod test {
metadata_size_hint: None,
metrics: ExecutionPlanMetricsSet::new(),
pushdown_filters: false,
dynamic_filter_pushdown: false,
reorder_filters: false,
force_filter_selections: false,
enable_page_index: false,
Expand Down Expand Up @@ -1184,6 +1238,7 @@ mod test {
DefaultParquetFileReaderFactory::new(store),
),
pushdown_filters: self.pushdown_filters,
dynamic_filter_pushdown: self.dynamic_filter_pushdown,
reorder_filters: self.reorder_filters,
force_filter_selections: self.force_filter_selections,
enable_page_index: self.enable_page_index,
Expand Down
6 changes: 6 additions & 0 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,11 @@ impl ParquetSource {
self.table_parquet_options.global.pushdown_filters
}

/// Reports whether the `dynamic_filter_pushdown` parquet option is enabled
/// in this source's global table options.
fn dynamic_filter_pushdown(&self) -> bool {
    let global_options = &self.table_parquet_options.global;
    global_options.dynamic_filter_pushdown
}

/// If true, the `RowFilter` made by `pushdown_filters` may try to
/// minimize the cost of filter evaluation by reordering the
/// predicate [`Expr`]s. If false, the predicates are applied in
Expand Down Expand Up @@ -555,6 +560,7 @@ impl FileSource for ParquetSource {
metrics: self.metrics().clone(),
parquet_file_reader_factory,
pushdown_filters: self.pushdown_filters(),
dynamic_filter_pushdown: self.dynamic_filter_pushdown(),
reorder_filters: self.reorder_filters(),
force_filter_selections: self.force_filter_selections(),
enable_page_index: self.enable_page_index(),
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ message ParquetOptions {
bool pruning = 2; // default = true
bool skip_metadata = 3; // default = true
bool pushdown_filters = 5; // default = false
bool dynamic_filter_pushdown = 35; // default = false
bool reorder_filters = 6; // default = false
bool force_filter_selections = 34; // default = false
uint64 data_pagesize_limit = 7; // default = 1024 * 1024
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
})
.unwrap_or(None),
pushdown_filters: value.pushdown_filters,
dynamic_filter_pushdown: value.dynamic_filter_pushdown,
reorder_filters: value.reorder_filters,
force_filter_selections: value.force_filter_selections,
data_pagesize_limit: value.data_pagesize_limit as usize,
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5644,6 +5644,9 @@ impl serde::Serialize for ParquetOptions {
if self.pushdown_filters {
len += 1;
}
if self.dynamic_filter_pushdown {
len += 1;
}
if self.reorder_filters {
len += 1;
}
Expand Down Expand Up @@ -5741,6 +5744,9 @@ impl serde::Serialize for ParquetOptions {
if self.pushdown_filters {
struct_ser.serialize_field("pushdownFilters", &self.pushdown_filters)?;
}
if self.dynamic_filter_pushdown {
struct_ser.serialize_field("dynamicFilterPushdown", &self.dynamic_filter_pushdown)?;
}
if self.reorder_filters {
struct_ser.serialize_field("reorderFilters", &self.reorder_filters)?;
}
Expand Down Expand Up @@ -5910,6 +5916,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"skipMetadata",
"pushdown_filters",
"pushdownFilters",
"dynamic_filter_pushdown",
"dynamicFilterPushdown",
"reorder_filters",
"reorderFilters",
"force_filter_selections",
Expand Down Expand Up @@ -5972,6 +5980,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
Pruning,
SkipMetadata,
PushdownFilters,
DynamicFilterPushdown,
ReorderFilters,
ForceFilterSelections,
DataPagesizeLimit,
Expand Down Expand Up @@ -6025,6 +6034,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"pruning" => Ok(GeneratedField::Pruning),
"skipMetadata" | "skip_metadata" => Ok(GeneratedField::SkipMetadata),
"pushdownFilters" | "pushdown_filters" => Ok(GeneratedField::PushdownFilters),
"dynamicFilterPushdown" | "dynamic_filter_pushdown" => Ok(GeneratedField::DynamicFilterPushdown),
"reorderFilters" | "reorder_filters" => Ok(GeneratedField::ReorderFilters),
"forceFilterSelections" | "force_filter_selections" => Ok(GeneratedField::ForceFilterSelections),
"dataPagesizeLimit" | "data_pagesize_limit" => Ok(GeneratedField::DataPagesizeLimit),
Expand Down Expand Up @@ -6076,6 +6086,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
let mut pruning__ = None;
let mut skip_metadata__ = None;
let mut pushdown_filters__ = None;
let mut dynamic_filter_pushdown__ = None;
let mut reorder_filters__ = None;
let mut force_filter_selections__ = None;
let mut data_pagesize_limit__ = None;
Expand Down Expand Up @@ -6130,6 +6141,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
pushdown_filters__ = Some(map_.next_value()?);
}
GeneratedField::DynamicFilterPushdown => {
if dynamic_filter_pushdown__.is_some() {
return Err(serde::de::Error::duplicate_field("dynamicFilterPushdown"));
}
dynamic_filter_pushdown__ = Some(map_.next_value()?);
}
GeneratedField::ReorderFilters => {
if reorder_filters__.is_some() {
return Err(serde::de::Error::duplicate_field("reorderFilters"));
Expand Down Expand Up @@ -6319,6 +6336,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
pruning: pruning__.unwrap_or_default(),
skip_metadata: skip_metadata__.unwrap_or_default(),
pushdown_filters: pushdown_filters__.unwrap_or_default(),
dynamic_filter_pushdown: dynamic_filter_pushdown__.unwrap_or_default(),
reorder_filters: reorder_filters__.unwrap_or_default(),
force_filter_selections: force_filter_selections__.unwrap_or_default(),
data_pagesize_limit: data_pagesize_limit__.unwrap_or_default(),
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto-common/src/generated/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,9 @@ pub struct ParquetOptions {
#[prost(bool, tag = "5")]
pub pushdown_filters: bool,
/// default = false
#[prost(bool, tag = "35")]
pub dynamic_filter_pushdown: bool,
/// default = false
#[prost(bool, tag = "6")]
pub reorder_filters: bool,
/// default = false
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
skip_arrow_metadata: value.skip_arrow_metadata,
coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
dynamic_filter_pushdown: value.dynamic_filter_pushdown,
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/datafusion_proto_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,9 @@ pub struct ParquetOptions {
#[prost(bool, tag = "5")]
pub pushdown_filters: bool,
/// default = false
#[prost(bool, tag = "35")]
pub dynamic_filter_pushdown: bool,
/// default = false
#[prost(bool, tag = "6")]
pub reorder_filters: bool,
/// default = false
Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto/src/logical_plan/file_formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ mod parquet {
force_filter_selections: global_options.global.force_filter_selections,
data_pagesize_limit: global_options.global.data_pagesize_limit as u64,
write_batch_size: global_options.global.write_batch_size as u64,
dynamic_filter_pushdown: global_options.global.dynamic_filter_pushdown,
writer_version: global_options.global.writer_version.to_string(),
compression_opt: global_options.global.compression.map(|compression| {
parquet_options::CompressionOpt::Compression(compression)
Expand Down Expand Up @@ -475,6 +476,7 @@ mod parquet {
parquet_options::MetadataSizeHintOpt::MetadataSizeHint(size) => *size as usize,
}),
pushdown_filters: proto.pushdown_filters,
dynamic_filter_pushdown: proto.dynamic_filter_pushdown,
reorder_filters: proto.reorder_filters,
force_filter_selections: proto.force_filter_selections,
data_pagesize_limit: proto.data_pagesize_limit as usize,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000
datafusion.execution.parquet.data_pagesize_limit 1048576
datafusion.execution.parquet.dictionary_enabled true
datafusion.execution.parquet.dictionary_page_size_limit 1048576
datafusion.execution.parquet.dynamic_filter_pushdown false
datafusion.execution.parquet.enable_page_index true
datafusion.execution.parquet.encoding NULL
datafusion.execution.parquet.force_filter_selections false
Expand Down Expand Up @@ -380,6 +381,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best
datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes
datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting
datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes
datafusion.execution.parquet.dynamic_filter_pushdown false (reading) If true, dynamic filter expressions (from operators like TopK, Join, Aggregate) are included when building parquet row-level filters (RowFilter / late materialization). If false, dynamic filters are only used for pruning (file, row group, page level) and are excluded before building the RowFilter.
datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded.
datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting
datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows.
Expand Down
Loading