83 changes: 4 additions & 79 deletions arrow-avro/src/lib.rs
@@ -20,13 +20,13 @@
//! This crate provides:
//! - a [`reader`] that decodes Avro (Object Container Files, Avro Single‑Object encoding,
//! and Confluent Schema Registry wire format) into Arrow `RecordBatch`es,
//! - and a [`writer`] that encodes Arrow `RecordBatch`es into Avro (OCF or raw Avro binary).
//! - and a [`writer`] that encodes Arrow `RecordBatch`es into Avro (OCF or SOE).
//!
//! If you’re new to Arrow or Avro, see:
//! - Arrow project site: <https://arrow.apache.org/>
//! - Avro 1.11.1 specification: <https://avro.apache.org/docs/1.11.1/specification/>
//!
//! ## Example: OCF (Object Container File) round‑trip
//! ## Example: OCF (Object Container File) round‑trip *(runnable)*
//!
//! The example below creates an Arrow table, writes an **Avro OCF** fully in memory,
//! and then reads it back. OCF is a self‑describing file format that embeds the Avro
@@ -64,82 +64,7 @@
//! # Ok(()) }
//! ```
//!
//! ## Quickstart: Confluent wire‑format round‑trip *(runnable)*
//!
//! The **Confluent Schema Registry wire format** prefixes each Avro message with a
//! 1‑byte magic `0x00` and a **4‑byte big‑endian** schema ID, followed by the Avro body.
//! See: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
//!
//! In this round‑trip, we:
//! 1) Use `AvroStreamWriter` to create a **raw Avro body** for a single‑row batch,
//! 2) Wrap it with the Confluent prefix (magic and schema ID),
//! 3) Decode it back to Arrow using a `Decoder` configured with a `SchemaStore` that
//! maps the schema ID to the Avro schema used by the writer.
//!
//! ```
//! use std::collections::HashMap;
//! use std::sync::Arc;
//! use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
//! use arrow_avro::reader::ReaderBuilder;
//! use arrow_avro::schema::{
//! AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm,
//! FingerprintStrategy, SCHEMA_METADATA_KEY
//! };
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Writer schema registered under Schema Registry ID 1
//! let avro_json = r#"{
//! "type":"record","name":"User",
//! "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]
//! }"#;
//!
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
//! let id: u32 = 1;
//! store.set(Fingerprint::Id(id), AvroSchema::new(avro_json.to_string()))?;
//!
//! // Build an Arrow schema that references the same Avro JSON
//! let mut md = HashMap::new();
//! md.insert(SCHEMA_METADATA_KEY.to_string(), avro_json.to_string());
//! let schema = Schema::new_with_metadata(
//! vec![
//! Field::new("id", DataType::Int64, false),
//! Field::new("name", DataType::Utf8, false),
//! ],
//! md,
//! );
//!
//! // One‑row batch: { id: 42, name: "alice" }
//! let batch = RecordBatch::try_new(
//! Arc::new(schema.clone()),
//! vec![
//! Arc::new(Int64Array::from(vec![42])) as ArrayRef,
//! Arc::new(StringArray::from(vec!["alice"])) as ArrayRef,
//! ],
//! )?;
//!
//! // Stream‑write a single record, letting the writer add the **Confluent** prefix.
//! let sink: Vec<u8> = Vec::new();
//! let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
//! .with_fingerprint_strategy(FingerprintStrategy::Id(id))
//! .build(sink)?;
//! w.write(&batch)?;
//! w.finish()?;
//! let frame = w.into_inner(); // already: 0x00 + 4B BE ID + Avro body
//! assert!(frame.len() > 5);
//!
//! // Decode
//! let mut dec = ReaderBuilder::new()
//! .with_writer_schema_store(store)
//! .build_decoder()?;
//! dec.decode(&frame)?;
//! let out = dec.flush()?.expect("one row");
//! assert_eq!(out.num_rows(), 1);
//! # Ok(()) }
//! ```
//!
//! ## Quickstart: Avro Single‑Object Encoding round‑trip *(runnable)*
//! ## Quickstart: SOE (Single‑Object Encoding) round‑trip *(runnable)*
//!
//! Avro **Single‑Object Encoding (SOE)** wraps an Avro body with a 2‑byte marker
//! `0xC3 0x01` and an **8‑byte little‑endian CRC‑64‑AVRO Rabin fingerprint** of the
@@ -203,7 +128,7 @@
//! ### Modules
//!
//! - [`reader`]: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es.
//! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent).
//! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent, Apicurio).
//! - [`schema`]: Avro schema parsing / fingerprints / registries.
//! - [`compression`]: codecs used for **OCF block compression** (i.e., Deflate, Snappy, Zstandard, BZip2, and XZ).
//! - [`codec`]: internal Avro-Arrow type conversion and row decode/encode plans.
22 changes: 11 additions & 11 deletions arrow-avro/src/reader/mod.rs
@@ -42,7 +42,7 @@
//! * [`Reader`](crate::reader::Reader): a convenient, synchronous iterator over `RecordBatch` decoded from an OCF
//! input. Implements [`Iterator<Item = Result<RecordBatch, ArrowError>>`] and
//! `RecordBatchReader`.
//! * [`Decoder`](crate::reader::Decoder): a push‑based row decoder that consumes raw Avro bytes and yields ready
//! * [`Decoder`](crate::reader::Decoder): a push‑based row decoder that consumes SOE framed Avro bytes and yields ready
Review comment:
Decoder also supports Confluent and Apicurio framed messages; consider saying “consumes Avro‑framed bytes (SOE/Confluent/Apicurio)” instead of only “SOE framed” to avoid misleading readers.

Owner Author:
value:good-but-wont-fix; category:documentation; feedback: The AI reviewer is correct that the decoder supports several formats, but all of them are Single Object Encoded with custom headers, so there is no need to make this part of the documentation more specific.
//! `RecordBatch` values when batches fill. This is suitable for integrating with async
//! byte streams, network protocols, or other custom data sources.
//!
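//! As a sketch, a push-based ingest loop over Confluent-framed messages might look like
//! this (the schema, registry ID, and chunked source are illustrative, and each chunk is
//! assumed to carry whole frames; the `SchemaStore` must map each incoming schema ID to
//! the writer's Avro schema):
//!
//! ```
//! use arrow_avro::reader::ReaderBuilder;
//! use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm};
//!
//! fn drain(chunks: impl Iterator<Item = Vec<u8>>) -> Result<(), Box<dyn std::error::Error>> {
//!     let avro_json = r#"{"type":"record","name":"E",
//!         "fields":[{"name":"x","type":"long"}]}"#;
//!     // Map Schema Registry ID 1 to the writer schema.
//!     let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
//!     store.set(Fingerprint::Id(1), AvroSchema::new(avro_json.to_string()))?;
//!
//!     let mut decoder = ReaderBuilder::new()
//!         .with_writer_schema_store(store)
//!         .build_decoder()?;
//!
//!     for chunk in chunks {
//!         decoder.decode(&chunk)?;          // push bytes as they arrive
//!         if let Some(batch) = decoder.flush()? {
//!             println!("decoded {} rows", batch.num_rows());
//!         }
//!     }
//!     Ok(())
//! }
//! ```
//!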
@@ -167,7 +167,7 @@
//! use arrow_array::{ArrayRef, Int64Array, RecordBatch};
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::schema::{AvroSchema, SchemaStore, SCHEMA_METADATA_KEY, FingerprintStrategy};
//! use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
//! use arrow_avro::reader::ReaderBuilder;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -187,7 +187,7 @@
//! )?;
//! let mut w = WriterBuilder::new(arrow)
//! .with_fingerprint_strategy(FingerprintStrategy::Rabin) // SOE prefix
//! .build::<_, AvroBinaryFormat>(Vec::new())?;
//! .build::<_, AvroSoeFormat>(Vec::new())?;
//! w.write(&batch)?;
//! w.finish()?;
//! let frame = w.into_inner(); // C3 01 + fp + Avro body
@@ -220,7 +220,7 @@
//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm, SCHEMA_METADATA_KEY, FingerprintStrategy};
//! use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
//! use arrow_avro::reader::ReaderBuilder;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -248,7 +248,7 @@
//! )?;
//! let mut w = WriterBuilder::new(arrow)
//! .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) // 0x00 + ID + body
//! .build::<_, AvroBinaryFormat>(Vec::new())?;
//! .build::<_, AvroSoeFormat>(Vec::new())?;
//! w.write(&batch)?; w.finish()?;
//! Ok(w.into_inner())
//! }
@@ -402,7 +402,7 @@
//! Arc::new(StringArray::from(vec!["v0-alice"])) as ArrayRef])?;
//! let mut w0 = arrow_avro::writer::WriterBuilder::new(arrow0)
//! .with_fingerprint_strategy(FingerprintStrategy::Id(id_v0))
//! .build::<_, arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?;
//! .build::<_, arrow_avro::writer::format::AvroSoeFormat>(Vec::new())?;
//! w0.write(&batch0)?; w0.finish()?;
//! let frame0 = w0.into_inner(); // 0x00 + id_v0 + body
//!
@@ -420,7 +420,7 @@
//! Arc::new(StringArray::from(vec![Some("bob@example.com")])) as ArrayRef])?;
//! let mut w1 = arrow_avro::writer::WriterBuilder::new(arrow1)
//! .with_fingerprint_strategy(FingerprintStrategy::Id(id_v1))
//! .build::<_, arrow_avro::writer::format::AvroBinaryFormat>(Vec::new())?;
//! .build::<_, arrow_avro::writer::format::AvroSoeFormat>(Vec::new())?;
//! w1.write(&batch1)?; w1.finish()?;
//! let frame1 = w1.into_inner(); // 0x00 + id_v1 + body
//!
@@ -563,15 +563,15 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use arrow_avro::schema::{SCHEMA_METADATA_KEY, FingerprintStrategy};
/// # use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
/// # use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
/// # let mut md = HashMap::new();
/// # md.insert(SCHEMA_METADATA_KEY.to_string(),
/// # r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string());
/// # let arrow = Schema::new_with_metadata(vec![Field::new("x", DataType::Int64, false)], md);
/// # let batch = RecordBatch::try_new(Arc::new(arrow.clone()), vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef])?;
/// # let mut w = WriterBuilder::new(arrow)
/// # .with_fingerprint_strategy(fp.into())
/// # .build::<_, AvroBinaryFormat>(Vec::new())?;
/// # .build::<_, AvroSoeFormat>(Vec::new())?;
/// # w.write(&batch)?; w.finish()?; let frame = w.into_inner();
///
/// let mut decoder = ReaderBuilder::new()
@@ -605,7 +605,7 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
/// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// # use arrow_schema::{DataType, Field, Schema};
/// # use arrow_avro::schema::{SCHEMA_METADATA_KEY, FingerprintStrategy};
/// # use arrow_avro::writer::{WriterBuilder, format::AvroBinaryFormat};
/// # use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
/// # fn msg(x: i64) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
/// # let mut md = HashMap::new();
/// # md.insert(SCHEMA_METADATA_KEY.to_string(),
@@ -614,7 +614,7 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
/// # let batch = RecordBatch::try_new(Arc::new(arrow.clone()), vec![Arc::new(Int64Array::from(vec![x])) as ArrayRef])?;
/// # let mut w = WriterBuilder::new(arrow)
/// # .with_fingerprint_strategy(FingerprintStrategy::Id(1234))
/// # .build::<_, AvroBinaryFormat>(Vec::new())?;
/// # .build::<_, AvroSoeFormat>(Vec::new())?;
/// # w.write(&batch)?; w.finish()?; Ok(w.into_inner())
/// # }
/// # let m1 = msg(1)?;
7 changes: 3 additions & 4 deletions arrow-avro/src/writer/format.rs
@@ -112,9 +112,9 @@ impl AvroFormat for AvroOcfFormat {
/// See: <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
/// See: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
#[derive(Debug, Default)]
pub struct AvroBinaryFormat {}
pub struct AvroSoeFormat {}

impl AvroFormat for AvroBinaryFormat {
impl AvroFormat for AvroSoeFormat {
const NEEDS_PREFIX: bool = true;
fn start_stream<W: Write>(
&mut self,
@@ -124,10 +124,9 @@ impl AvroFormat for AvroBinaryFormat {
) -> Result<(), ArrowError> {
if compression.is_some() {
return Err(ArrowError::InvalidArgumentError(
"Compression not supported for Avro binary streaming".to_string(),
"Compression not supported for Avro SOE streaming".to_string(),
));
}

Ok(())
}

55 changes: 30 additions & 25 deletions arrow-avro/src/writer/mod.rs
@@ -26,34 +26,39 @@
//! file with header (schema JSON + metadata), optional compression, data blocks, and
//! sync markers. See Avro 1.11.1 “Object Container Files.”
//! <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a **raw Avro binary stream** (“datum” bytes) without
//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a **Single‑Object Encoding (SOE) stream** without
//! any container framing. This is useful when the schema is known out‑of‑band (i.e.,
//! via a registry) and you want minimal overhead.
//!
//! ## Which format should you use?
//!
//! * Use **OCF** when you need a portable, self‑contained file. The schema travels with
//! the data, making it easy to read elsewhere.
//! * Use the **raw stream** when your surrounding protocol supplies schema information
//! (i.e., a schema registry). If you need **single‑object encoding (SOE)** or Confluent
//! **Schema Registry** framing, you must add the appropriate prefix *outside* this writer:
//! - **SOE**: `0xC3 0x01` + 8‑byte little‑endian CRC‑64‑AVRO fingerprint + Avro body
//! (see Avro 1.11.1 Single object encoding”).
//! * Use the **SOE stream** when your surrounding protocol supplies schema information
//! (e.g., via a schema registry). The writer automatically adds the per‑record prefix
//! (a byte-layout sketch follows this list):
//! - **SOE**: Each record is prefixed with the 2-byte header (`0xC3 0x01`) followed by
//! an 8‑byte little‑endian CRC‑64‑AVRO fingerprint, then the Avro body.
//! See Avro 1.11.1 "Single object encoding".
//! <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
//! - **Confluent wire format**: magic `0x00` + **big‑endian** 4‑byte schema ID and Avro body.
//! - **Confluent wire format**: Each record is prefixed with magic byte `0x00` followed by
//! a **big‑endian** 4‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id(schema_id)`.
//! <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
//! - **Apicurio wire format**: Each record is prefixed with magic byte `0x00` followed by
//! a **big‑endian** 8‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id64(schema_id)`.
//! <https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry>
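//!
//! As a byte-level sketch (illustrative values only; see the links above for the
//! normative definitions), the three prefixes differ only in their magic bytes and in
//! the width and endianness of the schema identifier:
//!
//! ```
//! // Hypothetical identifiers and a one-field body, for illustration only.
//! let rabin_fp: u64 = 0x1234_5678_9abc_def0; // CRC-64-AVRO fingerprint of the writer schema
//! let confluent_id: u32 = 7;                 // Schema Registry ID
//! let apicurio_id: u64 = 7;                  // Apicurio global ID
//! let body: &[u8] = &[0x54];                 // Avro body (long 42 as a zig-zag varint)
//!
//! // SOE: 0xC3 0x01 + 8-byte little-endian fingerprint + body
//! let mut soe = vec![0xC3, 0x01];
//! soe.extend_from_slice(&rabin_fp.to_le_bytes());
//! soe.extend_from_slice(body);
//! assert_eq!(soe.len(), 2 + 8 + body.len());
//!
//! // Confluent: 0x00 + 4-byte big-endian schema ID + body
//! let mut confluent = vec![0x00];
//! confluent.extend_from_slice(&confluent_id.to_be_bytes());
//! confluent.extend_from_slice(body);
//! assert_eq!(confluent.len(), 1 + 4 + body.len());
//!
//! // Apicurio: 0x00 + 8-byte big-endian global ID + body
//! let mut apicurio = vec![0x00];
//! apicurio.extend_from_slice(&apicurio_id.to_be_bytes());
//! apicurio.extend_from_slice(body);
//! assert_eq!(apicurio.len(), 1 + 8 + body.len());
//! ```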
//!
//! ## Choosing the Avro schema
//!
//! By default, the writer converts your Arrow schema to Avro (including a top‑level record
//! name) and stores the resulting JSON under the `avro::schema` metadata key. If you already
//! have an Avro schema JSON, you want to use verbatim, put it into the Arrow schema metadata
//! under the same key before constructing the writer. The builder will pick it up.
//! name). If you already have an Avro schema JSON you want to use verbatim, put it into the
//! Arrow schema metadata under the `avro.schema` key before constructing the writer. The
//! builder will use that schema instead of generating a new one (unless `strip_metadata` is
//! set to true in the options).
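//!
//! A minimal sketch of supplying a verbatim schema (the JSON is illustrative):
//!
//! ```
//! use std::collections::HashMap;
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::schema::SCHEMA_METADATA_KEY;
//!
//! let avro_json = r#"{"type":"record","name":"User",
//!     "fields":[{"name":"id","type":"long"}]}"#;
//! let mut md = HashMap::new();
//! md.insert(SCHEMA_METADATA_KEY.to_string(), avro_json.to_string());
//! // The builder picks this up instead of generating a schema from the Arrow fields.
//! let schema = Schema::new_with_metadata(
//!     vec![Field::new("id", DataType::Int64, false)],
//!     md,
//! );
//! ```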
//!
//! ## Compression
//!
//! For OCF, you may enable a compression codec via `WriterBuilder::with_compression`. The
//! chosen codec is written into the file header and used for subsequent blocks. Raw stream
//! chosen codec is written into the file header and used for subsequent blocks. SOE stream
//! writing doesn’t apply container‑level compression.
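//!
//! A sketch of enabling OCF block compression; the exact `with_compression`
//! signature and `CompressionCodec` variant names are assumptions here, so check
//! the `compression` module and builder docs before relying on them:
//!
//! ```ignore
//! use arrow_avro::compression::CompressionCodec; // assumed path
//! use arrow_avro::writer::{WriterBuilder, format::AvroOcfFormat};
//! use arrow_schema::{DataType, Field, Schema};
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
//! let mut w = WriterBuilder::new(schema)
//!     .with_compression(Some(CompressionCodec::Deflate)) // assumed Option<_> parameter
//!     .build::<_, AvroOcfFormat>(Vec::new())?;
//! # Ok(()) }
//! ```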
//!
//! ---
@@ -63,7 +68,7 @@ use crate::schema::{
AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
};
use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
use crate::writer::format::{AvroBinaryFormat, AvroFormat, AvroOcfFormat};
use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, Schema};
use std::io::Write;
@@ -173,7 +178,7 @@ impl WriterBuilder {
/// You’ll usually use the concrete aliases:
///
/// * **[`AvroWriter`]** for **OCF** (self‑describing container file)
/// * **[`AvroStreamWriter`]** for **raw** Avro binary streams
/// * **[`AvroStreamWriter`]** for **SOE** Avro streams
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
writer: W,
@@ -226,12 +231,13 @@ pub struct Writer<W: Write, F: AvroFormat> {
/// ```
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;

/// Alias for a raw Avro **binary stream** writer.
/// Alias for an Avro **Single Object Encoding** stream writer.
///
/// ### Example
///
/// This writes only the **Avro body** bytes — no OCF header/sync and no
/// single‑object or Confluent framing. If you need those frames, add them externally.
/// This writer automatically adds the appropriate per-record prefix (based on the
/// fingerprint strategy) before the Avro body of each record. The default is Single
/// Object Encoding (SOE) with a Rabin fingerprint.
///
/// ```
/// use std::sync::Arc;
@@ -247,7 +253,7 @@ pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;
/// vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
/// )?;
///
/// // Write a raw Avro stream to a Vec<u8>
/// // Write an Avro Single Object Encoding stream to a Vec<u8>
/// let sink: Vec<u8> = Vec::new();
/// let mut w = AvroStreamWriter::new(sink, schema)?;
/// w.write(&batch)?;
Expand All @@ -256,7 +262,7 @@ pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;
/// assert!(!bytes.is_empty());
/// # Ok(()) }
/// ```
pub type AvroStreamWriter<W> = Writer<W, AvroBinaryFormat>;
pub type AvroStreamWriter<W> = Writer<W, AvroSoeFormat>;

impl<W: Write> Writer<W, AvroOcfFormat> {
/// Convenience constructor – same as [`WriterBuilder::build`] with `AvroOcfFormat`.
@@ -294,11 +300,10 @@ impl<W: Write> Writer<W, AvroOcfFormat> {
}
}

impl<W: Write> Writer<W, AvroBinaryFormat> {
impl<W: Write> Writer<W, AvroSoeFormat> {
/// Convenience constructor to create a new [`AvroStreamWriter`].
///
/// The resulting stream contains just **Avro binary** bodies (no OCF header/sync and no
/// single‑object or Confluent framing). If you need those frames, add them externally.
/// The resulting stream contains **Single Object Encodings** (no OCF header/sync).
///
/// ### Example
///
@@ -324,7 +329,7 @@ impl<W: Write> Writer<W, AvroBinaryFormat> {
/// # Ok(()) }
/// ```
pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
WriterBuilder::new(schema).build::<W, AvroBinaryFormat>(writer)
WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
}
}

@@ -496,7 +501,7 @@ mod tests {
let schema_id: u32 = 42;
let mut writer = WriterBuilder::new(schema.clone())
.with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
.build::<_, AvroBinaryFormat>(Vec::new())?;
.build::<_, AvroSoeFormat>(Vec::new())?;
writer.write(&batch)?;
let encoded = writer.into_inner();
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
@@ -530,7 +535,7 @@ mod tests {
let schema_id: u64 = 42;
let mut writer = WriterBuilder::new(schema.clone())
.with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
.build::<_, AvroBinaryFormat>(Vec::new())?;
.build::<_, AvroSoeFormat>(Vec::new())?;
writer.write(&batch)?;
let encoded = writer.into_inner();
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
@@ -1393,7 +1398,7 @@ mod tests {
let cap = 8192;
let mut writer = WriterBuilder::new(schema)
.with_capacity(cap)
.build::<_, AvroBinaryFormat>(Vec::new())?;
.build::<_, AvroSoeFormat>(Vec::new())?;
assert_eq!(writer.capacity, cap);
writer.write(&batch)?;
let _bytes = writer.into_inner();