
Migrated from avro-rs to avro-schema #692

Merged · 1 commit · Dec 20, 2021
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -69,8 +69,8 @@ ahash = { version = "0.7", optional = true }
# parquet support
parquet2 = { version = "0.8", optional = true, default_features = false, features = ["stream"] }

# avro
avro-rs = { version = "0.13", optional = true, default_features = false }
# avro support
avro-schema = { version = "0.2", optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
@@ -138,7 +138,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json"]
io_avro = ["avro-schema", "fallible-streaming-iterator", "serde_json"]
io_avro_compression = [
"libflate",
"snap",
8 changes: 0 additions & 8 deletions src/io/avro/mod.rs
@@ -6,14 +6,6 @@ pub mod read;
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;

use crate::error::ArrowError;

impl From<avro_rs::Error> for ArrowError {
fn from(error: avro_rs::Error) -> Self {
ArrowError::External("".to_string(), Box::new(error))
}
}

// macros that can operate in sync and async code.
macro_rules! avro_decode {
($reader:ident $($_await:tt)*) => {
6 changes: 3 additions & 3 deletions src/io/avro/read/deserialize.rs
@@ -1,7 +1,7 @@
use std::convert::TryInto;
use std::sync::Arc;

use avro_rs::Schema as AvroSchema;
use avro_schema::{Enum, Schema as AvroSchema};

use crate::array::*;
use crate::datatypes::*;
@@ -33,7 +33,7 @@ fn make_mutable(
Box::new(MutableUtf8Array::<i32>::with_capacity(capacity)) as Box<dyn MutableArray>
}
PhysicalType::Dictionary(_) => {
if let Some(AvroSchema::Enum { symbols, .. }) = avro_schema {
if let Some(AvroSchema::Enum(Enum { symbols, .. })) = avro_schema {
let values = Utf8Array::<i32>::from_slice(symbols);
Box::new(FixedItemsUtf8Dictionary::with_capacity(values, capacity))
as Box<dyn MutableArray>
@@ -64,7 +64,7 @@ fn make_mutable(

fn is_union_null_first(avro_field: &AvroSchema) -> bool {
if let AvroSchema::Union(schemas) = avro_field {
schemas.variants()[0] == AvroSchema::Null
schemas[0] == AvroSchema::Null
} else {
unreachable!()
}
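The visible changes in this file swap avro-rs' struct variants for avro-schema's tuple variants. Below is a minimal illustrative sketch of those two patterns, not code from the PR; it assumes the avro-schema 0.2 shapes that appear in this diff (`Enum { symbols, .. }` with `symbols: Vec<String>`, and `Union(Vec<Schema>)` with `Schema: PartialEq`).

```rust
use avro_schema::{Enum, Schema};

// avro-rs exposed `Schema::Enum { symbols, .. }` as a struct variant;
// avro-schema 0.2 wraps a plain `Enum` struct in a tuple variant instead.
fn enum_symbols(schema: &Schema) -> Option<&[String]> {
    if let Schema::Enum(Enum { symbols, .. }) = schema {
        Some(symbols.as_slice())
    } else {
        None
    }
}

// `Union` now holds a `Vec<Schema>` directly, so the first variant can be
// inspected by indexing or `first()` rather than through avro-rs'
// `UnionSchema::variants()`.
fn is_union_null_first(schema: &Schema) -> bool {
    matches!(schema, Schema::Union(variants) if variants.first() == Some(&Schema::Null))
}
```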
14 changes: 8 additions & 6 deletions src/io/avro/read/header.rs
@@ -1,21 +1,23 @@
use std::collections::HashMap;

use avro_rs::{Error, Schema};
use avro_schema::Schema;
use serde_json;

use crate::error::Result;
use crate::error::{ArrowError, Result};

use super::Compression;

/// Deserializes the Avro header into an Avro [`Schema`] and optional [`Compression`].
pub(crate) fn deserialize_header(
header: HashMap<String, Vec<u8>>,
) -> Result<(Schema, Option<Compression>)> {
let json = header
let schema = header
.get("avro.schema")
.and_then(|bytes| serde_json::from_slice(bytes.as_ref()).ok())
.ok_or(Error::GetAvroSchemaFromMap)?;
let schema = Schema::parse(&json)?;
.ok_or_else(|| ArrowError::ExternalFormat("Avro schema must be present".to_string()))
.and_then(|bytes| {
serde_json::from_slice(bytes.as_ref())
.map_err(|e| ArrowError::ExternalFormat(e.to_string()))
})?;

let compression = header.get("avro.codec").and_then(|bytes| {
let bytes: &[u8] = bytes.as_ref();
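With the `From<avro_rs::Error>` conversion removed in `src/io/avro/mod.rs`, header parsing now reports failures directly as `ArrowError::ExternalFormat`. A self-contained sketch of that path follows; the function name and standalone signature are illustrative, not the crate's `pub(crate)` helper, and the serde support on `avro_schema::Schema` is assumed from the diff itself.

```rust
use std::collections::HashMap;

use arrow2::error::{ArrowError, Result};
use avro_schema::Schema;

// Pulls the JSON-encoded schema out of the Avro file header and deserializes it,
// mapping both a missing entry and a serde_json failure to ExternalFormat.
fn schema_from_header(header: &HashMap<String, Vec<u8>>) -> Result<Schema> {
    header
        .get("avro.schema")
        .ok_or_else(|| ArrowError::ExternalFormat("Avro schema must be present".to_string()))
        .and_then(|bytes| {
            serde_json::from_slice(bytes.as_ref())
                .map_err(|e| ArrowError::ExternalFormat(e.to_string()))
        })
}
```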
4 changes: 2 additions & 2 deletions src/io/avro/read/mod.rs
@@ -3,7 +3,7 @@
use std::io::Read;
use std::sync::Arc;

use avro_rs::Schema as AvroSchema;
use avro_schema::{Record, Schema as AvroSchema};
use fallible_streaming_iterator::FallibleStreamingIterator;

mod block;
@@ -41,7 +41,7 @@ pub fn read_metadata<R: std::io::Read>(
let (avro_schema, codec, marker) = util::read_schema(reader)?;
let schema = schema::convert_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema {
let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
} else {
panic!()
127 changes: 68 additions & 59 deletions src/io/avro/read/schema.rs
@@ -1,8 +1,6 @@
use std::collections::BTreeMap;

use avro_rs::schema::Name;
use avro_rs::types::Value;
use avro_rs::Schema as AvroSchema;
use avro_schema::{Enum, Fixed, Record, Schema as AvroSchema};

use crate::datatypes::*;
use crate::error::{ArrowError, Result};
@@ -24,44 +22,26 @@ fn aliased(name: &str, namespace: Option<&str>, default_namespace: Option<&str>)
fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
let mut props = BTreeMap::new();
match &schema {
AvroSchema::Record {
AvroSchema::Record(Record {
doc: Some(ref doc), ..
}
| AvroSchema::Enum {
})
| AvroSchema::Enum(Enum {
doc: Some(ref doc), ..
} => {
}) => {
props.insert("avro::doc".to_string(), doc.clone());
}
_ => {}
}
match &schema {
AvroSchema::Record {
name:
Name {
aliases: Some(aliases),
namespace,
..
},
..
}
| AvroSchema::Enum {
name:
Name {
aliases: Some(aliases),
namespace,
..
},
..
}
| AvroSchema::Fixed {
name:
Name {
aliases: Some(aliases),
namespace,
..
},
..
} => {
AvroSchema::Record(Record {
aliases, namespace, ..
})
| AvroSchema::Enum(Enum {
aliases, namespace, ..
})
| AvroSchema::Fixed(Fixed {
aliases, namespace, ..
}) => {
let aliases: Vec<String> = aliases
.iter()
.map(|alias| aliased(alias, namespace.as_deref(), None))
@@ -79,7 +59,7 @@ fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
let mut schema_fields = vec![];
match schema {
AvroSchema::Record { fields, .. } => {
AvroSchema::Record(Record { fields, .. }) => {
for field in fields {
schema_fields.push(schema_to_field(
&field.schema,
@@ -105,26 +85,55 @@ fn schema_to_field(
let data_type = match schema {
AvroSchema::Null => DataType::Null,
AvroSchema::Boolean => DataType::Boolean,
AvroSchema::Int => DataType::Int32,
AvroSchema::Long => DataType::Int64,
AvroSchema::Int(logical) => match logical {
Some(logical) => match logical {
avro_schema::IntLogical::Date => DataType::Date32,
avro_schema::IntLogical::Time => DataType::Time32(TimeUnit::Millisecond),
},
None => DataType::Int32,
},
AvroSchema::Long(logical) => match logical {
Some(logical) => match logical {
avro_schema::LongLogical::Time => DataType::Time64(TimeUnit::Microsecond),
avro_schema::LongLogical::TimestampMillis => {
DataType::Timestamp(TimeUnit::Millisecond, Some("00:00".to_string()))
}
avro_schema::LongLogical::TimestampMicros => {
DataType::Timestamp(TimeUnit::Microsecond, Some("00:00".to_string()))
}
avro_schema::LongLogical::LocalTimestampMillis => {
DataType::Timestamp(TimeUnit::Millisecond, None)
}
avro_schema::LongLogical::LocalTimestampMicros => {
DataType::Timestamp(TimeUnit::Microsecond, None)
}
},
None => DataType::Int64,
},
AvroSchema::Float => DataType::Float32,
AvroSchema::Double => DataType::Float64,
AvroSchema::Bytes => DataType::Binary,
AvroSchema::String => DataType::Utf8,
AvroSchema::Bytes(logical) => match logical {
Some(logical) => match logical {
avro_schema::BytesLogical::Decimal(precision, scale) => {
DataType::Decimal(*precision, *scale)
}
},
None => DataType::Binary,
},
AvroSchema::String(_) => DataType::Utf8,
AvroSchema::Array(item_schema) => DataType::List(Box::new(schema_to_field(
item_schema,
Some("item"), // default name for list items
false,
None,
)?)),
AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"),
AvroSchema::Union(us) => {
AvroSchema::Union(schemas) => {
// If there are only two variants and one of them is null, set the other type as the field data type
let has_nullable = us.find_schema(&Value::Null).is_some();
let sub_schemas = us.variants();
if has_nullable && sub_schemas.len() == 2 {
let has_nullable = schemas.iter().any(|x| x == &AvroSchema::Null);
if has_nullable && schemas.len() == 2 {
nullable = true;
if let Some(schema) = sub_schemas
if let Some(schema) = schemas
.iter()
.find(|&schema| !matches!(schema, AvroSchema::Null))
{
@@ -134,18 +143,18 @@ fn schema_to_field(
} else {
return Err(ArrowError::NotYetImplemented(format!(
"Can't read avro union {:?}",
us
schema
)));
}
} else {
let fields = sub_schemas
let fields = schemas
.iter()
.map(|s| schema_to_field(s, None, has_nullable, None))
.collect::<Result<Vec<Field>>>()?;
DataType::Union(fields, None, UnionMode::Dense)
}
}
AvroSchema::Record { name, fields, .. } => {
AvroSchema::Record(Record { name, fields, .. }) => {
let fields: Result<Vec<Field>> = fields
.iter()
.map(|field| {
@@ -158,7 +167,7 @@ }*/
}*/
schema_to_field(
&field.schema,
Some(&format!("{}.{}", name.fullname(None), field.name)),
Some(&format!("{}.{}", name, field.name)),
false,
Some(&props),
)
@@ -173,17 +182,17 @@ fn schema_to_field(
false,
))
}
AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size),
AvroSchema::Decimal {
precision, scale, ..
} => DataType::Decimal(*precision, *scale),
AvroSchema::Uuid => DataType::Utf8,
AvroSchema::Date => DataType::Date32,
AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
AvroSchema::TimeMicros => DataType::Time64(TimeUnit::Microsecond),
AvroSchema::TimestampMillis => DataType::Timestamp(TimeUnit::Millisecond, None),
AvroSchema::TimestampMicros => DataType::Timestamp(TimeUnit::Microsecond, None),
AvroSchema::Duration => DataType::Interval(IntervalUnit::MonthDayNano),
AvroSchema::Fixed(Fixed { size, logical, .. }) => match logical {
Some(logical) => match logical {
avro_schema::FixedLogical::Decimal(precision, scale) => {
DataType::Decimal(*precision, *scale)
}
avro_schema::FixedLogical::Duration => {
DataType::Interval(IntervalUnit::MonthDayNano)
}
},
None => DataType::FixedSizeBinary(*size),
},
};

let name = name.unwrap_or_default();
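The largest behavioural change in this file is that logical types now ride on the primitive variants (`Int(Option<IntLogical>)`, `Long(Option<LongLogical>)`, and so on) instead of being separate avro-rs schema variants such as `Date` or `TimestampMillis`. The sketch below condenses a few of those cases using arrow2's public `datatypes` module; the helper name is made up, and the arms simply mirror the choices visible in the diff above.

```rust
use arrow2::datatypes::{DataType, TimeUnit};
use avro_schema::{IntLogical, LongLogical, Schema};

// Maps a handful of primitive Avro schemas to Arrow data types, following the
// same choices as the PR: zone-aware timestamps get a fixed "00:00" offset,
// local timestamps get no timezone.
fn primitive_to_data_type(schema: &Schema) -> Option<DataType> {
    Some(match schema {
        Schema::Int(None) => DataType::Int32,
        Schema::Int(Some(IntLogical::Date)) => DataType::Date32,
        Schema::Int(Some(IntLogical::Time)) => DataType::Time32(TimeUnit::Millisecond),
        Schema::Long(None) => DataType::Int64,
        Schema::Long(Some(LongLogical::TimestampMillis)) => {
            DataType::Timestamp(TimeUnit::Millisecond, Some("00:00".to_string()))
        }
        Schema::Long(Some(LongLogical::LocalTimestampMillis)) => {
            DataType::Timestamp(TimeUnit::Millisecond, None)
        }
        _ => return None,
    })
}
```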
2 changes: 1 addition & 1 deletion src/io/avro/read/util.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::io::Read;

use avro_rs::Schema;
use avro_schema::Schema;

use crate::error::{ArrowError, Result};

4 changes: 2 additions & 2 deletions src/io/avro/read_async/metadata.rs
@@ -1,7 +1,7 @@
//! Async Avro
use std::collections::HashMap;

use avro_rs::Schema as AvroSchema;
use avro_schema::{Record, Schema as AvroSchema};
use futures::AsyncRead;
use futures::AsyncReadExt;

@@ -30,7 +30,7 @@ pub async fn read_metadata<R: AsyncRead + Unpin + Send>(
let (avro_schema, codec, marker) = read_metadata_async(reader).await?;
let schema = convert_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record { fields, .. } = avro_schema {
let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
} else {
panic!()