Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read and write Decimal128 to Avro #837

Merged
merged 5 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,41 @@ fn deserialize_value<'a>(
.unwrap();
array.push(Some(value))
}
PrimitiveType::Int128 => {
let avro_inner = match avro_field {
AvroSchema::Bytes(_) | AvroSchema::Fixed(_) => avro_field,
AvroSchema::Union(u) => match &u.as_slice() {
&[e, AvroSchema::Null] | &[AvroSchema::Null, e] => e,
_ => unreachable!(),
},
_ => unreachable!(),
};
let len = match avro_inner {
AvroSchema::Bytes(_) => {
util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?
}
AvroSchema::Fixed(b) => b.size,
_ => unreachable!(),
};
if len > 16 {
return Err(ArrowError::ExternalFormat(
"Avro decimal bytes return more than 16 bytes".to_string(),
));
}
let mut bytes = [0u8; 16];
bytes[..len].copy_from_slice(&block[..len]);
block = &block[len..];
let data = i128::from_be_bytes(bytes) >> (8 * (16 - len));
let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<i128>>()
.unwrap();
array.push(Some(data as i128))
}
_ => unreachable!(),
},
PhysicalType::Utf8 => {
Expand Down Expand Up @@ -353,6 +388,28 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) ->
PrimitiveType::MonthDayNano => {
block = &block[12..];
}
PrimitiveType::Int128 => {
let avro_inner = match avro_field {
AvroSchema::Bytes(_) | AvroSchema::Fixed(_) => avro_field,
AvroSchema::Union(u) => match &u.as_slice() {
&[e, AvroSchema::Null] | &[AvroSchema::Null, e] => e,
_ => unreachable!(),
},
_ => unreachable!(),
};
let len = match avro_inner {
AvroSchema::Bytes(_) => {
util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
ArrowError::ExternalFormat(
"Avro format contains a non-usize number of bytes".to_string(),
)
})?
}
AvroSchema::Fixed(b) => b.size,
_ => unreachable!(),
};
block = &block[len..];
}
_ => unreachable!(),
},
PhysicalType::Utf8 | PhysicalType::Binary => {
Expand Down
4 changes: 3 additions & 1 deletion src/io/avro/write/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use avro_schema::{
Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Schema as AvroSchema,
BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical,
Schema as AvroSchema,
};

use crate::datatypes::*;
Expand Down Expand Up @@ -54,6 +55,7 @@ fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
AvroSchema::Fixed(fixed)
}
DataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)),
DataType::Decimal(p, s) => AvroSchema::Bytes(Some(BytesLogical::Decimal(*p, *s))),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"write {:?} to avro",
Expand Down
34 changes: 34 additions & 0 deletions src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,40 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Bytes(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<i128>>()
.unwrap();
Box::new(BufStreamingIterator::new(
values.values().iter(),
|x, buf| {
let len = ((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize;
util::zigzag_encode((16 - len) as i64, buf).unwrap();
buf.extend_from_slice(&x.to_be_bytes()[len..]);
},
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Union(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<i128>>()
.unwrap();
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
let len =
((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize;
util::zigzag_encode((16 - len) as i64, buf).unwrap();
buf.extend_from_slice(&x.to_be_bytes()[len..]);
}
},
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::MonthDayNano), AvroSchema::Fixed(_)) => {
let values = array
.as_any()
Expand Down
22 changes: 19 additions & 3 deletions tests/it/io/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use arrow2::chunk::Chunk;
use avro_rs::types::{Record, Value};
use avro_rs::{Codec, Writer};
use avro_rs::{Days, Duration, Millis, Months, Schema as AvroSchema};
use avro_rs::{Days, Decimal, Duration, Millis, Months, Schema as AvroSchema};

use arrow2::array::*;
use arrow2::datatypes::*;
Expand Down Expand Up @@ -47,7 +47,8 @@ pub(super) fn schema() -> (AvroSchema, Schema) {
"type": "enum",
"name": "",
"symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}}
}},
{"name": "decimal", "type": {"type": "bytes", "logicalType": "decimal", "precision": 18, "scale": 5}}
]
}
"#;
Expand Down Expand Up @@ -76,6 +77,7 @@ pub(super) fn schema() -> (AvroSchema, Schema) {
DataType::Dictionary(i32::KEY_TYPE, Box::new(DataType::Utf8), false),
false,
),
Field::new("decimal", DataType::Decimal(18, 5), false),
]);

(AvroSchema::parse_str(raw_schema).unwrap(), schema)
Expand Down Expand Up @@ -109,6 +111,10 @@ pub(super) fn data() -> Chunk<Arc<dyn Array>> {
Int32Array::from_slice([1, 0]),
Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
)),
Arc::new(
PrimitiveArray::<i128>::from_slice([12345678i128, -12345678i128])
.to(DataType::Decimal(18, 5)),
),
];

Chunk::try_new(columns).unwrap()
Expand Down Expand Up @@ -142,6 +148,10 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::
Value::Record(vec![("e".to_string(), Value::Double(1.0f64))]),
);
record.put("enum", Value::Enum(1, "HEARTS".to_string()));
record.put(
"decimal",
Value::Decimal(Decimal::from(&[0u8, 188u8, 97u8, 78u8])),
);
record.put(
"duration",
Value::Duration(Duration::new(Months::new(1), Days::new(1), Millis::new(1))),
Expand Down Expand Up @@ -170,6 +180,12 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::
Value::Record(vec![("e".to_string(), Value::Double(2.0f64))]),
);
record.put("enum", Value::Enum(0, "SPADES".to_string()));
record.put(
"decimal",
Value::Decimal(Decimal::from(&[
255u8, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 67, 158, 178,
])),
);
writer.append(record)?;
Ok(writer.into_inner().unwrap())
}
Expand Down Expand Up @@ -260,6 +276,6 @@ fn test_projected(projection: Vec<bool>) -> Result<()> {
#[test]
fn read_projected() -> Result<()> {
test_projected(vec![
true, false, false, false, false, false, false, false, false, false, false,
true, false, false, false, false, false, false, false, false, false, false, false,
])
}