From 6329ebd101bc6b9d02a4d09ca23f056849b8bc29 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 5 Mar 2025 09:32:23 -0500
Subject: [PATCH 1/4] Implement tree explain for DataSourceExec

---
 datafusion/datasource-csv/src/source.rs | 9 +-
 datafusion/datasource-parquet/src/source.rs | 11 +-
 datafusion/datasource/src/file_scan_config.rs | 5 +-
 datafusion/datasource/src/memory.rs | 33 ++-
 datafusion/datasource/src/source.rs | 3 +-
 .../sqllogictest/test_files/explain_tree.slt | 228 ++++++++++++++++--
 6 files changed, 245 insertions(+), 44 deletions(-)

diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index 124b32d4f181..bb584433d1a4 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -617,8 +617,13 @@ impl FileSource for CsvSource {
 fn file_type(&self) -> &str { "csv" }
- fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, ", has_header={}", self.has_header)
+ fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, ", has_header={}", self.has_header)
+ }
+ DisplayFormatType::TreeRender => Ok(()),
+ }
 } }

diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 51b38f04fa95..683d62a1df49 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -554,14 +554,11 @@ impl FileSource for ParquetSource {
 fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { match t {
- DisplayFormatType::Default
- | DisplayFormatType::Verbose
- | DisplayFormatType::TreeRender => {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
 let predicate_string = self .predicate() .map(|p| format!(", predicate={p}")) .unwrap_or_default();
-
 let pruning_predicate_string = self .pruning_predicate() .map(|pre| {
@@ -581,6 +578,12 @@ impl FileSource for ParquetSource {
 write!(f, "{}{}", predicate_string, pruning_predicate_string)
 }
+ DisplayFormatType::TreeRender => {
+ if let Some(predicate) = self.predicate() {
+ writeln!(f, "predicate={predicate}")?;
+ }
+ Ok(())
+ }
 } } }

diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 66ef6262688e..91b5f0157739 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -218,7 +218,10 @@ impl DataSource for FileScanConfig {
 self.fmt_file_source(t, f) }
 DisplayFormatType::TreeRender => {
- // TODO: collect info
+ writeln!(f, "format={}", self.file_source.file_type())?;
+ self.file_source.fmt_extra(t, f)?;
+ let num_files = self.file_groups.iter().map(Vec::len).sum::<usize>();
+ writeln!(f, "files={num_files}")?;
 Ok(()) } }

diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs
index 1ea0f2ea2e8b..fcddd4f3c425 100644
--- a/datafusion/datasource/src/memory.rs
+++ b/datafusion/datasource/src/memory.rs
@@ -425,25 +425,20 @@ impl DataSource for MemorySourceConfig {
 } }
 DisplayFormatType::TreeRender => {
- let partition_sizes: Vec<_> =
- self.partitions.iter().map(|b| b.len()).collect();
- writeln!(f, "partition_sizes={:?}", partition_sizes)?;
-
- if let Some(output_ordering) = self.sort_information.first() {
- writeln!(f, "output_ordering={}", output_ordering)?;
- }
-
- let eq_properties = self.eq_properties();
- let constraints = eq_properties.constraints();
- if
!constraints.is_empty() { - writeln!(f, "constraints={}", constraints)?; - } - - if let Some(limit) = self.fetch { - writeln!(f, "fetch={}", limit)?; - } - - write!(f, "partitions={}", partition_sizes.len()) + let total_rows = self.partitions.iter().map(|b| b.len()).sum::(); + let total_bytes = self + .partitions + .iter() + .map(|b| { + b.iter() + .map(|batch| batch.get_array_memory_size()) + .sum::() + }) + .sum::(); + writeln!(f, "format=memory")?; + writeln!(f, "rows={total_rows}")?; + writeln!(f, "bytes={total_bytes}")?; + Ok(()) } } } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index bb9790e875b9..6e78df760dc3 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -52,6 +52,7 @@ pub trait DataSource: Send + Sync + Debug { context: Arc, ) -> datafusion_common::Result; fn as_any(&self) -> &dyn Any; + /// Format this source for display in explain plans fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result; /// Return a copy of this DataSource with a new partitioning scheme @@ -103,7 +104,7 @@ impl DisplayAs for DataSourceExec { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!(f, "DataSourceExec: ")?; } - DisplayFormatType::TreeRender => write!(f, "")?, + DisplayFormatType::TreeRender => {} } self.data_source.fmt_as(t, f) } diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 18de2c6a617f..6eeb748b8cc4 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -54,10 +54,36 @@ STORED AS PARQUET LOCATION 'test_files/scratch/explain_tree/table2.parquet'; -# table3: Memoru +# table3: Memory statement ok CREATE TABLE table3 as select * from table1; +# table4: JSON +query I +COPY (SELECT * from table1) +TO 'test_files/scratch/explain_tree/table4.json' +---- +3 + +statement ok +CREATE EXTERNAL TABLE table4 +STORED AS JSON +LOCATION 'test_files/scratch/explain_tree/table4.json'; + +# table5: ARROW +query I +COPY (SELECT * from table1) +TO 'test_files/scratch/explain_tree/table5.arrow' +---- +3 + +statement ok +CREATE EXTERNAL TABLE table5 +STORED AS ARROW +LOCATION 'test_files/scratch/explain_tree/table5.arrow'; + + + ######## Begin Queries ######## # Filter @@ -83,7 +109,10 @@ physical_plan 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ DataSourceExec │ -15)└───────────────────────────┘ +15)│ -------------------- │ +16)│ files: 1 │ +17)│ format: csv │ +18)└───────────────────────────┘ # Aggregate query TT @@ -110,7 +139,10 @@ physical_plan 15)└─────────────┬─────────────┘ 16)┌─────────────┴─────────────┐ 17)│ DataSourceExec │ -18)└───────────────────────────┘ +18)│ -------------------- │ +19)│ files: 1 │ +20)│ format: csv │ +21)└───────────────────────────┘ # 2 Joins query TT @@ -139,7 +171,10 @@ physical_plan 15)└─────────────┬─────────────┘└─────────────┬─────────────┘ 16)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 17)│ DataSourceExec ││ DataSourceExec │ -18)└───────────────────────────┘└───────────────────────────┘ +18)│ -------------------- ││ -------------------- │ +19)│ files: 1 ││ files: 1 │ +20)│ format: csv ││ format: parquet │ +21)└───────────────────────────┘└───────────────────────────┘ # 3 Joins query TT @@ -175,18 +210,22 @@ physical_plan 13)┌─────────────┴─────────────┐┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 14)│ CoalesceBatchesExec ││ CoalesceBatchesExec ││ 
DataSourceExec │ 15)│ ││ ││ -------------------- │ -16)│ ││ ││ partition_sizes: [1] │ -17)│ ││ ││ partitions: 1 │ -18)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘ -19)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ -20)│ RepartitionExec ││ RepartitionExec │ -21)└─────────────┬─────────────┘└─────────────┬─────────────┘ -22)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ -23)│ RepartitionExec ││ RepartitionExec │ -24)└─────────────┬─────────────┘└─────────────┬─────────────┘ -25)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ -26)│ DataSourceExec ││ DataSourceExec │ -27)└───────────────────────────┘└───────────────────────────┘ +16)│ ││ ││ bytes: 1560 │ +17)│ ││ ││ format: memory │ +18)│ ││ ││ rows: 1 │ +19)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘ +20)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ +21)│ RepartitionExec ││ RepartitionExec │ +22)└─────────────┬─────────────┘└─────────────┬─────────────┘ +23)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ +24)│ RepartitionExec ││ RepartitionExec │ +25)└─────────────┬─────────────┘└─────────────┬─────────────┘ +26)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ +27)│ DataSourceExec ││ DataSourceExec │ +28)│ -------------------- ││ -------------------- │ +29)│ files: 1 ││ files: 1 │ +30)│ format: csv ││ format: parquet │ +31)└───────────────────────────┘└───────────────────────────┘ # Long Filter (demonstrate what happens with wrapping) query TT @@ -213,7 +252,156 @@ physical_plan 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ DataSourceExec │ -15)└───────────────────────────┘ +15)│ -------------------- │ +16)│ files: 1 │ +17)│ format: csv │ +18)└───────────────────────────┘ + +# Query with filter on csv +query TT +explain SELECT int_col FROM table2 WHERE string_col != 'foo'; +---- +logical_plan +01)Projection: table2.int_col +02)--Filter: table2.string_col != Utf8View("foo") +03)----TableScan: table2 projection=[int_col, string_col], partial_filters=[table2.string_col != Utf8View("foo")] +physical_plan +01)┌───────────────────────────┐ +02)│ CoalesceBatchesExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ FilterExec │ +06)│ -------------------- │ +07)│ predicate: │ +08)│ string_col@1 != foo │ +09)└─────────────┬─────────────┘ +10)┌─────────────┴─────────────┐ +11)│ RepartitionExec │ +12)└─────────────┬─────────────┘ +13)┌─────────────┴─────────────┐ +14)│ DataSourceExec │ +15)│ -------------------- │ +16)│ files: 1 │ +17)│ format: parquet │ +18)│ │ +19)│ predicate: │ +20)│ string_col@1 != foo │ +21)└───────────────────────────┘ + + +# Query with filter on parquet +query TT +explain SELECT int_col FROM table2 WHERE string_col != 'foo'; +---- +logical_plan +01)Projection: table2.int_col +02)--Filter: table2.string_col != Utf8View("foo") +03)----TableScan: table2 projection=[int_col, string_col], partial_filters=[table2.string_col != Utf8View("foo")] +physical_plan +01)┌───────────────────────────┐ +02)│ CoalesceBatchesExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ FilterExec │ +06)│ -------------------- │ +07)│ predicate: │ +08)│ string_col@1 != foo │ +09)└─────────────┬─────────────┘ +10)┌─────────────┴─────────────┐ +11)│ RepartitionExec │ +12)└─────────────┬─────────────┘ +13)┌─────────────┴─────────────┐ +14)│ DataSourceExec │ +15)│ -------------------- │ +16)│ files: 1 │ +17)│ format: parquet │ +18)│ 
│ +19)│ predicate: │ +20)│ string_col@1 != foo │ +21)└───────────────────────────┘ + +# Query with filter on memory +query TT +explain SELECT int_col FROM table3 WHERE string_col != 'foo'; +---- +logical_plan +01)Projection: table3.int_col +02)--Filter: table3.string_col != Utf8("foo") +03)----TableScan: table3 projection=[int_col, string_col] +physical_plan +01)┌───────────────────────────┐ +02)│ CoalesceBatchesExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ FilterExec │ +06)│ -------------------- │ +07)│ predicate: │ +08)│ string_col@1 != foo │ +09)└─────────────┬─────────────┘ +10)┌─────────────┴─────────────┐ +11)│ DataSourceExec │ +12)│ -------------------- │ +13)│ bytes: 1560 │ +14)│ format: memory │ +15)│ rows: 1 │ +16)└───────────────────────────┘ + +# Query with filter on json +query TT +explain SELECT int_col FROM table4 WHERE string_col != 'foo'; +---- +logical_plan +01)Projection: table4.int_col +02)--Filter: table4.string_col != Utf8("foo") +03)----TableScan: table4 projection=[int_col, string_col], partial_filters=[table4.string_col != Utf8("foo")] +physical_plan +01)┌───────────────────────────┐ +02)│ CoalesceBatchesExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ FilterExec │ +06)│ -------------------- │ +07)│ predicate: │ +08)│ string_col@1 != foo │ +09)└─────────────┬─────────────┘ +10)┌─────────────┴─────────────┐ +11)│ RepartitionExec │ +12)└─────────────┬─────────────┘ +13)┌─────────────┴─────────────┐ +14)│ DataSourceExec │ +15)│ -------------------- │ +16)│ files: 1 │ +17)│ format: json │ +18)└───────────────────────────┘ + +# Query with filter on arrow +query TT +explain SELECT int_col FROM table5 WHERE string_col != 'foo'; +---- +logical_plan +01)Projection: table5.int_col +02)--Filter: table5.string_col != Utf8("foo") +03)----TableScan: table5 projection=[int_col, string_col], partial_filters=[table5.string_col != Utf8("foo")] +physical_plan +01)┌───────────────────────────┐ +02)│ CoalesceBatchesExec │ +03)└─────────────┬─────────────┘ +04)┌─────────────┴─────────────┐ +05)│ FilterExec │ +06)│ -------------------- │ +07)│ predicate: │ +08)│ string_col@1 != foo │ +09)└─────────────┬─────────────┘ +10)┌─────────────┴─────────────┐ +11)│ RepartitionExec │ +12)└─────────────┬─────────────┘ +13)┌─────────────┴─────────────┐ +14)│ DataSourceExec │ +15)│ -------------------- │ +16)│ files: 1 │ +17)│ format: arrow │ +18)└───────────────────────────┘ + # cleanup @@ -225,3 +413,9 @@ drop table table2; statement ok drop table table3; + +statement ok +drop table table4; + +statement ok +drop table table5; From cc8de026eb70ec797b7b92d89893cb2085324a47 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 5 Mar 2025 20:00:06 -0500 Subject: [PATCH 2/4] improve test --- .../sqllogictest/test_files/explain_tree.slt | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 6eeb748b8cc4..9d50b9bd626e 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -259,12 +259,12 @@ physical_plan # Query with filter on csv query TT -explain SELECT int_col FROM table2 WHERE string_col != 'foo'; +explain SELECT int_col FROM table1 WHERE string_col != 'foo'; ---- logical_plan -01)Projection: table2.int_col -02)--Filter: table2.string_col != Utf8View("foo") -03)----TableScan: table2 projection=[int_col, string_col], 
partial_filters=[table2.string_col != Utf8View("foo")] +01)Projection: table1.int_col +02)--Filter: table1.string_col != Utf8("foo") +03)----TableScan: table1 projection=[int_col, string_col], partial_filters=[table1.string_col != Utf8("foo")] physical_plan 01)┌───────────────────────────┐ 02)│ CoalesceBatchesExec │ @@ -282,11 +282,8 @@ physical_plan 14)│ DataSourceExec │ 15)│ -------------------- │ 16)│ files: 1 │ -17)│ format: parquet │ -18)│ │ -19)│ predicate: │ -20)│ string_col@1 != foo │ -21)└───────────────────────────┘ +17)│ format: csv │ +18)└───────────────────────────┘ # Query with filter on parquet @@ -402,8 +399,6 @@ physical_plan 17)│ format: arrow │ 18)└───────────────────────────┘ - - # cleanup statement ok drop table table1; From a6c8dbe993737caaed75b202703599ebfc43e728 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 6 Mar 2025 06:39:59 -0500 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Oleks V --- datafusion/datasource/src/memory.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index fcddd4f3c425..5b2a3637b66c 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -426,15 +426,7 @@ impl DataSource for MemorySourceConfig { } DisplayFormatType::TreeRender => { let total_rows = self.partitions.iter().map(|b| b.len()).sum::(); - let total_bytes = self - .partitions - .iter() - .map(|b| { - b.iter() - .map(|batch| batch.get_array_memory_size()) - .sum::() - }) - .sum::(); + let total_bytes: usize = self.partitions.iter().flatten().map(|batch| batch.get_array_memory_size()).sum(); writeln!(f, "format=memory")?; writeln!(f, "rows={total_rows}")?; writeln!(f, "bytes={total_bytes}")?; From dce545be47ca3ddf4bfb482f9da91b657381270e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 6 Mar 2025 06:40:31 -0500 Subject: [PATCH 4/4] fmt --- datafusion/datasource/src/memory.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 5b2a3637b66c..64fd56971b29 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -426,7 +426,12 @@ impl DataSource for MemorySourceConfig { } DisplayFormatType::TreeRender => { let total_rows = self.partitions.iter().map(|b| b.len()).sum::(); - let total_bytes: usize = self.partitions.iter().flatten().map(|batch| batch.get_array_memory_size()).sum(); + let total_bytes: usize = self + .partitions + .iter() + .flatten() + .map(|batch| batch.get_array_memory_size()) + .sum(); writeln!(f, "format=memory")?; writeln!(f, "rows={total_rows}")?; writeln!(f, "bytes={total_bytes}")?;
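Note on the display convention these patches settle on: for DisplayFormatType::Default and Verbose the extra details stay on one line, appended after the operator name as ", key=value" pairs, while TreeRender writes one "key=value" pair per line so the tree renderer can stack them inside a node box (the "files: 1" / "format: csv" boxes in the test output above). The sketch below illustrates only that convention and is not DataFusion's actual code: DisplayFormatType and ExampleSource are simplified stand-ins, and fmt_extra takes &mut dyn fmt::Write instead of a real Formatter so the example runs on plain Strings.

use std::fmt::{self, Write};

// Simplified stand-in for DataFusion's DisplayFormatType (assumed three variants, as in the patch).
#[allow(dead_code)]
enum DisplayFormatType {
    Default,
    Verbose,
    TreeRender,
}

// Hypothetical source type, used only to illustrate the formatting convention.
struct ExampleSource {
    has_header: bool,
}

impl ExampleSource {
    // Mirrors the shape of FileSource::fmt_extra in the patch: one-line formats
    // append ", key=value"; the tree format emits one "key=value" per line.
    fn fmt_extra(&self, t: &DisplayFormatType, f: &mut dyn Write) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, ", has_header={}", self.has_header)
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "has_header={}", self.has_header)
            }
        }
    }
}

fn main() -> fmt::Result {
    let src = ExampleSource { has_header: true };

    // One-line rendering, as used by EXPLAIN / EXPLAIN VERBOSE.
    let mut one_line = String::from("DataSourceExec: file_type=csv");
    src.fmt_extra(&DisplayFormatType::Default, &mut one_line)?;
    println!("{one_line}");

    // Tree rendering: each pair on its own line, ready to be laid out inside a node box.
    let mut tree = String::new();
    src.fmt_extra(&DisplayFormatType::TreeRender, &mut tree)?;
    print!("{tree}");
    Ok(())
}

Running this stand-in prints "DataSourceExec: file_type=csv, has_header=true" for the one-line format and a bare "has_header=true" line for the tree format, mirroring how DataSourceExec defers to DataSource::fmt_as and FileSource::fmt_extra in the patches above.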