Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Add Decimal128 to avro round trip (Read/Write) (#837)
Browse files Browse the repository at this point in the history
  • Loading branch information
potter420 authored Feb 15, 2022
1 parent 894a6ea commit 618a650
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 4 deletions.
57 changes: 57 additions & 0 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,41 @@ fn deserialize_value<'a>(
.unwrap();
array.push(Some(value))
}
PrimitiveType::Int128 => {
    // The value is read as big-endian two's-complement bytes. The Avro side is
    // either length-prefixed `bytes` or `fixed`, optionally wrapped in a
    // two-variant union with `null`.
    let avro_inner = match avro_field {
        AvroSchema::Bytes(_) | AvroSchema::Fixed(_) => avro_field,
        AvroSchema::Union(u) => match &u.as_slice() {
            &[e, AvroSchema::Null] | &[AvroSchema::Null, e] => e,
            _ => unreachable!(),
        },
        _ => unreachable!(),
    };
    // `bytes` carries a zigzag-encoded length prefix; `fixed` takes its size
    // from the schema.
    let len = match avro_inner {
        AvroSchema::Bytes(_) => {
            util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
                ArrowError::ExternalFormat(
                    "Avro format contains a non-usize number of bytes".to_string(),
                )
            })?
        }
        AvroSchema::Fixed(b) => b.size,
        _ => unreachable!(),
    };
    if len > 16 {
        return Err(ArrowError::ExternalFormat(
            "Avro decimal bytes return more than 16 bytes".to_string(),
        ));
    }
    // Reject truncated input instead of panicking on an out-of-range slice.
    if len > block.len() {
        return Err(ArrowError::ExternalFormat(
            "Avro decimal declares more bytes than remain in the block".to_string(),
        ));
    }
    let data = if len == 0 {
        // An empty payload would require a 128-bit shift below, which overflows;
        // decode it as zero.
        0i128
    } else {
        // Place the payload in the most significant bytes, then arithmetic-shift
        // right so the sign bit of the payload is extended over the full i128.
        let mut bytes = [0u8; 16];
        bytes[..len].copy_from_slice(&block[..len]);
        i128::from_be_bytes(bytes) >> (8 * (16 - len))
    };
    block = &block[len..];
    let array = array
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<i128>>()
        .unwrap();
    array.push(Some(data))
}
_ => unreachable!(),
},
PhysicalType::Utf8 => {
Expand Down Expand Up @@ -353,6 +388,28 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) ->
PrimitiveType::MonthDayNano => {
block = &block[12..];
}
PrimitiveType::Int128 => {
    // Skips one Int128 value without decoding it. The Avro side is either
    // length-prefixed `bytes` or `fixed`, optionally wrapped in a two-variant
    // union with `null`.
    let avro_inner = match avro_field {
        AvroSchema::Bytes(_) | AvroSchema::Fixed(_) => avro_field,
        AvroSchema::Union(u) => match &u.as_slice() {
            &[e, AvroSchema::Null] | &[AvroSchema::Null, e] => e,
            _ => unreachable!(),
        },
        _ => unreachable!(),
    };
    // `bytes` carries a zigzag-encoded length prefix; `fixed` takes its size
    // from the schema.
    let len: usize = match avro_inner {
        AvroSchema::Bytes(_) => {
            util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
                ArrowError::ExternalFormat(
                    "Avro format contains a non-usize number of bytes".to_string(),
                )
            })?
        }
        AvroSchema::Fixed(b) => b.size,
        _ => unreachable!(),
    };
    // Checked slicing: a declared length past the end of the block is reported
    // as a format error rather than panicking.
    block = block.get(len..).ok_or_else(|| {
        ArrowError::ExternalFormat(
            "Avro decimal declares more bytes than remain in the block".to_string(),
        )
    })?;
}
_ => unreachable!(),
},
PhysicalType::Utf8 | PhysicalType::Binary => {
Expand Down
4 changes: 3 additions & 1 deletion src/io/avro/write/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use avro_schema::{
Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Schema as AvroSchema,
BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical,
Schema as AvroSchema,
};

use crate::datatypes::*;
Expand Down Expand Up @@ -54,6 +55,7 @@ fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
AvroSchema::Fixed(fixed)
}
DataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)),
DataType::Decimal(p, s) => AvroSchema::Bytes(Some(BytesLogical::Decimal(*p, *s))),
other => {
return Err(ArrowError::NotYetImplemented(format!(
"write {:?} to avro",
Expand Down
34 changes: 34 additions & 0 deletions src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,40 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Bytes(_)) => {
    // Serializes each i128 as a zigzag-encoded length followed by its shortest
    // big-endian two's-complement representation.
    let values = array
        .as_any()
        .downcast_ref::<PrimitiveArray<i128>>()
        .unwrap();
    Box::new(BufStreamingIterator::new(
        values.values().iter(),
        |x, buf| {
            // Count the redundant copies of the sign bit at the top of the value.
            // One of them must be kept so the first emitted byte still carries
            // the sign; otherwise e.g. 0x8000 would be written as [0x80, 0x00]
            // and decode as a negative number.
            let sign_bits = if x.is_negative() {
                x.leading_ones()
            } else {
                x.leading_zeros()
            };
            // `sign_bits` is always >= 1, so at least one byte is emitted.
            let len = ((sign_bits - 1) / 8) as usize;
            util::zigzag_encode((16 - len) as i64, buf).unwrap();
            buf.extend_from_slice(&x.to_be_bytes()[len..]);
        },
        vec![],
    ))
}
(PhysicalType::Primitive(PrimitiveType::Int128), AvroSchema::Union(_)) => {
    // Serializes each optional i128 as a zigzag-encoded union branch index
    // (0 for null, 1 for a value) followed, for values, by a zigzag-encoded
    // length and the shortest big-endian two's-complement representation.
    let values = array
        .as_any()
        .downcast_ref::<PrimitiveArray<i128>>()
        .unwrap();
    Box::new(BufStreamingIterator::new(
        values.iter(),
        |x, buf| {
            util::zigzag_encode(x.is_some() as i64, buf).unwrap();
            if let Some(x) = x {
                // Count the redundant copies of the sign bit at the top of the
                // value. One of them must be kept so the first emitted byte still
                // carries the sign; otherwise e.g. 0x8000 would be written as
                // [0x80, 0x00] and decode as a negative number.
                let sign_bits = if x.is_negative() {
                    x.leading_ones()
                } else {
                    x.leading_zeros()
                };
                // `sign_bits` is always >= 1, so at least one byte is emitted.
                let len = ((sign_bits - 1) / 8) as usize;
                util::zigzag_encode((16 - len) as i64, buf).unwrap();
                buf.extend_from_slice(&x.to_be_bytes()[len..]);
            }
        },
        vec![],
    ))
}
(PhysicalType::Primitive(PrimitiveType::MonthDayNano), AvroSchema::Fixed(_)) => {
let values = array
.as_any()
Expand Down
22 changes: 19 additions & 3 deletions tests/it/io/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use arrow2::chunk::Chunk;
use avro_rs::types::{Record, Value};
use avro_rs::{Codec, Writer};
use avro_rs::{Days, Duration, Millis, Months, Schema as AvroSchema};
use avro_rs::{Days, Decimal, Duration, Millis, Months, Schema as AvroSchema};

use arrow2::array::*;
use arrow2::datatypes::*;
Expand Down Expand Up @@ -47,7 +47,8 @@ pub(super) fn schema() -> (AvroSchema, Schema) {
"type": "enum",
"name": "",
"symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
}}
}},
{"name": "decimal", "type": {"type": "bytes", "logicalType": "decimal", "precision": 18, "scale": 5}}
]
}
"#;
Expand Down Expand Up @@ -76,6 +77,7 @@ pub(super) fn schema() -> (AvroSchema, Schema) {
DataType::Dictionary(i32::KEY_TYPE, Box::new(DataType::Utf8), false),
false,
),
Field::new("decimal", DataType::Decimal(18, 5), false),
]);

(AvroSchema::parse_str(raw_schema).unwrap(), schema)
Expand Down Expand Up @@ -109,6 +111,10 @@ pub(super) fn data() -> Chunk<Arc<dyn Array>> {
Int32Array::from_slice([1, 0]),
Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
)),
Arc::new(
PrimitiveArray::<i128>::from_slice([12345678i128, -12345678i128])
.to(DataType::Decimal(18, 5)),
),
];

Chunk::try_new(columns).unwrap()
Expand Down Expand Up @@ -142,6 +148,10 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::
Value::Record(vec![("e".to_string(), Value::Double(1.0f64))]),
);
record.put("enum", Value::Enum(1, "HEARTS".to_string()));
record.put(
"decimal",
Value::Decimal(Decimal::from(&[0u8, 188u8, 97u8, 78u8])),
);
record.put(
"duration",
Value::Duration(Duration::new(Months::new(1), Days::new(1), Millis::new(1))),
Expand Down Expand Up @@ -170,6 +180,12 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::
Value::Record(vec![("e".to_string(), Value::Double(2.0f64))]),
);
record.put("enum", Value::Enum(0, "SPADES".to_string()));
record.put(
"decimal",
Value::Decimal(Decimal::from(&[
255u8, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 67, 158, 178,
])),
);
writer.append(record)?;
Ok(writer.into_inner().unwrap())
}
Expand Down Expand Up @@ -260,6 +276,6 @@ fn test_projected(projection: Vec<bool>) -> Result<()> {
/// Runs `test_projected` with a projection mask that selects only the first
/// entry and deselects the remaining eleven.
#[test]
fn read_projected() -> Result<()> {
    // One flag per schema field; the mask grew by one entry alongside the new
    // `decimal` field.
    test_projected(vec![
        true, false, false, false, false, false, false, false, false, false, false, false,
    ])
}

0 comments on commit 618a650

Please sign in to comment.