diff --git a/Cargo.lock b/Cargo.lock
index 78f94118456..efcc4e38d7b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4096,7 +4096,7 @@ dependencies = [
  "portable-atomic",
  "portable-atomic-util",
  "serde_core",
- "windows-sys 0.61.2",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -6175,7 +6175,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf"
 dependencies = [
  "heck",
- "itertools 0.12.1",
+ "itertools 0.13.0",
  "log",
  "multimap",
  "once_cell",
@@ -6195,7 +6195,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1"
 dependencies = [
  "heck",
- "itertools 0.12.1",
+ "itertools 0.13.0",
  "log",
  "multimap",
  "once_cell",
@@ -6228,7 +6228,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
 dependencies = [
  "anyhow",
- "itertools 0.12.1",
+ "itertools 0.13.0",
  "proc-macro2",
  "quote",
  "syn 2.0.106",
@@ -6241,7 +6241,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425"
 dependencies = [
  "anyhow",
- "itertools 0.12.1",
+ "itertools 0.13.0",
  "proc-macro2",
  "quote",
  "syn 2.0.106",
diff --git a/bench-vortex/src/engines/df/mod.rs b/bench-vortex/src/engines/df/mod.rs
index 4b2c8d51e45..e5d1a6069ab 100644
--- a/bench-vortex/src/engines/df/mod.rs
+++ b/bench-vortex/src/engines/df/mod.rs
@@ -11,7 +11,6 @@ use datafusion::execution::SessionStateBuilder;
 use datafusion::execution::cache::cache_manager::CacheManagerConfig;
 use datafusion::execution::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
 use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use datafusion::physical_plan::collect;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_common::GetExt;
 use datafusion_physical_plan::ExecutionPlan;
@@ -135,11 +134,10 @@ pub async fn execute_query(
     ctx: &SessionContext,
     query: &str,
 ) -> anyhow::Result<(Vec<RecordBatch>, Arc<dyn ExecutionPlan>)> {
-    let plan = ctx.sql(query).await?;
-    let (state, plan) = plan.into_parts();
-    let physical_plan = state.create_physical_plan(&plan).await?;
+    let df = ctx.sql(query).await?;
+    let physical_plan = df.clone().create_physical_plan().await?;
+    let result = df.collect().await?;
 
-    let result = collect(physical_plan.clone(), state.task_ctx()).await?;
     Ok((result, physical_plan))
 }
diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs
index 70974d27122..734df75511f 100644
--- a/vortex-datafusion/src/convert/exprs.rs
+++ b/vortex-datafusion/src/convert/exprs.rs
@@ -4,10 +4,13 @@
 use std::sync::Arc;
 
 use arrow_schema::{DataType, Schema};
+use datafusion_common::ScalarValue;
 use datafusion_expr::Operator as DFOperator;
 use datafusion_functions::core::getfield::GetFieldFunc;
-use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
-use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, is_dynamic_physical_expr};
+use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};
+use datafusion_physical_expr_common::physical_expr::{
+    is_dynamic_physical_expr, snapshot_physical_expr,
+};
 use datafusion_physical_plan::expressions as df_expr;
 use itertools::Itertools;
 use vortex::compute::LikeOptions;
@@ -22,13 +25,44 @@ use vortex::scalar::Scalar;
 
 use crate::convert::{FromDataFusion, TryFromDataFusion};
 
+fn is_lit_true(e: &PhysicalExprRef) -> bool {
+    e.as_any()
+        .downcast_ref::<df_expr::Literal>()
+        .is_some_and(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
+}
+
 /// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
 pub(crate) fn make_vortex_predicate(
-    predicate: &[&Arc<dyn PhysicalExpr>],
+    predicate: &[Arc<dyn PhysicalExpr>],
 ) -> VortexResult<Option<Expression>> {
     let exprs = predicate
         .iter()
-        .map(|e| Expression::try_from_df(e.as_ref()))
+        .filter_map(|expr| {
+            // Handle dynamic expressions by snapshotting them first
+            let expr_to_convert = if is_dynamic_physical_expr(expr) {
+                // If snapshot fails, filter out this expression
+                let snapshot = snapshot_physical_expr(expr.clone()).ok()?;
+
+                // Filter out literal true expressions (they don't add constraints)
+                if is_lit_true(&snapshot) {
+                    return None;
+                }
+
+                snapshot
+            } else {
+                expr.clone()
+            };
+
+            // Try to convert to Vortex expression
+            match Expression::try_from_df(expr_to_convert.as_ref()) {
+                Ok(vortex_expr) => Some(Ok(vortex_expr)),
+                Err(_) => {
+                    // If we fail to convert the expression to Vortex, it's safe
+                    // to drop it as we don't declare it as pushed down
+                    None
+                }
+            }
+        })
        .collect::<VortexResult<Vec<_>>>()?;
 
     Ok(exprs.into_iter().reduce(and))
@@ -323,7 +357,7 @@ mod tests {
     #[test]
     fn test_make_vortex_predicate_single() {
         let col_expr = Arc::new(df_expr::Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
-        let result = make_vortex_predicate(&[&col_expr]).unwrap();
+        let result = make_vortex_predicate(&[col_expr]).unwrap();
 
         assert!(result.is_some());
     }
@@ -331,7 +365,7 @@ fn test_make_vortex_predicate_multiple() {
         let col1 = Arc::new(df_expr::Column::new("col1", 0)) as Arc<dyn PhysicalExpr>;
         let col2 = Arc::new(df_expr::Column::new("col2", 1)) as Arc<dyn PhysicalExpr>;
-        let result = make_vortex_predicate(&[&col1, &col2]).unwrap();
+        let result = make_vortex_predicate(&[col1, col2]).unwrap();
 
         assert!(result.is_some());
         // Result should be an AND expression combining the two columns
     }
diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs
index f9b472a6b93..7cb8adf8e9e 100644
--- a/vortex-datafusion/src/persistent/mod.rs
+++ b/vortex-datafusion/src/persistent/mod.rs
@@ -32,6 +32,7 @@ fn register_vortex_format_factory(
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
 
     use arrow_schema::{DataType, Field, Schema};
@@ -195,61 +196,51 @@
     #[tokio::test]
     async fn create_table_ordered_by() -> anyhow::Result<()> {
-        let dir = TempDir::new().unwrap();
+        let dir = TempDir::new()?;
         let factory: VortexFormatFactory = VortexFormatFactory::new();
         let mut session_state_builder = SessionStateBuilder::new().with_default_features();
         register_vortex_format_factory(factory, &mut session_state_builder);
         let session = SessionContext::new_with_state(session_state_builder.build());
 
-        // Vortex
         session
             .sql(&format!(
-                "CREATE EXTERNAL TABLE my_tbl_vx \
+                "CREATE EXTERNAL TABLE my_tbl \
                  (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
-                 STORED AS vortex \
+                 STORED AS VORTEX \
                  WITH ORDER (c1 ASC)
-                 LOCATION '{}/vx/'",
+                 LOCATION '{}/'",
                 dir.path().to_str().unwrap()
             ))
             .await?;
 
         session
-            .sql("INSERT INTO my_tbl_vx VALUES ('air', 5), ('balloon', 42)")
+            .sql("INSERT INTO my_tbl VALUES ('air', 10), ('alabama', 20), ('balloon', 30)")
             .await?
             .collect()
             .await?;
 
         session
-            .sql("INSERT INTO my_tbl_vx VALUES ('zebra', 5)")
+            .sql("INSERT INTO my_tbl VALUES ('kangaroo', 11), ('zebra', 21)")
             .await?
             .collect()
             .await?;
 
-        session
-            .sql("INSERT INTO my_tbl_vx VALUES ('texas', 2000), ('alabama', 2000)")
-            .await?
-            .collect()
-            .await?;
+        let df = session.sql("SELECT * FROM my_tbl ORDER BY c1 ASC").await?;
 
-        let df = session
-            .sql("SELECT * FROM my_tbl_vx ORDER BY c1 ASC limit 3")
-            .await?;
-        let (state, plan) = df.clone().into_parts();
-        let physical_plan = state.create_physical_plan(&plan).await?;
+        let physical_plan = df.clone().create_physical_plan().await?;
 
         insta::assert_snapshot!(DisplayableExecutionPlan::new(physical_plan.as_ref())
             .tree_render().to_string(), @r"
         ┌───────────────────────────┐
         │ SortPreservingMergeExec   │
         │ --------------------      │
-        │ c1 ASC NULLS LASTlimit:   │
-        │ 3                         │
+        │ c1 ASC NULLS LAST         │
         └─────────────┬─────────────┘
         ┌─────────────┴─────────────┐
         │ DataSourceExec            │
         │ --------------------      │
-        │ files: 3                  │
+        │ files: 2                  │
         │ format: vortex            │
         └───────────────────────────┘
         ");
@@ -257,13 +248,15 @@
         let r = df.collect().await?;
 
         insta::assert_snapshot!(pretty_format_batches(&r)?.to_string(), @r"
-        +---------+------+
-        | c1      | c2   |
-        +---------+------+
-        | air     | 5    |
-        | alabama | 2000 |
-        | balloon | 42   |
-        +---------+------+
+        +----------+----+
+        | c1       | c2 |
+        +----------+----+
+        | air      | 10 |
+        | alabama  | 20 |
+        | balloon  | 30 |
+        | kangaroo | 11 |
+        | zebra    | 21 |
+        +----------+----+
         ");
 
         Ok(())
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index 7049436e3f2..22e773f61bf 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -40,13 +40,9 @@ pub(crate) struct VortexOpener {
     pub object_store: Arc<dyn ObjectStore>,
     /// Projection by index of the file's columns
     pub projection: Option<Vec<usize>>,
-    /// Filter expression optimized for pushdown into Vortex scan operations.
-    /// This may be a subset of file_pruning_predicate containing only expressions
-    /// that Vortex can efficiently evaluate.
-    pub filter: Option<PhysicalExprRef>,
     /// Filter expression used by DataFusion's FilePruner to eliminate files based on
     /// statistics and partition values without opening them.
-    pub file_pruning_predicate: Option<PhysicalExprRef>,
+    pub predicate: Option<PhysicalExprRef>,
     pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
     pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
     /// Hive-style partitioning columns
@@ -149,8 +145,7 @@ impl FileOpener for VortexOpener {
         let session = self.session.clone();
         let object_store = self.object_store.clone();
         let projection = self.projection.clone();
-        let mut filter = self.filter.clone();
-        let file_pruning_predicate = self.file_pruning_predicate.clone();
+        let mut predicate = self.predicate.clone();
         let expr_adapter_factory = self.expr_adapter_factory.clone();
         let partition_fields = self.partition_fields.clone();
         let file_cache = self.file_cache.clone();
@@ -178,7 +173,8 @@
             // opening them based on:
             // - Partition column values (e.g., date=2024-01-01)
             // - File-level statistics (min/max values per column)
-            let mut file_pruner = file_pruning_predicate
+            let mut file_pruner = predicate
+                .clone()
                 .map(|predicate| {
                     // Only create pruner if we have dynamic expressions or file statistics
                     // to work with. Static predicates without stats won't benefit from pruning.
@@ -225,15 +221,16 @@
             // The adapter rewrites the expression to the local file schema, allowing
             // for schema evolution and divergence between the table's schema and individual files.
-            filter = filter
-                .map(|filter| {
+            predicate = predicate
+                .clone()
+                .map(|expr| {
                     let logical_file_schema =
                         compute_logical_file_schema(&physical_file_schema, &logical_schema);
 
                     let expr = expr_adapter_factory
                         .create(logical_file_schema, physical_file_schema.clone())
                         .with_partition_values(partition_values)
-                        .rewrite(filter)?;
+                        .rewrite(expr)?;
 
                     // Expression might now reference columns that don't exist in the file, so we can give it
                     // another simplification pass.
@@ -294,11 +291,15 @@
             );
         }
 
-        let filter = filter
+        let filter = predicate
             .and_then(|f| {
                 let exprs = split_conjunction(&f)
                     .into_iter()
-                    .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema))
+                    .filter(|expr| {
+                        is_dynamic_physical_expr(expr)
+                            || can_be_pushed_down(expr, &predicate_file_schema)
+                    })
+                    .cloned()
                     .collect::<Vec<_>>();
 
                 make_vortex_predicate(&exprs).transpose()
@@ -521,8 +522,7 @@ mod tests {
             session: SESSION.clone(),
             object_store: object_store.clone(),
             projection: Some([0].into()),
-            filter: Some(filter),
-            file_pruning_predicate: None,
+            predicate: Some(filter),
             expr_adapter_factory: expr_adapter_factory.clone(),
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))],
@@ -602,8 +602,7 @@
             session: SESSION.clone(),
             object_store: object_store.clone(),
             projection: Some([0].into()),
-            filter: Some(filter),
-            file_pruning_predicate: None,
+            predicate: Some(filter),
             expr_adapter_factory: expr_adapter_factory.clone(),
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             partition_fields: vec![],
@@ -710,11 +709,10 @@
             session: SESSION.clone(),
             object_store: object_store.clone(),
             projection: None,
-            filter: Some(logical2physical(
+            predicate: Some(logical2physical(
                 &col("my_struct").is_not_null(),
                 &table_schema,
             )),
-            file_pruning_predicate: None,
             expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             partition_fields: vec![],
diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs
index f437cc2960e..89b5e12b81d 100644
--- a/vortex-datafusion/src/persistent/source.rs
+++ b/vortex-datafusion/src/persistent/source.rs
@@ -16,10 +16,8 @@ use datafusion_physical_expr::{PhysicalExprRef, conjunction};
 use datafusion_physical_expr_adapter::{
     DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
 };
-use datafusion_physical_expr_common::physical_expr::fmt_sql;
-use datafusion_physical_plan::filter_pushdown::{
-    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
-};
+use datafusion_physical_expr_common::physical_expr::{fmt_sql, is_dynamic_physical_expr};
+use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::{DisplayFormatType, PhysicalExpr};
 use object_store::ObjectStore;
@@ -45,10 +43,7 @@ pub struct VortexSource {
     pub(crate) file_cache: VortexFileCache,
     /// Combined predicate expression containing all filters from DataFusion query planning.
     /// Used with FilePruner to skip files based on statistics and partition values.
-    pub(crate) full_predicate: Option<PhysicalExprRef>,
-    /// Subset of predicates that can be pushed down into Vortex scan operations.
-    /// These are expressions that Vortex can efficiently evaluate during scanning.
-    pub(crate) vortex_predicate: Option<PhysicalExprRef>,
+    pub(crate) predicate: Option<PhysicalExprRef>,
     pub(crate) batch_size: Option<usize>,
     pub(crate) projected_statistics: Option<Statistics>,
     /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
@@ -67,8 +62,7 @@ impl VortexSource {
         Self {
             session,
             file_cache,
-            full_predicate: None,
-            vortex_predicate: None,
+            predicate: None,
             batch_size: None,
             projected_statistics: None,
             arrow_file_schema: None,
@@ -140,8 +134,8 @@ impl FileSource for VortexSource {
             session: self.session.clone(),
             object_store,
             projection,
-            filter: self.vortex_predicate.clone(),
-            file_pruning_predicate: self.full_predicate.clone(),
+
+            predicate: self.predicate.clone(),
             expr_adapter_factory,
             schema_adapter_factory,
             partition_fields: base_config.table_partition_cols.clone(),
@@ -184,7 +178,7 @@
     }
 
     fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
-        self.vortex_predicate.clone()
+        self.predicate.clone()
     }
 
     fn metrics(&self) -> &ExecutionPlanMetricsSet {
@@ -197,7 +191,7 @@
             .clone()
             .vortex_expect("projected_statistics must be set");
 
-        if self.vortex_predicate.is_some() {
+        if self.predicate.is_some() {
             Ok(statistics.to_inexact())
         } else {
             Ok(statistics)
@@ -211,13 +205,13 @@
     fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                if let Some(ref predicate) = self.vortex_predicate {
+                if let Some(ref predicate) = self.predicate {
                     write!(f, ", predicate: {predicate}")?;
                 }
             }
             // Use TreeRender style key=value formatting to display the predicate
             DisplayFormatType::TreeRender => {
-                if let Some(ref predicate) = self.vortex_predicate {
+                if let Some(ref predicate) = self.predicate {
                     writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
                 };
             }
@@ -244,57 +238,41 @@
         let mut source = self.clone();
 
-        // Combine new filters with existing predicate for file pruning.
-        // This full predicate is used by FilePruner to eliminate files.
-        source.full_predicate = match source.full_predicate {
-            Some(predicate) => Some(conjunction(
-                std::iter::once(predicate).chain(filters.clone()),
-            )),
-            None => Some(conjunction(filters.clone())),
-        };
-
-        let supported_filters = filters
-            .into_iter()
+        let supported = filters
+            .iter()
             .map(|expr| {
-                if can_be_pushed_down(&expr, schema) {
-                    PushedDownPredicate::supported(expr)
+                if can_be_pushed_down(expr, schema) {
+                    PushedDown::Yes
                 } else {
-                    PushedDownPredicate::unsupported(expr)
+                    PushedDown::No
                 }
             })
             .collect::<Vec<_>>();
 
-        if supported_filters
+        // We keep expressions we can push down, plus dynamic expressions that will be evaluated on a best-effort basis.
+        let saved_filters = filters
             .iter()
-            .all(|p| matches!(p.discriminant, PushedDown::No))
-        {
+            .filter(|expr| can_be_pushed_down(expr, schema) || is_dynamic_physical_expr(expr))
+            .cloned()
+            .collect::<Vec<_>>();
+
+        // If we don't push down any filter, we don't need to update the plan's node.
+        if saved_filters.is_empty() {
             return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-                vec![PushedDown::No; supported_filters.len()],
-            )
-            .with_updated_node(Arc::new(source) as _));
+                vec![PushedDown::No; filters.len()],
+            ));
         }
 
-        let supported = supported_filters
-            .iter()
-            .filter_map(|p| match p.discriminant {
-                PushedDown::Yes => Some(&p.predicate),
-                PushedDown::No => None,
-            })
-            .cloned();
-
-        let predicate = match source.vortex_predicate {
-            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
-            None => conjunction(supported),
+        // Combine new filters with existing predicate. We keep the whole original expression.
+        source.predicate = match source.predicate {
+            Some(predicate) => Some(conjunction(std::iter::once(predicate).chain(saved_filters))),
+            None => Some(conjunction(saved_filters)),
         };
 
-        tracing::debug!(%predicate, "Saving predicate");
-
-        source.vortex_predicate = Some(predicate);
-
-        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-            supported_filters.iter().map(|f| f.discriminant).collect(),
+        Ok(
+            FilterPushdownPropagation::with_parent_pushdown_result(supported)
+                .with_updated_node(Arc::new(source) as _),
        )
-        .with_updated_node(Arc::new(source) as _))
     }
 
     fn with_schema_adapter_factory(
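
For context on the pushdown change: `make_vortex_predicate` now freezes dynamic physical expressions (such as thresholds maintained by TopK-style operators) with `snapshot_physical_expr` before attempting conversion, and drops snapshots that collapse to the literal `true`, since those constrain nothing. Below is a minimal, self-contained sketch of that flow under the same `datafusion-physical-expr-common` / `datafusion-physical-plan` APIs imported in the patch; `freeze_filter` is a hypothetical helper name for illustration, not part of the patch.

```rust
use std::sync::Arc;

use datafusion_common::ScalarValue;
use datafusion_physical_expr_common::physical_expr::{
    PhysicalExpr, is_dynamic_physical_expr, snapshot_physical_expr,
};
use datafusion_physical_plan::expressions::Literal;

/// Hypothetical helper mirroring the `filter_map` in `make_vortex_predicate`:
/// freeze a possibly-dynamic filter to its current state, and drop it when the
/// snapshot is the literal `true` (i.e. it constrains nothing yet).
fn freeze_filter(expr: Arc<dyn PhysicalExpr>) -> Option<Arc<dyn PhysicalExpr>> {
    // Static expressions pass through unchanged; dynamic ones are replaced by
    // their current snapshot.
    let snapshot = if is_dynamic_physical_expr(&expr) {
        // A filter whose snapshot cannot be taken is unusable right now: skip it.
        snapshot_physical_expr(expr).ok()?
    } else {
        expr
    };

    // A literal `true` adds no constraint, so pushing it down is pure overhead.
    let is_true = snapshot
        .as_any()
        .downcast_ref::<Literal>()
        .is_some_and(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))));

    (!is_true).then_some(snapshot)
}
```

A snapshot only captures the filter's value at open time, which is why the diff treats these expressions as best-effort: dynamic filters that fail `can_be_pushed_down` still report `PushedDown::No`, so the parent operator keeps evaluating them and the snapshot merely prunes extra data early in the scan.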