Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eq for struct #5423

Closed
wants to merge 8 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 158 additions & 2 deletions arrow-ord/src/cmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,60 @@ fn compare_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) -> Result<BooleanArray,
let r = r_v.map(|x| x.values().as_ref()).unwrap_or(r);
let r_t = r.data_type();

if l_t != r_t || l_t.is_nested() {
if l_t != r_t {
return Err(ArrowError::InvalidArgumentError(format!(
"Invalid comparison operation: {l_t} {op} {r_t}"
)));
}

if let Struct(_) = l_t {
let l = l.as_struct();
let r = r.as_struct();
assert_eq!(l.num_columns(), r.num_columns());

match op {
Op::Equal => {
let cmp_res = (0..l.num_columns()).try_fold(
BooleanArray::new(BooleanBuffer::new_set(len), None),
|res, i| {
let col_l = l.column(i);
let col_r = r.column(i);
let eq_rows = compare_op(op, col_l, col_r)?;
let nulls = NullBuffer::union(res.nulls(), eq_rows.nulls());
let vals = res.values() & eq_rows.values();
Ok::<BooleanArray, ArrowError>(BooleanArray::new(vals, nulls))
},
)?;

// null handling
// if both null, then true
// if one is null, then false
// else compare the values
let bools: Vec<bool> = (0..l.len())
.map(|i| {
let is_l_null = l.is_null(i);
let is_r_null = r.is_null(i);
if is_l_null && is_r_null {
true
} else if !is_l_null && !is_r_null {
cmp_res.value(i)
} else {
false
}
})
.collect();

return Ok(BooleanArray::from(bools));
}
_ => {
return Err(ArrowError::NotYetImplemented(format!(
"Op for struct is not yet implemented: {op}"
)));
}
}
}

if l_t.is_nested() {
return Err(ArrowError::InvalidArgumentError(format!(
"Invalid comparison operation: {l_t} {op} {r_t}"
)));
Expand Down Expand Up @@ -544,10 +597,113 @@ impl<'a> ArrayOrd for &'a FixedSizeBinaryArray {
mod tests {
use std::sync::Arc;

use arrow_array::{DictionaryArray, Int32Array, Scalar, StringArray};
use arrow_array::{
DictionaryArray, Float32Array, Int32Array, Scalar, StringArray, StructArray,
};
use arrow_buffer::Buffer;
use arrow_schema::{DataType, Field};

use super::*;

#[test]
fn test_struct_eq() {
let field_a = Arc::new(Field::new("a", DataType::Int32, true));
let field_b = Arc::new(Field::new("b", DataType::Float32, true));
let s1 = StructArray::from(vec![
(
field_a.clone(),
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as Arc<dyn Array>,
),
(
field_b.clone(),
Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0])) as Arc<dyn Array>,
),
]);

let s2 = StructArray::from(vec![
(
field_a.clone(),
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as Arc<dyn Array>,
),
(
field_b.clone(),
Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0])) as Arc<dyn Array>,
),
]);

let res = eq(&s1, &s2).unwrap();
assert_eq!(res, BooleanArray::from(vec![true; 5]));

let s1 = StructArray::from(vec![
(
field_a.clone(),
Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])) as Arc<dyn Array>,
),
(
field_b.clone(),
Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0])) as Arc<dyn Array>,
),
]);
let s2 = StructArray::from(vec![
(
field_a,
Arc::new(Int32Array::from(vec![10, 20, 30, 4, 5])) as Arc<dyn Array>,
),
(
field_b,
Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0])) as Arc<dyn Array>,
),
]);

let res = eq(&s1, &s2).unwrap();
assert_eq!(
res,
BooleanArray::from(vec![false, false, false, true, true])
);
}

#[test]
fn test_struct_eq_with_nulls() {
let field_a = Arc::new(Field::new("a", DataType::Int32, true));
let field_b = Arc::new(Field::new("b", DataType::Float32, true));

let s1 = StructArray::from((
vec![
(
field_a.clone(),
Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
),
(
field_b.clone(),
Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0])) as Arc<dyn Array>,
),
],
// 4(=b100), (null, null, valid)
Buffer::from(&[4]),
));

let s2 = StructArray::from((
vec![
(
field_a,
Arc::new(Int32Array::from(vec![1, 2, 3])) as Arc<dyn Array>,
),
(
field_b,
Arc::new(Float32Array::from(vec![1.1, 2.0, 3.1])) as Arc<dyn Array>,
),
],
// 6(=b110), (null, valid, valid)
Buffer::from(&[6]),
));

let res = eq(&s1, &s2).unwrap();
// both 1st row are null, so true
// 2nd row, 1st column is null, but 2nd column is valid, so false
// both 3rd row are valid, so compare the value
assert_eq!(res, BooleanArray::from(vec![true, false, false]));
}

#[test]
fn test_null_dict() {
let a = DictionaryArray::new(Int32Array::new_null(10), Arc::new(Int32Array::new_null(0)));
Expand Down
Loading