diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs index 588334bab744..032ad683ff77 100644 --- a/arrow-avro/src/lib.rs +++ b/arrow-avro/src/lib.rs @@ -20,13 +20,13 @@ //! This crate provides: //! - a [`reader`] that decodes Avro (Object Container Files, Avro Single‑Object encoding, //! and Confluent Schema Registry wire format) into Arrow `RecordBatch`es, -//! - and a [`writer`] that encodes Arrow `RecordBatch`es into Avro (OCF or raw Avro binary). +//! - and a [`writer`] that encodes Arrow `RecordBatch`es into Avro (OCF or SOE). //! //! If you’re new to Arrow or Avro, see: //! - Arrow project site: //! - Avro 1.11.1 specification: //! -//! ## Example: OCF (Object Container File) round‑trip +//! ## Example: OCF (Object Container File) round‑trip *(runnable)* //! //! The example below creates an Arrow table, writes an **Avro OCF** fully in memory, //! and then reads it back. OCF is a self‑describing file format that embeds the Avro @@ -64,82 +64,7 @@ //! # Ok(()) } //! ``` //! -//! ## Quickstart: Confluent wire‑format round‑trip *(runnable)* -//! -//! The **Confluent Schema Registry wire format** prefixes each Avro message with a -//! 1‑byte magic `0x00` and a **4‑byte big‑endian** schema ID, followed by the Avro body. -//! See: -//! -//! In this round‑trip, we: -//! 1) Use `AvroStreamWriter` to create a **raw Avro body** for a single‑row batch, -//! 2) Wrap it with the Confluent prefix (magic and schema ID), -//! 3) Decode it back to Arrow using a `Decoder` configured with a `SchemaStore` that -//! maps the schema ID to the Avro schema used by the writer. -//! -//! ``` -//! use std::collections::HashMap; -//! use std::sync::Arc; -//! use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray}; -//! use arrow_schema::{DataType, Field, Schema}; -//! use arrow_avro::writer::{AvroStreamWriter, WriterBuilder}; -//! use arrow_avro::reader::ReaderBuilder; -//! use arrow_avro::schema::{ -//! AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm, -//! 
FingerprintStrategy, SCHEMA_METADATA_KEY -//! }; -//! -//! # fn main() -> Result<(), Box> { -//! // Writer schema registered under Schema Registry ID 1 -//! let avro_json = r#"{ -//! "type":"record","name":"User", -//! "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}] -//! }"#; -//! -//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id); -//! let id: u32 = 1; -//! store.set(Fingerprint::Id(id), AvroSchema::new(avro_json.to_string()))?; -//! -//! // Build an Arrow schema that references the same Avro JSON -//! let mut md = HashMap::new(); -//! md.insert(SCHEMA_METADATA_KEY.to_string(), avro_json.to_string()); -//! let schema = Schema::new_with_metadata( -//! vec![ -//! Field::new("id", DataType::Int64, false), -//! Field::new("name", DataType::Utf8, false), -//! ], -//! md, -//! ); -//! -//! // One‑row batch: { id: 42, name: "alice" } -//! let batch = RecordBatch::try_new( -//! Arc::new(schema.clone()), -//! vec![ -//! Arc::new(Int64Array::from(vec![42])) as ArrayRef, -//! Arc::new(StringArray::from(vec!["alice"])) as ArrayRef, -//! ], -//! )?; -//! -//! // Stream‑write a single record, letting the writer add the **Confluent** prefix. -//! let sink: Vec = Vec::new(); -//! let mut w: AvroStreamWriter> = WriterBuilder::new(schema.clone()) -//! .with_fingerprint_strategy(FingerprintStrategy::Id(id)) -//! .build(sink)?; -//! w.write(&batch)?; -//! w.finish()?; -//! let frame = w.into_inner(); // already: 0x00 + 4B BE ID + Avro body -//! assert!(frame.len() > 5); -//! -//! // Decode -//! let mut dec = ReaderBuilder::new() -//! .with_writer_schema_store(store) -//! .build_decoder()?; -//! dec.decode(&frame)?; -//! let out = dec.flush()?.expect("one row"); -//! assert_eq!(out.num_rows(), 1); -//! # Ok(()) } -//! ``` -//! -//! ## Quickstart: Avro Single‑Object Encoding round‑trip *(runnable)* +//! ## Quickstart: SOE (Single‑Object Encoding) round‑trip *(runnable)* //! //! 
Avro **Single‑Object Encoding (SOE)** wraps an Avro body with a 2‑byte marker //! `0xC3 0x01` and an **8‑byte little‑endian CRC‑64‑AVRO Rabin fingerprint** of the @@ -203,7 +128,7 @@ //! ### Modules //! //! - [`reader`]: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es. -//! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent). +//! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent, Apicurio). //! - [`schema`]: Avro schema parsing / fingerprints / registries. //! - [`compression`]: codecs used for **OCF block compression** (i.e., Deflate, Snappy, Zstandard, BZip2, and XZ). //! - [`codec`]: internal Avro-Arrow type conversion and row decode/encode plans. diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 7e0bfad0ce22..426845676a88 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -42,7 +42,7 @@ //! * [`Reader`](crate::reader::Reader): a convenient, synchronous iterator over `RecordBatch` decoded from an OCF //! input. Implements [`Iterator>`] and //! `RecordBatchReader`. -//! * [`Decoder`](crate::reader::Decoder): a push‑based row decoder that consumes raw Avro bytes and yields ready +//! * [`Decoder`](crate::reader::Decoder): a push‑based row decoder that consumes SOE framed Avro bytes and yields ready //! `RecordBatch` values when batches fill. This is suitable for integrating with async //! byte streams, network protocols, or other custom data sources. //! @@ -167,7 +167,7 @@ //! use arrow_array::{ArrayRef, Int64Array, RecordBatch}; //! use arrow_schema::{DataType, Field, Schema}; //! use arrow_avro::schema::{AvroSchema, SchemaStore, SCHEMA_METADATA_KEY, FingerprintStrategy}; -//! use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat}; +//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat}; //! use arrow_avro::reader::ReaderBuilder; //! //! # fn main() -> Result<(), Box> { @@ -187,7 +187,7 @@ //! )?; //! 
let mut w = WriterBuilder::new(arrow) //! .with_fingerprint_strategy(FingerprintStrategy::Rabin) // SOE prefix -//! .build::<_, AvroBinaryFormat>(Vec::new())?; +//! .build::<_, AvroSoeFormat>(Vec::new())?; //! w.write(&batch)?; //! w.finish()?; //! let frame = w.into_inner(); // C3 01 + fp + Avro body @@ -220,7 +220,7 @@ //! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch}; //! use arrow_schema::{DataType, Field, Schema}; //! use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm, SCHEMA_METADATA_KEY, FingerprintStrategy}; -//! use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat}; +//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat}; //! use arrow_avro::reader::ReaderBuilder; //! //! # fn main() -> Result<(), Box> { @@ -248,7 +248,7 @@ //! )?; //! let mut w = WriterBuilder::new(arrow) //! .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) // 0x00 + ID + body -//! .build::<_, AvroBinaryFormat>(Vec::new())?; +//! .build::<_, AvroSoeFormat>(Vec::new())?; //! w.write(&batch)?; w.finish()?; //! Ok(w.into_inner()) //! } @@ -402,7 +402,7 @@ //! Arc::new(StringArray::from(vec!["v0-alice"])) as ArrayRef])?; //! let mut w0 = arrow_avro::writer::WriterBuilder::new(arrow0) //! .with_fingerprint_strategy(FingerprintStrategy::Id(id_v0)) -//! .build::<_, arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?; +//! .build::<_, arrow_avro::writer::format::AvroSoeFormat>(Vec::new())?; //! w0.write(&batch0)?; w0.finish()?; //! let frame0 = w0.into_inner(); // 0x00 + id_v0 + body //! @@ -420,7 +420,7 @@ //! Arc::new(StringArray::from(vec![Some("bob@example.com")])) as ArrayRef])?; //! let mut w1 = arrow_avro::writer::WriterBuilder::new(arrow1) //! .with_fingerprint_strategy(FingerprintStrategy::Id(id_v1)) -//! .build::<_, arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?; +//! .build::<_, arrow_avro::writer::format::AvroSoeFormat>(Vec::new())?; //! w1.write(&batch1)?; w1.finish()?; //! 
let frame1 = w1.into_inner(); // 0x00 + id_v1 + body //! @@ -563,7 +563,7 @@ fn is_incomplete_data(err: &ArrowError) -> bool { /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch}; /// # use arrow_schema::{DataType, Field, Schema}; /// # use arrow_avro::schema::{SCHEMA_METADATA_KEY, FingerprintStrategy}; -/// # use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat}; +/// # use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat}; /// # let mut md = HashMap::new(); /// # md.insert(SCHEMA_METADATA_KEY.to_string(), /// # r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()); @@ -571,7 +571,7 @@ fn is_incomplete_data(err: &ArrowError) -> bool { /// # let batch = RecordBatch::try_new(Arc::new(arrow.clone()), vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef])?; /// # let mut w = WriterBuilder::new(arrow) /// # .with_fingerprint_strategy(fp.into()) -/// # .build::<_, AvroBinaryFormat>(Vec::new())?; +/// # .build::<_, AvroSoeFormat>(Vec::new())?; /// # w.write(&batch)?; w.finish()?; let frame = w.into_inner(); /// /// let mut decoder = ReaderBuilder::new() @@ -605,7 +605,7 @@ fn is_incomplete_data(err: &ArrowError) -> bool { /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch}; /// # use arrow_schema::{DataType, Field, Schema}; /// # use arrow_avro::schema::{SCHEMA_METADATA_KEY, FingerprintStrategy}; -/// # use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat}; +/// # use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat}; /// # fn msg(x: i64) -> Result, Box> { /// # let mut md = HashMap::new(); /// # md.insert(SCHEMA_METADATA_KEY.to_string(), @@ -614,7 +614,7 @@ fn is_incomplete_data(err: &ArrowError) -> bool { /// # let batch = RecordBatch::try_new(Arc::new(arrow.clone()), vec![Arc::new(Int64Array::from(vec![x])) as ArrayRef])?; /// # let mut w = WriterBuilder::new(arrow) /// # .with_fingerprint_strategy(FingerprintStrategy::Id(1234)) -/// # .build::<_, AvroBinaryFormat>(Vec::new())?; 
+/// # .build::<_, AvroSoeFormat>(Vec::new())?; /// # w.write(&batch)?; w.finish()?; Ok(w.into_inner()) /// # } /// # let m1 = msg(1)?; diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs index 07534c960a0f..ba2a0b8564b2 100644 --- a/arrow-avro/src/writer/format.rs +++ b/arrow-avro/src/writer/format.rs @@ -112,9 +112,9 @@ impl AvroFormat for AvroOcfFormat { /// See: /// See: #[derive(Debug, Default)] -pub struct AvroBinaryFormat {} +pub struct AvroSoeFormat {} -impl AvroFormat for AvroBinaryFormat { +impl AvroFormat for AvroSoeFormat { const NEEDS_PREFIX: bool = true; fn start_stream( &mut self, @@ -124,10 +124,9 @@ impl AvroFormat for AvroBinaryFormat { ) -> Result<(), ArrowError> { if compression.is_some() { return Err(ArrowError::InvalidArgumentError( - "Compression not supported for Avro binary streaming".to_string(), + "Compression not supported for Avro SOE streaming".to_string(), )); } - Ok(()) } diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs index 1d4699865bf6..231c9846f9ac 100644 --- a/arrow-avro/src/writer/mod.rs +++ b/arrow-avro/src/writer/mod.rs @@ -26,7 +26,7 @@ //! file with header (schema JSON + metadata), optional compression, data blocks, and //! sync markers. See Avro 1.11.1 “Object Container Files.” //! -//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a **raw Avro binary stream** (“datum” bytes) without +//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a **Single Object Encoding (SOE) Stream** (“datum” bytes) without //! any container framing. This is useful when the schema is known out‑of‑band (i.e., //! via a registry) and you want minimal overhead. //! @@ -34,26 +34,31 @@ //! //! * Use **OCF** when you need a portable, self‑contained file. The schema travels with //! the data, making it easy to read elsewhere. -//! * Use the **raw stream** when your surrounding protocol supplies schema information -//! (i.e., a schema registry). 
If you need **single‑object encoding (SOE)** or Confluent -//! **Schema Registry** framing, you must add the appropriate prefix *outside* this writer: -//! - **SOE**: `0xC3 0x01` + 8‑byte little‑endian CRC‑64‑AVRO fingerprint + Avro body -//! (see Avro 1.11.1 “Single object encoding”). +//! * Use the **SOE stream** when your surrounding protocol supplies schema information +//! (e.g., a schema registry). The writer automatically adds the per‑record prefix: +//! - **SOE**: Each record is prefixed with the 2-byte header (`0xC3 0x01`) followed by +//! an 8‑byte little‑endian CRC‑64‑AVRO fingerprint, then the Avro body. +//! See Avro 1.11.1 "Single object encoding". //! -//! - **Confluent wire format**: magic `0x00` + **big‑endian** 4‑byte schema ID and Avro body. +//! - **Confluent wire format**: Each record is prefixed with magic byte `0x00` followed by +//! a **big‑endian** 4‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id(schema_id)`. //! +//! - **Apicurio wire format**: Each record is prefixed with magic byte `0x00` followed by +//! a **big‑endian** 8‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id64(schema_id)`. +//! //! //! ## Choosing the Avro schema //! //! By default, the writer converts your Arrow schema to Avro (including a top‑level record -//! name) and stores the resulting JSON under the `avro::schema` metadata key. If you already -//! have an Avro schema JSON, you want to use verbatim, put it into the Arrow schema metadata -//! under the same key before constructing the writer. The builder will pick it up. +//! name). If you already have an Avro schema JSON you want to use verbatim, put it into the +//! Arrow schema metadata under the `avro.schema` key before constructing the writer. The +//! builder will use that schema instead of generating a new one (unless `strip_metadata` is +//! set to true in the options). //! //! ## Compression //! //! 
For OCF, you may enable a compression codec via `WriterBuilder::with_compression`. The -//! chosen codec is written into the file header and used for subsequent blocks. Raw stream +//! chosen codec is written into the file header and used for subsequent blocks. SOE stream //! writing doesn’t apply container‑level compression. //! //! --- @@ -63,7 +68,7 @@ use crate::schema::{ AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY, }; use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long}; -use crate::writer::format::{AvroBinaryFormat, AvroFormat, AvroOcfFormat}; +use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat}; use arrow_array::RecordBatch; use arrow_schema::{ArrowError, Schema}; use std::io::Write; @@ -173,7 +178,7 @@ impl WriterBuilder { /// You’ll usually use the concrete aliases: /// /// * **[`AvroWriter`]** for **OCF** (self‑describing container file) -/// * **[`AvroStreamWriter`]** for **raw** Avro binary streams +/// * **[`AvroStreamWriter`]** for **SOE** Avro streams #[derive(Debug)] pub struct Writer { writer: W, @@ -226,12 +231,13 @@ pub struct Writer { /// ``` pub type AvroWriter = Writer; -/// Alias for a raw Avro **binary stream** writer. +/// Alias for an Avro **Single Object Encoding** stream writer. /// /// ### Example /// -/// This writes only the **Avro body** bytes — no OCF header/sync and no -/// single‑object or Confluent framing. If you need those frames, add them externally. +/// This writer automatically adds the appropriate per-record prefix (based on the +/// fingerprint strategy) before the Avro body of each record. The default is Single +/// Object Encoding (SOE) with a Rabin fingerprint. 
/// /// ``` /// use std::sync::Arc; @@ -247,7 +253,7 @@ pub type AvroWriter = Writer; /// vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef], /// )?; /// -/// // Write a raw Avro stream to a Vec +/// // Write an Avro Single Object Encoding stream to a Vec /// let sink: Vec = Vec::new(); /// let mut w = AvroStreamWriter::new(sink, schema)?; /// w.write(&batch)?; @@ -256,7 +262,7 @@ pub type AvroWriter = Writer; /// assert!(!bytes.is_empty()); /// # Ok(()) } /// ``` -pub type AvroStreamWriter = Writer; +pub type AvroStreamWriter = Writer; impl Writer { /// Convenience constructor – same as [`WriterBuilder::build`] with `AvroOcfFormat`. @@ -294,11 +300,10 @@ impl Writer { } } -impl Writer { +impl Writer { /// Convenience constructor to create a new [`AvroStreamWriter`]. /// - /// The resulting stream contains just **Avro binary** bodies (no OCF header/sync and no - /// single‑object or Confluent framing). If you need those frames, add them externally. + /// The resulting stream contains **Single Object Encodings** (no OCF header/sync). 
/// /// ### Example /// @@ -324,7 +329,7 @@ impl Writer { /// # Ok(()) } /// ``` pub fn new(writer: W, schema: Schema) -> Result { - WriterBuilder::new(schema).build::(writer) + WriterBuilder::new(schema).build::(writer) } } @@ -496,7 +501,7 @@ mod tests { let schema_id: u32 = 42; let mut writer = WriterBuilder::new(schema.clone()) .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) - .build::<_, AvroBinaryFormat>(Vec::new())?; + .build::<_, AvroSoeFormat>(Vec::new())?; writer.write(&batch)?; let encoded = writer.into_inner(); let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id); @@ -530,7 +535,7 @@ mod tests { let schema_id: u64 = 42; let mut writer = WriterBuilder::new(schema.clone()) .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id)) - .build::<_, AvroBinaryFormat>(Vec::new())?; + .build::<_, AvroSoeFormat>(Vec::new())?; writer.write(&batch)?; let encoded = writer.into_inner(); let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64); @@ -1393,7 +1398,7 @@ mod tests { let cap = 8192; let mut writer = WriterBuilder::new(schema) .with_capacity(cap) - .build::<_, AvroBinaryFormat>(Vec::new())?; + .build::<_, AvroSoeFormat>(Vec::new())?; assert_eq!(writer.capacity, cap); writer.write(&batch)?; let _bytes = writer.into_inner();