Skip to content

Commit 522b26a

Browse files
committed
Merge branch 'main' into issue-8477-variant-to-arrow-decimal
2 parents d88fd7f + 760b7b6 commit 522b26a

File tree

15 files changed

+544
-529
lines changed

15 files changed

+544
-529
lines changed

arrow-avro/src/codec.rs

Lines changed: 18 additions & 85 deletions
Original file line number | Diff line number | Diff line change
@@ -14,11 +14,10 @@
1414
// KIND, either express or implied. See the License for the
1515
// specific language governing permissions and limitations
1616
// under the License.
17-
1817
use crate::schema::{
19-
make_full_name, Array, Attributes, AvroSchema, ComplexType, Enum, Fixed, Map, Nullability,
20-
PrimitiveType, Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
21-
AVRO_FIELD_DEFAULT_METADATA_KEY, AVRO_ROOT_RECORD_DEFAULT_NAME,
18+
make_full_name, Array, Attributes, ComplexType, Enum, Fixed, Map, Nullability, PrimitiveType,
19+
Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
20+
AVRO_FIELD_DEFAULT_METADATA_KEY,
2221
};
2322
use arrow_schema::{
2423
ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, UnionFields, UnionMode,
@@ -77,8 +76,6 @@ pub(crate) enum AvroLiteral {
7776
Array(Vec<AvroLiteral>),
7877
/// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
7978
Map(IndexMap<String, AvroLiteral>),
80-
/// Represents an unsupported literal type.
81-
Unsupported,
8279
}
8380

8481
/// Contains the necessary information to resolve a writer's record against a reader's record schema.
@@ -208,7 +205,7 @@ impl AvroDataType {
208205
}
209206

210207
/// Returns an arrow [`Field`] with the given name
211-
pub fn field_with_name(&self, name: &str) -> Field {
208+
pub(crate) fn field_with_name(&self, name: &str) -> Field {
212209
let mut nullable = self.nullability.is_some();
213210
if !nullable {
214211
if let Codec::Union(children, _, _) = self.codec() {
@@ -230,7 +227,7 @@ impl AvroDataType {
230227
///
231228
/// The codec determines how Avro data is encoded and mapped to Arrow data types.
232229
/// This is useful when we need to inspect or use the specific encoding of a field.
233-
pub fn codec(&self) -> &Codec {
230+
pub(crate) fn codec(&self) -> &Codec {
234231
&self.codec
235232
}
236233

@@ -524,29 +521,6 @@ impl AvroField {
524521
pub(crate) fn name(&self) -> &str {
525522
&self.name
526523
}
527-
528-
/// Performs schema resolution between a writer and reader schema.
529-
///
530-
/// This is the primary entry point for handling schema evolution. It produces an
531-
/// `AvroField` that contains all the necessary information to read data written
532-
/// with the `writer` schema as if it were written with the `reader` schema.
533-
pub(crate) fn resolve_from_writer_and_reader<'a>(
534-
writer_schema: &'a Schema<'a>,
535-
reader_schema: &'a Schema<'a>,
536-
use_utf8view: bool,
537-
strict_mode: bool,
538-
) -> Result<Self, ArrowError> {
539-
let top_name = match reader_schema {
540-
Schema::Complex(ComplexType::Record(r)) => r.name.to_string(),
541-
_ => AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
542-
};
543-
let mut resolver = Maker::new(use_utf8view, strict_mode);
544-
let data_type = resolver.make_data_type(writer_schema, Some(reader_schema), None)?;
545-
Ok(Self {
546-
name: top_name,
547-
data_type,
548-
})
549-
}
550524
}
551525

552526
impl<'a> TryFrom<&Schema<'a>> for AvroField {
@@ -1629,28 +1603,6 @@ impl<'a> Maker<'a> {
16291603
Ok(datatype)
16301604
}
16311605

1632-
fn resolve_nullable_union<'s>(
1633-
&mut self,
1634-
writer_variants: &'s [Schema<'a>],
1635-
reader_variants: &'s [Schema<'a>],
1636-
namespace: Option<&'a str>,
1637-
) -> Result<AvroDataType, ArrowError> {
1638-
match (
1639-
nullable_union_variants(writer_variants),
1640-
nullable_union_variants(reader_variants),
1641-
) {
1642-
(Some((write_nb, write_nonnull)), Some((_read_nb, read_nonnull))) => {
1643-
let mut dt = self.make_data_type(write_nonnull, Some(read_nonnull), namespace)?;
1644-
dt.nullability = Some(write_nb);
1645-
Ok(dt)
1646-
}
1647-
_ => Err(ArrowError::NotYetImplemented(
1648-
"Union resolution requires both writer and reader to be 2-branch nullable unions"
1649-
.to_string(),
1650-
)),
1651-
}
1652-
}
1653-
16541606
// Resolve writer vs. reader enum schemas according to Avro 1.11.1.
16551607
//
16561608
// # How enums resolve (writer to reader)
@@ -1915,9 +1867,11 @@ impl<'a> Maker<'a> {
19151867
mod tests {
19161868
use super::*;
19171869
use crate::schema::{
1918-
Attributes, Field as AvroFieldSchema, Fixed, PrimitiveType, Schema, Type, TypeName,
1870+
Array, Attributes, ComplexType, Field as AvroFieldSchema, Fixed, PrimitiveType, Record,
1871+
Schema, Type, TypeName, AVRO_ROOT_RECORD_DEFAULT_NAME,
19191872
};
1920-
use serde_json;
1873+
use indexmap::IndexMap;
1874+
use serde_json::{self, Value};
19211875

19221876
fn create_schema_with_logical_type(
19231877
primitive_type: PrimitiveType,
@@ -1934,21 +1888,6 @@ mod tests {
19341888
})
19351889
}
19361890

1937-
fn create_fixed_schema(size: usize, logical_type: &'static str) -> Schema<'static> {
1938-
let attributes = Attributes {
1939-
logical_type: Some(logical_type),
1940-
additional: Default::default(),
1941-
};
1942-
1943-
Schema::Complex(ComplexType::Fixed(Fixed {
1944-
name: "fixed_type",
1945-
namespace: None,
1946-
aliases: Vec::new(),
1947-
size,
1948-
attributes,
1949-
}))
1950-
}
1951-
19521891
fn resolve_promotion(writer: PrimitiveType, reader: PrimitiveType) -> AvroDataType {
19531892
let writer_schema = Schema::TypeName(TypeName::Primitive(writer));
19541893
let reader_schema = Schema::TypeName(TypeName::Primitive(reader));
@@ -1965,17 +1904,6 @@ mod tests {
19651904
Schema::Union(branches)
19661905
}
19671906

1968-
fn mk_record_name(name: &str) -> Schema<'_> {
1969-
Schema::Complex(ComplexType::Record(Record {
1970-
name,
1971-
namespace: None,
1972-
doc: None,
1973-
aliases: vec![],
1974-
fields: vec![],
1975-
attributes: Attributes::default(),
1976-
}))
1977-
}
1978-
19791907
#[test]
19801908
fn test_date_logical_type() {
19811909
let schema = create_schema_with_logical_type(PrimitiveType::Int, "date");
@@ -2068,7 +1996,7 @@ mod tests {
20681996

20691997
#[test]
20701998
fn test_decimal_logical_type_not_implemented() {
2071-
let mut codec = Codec::Fixed(16);
1999+
let codec = Codec::Fixed(16);
20722000

20732001
let process_decimal = || -> Result<(), ArrowError> {
20742002
if let Codec::Fixed(_) = codec {
@@ -2556,9 +2484,14 @@ mod tests {
25562484
fn test_resolve_from_writer_and_reader_defaults_root_name_for_non_record_reader() {
25572485
let writer_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
25582486
let reader_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String));
2559-
let field =
2560-
AvroField::resolve_from_writer_and_reader(&writer_schema, &reader_schema, false, false)
2561-
.expect("resolution should succeed");
2487+
let mut maker = Maker::new(false, false);
2488+
let data_type = maker
2489+
.make_data_type(&writer_schema, Some(&reader_schema), None)
2490+
.expect("resolution should succeed");
2491+
let field = AvroField {
2492+
name: AVRO_ROOT_RECORD_DEFAULT_NAME.to_string(),
2493+
data_type,
2494+
};
25622495
assert_eq!(field.name(), AVRO_ROOT_RECORD_DEFAULT_NAME);
25632496
assert!(matches!(field.data_type().codec(), Codec::Utf8));
25642497
}

arrow-avro/src/compression.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@
1616
// under the License.
1717

1818
use arrow_schema::ArrowError;
19-
use std::io;
2019
use std::io::{Read, Write};
2120

2221
/// The metadata key used for storing the JSON encoded [`CompressionCodec`]

arrow-avro/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -224,7 +224,6 @@
224224
)]
225225
#![cfg_attr(docsrs, feature(doc_cfg))]
226226
#![warn(missing_docs)]
227-
#![allow(unused)] // Temporary
228227

229228
/// Core functionality for reading Avro data into Arrow arrays
230229
///

arrow-avro/src/reader/cursor.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -85,7 +85,7 @@ impl<'a> AvroCursor<'a> {
8585
ArrowError::ParseError("offset overflow reading avro bytes".to_string())
8686
})?;
8787

88-
if (self.buf.len() < len) {
88+
if self.buf.len() < len {
8989
return Err(ArrowError::ParseError(
9090
"Unexpected EOF reading bytes".to_string(),
9191
));
@@ -97,7 +97,7 @@ impl<'a> AvroCursor<'a> {
9797

9898
#[inline]
9999
pub(crate) fn get_float(&mut self) -> Result<f32, ArrowError> {
100-
if (self.buf.len() < 4) {
100+
if self.buf.len() < 4 {
101101
return Err(ArrowError::ParseError(
102102
"Unexpected EOF reading float".to_string(),
103103
));
@@ -109,7 +109,7 @@ impl<'a> AvroCursor<'a> {
109109

110110
#[inline]
111111
pub(crate) fn get_double(&mut self) -> Result<f64, ArrowError> {
112-
if (self.buf.len() < 8) {
112+
if self.buf.len() < 8 {
113113
return Err(ArrowError::ParseError(
114114
"Unexpected EOF reading float".to_string(),
115115
));

arrow-avro/src/reader/header.rs

Lines changed: 24 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,27 @@ use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
2121
use crate::reader::vlq::VLQDecoder;
2222
use crate::schema::{Schema, SCHEMA_METADATA_KEY};
2323
use arrow_schema::ArrowError;
24+
use std::io::BufRead;
25+
26+
/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
27+
pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
28+
let mut decoder = HeaderDecoder::default();
29+
loop {
30+
let buf = reader.fill_buf()?;
31+
if buf.is_empty() {
32+
break;
33+
}
34+
let read = buf.len();
35+
let decoded = decoder.decode(buf)?;
36+
reader.consume(decoded);
37+
if decoded != read {
38+
break;
39+
}
40+
}
41+
decoder.flush().ok_or_else(|| {
42+
ArrowError::ParseError("Unexpected EOF while reading Avro header".to_string())
43+
})
44+
}
2445

2546
#[derive(Debug)]
2647
enum HeaderDecoderState {
@@ -265,13 +286,13 @@ impl HeaderDecoder {
265286
#[cfg(test)]
266287
mod test {
267288
use super::*;
268-
use crate::codec::{AvroDataType, AvroField};
289+
use crate::codec::AvroField;
269290
use crate::reader::read_header;
270291
use crate::schema::SCHEMA_METADATA_KEY;
271292
use crate::test_util::arrow_test_data;
272293
use arrow_schema::{DataType, Field, Fields, TimeUnit};
273294
use std::fs::File;
274-
use std::io::{BufRead, BufReader};
295+
use std::io::BufReader;
275296

276297
#[test]
277298
fn test_header_decode() {
@@ -291,7 +312,7 @@ mod test {
291312

292313
fn decode_file(file: &str) -> Header {
293314
let file = File::open(file).unwrap();
294-
read_header(BufReader::with_capacity(100, file)).unwrap()
315+
read_header(BufReader::with_capacity(1000, file)).unwrap()
295316
}
296317

297318
#[test]

0 commit comments

Comments
 (0)