diff --git a/datafusion/functions-nested/benches/array_set_ops.rs b/datafusion/functions-nested/benches/array_set_ops.rs index 237c2d1474c00..e3146921d7fe1 100644 --- a/datafusion/functions-nested/benches/array_set_ops.rs +++ b/datafusion/functions-nested/benches/array_set_ops.rs @@ -23,7 +23,7 @@ use criterion::{ }; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; -use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion}; +use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, ArrayUnion}; use rand::SeedableRng; use rand::prelude::SliceRandom; use rand::rngs::StdRng; @@ -38,6 +38,7 @@ const SEED: u64 = 42; fn criterion_benchmark(c: &mut Criterion) { bench_array_union(c); bench_array_intersect(c); + bench_array_distinct(c); } fn invoke_udf(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { @@ -97,6 +98,48 @@ fn bench_array_intersect(c: &mut Criterion) { group.finish(); } +fn bench_array_distinct(c: &mut Criterion) { + let mut group = c.benchmark_group("array_distinct"); + let udf = ArrayDistinct::new(); + + for (duplicate_label, duplicate_ratio) in + &[("high_duplicate", 0.8), ("low_duplicate", 0.2)] + { + for &array_size in ARRAY_SIZES { + let array = + create_array_with_duplicates(NUM_ROWS, array_size, *duplicate_ratio); + group.bench_with_input( + BenchmarkId::new(*duplicate_label, array_size), + &array_size, + |b, _| { + b.iter(|| { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(array.clone())], + arg_fields: vec![ + Field::new("arr", array.data_type().clone(), false) + .into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new( + "result", + array.data_type().clone(), + false, + ) + .into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ) + }) + }, + ); + } + } + + group.finish(); +} + fn create_arrays_with_overlap( num_rows: usize, array_size: usize, @@ -164,5 +207,53 @@ fn create_arrays_with_overlap( (array1, array2) } +fn create_array_with_duplicates( + num_rows: usize, + array_size: usize, + duplicate_ratio: f64, +) -> ArrayRef { + assert!((0.0..=1.0).contains(&duplicate_ratio)); + let unique_count = ((array_size as f64) * (1.0 - duplicate_ratio)).round() as usize; + let duplicate_count = array_size - unique_count; + + let mut rng = StdRng::seed_from_u64(SEED); + let mut values = Vec::with_capacity(num_rows * array_size); + + for row in 0..num_rows { + let base = (row as i64) * (array_size as i64) * 2; + + // Add unique values first + for i in 0..unique_count { + values.push(base + i as i64); + } + + // Fill the rest with duplicates randomly picked from the unique values + let mut unique_indices: Vec = + (0..unique_count).map(|i| base + i as i64).collect(); + unique_indices.shuffle(&mut rng); + + for i in 0..duplicate_count { + values.push(unique_indices[i % unique_count]); + } + } + + let values = Int64Array::from(values); + let field = Arc::new(Field::new("item", DataType::Int64, true)); + + let offsets = (0..=num_rows) + .map(|i| (i * array_size) as i32) + .collect::>(); + + Arc::new( + ListArray::try_new( + field, + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + ) + .unwrap(), + ) +} + criterion_group!(benches, criterion_benchmark); criterion_main!(benches); diff --git a/datafusion/functions-nested/src/set_ops.rs b/datafusion/functions-nested/src/set_ops.rs index 370599611feef..2348b3c530c53 100644 --- a/datafusion/functions-nested/src/set_ops.rs +++ b/datafusion/functions-nested/src/set_ops.rs @@ -22,7 +22,6 @@ use arrow::array::{ Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, new_null_array, }; use arrow::buffer::{NullBuffer, OffsetBuffer}; -use arrow::compute; use arrow::datatypes::DataType::{LargeList, List, Null}; use arrow::datatypes::{DataType, Field, FieldRef}; use arrow::row::{RowConverter, SortField}; @@ -35,7 +34,6 @@ use datafusion_expr::{ ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, }; use datafusion_macros::user_doc; -use itertools::Itertools; use std::any::Any; use std::collections::HashSet; use std::fmt::{Display, Formatter}; @@ -264,7 +262,7 @@ impl ScalarUDFImpl for ArrayIntersect { ) )] #[derive(Debug, PartialEq, Eq, Hash)] -pub(super) struct ArrayDistinct { +pub struct ArrayDistinct { signature: Signature, aliases: Vec, } @@ -278,6 +276,12 @@ impl ArrayDistinct { } } +impl Default for ArrayDistinct { + fn default() -> Self { + Self::new() + } +} + impl ScalarUDFImpl for ArrayDistinct { fn as_any(&self) -> &dyn Any { self @@ -527,42 +531,52 @@ fn general_array_distinct( if array.is_empty() { return Ok(Arc::new(array.clone()) as ArrayRef); } + let value_offsets = array.value_offsets(); let dt = array.value_type(); - let mut offsets = Vec::with_capacity(array.len()); + let mut offsets = Vec::with_capacity(array.len() + 1); offsets.push(OffsetSize::usize_as(0)); - let mut new_arrays = Vec::with_capacity(array.len()); - let converter = RowConverter::new(vec![SortField::new(dt)])?; - // distinct for each list in ListArray - for arr in array.iter() { - let last_offset: OffsetSize = offsets.last().copied().unwrap(); - let Some(arr) = arr else { - // Add same offset for null + + // Convert all values to row format in a single batch for performance + let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; + let rows = converter.convert_columns(&[Arc::clone(array.values())])?; + let mut final_rows = Vec::with_capacity(rows.num_rows()); + let mut seen = HashSet::new(); + for i in 0..array.len() { + let last_offset = *offsets.last().unwrap(); + + // Null list entries produce no output; just carry forward the offset. + if array.is_null(i) { offsets.push(last_offset); continue; - }; - let values = converter.convert_columns(&[arr])?; - // sort elements in list and remove duplicates - let rows = values.iter().sorted().dedup().collect::>(); - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => Arc::clone(array), - None => { - return internal_err!("array_distinct: failed to get array from rows"); + } + + let start = value_offsets[i].as_usize(); + let end = value_offsets[i + 1].as_usize(); + seen.clear(); + seen.reserve(end - start); + + // Walk the sub-array and keep only the first occurrence of each value. + for idx in start..end { + let row = rows.row(idx); + if seen.insert(row) { + final_rows.push(row); } - }; - new_arrays.push(array); - } - if new_arrays.is_empty() { - return Ok(Arc::new(array.clone()) as ArrayRef); + } + offsets.push(last_offset + OffsetSize::usize_as(seen.len())); } - let offsets = OffsetBuffer::new(offsets.into()); - let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); - let values = compute::concat(&new_arrays_ref)?; + + // Convert all collected distinct rows back + let final_values = if final_rows.is_empty() { + new_empty_array(&dt) + } else { + let arrays = converter.convert_rows(final_rows)?; + Arc::clone(&arrays[0]) + }; + Ok(Arc::new(GenericListArray::::try_new( Arc::clone(field), - offsets, - values, + OffsetBuffer::new(offsets.into()), + final_values, // Keep the list nulls array.nulls().cloned(), )?)) diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index a41cbecf77055..f675763120718 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -6832,7 +6832,7 @@ from array_distinct_table_2D; ---- [[1, 2], [3, 4], [5, 6]] [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] -[NULL, [5, 6]] +[[5, 6], NULL] query ? select array_distinct(column1) @@ -6864,7 +6864,7 @@ from array_distinct_table_2D_fixed; ---- [[1, 2], [3, 4], [5, 6]] [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] -[NULL, [5, 6]] +[[5, 6], NULL] query ??? select array_intersect(column1, column2),