Skip to content

Commit ec40c7f

Browse files
committed
Convert RunEndEncoded field to Parquet
1 parent 04f217b commit ec40c7f

File tree

5 files changed

+133
-21
lines changed

5 files changed

+133
-21
lines changed

arrow-schema/src/datatype.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ pub enum DataType {
354354
/// that contain many repeated values using less memory, but with
355355
/// a higher CPU overhead for some operations.
356356
///
357-
/// This type mostly used to represent low cardinality string
357+
/// This type is mostly used to represent low cardinality string
358358
/// arrays or a limited set of primitive types as integers.
359359
Dictionary(Box<DataType>, Box<DataType>),
360360
/// Exact 32-bit width decimal value with precision and scale

parquet/src/arrow/arrow_writer/byte_array.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::util::bit_util::num_required_bits;
2828
use crate::util::interner::{Interner, Storage};
2929
use arrow_array::{
3030
Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
31-
LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
31+
LargeBinaryArray, LargeStringArray, RunArray, StringArray, StringViewArray,
3232
};
3333
use arrow_schema::DataType;
3434

@@ -59,6 +59,28 @@ macro_rules! downcast_dict_op {
5959
};
6060
}
6161

62+
/// Downcasts `$array` to a `RunArray` keyed by run-end type `$key`, then to a
/// typed values view of `$val`, and applies `$op` (plus any extra args) to it.
macro_rules! downcast_ree_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        let run = $array
            .as_any()
            .downcast_ref::<RunArray<arrow_array::types::$key>>()
            .unwrap();
        $op(run.downcast::<$val>().unwrap()$(, $arg)*)
    }};
}
72+
73+
/// Dispatches on the run-end field's integer type and invokes `$op` on the
/// run array downcast to values of type `$val` via [`downcast_ree_impl`].
///
/// Run ends are restricted to `Int16`/`Int32`/`Int64` by the Arrow format, so
/// any other type is a broken invariant, not a recoverable error.
macro_rules! downcast_ree_op {
    ($run_end_field:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $run_end_field.data_type() {
            DataType::Int16 => downcast_ree_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_ree_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_ree_impl!($array, Int64Type, $val, $op$(, $arg)*),
            // Message keeps this consistent with the other unreachable arms in
            // this file and aids debugging if an invalid RunArray ever appears.
            d => unreachable!("invalid run end type {} for run end encoded array", d),
        }
    };
}
83+
6284
macro_rules! downcast_op {
6385
($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
6486
match $data_type {
@@ -90,6 +112,20 @@ macro_rules! downcast_op {
90112
}
91113
d => unreachable!("cannot downcast {} dictionary value to byte array", d),
92114
},
115+
DataType::RunEndEncoded(run_end, value) => match value.data_type() {
116+
DataType::Utf8 => downcast_ree_op!(run_end, StringArray, $array, $op$(, $arg)*),
117+
DataType::LargeUtf8 => {
118+
downcast_ree_op!(run_end, LargeStringArray, $array, $op$(, $arg)*)
119+
}
120+
DataType::Binary => downcast_ree_op!(run_end, BinaryArray, $array, $op$(, $arg)*),
121+
DataType::LargeBinary => {
122+
downcast_ree_op!(run_end, LargeBinaryArray, $array, $op$(, $arg)*)
123+
}
124+
DataType::FixedSizeBinary(_) => {
125+
downcast_ree_op!(run_end, FixedSizeBinaryArray, $array, $op$(, $arg)*)
126+
}
127+
d => unreachable!("cannot downcast {} run end encoded value to byte array", d),
128+
},
93129
d => unreachable!("cannot downcast {} to byte array", d),
94130
}
95131
};

parquet/src/arrow/arrow_writer/levels.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,10 @@ impl LevelInfoBuilder {
222222
_ => unreachable!(),
223223
})
224224
}
225+
DataType::RunEndEncoded(_, v) if is_leaf(v.data_type()) => {
226+
let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
227+
Ok(Self::Primitive(levels))
228+
}
225229
d => Err(nyi_err!("Datatype {} is not yet supported", d)),
226230
}
227231
}

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,15 +1033,15 @@ impl ArrowColumnWriterFactory {
10331033

10341034
match data_type {
10351035
_ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1036-
ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
1036+
ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1037+
out.push(col(leaves.next().unwrap())?)
1038+
}
10371039
ArrowDataType::LargeBinary
10381040
| ArrowDataType::Binary
10391041
| ArrowDataType::Utf8
10401042
| ArrowDataType::LargeUtf8
10411043
| ArrowDataType::BinaryView
1042-
| ArrowDataType::Utf8View => {
1043-
out.push(bytes(leaves.next().unwrap())?)
1044-
}
1044+
| ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
10451045
ArrowDataType::List(f)
10461046
| ArrowDataType::LargeList(f)
10471047
| ArrowDataType::FixedSizeList(f, _) => {
@@ -1058,21 +1058,30 @@ impl ArrowColumnWriterFactory {
10581058
self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
10591059
}
10601060
_ => unreachable!("invalid map type"),
1061-
}
1061+
},
10621062
ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1063-
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
1064-
out.push(bytes(leaves.next().unwrap())?)
1065-
}
1063+
ArrowDataType::Utf8
1064+
| ArrowDataType::LargeUtf8
1065+
| ArrowDataType::Binary
1066+
| ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
10661067
ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
10671068
out.push(bytes(leaves.next().unwrap())?)
10681069
}
1069-
ArrowDataType::FixedSizeBinary(_) => {
1070+
ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1071+
_ => out.push(col(leaves.next().unwrap())?),
1072+
},
1073+
// TODO: Don't know what I'm doing here!
1074+
ArrowDataType::RunEndEncoded(_run_ends, value_type) => match value_type.data_type() {
1075+
ArrowDataType::Utf8
1076+
| ArrowDataType::LargeUtf8
1077+
| ArrowDataType::Binary
1078+
| ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1079+
ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
10701080
out.push(bytes(leaves.next().unwrap())?)
10711081
}
1072-
_ => {
1073-
out.push(col(leaves.next().unwrap())?)
1074-
}
1075-
}
1082+
ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1083+
_ => out.push(col(leaves.next().unwrap())?),
1084+
},
10761085
_ => return Err(ParquetError::NYI(
10771086
format!(
10781087
"Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
@@ -1166,6 +1175,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
11661175
write_primitive(typed, array.values(), levels)
11671176
}
11681177
},
1178+
ArrowDataType::RunEndEncoded(_run_ends, _value_type) => todo!(),
11691179
_ => {
11701180
let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
11711181
let array = array.as_primitive::<Int32Type>();
@@ -1248,6 +1258,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
12481258
write_primitive(typed, array.values(), levels)
12491259
}
12501260
},
1261+
ArrowDataType::RunEndEncoded(_run_ends, _values) => todo!(),
12511262
_ => {
12521263
let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
12531264
let array = array.as_primitive::<Int64Type>();
@@ -1324,6 +1335,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
13241335
let array = column.as_primitive::<Float16Type>();
13251336
get_float_16_array_slice(array, indices)
13261337
}
1338+
ArrowDataType::RunEndEncoded(_run_ends, _values) => todo!(),
13271339
_ => {
13281340
return Err(ParquetError::NYI(
13291341
"Attempting to write an Arrow type that is not yet implemented".to_string(),
@@ -1494,6 +1506,7 @@ mod tests {
14941506
use arrow::util::pretty::pretty_format_batches;
14951507
use arrow::{array::*, buffer::Buffer};
14961508
use arrow_buffer::{i256, IntervalDayTime, IntervalMonthDayNano, NullBuffer};
1509+
use arrow_ipc::RunEndEncoded;
14971510
use arrow_schema::Fields;
14981511
use half::f16;
14991512
use num::{FromPrimitive, ToPrimitive};
@@ -4293,4 +4306,35 @@ mod tests {
42934306
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
42944307
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
42954308
}
4309+
4310+
// TODO: Remove. Just added this to compare with arrow_writer_run_end_encoded
/// Round-trips a small string dictionary column through the Arrow writer, as
/// a known-good baseline for the run-end-encoded test below.
#[test]
fn arrow_writer_string_dictionary_two() {
    let mut builder = StringDictionaryBuilder::<Int32Type>::new();
    builder.extend([Some("alpha"), Some("alpha"), Some("beta")]);
    let dict_array = builder.finish();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "dict",
        dict_array.data_type().clone(),
        dict_array.is_nullable(),
    )]));
    one_column_roundtrip_with_schema(Arc::new(dict_array), schema);
}
4324+
4325+
/// Round-trips a run-end-encoded string column through the Arrow writer.
#[test]
fn arrow_writer_run_end_encoded() {
    // Create a run array of strings: two runs ("alpha" x2, "beta" x1)
    let mut builder = StringRunBuilder::<Int32Type>::new();
    builder.extend([Some("alpha"), Some("alpha"), Some("beta")]);
    let run_array: RunArray<Int32Type> = builder.finish();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ree",
        run_array.data_type().clone(),
        run_array.is_nullable(),
    )]));
    // NOTE(review): this may fail because the reader does not reconstruct a
    // RunArray from the Parquet data — confirm the expected read-back type.
    one_column_roundtrip_with_schema(Arc::new(run_array), schema);
}
42964340
}

parquet/src/arrow/schema/mod.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ pub fn parquet_to_arrow_field_levels(
129129
match complex::convert_schema(schema, mask, hint)? {
130130
Some(field) => match &field.arrow_type {
131131
DataType::Struct(fields) => Ok(FieldLevels {
132-
fields: fields.clone(),
132+
fields: fields.to_owned(),
133133
levels: Some(field),
134134
}),
135135
_ => unreachable!(),
@@ -303,7 +303,7 @@ impl<'a> ArrowSchemaConverter<'a> {
303303
///
304304
/// Setting this option to `true` will result in Parquet files that can be
305305
/// read by more readers, but may lose precision for Arrow types such as
306-
/// [`DataType::Date64`] which have no direct [corresponding Parquet type].
306+
/// [`DataType::Date64`] which have no direct corresponding Parquet type.
307307
///
308308
/// By default, this converter does not coerce to native Parquet types. Enabling type
309309
/// coercion allows for meaningful representations that do not require
@@ -771,12 +771,22 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
771771
DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
772772
DataType::Dictionary(_, ref value) => {
773773
// Dictionary encoding not handled at the schema level
774-
let dict_field = field.clone().with_data_type(value.as_ref().clone());
774+
let dict_field = field.to_owned().with_data_type(value.as_ref().clone());
775+
arrow_to_parquet_type(&dict_field, coerce_types)
776+
}
777+
DataType::RunEndEncoded(_run_ends, values) => {
778+
// REE arrays store run-length encoded data with:
779+
// - run_ends: cumulative end indices for each run
780+
// - values: the actual values for each run
781+
//
782+
// For Parquet conversion, we convert REE arrays to Dictionary arrays:
783+
// 1. The logical array is materialized as dictionary indices
784+
// 2. The values array becomes the dictionary values
785+
let dict_field = field
786+
.to_owned()
787+
.with_data_type(values.data_type().to_owned());
775788
arrow_to_parquet_type(&dict_field, coerce_types)
776789
}
777-
DataType::RunEndEncoded(_, _) => Err(arrow_err!(
778-
"Converting RunEndEncodedType to parquet not supported",
779-
)),
780790
}
781791
}
782792

@@ -2272,4 +2282,22 @@ mod tests {
22722282

22732283
Ok(())
22742284
}
2285+
2286+
/// A `RunEndEncoded` field converts to the Parquet type of its values field
/// (Boolean here), preserving the outer field's name and repetition.
#[test]
fn test_run_end_encoded_conversion() {
    use crate::basic::Type;
    let run_ends = Arc::new(Field::new("run_ends", DataType::Int16, false));
    let values = Arc::new(Field::new("values", DataType::Boolean, true));
    let ree_field = Field::new(
        "run_end_encoded_16",
        DataType::RunEndEncoded(run_ends, values),
        false,
    );

    let converted = arrow_to_parquet_type(&ree_field, false).unwrap();
    // Should convert to the underlying value type (Boolean in this case)
    assert_eq!(converted.get_physical_type(), Type::BOOLEAN);
    // field is not nullable
    assert_eq!(converted.get_basic_info().repetition(), Repetition::REQUIRED);
    assert_eq!(converted.name(), "run_end_encoded_16");
}
22752303
}

0 commit comments

Comments
 (0)