Skip to content

Commit

Permalink
clippy
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 committed Jan 10, 2024
1 parent cd52aac commit 60f4d2a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 79 deletions.
37 changes: 7 additions & 30 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,26 +545,16 @@ impl std::hash::Hash for ScalarValue {
FixedSizeBinary(_, v) => v.hash(state),
LargeBinary(v) => v.hash(state),
List(arr) => {
hash_list(arr.to_owned() as ArrayRef, state);
hash_nested_array(arr.to_owned() as ArrayRef, state);
}
LargeList(arr) => {
hash_list(arr.to_owned() as ArrayRef, state);
hash_nested_array(arr.to_owned() as ArrayRef, state);
}
FixedSizeList(arr) => {
hash_list(arr.to_owned() as ArrayRef, state);
hash_nested_array(arr.to_owned() as ArrayRef, state);
}
Struct(arr) => {
hash_list(arr.to_owned() as ArrayRef, state);
}
// TODO: Merge with List
Struct(struct_arr) => {
let arrays = vec![struct_arr.to_owned() as ArrayRef];
let hashes_buffer = &mut vec![0; struct_arr.len()];
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
let hashes =
create_hashes(&arrays, &random_state, hashes_buffer).unwrap();
// Hash back to std::hash::Hasher
hashes.hash(state);
hash_nested_array(arr.to_owned() as ArrayRef, state);
}
Date32(v) => v.hash(state),
Date64(v) => v.hash(state),
Expand Down Expand Up @@ -593,7 +583,7 @@ impl std::hash::Hash for ScalarValue {
}
}

fn hash_list<H: std::hash::Hasher>(arr: ArrayRef, state: &mut H) {
fn hash_nested_array<H: std::hash::Hasher>(arr: ArrayRef, state: &mut H) {
let arrays = vec![arr.to_owned()];
let hashes_buffer = &mut vec![0; arr.len()];
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
Expand Down Expand Up @@ -1952,14 +1942,6 @@ impl ScalarValue {
ScalarValue::Struct(arr) => {
Self::list_to_array_of_size(arr.as_ref() as &dyn Array, size)?
}
ScalarValue::Struct(arr) => {
let arrays = std::iter::repeat(arr.as_ref())
.take(size)
.map(|a| a as &dyn Array)
.collect::<Vec<_>>();
arrow::compute::concat(arrays.as_slice())
.map_err(|e| arrow_datafusion_err!(e))?
}
ScalarValue::Date32(e) => {
build_array_from_option!(Date32, Date32Array, e, size)
}
Expand Down Expand Up @@ -2514,10 +2496,6 @@ impl ScalarValue {
ScalarValue::Struct(arr) => {
Self::eq_array_list(&(arr.to_owned() as ArrayRef), array, index)
}
ScalarValue::Struct(arr) => {
let right = array.slice(index, 1);
arr.as_ref().eq(right.as_struct())
}
ScalarValue::Date32(val) => {
eq_array_primitive!(array, index, Date32Array, val)?
}
Expand Down Expand Up @@ -2975,7 +2953,8 @@ impl TryFrom<&DataType> for ScalarValue {
new_null_array(&DataType::Struct(fields.to_owned()), 1)
.as_struct()
.to_owned()
.into()),
.into(),
),
DataType::Null => ScalarValue::Null,
_ => {
return _not_impl_err!(
Expand Down Expand Up @@ -3042,7 +3021,6 @@ impl fmt::Display for ScalarValue {
ScalarValue::List(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::LargeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::FixedSizeList(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::Struct(arr) => fmt_list(arr.to_owned() as ArrayRef, f)?,
ScalarValue::Date32(e) => format_option!(f, e)?,
ScalarValue::Date64(e) => format_option!(f, e)?,
ScalarValue::Time32Second(e) => format_option!(f, e)?,
Expand Down Expand Up @@ -3078,7 +3056,6 @@ impl fmt::Display for ScalarValue {
let sv = ScalarValue::Struct(Arc::new(
column.as_struct().to_owned(),
));

format!("{}:{sv}", field.name())
} else {
let sv = ScalarValue::try_from_array(column, 0).unwrap();
Expand Down
62 changes: 13 additions & 49 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,61 +1158,18 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
Value::LargeUtf8Value(s.to_owned())
})
}
// ScalarValue::List / FixedSizeList / LargeList are serialized using
// Arrow IPC messages as a single column RecordBatch
ScalarValue::List(arr) => {
encode_scalar_list_value(arr.to_owned() as ArrayRef, val)
encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
}
ScalarValue::LargeList(arr) => {
// Wrap in a "field_name" column
encode_scalar_list_value(arr.to_owned() as ArrayRef, val)
encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
}
ScalarValue::FixedSizeList(arr) => {
encode_scalar_list_value(arr.to_owned() as ArrayRef, val)
encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
}

// TODO: Merge with ScalarValue::List / FixedSizeList / LargeList
// ScalarValue::List and ScalarValue::FixedSizeList are serialized using
// Arrow IPC messages as a single column RecordBatch
ScalarValue::Struct(arr) => {
let arr = arr.to_owned() as ArrayRef;
// Wrap in a "field_name" column
let batch = RecordBatch::try_from_iter(vec![(
"field_name",
arr.to_owned(),
)])
.map_err(|e| {
Error::General( format!("Error creating temporary batch while encoding ScalarValue::List: {e}"))
})?;

let gen = IpcDataGenerator {};
let mut dict_tracker = DictionaryTracker::new(false);
let (_, encoded_message) = gen
.encoded_batch(&batch, &mut dict_tracker, &Default::default())
.map_err(|e| {
Error::General(format!(
"Error encoding ScalarValue::List as IPC: {e}"
))
})?;

let schema: protobuf::Schema = batch.schema().try_into()?;

let scalar_list_value = protobuf::ScalarNestedValue {
ipc_message: encoded_message.ipc_message,
arrow_data: encoded_message.arrow_data,
schema: Some(schema),
};

match val {
ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::StructValue(
scalar_list_value,
)),
}),
_ => unreachable!(),
}
encode_scalar_nested_value(arr.to_owned() as ArrayRef, val)
}

ScalarValue::Date32(val) => {
create_proto_scalar(val.as_ref(), &data_type, |s| Value::Date32Value(*s))
}
Expand Down Expand Up @@ -1701,7 +1658,9 @@ fn create_proto_scalar<I, T: FnOnce(&I) -> protobuf::scalar_value::Value>(
Ok(protobuf::ScalarValue { value: Some(value) })
}

fn encode_scalar_list_value(
// ScalarValue::List / FixedSizeList / LargeList / Struct are all serialized the
// same way: as an Arrow IPC message holding a single-column RecordBatch, stored
// together with its Arrow data and the batch schema.
fn encode_scalar_nested_value(
arr: ArrayRef,
val: &ScalarValue,
) -> Result<protobuf::ScalarValue, Error> {
Expand All @@ -1721,7 +1680,7 @@ fn encode_scalar_list_value(

let schema: protobuf::Schema = batch.schema().try_into()?;

let scalar_list_value = protobuf::ScalarListValue {
let scalar_list_value = protobuf::ScalarNestedValue {
ipc_message: encoded_message.ipc_message,
arrow_data: encoded_message.arrow_data,
schema: Some(schema),
Expand All @@ -1741,6 +1700,11 @@ fn encode_scalar_list_value(
scalar_list_value,
)),
}),
ScalarValue::Struct(_) => Ok(protobuf::ScalarValue {
value: Some(protobuf::scalar_value::Value::StructValue(
scalar_list_value,
)),
}),
_ => unreachable!(),
}
}

0 comments on commit 60f4d2a

Please sign in to comment.