diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 6835d9a6da2a..71d8d0026674 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -302,8 +302,8 @@ impl DisplayAs for ArrowFileSink { write!(f, ")") } DisplayFormatType::TreeRender => { - // TODO: collect info - write!(f, "") + writeln!(f, "format: arrow")?; + write!(f, "file={}", &self.config.original_url) } } } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index e2c7d1ecafa3..3b71593b3334 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1373,6 +1373,7 @@ mod tests { let object_store_url = ObjectStoreUrl::local_filesystem(); let file_sink_config = FileSinkConfig { + original_url: String::default(), object_store_url: object_store_url.clone(), file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)], table_paths: vec![ListingTableUrl::parse(table_path)?], @@ -1458,6 +1459,7 @@ mod tests { // set file config to include partitioning on field_a let file_sink_config = FileSinkConfig { + original_url: String::default(), object_store_url: object_store_url.clone(), file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)], table_paths: vec![ListingTableUrl::parse("file:///")?], @@ -1541,6 +1543,7 @@ mod tests { let object_store_url = ObjectStoreUrl::local_filesystem(); let file_sink_config = FileSinkConfig { + original_url: String::default(), object_store_url: object_store_url.clone(), file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)], table_paths: vec![ListingTableUrl::parse("file:///")?], diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 4d7762784d78..21b35bac2174 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1037,6 +1037,7 @@ impl TableProvider for ListingTable { // Sink related option, apart from format let config = FileSinkConfig { + original_url: String::default(), object_store_url: self.table_paths()[0].object_store(), table_paths: self.table_paths().clone(), file_groups, diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 6aff9280ffad..170f85af7a89 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -502,6 +502,7 @@ impl DefaultPhysicalPlanner { partition_by, options: source_option_tuples, }) => { + let original_url = output_url.clone(); let input_exec = children.one()?; let parsed_url = ListingTableUrl::parse(output_url)?; let object_store_url = parsed_url.object_store(); @@ -531,6 +532,7 @@ impl DefaultPhysicalPlanner { // Set file sink related options let config = FileSinkConfig { + original_url, object_store_url, table_paths: vec![parsed_url], file_groups: vec![], diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index cc1b63cfc9b4..522cb12db0c7 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -666,10 +666,8 @@ impl DisplayAs for CsvSink { write!(f, ")") } DisplayFormatType::TreeRender => { - if !self.config.file_groups.is_empty() { - FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; - } - Ok(()) + writeln!(f, "format: csv")?; + write!(f, "file={}", &self.config.original_url) } } } diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index bec3a524f657..9b6d5925fe81 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -325,10 +325,8 @@ impl DisplayAs for JsonSink { write!(f, ")") } DisplayFormatType::TreeRender => { - if !self.config.file_groups.is_empty() { - FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; - } - Ok(()) + writeln!(f, "format: json")?; + write!(f, "file={}", &self.config.original_url) } } } diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs index 6087f930d3fe..279c9d2100ec 100644 --- a/datafusion/datasource/src/file_sink_config.rs +++ b/datafusion/datasource/src/file_sink_config.rs @@ -86,6 +86,8 @@ pub trait FileSink: DataSink { /// The base configurations to provide when creating a physical plan for /// writing to any given file format. pub struct FileSinkConfig { + /// The unresolved URL specified by the user + pub original_url: String, /// Object store URL, used to get an ObjectStore instance pub object_store_url: ObjectStoreUrl, /// A vector of [`PartitionedFile`] structs, each representing a file partition diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 6331b7fb3114..0bf9fdb63d59 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -657,6 +657,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig { protobuf::InsertOp::Replace => InsertOp::Replace, }; Ok(Self { + original_url: String::default(), object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?, file_groups, table_paths, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index b5bfef99a6f3..21622390c6d0 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1281,6 +1281,7 @@ fn roundtrip_json_sink() -> Result<()> { let input = Arc::new(PlaceholderRowExec::new(schema.clone())); let file_sink_config = FileSinkConfig { + original_url: String::default(), object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)], table_paths: vec![ListingTableUrl::parse("file:///")?], @@ -1317,6 +1318,7 @@ fn roundtrip_csv_sink() -> Result<()> { let input = Arc::new(PlaceholderRowExec::new(schema.clone())); let file_sink_config = FileSinkConfig { + original_url: String::default(), object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)], table_paths: vec![ListingTableUrl::parse("file:///")?], @@ -1372,6 +1374,7 @@ fn roundtrip_parquet_sink() -> Result<()> { let input = Arc::new(PlaceholderRowExec::new(schema.clone())); let file_sink_config = FileSinkConfig { + original_url: String::default(), object_store_url: ObjectStoreUrl::local_filesystem(), file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)], table_paths: vec![ListingTableUrl::parse("file:///")?], diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 9e047133fcfc..dbb24ac2f16d 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -1716,14 +1716,21 @@ TO 'test_files/scratch/explain_tree/1.json'; physical_plan 01)┌───────────────────────────┐ 02)│ DataSinkExec │ -03)└─────────────┬─────────────┘ -04)┌─────────────┴─────────────┐ -05)│ DataSourceExec │ -06)│ -------------------- │ -07)│ bytes: 2672 │ -08)│ format: memory │ -09)│ rows: 1 │ -10)└───────────────────────────┘ +03)│ -------------------- │ +04)│ file: │ +05)│ test_files/scratch │ +06)│ /explain_tree/1 │ +07)│ .json │ +08)│ │ +09)│ format: json │ +10)└─────────────┬─────────────┘ +11)┌─────────────┴─────────────┐ +12)│ DataSourceExec │ +13)│ -------------------- │ +14)│ bytes: 2672 │ +15)│ format: memory │ +16)│ rows: 1 │ +17)└───────────────────────────┘ query TT explain COPY (VALUES (1, 'foo', 1, '2023-01-01'), (2, 'bar', 2, '2023-01-02'), (3, 'baz', 3, '2023-01-03')) @@ -1732,14 +1739,45 @@ TO 'test_files/scratch/explain_tree/2.csv'; physical_plan 01)┌───────────────────────────┐ 02)│ DataSinkExec │ -03)└─────────────┬─────────────┘ -04)┌─────────────┴─────────────┐ -05)│ DataSourceExec │ -06)│ -------------------- │ -07)│ bytes: 2672 │ -08)│ format: memory │ -09)│ rows: 1 │ -10)└───────────────────────────┘ +03)│ -------------------- │ +04)│ file: │ +05)│ test_files/scratch │ +06)│ /explain_tree/2 │ +07)│ .csv │ +08)│ │ +09)│ format: csv │ +10)└─────────────┬─────────────┘ +11)┌─────────────┴─────────────┐ +12)│ DataSourceExec │ +13)│ -------------------- │ +14)│ bytes: 2672 │ +15)│ format: memory │ +16)│ rows: 1 │ +17)└───────────────────────────┘ + +query TT +explain COPY (VALUES (1, 'foo', 1, '2023-01-01'), (2, 'bar', 2, '2023-01-02'), (3, 'baz', 3, '2023-01-03')) +TO 'test_files/scratch/explain_tree/3.arrow'; +---- +physical_plan +01)┌───────────────────────────┐ +02)│ DataSinkExec │ +03)│ -------------------- │ +04)│ file: │ +05)│ test_files/scratch │ +06)│ /explain_tree/3 │ +07)│ .arrow │ +08)│ │ +09)│ format: arrow │ +10)└─────────────┬─────────────┘ +11)┌─────────────┴─────────────┐ +12)│ DataSourceExec │ +13)│ -------------------- │ +14)│ bytes: 2672 │ +15)│ format: memory │ +16)│ rows: 1 │ +17)└───────────────────────────┘ + # Test explain tree rendering for CoalesceBatchesExec with limit statement ok