From 386176add66b89df7bd0b85c2dc1b91afa89dc48 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 30 Dec 2025 15:01:34 -0700 Subject: [PATCH 1/6] optimize split_part --- datafusion/functions/Cargo.toml | 5 ++++ datafusion/functions/src/string/split_part.rs | 26 +++++++++---------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 5ceeee57b0be4..bf1cc3a7fc802 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -294,3 +294,8 @@ required-features = ["unicode_expressions"] harness = false name = "levenshtein" required-features = ["unicode_expressions"] + +[[bench]] +harness = false +name = "split_part" +required-features = ["string_expressions"] diff --git a/datafusion/functions/src/string/split_part.rs b/datafusion/functions/src/string/split_part.rs index 8ac505bf360f6..c2da3791b06be 100644 --- a/datafusion/functions/src/string/split_part.rs +++ b/datafusion/functions/src/string/split_part.rs @@ -219,22 +219,22 @@ where .try_for_each(|((string, delimiter), n)| -> Result<(), DataFusionError> { match (string, delimiter, n) { (Some(string), Some(delimiter), Some(n)) => { - let split_string: Vec<&str> = string.split(delimiter).collect(); - let len = split_string.len(); - - let index = match n.cmp(&0) { - std::cmp::Ordering::Less => len as i64 + n, + let result = match n.cmp(&0) { + std::cmp::Ordering::Greater => { + // Positive index: use nth() to avoid collecting all parts + // This stops iteration as soon as we find the nth element + string.split(delimiter).nth((n - 1) as usize) + } + std::cmp::Ordering::Less => { + // Negative index: use rsplit().nth() to efficiently get from the end + // rsplit iterates in reverse, so -1 means first from rsplit (index 0) + string.rsplit(delimiter).nth((-n - 1) as usize) + } std::cmp::Ordering::Equal => { return exec_err!("field position must not be zero"); } - std::cmp::Ordering::Greater => n - 1, - } as usize; - - if index < len { - builder.append_value(split_string[index]); - } else { - builder.append_value(""); - } + }; + builder.append_value(result.unwrap_or("")); } _ => builder.append_null(), } From 36ea121c18dc61360e6c1b7e23dd8933e5c41ae7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 30 Dec 2025 15:02:11 -0700 Subject: [PATCH 2/6] optimize split_part --- datafusion/functions/benches/split_part.rs | 382 +++++++++++++++++++++ 1 file changed, 382 insertions(+) create mode 100644 datafusion/functions/benches/split_part.rs diff --git a/datafusion/functions/benches/split_part.rs b/datafusion/functions/benches/split_part.rs new file mode 100644 index 0000000000000..e524a68c2bf74 --- /dev/null +++ b/datafusion/functions/benches/split_part.rs @@ -0,0 +1,382 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use arrow::array::{ArrayRef, Int64Array, StringArray, StringViewArray}; +use arrow::datatypes::{DataType, Field}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; +use datafusion_functions::string::split_part; +use rand::distr::Alphanumeric; +use rand::prelude::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; + +const N_ROWS: usize = 8192; + +/// Generate test data for split_part benchmarks +/// Creates strings with multiple parts separated by the delimiter +fn gen_split_part_data( + n_rows: usize, + num_parts: usize, // number of parts in each string (separated by delimiter) + part_len: usize, // length of each part + delimiter: &str, // the delimiter to use + use_string_view: bool, // false -> StringArray, true -> StringViewArray +) -> (ColumnarValue, ColumnarValue) { + let mut rng = StdRng::seed_from_u64(42); + + let mut strings: Vec = Vec::with_capacity(n_rows); + for _ in 0..n_rows { + let mut parts: Vec = Vec::with_capacity(num_parts); + for _ in 0..num_parts { + let part: String = (&mut rng) + .sample_iter(&Alphanumeric) + .take(part_len) + .map(char::from) + .collect(); + parts.push(part); + } + strings.push(parts.join(delimiter)); + } + + let delimiters: Vec = vec![delimiter.to_string(); n_rows]; + + if use_string_view { + let string_array: StringViewArray = strings.into_iter().map(Some).collect(); + let delimiter_array: StringViewArray = delimiters.into_iter().map(Some).collect(); + ( + ColumnarValue::Array(Arc::new(string_array) as ArrayRef), + ColumnarValue::Array(Arc::new(delimiter_array) as ArrayRef), + ) + } else { + let string_array: StringArray = strings.into_iter().map(Some).collect(); + let delimiter_array: StringArray = delimiters.into_iter().map(Some).collect(); + ( + ColumnarValue::Array(Arc::new(string_array) as ArrayRef), + ColumnarValue::Array(Arc::new(delimiter_array) as ArrayRef), + ) + } +} + +fn gen_positions(n_rows: usize, position: i64) -> ColumnarValue { + let positions: Vec = vec![position; n_rows]; + ColumnarValue::Array(Arc::new(Int64Array::from(positions)) as ArrayRef) +} + +fn criterion_benchmark(c: &mut Criterion) { + let split_part_func = split_part(); + let config_options = Arc::new(ConfigOptions::default()); + + let mut group = c.benchmark_group("split_part"); + + // Test different scenarios + // Scenario 1: Single-char delimiter, first position (should be fastest with optimization) + { + let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, ".", false); + let positions = gen_positions(N_ROWS, 1); + let args = vec![strings, delimiters, positions]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let return_field = Field::new("f", DataType::Utf8, true).into(); + + group.bench_function( + BenchmarkId::new("single_char_delim", "pos_first"), + |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }, + ); + } + + // Scenario 2: Single-char delimiter, middle position + { + let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, ".", false); + let positions = gen_positions(N_ROWS, 5); + let args = vec![strings, delimiters, positions]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let return_field = Field::new("f", DataType::Utf8, true).into(); + + group.bench_function( + BenchmarkId::new("single_char_delim", "pos_middle"), + |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }, + ); + } + + // Scenario 3: Single-char delimiter, last position + { + let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, ".", false); + let positions = gen_positions(N_ROWS, 10); + let args = vec![strings, delimiters, positions]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let return_field = Field::new("f", DataType::Utf8, true).into(); + + group.bench_function( + BenchmarkId::new("single_char_delim", "pos_last"), + |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }, + ); + } + + // Scenario 4: Single-char delimiter, negative position (last element) + { + let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, ".", false); + let positions = gen_positions(N_ROWS, -1); + let args = vec![strings, delimiters, positions]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let return_field = Field::new("f", DataType::Utf8, true).into(); + + group.bench_function( + BenchmarkId::new("single_char_delim", "pos_negative"), + |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }, + ); + } + + // Scenario 5: Multi-char delimiter, first position + { + let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, "~@~", false); + let positions = gen_positions(N_ROWS, 1); + let args = vec![strings, delimiters, positions]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let return_field = Field::new("f", DataType::Utf8, true).into(); + + group.bench_function( + BenchmarkId::new("multi_char_delim", "pos_first"), + |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }, + ); + } + + // Scenario 6: Multi-char delimiter, middle position + { + let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, "~@~", false); + let positions = gen_positions(N_ROWS, 5); + let args = vec![strings, delimiters, positions]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let return_field = Field::new("f", DataType::Utf8, true).into(); + + group.bench_function( + BenchmarkId::new("multi_char_delim", "pos_middle"), + |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }, + ); + } + + // Scenario 7: StringViewArray, single-char delimiter, first position + { + let (strings, delimiters) = gen_split_part_data(N_ROWS, 10, 8, ".", true); + let positions = gen_positions(N_ROWS, 1); + let args = vec![strings, delimiters, positions]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let return_field = Field::new("f", DataType::Utf8, true).into(); + + group.bench_function( + BenchmarkId::new("string_view_single_char", "pos_first"), + |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }, + ); + } + + // Scenario 8: Many parts (20), position near end - shows benefit of early termination + { + let (strings, delimiters) = gen_split_part_data(N_ROWS, 20, 8, ".", false); + let positions = gen_positions(N_ROWS, 2); + let args = vec![strings, delimiters, positions]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let return_field = Field::new("f", DataType::Utf8, true).into(); + + group.bench_function( + BenchmarkId::new("many_parts_20", "pos_second"), + |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }, + ); + } + + // Scenario 9: Long strings with many parts - worst case for old implementation + { + let (strings, delimiters) = gen_split_part_data(N_ROWS, 50, 16, "/", false); + let positions = gen_positions(N_ROWS, 1); + let args = vec![strings, delimiters, positions]; + let arg_fields: Vec<_> = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect(); + let return_field = Field::new("f", DataType::Utf8, true).into(); + + group.bench_function( + BenchmarkId::new("long_strings_50_parts", "pos_first"), + |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); From 04fe9b46a7db848d77d4d02dd2bf9a8e07fab2ad Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 30 Dec 2025 15:04:46 -0700 Subject: [PATCH 3/6] cargo fmt --- datafusion/functions/benches/split_part.rs | 244 ++++++++++----------- 1 file changed, 122 insertions(+), 122 deletions(-) diff --git a/datafusion/functions/benches/split_part.rs b/datafusion/functions/benches/split_part.rs index e524a68c2bf74..e23610338d15c 100644 --- a/datafusion/functions/benches/split_part.rs +++ b/datafusion/functions/benches/split_part.rs @@ -19,7 +19,7 @@ extern crate criterion; use arrow::array::{ArrayRef, Int64Array, StringArray, StringViewArray}; use arrow::datatypes::{DataType, Field}; -use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; use datafusion_functions::string::split_part; @@ -35,10 +35,10 @@ const N_ROWS: usize = 8192; /// Creates strings with multiple parts separated by the delimiter fn gen_split_part_data( n_rows: usize, - num_parts: usize, // number of parts in each string (separated by delimiter) - part_len: usize, // length of each part - delimiter: &str, // the delimiter to use - use_string_view: bool, // false -> StringArray, true -> StringViewArray + num_parts: usize, // number of parts in each string (separated by delimiter) + part_len: usize, // length of each part + delimiter: &str, // the delimiter to use + use_string_view: bool, // false -> StringArray, true -> StringViewArray ) -> (ColumnarValue, ColumnarValue) { let mut rng = StdRng::seed_from_u64(42); @@ -95,28 +95,27 @@ fn criterion_benchmark(c: &mut Criterion) { let arg_fields: Vec<_> = args .iter() .enumerate() - .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) .collect(); let return_field = Field::new("f", DataType::Utf8, true).into(); - group.bench_function( - BenchmarkId::new("single_char_delim", "pos_first"), - |b| { - b.iter(|| { - black_box( - split_part_func - .invoke_with_args(ScalarFunctionArgs { - args: args.clone(), - arg_fields: arg_fields.clone(), - number_rows: N_ROWS, - return_field: Arc::clone(&return_field), - config_options: Arc::clone(&config_options), - }) - .expect("split_part should work"), - ) - }) - }, - ); + group.bench_function(BenchmarkId::new("single_char_delim", "pos_first"), |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }); } // Scenario 2: Single-char delimiter, middle position @@ -127,28 +126,27 @@ fn criterion_benchmark(c: &mut Criterion) { let arg_fields: Vec<_> = args .iter() .enumerate() - .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) .collect(); let return_field = Field::new("f", DataType::Utf8, true).into(); - group.bench_function( - BenchmarkId::new("single_char_delim", "pos_middle"), - |b| { - b.iter(|| { - black_box( - split_part_func - .invoke_with_args(ScalarFunctionArgs { - args: args.clone(), - arg_fields: arg_fields.clone(), - number_rows: N_ROWS, - return_field: Arc::clone(&return_field), - config_options: Arc::clone(&config_options), - }) - .expect("split_part should work"), - ) - }) - }, - ); + group.bench_function(BenchmarkId::new("single_char_delim", "pos_middle"), |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }); } // Scenario 3: Single-char delimiter, last position @@ -159,28 +157,27 @@ fn criterion_benchmark(c: &mut Criterion) { let arg_fields: Vec<_> = args .iter() .enumerate() - .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) .collect(); let return_field = Field::new("f", DataType::Utf8, true).into(); - group.bench_function( - BenchmarkId::new("single_char_delim", "pos_last"), - |b| { - b.iter(|| { - black_box( - split_part_func - .invoke_with_args(ScalarFunctionArgs { - args: args.clone(), - arg_fields: arg_fields.clone(), - number_rows: N_ROWS, - return_field: Arc::clone(&return_field), - config_options: Arc::clone(&config_options), - }) - .expect("split_part should work"), - ) - }) - }, - ); + group.bench_function(BenchmarkId::new("single_char_delim", "pos_last"), |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }); } // Scenario 4: Single-char delimiter, negative position (last element) @@ -191,7 +188,9 @@ fn criterion_benchmark(c: &mut Criterion) { let arg_fields: Vec<_> = args .iter() .enumerate() - .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) .collect(); let return_field = Field::new("f", DataType::Utf8, true).into(); @@ -223,28 +222,27 @@ fn criterion_benchmark(c: &mut Criterion) { let arg_fields: Vec<_> = args .iter() .enumerate() - .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) .collect(); let return_field = Field::new("f", DataType::Utf8, true).into(); - group.bench_function( - BenchmarkId::new("multi_char_delim", "pos_first"), - |b| { - b.iter(|| { - black_box( - split_part_func - .invoke_with_args(ScalarFunctionArgs { - args: args.clone(), - arg_fields: arg_fields.clone(), - number_rows: N_ROWS, - return_field: Arc::clone(&return_field), - config_options: Arc::clone(&config_options), - }) - .expect("split_part should work"), - ) - }) - }, - ); + group.bench_function(BenchmarkId::new("multi_char_delim", "pos_first"), |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }); } // Scenario 6: Multi-char delimiter, middle position @@ -255,28 +253,27 @@ fn criterion_benchmark(c: &mut Criterion) { let arg_fields: Vec<_> = args .iter() .enumerate() - .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) .collect(); let return_field = Field::new("f", DataType::Utf8, true).into(); - group.bench_function( - BenchmarkId::new("multi_char_delim", "pos_middle"), - |b| { - b.iter(|| { - black_box( - split_part_func - .invoke_with_args(ScalarFunctionArgs { - args: args.clone(), - arg_fields: arg_fields.clone(), - number_rows: N_ROWS, - return_field: Arc::clone(&return_field), - config_options: Arc::clone(&config_options), - }) - .expect("split_part should work"), - ) - }) - }, - ); + group.bench_function(BenchmarkId::new("multi_char_delim", "pos_middle"), |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }); } // Scenario 7: StringViewArray, single-char delimiter, first position @@ -287,7 +284,9 @@ fn criterion_benchmark(c: &mut Criterion) { let arg_fields: Vec<_> = args .iter() .enumerate() - .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) .collect(); let return_field = Field::new("f", DataType::Utf8, true).into(); @@ -319,28 +318,27 @@ fn criterion_benchmark(c: &mut Criterion) { let arg_fields: Vec<_> = args .iter() .enumerate() - .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) .collect(); let return_field = Field::new("f", DataType::Utf8, true).into(); - group.bench_function( - BenchmarkId::new("many_parts_20", "pos_second"), - |b| { - b.iter(|| { - black_box( - split_part_func - .invoke_with_args(ScalarFunctionArgs { - args: args.clone(), - arg_fields: arg_fields.clone(), - number_rows: N_ROWS, - return_field: Arc::clone(&return_field), - config_options: Arc::clone(&config_options), - }) - .expect("split_part should work"), - ) - }) - }, - ); + group.bench_function(BenchmarkId::new("many_parts_20", "pos_second"), |b| { + b.iter(|| { + black_box( + split_part_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: N_ROWS, + return_field: Arc::clone(&return_field), + config_options: Arc::clone(&config_options), + }) + .expect("split_part should work"), + ) + }) + }); } // Scenario 9: Long strings with many parts - worst case for old implementation @@ -351,7 +349,9 @@ fn criterion_benchmark(c: &mut Criterion) { let arg_fields: Vec<_> = args .iter() .enumerate() - .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) .collect(); let return_field = Field::new("f", DataType::Utf8, true).into(); From 0fd7f15ae9c804c6a7d2b0f9749dff6b71a58648 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 5 Jan 2026 03:52:36 -0700 Subject: [PATCH 4/6] address feedback --- datafusion/functions/Cargo.toml | 1 - datafusion/functions/src/string/split_part.rs | 16 +++++-- .../physical-expr/src/expressions/case.rs | 44 ++++++++++++------- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 1b8e729ec48a5..ddf9cb2b34ba3 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -309,4 +309,3 @@ required-features = ["unicode_expressions"] harness = false name = "factorial" required-features = ["math_expressions"] ->>>>>>> apache/main diff --git a/datafusion/functions/src/string/split_part.rs b/datafusion/functions/src/string/split_part.rs index 12ab867328174..ce9463c1103ec 100644 --- a/datafusion/functions/src/string/split_part.rs +++ b/datafusion/functions/src/string/split_part.rs @@ -25,7 +25,7 @@ use arrow::datatypes::DataType; use datafusion_common::ScalarValue; use datafusion_common::cast::as_int64_array; use datafusion_common::types::{NativeType, logical_int64, logical_string}; -use datafusion_common::{DataFusionError, Result, exec_err}; +use datafusion_common::{DataFusionError, Result, exec_datafusion_err, exec_err}; use datafusion_expr::{ Coercion, ColumnarValue, Documentation, TypeSignatureClass, Volatility, }; @@ -223,12 +223,22 @@ where std::cmp::Ordering::Greater => { // Positive index: use nth() to avoid collecting all parts // This stops iteration as soon as we find the nth element - string.split(delimiter).nth((n - 1) as usize) + let idx: usize = (n - 1).try_into().map_err(|_| { + exec_datafusion_err!( + "split_part index {n} exceeds maximum supported value" + ) + })?; + string.split(delimiter).nth(idx) } std::cmp::Ordering::Less => { // Negative index: use rsplit().nth() to efficiently get from the end // rsplit iterates in reverse, so -1 means first from rsplit (index 0) - string.rsplit(delimiter).nth((-n - 1) as usize) + let idx: usize = (-n - 1).try_into().map_err(|_| { + exec_datafusion_err!( + "split_part index {n} exceeds maximum supported value" + ) + })?; + string.rsplit(delimiter).nth(idx) } std::cmp::Ordering::Equal => { return exec_err!("field position must not be zero"); diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index cfa0bad0c53fc..674f5d099c896 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -195,7 +195,9 @@ impl CaseBody { let then_sub_projections = projected_body .when_then_expr .iter() - .map(|(_, then_expr)| Self::compute_sub_projection(then_expr, projection.len())) + .map(|(_, then_expr)| { + Self::compute_sub_projection(then_expr, projection.len()) + }) .collect::>>()?; let else_sub_projection = projected_body @@ -386,12 +388,13 @@ impl ProjectedCaseBody { if when_true_count == remainder_batch.num_rows() { let sub_proj = &self.then_sub_projections[i]; // Project to only needed columns before evaluating - let then_value = if sub_proj.projection.len() < remainder_batch.num_columns() { - let projected = remainder_batch.project(&sub_proj.projection)?; - sub_proj.expr.evaluate(&projected)? - } else { - self.body.when_then_expr[i].1.evaluate(&remainder_batch)? - }; + let then_value = + if sub_proj.projection.len() < remainder_batch.num_columns() { + let projected = remainder_batch.project(&sub_proj.projection)?; + sub_proj.expr.evaluate(&projected)? + } else { + self.body.when_then_expr[i].1.evaluate(&remainder_batch)? + }; result_builder.add_branch_result(&remainder_rows, then_value)?; return result_builder.finish(); } @@ -401,7 +404,8 @@ impl ProjectedCaseBody { let then_rows = filter_array(&remainder_rows, &then_filter)?; let sub_proj = &self.then_sub_projections[i]; - let then_value = if sub_proj.projection.len() < remainder_batch.num_columns() { + let then_value = if sub_proj.projection.len() < remainder_batch.num_columns() + { // Project to only needed columns, then filter (fewer columns to filter) let projected = remainder_batch.project(&sub_proj.projection)?; let then_batch = filter_record_batch(&projected, &then_filter)?; @@ -432,7 +436,9 @@ impl ProjectedCaseBody { // Handle else expression with sub-projection if let Some(else_sub_proj) = &self.else_sub_projection { - let else_value = if else_sub_proj.projection.len() < remainder_batch.num_columns() { + let else_value = if else_sub_proj.projection.len() + < remainder_batch.num_columns() + { let projected = remainder_batch.project(&else_sub_proj.projection)?; else_sub_proj.expr.evaluate(&projected)? } else if let Some(e) = &self.body.else_expr { @@ -520,12 +526,13 @@ impl ProjectedCaseBody { if when_true_count == remainder_batch.num_rows() { let sub_proj = &self.then_sub_projections[i]; - let then_value = if sub_proj.projection.len() < remainder_batch.num_columns() { - let projected = remainder_batch.project(&sub_proj.projection)?; - sub_proj.expr.evaluate(&projected)? - } else { - self.body.when_then_expr[i].1.evaluate(&remainder_batch)? - }; + let then_value = + if sub_proj.projection.len() < remainder_batch.num_columns() { + let projected = remainder_batch.project(&sub_proj.projection)?; + sub_proj.expr.evaluate(&projected)? + } else { + self.body.when_then_expr[i].1.evaluate(&remainder_batch)? + }; result_builder.add_branch_result(&remainder_rows, then_value)?; return result_builder.finish(); } @@ -534,7 +541,8 @@ impl ProjectedCaseBody { let then_rows = filter_array(&remainder_rows, &then_filter)?; let sub_proj = &self.then_sub_projections[i]; - let then_value = if sub_proj.projection.len() < remainder_batch.num_columns() { + let then_value = if sub_proj.projection.len() < remainder_batch.num_columns() + { let projected = remainder_batch.project(&sub_proj.projection)?; let then_batch = filter_record_batch(&projected, &then_filter)?; sub_proj.expr.evaluate(&then_batch)? @@ -560,7 +568,9 @@ impl ProjectedCaseBody { } if let Some(else_sub_proj) = &self.else_sub_projection { - let else_value = if else_sub_proj.projection.len() < remainder_batch.num_columns() { + let else_value = if else_sub_proj.projection.len() + < remainder_batch.num_columns() + { let projected = remainder_batch.project(&else_sub_proj.projection)?; else_sub_proj.expr.evaluate(&projected)? } else if let Some(e) = &self.body.else_expr { From 0e09e23e4493dd68d256bd2b0e7609f1afe3972b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 5 Jan 2026 09:20:19 -0700 Subject: [PATCH 5/6] revert accidental commit --- .../physical-expr/src/expressions/case.rs | 447 +----------------- 1 file changed, 9 insertions(+), 438 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 674f5d099c896..758317d3d2798 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -189,92 +189,9 @@ impl CaseBody { .map(|(k, _)| *k) .collect::>(); - // Compute sub-projections for each "then" expression. - // These allow filtering only the columns needed by each specific expression, - // rather than all columns in the globally projected batch. - let then_sub_projections = projected_body - .when_then_expr - .iter() - .map(|(_, then_expr)| { - Self::compute_sub_projection(then_expr, projection.len()) - }) - .collect::>>()?; - - let else_sub_projection = projected_body - .else_expr - .as_ref() - .map(|e| Self::compute_sub_projection(e, projection.len())) - .transpose()?; - Ok(ProjectedCaseBody { projection, body: projected_body, - then_sub_projections, - else_sub_projection, - }) - } - - /// Computes a [SubProjection] for a single expression. - /// - /// This identifies which columns (relative to the parent batch) the expression - /// needs, creates a projection vector, and rewrites the expression with new - /// column indices relative to the sub-projection. - fn compute_sub_projection( - expr: &Arc, - parent_num_columns: usize, - ) -> Result { - // Collect column indices used by this expression (relative to parent batch) - let mut used_indices = IndexSet::::new(); - expr.apply(|e| { - if let Some(column) = e.as_any().downcast_ref::() { - used_indices.insert(column.index()); - } - Ok(TreeNodeRecursion::Continue) - }) - .expect("Closure cannot fail"); - - // If the expression uses all columns, no sub-projection needed - if used_indices.len() == parent_num_columns { - return Ok(SubProjection { - projection: (0..parent_num_columns).collect(), - expr: Arc::clone(expr), - }); - } - - // Build mapping from parent indices to sub-projection indices - let index_map: IndexMap = used_indices - .iter() - .enumerate() - .map(|(sub_idx, &parent_idx)| (parent_idx, sub_idx)) - .collect(); - - // Rewrite expression with sub-projection indices - let rewritten_expr = Arc::clone(expr) - .transform_down(|e| { - if let Some(column) = e.as_any().downcast_ref::() { - let parent_idx = column.index(); - let sub_idx = *index_map.get(&parent_idx).unwrap(); - if sub_idx != parent_idx { - return Ok(Transformed::yes(Arc::new(Column::new( - column.name(), - sub_idx, - )))); - } - } - Ok(Transformed::no(e)) - }) - .map(|t| t.data)?; - - // Build projection vector (parent indices in sub-projection order) - let projection: Vec = index_map - .iter() - .sorted_by_key(|(_, sub_idx)| *sub_idx) - .map(|(&parent_idx, _)| parent_idx) - .collect(); - - Ok(SubProjection { - projection, - expr: rewritten_expr, }) } } @@ -306,353 +223,10 @@ impl CaseBody { /// /// The projection vector and the rewritten expression (which only differs from the original in /// column reference indices) are held in a `ProjectedCaseBody`. -/// Sub-projection for a single expression within a CASE. -/// -/// This allows filtering only the columns needed by a specific expression, -/// rather than all columns in the parent batch. Since `RecordBatch::project()` -/// is O(1) (just creates column references) while `filter_record_batch()` is -/// O(rows × columns), projecting first then filtering is more efficient when -/// the expression uses fewer columns than the parent batch contains. -#[derive(Debug)] -struct SubProjection { - /// Indices into the parent (globally projected) batch - projection: Vec, - /// Expression rewritten with column indices relative to this sub-projection - expr: Arc, -} - -impl PartialEq for SubProjection { - fn eq(&self, other: &Self) -> bool { - self.projection == other.projection && self.expr.eq(&other.expr) - } -} - -impl Eq for SubProjection {} - -impl Hash for SubProjection { - fn hash(&self, state: &mut H) { - self.projection.hash(state); - self.expr.hash(state); - } -} - #[derive(Debug, Hash, PartialEq, Eq)] struct ProjectedCaseBody { projection: Vec, body: CaseBody, - /// Per-expression sub-projections for "then" expressions. - /// Each sub-projection contains indices into the globally projected batch - /// and a rewritten expression with column indices relative to the sub-projection. - then_sub_projections: Vec, - /// Sub-projection for the else expression, if present. - else_sub_projection: Option, -} - -impl ProjectedCaseBody { - /// Evaluates the CASE expression using sub-projections to minimize filtering overhead. - /// - /// This method is similar to [CaseBody::case_when_no_expr] but uses the pre-computed - /// sub-projections to filter only the columns needed by each then/else expression, - /// rather than filtering all columns in the batch. - fn case_when_no_expr( - &self, - batch: &RecordBatch, - return_type: &DataType, - ) -> Result { - let mut result_builder = ResultBuilder::new(return_type, batch.num_rows()); - - // `remainder_rows` contains the indices of the rows that need to be evaluated - let mut remainder_rows: ArrayRef = - Arc::new(UInt32Array::from_iter(0..batch.num_rows() as u32)); - // `remainder_batch` contains the rows themselves that need to be evaluated - let mut remainder_batch = Cow::Borrowed(batch); - - for i in 0..self.body.when_then_expr.len() { - // Evaluate the 'when' predicate for the remainder batch - let when_predicate = &self.body.when_then_expr[i].0; - let when_value = when_predicate - .evaluate(&remainder_batch)? - .into_array(remainder_batch.num_rows())?; - let when_value = as_boolean_array(&when_value).map_err(|_| { - internal_datafusion_err!("WHEN expression did not return a BooleanArray") - })?; - - let when_true_count = when_value.true_count(); - - // If the 'when' predicate did not match any rows, continue to the next branch - if when_true_count == 0 { - continue; - } - - // If the 'when' predicate matched all remaining rows, no need to filter - if when_true_count == remainder_batch.num_rows() { - let sub_proj = &self.then_sub_projections[i]; - // Project to only needed columns before evaluating - let then_value = - if sub_proj.projection.len() < remainder_batch.num_columns() { - let projected = remainder_batch.project(&sub_proj.projection)?; - sub_proj.expr.evaluate(&projected)? - } else { - self.body.when_then_expr[i].1.evaluate(&remainder_batch)? - }; - result_builder.add_branch_result(&remainder_rows, then_value)?; - return result_builder.finish(); - } - - // Filter and evaluate the then expression using sub-projection - let then_filter = create_filter(when_value, true); - let then_rows = filter_array(&remainder_rows, &then_filter)?; - - let sub_proj = &self.then_sub_projections[i]; - let then_value = if sub_proj.projection.len() < remainder_batch.num_columns() - { - // Project to only needed columns, then filter (fewer columns to filter) - let projected = remainder_batch.project(&sub_proj.projection)?; - let then_batch = filter_record_batch(&projected, &then_filter)?; - sub_proj.expr.evaluate(&then_batch)? - } else { - // No benefit from sub-projection, filter all columns - let then_batch = filter_record_batch(&remainder_batch, &then_filter)?; - self.body.when_then_expr[i].1.evaluate(&then_batch)? - }; - result_builder.add_branch_result(&then_rows, then_value)?; - - // If this is the last branch and no else, we're done - if self.body.else_expr.is_none() && i == self.body.when_then_expr.len() - 1 { - return result_builder.finish(); - } - - // Prepare for next branch - must filter all columns since we need them - // for subsequent when/then expressions - let next_selection = match when_value.null_count() { - 0 => not(when_value), - _ => not(&prep_null_mask_filter(when_value)), - }?; - let next_filter = create_filter(&next_selection, true); - remainder_batch = - Cow::Owned(filter_record_batch(&remainder_batch, &next_filter)?); - remainder_rows = filter_array(&remainder_rows, &next_filter)?; - } - - // Handle else expression with sub-projection - if let Some(else_sub_proj) = &self.else_sub_projection { - let else_value = if else_sub_proj.projection.len() - < remainder_batch.num_columns() - { - let projected = remainder_batch.project(&else_sub_proj.projection)?; - else_sub_proj.expr.evaluate(&projected)? - } else if let Some(e) = &self.body.else_expr { - let expr = try_cast(Arc::clone(e), &batch.schema(), return_type.clone())?; - expr.evaluate(&remainder_batch)? - } else { - unreachable!("else_sub_projection exists but else_expr doesn't") - }; - result_builder.add_branch_result(&remainder_rows, else_value)?; - } - - result_builder.finish() - } - - /// Evaluates the CASE WHEN expr = value form using sub-projections. - fn case_when_with_expr( - &self, - batch: &RecordBatch, - return_type: &DataType, - ) -> Result { - let mut result_builder = ResultBuilder::new(return_type, batch.num_rows()); - - let mut remainder_rows: ArrayRef = - Arc::new(UInt32Array::from_iter(0..batch.num_rows() as u32)); - let mut remainder_batch = Cow::Borrowed(batch); - - // Evaluate the base expression - let base_expr = self.body.expr.as_ref().ok_or_else(|| { - internal_datafusion_err!("case_when_with_expr called without base expression") - })?; - let base_value = base_expr.evaluate(batch)?; - let base_value_is_nested = base_value.data_type().is_nested(); - let mut base_values = base_value.into_array(batch.num_rows())?; - - // Handle null base values - let base_null_count = base_values.logical_null_count(); - if base_null_count > 0 { - let base_not_nulls = is_not_null(base_values.as_ref())?; - let base_all_null = base_null_count == remainder_batch.num_rows(); - - if let Some(e) = &self.body.else_expr { - let expr = try_cast(Arc::clone(e), &batch.schema(), return_type.clone())?; - - if base_all_null { - let nulls_value = expr.evaluate(&remainder_batch)?; - result_builder.add_branch_result(&remainder_rows, nulls_value)?; - } else { - let nulls_filter = create_filter(¬(&base_not_nulls)?, true); - let nulls_batch = - filter_record_batch(&remainder_batch, &nulls_filter)?; - let nulls_rows = filter_array(&remainder_rows, &nulls_filter)?; - let nulls_value = expr.evaluate(&nulls_batch)?; - result_builder.add_branch_result(&nulls_rows, nulls_value)?; - } - } - - if base_all_null { - return result_builder.finish(); - } - - // Remove null rows from remainder - let not_null_filter = create_filter(&base_not_nulls, true); - remainder_batch = - Cow::Owned(filter_record_batch(&remainder_batch, ¬_null_filter)?); - remainder_rows = filter_array(&remainder_rows, ¬_null_filter)?; - base_values = filter_array(&base_values, ¬_null_filter)?; - } - - for i in 0..self.body.when_then_expr.len() { - let when_expr = &self.body.when_then_expr[i].0; - let when_value = match when_expr.evaluate(&remainder_batch)? { - ColumnarValue::Array(a) => { - compare_with_eq(&a, &base_values, base_value_is_nested) - } - ColumnarValue::Scalar(s) => { - compare_with_eq(&s.to_scalar()?, &base_values, base_value_is_nested) - } - }?; - - let when_true_count = when_value.true_count(); - - if when_true_count == 0 { - continue; - } - - if when_true_count == remainder_batch.num_rows() { - let sub_proj = &self.then_sub_projections[i]; - let then_value = - if sub_proj.projection.len() < remainder_batch.num_columns() { - let projected = remainder_batch.project(&sub_proj.projection)?; - sub_proj.expr.evaluate(&projected)? - } else { - self.body.when_then_expr[i].1.evaluate(&remainder_batch)? - }; - result_builder.add_branch_result(&remainder_rows, then_value)?; - return result_builder.finish(); - } - - let then_filter = create_filter(&when_value, true); - let then_rows = filter_array(&remainder_rows, &then_filter)?; - - let sub_proj = &self.then_sub_projections[i]; - let then_value = if sub_proj.projection.len() < remainder_batch.num_columns() - { - let projected = remainder_batch.project(&sub_proj.projection)?; - let then_batch = filter_record_batch(&projected, &then_filter)?; - sub_proj.expr.evaluate(&then_batch)? - } else { - let then_batch = filter_record_batch(&remainder_batch, &then_filter)?; - self.body.when_then_expr[i].1.evaluate(&then_batch)? - }; - result_builder.add_branch_result(&then_rows, then_value)?; - - if self.body.else_expr.is_none() && i == self.body.when_then_expr.len() - 1 { - return result_builder.finish(); - } - - let next_selection = match when_value.null_count() { - 0 => not(&when_value), - _ => not(&prep_null_mask_filter(&when_value)), - }?; - let next_filter = create_filter(&next_selection, true); - remainder_batch = - Cow::Owned(filter_record_batch(&remainder_batch, &next_filter)?); - remainder_rows = filter_array(&remainder_rows, &next_filter)?; - base_values = filter_array(&base_values, &next_filter)?; - } - - if let Some(else_sub_proj) = &self.else_sub_projection { - let else_value = if else_sub_proj.projection.len() - < remainder_batch.num_columns() - { - let projected = remainder_batch.project(&else_sub_proj.projection)?; - else_sub_proj.expr.evaluate(&projected)? - } else if let Some(e) = &self.body.else_expr { - let expr = try_cast(Arc::clone(e), &batch.schema(), return_type.clone())?; - expr.evaluate(&remainder_batch)? - } else { - unreachable!("else_sub_projection exists but else_expr doesn't") - }; - result_builder.add_branch_result(&remainder_rows, else_value)?; - } - - result_builder.finish() - } - - /// Evaluates CASE WHEN condition THEN expr ELSE expr END with sub-projections. - fn expr_or_expr( - &self, - batch: &RecordBatch, - when_value: &BooleanArray, - ) -> Result { - let when_value = match when_value.null_count() { - 0 => Cow::Borrowed(when_value), - _ => Cow::Owned(prep_null_mask_filter(when_value)), - }; - - let optimize_filter = batch.num_columns() > 1 - || (batch.num_columns() == 1 && multiple_arrays(batch.column(0).data_type())); - - let when_filter = create_filter(&when_value, optimize_filter); - - // Evaluate then expression with sub-projection - let then_sub_proj = &self.then_sub_projections[0]; - let then_value = if then_sub_proj.projection.len() < batch.num_columns() { - let projected = batch.project(&then_sub_proj.projection)?; - let then_batch = filter_record_batch(&projected, &when_filter)?; - then_sub_proj.expr.evaluate(&then_batch)? - } else { - let then_batch = filter_record_batch(batch, &when_filter)?; - self.body.when_then_expr[0].1.evaluate(&then_batch)? - }; - - // Evaluate else expression with sub-projection - let else_selection = not(&when_value)?; - let else_filter = create_filter(&else_selection, optimize_filter); - - let else_value = if let Some(else_sub_proj) = &self.else_sub_projection { - if else_sub_proj.projection.len() < batch.num_columns() { - let projected = batch.project(&else_sub_proj.projection)?; - let else_batch = filter_record_batch(&projected, &else_filter)?; - else_sub_proj.expr.evaluate(&else_batch)? - } else { - let else_batch = filter_record_batch(batch, &else_filter)?; - let e = self.body.else_expr.as_ref().unwrap(); - let return_type = self.body.data_type(&batch.schema())?; - let else_expr = try_cast(Arc::clone(e), &batch.schema(), return_type) - .unwrap_or_else(|_| Arc::clone(e)); - else_expr.evaluate(&else_batch)? - } - } else { - let else_batch = filter_record_batch(batch, &else_filter)?; - let e = self.body.else_expr.as_ref().unwrap(); - let return_type = self.body.data_type(&batch.schema())?; - let else_expr = try_cast(Arc::clone(e), &batch.schema(), return_type) - .unwrap_or_else(|_| Arc::clone(e)); - else_expr.evaluate(&else_batch)? - }; - - Ok(ColumnarValue::Array(match (then_value, else_value) { - (ColumnarValue::Array(t), ColumnarValue::Array(e)) => { - merge(&when_value, &t, &e) - } - (ColumnarValue::Scalar(t), ColumnarValue::Array(e)) => { - merge(&when_value, &t.to_scalar()?, &e) - } - (ColumnarValue::Array(t), ColumnarValue::Scalar(e)) => { - merge(&when_value, &t, &e.to_scalar()?) - } - (ColumnarValue::Scalar(t), ColumnarValue::Scalar(e)) => { - merge(&when_value, &t.to_scalar()?, &e.to_scalar()?) - } - }?)) - } } /// The CASE expression is similar to a series of nested if/else and there are two forms that @@ -1109,7 +683,6 @@ impl CaseExpr { } } -#[allow(dead_code)] // These methods are superseded by ProjectedCaseBody methods but kept for reference impl CaseBody { fn data_type(&self, input_schema: &Schema) -> Result { // since all then results have the same data type, we can choose any one as the @@ -1432,12 +1005,11 @@ impl CaseExpr { ) -> Result { let return_type = self.data_type(&batch.schema())?; if projected.projection.len() < batch.num_columns() { - // Project to only columns used by any expression in the CASE, - // then use sub-projections for individual then/else expressions let projected_batch = batch.project(&projected.projection)?; - projected.case_when_with_expr(&projected_batch, &return_type) + projected + .body + .case_when_with_expr(&projected_batch, &return_type) } else { - // No global projection benefit - use original body self.body.case_when_with_expr(batch, &return_type) } } @@ -1456,12 +1028,11 @@ impl CaseExpr { ) -> Result { let return_type = self.data_type(&batch.schema())?; if projected.projection.len() < batch.num_columns() { - // Project to only columns used by any expression in the CASE, - // then use sub-projections for individual then/else expressions let projected_batch = batch.project(&projected.projection)?; - projected.case_when_no_expr(&projected_batch, &return_type) + projected + .body + .case_when_no_expr(&projected_batch, &return_type) } else { - // No global projection benefit - use original body self.body.case_when_no_expr(batch, &return_type) } } @@ -1569,11 +1140,11 @@ impl CaseExpr { self.body.else_expr.as_ref().unwrap().evaluate(batch) } else if projected.projection.len() < batch.num_columns() { // The case expressions do not use all the columns of the input batch. - // Project first to reduce time spent filtering, then use sub-projections. + // Project first to reduce time spent filtering. let projected_batch = batch.project(&projected.projection)?; - projected.expr_or_expr(&projected_batch, when_value) + projected.body.expr_or_expr(&projected_batch, when_value) } else { - // No global projection benefit - use original body + // All columns are used in the case expressions, so there is no need to project. self.body.expr_or_expr(batch, when_value) } } From bc002faebc4080772de5333d06bc5215ba8055dd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 5 Jan 2026 09:21:07 -0700 Subject: [PATCH 6/6] Apply suggestions from code review Co-authored-by: Martin Grigorov --- datafusion/functions/src/string/split_part.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions/src/string/split_part.rs b/datafusion/functions/src/string/split_part.rs index ce9463c1103ec..74bf7c16c43a1 100644 --- a/datafusion/functions/src/string/split_part.rs +++ b/datafusion/functions/src/string/split_part.rs @@ -235,7 +235,7 @@ where // rsplit iterates in reverse, so -1 means first from rsplit (index 0) let idx: usize = (-n - 1).try_into().map_err(|_| { exec_datafusion_err!( - "split_part index {n} exceeds maximum supported value" + "split_part index {n} exceeds minimum supported value" ) })?; string.rsplit(delimiter).nth(idx)