From 611bee03f483edc52dd594946997ddce787d925a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 14 Dec 2023 15:16:25 +0000 Subject: [PATCH 1/4] Remove ListingTable and FileScanConfig Unbounded (#8540) --- .../core/src/datasource/file_format/mod.rs | 1 - .../src/datasource/file_format/options.rs | 48 ++---- .../core/src/datasource/listing/table.rs | 152 ------------------ .../src/datasource/listing_table_factory.rs | 16 +- .../datasource/physical_plan/arrow_file.rs | 4 - .../core/src/datasource/physical_plan/avro.rs | 4 - .../core/src/datasource/physical_plan/csv.rs | 4 - .../physical_plan/file_scan_config.rs | 3 - .../datasource/physical_plan/file_stream.rs | 1 - .../core/src/datasource/physical_plan/json.rs | 8 - .../core/src/datasource/physical_plan/mod.rs | 4 - .../datasource/physical_plan/parquet/mod.rs | 4 - datafusion/core/src/execution/context/mod.rs | 11 +- .../combine_partial_final_agg.rs | 1 - .../enforce_distribution.rs | 5 - .../src/physical_optimizer/enforce_sorting.rs | 67 +------- .../physical_optimizer/projection_pushdown.rs | 2 - .../replace_with_order_preserving_variants.rs | 92 ++++++----- .../core/src/physical_optimizer/test_utils.rs | 24 +-- datafusion/core/src/test/mod.rs | 3 - datafusion/core/src/test_util/mod.rs | 25 +-- datafusion/core/src/test_util/parquet.rs | 1 - datafusion/core/tests/sql/joins.rs | 8 +- .../proto/src/physical_plan/from_proto.rs | 1 - 24 files changed, 87 insertions(+), 402 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 7c2331548e5e..12c9fb91adb1 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -165,7 +165,6 @@ pub(crate) mod test_util { limit, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, None, ) diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 4c7557a4a9c0..d389137785ff 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Schema, SchemaRef}; use async_trait::async_trait; -use datafusion_common::{plan_err, DataFusionError}; use crate::datasource::file_format::arrow::ArrowFormat; use crate::datasource::file_format::file_compression_type::FileCompressionType; @@ -72,8 +71,6 @@ pub struct CsvReadOptions<'a> { pub table_partition_cols: Vec<(String, DataType)>, /// File compression type pub file_compression_type: FileCompressionType, - /// Flag indicating whether this file may be unbounded (as in a FIFO file). - pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, } @@ -97,7 +94,6 @@ impl<'a> CsvReadOptions<'a> { file_extension: DEFAULT_CSV_EXTENSION, table_partition_cols: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, - infinite: false, file_sort_order: vec![], } } @@ -108,12 +104,6 @@ impl<'a> CsvReadOptions<'a> { self } - /// Configure mark_infinite setting - pub fn mark_infinite(mut self, infinite: bool) -> Self { - self.infinite = infinite; - self - } - /// Specify delimiter to use for CSV read pub fn delimiter(mut self, delimiter: u8) -> Self { self.delimiter = delimiter; @@ -324,8 +314,6 @@ pub struct AvroReadOptions<'a> { pub file_extension: &'a str, /// Partition Columns pub table_partition_cols: Vec<(String, DataType)>, - /// Flag indicating whether this file may be unbounded (as in a FIFO file). - pub infinite: bool, } impl<'a> Default for AvroReadOptions<'a> { @@ -334,7 +322,6 @@ impl<'a> Default for AvroReadOptions<'a> { schema: None, file_extension: DEFAULT_AVRO_EXTENSION, table_partition_cols: vec![], - infinite: false, } } } @@ -349,12 +336,6 @@ impl<'a> AvroReadOptions<'a> { self } - /// Configure mark_infinite setting - pub fn mark_infinite(mut self, infinite: bool) -> Self { - self.infinite = infinite; - self - } - /// Specify schema to use for AVRO read pub fn schema(mut self, schema: &'a Schema) -> Self { self.schema = Some(schema); @@ -466,21 +447,17 @@ pub trait ReadOptions<'a> { state: SessionState, table_path: ListingTableUrl, schema: Option<&'a Schema>, - infinite: bool, ) -> Result where 'a: 'async_trait, { - match (schema, infinite) { - (Some(s), _) => Ok(Arc::new(s.to_owned())), - (None, false) => Ok(self - .to_listing_options(config) - .infer_schema(&state, &table_path) - .await?), - (None, true) => { - plan_err!("Schema inference for infinite data sources is not supported.") - } + if let Some(s) = schema { + return Ok(Arc::new(s.to_owned())); } + + self.to_listing_options(config) + .infer_schema(&state, &table_path) + .await } } @@ -500,7 +477,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) .with_file_sort_order(self.file_sort_order.clone()) - .with_infinite_source(self.infinite) } async fn get_resolved_schema( @@ -509,7 +485,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { state: SessionState, table_path: ListingTableUrl, ) -> Result { - self._get_resolved_schema(config, state, table_path, self.schema, self.infinite) + self._get_resolved_schema(config, state, table_path, self.schema) .await } } @@ -535,7 +511,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { state: SessionState, table_path: ListingTableUrl, ) -> Result { - self._get_resolved_schema(config, state, table_path, self.schema, false) + self._get_resolved_schema(config, state, table_path, self.schema) .await } } @@ -551,7 +527,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { .with_file_extension(self.file_extension) .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) - .with_infinite_source(self.infinite) .with_file_sort_order(self.file_sort_order.clone()) } @@ -561,7 +536,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { state: SessionState, table_path: ListingTableUrl, ) -> Result { - self._get_resolved_schema(config, state, table_path, self.schema, self.infinite) + self._get_resolved_schema(config, state, table_path, self.schema) .await } } @@ -575,7 +550,6 @@ impl ReadOptions<'_> for AvroReadOptions<'_> { .with_file_extension(self.file_extension) .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) - .with_infinite_source(self.infinite) } async fn get_resolved_schema( @@ -584,7 +558,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> { state: SessionState, table_path: ListingTableUrl, ) -> Result { - self._get_resolved_schema(config, state, table_path, self.schema, self.infinite) + self._get_resolved_schema(config, state, table_path, self.schema) .await } } @@ -606,7 +580,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> { state: SessionState, table_path: ListingTableUrl, ) -> Result { - self._get_resolved_schema(config, state, table_path, self.schema, false) + self._get_resolved_schema(config, state, table_path, self.schema) .await } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 0ce1b43fe456..4c13d9d443ca 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -246,11 +246,6 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, - /// Infinite source means that the input is not guaranteed to end. - /// Currently, CSV, JSON, and AVRO formats are supported. - /// In order to support infinite inputs, DataFusion may adjust query - /// plans (e.g. joins) to run the given query in full pipelining mode. - pub infinite_source: bool, /// This setting when true indicates that the table is backed by a single file. /// Any inserts to the table may only append to this existing file. pub single_file: bool, @@ -274,30 +269,11 @@ impl ListingOptions { collect_stat: true, target_partitions: 1, file_sort_order: vec![], - infinite_source: false, single_file: false, file_type_write_options: None, } } - /// Set unbounded assumption on [`ListingOptions`] and returns self. - /// - /// ``` - /// use std::sync::Arc; - /// use datafusion::datasource::{listing::ListingOptions, file_format::csv::CsvFormat}; - /// use datafusion::prelude::SessionContext; - /// let ctx = SessionContext::new(); - /// let listing_options = ListingOptions::new(Arc::new( - /// CsvFormat::default() - /// )).with_infinite_source(true); - /// - /// assert_eq!(listing_options.infinite_source, true); - /// ``` - pub fn with_infinite_source(mut self, infinite_source: bool) -> Self { - self.infinite_source = infinite_source; - self - } - /// Set file extension on [`ListingOptions`] and returns self. /// /// ``` @@ -557,7 +533,6 @@ pub struct ListingTable { options: ListingOptions, definition: Option, collected_statistics: FileStatisticsCache, - infinite_source: bool, constraints: Constraints, column_defaults: HashMap, } @@ -587,7 +562,6 @@ impl ListingTable { for (part_col_name, part_col_type) in &options.table_partition_cols { builder.push(Field::new(part_col_name, part_col_type.clone(), false)); } - let infinite_source = options.infinite_source; let table = Self { table_paths: config.table_paths, @@ -596,7 +570,6 @@ impl ListingTable { options, definition: None, collected_statistics: Arc::new(DefaultFileStatisticsCache::default()), - infinite_source, constraints: Constraints::empty(), column_defaults: HashMap::new(), }; @@ -729,7 +702,6 @@ impl TableProvider for ListingTable { limit, output_ordering: self.try_create_output_ordering()?, table_partition_cols, - infinite_source: self.infinite_source, }, filters.as_ref(), ) @@ -943,7 +915,6 @@ impl ListingTable { #[cfg(test)] mod tests { use std::collections::HashMap; - use std::fs::File; use super::*; #[cfg(feature = "parquet")] @@ -955,7 +926,6 @@ mod tests { use crate::{ assert_batches_eq, datasource::file_format::avro::AvroFormat, - execution::options::ReadOptions, logical_expr::{col, lit}, test::{columns, object_store::register_test_store}, }; @@ -967,37 +937,8 @@ mod tests { use datafusion_common::{assert_contains, GetExt, ScalarValue}; use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; use datafusion_physical_expr::PhysicalSortExpr; - use rstest::*; use tempfile::TempDir; - /// It creates dummy file and checks if it can create unbounded input executors. - async fn unbounded_table_helper( - file_type: FileType, - listing_option: ListingOptions, - infinite_data: bool, - ) -> Result<()> { - let ctx = SessionContext::new(); - register_test_store( - &ctx, - &[(&format!("table/file{}", file_type.get_ext()), 100)], - ); - - let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); - - let table_path = ListingTableUrl::parse("test:///table/").unwrap(); - let config = ListingTableConfig::new(table_path) - .with_listing_options(listing_option) - .with_schema(Arc::new(schema)); - // Create a table - let table = ListingTable::try_new(config)?; - // Create executor from table - let source_exec = table.scan(&ctx.state(), None, &[], None).await?; - - assert_eq!(source_exec.unbounded_output(&[])?, infinite_data); - - Ok(()) - } - #[tokio::test] async fn read_single_file() -> Result<()> { let ctx = SessionContext::new(); @@ -1205,99 +1146,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn unbounded_csv_table_without_schema() -> Result<()> { - let tmp_dir = TempDir::new()?; - let file_path = tmp_dir.path().join("dummy.csv"); - File::create(file_path)?; - let ctx = SessionContext::new(); - let error = ctx - .register_csv( - "test", - tmp_dir.path().to_str().unwrap(), - CsvReadOptions::new().mark_infinite(true), - ) - .await - .unwrap_err(); - match error { - DataFusionError::Plan(_) => Ok(()), - val => Err(val), - } - } - - #[tokio::test] - async fn unbounded_json_table_without_schema() -> Result<()> { - let tmp_dir = TempDir::new()?; - let file_path = tmp_dir.path().join("dummy.json"); - File::create(file_path)?; - let ctx = SessionContext::new(); - let error = ctx - .register_json( - "test", - tmp_dir.path().to_str().unwrap(), - NdJsonReadOptions::default().mark_infinite(true), - ) - .await - .unwrap_err(); - match error { - DataFusionError::Plan(_) => Ok(()), - val => Err(val), - } - } - - #[tokio::test] - async fn unbounded_avro_table_without_schema() -> Result<()> { - let tmp_dir = TempDir::new()?; - let file_path = tmp_dir.path().join("dummy.avro"); - File::create(file_path)?; - let ctx = SessionContext::new(); - let error = ctx - .register_avro( - "test", - tmp_dir.path().to_str().unwrap(), - AvroReadOptions::default().mark_infinite(true), - ) - .await - .unwrap_err(); - match error { - DataFusionError::Plan(_) => Ok(()), - val => Err(val), - } - } - - #[rstest] - #[tokio::test] - async fn unbounded_csv_table( - #[values(true, false)] infinite_data: bool, - ) -> Result<()> { - let config = CsvReadOptions::new().mark_infinite(infinite_data); - let session_config = SessionConfig::new().with_target_partitions(1); - let listing_options = config.to_listing_options(&session_config); - unbounded_table_helper(FileType::CSV, listing_options, infinite_data).await - } - - #[rstest] - #[tokio::test] - async fn unbounded_json_table( - #[values(true, false)] infinite_data: bool, - ) -> Result<()> { - let config = NdJsonReadOptions::default().mark_infinite(infinite_data); - let session_config = SessionConfig::new().with_target_partitions(1); - let listing_options = config.to_listing_options(&session_config); - unbounded_table_helper(FileType::JSON, listing_options, infinite_data).await - } - - #[rstest] - #[tokio::test] - async fn unbounded_avro_table( - #[values(true, false)] infinite_data: bool, - ) -> Result<()> { - let config = AvroReadOptions::default().mark_infinite(infinite_data); - let session_config = SessionConfig::new().with_target_partitions(1); - let listing_options = config.to_listing_options(&session_config); - unbounded_table_helper(FileType::AVRO, listing_options, infinite_data).await - } - #[tokio::test] async fn test_assert_list_files_for_scan_grouping() -> Result<()> { // more expected partitions than files diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index a9d0c3a0099e..7c859ee988d5 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -133,21 +133,9 @@ impl TableProviderFactory for ListingTableFactory { (Some(schema), table_partition_cols) }; - // look for 'infinite' as an option - let infinite_source = cmd.unbounded; - let mut statement_options = StatementOptions::from(&cmd.options); // Extract ListingTable specific options if present or set default - let unbounded = if infinite_source { - statement_options.take_str_option("unbounded"); - infinite_source - } else { - statement_options - .take_bool_option("unbounded")? - .unwrap_or(false) - }; - let single_file = statement_options .take_bool_option("single_file")? .unwrap_or(false); @@ -159,6 +147,7 @@ impl TableProviderFactory for ListingTableFactory { } } statement_options.take_bool_option("create_local_path")?; + statement_options.take_str_option("unbounded"); let file_type = file_format.file_type(); @@ -207,8 +196,7 @@ impl TableProviderFactory for ListingTableFactory { .with_table_partition_cols(table_partition_cols) .with_file_sort_order(cmd.order_exprs.clone()) .with_single_file(single_file) - .with_write_options(file_type_writer_options) - .with_infinite_source(unbounded); + .with_write_options(file_type_writer_options); let resolved_schema = match provided_schema { None => options.infer_schema(state, &table_path).await?, diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 30b55db28491..ae1e879d0da1 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -93,10 +93,6 @@ impl ExecutionPlan for ArrowExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } - fn unbounded_output(&self, _: &[bool]) -> Result { - Ok(self.base_config().infinite_source) - } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { self.projected_output_ordering .first() diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 885b4c5d3911..10a6a85daa2f 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -89,10 +89,6 @@ impl ExecutionPlan for AvroExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } - fn unbounded_output(&self, _: &[bool]) -> Result { - Ok(self.base_config().infinite_source) - } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { self.projected_output_ordering .first() diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 0eca37da139d..0c34d22e9fa9 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -146,10 +146,6 @@ impl ExecutionPlan for CsvExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } - fn unbounded_output(&self, _: &[bool]) -> Result { - Ok(self.base_config().infinite_source) - } - /// See comments on `impl ExecutionPlan for ParquetExec`: output order can't be fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { self.projected_output_ordering diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 89694ff28500..516755e4d293 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -99,8 +99,6 @@ pub struct FileScanConfig { pub table_partition_cols: Vec, /// All equivalent lexicographical orderings that describe the schema. pub output_ordering: Vec, - /// Indicates whether this plan may produce an infinite stream of records. - pub infinite_source: bool, } impl FileScanConfig { @@ -707,7 +705,6 @@ mod tests { statistics, table_partition_cols, output_ordering: vec![], - infinite_source: false, } } diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index a715f6e8e3cd..99fb088b66f4 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -667,7 +667,6 @@ mod tests { limit: self.limit, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }; let metrics_set = ExecutionPlanMetricsSet::new(); let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set) diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 9c3b523a652c..c74fd13e77aa 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -110,10 +110,6 @@ impl ExecutionPlan for NdJsonExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } - fn unbounded_output(&self, _: &[bool]) -> Result { - Ok(self.base_config.infinite_source) - } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { self.projected_output_ordering .first() @@ -462,7 +458,6 @@ mod tests { limit: Some(3), table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, file_compression_type.to_owned(), ); @@ -541,7 +536,6 @@ mod tests { limit: Some(3), table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, file_compression_type.to_owned(), ); @@ -589,7 +583,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, file_compression_type.to_owned(), ); @@ -642,7 +635,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, file_compression_type.to_owned(), ); diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 8e4dd5400b20..9d1c373aee7c 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -133,10 +133,6 @@ impl DisplayAs for FileScanConfig { write!(f, ", limit={limit}")?; } - if self.infinite_source { - write!(f, ", infinite_source=true")?; - } - if let Some(ordering) = orderings.first() { if !ordering.is_empty() { let start = if orderings.len() == 1 { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 2b10b05a273a..ade149da6991 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -882,7 +882,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, predicate, None, @@ -1539,7 +1538,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, None, None, @@ -1654,7 +1652,6 @@ mod tests { ), ], output_ordering: vec![], - infinite_source: false, }, None, None, @@ -1718,7 +1715,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, None, None, diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 58a4f08341d6..8916fa814a4a 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -964,14 +964,9 @@ impl SessionContext { sql_definition: Option, ) -> Result<()> { let table_path = ListingTableUrl::parse(table_path)?; - let resolved_schema = match (provided_schema, options.infinite_source) { - (Some(s), _) => s, - (None, false) => options.infer_schema(&self.state(), &table_path).await?, - (None, true) => { - return plan_err!( - "Schema inference for infinite data sources is not supported." - ) - } + let resolved_schema = match provided_schema { + Some(s) => s, + None => options.infer_schema(&self.state(), &table_path).await?, }; let config = ListingTableConfig::new(table_path) .with_listing_options(options) diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index c50ea36b68ec..7359a6463059 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -257,7 +257,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, None, None, diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 099759741a10..0aef126578f3 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1775,7 +1775,6 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering, - infinite_source: false, }, None, None, @@ -1803,7 +1802,6 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering, - infinite_source: false, }, None, None, @@ -1825,7 +1823,6 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering, - infinite_source: false, }, false, b',', @@ -1856,7 +1853,6 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering, - infinite_source: false, }, false, b',', @@ -3957,7 +3953,6 @@ pub(crate) mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, false, b',', diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 277404b301c4..6e344550965e 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -2117,7 +2117,7 @@ mod tests { async fn test_with_lost_ordering_bounded() -> Result<()> { let schema = create_test_schema3()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, false); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec(source); let repartition_hash = Arc::new(RepartitionExec::try_new( repartition_rr, @@ -2140,70 +2140,11 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_with_lost_ordering_unbounded() -> Result<()> { - let schema = create_test_schema3()?; - let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); - let repartition_rr = repartition_exec(source); - let repartition_hash = Arc::new(RepartitionExec::try_new( - repartition_rr, - Partitioning::Hash(vec![col("c", &schema).unwrap()], 10), - )?) as _; - let coalesce_partitions = coalesce_partitions_exec(repartition_hash); - let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions); - - let expected_input = [ - "SortExec: expr=[a@0 ASC]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false" - ]; - let expected_optimized = [ - "SortPreservingMergeExec: [a@0 ASC]", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, true); - Ok(()) - } - - #[tokio::test] - async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> { - let schema = create_test_schema3()?; - let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); - let repartition_rr = repartition_exec(source); - let repartition_hash = Arc::new(RepartitionExec::try_new( - repartition_rr, - Partitioning::Hash(vec![col("c", &schema).unwrap()], 10), - )?) as _; - let coalesce_partitions = coalesce_partitions_exec(repartition_hash); - let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions); - - let expected_input = ["SortExec: expr=[a@0 ASC]", - " CoalescePartitionsExec", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false" - ]; - let expected_optimized = [ - "SortPreservingMergeExec: [a@0 ASC]", - " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false", - ]; - assert_optimized!(expected_input, expected_optimized, physical_plan, false); - Ok(()) - } - #[tokio::test] async fn test_do_not_pushdown_through_spm() -> Result<()> { let schema = create_test_schema3()?; let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs.clone(), false); + let source = csv_exec_sorted(&schema, sort_exprs.clone()); let repartition_rr = repartition_exec(source); let spm = sort_preserving_merge_exec(sort_exprs, repartition_rr); let physical_plan = sort_exec(vec![sort_expr("b", &schema)], spm); @@ -2224,7 +2165,7 @@ mod tests { async fn test_pushdown_through_spm() -> Result<()> { let schema = create_test_schema3()?; let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs.clone(), false); + let source = csv_exec_sorted(&schema, sort_exprs.clone()); let repartition_rr = repartition_exec(source); let spm = sort_preserving_merge_exec(sort_exprs, repartition_rr); let physical_plan = sort_exec( @@ -2252,7 +2193,7 @@ mod tests { async fn test_window_multi_layer_requirement() -> Result<()> { let schema = create_test_schema3()?; let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b", &schema)]; - let source = csv_exec_sorted(&schema, vec![], false); + let source = csv_exec_sorted(&schema, vec![]); let sort = sort_exec(sort_exprs.clone(), source); let repartition = repartition_exec(sort); let repartition = spr_repartition_exec(repartition); diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 664afbe822ff..7e1312dad23e 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -1541,7 +1541,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![vec![]], - infinite_source: false, }, false, 0, @@ -1568,7 +1567,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![vec![]], - infinite_source: false, }, false, 0, diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index af45df7d8474..41f2b39978a4 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -350,7 +350,7 @@ mod tests { async fn test_replace_multiple_input_repartition_1() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true); @@ -362,15 +362,15 @@ mod tests { " SortExec: expr=[a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } @@ -378,7 +378,7 @@ mod tests { async fn test_with_inter_children_change_only() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr_default("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); @@ -408,7 +408,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", ]; let expected_optimized = [ @@ -419,9 +419,9 @@ mod tests { " SortPreservingMergeExec: [a@0 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } @@ -429,7 +429,7 @@ mod tests { async fn test_replace_multiple_input_repartition_2() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let filter = filter_exec(repartition_rr); let repartition_hash = repartition_exec_hash(filter); @@ -444,16 +444,16 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " FilterExec: c@1 > 3", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } @@ -461,7 +461,7 @@ mod tests { async fn test_replace_multiple_input_repartition_with_extra_steps() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); @@ -478,7 +478,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -486,9 +486,9 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } @@ -496,7 +496,7 @@ mod tests { async fn test_replace_multiple_input_repartition_with_extra_steps_2() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr); let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1); @@ -516,7 +516,7 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -525,9 +525,9 @@ mod tests { " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } @@ -535,7 +535,7 @@ mod tests { async fn test_not_replacing_when_no_need_to_preserve_sorting() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); @@ -550,7 +550,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ "CoalescePartitionsExec", @@ -558,7 +558,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -568,7 +568,7 @@ mod tests { async fn test_with_multiple_replacable_repartitions() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let filter = filter_exec(repartition_hash); @@ -587,7 +587,7 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", @@ -596,9 +596,9 @@ mod tests { " FilterExec: c@1 > 3", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } @@ -606,7 +606,7 @@ mod tests { async fn test_not_replace_with_different_orderings() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let sort = sort_exec( @@ -625,14 +625,14 @@ mod tests { " SortExec: expr=[c@1 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ "SortPreservingMergeExec: [c@1 ASC]", " SortExec: expr=[c@1 ASC]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -642,7 +642,7 @@ mod tests { async fn test_with_lost_ordering() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); @@ -654,15 +654,15 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ "SortPreservingMergeExec: [a@0 ASC NULLS LAST]", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } @@ -670,7 +670,7 @@ mod tests { async fn test_with_lost_and_kept_ordering() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, true); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition_rr = repartition_exec_round_robin(source); let repartition_hash = repartition_exec_hash(repartition_rr); let coalesce_partitions = coalesce_partitions_exec(repartition_hash); @@ -700,7 +700,7 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ @@ -712,9 +712,9 @@ mod tests { " CoalescePartitionsExec", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; - assert_optimized!(expected_input, expected_optimized, physical_plan); + assert_optimized!(expected_input, expected_optimized, physical_plan, true); Ok(()) } @@ -723,14 +723,14 @@ mod tests { let schema = create_test_schema()?; let left_sort_exprs = vec![sort_expr("a", &schema)]; - let left_source = csv_exec_sorted(&schema, left_sort_exprs, true); + let left_source = csv_exec_sorted(&schema, left_sort_exprs); let left_repartition_rr = repartition_exec_round_robin(left_source); let left_repartition_hash = repartition_exec_hash(left_repartition_rr); let left_coalesce_partitions = Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096)); let right_sort_exprs = vec![sort_expr("a", &schema)]; - let right_source = csv_exec_sorted(&schema, right_sort_exprs, true); + let right_source = csv_exec_sorted(&schema, right_sort_exprs); let right_repartition_rr = repartition_exec_round_robin(right_source); let right_repartition_hash = repartition_exec_hash(right_repartition_rr); let right_coalesce_partitions = @@ -756,11 +756,11 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; let expected_optimized = [ @@ -770,11 +770,11 @@ mod tests { " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", " CoalesceBatchesExec: target_batch_size=4096", " RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", - " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true", + " CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true", ]; assert_optimized!(expected_input, expected_optimized, physical_plan); Ok(()) @@ -784,7 +784,7 @@ mod tests { async fn test_with_bounded_input() -> Result<()> { let schema = create_test_schema()?; let sort_exprs = vec![sort_expr("a", &schema)]; - let source = csv_exec_sorted(&schema, sort_exprs, false); + let source = csv_exec_sorted(&schema, sort_exprs); let repartition = repartition_exec_hash(repartition_exec_round_robin(source)); let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true); @@ -931,7 +931,6 @@ mod tests { fn csv_exec_sorted( schema: &SchemaRef, sort_exprs: impl IntoIterator, - infinite_source: bool, ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); let projection: Vec = vec![0, 2, 3]; @@ -949,7 +948,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![sort_exprs], - infinite_source, }, true, 0, diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 678dc1f373e3..6e14cca21fed 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -45,6 +45,7 @@ use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction}; use datafusion_physical_expr::expressions::col; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; +use crate::datasource::stream::{StreamConfig, StreamTable}; use async_trait::async_trait; async fn register_current_csv( @@ -54,14 +55,19 @@ async fn register_current_csv( ) -> Result<()> { let testdata = crate::test_util::arrow_test_data(); let schema = crate::test_util::aggr_test_schema(); - ctx.register_csv( - table_name, - &format!("{testdata}/csv/aggregate_test_100.csv"), - CsvReadOptions::new() - .schema(&schema) - .mark_infinite(infinite), - ) - .await?; + let path = format!("{testdata}/csv/aggregate_test_100.csv"); + + match infinite { + true => { + let config = StreamConfig::new_file(schema, path.into()); + ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?; + } + false => { + ctx.register_csv(table_name, &path, CsvReadOptions::new().schema(&schema)) + .await?; + } + } + Ok(()) } @@ -272,7 +278,6 @@ pub fn parquet_exec(schema: &SchemaRef) -> Arc { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, None, None, @@ -296,7 +301,6 @@ pub fn parquet_exec_sorted( limit: None, table_partition_cols: vec![], output_ordering: vec![sort_exprs], - infinite_source: false, }, None, None, diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index aad5c19044ea..8770c0c4238a 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -203,7 +203,6 @@ pub fn partitioned_csv_config( limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }) } @@ -277,7 +276,6 @@ fn make_decimal() -> RecordBatch { pub fn csv_exec_sorted( schema: &SchemaRef, sort_exprs: impl IntoIterator, - infinite_source: bool, ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); @@ -291,7 +289,6 @@ pub fn csv_exec_sorted( limit: None, table_partition_cols: vec![], output_ordering: vec![sort_exprs], - infinite_source, }, false, 0, diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index c6b43de0c18d..282b0f7079ee 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -36,7 +36,6 @@ use crate::datasource::provider::TableProviderFactory; use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider}; use crate::error::Result; use crate::execution::context::{SessionState, TaskContext}; -use crate::execution::options::ReadOptions; use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; use crate::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, @@ -58,6 +57,7 @@ use futures::Stream; pub use datafusion_common::test_util::parquet_test_data; pub use datafusion_common::test_util::{arrow_test_data, get_data_dir}; +use crate::datasource::stream::{StreamConfig, StreamTable}; pub use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq}; /// Scan an empty data source, mainly used in tests @@ -342,30 +342,17 @@ impl RecordBatchStream for UnboundedStream { } /// This function creates an unbounded sorted file for testing purposes. -pub async fn register_unbounded_file_with_ordering( +pub fn register_unbounded_file_with_ordering( ctx: &SessionContext, schema: SchemaRef, file_path: &Path, table_name: &str, file_sort_order: Vec>, - with_unbounded_execution: bool, ) -> Result<()> { - // Mark infinite and provide schema: - let fifo_options = CsvReadOptions::new() - .schema(schema.as_ref()) - .mark_infinite(with_unbounded_execution); - // Get listing options: - let options_sort = fifo_options - .to_listing_options(&ctx.copied_config()) - .with_file_sort_order(file_sort_order); + let config = + StreamConfig::new_file(schema, file_path.into()).with_order(file_sort_order); + // Register table: - ctx.register_listing_table( - table_name, - file_path.as_os_str().to_str().unwrap(), - options_sort, - Some(schema), - None, - ) - .await?; + ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?; Ok(()) } diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index f3c0d2987a46..336a6804637a 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -156,7 +156,6 @@ impl TestParquetFile { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }; let df_schema = self.schema.clone().to_dfschema_ref()?; diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 528bde632355..6170859142ca 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -105,9 +105,7 @@ async fn join_change_in_planner() -> Result<()> { &left_file_path, "left", file_sort_order.clone(), - true, - ) - .await?; + )?; let right_file_path = tmp_dir.path().join("right.csv"); File::create(right_file_path.clone()).unwrap(); register_unbounded_file_with_ordering( @@ -116,9 +114,7 @@ async fn join_change_in_planner() -> Result<()> { &right_file_path, "right", file_sort_order, - true, - ) - .await?; + )?; let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10"; let dataframe = ctx.sql(sql).await?; let physical_plan = dataframe.create_physical_plan().await?; diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index dcebfbf2dabb..5c0ef615cacd 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -526,7 +526,6 @@ pub fn parse_protobuf_file_scan_config( limit: proto.limit.as_ref().map(|sl| sl.limit as usize), table_partition_cols, output_ordering, - infinite_source: false, }) } From 35be3284564f7bced250399651b9fbb35deecd27 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 17 Dec 2023 22:19:04 +0000 Subject: [PATCH 2/4] Fix substrait --- datafusion/substrait/src/physical_plan/consumer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index 942798173e0e..3098dc386e6a 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -112,7 +112,6 @@ pub async fn from_substrait_rel( limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }; if let Some(MaskExpression { select, .. }) = &read.projection { From 603a3abbad0de7c12e60dd6c7302d55f94c08fb8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 17 Dec 2023 22:34:47 +0000 Subject: [PATCH 3/4] Fix logical conflicts --- datafusion-examples/examples/csv_opener.rs | 1 - datafusion-examples/examples/json_opener.rs | 1 - .../core/src/datasource/physical_plan/avro.rs | 3 -- .../core/tests/parquet/custom_reader.rs | 1 - datafusion/core/tests/parquet/page_pruning.rs | 1 - .../core/tests/parquet/schema_coercion.rs | 2 -- datafusion/core/tests/sql/joins.rs | 34 ++++++------------- .../tests/cases/roundtrip_physical_plan.rs | 1 - .../tests/cases/roundtrip_physical_plan.rs | 1 - 9 files changed, 10 insertions(+), 35 deletions(-) diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs index 15fb07ded481..96753c8c5260 100644 --- a/datafusion-examples/examples/csv_opener.rs +++ b/datafusion-examples/examples/csv_opener.rs @@ -67,7 +67,6 @@ async fn main() -> Result<()> { limit: Some(5), table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }; let result = diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs index 1a3dbe57be75..ee33f969caa9 100644 --- a/datafusion-examples/examples/json_opener.rs +++ b/datafusion-examples/examples/json_opener.rs @@ -70,7 +70,6 @@ async fn main() -> Result<()> { limit: Some(5), table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }; let result = diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 10a6a85daa2f..e448bf39f427 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -272,7 +272,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); let mut results = avro_exec @@ -344,7 +343,6 @@ mod tests { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); @@ -415,7 +413,6 @@ mod tests { limit: None, table_partition_cols: vec![Field::new("date", DataType::Utf8, false)], output_ordering: vec![], - infinite_source: false, }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 3752d42dbf43..e76b201e0222 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -85,7 +85,6 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, None, None, diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index e1e8b8e66edd..23a56bc821d4 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -81,7 +81,6 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, Some(predicate), None, diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index 25c62f18f5ba..00f3eada496e 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -69,7 +69,6 @@ async fn multi_parquet_coercion() { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, None, None, @@ -133,7 +132,6 @@ async fn multi_parquet_coercion_projection() { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }, None, None, diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 6170859142ca..d1f270b540b5 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use datafusion::datasource::stream::{StreamConfig, StreamTable}; use datafusion::test_util::register_unbounded_file_with_ordering; use super::*; @@ -156,20 +157,13 @@ async fn join_change_in_planner_without_sort() -> Result<()> { Field::new("a1", DataType::UInt32, false), Field::new("a2", DataType::UInt32, false), ])); - ctx.register_csv( - "left", - left_file_path.as_os_str().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).mark_infinite(true), - ) - .await?; + let left = StreamConfig::new_file(schema.clone(), left_file_path); + ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?; + let right_file_path = tmp_dir.path().join("right.csv"); File::create(right_file_path.clone())?; - ctx.register_csv( - "right", - right_file_path.as_os_str().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).mark_infinite(true), - ) - .await?; + let right = StreamConfig::new_file(schema, right_file_path); + ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?; let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10"; let dataframe = ctx.sql(sql).await?; let physical_plan = dataframe.create_physical_plan().await?; @@ -213,20 +207,12 @@ async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> { Field::new("a1", DataType::UInt32, false), Field::new("a2", DataType::UInt32, false), ])); - ctx.register_csv( - "left", - left_file_path.as_os_str().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).mark_infinite(true), - ) - .await?; + let left = StreamConfig::new_file(schema.clone(), left_file_path); + ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?; let right_file_path = tmp_dir.path().join("right.csv"); File::create(right_file_path.clone())?; - ctx.register_csv( - "right", - right_file_path.as_os_str().to_str().unwrap(), - CsvReadOptions::new().schema(&schema).mark_infinite(true), - ) - .await?; + let right = StreamConfig::new_file(schema.clone(), right_file_path); + ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?; let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?; match df.create_physical_plan().await { Ok(_) => panic!("Expecting error."), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 4a512413e73e..9a9827f2a090 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -492,7 +492,6 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }; let predicate = Arc::new(BinaryExpr::new( diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs index b64dd2c138fc..e5af3f94cc05 100644 --- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs @@ -49,7 +49,6 @@ async fn parquet_exec() -> Result<()> { limit: None, table_partition_cols: vec![], output_ordering: vec![], - infinite_source: false, }; let parquet_exec: Arc = Arc::new(ParquetExec::new(scan_config, None, None)); From 65e3d3ed5d6fb3d7730bc0f50c1ab202c156d5c7 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Mon, 18 Dec 2023 13:21:39 +0300 Subject: [PATCH 4/4] Add deleted tests as ignored --- .../src/physical_optimizer/enforce_sorting.rs | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 6e344550965e..c0e9b834e66f 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -2140,6 +2140,68 @@ mod tests { Ok(()) } + #[tokio::test] + #[ignore] + async fn test_with_lost_ordering_unbounded() -> Result<()> { + let schema = create_test_schema3()?; + let sort_exprs = vec![sort_expr("a", &schema)]; + let source = csv_exec_sorted(&schema, sort_exprs); + let repartition_rr = repartition_exec(source); + let repartition_hash = Arc::new(RepartitionExec::try_new( + repartition_rr, + Partitioning::Hash(vec![col("c", &schema).unwrap()], 10), + )?) as _; + let coalesce_partitions = coalesce_partitions_exec(repartition_hash); + let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions); + + let expected_input = [ + "SortExec: expr=[a@0 ASC]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false" + ]; + let expected_optimized = [ + "SortPreservingMergeExec: [a@0 ASC]", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + Ok(()) + } + + #[tokio::test] + #[ignore] + async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> { + let schema = create_test_schema3()?; + let sort_exprs = vec![sort_expr("a", &schema)]; + // Make source unbounded + let source = csv_exec_sorted(&schema, sort_exprs); + let repartition_rr = repartition_exec(source); + let repartition_hash = Arc::new(RepartitionExec::try_new( + repartition_rr, + Partitioning::Hash(vec![col("c", &schema).unwrap()], 10), + )?) as _; + let coalesce_partitions = coalesce_partitions_exec(repartition_hash); + let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions); + + let expected_input = ["SortExec: expr=[a@0 ASC]", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false" + ]; + let expected_optimized = [ + "SortPreservingMergeExec: [a@0 ASC]", + " RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, false); + Ok(()) + } + #[tokio::test] async fn test_do_not_pushdown_through_spm() -> Result<()> { let schema = create_test_schema3()?;