From dc1784df49959fb3e0805cbbe0635a4c369eac74 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 19 Dec 2021 22:24:16 +0000 Subject: [PATCH] Migrated to avro-schema --- Cargo.toml | 6 +- src/io/avro/mod.rs | 8 -- src/io/avro/read/deserialize.rs | 6 +- src/io/avro/read/header.rs | 14 ++-- src/io/avro/read/mod.rs | 4 +- src/io/avro/read/schema.rs | 127 +++++++++++++++-------------- src/io/avro/read/util.rs | 2 +- src/io/avro/read_async/metadata.rs | 4 +- tests/it/io/avro/read.rs | 23 +----- 9 files changed, 88 insertions(+), 106 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 49ad379efbf..69a92614906 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,8 +69,8 @@ ahash = { version = "0.7", optional = true } # parquet support parquet2 = { version = "0.8", optional = true, default_features = false, features = ["stream"] } -# avro -avro-rs = { version = "0.13", optional = true, default_features = false } +# avro support +avro-schema = { version = "0.2", optional = true } # compression of avro libflate = { version = "1.1.1", optional = true } snap = { version = "1", optional = true } @@ -138,7 +138,7 @@ io_parquet_compression = [ "parquet2/lz4", "parquet2/brotli", ] -io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json"] +io_avro = ["avro-schema", "fallible-streaming-iterator", "serde_json"] io_avro_compression = [ "libflate", "snap", diff --git a/src/io/avro/mod.rs b/src/io/avro/mod.rs index 2cf3c216d1b..fe9eb8bd1d3 100644 --- a/src/io/avro/mod.rs +++ b/src/io/avro/mod.rs @@ -6,14 +6,6 @@ pub mod read; #[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))] pub mod read_async; -use crate::error::ArrowError; - -impl From for ArrowError { - fn from(error: avro_rs::Error) -> Self { - ArrowError::External("".to_string(), Box::new(error)) - } -} - // macros that can operate in sync and async code. macro_rules! avro_decode { ($reader:ident $($_await:tt)*) => { diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs index 93fdf5537b4..33bd8d888ae 100644 --- a/src/io/avro/read/deserialize.rs +++ b/src/io/avro/read/deserialize.rs @@ -1,7 +1,7 @@ use std::convert::TryInto; use std::sync::Arc; -use avro_rs::Schema as AvroSchema; +use avro_schema::{Enum, Schema as AvroSchema}; use crate::array::*; use crate::datatypes::*; @@ -33,7 +33,7 @@ fn make_mutable( Box::new(MutableUtf8Array::::with_capacity(capacity)) as Box } PhysicalType::Dictionary(_) => { - if let Some(AvroSchema::Enum { symbols, .. }) = avro_schema { + if let Some(AvroSchema::Enum(Enum { symbols, .. })) = avro_schema { let values = Utf8Array::::from_slice(symbols); Box::new(FixedItemsUtf8Dictionary::with_capacity(values, capacity)) as Box @@ -64,7 +64,7 @@ fn make_mutable( fn is_union_null_first(avro_field: &AvroSchema) -> bool { if let AvroSchema::Union(schemas) = avro_field { - schemas.variants()[0] == AvroSchema::Null + schemas[0] == AvroSchema::Null } else { unreachable!() } diff --git a/src/io/avro/read/header.rs b/src/io/avro/read/header.rs index e15aebf626e..86388f59cb5 100644 --- a/src/io/avro/read/header.rs +++ b/src/io/avro/read/header.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; -use avro_rs::{Error, Schema}; +use avro_schema::Schema; use serde_json; -use crate::error::Result; +use crate::error::{ArrowError, Result}; use super::Compression; @@ -11,11 +11,13 @@ use super::Compression; pub(crate) fn deserialize_header( header: HashMap>, ) -> Result<(Schema, Option)> { - let json = header + let schema = header .get("avro.schema") - .and_then(|bytes| serde_json::from_slice(bytes.as_ref()).ok()) - .ok_or(Error::GetAvroSchemaFromMap)?; - let schema = Schema::parse(&json)?; + .ok_or_else(|| ArrowError::ExternalFormat("Avro schema must be present".to_string())) + .and_then(|bytes| { + serde_json::from_slice(bytes.as_ref()) + .map_err(|e| ArrowError::ExternalFormat(e.to_string())) + })?; let compression = header.get("avro.codec").and_then(|bytes| { let bytes: &[u8] = bytes.as_ref(); diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs index f702b913f70..fea56e8e030 100644 --- a/src/io/avro/read/mod.rs +++ b/src/io/avro/read/mod.rs @@ -3,7 +3,7 @@ use std::io::Read; use std::sync::Arc; -use avro_rs::Schema as AvroSchema; +use avro_schema::{Record, Schema as AvroSchema}; use fallible_streaming_iterator::FallibleStreamingIterator; mod block; @@ -41,7 +41,7 @@ pub fn read_metadata( let (avro_schema, codec, marker) = util::read_schema(reader)?; let schema = schema::convert_schema(&avro_schema)?; - let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema { + let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema { fields.into_iter().map(|x| x.schema).collect() } else { panic!() diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs index 6304eb0892a..7ab286f244a 100644 --- a/src/io/avro/read/schema.rs +++ b/src/io/avro/read/schema.rs @@ -1,8 +1,6 @@ use std::collections::BTreeMap; -use avro_rs::schema::Name; -use avro_rs::types::Value; -use avro_rs::Schema as AvroSchema; +use avro_schema::{Enum, Fixed, Record, Schema as AvroSchema}; use crate::datatypes::*; use crate::error::{ArrowError, Result}; @@ -24,44 +22,26 @@ fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>) fn external_props(schema: &AvroSchema) -> BTreeMap { let mut props = BTreeMap::new(); match &schema { - AvroSchema::Record { + AvroSchema::Record(Record { doc: Some(ref doc), .. - } - | AvroSchema::Enum { + }) + | AvroSchema::Enum(Enum { doc: Some(ref doc), .. - } => { + }) => { props.insert("avro::doc".to_string(), doc.clone()); } _ => {} } match &schema { - AvroSchema::Record { - name: - Name { - aliases: Some(aliases), - namespace, - .. - }, - .. - } - | AvroSchema::Enum { - name: - Name { - aliases: Some(aliases), - namespace, - .. - }, - .. - } - | AvroSchema::Fixed { - name: - Name { - aliases: Some(aliases), - namespace, - .. - }, - .. - } => { + AvroSchema::Record(Record { + aliases, namespace, .. + }) + | AvroSchema::Enum(Enum { + aliases, namespace, .. + }) + | AvroSchema::Fixed(Fixed { + aliases, namespace, .. + }) => { let aliases: Vec = aliases .iter() .map(|alias| aliased(alias, namespace.as_deref(), None)) @@ -79,7 +59,7 @@ fn external_props(schema: &AvroSchema) -> BTreeMap { pub fn convert_schema(schema: &AvroSchema) -> Result { let mut schema_fields = vec![]; match schema { - AvroSchema::Record { fields, .. } => { + AvroSchema::Record(Record { fields, .. }) => { for field in fields { schema_fields.push(schema_to_field( &field.schema, @@ -105,12 +85,42 @@ fn schema_to_field( let data_type = match schema { AvroSchema::Null => DataType::Null, AvroSchema::Boolean => DataType::Boolean, - AvroSchema::Int => DataType::Int32, - AvroSchema::Long => DataType::Int64, + AvroSchema::Int(logical) => match logical { + Some(logical) => match logical { + avro_schema::IntLogical::Date => DataType::Date32, + avro_schema::IntLogical::Time => DataType::Time32(TimeUnit::Millisecond), + }, + None => DataType::Int32, + }, + AvroSchema::Long(logical) => match logical { + Some(logical) => match logical { + avro_schema::LongLogical::Time => DataType::Time64(TimeUnit::Microsecond), + avro_schema::LongLogical::TimestampMillis => { + DataType::Timestamp(TimeUnit::Millisecond, Some("00:00".to_string())) + } + avro_schema::LongLogical::TimestampMicros => { + DataType::Timestamp(TimeUnit::Microsecond, Some("00:00".to_string())) + } + avro_schema::LongLogical::LocalTimestampMillis => { + DataType::Timestamp(TimeUnit::Millisecond, None) + } + avro_schema::LongLogical::LocalTimestampMicros => { + DataType::Timestamp(TimeUnit::Microsecond, None) + } + }, + None => DataType::Int64, + }, AvroSchema::Float => DataType::Float32, AvroSchema::Double => DataType::Float64, - AvroSchema::Bytes => DataType::Binary, - AvroSchema::String => DataType::Utf8, + AvroSchema::Bytes(logical) => match logical { + Some(logical) => match logical { + avro_schema::BytesLogical::Decimal(precision, scale) => { + DataType::Decimal(*precision, *scale) + } + }, + None => DataType::Binary, + }, + AvroSchema::String(_) => DataType::Utf8, AvroSchema::Array(item_schema) => DataType::List(Box::new(schema_to_field( item_schema, Some("item"), // default name for list items @@ -118,13 +128,12 @@ fn schema_to_field( None, )?)), AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"), - AvroSchema::Union(us) => { + AvroSchema::Union(schemas) => { // If there are only two variants and one of them is null, set the other type as the field data type - let has_nullable = us.find_schema(&Value::Null).is_some(); - let sub_schemas = us.variants(); - if has_nullable && sub_schemas.len() == 2 { + let has_nullable = schemas.iter().any(|x| x == &AvroSchema::Null); + if has_nullable && schemas.len() == 2 { nullable = true; - if let Some(schema) = sub_schemas + if let Some(schema) = schemas .iter() .find(|&schema| !matches!(schema, AvroSchema::Null)) { @@ -134,18 +143,18 @@ fn schema_to_field( } else { return Err(ArrowError::NotYetImplemented(format!( "Can't read avro union {:?}", - us + schema ))); } } else { - let fields = sub_schemas + let fields = schemas .iter() .map(|s| schema_to_field(s, None, has_nullable, None)) .collect::>>()?; DataType::Union(fields, None, UnionMode::Dense) } } - AvroSchema::Record { name, fields, .. } => { + AvroSchema::Record(Record { name, fields, .. }) => { let fields: Result> = fields .iter() .map(|field| { @@ -158,7 +167,7 @@ fn schema_to_field( }*/ schema_to_field( &field.schema, - Some(&format!("{}.{}", name.fullname(None), field.name)), + Some(&format!("{}.{}", name, field.name)), false, Some(&props), ) @@ -173,17 +182,17 @@ fn schema_to_field( false, )) } - AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size), - AvroSchema::Decimal { - precision, scale, .. - } => DataType::Decimal(*precision, *scale), - AvroSchema::Uuid => DataType::Utf8, - AvroSchema::Date => DataType::Date32, - AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond), - AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond), - AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None), - AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None), - AvroSchema::Duration => DataType::Interval(IntervalUnit::MonthDayNano), + AvroSchema::Fixed(Fixed { size, logical, .. }) => match logical { + Some(logical) => match logical { + avro_schema::FixedLogical::Decimal(precision, scale) => { + DataType::Decimal(*precision, *scale) + } + avro_schema::FixedLogical::Duration => { + DataType::Interval(IntervalUnit::MonthDayNano) + } + }, + None => DataType::FixedSizeBinary(*size), + }, }; let name = name.unwrap_or_default(); diff --git a/src/io/avro/read/util.rs b/src/io/avro/read/util.rs index 01a9db713a5..59628de37d8 100644 --- a/src/io/avro/read/util.rs +++ b/src/io/avro/read/util.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::io::Read; -use avro_rs::Schema; +use avro_schema::Schema; use crate::error::{ArrowError, Result}; diff --git a/src/io/avro/read_async/metadata.rs b/src/io/avro/read_async/metadata.rs index d3931236793..3040799aa5d 100644 --- a/src/io/avro/read_async/metadata.rs +++ b/src/io/avro/read_async/metadata.rs @@ -1,7 +1,7 @@ //! Async Avro use std::collections::HashMap; -use avro_rs::Schema as AvroSchema; +use avro_schema::{Record, Schema as AvroSchema}; use futures::AsyncRead; use futures::AsyncReadExt; @@ -30,7 +30,7 @@ pub async fn read_metadata( let (avro_schema, codec, marker) = read_metadata_async(reader).await?; let schema = convert_schema(&avro_schema)?; - let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema { + let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema { fields.into_iter().map(|x| x.schema).collect() } else { panic!() diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs index 24059343412..db6e501d851 100644 --- a/tests/it/io/avro/read.rs +++ b/tests/it/io/avro/read.rs @@ -9,7 +9,6 @@ use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::avro::read; use arrow2::record_batch::RecordBatch; -use arrow2::types::months_days_ns; fn schema() -> (AvroSchema, Schema) { let raw_schema = r#" @@ -41,13 +40,6 @@ fn schema() -> (AvroSchema, Schema) { "type": "enum", "name": "", "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] - }}, - {"name": "duration", - "logicalType": "duration", - "type": { - "name": "duration", - "type": "fixed", - "size": 12 }} ] } @@ -72,17 +64,12 @@ fn schema() -> (AvroSchema, Schema) { DataType::Dictionary(i32::KEY_TYPE, Box::new(DataType::Utf8)), false, ), - Field::new( - "duration", - DataType::Interval(IntervalUnit::MonthDayNano), - false, - ), ]); (AvroSchema::parse_str(raw_schema).unwrap(), schema) } -pub(super) fn write(codec: Codec) -> Result<(Vec, RecordBatch)> { +pub(super) fn write(codec: Codec) -> std::result::Result<(Vec, RecordBatch), avro_rs::Error> { let (avro, schema) = schema(); // a writer needs a schema and something to write to let mut writer = Writer::with_codec(&avro, Vec::new(), codec); @@ -130,10 +117,6 @@ pub(super) fn write(codec: Codec) -> Result<(Vec, RecordBatch)> { ]), ); record.put("enum", Value::Enum(0, "SPADES".to_string())); - record.put( - "duration", - Value::Duration(Duration::new(Months::new(1), Days::new(2), Millis::new(1))), - ); writer.append(record)?; let data = vec![ @@ -158,10 +141,6 @@ pub(super) fn write(codec: Codec) -> Result<(Vec, RecordBatch)> { Int32Array::from_slice([1, 0]), Arc::new(Utf8Array::::from_slice(["SPADES", "HEARTS"])), )) as Arc, - Arc::new(MonthsDaysNsArray::from_slice([ - months_days_ns::new(1, 1, 1_000_000), - months_days_ns::new(1, 2, 1_000_000), - ])) as Arc, ]; let expected = RecordBatch::try_new(Arc::new(schema), columns).unwrap();