Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 170 additions & 28 deletions parquet-variant-compute/src/cast_to_variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use crate::type_conversion::{
Expand All @@ -39,7 +40,7 @@ use arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime,
timestamp_us_to_datetime,
};
use arrow_schema::{ArrowError, DataType, TimeUnit};
use arrow_schema::{ArrowError, DataType, TimeUnit, UnionFields};
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use parquet_variant::{
Variant, VariantBuilder, VariantDecimal16, VariantDecimal4, VariantDecimal8,
Expand Down Expand Up @@ -379,6 +380,9 @@ pub fn cast_to_variant(input: &dyn Array) -> Result<VariantArray, ArrowError> {
builder.append_variant(variant);
}
}
DataType::Union(fields, _) => {
convert_union(fields, input, &mut builder)?;
}
DataType::Date32 => {
generic_conversion_array!(
Date32Type,
Expand All @@ -398,9 +402,9 @@ pub fn cast_to_variant(input: &dyn Array) -> Result<VariantArray, ArrowError> {
);
}
DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
DataType::Int16 => process_run_end_encoded::<Int16Type>(input, &mut builder)?,
DataType::Int32 => process_run_end_encoded::<Int32Type>(input, &mut builder)?,
DataType::Int64 => process_run_end_encoded::<Int64Type>(input, &mut builder)?,
DataType::Int16 => convert_run_end_encoded::<Int16Type>(input, &mut builder)?,
DataType::Int32 => convert_run_end_encoded::<Int32Type>(input, &mut builder)?,
DataType::Int64 => convert_run_end_encoded::<Int64Type>(input, &mut builder)?,
_ => {
return Err(ArrowError::CastError(format!(
"Unsupported run ends type: {:?}",
Expand All @@ -409,25 +413,7 @@ pub fn cast_to_variant(input: &dyn Array) -> Result<VariantArray, ArrowError> {
}
},
DataType::Dictionary(_, _) => {
let dict_array = input.as_any_dictionary();
let values_variant_array = cast_to_variant(dict_array.values().as_ref())?;
let normalized_keys = dict_array.normalized_keys();
let keys = dict_array.keys();

for (i, key_idx) in normalized_keys.iter().enumerate() {
if keys.is_null(i) {
builder.append_null();
continue;
}

if values_variant_array.is_null(*key_idx) {
builder.append_null();
continue;
}

let value = values_variant_array.value(*key_idx);
builder.append_variant(value);
}
convert_dictionary_encoded(input, &mut builder)?;
}

DataType::Map(field, _) => match field.data_type() {
Expand Down Expand Up @@ -559,8 +545,45 @@ pub fn cast_to_variant(input: &dyn Array) -> Result<VariantArray, ArrowError> {
Ok(builder.build())
}

/// Generic function to process run-end encoded arrays
fn process_run_end_encoded<R: RunEndIndexType>(
/// Convert union arrays
fn convert_union(
fields: &UnionFields,
input: &dyn Array,
builder: &mut VariantArrayBuilder,
) -> Result<(), ArrowError> {
let union_array = input.as_union();

// Convert each child array to variant arrays
let mut child_variant_arrays = HashMap::new();
for (type_id, _) in fields.iter() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to merge the two passes into one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you are suggesting

Copy link
Contributor Author

@liamzwbao liamzwbao Aug 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean using one loop instead of two? That way we will compute the child array of each type_id on demand.

But IIUC, we will use all the child arrays anyway if it's a valid union, and I think precomute all the child arrays may benefit the lookup in the second loop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, we'll use the same child_variant_array many times. The current would have a better performance

let child_array = union_array.child(type_id);
let child_variant_array = cast_to_variant(child_array.as_ref())?;
child_variant_arrays.insert(type_id, child_variant_array);
}

// Process each element in the union array
for i in 0..union_array.len() {
let type_id = union_array.type_id(i);
let value_offset = union_array.value_offset(i);

if let Some(child_variant_array) = child_variant_arrays.get(&type_id) {
if child_variant_array.is_null(value_offset) {
builder.append_null();
} else {
let value = child_variant_array.value(value_offset);
builder.append_variant(value);
}
} else {
// This should not happen in a valid union, but handle gracefully
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

builder.append_null();
}
}

Ok(())
}

/// Generic function to convert run-end encoded arrays
fn convert_run_end_encoded<R: RunEndIndexType>(
input: &dyn Array,
builder: &mut VariantArrayBuilder,
) -> Result<(), ArrowError> {
Expand Down Expand Up @@ -594,6 +617,34 @@ fn process_run_end_encoded<R: RunEndIndexType>(
Ok(())
}

/// Convert dictionary encoded arrays
fn convert_dictionary_encoded(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change just moves the code from the Dictionary branch into this helper function. Since all the impls use slightly different coding styles, we could do a follow-up PR to make them consistent once all the variant cast implementations are complete.

input: &dyn Array,
builder: &mut VariantArrayBuilder,
) -> Result<(), ArrowError> {
let dict_array = input.as_any_dictionary();
let values_variant_array = cast_to_variant(dict_array.values().as_ref())?;
let normalized_keys = dict_array.normalized_keys();
let keys = dict_array.keys();

for (i, key_idx) in normalized_keys.iter().enumerate() {
if keys.is_null(i) {
builder.append_null();
continue;
}

if values_variant_array.is_null(*key_idx) {
builder.append_null();
continue;
}

let value = values_variant_array.value(*key_idx);
builder.append_variant(value);
}

Ok(())
}

// TODO do we need a cast_with_options to allow specifying conversion behavior,
// e.g. how to handle overflows, whether to convert to Variant::Null or return
// an error, etc. ?
Expand All @@ -609,10 +660,10 @@ mod tests {
LargeStringArray, ListArray, MapArray, NullArray, StringArray, StringRunBuilder,
StringViewArray, StructArray, Time32MillisecondArray, Time32SecondArray,
Time64MicrosecondArray, Time64NanosecondArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array,
UInt8Array, UnionArray,
};
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow_schema::{Field, Fields};
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field, Fields, UnionFields};
use arrow_schema::{
DECIMAL128_MAX_PRECISION, DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION,
};
Expand Down Expand Up @@ -1637,6 +1688,97 @@ mod tests {
assert_eq!(obj4.get("age"), None);
}

#[test]
fn test_cast_to_variant_union_sparse() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we please add a test for a UnionArray where the child element is null? So that the output VariantArray has a null as well?

// Create a sparse union array with mixed types (int, float, string)
let int_array = Int32Array::from(vec![Some(1), None, None, None, Some(34), None]);
let float_array = Float64Array::from(vec![None, Some(3.2), None, Some(32.5), None, None]);
let string_array = StringArray::from(vec![None, None, Some("hello"), None, None, None]);
let type_ids = [0, 1, 2, 1, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();

let union_fields = UnionFields::new(
vec![0, 1, 2],
vec![
Field::new("int_field", DataType::Int32, false),
Field::new("float_field", DataType::Float64, false),
Field::new("string_field", DataType::Utf8, false),
],
);

let children: Vec<Arc<dyn Array>> = vec![
Arc::new(int_array),
Arc::new(float_array),
Arc::new(string_array),
];

let union_array = UnionArray::try_new(
union_fields,
type_ids,
None, // Sparse union
children,
)
.unwrap();

run_test(
Arc::new(union_array),
vec![
Some(Variant::Int32(1)),
Some(Variant::Double(3.2)),
Some(Variant::from("hello")),
Some(Variant::Double(32.5)),
Some(Variant::Int32(34)),
None,
],
);
}

#[test]
fn test_cast_to_variant_union_dense() {
// Create a dense union array with mixed types (int, float, string)
let int_array = Int32Array::from(vec![Some(1), Some(34), None]);
let float_array = Float64Array::from(vec![3.2, 32.5]);
let string_array = StringArray::from(vec!["hello"]);
let type_ids = [0, 1, 2, 1, 0, 0].into_iter().collect::<ScalarBuffer<i8>>();
let offsets = [0, 0, 0, 1, 1, 2]
.into_iter()
.collect::<ScalarBuffer<i32>>();

let union_fields = UnionFields::new(
vec![0, 1, 2],
vec![
Field::new("int_field", DataType::Int32, false),
Field::new("float_field", DataType::Float64, false),
Field::new("string_field", DataType::Utf8, false),
],
);

let children: Vec<Arc<dyn Array>> = vec![
Arc::new(int_array),
Arc::new(float_array),
Arc::new(string_array),
];

let union_array = UnionArray::try_new(
union_fields,
type_ids,
Some(offsets), // Dense union
children,
)
.unwrap();

run_test(
Arc::new(union_array),
vec![
Some(Variant::Int32(1)),
Some(Variant::Double(3.2)),
Some(Variant::from("hello")),
Some(Variant::Double(32.5)),
Some(Variant::Int32(34)),
None,
],
);
}

#[test]
fn test_cast_to_variant_struct_with_nulls() {
// Test struct with null values at the struct level
Expand Down
Loading