Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simplified avro and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 6, 2022
1 parent 299df30 commit c79b15e
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 57 deletions.
35 changes: 0 additions & 35 deletions src/io/avro/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,6 @@ use avro_schema::{Enum, Fixed, Record, Schema as AvroSchema};
use crate::datatypes::*;
use crate::error::{ArrowError, Result};

/// Builds the fully qualified form of `name`.
///
/// A `name` that already contains a `.` is returned unchanged. Otherwise it is
/// prefixed with `namespace` (falling back to `default_namespace` when
/// `namespace` is `None`) and a `.` separator. When neither namespace is
/// available, `name` is returned as is.
fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>) -> String {
    // Guard clause: a dotted name is already qualified.
    if name.contains('.') {
        return name.to_string();
    }

    namespace
        .or(default_namespace)
        .map_or_else(|| name.to_string(), |prefix| format!("{}.{}", prefix, name))
}

fn external_props(schema: &AvroSchema) -> Metadata {
let mut props = Metadata::new();
match &schema {
Expand All @@ -30,27 +16,6 @@ fn external_props(schema: &AvroSchema) -> Metadata {
}
_ => {}
}
match &schema {
AvroSchema::Record(Record {
aliases, namespace, ..
})
| AvroSchema::Enum(Enum {
aliases, namespace, ..
})
| AvroSchema::Fixed(Fixed {
aliases, namespace, ..
}) => {
let aliases: Vec<String> = aliases
.iter()
.map(|alias| aliased(alias, namespace.as_deref(), None))
.collect();
props.insert(
"avro::aliases".to_string(),
format!("[{}]", aliases.join(",")),
);
}
_ => {}
}
props
}

Expand Down
32 changes: 32 additions & 0 deletions src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,22 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Float32), AvroSchema::Union(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<f32>>()
.unwrap();
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.extend(x.to_le_bytes())
}
},
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Float32), AvroSchema::Float) => {
let values = array
.as_any()
Expand All @@ -167,6 +183,22 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Float64), AvroSchema::Union(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<f64>>()
.unwrap();
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
buf.extend(x.to_le_bytes())
}
},
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Float64), AvroSchema::Double) => {
let values = array
.as_any()
Expand Down
71 changes: 49 additions & 22 deletions tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use arrow2::array::*;
use arrow2::chunk::Chunk;
use arrow2::datatypes::*;
Expand All @@ -11,35 +9,64 @@ use super::read::read_avro;

/// Builds the `Schema` used by this test module from a fixed list of fields.
///
/// Each entry is a `Field::new(name, data_type, nullable)` call; the final
/// boolean marks whether the column is declared nullable.
///
/// NOTE(review): this listing appears to interleave the removed single-letter
/// fields ("a" .. "h") with the added descriptively named fields of a diff —
/// confirm which entries belong to the current version of the file.
pub(super) fn schema() -> Schema {
Schema::from(vec![
Field::new("a", DataType::Int64, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Int32, false),
Field::new("int64", DataType::Int64, false),
Field::new("int64 nullable", DataType::Int64, true),
Field::new("utf8", DataType::Utf8, false),
Field::new("utf8 nullable", DataType::Utf8, true),
Field::new("int32", DataType::Int32, false),
Field::new("int32 nullable", DataType::Int32, true),
Field::new("date", DataType::Date32, false),
Field::new("d", DataType::Binary, false),
Field::new("e", DataType::Float64, false),
Field::new("f", DataType::Boolean, false),
Field::new("g", DataType::Utf8, true),
Field::new("h", DataType::Interval(IntervalUnit::MonthDayNano), true),
Field::new("date nullable", DataType::Date32, true),
Field::new("binary", DataType::Binary, false),
Field::new("binary nullable", DataType::Binary, true),
Field::new("float32", DataType::Float32, false),
Field::new("float32 nullable", DataType::Float32, true),
Field::new("float64", DataType::Float64, false),
Field::new("float64 nullable", DataType::Float64, true),
Field::new("boolean", DataType::Boolean, false),
Field::new("boolean nullable", DataType::Boolean, true),
Field::new(
"interval",
DataType::Interval(IntervalUnit::MonthDayNano),
false,
),
Field::new(
"interval nullable",
DataType::Interval(IntervalUnit::MonthDayNano),
true,
),
])
}

pub(super) fn data() -> Chunk<Arc<dyn Array>> {
pub(super) fn data() -> Chunk<Box<dyn Array>> {
let columns = vec![
Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
Arc::new(Int32Array::from_slice([1, 1])) as Arc<dyn Array>,
Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc<dyn Array>,
Arc::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])) as Arc<dyn Array>,
Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
Arc::new(PrimitiveArray::<months_days_ns>::from([
Box::new(Int64Array::from_slice([27, 47])) as Box<dyn Array>,
Box::new(Int64Array::from([Some(27), None])),
Box::new(Utf8Array::<i32>::from_slice(["foo", "bar"])),
Box::new(Utf8Array::<i32>::from([Some("foo"), None])),
Box::new(Int32Array::from_slice([1, 1])),
Box::new(Int32Array::from([Some(1), None])),
Box::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)),
Box::new(Int32Array::from([Some(1), None]).to(DataType::Date32)),
Box::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])),
Box::new(BinaryArray::<i32>::from([Some(b"foo"), None])),
Box::new(PrimitiveArray::<f32>::from_slice([1.0, 2.0])),
Box::new(PrimitiveArray::<f32>::from([Some(1.0), None])),
Box::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])),
Box::new(PrimitiveArray::<f64>::from([Some(1.0), None])),
Box::new(BooleanArray::from_slice([true, false])),
Box::new(BooleanArray::from([Some(true), None])),
Box::new(PrimitiveArray::<months_days_ns>::from_slice([
months_days_ns::new(1, 1, 10 * 1_000_000), // 10 millis
months_days_ns::new(2, 2, 20 * 1_000_000), // 20 millis
])),
Box::new(PrimitiveArray::<months_days_ns>::from([
Some(months_days_ns::new(1, 1, 10 * 1_000_000)), // 10 millis
None,
])) as Arc<dyn Array>,
])),
];

Chunk::try_new(columns).unwrap()
Chunk::new(columns)
}

pub(super) fn serialize_to_block<R: AsRef<dyn Array>>(
Expand Down

0 comments on commit c79b15e

Please sign in to comment.