From b7f640cc8fda2615a8be980e4232951d7aff32d4 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 28 Nov 2023 09:14:55 +0800 Subject: [PATCH 1/5] add benchmark Signed-off-by: jayzhan211 --- datafusion/core/Cargo.toml | 4 + datafusion/core/benches/array_expression.rs | 64 +++++++++++++ .../physical-expr/src/array_expressions.rs | 92 ++++++++++++++++++- 3 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 datafusion/core/benches/array_expression.rs diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 0b7aa1509820..c30c9035b741 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -167,3 +167,7 @@ name = "sort" [[bench]] harness = false name = "topk_aggregate" + +[[bench]] +harness = false +name = "array_expression" \ No newline at end of file diff --git a/datafusion/core/benches/array_expression.rs b/datafusion/core/benches/array_expression.rs new file mode 100644 index 000000000000..2557a83f7d6b --- /dev/null +++ b/datafusion/core/benches/array_expression.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#[macro_use] +extern crate criterion; +extern crate arrow; +extern crate datafusion; + +mod data_utils; +use crate::criterion::Criterion; +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{Int64Array, ListArray, ArrayRef}; +use datafusion_physical_expr::array_expressions; +use std::sync::Arc; + +fn criterion_benchmark(c: &mut Criterion) { + let array_len = 1000000000; + + let array = (0..array_len).map(|_| Some(2 as i64)).collect::<Vec<_>>(); + let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![ + Some(array.clone()), + Some(array.clone()), + Some(array), + ]); + let from_array = Int64Array::from_value(2, 3); + let to_array = Int64Array::from_value(-2, 3); + + let args = vec![ + Arc::new(list_array) as ArrayRef, + Arc::new(from_array) as ArrayRef, + Arc::new(to_array) as ArrayRef, + ]; + + let array = (0..array_len).map(|_| Some(-2 as i64)).collect::<Vec<_>>(); + let expected_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![ + Some(array.clone()), + Some(array.clone()), + Some(array), + ]); + + c.bench_function("array_replace", |b| { + b.iter(|| { + assert_eq!(array_expressions::array_replace_all(args.as_slice()).unwrap().as_list::<i32>(), criterion::black_box(&expected_array)) + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 8968bcf2ea4e..dd36ea8f9197 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -102,6 +102,7 @@ fn compare_element_to_list( ) -> Result<BooleanArray> { let indices = UInt32Array::from(vec![row_index as u32]); let element_array_row = arrow::compute::take(element_array, &indices, None)?; + // Compute all positions in list_row_array (that is itself an // array) that are equal to `from_array_row` let res = match element_array_row.data_type() { @@ -1415,16 +1416,103 @@ fn general_replace( )?)) } +fn 
general_replace_v2( + list_array: &ListArray, + from_array: &ArrayRef, + to_array: &ArrayRef, + arr_n: Vec<i64>, +) -> Result<ArrayRef> { + // Build up the offsets for the final output array + let mut offsets: Vec<i32> = vec![0]; + let values = list_array.values(); + let original_data = values.to_data(); + // let from_data = from_array.to_data(); + let to_data = to_array.to_data(); + let capacity = Capacities::Array(original_data.len()); + + let mut mutable = MutableArrayData::with_capacities( + vec![&original_data, &to_data], + false, + capacity, + ); + + let mut valid = BooleanBufferBuilder::new(list_array.len()); + + for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { + if list_array.is_null(row_index) { + offsets.push(offsets[row_index]); + valid.append(false); + continue; + } + + let start = offset_window[0] as usize; + let end = offset_window[1] as usize; + + let list_array_row = list_array.value(row_index); + + // Compute all positions in list_row_array (that is itself an + // array) that are equal to `from_array_row` + let eq_array = compare_element_to_list( + &list_array_row, + &from_array, + row_index, + true, + )?; + assert_eq!(end, start + eq_array.len()); + + let original_idx = 0; + let replace_idx = 1; + let n = arr_n[row_index]; + let mut counter = 0; + + if eq_array.false_count() == eq_array.len() { + // no matches, copy original data + mutable.extend(original_idx, start, end); + offsets.push(offsets[row_index] + (end - start) as i32); + valid.append(true); + continue; + } + + for (i, to_replace) in eq_array.iter().enumerate() { + if let Some(true) = to_replace { + mutable.extend(replace_idx, row_index, row_index + 1); + counter += 1; + if counter == n { + // copy original data for any matches past n + mutable.extend(original_idx, start + i + 1, end); + break; + } + } else { + // copy original data for false / null matches + mutable.extend(original_idx, start + i, start + i + 1); + } + } + + offsets.push(offsets[row_index] + (end - start) as 
i32); + valid.append(true); + } + + let data = mutable.freeze(); + + Ok(Arc::new(ListArray::try_new( + Arc::new(Field::new("item", list_array.value_type(), true)), + OffsetBuffer::new(offsets.into()), + arrow_array::make_array(data), + Some(NullBuffer::new(valid.finish())), + )?)) +} + pub fn array_replace(args: &[ArrayRef]) -> Result<ArrayRef> { // replace at most one occurence for each element let arr_n = vec![1; args[0].len()]; - general_replace(as_list_array(&args[0])?, &args[1], &args[2], arr_n) + // general_replace(as_list_array(&args[0])?, &args[1], &args[2], arr_n) + general_replace_v2(as_list_array(&args[0])?, &args[1], &args[2], arr_n) } pub fn array_replace_n(args: &[ArrayRef]) -> Result<ArrayRef> { // replace the specified number of occurences let arr_n = as_int64_array(&args[3])?.values().to_vec(); - general_replace(as_list_array(&args[0])?, &args[1], &args[2], arr_n) + general_replace_v2(as_list_array(&args[0])?, &args[1], &args[2], arr_n) } pub fn array_replace_all(args: &[ArrayRef]) -> Result<ArrayRef> { From a945d7381205895bf2c4c8a636e7a8dc8f2ad132 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 28 Nov 2023 09:29:18 +0800 Subject: [PATCH 2/5] fmt Signed-off-by: jayzhan211 --- datafusion/core/benches/array_expression.rs | 13 +++++++++---- datafusion/physical-expr/src/array_expressions.rs | 10 +++------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/datafusion/core/benches/array_expression.rs b/datafusion/core/benches/array_expression.rs index 2557a83f7d6b..6a7763d92895 100644 --- a/datafusion/core/benches/array_expression.rs +++ b/datafusion/core/benches/array_expression.rs @@ -24,12 +24,12 @@ mod data_utils; use crate::criterion::Criterion; use arrow_array::cast::AsArray; use arrow_array::types::Int64Type; -use arrow_array::{Int64Array, ListArray, ArrayRef}; +use arrow_array::{ArrayRef, Int64Array, ListArray}; use datafusion_physical_expr::array_expressions; use std::sync::Arc; fn criterion_benchmark(c: &mut Criterion) { - let array_len = 1000000000; + 
let array_len = 600000000; let array = (0..array_len).map(|_| Some(2 as i64)).collect::<Vec<_>>(); let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![ @@ -52,10 +52,15 @@ fn criterion_benchmark(c: &mut Criterion) { Some(array.clone()), Some(array), ]); - + c.bench_function("array_replace", |b| { b.iter(|| { - assert_eq!(array_expressions::array_replace_all(args.as_slice()).unwrap().as_list::<i32>(), criterion::black_box(&expected_array)) + assert_eq!( + array_expressions::array_replace_all(args.as_slice()) + .unwrap() + .as_list::<i32>(), + criterion::black_box(&expected_array) + ) }) }); } diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index dd36ea8f9197..ee76c3d8fe09 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -1429,7 +1429,7 @@ fn general_replace_v2( // let from_data = from_array.to_data(); let to_data = to_array.to_data(); let capacity = Capacities::Array(original_data.len()); - + let mut mutable = MutableArrayData::with_capacities( vec![&original_data, &to_data], false, @@ -1452,12 +1452,8 @@ fn general_replace_v2( // Compute all positions in list_row_array (that is itself an // array) that are equal to `from_array_row` - let eq_array = compare_element_to_list( - &list_array_row, - &from_array, - row_index, - true, - )?; + let eq_array = + compare_element_to_list(&list_array_row, &from_array, row_index, true)?; assert_eq!(end, start + eq_array.len()); let original_idx = 0; From 884015686d373584c8868e23e443b5e6e2eca11d Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 28 Nov 2023 09:31:20 +0800 Subject: [PATCH 3/5] address clippy Signed-off-by: jayzhan211 --- datafusion/core/benches/array_expression.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/benches/array_expression.rs b/datafusion/core/benches/array_expression.rs index 6a7763d92895..b450ceab9110 100644 --- 
a/datafusion/core/benches/array_expression.rs +++ b/datafusion/core/benches/array_expression.rs @@ -31,7 +31,7 @@ use std::sync::Arc; fn criterion_benchmark(c: &mut Criterion) { let array_len = 600000000; - let array = (0..array_len).map(|_| Some(2 as i64)).collect::<Vec<_>>(); + let array = (0..array_len).map(|_| Some(2_i64)).collect::<Vec<_>>(); let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![ Some(array.clone()), Some(array.clone()), @@ -46,7 +46,7 @@ fn criterion_benchmark(c: &mut Criterion) { Arc::new(to_array) as ArrayRef, ]; - let array = (0..array_len).map(|_| Some(-2 as i64)).collect::<Vec<_>>(); + let array = (0..array_len).map(|_| Some(-2_i64)).collect::<Vec<_>>(); let expected_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![ Some(array.clone()), Some(array.clone()), From af39170beac72f90ce64a94a9b68258384518795 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 28 Nov 2023 20:44:17 +0800 Subject: [PATCH 4/5] cleanup Signed-off-by: jayzhan211 --- datafusion/core/Cargo.toml | 2 +- datafusion/core/benches/array_expression.rs | 6 +- .../physical-expr/src/array_expressions.rs | 103 +----------------- 3 files changed, 11 insertions(+), 100 deletions(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index c30c9035b741..7caf91e24f2f 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -170,4 +170,4 @@ name = "topk_aggregate" [[bench]] harness = false -name = "array_expression" \ No newline at end of file +name = "array_expression" diff --git a/datafusion/core/benches/array_expression.rs b/datafusion/core/benches/array_expression.rs index b450ceab9110..95bc93e0e353 100644 --- a/datafusion/core/benches/array_expression.rs +++ b/datafusion/core/benches/array_expression.rs @@ -29,7 +29,9 @@ use datafusion_physical_expr::array_expressions; use std::sync::Arc; fn criterion_benchmark(c: &mut Criterion) { - let array_len = 600000000; + // Construct large arrays for benchmarking + + let array_len = 100000000; let array = (0..array_len).map(|_| 
Some(2_i64)).collect::<Vec<_>>(); let list_array = ListArray::from_iter_primitive::<Int64Type, _, _>(vec![ @@ -53,6 +55,8 @@ fn criterion_benchmark(c: &mut Criterion) { Some(array), ]); + // Benchmark array functions + c.bench_function("array_replace", |b| { b.iter(|| { assert_eq!( diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index ee76c3d8fe09..5b5510894d94 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -34,8 +34,7 @@ use datafusion_common::cast::{ }; use datafusion_common::utils::array_into_list_array; use datafusion_common::{ - exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, - DataFusionError, Result, + exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result, }; use itertools::Itertools; @@ -1332,101 +1331,11 @@ fn general_replace( from_array: &ArrayRef, to_array: &ArrayRef, arr_n: Vec<i64>, -) -> Result<ArrayRef> { - // Build up the offsets for the final output array - let mut offsets: Vec<i32> = vec![0]; - let data_type = list_array.value_type(); - let mut new_values = vec![]; - - // n is the number of elements to replace in this row - for (row_index, (list_array_row, n)) in - list_array.iter().zip(arr_n.iter()).enumerate() - { - let last_offset: i32 = offsets - .last() - .copied() - .ok_or_else(|| internal_datafusion_err!("offsets should not be empty"))?; - - match list_array_row { - Some(list_array_row) => { - // Compute all positions in list_row_array (that is itself an - // array) that are equal to `from_array_row` - let eq_array = compare_element_to_list( - &list_array_row, - &from_array, - row_index, - true, - )?; - - // Use MutableArrayData to build the replaced array - let original_data = list_array_row.to_data(); - let to_data = to_array.to_data(); - let capacity = Capacities::Array(original_data.len() + to_data.len()); - - // First array is the original array, second array is the element to replace with. 
- let mut mutable = MutableArrayData::with_capacities( - vec![&original_data, &to_data], - false, - capacity, - ); - let original_idx = 0; - let replace_idx = 1; - - let mut counter = 0; - for (i, to_replace) in eq_array.iter().enumerate() { - if let Some(true) = to_replace { - mutable.extend(replace_idx, row_index, row_index + 1); - counter += 1; - if counter == *n { - // copy original data for any matches past n - mutable.extend(original_idx, i + 1, eq_array.len()); - break; - } - } else { - // copy original data for false / null matches - mutable.extend(original_idx, i, i + 1); - } - } - - let data = mutable.freeze(); - let replaced_array = arrow_array::make_array(data); - - offsets.push(last_offset + replaced_array.len() as i32); - new_values.push(replaced_array); - } - None => { - // Null element results in a null row (no new offsets) - offsets.push(last_offset); - } - } - } - - let values = if new_values.is_empty() { - new_empty_array(&data_type) - } else { - let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); - arrow::compute::concat(&new_values)? 
- }; - - Ok(Arc::new(ListArray::try_new( - Arc::new(Field::new("item", data_type, true)), - OffsetBuffer::new(offsets.into()), - values, - list_array.nulls().cloned(), - )?)) -} - -fn general_replace_v2( - list_array: &ListArray, - from_array: &ArrayRef, - to_array: &ArrayRef, - arr_n: Vec<i64>, ) -> Result<ArrayRef> { // Build up the offsets for the final output array let mut offsets: Vec<i32> = vec![0]; let values = list_array.values(); let original_data = values.to_data(); - // let from_data = from_array.to_data(); let to_data = to_array.to_data(); let capacity = Capacities::Array(original_data.len()); @@ -1454,15 +1363,14 @@ fn general_replace_v2( // array) that are equal to `from_array_row` let eq_array = compare_element_to_list(&list_array_row, &from_array, row_index, true)?; - assert_eq!(end, start + eq_array.len()); let original_idx = 0; let replace_idx = 1; let n = arr_n[row_index]; let mut counter = 0; + // All elements are false, no need to replace, just copy original data if eq_array.false_count() == eq_array.len() { - // no matches, copy original data mutable.extend(original_idx, start, end); offsets.push(offsets[row_index] + (end - start) as i32); valid.append(true); @@ -1474,7 +1382,7 @@ fn general_replace_v2( mutable.extend(replace_idx, row_index, row_index + 1); counter += 1; if counter == n { - // copy original data for any matches past n + // copy original data for any matches pass n mutable.extend(original_idx, start + i + 1, end); break; } @@ -1501,14 +1409,13 @@ fn general_replace_v2( pub fn array_replace(args: &[ArrayRef]) -> Result<ArrayRef> { // replace at most one occurence for each element let arr_n = vec![1; args[0].len()]; - // general_replace(as_list_array(&args[0])?, &args[1], &args[2], arr_n) - general_replace_v2(as_list_array(&args[0])?, &args[1], &args[2], arr_n) + general_replace(as_list_array(&args[0])?, &args[1], &args[2], arr_n) } pub fn array_replace_n(args: &[ArrayRef]) -> Result<ArrayRef> { // replace the specified number of occurences let arr_n = 
as_int64_array(&args[3])?.values().to_vec(); - general_replace_v2(as_list_array(&args[0])?, &args[1], &args[2], arr_n) + general_replace(as_list_array(&args[0])?, &args[1], &args[2], arr_n) } pub fn array_replace_all(args: &[ArrayRef]) -> Result<ArrayRef> { From 9e54d7190d49ba74584ee2afa7eb482dc7be0258 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Tue, 28 Nov 2023 20:59:54 +0800 Subject: [PATCH 5/5] fix comment Signed-off-by: jayzhan211 --- datafusion/physical-expr/src/array_expressions.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 5b5510894d94..f391dffbc6ea 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -1339,6 +1339,7 @@ fn general_replace( let to_data = to_array.to_data(); let capacity = Capacities::Array(original_data.len()); + // First array is the original array, second array is the element to replace with. let mut mutable = MutableArrayData::with_capacities( vec![&original_data, &to_data], false, @@ -1382,7 +1383,7 @@ fn general_replace( mutable.extend(replace_idx, row_index, row_index + 1); counter += 1; if counter == n { - // copy original data for any matches pass n + // copy original data for any matches past n mutable.extend(original_idx, start + i + 1, end); break; }