Skip to content

Commit

Permalink
Remove define_array_slice and reuse array_slice for `array_pop_fr…
Browse files Browse the repository at this point in the history
…ont/back` (#8401)

* array_element done

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* clippy

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* replace array_slice

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fix get_indexed_field_empty_list

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* replace pop front and pop back

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* clippy

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* add doc and comment

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fmt

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

---------

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 authored Dec 9, 2023
1 parent 2765fee commit 182a37e
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 160 deletions.
337 changes: 178 additions & 159 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Array expressions

use std::any::type_name;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -370,135 +369,64 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result<ArrayRef> {
}
}

fn return_empty(return_null: bool, data_type: DataType) -> Arc<dyn Array> {
if return_null {
new_null_array(&data_type, 1)
} else {
new_empty_array(&data_type)
}
}

fn list_slice<T: Array + 'static>(
array: &dyn Array,
i: i64,
j: i64,
return_element: bool,
) -> ArrayRef {
let array = array.as_any().downcast_ref::<T>().unwrap();

let array_type = array.data_type().clone();
/// array_element SQL function
///
/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
/// `array_element(array, index)`
///
/// For example:
/// > array_element(\[1, 2, 3], 2) -> 2
pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let indexes = as_int64_array(&args[1])?;

if i == 0 && j == 0 || array.is_empty() {
return return_empty(return_element, array_type);
}
let values = list_array.values();
let original_data = values.to_data();
let capacity = Capacities::Array(original_data.len());

let i = match i.cmp(&0) {
Ordering::Less => {
if i.unsigned_abs() > array.len() as u64 {
return return_empty(true, array_type);
}
// use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
let mut mutable =
MutableArrayData::with_capacities(vec![&original_data], true, capacity);

(array.len() as i64 + i + 1) as usize
}
Ordering::Equal => 1,
Ordering::Greater => i as usize,
};
fn adjusted_array_index(index: i64, len: usize) -> Option<i64> {
// 0 ~ len - 1
let adjusted_zero_index = if index < 0 {
index + len as i64
} else {
index - 1
};

let j = match j.cmp(&0) {
Ordering::Less => {
if j.unsigned_abs() as usize > array.len() {
return return_empty(true, array_type);
}
if return_element {
(array.len() as i64 + j + 1) as usize
} else {
(array.len() as i64 + j) as usize
}
if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
Some(adjusted_zero_index)
} else {
// Out of bounds
None
}
Ordering::Equal => 1,
Ordering::Greater => j.min(array.len() as i64) as usize,
};

if i > j || i > array.len() {
return_empty(return_element, array_type)
} else {
Arc::new(array.slice(i - 1, j + 1 - i))
}
}

fn slice<T: Array + 'static>(
array: &ListArray,
key: &Int64Array,
extra_key: &Int64Array,
return_element: bool,
) -> Result<Arc<dyn Array>> {
let sliced_array: Vec<Arc<dyn Array>> = array
.iter()
.zip(key.iter())
.zip(extra_key.iter())
.map(|((arr, i), j)| match (arr, i, j) {
(Some(arr), Some(i), Some(j)) => list_slice::<T>(&arr, i, j, return_element),
(Some(arr), None, Some(j)) => list_slice::<T>(&arr, 1i64, j, return_element),
(Some(arr), Some(i), None) => {
list_slice::<T>(&arr, i, arr.len() as i64, return_element)
}
(Some(arr), None, None) if !return_element => arr.clone(),
_ => return_empty(return_element, array.value_type()),
})
.collect();
for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
let start = offset_window[0] as usize;
let end = offset_window[1] as usize;
let len = end - start;

// concat requires input of at least one array
if sliced_array.is_empty() {
Ok(return_empty(return_element, array.value_type()))
} else {
let vec = sliced_array
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>();
let mut i: i32 = 0;
let mut offsets = vec![i];
offsets.extend(
vec.iter()
.map(|a| {
i += a.len() as i32;
i
})
.collect::<Vec<_>>(),
);
let values = compute::concat(vec.as_slice()).unwrap();
// array is null
if len == 0 {
mutable.extend_nulls(1);
continue;
}

let index = adjusted_array_index(indexes.value(row_index), len);

if return_element {
Ok(values)
if let Some(index) = index {
mutable.extend(0, start + index as usize, start + index as usize + 1);
} else {
let field = Arc::new(Field::new("item", array.value_type(), true));
Ok(Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
values,
None,
)?))
// Index out of bounds
mutable.extend_nulls(1);
}
}
}

fn define_array_slice(
list_array: &ListArray,
key: &Int64Array,
extra_key: &Int64Array,
return_element: bool,
) -> Result<ArrayRef> {
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
slice::<$ARRAY_TYPE>(list_array, key, extra_key, return_element)
};
}
call_array_function!(list_array.value_type(), true)
}

pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let key = as_int64_array(&args[1])?;
define_array_slice(list_array, key, key, true)
let data = mutable.freeze();
Ok(arrow_array::make_array(data))
}

fn general_except<OffsetSize: OffsetSizeTrait>(
Expand Down Expand Up @@ -579,47 +507,136 @@ pub fn array_except(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}

/// array_slice SQL function
///
/// We follow the behavior of array_slice in DuckDB
/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
///
/// > array_slice(array, from, to)
///
/// Positive index is treated as the index from the start of the array. If the
/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
/// length of the array, it is treated as the length of the array.
///
/// Negative index is treated as the index from the end of the array. If the index
/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
/// The `to` index is exclusive like python slice syntax.
///
/// See test cases in `array.slt` for more details.
pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let key = as_int64_array(&args[1])?;
let extra_key = as_int64_array(&args[2])?;
define_array_slice(list_array, key, extra_key, false)
}

fn general_array_pop(
list_array: &GenericListArray<i32>,
from_back: bool,
) -> Result<(Vec<i64>, Vec<i64>)> {
if from_back {
let key = vec![0; list_array.len()];
// Attention: `arr.len() - 1` in extra key defines the last element position (position = index + 1, not inclusive) we want in the new array.
let extra_key: Vec<_> = list_array
.iter()
.map(|x| x.map_or(0, |arr| arr.len() as i64 - 1))
.collect();
Ok((key, extra_key))
} else {
// Attention: 2 in the `key`` defines the first element position (position = index + 1) we want in the new array.
// We only handle two cases of the first element index: if the old array has any elements, starts from 2 (index + 1), or starts from initial.
let key: Vec<_> = list_array.iter().map(|x| x.map_or(0, |_| 2)).collect();
let extra_key: Vec<_> = list_array
.iter()
.map(|x| x.map_or(0, |arr| arr.len() as i64))
.collect();
Ok((key, extra_key))
let from_array = as_int64_array(&args[1])?;
let to_array = as_int64_array(&args[2])?;

let values = list_array.values();
let original_data = values.to_data();
let capacity = Capacities::Array(original_data.len());

// use_nulls: false, we don't need nulls but empty array for array_slice, so we don't need explicit nulls but adjust offset to indicate nulls.
let mut mutable =
MutableArrayData::with_capacities(vec![&original_data], false, capacity);

// We have the slice syntax compatible with DuckDB v0.8.1.
// The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.

fn adjusted_from_index(index: i64, len: usize) -> Option<i64> {
// 0 ~ len - 1
let adjusted_zero_index = if index < 0 {
index + len as i64
} else {
// array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
std::cmp::max(index - 1, 0)
};

if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
Some(adjusted_zero_index)
} else {
// Out of bounds
None
}
}

fn adjusted_to_index(index: i64, len: usize) -> Option<i64> {
// 0 ~ len - 1
let adjusted_zero_index = if index < 0 {
// array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
index + len as i64 - 1
} else {
// array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
std::cmp::min(index - 1, len as i64 - 1)
};

if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
Some(adjusted_zero_index)
} else {
// Out of bounds
None
}
}

let mut offsets = vec![0];

for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
let start = offset_window[0] as usize;
let end = offset_window[1] as usize;
let len = end - start;

// len 0 indicate array is null, return empty array in this row.
if len == 0 {
offsets.push(offsets[row_index]);
continue;
}

// If index is null, we consider it as the minimum / maximum index of the array.
let from_index = if from_array.is_null(row_index) {
Some(0)
} else {
adjusted_from_index(from_array.value(row_index), len)
};

let to_index = if to_array.is_null(row_index) {
Some(len as i64 - 1)
} else {
adjusted_to_index(to_array.value(row_index), len)
};

if let (Some(from), Some(to)) = (from_index, to_index) {
if from <= to {
assert!(start + to as usize <= end);
mutable.extend(0, start + from as usize, start + to as usize + 1);
offsets.push(offsets[row_index] + (to - from + 1) as i32);
} else {
// invalid range, return empty array
offsets.push(offsets[row_index]);
}
} else {
// invalid range, return empty array
offsets.push(offsets[row_index]);
}
}

let data = mutable.freeze();

Ok(Arc::new(ListArray::try_new(
Arc::new(Field::new("item", list_array.value_type(), true)),
OffsetBuffer::new(offsets.into()),
arrow_array::make_array(data),
None,
)?))
}

/// array_pop_back SQL function
pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let (key, extra_key) = general_array_pop(list_array, true)?;

define_array_slice(
list_array,
&Int64Array::from(key),
&Int64Array::from(extra_key),
false,
)
let from_array = Int64Array::from(vec![1; list_array.len()]);
let to_array = Int64Array::from(
list_array
.iter()
.map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
.collect::<Vec<i64>>(),
);
let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
array_slice(args.as_slice())
}

/// Appends or prepends elements to a ListArray.
Expand Down Expand Up @@ -743,16 +760,18 @@ pub fn gen_range(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(arr)
}

/// array_pop_front SQL function
pub fn array_pop_front(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let (key, extra_key) = general_array_pop(list_array, false)?;

define_array_slice(
list_array,
&Int64Array::from(key),
&Int64Array::from(extra_key),
false,
)
let from_array = Int64Array::from(vec![2; list_array.len()]);
let to_array = Int64Array::from(
list_array
.iter()
.map(|arr| arr.map_or(0, |arr| arr.len() as i64))
.collect::<Vec<i64>>(),
);
let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
array_slice(args.as_slice())
}

/// Array_append SQL function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ mod tests {
.evaluate(&batch)?
.into_array(batch.num_rows())
.expect("Failed to convert to array");
assert!(result.is_null(0));
assert!(result.is_empty());
Ok(())
}

Expand Down

0 comments on commit 182a37e

Please sign in to comment.