From beff9936305f2c033ef8ced42f6565d4a26916a5 Mon Sep 17 00:00:00 2001 From: nevrohelios Date: Thu, 24 Apr 2025 10:28:08 +0000 Subject: [PATCH 1/8] refactor: replace `unwrap_or` with `unwrap_or_else` for improved lazy evaluation --- Cargo.toml | 2 ++ benchmarks/src/bin/external_aggr.rs | 2 +- benchmarks/src/imdb/run.rs | 2 +- benchmarks/src/sort.rs | 2 +- benchmarks/src/sort_tpch.rs | 2 +- benchmarks/src/tpch/run.rs | 2 +- benchmarks/src/util/options.rs | 2 +- datafusion/common/src/column.rs | 12 ++++-------- datafusion/common/src/error.rs | 6 +++--- .../core/src/datasource/file_format/parquet.rs | 2 +- datafusion/datasource-csv/src/file_format.rs | 10 +++++----- datafusion/datasource/src/file_scan_config.rs | 2 +- datafusion/datasource/src/statistics.rs | 2 +- datafusion/datasource/src/write/demux.rs | 2 +- datafusion/expr-common/src/type_coercion/binary.rs | 2 +- datafusion/expr/src/expr_fn.rs | 2 +- datafusion/functions-aggregate/src/array_agg.rs | 2 +- datafusion/functions-window/src/lead_lag.rs | 2 +- datafusion/macros/src/user_doc.rs | 2 +- .../optimizer/src/simplify_expressions/regex.rs | 2 +- datafusion/physical-expr/src/analysis.rs | 2 +- .../physical-expr/src/equivalence/properties/mod.rs | 2 +- datafusion/physical-plan/src/projection.rs | 2 +- datafusion/physical-plan/src/streaming.rs | 2 +- 24 files changed, 34 insertions(+), 36 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5a735666f8e7..9bfe5092b930 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -209,6 +209,8 @@ strip = false # Detects large stack-allocated futures that may cause stack overflow crashes (see threshold in clippy.toml) large_futures = "warn" used_underscore_binding = "warn" +or_fun_call = "warn" +unnecessary_lazy_evaluations = "warn" [workspace.lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tarpaulin)"] } diff --git a/benchmarks/src/bin/external_aggr.rs b/benchmarks/src/bin/external_aggr.rs index 578f71f8275d..5255b54860d4 100644 --- a/benchmarks/src/bin/external_aggr.rs +++ b/benchmarks/src/bin/external_aggr.rs @@ -335,7 +335,7 @@ impl ExternalAggrConfig { fn partitions(&self) -> usize { self.common .partitions - .unwrap_or(get_available_parallelism()) + .unwrap_or_else(get_available_parallelism) } } diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs index d7d7a56d0540..33c68fe8b29c 100644 --- a/benchmarks/src/imdb/run.rs +++ b/benchmarks/src/imdb/run.rs @@ -471,7 +471,7 @@ impl RunOpt { fn partitions(&self) -> usize { self.common .partitions - .unwrap_or(get_available_parallelism()) + .unwrap_or_else(get_available_parallelism) } } diff --git a/benchmarks/src/sort.rs b/benchmarks/src/sort.rs index 9cf09c57205a..8b2b02670449 100644 --- a/benchmarks/src/sort.rs +++ b/benchmarks/src/sort.rs @@ -149,7 +149,7 @@ impl RunOpt { let config = SessionConfig::new().with_target_partitions( self.common .partitions - .unwrap_or(get_available_parallelism()), + .unwrap_or_else(get_available_parallelism), ); let ctx = SessionContext::new_with_config(config); let (rows, elapsed) = diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs index 176234eca541..5a2107096ed5 100644 --- a/benchmarks/src/sort_tpch.rs +++ b/benchmarks/src/sort_tpch.rs @@ -352,6 +352,6 @@ impl RunOpt { fn partitions(&self) -> usize { self.common .partitions - .unwrap_or(get_available_parallelism()) + .unwrap_or_else(get_available_parallelism) } } diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 752a5a1a6ba0..c5b9c1932a6e 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -313,7 +313,7 @@ impl RunOpt { fn partitions(&self) -> usize { self.common .partitions - .unwrap_or(get_available_parallelism()) + .unwrap_or_else(get_available_parallelism) } } diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index a1cf31525dd9..eaf9836d8336 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -73,7 +73,7 @@ impl CommonOpt { pub fn update_config(&self, config: SessionConfig) -> SessionConfig { let mut config = config .with_target_partitions( - self.partitions.unwrap_or(get_available_parallelism()), + self.partitions.unwrap_or_else(get_available_parallelism), ) .with_batch_size(self.batch_size); if let Some(sort_spill_reservation_bytes) = self.sort_spill_reservation_bytes { diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index 50a4e257d1c9..c7789961b926 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -130,25 +130,21 @@ impl Column { /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR` pub fn from_qualified_name(flat_name: impl Into) -> Self { let flat_name = flat_name.into(); - Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or( - Self { + Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or_else(|| Self { relation: None, name: flat_name, spans: Spans::new(), - }, - ) + }) } /// Deserialize a fully qualified name string into a column preserving column text case pub fn from_qualified_name_ignore_case(flat_name: impl Into) -> Self { let flat_name = flat_name.into(); - Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or( - Self { + Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or_else(|| Self { relation: None, name: flat_name, spans: Spans::new(), - }, - ) + }) } /// return the column's name. diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index c50ec64759d5..ae8bd0732a16 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -526,7 +526,7 @@ impl DataFusionError { pub fn message(&self) -> Cow { match *self { DataFusionError::ArrowError(ref desc, ref backtrace) => { - let backtrace = backtrace.clone().unwrap_or("".to_owned()); + let backtrace = backtrace.clone().unwrap_or_else(|| "".to_owned()); Cow::Owned(format!("{desc}{backtrace}")) } #[cfg(feature = "parquet")] @@ -535,7 +535,7 @@ impl DataFusionError { DataFusionError::AvroError(ref desc) => Cow::Owned(desc.to_string()), DataFusionError::IoError(ref desc) => Cow::Owned(desc.to_string()), DataFusionError::SQL(ref desc, ref backtrace) => { - let backtrace: String = backtrace.clone().unwrap_or("".to_owned()); + let backtrace: String = backtrace.clone().unwrap_or_else(|| "".to_owned()); Cow::Owned(format!("{desc:?}{backtrace}")) } DataFusionError::Configuration(ref desc) => Cow::Owned(desc.to_string()), @@ -547,7 +547,7 @@ impl DataFusionError { DataFusionError::Plan(ref desc) => Cow::Owned(desc.to_string()), DataFusionError::SchemaError(ref desc, ref backtrace) => { let backtrace: &str = - &backtrace.as_ref().clone().unwrap_or("".to_owned()); + &backtrace.as_ref().clone().unwrap_or_else(|| "".to_owned()); Cow::Owned(format!("{desc}{backtrace}")) } DataFusionError::Execution(ref desc) => Cow::Owned(desc.to_string()), diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 7b8b99273f4e..14e1bde10b76 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1073,7 +1073,7 @@ mod tests { let format = state .get_file_format_factory("parquet") .map(|factory| factory.create(state, &Default::default()).unwrap()) - .unwrap_or(Arc::new(ParquetFormat::new())); + .unwrap_or_else(|| Arc::new(ParquetFormat::new())); scan_format( state, &*format, None, &testdata, file_name, projection, limit, diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index 76f3c50a70a7..f6ff9a686388 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -414,11 +414,11 @@ impl FileFormat for CsvFormat { let has_header = self .options .has_header - .unwrap_or(state.config_options().catalog.has_header); + .unwrap_or_else(|| state.config_options().catalog.has_header); let newlines_in_values = self .options .newlines_in_values - .unwrap_or(state.config_options().catalog.newlines_in_values); + .unwrap_or_else(|| state.config_options().catalog.newlines_in_values); let conf_builder = FileScanConfigBuilder::from(conf) .with_file_compression_type(self.options.compression.into()) @@ -454,11 +454,11 @@ impl FileFormat for CsvFormat { let has_header = self .options() .has_header - .unwrap_or(state.config_options().catalog.has_header); + .unwrap_or_else(|| state.config_options().catalog.has_header); let newlines_in_values = self .options() .newlines_in_values - .unwrap_or(state.config_options().catalog.newlines_in_values); + .unwrap_or_else(|| state.config_options().catalog.newlines_in_values); let options = self .options() @@ -504,7 +504,7 @@ impl CsvFormat { && self .options .has_header - .unwrap_or(state.config_options().catalog.has_header), + .unwrap_or_else(|| state.config_options().catalog.has_header), ) .with_delimiter(self.options.delimiter) .with_quote(self.options.quote); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index fb756cc11fbb..ad2f0404d692 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -582,7 +582,7 @@ impl DataSource for FileScanConfig { &file_scan .projection .clone() - .unwrap_or((0..self.file_schema.fields().len()).collect()), + .unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()), ); DataSourceExec::from_data_source( FileScanConfigBuilder::from(file_scan) diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index 8a04d77b273d..afe2bc860ef3 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -137,7 +137,7 @@ impl MinMaxStatistics { // Reverse the projection to get the index of the column in the full statistics // The file statistics contains _every_ column , but the sort column's index() // refers to the index in projected_schema - let i = projection.map(|p| p[c.index()]).unwrap_or(c.index()); + let i = projection.map(|p| p[c.index()]).unwrap_or_else(|| c.index()); let (min, max) = get_min_max(i).map_err(|e| { e.context(format!("get min/max for column: '{}'", c.name())) diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs index 49c3a64d24aa..b1ec3ee397ef 100644 --- a/datafusion/datasource/src/write/demux.rs +++ b/datafusion/datasource/src/write/demux.rs @@ -513,7 +513,7 @@ fn compute_take_arrays( for vals in all_partition_values.iter() { part_key.push(vals[i].clone().into()); } - let builder = take_map.entry(part_key).or_insert(UInt64Builder::new()); + let builder = take_map.entry(part_key).or_insert_with(UInt64Builder::new); builder.append_value(i as u64); } take_map diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index fdee00f81b1e..491e3c2f8b33 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -1138,7 +1138,7 @@ fn dictionary_comparison_coercion( /// 2. Data type of the other side should be able to cast to string type fn string_concat_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { use arrow::datatypes::DataType::*; - string_coercion(lhs_type, rhs_type).or(match (lhs_type, rhs_type) { + string_coercion(lhs_type, rhs_type).or_else(|| match (lhs_type, rhs_type) { (Utf8View, from_type) | (from_type, Utf8View) => { string_concat_internal_coercion(from_type, &Utf8View) } diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 966aba7d1195..cee356a2b42c 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -838,7 +838,7 @@ impl ExprFuncBuilder { partition_by: partition_by.unwrap_or_default(), order_by: order_by.unwrap_or_default(), window_frame: window_frame - .unwrap_or(WindowFrame::new(has_order_by)), + .unwrap_or_else(|| WindowFrame::new(has_order_by)), null_treatment, }, }) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index d658744c1ba5..a9af4c849922 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -1007,7 +1007,7 @@ mod tests { fn print_nulls(sort: Vec>) -> Vec { sort.into_iter() - .map(|v| v.unwrap_or("NULL".to_string())) + .map(|v| v.unwrap_or_else(|| "NULL".to_string())) .collect() } diff --git a/datafusion/functions-window/src/lead_lag.rs b/datafusion/functions-window/src/lead_lag.rs index 5df20cf5b980..75f82ea2af53 100644 --- a/datafusion/functions-window/src/lead_lag.rs +++ b/datafusion/functions-window/src/lead_lag.rs @@ -321,7 +321,7 @@ fn parse_default_value( unparsed .filter(|v| !v.data_type().is_null()) .map(|v| v.cast_to(&expr_type)) - .unwrap_or(ScalarValue::try_from(expr_type)) + .unwrap_or_else(|| ScalarValue::try_from(expr_type)) } #[derive(Debug)] diff --git a/datafusion/macros/src/user_doc.rs b/datafusion/macros/src/user_doc.rs index c6510c156423..31cf9bb1b750 100644 --- a/datafusion/macros/src/user_doc.rs +++ b/datafusion/macros/src/user_doc.rs @@ -206,7 +206,7 @@ pub fn user_doc(args: TokenStream, input: TokenStream) -> TokenStream { }; let doc_section_description = doc_section_desc .map(|desc| quote! { Some(#desc)}) - .unwrap_or(quote! { None }); + .unwrap_or_else(|| quote! { None }); let sql_example = sql_example.map(|ex| { quote! { diff --git a/datafusion/optimizer/src/simplify_expressions/regex.rs b/datafusion/optimizer/src/simplify_expressions/regex.rs index 0b47cdee212f..ab25c77f0bee 100644 --- a/datafusion/optimizer/src/simplify_expressions/regex.rs +++ b/datafusion/optimizer/src/simplify_expressions/regex.rs @@ -331,7 +331,7 @@ fn lower_simple(mode: &OperatorMode, left: &Expr, hir: &Hir) -> Option { } HirKind::Concat(inner) => { if let Some(pattern) = partial_anchored_literal_to_like(inner) - .or(collect_concat_to_like_string(inner)) + .or_else(|| collect_concat_to_like_string(inner)) { return Some(mode.expr(Box::new(left.clone()), pattern)); } diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index 5abd50f6d1b4..63f8862f8ce4 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -112,7 +112,7 @@ impl ExprBoundaries { .min_value .get_value() .cloned() - .unwrap_or(empty_field.clone()), + .unwrap_or_else(|| empty_field.clone()), col_stats .max_value .get_value() diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 5b34a02a9142..53551606c10c 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -1364,7 +1364,7 @@ impl EquivalenceProperties { .transform_up(|expr| update_properties(expr, self)) .data() .map(|node| node.data) - .unwrap_or(ExprProperties::new_unknown()) + .unwrap_or_else(|_| ExprProperties::new_unknown()) } /// Transforms this `EquivalenceProperties` into a new `EquivalenceProperties` diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 72934c74446e..feca9155e799 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -538,7 +538,7 @@ pub fn remove_unnecessary_projections( } else { return Ok(Transformed::no(plan)); }; - Ok(maybe_modified.map_or(Transformed::no(plan), Transformed::yes)) + Ok(maybe_modified.map_or_else(|| Transformed::no(plan), Transformed::yes)) } /// Compare the inputs and outputs of the projection. All expressions must be diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 18c472a7e187..6274995d04da 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -302,7 +302,7 @@ impl ExecutionPlan for StreamingTableExec { let new_projections = new_projections_for_columns( projection, &streaming_table_projections - .unwrap_or((0..self.schema().fields().len()).collect()), + .unwrap_or_else(|| (0..self.schema().fields().len()).collect()), ); let mut lex_orderings = vec![]; From ce0b4536545d5d63eb619310927129b9f2bb69c5 Mon Sep 17 00:00:00 2001 From: nevrohelios Date: Thu, 24 Apr 2025 12:51:01 +0000 Subject: [PATCH 2/8] refactor: improve code readability by adjusting formatting and using `unwrap_or_else` for better lazy evaluation --- Cargo.toml | 2 +- datafusion/common/src/column.rs | 12 ++++++++---- datafusion/common/src/error.rs | 3 ++- datafusion/datasource/src/statistics.rs | 4 +++- .../physical-optimizer/src/enforce_distribution.rs | 5 ++--- .../physical-optimizer/src/enforce_sorting/mod.rs | 5 +++-- .../replace_with_order_preserving_variants.rs | 7 ++++--- .../src/enforce_sorting/sort_pushdown.rs | 4 ++-- .../physical-optimizer/src/update_aggr_exprs.rs | 4 ++-- 9 files changed, 27 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9bfe5092b930..434541878d93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -209,7 +209,7 @@ strip = false # Detects large stack-allocated futures that may cause stack overflow crashes (see threshold in clippy.toml) large_futures = "warn" used_underscore_binding = "warn" -or_fun_call = "warn" +or_fun_call = "warn" unnecessary_lazy_evaluations = "warn" [workspace.lints.rust] diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs index c7789961b926..b3acaeee5a54 100644 --- a/datafusion/common/src/column.rs +++ b/datafusion/common/src/column.rs @@ -130,21 +130,25 @@ impl Column { /// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR` pub fn from_qualified_name(flat_name: impl Into) -> Self { let flat_name = flat_name.into(); - Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or_else(|| Self { + Self::from_idents(parse_identifiers_normalized(&flat_name, false)).unwrap_or_else( + || Self { relation: None, name: flat_name, spans: Spans::new(), - }) + }, + ) } /// Deserialize a fully qualified name string into a column preserving column text case pub fn from_qualified_name_ignore_case(flat_name: impl Into) -> Self { let flat_name = flat_name.into(); - Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or_else(|| Self { + Self::from_idents(parse_identifiers_normalized(&flat_name, true)).unwrap_or_else( + || Self { relation: None, name: flat_name, spans: Spans::new(), - }) + }, + ) } /// return the column's name. diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs index ae8bd0732a16..5e52a2886a81 100644 --- a/datafusion/common/src/error.rs +++ b/datafusion/common/src/error.rs @@ -535,7 +535,8 @@ impl DataFusionError { DataFusionError::AvroError(ref desc) => Cow::Owned(desc.to_string()), DataFusionError::IoError(ref desc) => Cow::Owned(desc.to_string()), DataFusionError::SQL(ref desc, ref backtrace) => { - let backtrace: String = backtrace.clone().unwrap_or_else(|| "".to_owned()); + let backtrace: String = + backtrace.clone().unwrap_or_else(|| "".to_owned()); Cow::Owned(format!("{desc:?}{backtrace}")) } DataFusionError::Configuration(ref desc) => Cow::Owned(desc.to_string()), diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index afe2bc860ef3..50bf8b6550db 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -137,7 +137,9 @@ impl MinMaxStatistics { // Reverse the projection to get the index of the column in the full statistics // The file statistics contains _every_ column , but the sort column's index() // refers to the index in projected_schema - let i = projection.map(|p| p[c.index()]).unwrap_or_else(|| c.index()); + let i = projection + .map(|p| p[c.index()]) + .unwrap_or_else(|| c.index()); let (min, max) = get_min_max(i).map_err(|e| { e.context(format!("get min/max for column: '{}'", c.name())) diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 523762401dfa..6b423241a91e 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -42,7 +42,6 @@ use datafusion_physical_expr::utils::map_columns_before_projection; use datafusion_physical_expr::{ physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef, }; -use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; @@ -953,8 +952,8 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext { input .plan .output_ordering() - .unwrap_or(&LexOrdering::default()) - .clone(), + .cloned() + .unwrap_or_default(), Arc::clone(&input.plan), )) as _ } else { diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 20733b65692f..652114b2ed51 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -498,10 +498,11 @@ fn analyze_immediate_sort_removal( let sort_input = sort_exec.input(); // If this sort is unnecessary, we should remove it: if sort_input.equivalence_properties().ordering_satisfy( - sort_exec + &sort_exec .properties() .output_ordering() - .unwrap_or(LexOrdering::empty()), + .cloned() + .unwrap_or_default(), ) { node.plan = if !sort_exec.preserve_partitioning() && sort_input.output_partitioning().partition_count() > 1 diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs index 2c5c0d4d510e..d14d2e9076d5 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs @@ -28,7 +28,7 @@ use crate::utils::{ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::Transformed; use datafusion_common::Result; -use datafusion_physical_expr_common::sort_expr::LexOrdering; +// use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::EmissionType; use datafusion_physical_plan::repartition::RepartitionExec; @@ -268,10 +268,11 @@ pub fn replace_with_order_preserving_variants( .plan .equivalence_properties() .ordering_satisfy( - requirements + &requirements .plan .output_ordering() - .unwrap_or(LexOrdering::empty()), + .cloned() + .unwrap_or_default(), ) { for child in alternate_plan.children.iter_mut() { diff --git a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs index 2e20608d0e9e..6d2c014f9e7c 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs @@ -233,7 +233,7 @@ fn pushdown_requirement_to_children( .properties() .output_ordering() .cloned() - .unwrap_or(LexOrdering::default()), + .unwrap_or_else(LexOrdering::default), ); if sort_exec .properties() @@ -258,7 +258,7 @@ fn pushdown_requirement_to_children( plan.properties() .output_ordering() .cloned() - .unwrap_or(LexOrdering::default()), + .unwrap_or_else(LexOrdering::default), ); // Push down through operator with fetch when: // - requirement is aligned with output ordering diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs b/datafusion/physical-optimizer/src/update_aggr_exprs.rs index 6228ed10ec34..43d317619b48 100644 --- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs +++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs @@ -27,7 +27,7 @@ use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement, }; -use datafusion_physical_expr::{LexOrdering, LexRequirement}; +use datafusion_physical_expr::{LexRequirement}; use datafusion_physical_plan::aggregates::concat_slices; use datafusion_physical_plan::windows::get_ordered_partition_by_indices; use datafusion_physical_plan::{ @@ -159,7 +159,7 @@ fn try_convert_aggregate_if_better( aggr_exprs .into_iter() .map(|aggr_expr| { - let aggr_sort_exprs = aggr_expr.order_bys().unwrap_or(LexOrdering::empty()); + let aggr_sort_exprs = &aggr_expr.order_bys().cloned().unwrap_or_default(); let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs); let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone()); let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs); From 0aa5622a6c539d8ee88bf8123b49fb7a80a0c263 Mon Sep 17 00:00:00 2001 From: nevrohelios Date: Thu, 1 May 2025 03:19:05 +0530 Subject: [PATCH 3/8] [FIX] added imports --- benchmarks/src/util/options.rs | 4 ++-- .../enforce_sorting/replace_with_order_preserving_variants.rs | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index 91764c1607b2..65d548c8d9fc 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -27,6 +27,7 @@ use datafusion::{ }; use datafusion_common::{DataFusionError, Result}; use structopt::StructOpt; +use datafusion::common::utils::get_available_parallelism; // Common benchmark options (don't use doc comments otherwise this doc // shows up in help files) @@ -70,13 +71,12 @@ impl CommonOpt { } /// Modify the existing config appropriately - pub fn update_config(&self, config: SessionConfig) -> SessionConfig { let mut config = config .with_target_partitions( self.partitions.unwrap_or_else(get_available_parallelism), ) - .with_batch_size(self.batch_size); + .with_batch_size(self.batch_size.unwrap_or(8192)); if let Some(sort_spill_reservation_bytes) = self.sort_spill_reservation_bytes { config = diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs index 0c17efe15310..789a1f362ac2 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs @@ -27,6 +27,7 @@ use crate::utils::{ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::Transformed; +use datafusion_physical_plan::internal_err; use datafusion_common::Result; // use datafusion_physical_expr_common::sort_expr::LexOrdering; From e149e7583291e7bae5e6f9ed4324b63b43ad63c5 Mon Sep 17 00:00:00 2001 From: nevrohelios Date: Thu, 1 May 2025 11:48:01 +0530 Subject: [PATCH 4/8] [FIX] formatting and restored original config logic --- benchmarks/src/util/options.rs | 18 +++++++----------- .../src/enforce_distribution.rs | 6 +----- .../src/update_aggr_exprs.rs | 2 +- 3 files changed, 9 insertions(+), 17 deletions(-) diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index 65d548c8d9fc..49c56aa2da90 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -27,7 +27,6 @@ use datafusion::{ }; use datafusion_common::{DataFusionError, Result}; use structopt::StructOpt; -use datafusion::common::utils::get_available_parallelism; // Common benchmark options (don't use doc comments otherwise this doc // shows up in help files) @@ -71,16 +70,13 @@ impl CommonOpt { } /// Modify the existing config appropriately - pub fn update_config(&self, config: SessionConfig) -> SessionConfig { - let mut config = config - .with_target_partitions( - self.partitions.unwrap_or_else(get_available_parallelism), - ) - .with_batch_size(self.batch_size.unwrap_or(8192)); - - if let Some(sort_spill_reservation_bytes) = self.sort_spill_reservation_bytes { - config = - config.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes); + pub fn update_config(&self, mut config: SessionConfig) -> SessionConfig { + if let Some(batch_size) = self.batch_size { + config = config.with_batch_size(batch_size); + } + + if let Some(partitions) = self.partitions { + config = config.with_target_partitions(partitions); } config diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 6b423241a91e..26cd3b645bfe 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -949,11 +949,7 @@ fn add_spm_on_top(input: DistributionContext) -> DistributionContext { let new_plan = if should_preserve_ordering { Arc::new(SortPreservingMergeExec::new( - input - .plan - .output_ordering() - .cloned() - .unwrap_or_default(), + input.plan.output_ordering().cloned().unwrap_or_default(), Arc::clone(&input.plan), )) as _ } else { diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs b/datafusion/physical-optimizer/src/update_aggr_exprs.rs index 43d317619b48..b3f0f2232f51 100644 --- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs +++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs @@ -24,10 +24,10 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{plan_datafusion_err, Result}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; +use datafusion_physical_expr::LexRequirement; use datafusion_physical_expr::{ reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement, }; -use datafusion_physical_expr::{LexRequirement}; use datafusion_physical_plan::aggregates::concat_slices; use datafusion_physical_plan::windows::get_ordered_partition_by_indices; use datafusion_physical_plan::{ From 1d3fa64d5bb331c8b752a2935f4979090bf53475 Mon Sep 17 00:00:00 2001 From: nevrohelios Date: Fri, 2 May 2025 20:13:30 +0530 Subject: [PATCH 5/8] config restored --- benchmarks/src/util/options.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs index 49c56aa2da90..1b413aa5a874 100644 --- a/benchmarks/src/util/options.rs +++ b/benchmarks/src/util/options.rs @@ -79,6 +79,11 @@ impl CommonOpt { config = config.with_target_partitions(partitions); } + if let Some(sort_spill_reservation_bytes) = self.sort_spill_reservation_bytes { + config = + config.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes); + } + config } From d4e2ef518d05b61d1fdba46660199ae5c60e6207 Mon Sep 17 00:00:00 2001 From: nevrohelios Date: Mon, 5 May 2025 21:58:48 +0530 Subject: [PATCH 6/8] optimized the use of .clone() --- datafusion/physical-optimizer/src/enforce_sorting/mod.rs | 5 ++--- .../replace_with_order_preserving_variants.rs | 1 - datafusion/physical-optimizer/src/update_aggr_exprs.rs | 6 +++++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index cf61c187853a..37fec2eab3f9 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -502,11 +502,10 @@ fn analyze_immediate_sort_removal( let sort_input = sort_exec.input(); // If this sort is unnecessary, we should remove it: if sort_input.equivalence_properties().ordering_satisfy( - &sort_exec + sort_exec .properties() .output_ordering() - .cloned() - .unwrap_or_default(), + .unwrap_or_else(|| LexOrdering::empty()), ) { node.plan = if !sort_exec.preserve_partitioning() && sort_input.output_partitioning().partition_count() > 1 diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs index 633eed9d82d1..254d29cff7f1 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs @@ -30,7 +30,6 @@ use datafusion_common::tree_node::Transformed; use datafusion_physical_plan::internal_err; use datafusion_common::Result; -// use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::execution_plan::EmissionType; use datafusion_physical_plan::repartition::RepartitionExec; diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs b/datafusion/physical-optimizer/src/update_aggr_exprs.rs index b3f0f2232f51..2fe736cf7fb3 100644 --- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs +++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs @@ -28,6 +28,7 @@ use datafusion_physical_expr::LexRequirement; use datafusion_physical_expr::{ reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement, }; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::aggregates::concat_slices; use datafusion_physical_plan::windows::get_ordered_partition_by_indices; use datafusion_physical_plan::{ @@ -159,7 +160,10 @@ fn try_convert_aggregate_if_better( aggr_exprs .into_iter() .map(|aggr_expr| { - let aggr_sort_exprs = &aggr_expr.order_bys().cloned().unwrap_or_default(); + let aggr_sort_exprs = &aggr_expr + .order_bys() + .cloned() + .unwrap_or_else(|| LexOrdering::empty().clone()); let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs); let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone()); let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs); From 22181dae3f72a593769df6c50781ca3f2e6c7d35 Mon Sep 17 00:00:00 2001 From: nevrohelios Date: Mon, 5 May 2025 22:09:26 +0530 Subject: [PATCH 7/8] removed the use of clone --- datafusion/physical-optimizer/src/update_aggr_exprs.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs b/datafusion/physical-optimizer/src/update_aggr_exprs.rs index 2fe736cf7fb3..ae1a38230d04 100644 --- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs +++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs @@ -160,10 +160,9 @@ fn try_convert_aggregate_if_better( aggr_exprs .into_iter() .map(|aggr_expr| { - let aggr_sort_exprs = &aggr_expr + let aggr_sort_exprs = aggr_expr .order_bys() - .cloned() - .unwrap_or_else(|| LexOrdering::empty().clone()); + .unwrap_or_else(|| LexOrdering::empty()); let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs); let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone()); let reverse_aggr_req = LexRequirement::from(reverse_aggr_sort_exprs); From f6c13b2a7a257c2dc8fb702850706eeaa2f04323 Mon Sep 17 00:00:00 2001 From: nevrohelios Date: Mon, 5 May 2025 22:14:36 +0530 Subject: [PATCH 8/8] cleanup the clone usecase --- .../replace_with_order_preserving_variants.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs index 254d29cff7f1..9769e2e0366f 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs @@ -27,6 +27,7 @@ use crate::utils::{ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::Transformed; +use datafusion_physical_expr::LexOrdering; use datafusion_physical_plan::internal_err; use datafusion_common::Result; @@ -284,11 +285,10 @@ pub fn replace_with_order_preserving_variants( .plan .equivalence_properties() .ordering_satisfy( - &requirements + requirements .plan .output_ordering() - .cloned() - .unwrap_or_default(), + .unwrap_or_else(|| LexOrdering::empty()), ) { for child in alternate_plan.children.iter_mut() {