Skip to content

Commit

Permalink
Preserve field name in ScalarValue::List (#2893)
Browse files Browse the repository at this point in the history
* scalar list data type

* list element name

* remove unused code

* remove unused logging

* Reenabled commented test

* Formatting

* merge

* Update datafusion/common/src/scalar.rs

* Update datafusion/proto/src/to_proto.rs

* Update datafusion/proto/src/to_proto.rs

* fix fmt

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
comphead and alamb authored Jul 18, 2022
1 parent 305e265 commit 90e5fd0
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 295 deletions.
101 changes: 57 additions & 44 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub enum ScalarValue {
/// large binary
LargeBinary(Option<Vec<u8>>),
/// list of nested ScalarValue
List(Option<Vec<ScalarValue>>, Box<DataType>),
List(Option<Vec<ScalarValue>>, Box<Field>),
/// Date stored as a signed 32bit int days since UNIX epoch 1970-01-01
Date32(Option<i32>),
/// Date stored as a signed 64bit int milliseconds since UNIX epoch 1970-01-01
Expand Down Expand Up @@ -651,9 +651,9 @@ impl ScalarValue {
ScalarValue::LargeUtf8(_) => DataType::LargeUtf8,
ScalarValue::Binary(_) => DataType::Binary,
ScalarValue::LargeBinary(_) => DataType::LargeBinary,
ScalarValue::List(_, data_type) => DataType::List(Box::new(Field::new(
ScalarValue::List(_, field) => DataType::List(Box::new(Field::new(
"item",
data_type.as_ref().clone(),
field.data_type().clone(),
true,
))),
ScalarValue::Date32(_) => DataType::Date32,
Expand Down Expand Up @@ -1300,7 +1300,7 @@ impl ScalarValue {
.collect::<LargeBinaryArray>(),
),
},
ScalarValue::List(values, data_type) => Arc::new(match data_type.as_ref() {
ScalarValue::List(values, field) => Arc::new(match field.data_type() {
DataType::Boolean => build_list!(BooleanBuilder, Boolean, values, size),
DataType::Int8 => build_list!(Int8Builder, Int8, values, size),
DataType::Int16 => build_list!(Int16Builder, Int16, values, size),
Expand All @@ -1323,7 +1323,7 @@ impl ScalarValue {
repeat(self.clone()).take(size),
&DataType::List(Box::new(Field::new(
"item",
data_type.as_ref().clone(),
field.data_type().clone(),
true,
))),
)
Expand Down Expand Up @@ -1463,8 +1463,10 @@ impl ScalarValue {
Some(scalar_vec)
}
};
let data_type = nested_type.data_type().clone();
ScalarValue::List(value, Box::new(data_type))
ScalarValue::List(
value,
Box::new(Field::new("item", nested_type.data_type().clone(), true)),
)
}
DataType::Date32 => {
typed_cast!(array, index, Date32Array, Date32)
Expand Down Expand Up @@ -1564,8 +1566,10 @@ impl ScalarValue {
Some(scalar_vec)
}
};
let data_type = nested_type.data_type().clone();
ScalarValue::List(value, Box::new(data_type))
ScalarValue::List(
value,
Box::new(Field::new("item", nested_type.data_type().clone(), true)),
)
}
other => {
return Err(DataFusionError::NotImplemented(format!(
Expand Down Expand Up @@ -1898,9 +1902,10 @@ impl TryFrom<&DataType> for ScalarValue {
index_type.clone(),
Box::new(value_type.as_ref().try_into()?),
),
DataType::List(ref nested_type) => {
ScalarValue::List(None, Box::new(nested_type.data_type().clone()))
}
DataType::List(ref nested_type) => ScalarValue::List(
None,
Box::new(Field::new("item", nested_type.data_type().clone(), true)),
),
DataType::Struct(fields) => {
ScalarValue::Struct(None, Box::new(fields.clone()))
}
Expand Down Expand Up @@ -2248,8 +2253,11 @@ mod tests {

#[test]
fn scalar_list_null_to_array() {
let list_array_ref =
ScalarValue::List(None, Box::new(DataType::UInt64)).to_array();
let list_array_ref = ScalarValue::List(
None,
Box::new(Field::new("item", DataType::UInt64, false)),
)
.to_array();
let list_array = list_array_ref.as_any().downcast_ref::<ListArray>().unwrap();

assert!(list_array.is_null(0));
Expand All @@ -2265,7 +2273,7 @@ mod tests {
ScalarValue::UInt64(None),
ScalarValue::UInt64(Some(101)),
]),
Box::new(DataType::UInt64),
Box::new(Field::new("item", DataType::UInt64, false)),
)
.to_array();

Expand Down Expand Up @@ -2747,35 +2755,35 @@ mod tests {
assert_eq!(
List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
)
.partial_cmp(&List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
)),
Some(Ordering::Equal)
);

assert_eq!(
List(
Some(vec![Int32(Some(10)), Int32(Some(5))]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
)
.partial_cmp(&List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
)),
Some(Ordering::Greater)
);

assert_eq!(
List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
)
.partial_cmp(&List(
Some(vec![Int32(Some(10)), Int32(Some(5))]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
)),
Some(Ordering::Less)
);
Expand All @@ -2784,11 +2792,11 @@ mod tests {
assert_eq!(
List(
Some(vec![Int64(Some(1)), Int64(Some(5))]),
Box::new(DataType::Int64),
Box::new(Field::new("item", DataType::Int64, false)),
)
.partial_cmp(&List(
Some(vec![Int32(Some(1)), Int32(Some(5))]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
)),
None
);
Expand Down Expand Up @@ -3011,17 +3019,17 @@ mod tests {
ScalarValue::from(2i32),
ScalarValue::from(3i32),
]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
);

let l1 = ScalarValue::List(
Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
);

let l2 = ScalarValue::List(
Some(vec![ScalarValue::from(6i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, false)),
);

// Define struct scalars
Expand Down Expand Up @@ -3066,13 +3074,18 @@ mod tests {
// Define list-of-structs scalars
let nl0 = ScalarValue::List(
Some(vec![s0.clone(), s1.clone()]),
Box::new(s0.get_datatype()),
Box::new(Field::new("item", s0.get_datatype(), true)),
);

let nl1 = ScalarValue::List(Some(vec![s2]), Box::new(s0.get_datatype()));

let nl2 = ScalarValue::List(Some(vec![s1]), Box::new(s0.get_datatype()));
let nl1 = ScalarValue::List(
Some(vec![s2]),
Box::new(Field::new("item", s0.get_datatype(), true)),
);

let nl2 = ScalarValue::List(
Some(vec![s1]),
Box::new(Field::new("item", s0.get_datatype(), true)),
);
// iter_to_array for list-of-struct
let array = ScalarValue::iter_to_array(vec![nl0, nl1, nl2]).unwrap();
let array = array.as_any().downcast_ref::<ListArray>().unwrap();
Expand Down Expand Up @@ -3222,48 +3235,48 @@ mod tests {
ScalarValue::from(2i32),
ScalarValue::from(3i32),
]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
),
]),
Box::new(DataType::List(Box::new(Field::new(
Box::new(Field::new(
"item",
DataType::Int32,
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
)))),
)),
);

let l2 = ScalarValue::List(
Some(vec![
ScalarValue::List(
Some(vec![ScalarValue::from(6i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(7i32), ScalarValue::from(8i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
),
]),
Box::new(DataType::List(Box::new(Field::new(
Box::new(Field::new(
"item",
DataType::Int32,
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
)))),
)),
);

let l3 = ScalarValue::List(
Some(vec![ScalarValue::List(
Some(vec![ScalarValue::from(9i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
)]),
Box::new(DataType::List(Box::new(Field::new(
Box::new(Field::new(
"item",
DataType::Int32,
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
)))),
)),
);

let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();
Expand Down
38 changes: 19 additions & 19 deletions datafusion/physical-expr/src/aggregate/array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl Accumulator for ArrayAggAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::List(
Some(self.values.clone()),
Box::new(self.datatype.clone()),
Box::new(Field::new("item", self.datatype.clone(), true)),
))
}
}
Expand Down Expand Up @@ -179,7 +179,7 @@ mod tests {
ScalarValue::Int32(Some(4)),
ScalarValue::Int32(Some(5)),
]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
);

generic_test_op!(a, DataType::Int32, ArrayAgg, list, DataType::Int32)
Expand All @@ -195,57 +195,57 @@ mod tests {
ScalarValue::from(2i32),
ScalarValue::from(3i32),
]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(4i32), ScalarValue::from(5i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
),
]),
Box::new(DataType::List(Box::new(Field::new(
Box::new(Field::new(
"item",
DataType::Int32,
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
)))),
)),
);

let l2 = ScalarValue::List(
Some(vec![
ScalarValue::List(
Some(vec![ScalarValue::from(6i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
),
ScalarValue::List(
Some(vec![ScalarValue::from(7i32), ScalarValue::from(8i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
),
]),
Box::new(DataType::List(Box::new(Field::new(
Box::new(Field::new(
"item",
DataType::Int32,
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
)))),
)),
);

let l3 = ScalarValue::List(
Some(vec![ScalarValue::List(
Some(vec![ScalarValue::from(9i32)]),
Box::new(DataType::Int32),
Box::new(Field::new("item", DataType::Int32, true)),
)]),
Box::new(DataType::List(Box::new(Field::new(
Box::new(Field::new(
"item",
DataType::Int32,
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
)))),
)),
);

let list = ScalarValue::List(
Some(vec![l1.clone(), l2.clone(), l3.clone()]),
Box::new(DataType::List(Box::new(Field::new(
Box::new(Field::new(
"item",
DataType::Int32,
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
true,
)))),
)),
);

let array = ScalarValue::iter_to_array(vec![l1, l2, l3]).unwrap();
Expand Down
Loading

0 comments on commit 90e5fd0

Please sign in to comment.