diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 683d62a1df49..47e692cb966d 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -34,6 +34,7 @@ use datafusion_common::config::TableParquetOptions; use datafusion_common::Statistics; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; @@ -580,7 +581,7 @@ impl FileSource for ParquetSource { } DisplayFormatType::TreeRender => { if let Some(predicate) = self.predicate() { - writeln!(f, "predicate={predicate}")?; + writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; } Ok(()) } diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index fe5c2ecfdf62..3a54b5b40399 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -17,7 +17,7 @@ //! Sort expressions -use crate::physical_expr::PhysicalExpr; +use crate::physical_expr::{fmt_sql, PhysicalExpr}; use std::fmt; use std::fmt::{Display, Formatter}; use std::hash::{Hash, Hasher}; @@ -117,6 +117,16 @@ impl PhysicalSortExpr { self.options.nulls_first = false; self } + + /// Like [`PhysicalExpr::fmt_sql`] prints a [`PhysicalSortExpr`] in a SQL-like format. + pub fn fmt_sql(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "{} {}", + fmt_sql(self.expr.as_ref()), + to_str(&self.options) + ) + } } /// Access the PhysicalSortExpr as a PhysicalExpr diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 5dccc09fc722..4230eeeed0c9 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -830,6 +830,7 @@ impl DisplayAs for AggregateExec { }) .collect() }; + // TODO: Implement `fmt_sql` for `AggregateFunctionExpr`. let a: Vec = self .aggr_expr .iter() diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index ffcda1d888b0..a8a9973ea043 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -54,6 +54,7 @@ use datafusion_physical_expr::{ ExprBoundaries, PhysicalExpr, }; +use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -330,7 +331,7 @@ impl DisplayAs for FilterExec { write!(f, "FilterExec: {}{}", self.predicate, display_projections) } DisplayFormatType::TreeRender => { - write!(f, "predicate={}", self.predicate) + write!(f, "predicate={}", fmt_sql(self.predicate.as_ref())) } } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 39a15037260d..e9d6354e21d7 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -82,6 +82,7 @@ use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::datum::compare_op_for_nested; use ahash::RandomState; +use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::{ready, Stream, StreamExt, TryStreamExt}; use parking_lot::Mutex; @@ -672,7 +673,9 @@ impl DisplayAs for HashJoinExec { let on = self .on .iter() - .map(|(c1, c2)| format!("({} = {})", c1, c2)) + .map(|(c1, c2)| { + format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref())) + }) .collect::>() .join(", "); diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index d8446fb332b1..7fb8a2d73600 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -70,6 +70,7 @@ use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use futures::{Stream, StreamExt}; @@ -373,7 +374,9 @@ impl DisplayAs for SortMergeJoinExec { let on = self .on .iter() - .map(|(c1, c2)| format!("({} = {})", c1, c2)) + .map(|(c1, c2)| { + format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref())) + }) .collect::>() .join(", "); diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 63e95c7a3018..0dcb42169e00 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -74,6 +74,7 @@ use datafusion_expr::interval_arithmetic::Interval; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use ahash::RandomState; @@ -384,7 +385,9 @@ impl DisplayAs for SymmetricHashJoinExec { let on = self .on .iter() - .map(|(c1, c2)| format!("({} = {})", c1, c2)) + .map(|(c1, c2)| { + format!("({} = {})", fmt_sql(c1.as_ref()), fmt_sql(c2.as_ref())) + }) .collect::>() .join(", "); diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 68593fe6b05d..ca06a029e8db 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -187,11 +187,9 @@ impl DisplayAs for SortPreservingMergeExec { } DisplayFormatType::TreeRender => { for (i, e) in self.expr().iter().enumerate() { - let e = e.to_string(); - if i == self.expr().len() - 1 { - writeln!(f, "{e}")?; - } else { - write!(f, "{e}, ")?; + e.fmt_sql(f)?; + if i != self.expr().len() - 1 { + write!(f, ", ")?; } } if let Some(fetch) = self.fetch { diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 4e416c52a412..b9f4074a450d 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -175,7 +175,7 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│ string_col@1 != foo │ +11)│ string_col != foo │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ @@ -303,7 +303,7 @@ physical_plan 08)│ HashJoinExec │ 09)│ -------------------- │ 10)│ on: ├──────────────┐ -11)│ (int_col@0 = int_col@0) │ │ +11)│ (int_col = int_col) │ │ 12)└─────────────┬─────────────┘ │ 13)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 14)│ CoalesceBatchesExec ││ CoalesceBatchesExec │ @@ -357,7 +357,7 @@ physical_plan 08)│ HashJoinExec │ 09)│ -------------------- │ 10)│ on: ├───────────────────────────────────────────┐ -11)│ (int_col@1 = int_col@0) │ │ +11)│ (int_col = int_col) │ │ 12)└─────────────┬─────────────┘ │ 13)┌─────────────┴─────────────┐ ┌─────────────┴─────────────┐ 14)│ CoalesceBatchesExec │ │ CoalesceBatchesExec │ @@ -369,7 +369,7 @@ physical_plan 20)│ HashJoinExec │ │ RepartitionExec │ 21)│ -------------------- │ │ -------------------- │ 22)│ on: │ │ output_partition_count: │ -23)│ (int_col@0 = int_col@0) ├──────────────┐ │ 1 │ +23)│ (int_col = int_col) ├──────────────┐ │ 1 │ 24)│ │ │ │ │ 25)│ │ │ │ partitioning_scheme: │ 26)│ │ │ │ Hash([int_col@0], 4) │ @@ -423,9 +423,9 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│ string_col@1 != foo AND │ -12)│ string_col@1 != bar │ -13)│ AND string_col@1 != a │ +11)│ string_col != foo AND │ +12)│ string_col != bar │ +13)│ AND string_col != a │ 14)│ really long string │ 15)│ constant │ 16)└─────────────┬─────────────┘ @@ -461,8 +461,8 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│ string_col@1 != │ -12)│ aaaaaaaaaaaaaa │ +11)│ string_col != │ +12)│ aaaaaaaaaaaa │ 13)│aaaaaaaaaaaaaaaaaaaaaaaaaaa│ 14)│aaaaaaaaaaaaaaaaaaaaaaaaaaa│ 15)│aaaaaaaaaaaaaaaaaaaaaaaaaaa│ @@ -510,7 +510,7 @@ physical_plan # Check exactly the render width. query TT explain SELECT int_col FROM table1 -WHERE string_col != 'aaaaaaaaaaa'; +WHERE string_col != 'aaaaaaaaaaaaa'; ---- physical_plan 01)┌───────────────────────────┐ @@ -523,7 +523,7 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│string_col@1 != aaaaaaaaaaa│ +11)│string_col != aaaaaaaaaaaaa│ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ @@ -544,7 +544,7 @@ physical_plan # Check with the render witdth + 1. query TT explain SELECT int_col FROM table1 -WHERE string_col != 'aaaaaaaaaaaa'; +WHERE string_col != 'aaaaaaaaaaaaaaa'; ---- physical_plan 01)┌───────────────────────────┐ @@ -557,24 +557,25 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│ string_col@1 != │ +11)│ string_col != │ 12)│ aaaaaaaaaaaa │ -13)└─────────────┬─────────────┘ -14)┌─────────────┴─────────────┐ -15)│ RepartitionExec │ -16)│ -------------------- │ -17)│ output_partition_count: │ -18)│ 1 │ -19)│ │ -20)│ partitioning_scheme: │ -21)│ RoundRobinBatch(4) │ -22)└─────────────┬─────────────┘ -23)┌─────────────┴─────────────┐ -24)│ DataSourceExec │ -25)│ -------------------- │ -26)│ files: 1 │ -27)│ format: csv │ -28)└───────────────────────────┘ +13)│ aaa │ +14)└─────────────┬─────────────┘ +15)┌─────────────┴─────────────┐ +16)│ RepartitionExec │ +17)│ -------------------- │ +18)│ output_partition_count: │ +19)│ 1 │ +20)│ │ +21)│ partitioning_scheme: │ +22)│ RoundRobinBatch(4) │ +23)└─────────────┬─────────────┘ +24)┌─────────────┴─────────────┐ +25)│ DataSourceExec │ +26)│ -------------------- │ +27)│ files: 1 │ +28)│ format: csv │ +29)└───────────────────────────┘ # Query with filter on csv query TT @@ -591,7 +592,7 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│ string_col@1 != foo │ +11)│ string_col != foo │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ @@ -625,7 +626,7 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│ string_col@1 != foo │ +11)│ string_col != foo │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ @@ -643,7 +644,7 @@ physical_plan 26)│ format: parquet │ 27)│ │ 28)│ predicate: │ -29)│ string_col@1 != foo │ +29)│ string_col != foo │ 30)└───────────────────────────┘ # Query with filter on memory @@ -661,7 +662,7 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│ string_col@1 != foo │ +11)│ string_col != foo │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ DataSourceExec │ @@ -686,7 +687,7 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│ string_col@1 != foo │ +11)│ string_col != foo │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ @@ -719,7 +720,7 @@ physical_plan 08)│ FilterExec │ 09)│ -------------------- │ 10)│ predicate: │ -11)│ string_col@1 != foo │ +11)│ string_col != foo │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ @@ -1192,10 +1193,10 @@ physical_plan 08)│ HashJoinExec │ 09)│ -------------------- │ 10)│ on: │ -11)│ (int_col@0 = int_col@0), ├──────────────┐ -12)│ (CAST(table1.string_col │ │ -13)│ AS Utf8View)@4 = │ │ -14)│ string_col@1) │ │ +11)│ (int_col = int_col), (CAST├──────────────┐ +12)│ (table1.string_col AS │ │ +13)│ Utf8View) = │ │ +14)│ string_col) │ │ 15)└─────────────┬─────────────┘ │ 16)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 17)│ CoalesceBatchesExec ││ CoalesceBatchesExec │ @@ -1264,10 +1265,10 @@ physical_plan 10)│ join_type: Left │ 11)│ │ 12)│ on: ├──────────────┐ -13)│ (int_col@0 = int_col@0), │ │ -14)│ (CAST(table1.string_col │ │ -15)│ AS Utf8View)@4 = │ │ -16)│ string_col@1) │ │ +13)│ (int_col = int_col), (CAST│ │ +14)│ (table1.string_col AS │ │ +15)│ Utf8View) = │ │ +16)│ string_col) │ │ 17)└─────────────┬─────────────┘ │ 18)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 19)│ CoalesceBatchesExec ││ CoalesceBatchesExec │ @@ -1402,7 +1403,7 @@ physical_plan 01)┌───────────────────────────┐ 02)│ SortMergeJoinExec │ 03)│ -------------------- ├──────────────┐ -04)│ on: (c1@0 = c1@0) │ │ +04)│ on: (c1 = c1) │ │ 05)└─────────────┬─────────────┘ │ 06)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 07)│ SortExec ││ SortExec │ @@ -1569,8 +1570,8 @@ physical_plan 01)┌───────────────────────────┐ 02)│ SortPreservingMergeExec │ 03)│ -------------------- │ -04)│ date@0 ASC NULLS LAST, │ -05)│ time@2 ASC NULLS LAST │ +04)│ date ASC NULLS LAST, time │ +05)│ ASC NULLS LAST │ 06)└─────────────┬─────────────┘ 07)┌─────────────┴─────────────┐ 08)│ CoalesceBatchesExec │ @@ -1581,24 +1582,23 @@ physical_plan 13)┌─────────────┴─────────────┐ 14)│ FilterExec │ 15)│ -------------------- │ -16)│ predicate: │ -17)│ ticker@1 = A │ -18)└─────────────┬─────────────┘ -19)┌─────────────┴─────────────┐ -20)│ RepartitionExec │ -21)│ -------------------- │ -22)│ output_partition_count: │ -23)│ 1 │ -24)│ │ -25)│ partitioning_scheme: │ -26)│ RoundRobinBatch(4) │ -27)└─────────────┬─────────────┘ -28)┌─────────────┴─────────────┐ -29)│ StreamingTableExec │ -30)│ -------------------- │ -31)│ infinite: true │ -32)│ limit: None │ -33)└───────────────────────────┘ +16)│ predicate: ticker = A │ +17)└─────────────┬─────────────┘ +18)┌─────────────┴─────────────┐ +19)│ RepartitionExec │ +20)│ -------------------- │ +21)│ output_partition_count: │ +22)│ 1 │ +23)│ │ +24)│ partitioning_scheme: │ +25)│ RoundRobinBatch(4) │ +26)└─────────────┬─────────────┘ +27)┌─────────────┴─────────────┐ +28)│ StreamingTableExec │ +29)│ -------------------- │ +30)│ infinite: true │ +31)│ limit: None │ +32)└───────────────────────────┘ # constant ticker, CAST(time AS DATE) = time, order by time @@ -1611,7 +1611,7 @@ physical_plan 01)┌───────────────────────────┐ 02)│ SortPreservingMergeExec │ 03)│ -------------------- │ -04)│ time@2 ASC NULLS LAST │ +04)│ time ASC NULLS LAST │ 05)└─────────────┬─────────────┘ 06)┌─────────────┴─────────────┐ 07)│ CoalesceBatchesExec │ @@ -1623,8 +1623,8 @@ physical_plan 13)│ FilterExec │ 14)│ -------------------- │ 15)│ predicate: │ -16)│ ticker@1 = A AND CAST(time│ -17)│ @2 AS Date32) = date@0 │ +16)│ ticker = A AND CAST(time │ +17)│ AS Date32) = date │ 18)└─────────────┬─────────────┘ 19)┌─────────────┴─────────────┐ 20)│ RepartitionExec │ @@ -1652,7 +1652,7 @@ physical_plan 01)┌───────────────────────────┐ 02)│ SortPreservingMergeExec │ 03)│ -------------------- │ -04)│ date@0 ASC NULLS LAST │ +04)│ date ASC NULLS LAST │ 05)└─────────────┬─────────────┘ 06)┌─────────────┴─────────────┐ 07)│ CoalesceBatchesExec │ @@ -1664,8 +1664,8 @@ physical_plan 13)│ FilterExec │ 14)│ -------------------- │ 15)│ predicate: │ -16)│ ticker@1 = A AND CAST(time│ -17)│ @2 AS Date32) = date@0 │ +16)│ ticker = A AND CAST(time │ +17)│ AS Date32) = date │ 18)└─────────────┬─────────────┘ 19)┌─────────────┴─────────────┐ 20)│ RepartitionExec │ @@ -1703,8 +1703,8 @@ physical_plan 11)│ FilterExec │ 12)│ -------------------- │ 13)│ predicate: │ -14)│ ticker@1 = A AND CAST(time│ -15)│ @2 AS Date32) = date@0 │ +14)│ ticker = A AND CAST(time │ +15)│ AS Date32) = date │ 16)└─────────────┬─────────────┘ 17)┌─────────────┴─────────────┐ 18)│ RepartitionExec │ @@ -1733,8 +1733,8 @@ physical_plan 01)┌───────────────────────────┐ 02)│ SortPreservingMergeExec │ 03)│ -------------------- │ -04)│ time@2 ASC NULLS LAST, │ -05)│ date@0 ASC NULLS LAST │ +04)│ time ASC NULLS LAST, date │ +05)│ ASC NULLS LAST │ 06)└─────────────┬─────────────┘ 07)┌─────────────┴─────────────┐ 08)│ CoalesceBatchesExec │ @@ -1746,8 +1746,8 @@ physical_plan 14)│ FilterExec │ 15)│ -------------------- │ 16)│ predicate: │ -17)│ ticker@1 = A AND CAST(time│ -18)│ @2 AS Date32) = date@0 │ +17)│ ticker = A AND CAST(time │ +18)│ AS Date32) = date │ 19)└─────────────┬─────────────┘ 20)┌─────────────┴─────────────┐ 21)│ RepartitionExec │ @@ -1778,8 +1778,8 @@ physical_plan 01)┌───────────────────────────┐ 02)│ SortPreservingMergeExec │ 03)│ -------------------- │ -04)│ ticker@1 ASC NULLS LAST, │ -05)│ time@2 ASC NULLS LAST │ +04)│ ticker ASC NULLS LAST, │ +05)│ time ASC NULLS LAST │ 06)└─────────────┬─────────────┘ 07)┌─────────────┴─────────────┐ 08)│ CoalesceBatchesExec │ @@ -1791,7 +1791,7 @@ physical_plan 14)│ FilterExec │ 15)│ -------------------- │ 16)│ predicate: │ -17)│ date@0 = 2006-01-02 │ +17)│ date = 2006-01-02 │ 18)└─────────────┬─────────────┘ 19)┌─────────────┴─────────────┐ 20)│ RepartitionExec │ @@ -1845,7 +1845,7 @@ physical_plan 20)-----------------------------┌─────────────┴─────────────┐ 21)-----------------------------│ FilterExec │ 22)-----------------------------│ -------------------- │ -23)-----------------------------│ predicate: id@0 < 10 │ +23)-----------------------------│ predicate: id < 10 │ 24)-----------------------------└─────────────┬─────────────┘ 25)-----------------------------┌─────────────┴─────────────┐ 26)-----------------------------│ RepartitionExec │ @@ -1990,7 +1990,7 @@ physical_plan 48)┌─────────────┴─────────────┐ 49)│ FilterExec │ 50)│ -------------------- │ -51)│ predicate: a@0 > 3 │ +51)│ predicate: a > 3 │ 52)└─────────────┬─────────────┘ 53)┌─────────────┴─────────────┐ 54)│ DataSourceExec │ @@ -2034,7 +2034,7 @@ physical_plan 14)┌─────────────┴─────────────┐ 15)│ FilterExec │ 16)│ -------------------- │ -17)│ predicate: c3@2 > 0 │ +17)│ predicate: c3 > 0 │ 18)└─────────────┬─────────────┘ 19)┌─────────────┴─────────────┐ 20)│ RepartitionExec │