Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion arrow-avro/benches/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ fn make_prefix(fp: Fingerprint) -> Vec<u8> {
buf.extend_from_slice(&id.to_be_bytes()); // big-endian
buf
}
Fingerprint::Id64(id) => {
let mut buf = Vec::with_capacity(CONFLUENT_MAGIC.len() + size_of::<u64>());
buf.extend_from_slice(&CONFLUENT_MAGIC); // 00
buf.extend_from_slice(&id.to_be_bytes()); // big-endian
buf
}
#[cfg(feature = "md5")]
Fingerprint::MD5(val) => {
let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of_val(&val));
Expand Down Expand Up @@ -366,7 +372,7 @@ fn new_decoder_id(
id: u32,
) -> arrow_avro::reader::Decoder {
let schema = AvroSchema::new(schema_json.parse().unwrap());
let mut store = arrow_avro::schema::SchemaStore::new_with_type(FingerprintAlgorithm::None);
let mut store = arrow_avro::schema::SchemaStore::new_with_type(FingerprintAlgorithm::Id);
// Register the schema with a provided Confluent-style ID
store
.set(Fingerprint::Id(id), schema.clone())
Expand Down
4 changes: 2 additions & 2 deletions arrow-avro/examples/decode_kafka_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let id_v0: u32 = 0;
let id_v1: u32 = 1;

// Confluent SchemaStore keyed by integer IDs (FingerprintAlgorithm::None)
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
// Confluent SchemaStore keyed by integer IDs (FingerprintAlgorithm::Id)
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;

Expand Down
2 changes: 1 addition & 1 deletion arrow-avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
//! "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]
//! }"#;
//!
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
//! let id: u32 = 1;
//! store.set(Fingerprint::Id(id), AvroSchema::new(avro_json.to_string()))?;
//!
Expand Down
82 changes: 71 additions & 11 deletions arrow-avro/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,14 @@
//! <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
//! * **Confluent Schema Registry wire format**: A 1‑byte magic `0x00`, a **4‑byte big‑endian**
//! schema ID, then the Avro‑encoded body. Use `Decoder` with a `SchemaStore` configured
//! for `FingerprintAlgorithm::None` and entries keyed by `Fingerprint::Id`. See
//! for `FingerprintAlgorithm::Id` and entries keyed by `Fingerprint::Id`. See
//! Confluent’s “Wire format” documentation.
//! <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
//! * **Apicurio Schema Registry wire format**: A 1‑byte magic `0x00`, a **8‑byte big‑endian**
//! global schema ID, then the Avro‑encoded body. Use `Decoder` with a `SchemaStore` configured
//! for `FingerprintAlgorithm::Id64` and entries keyed by `Fingerprint::Id64`. See
//! Apicurio’s “Avro SerDe” documentation.
//! <https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry>
//!
//! ## Basic file usage (OCF)
//!
Expand Down Expand Up @@ -99,7 +104,7 @@
//! # Ok(()) }
//! ```
//!
//! ## Streaming usage (single‑object / Confluent)
//! ## Streaming usage (single‑object / Confluent / Apicurio)
//!
//! The `Decoder` lets you integrate Avro decoding with **any** source of bytes by
//! periodically calling `Decoder::decode` with new data and calling `Decoder::flush`
Expand Down Expand Up @@ -220,7 +225,7 @@
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Set up a store keyed by numeric IDs (Confluent).
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
//! let schema_id = 7u32;
//! let avro_schema = AvroSchema::new(r#"{"type":"record","name":"User","fields":[
//! {"name":"id","type":"long"}, {"name":"name","type":"string"}]}"#.to_string());
Expand Down Expand Up @@ -380,7 +385,7 @@
//! let id_v0: u32 = 0;
//! let id_v1: u32 = 1;
//!
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None); // integer IDs
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id); // integer IDs
//! store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
//! store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;
//!
Expand Down Expand Up @@ -591,7 +596,7 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
/// use arrow_avro::reader::ReaderBuilder;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
/// store.set(Fingerprint::Id(1234), AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()))?;
///
/// // --- Hidden: encode two Confluent-framed messages {x:1} and {x:2} ---
Expand Down Expand Up @@ -713,9 +718,12 @@ impl Decoder {
Fingerprint::Rabin(u64::from_le_bytes(bytes))
})
}
FingerprintAlgorithm::None => {
FingerprintAlgorithm::Id => self.handle_prefix_common(buf, &CONFLUENT_MAGIC, |bytes| {
Fingerprint::Id(u32::from_be_bytes(bytes))
}),
FingerprintAlgorithm::Id64 => {
self.handle_prefix_common(buf, &CONFLUENT_MAGIC, |bytes| {
Fingerprint::Id(u32::from_be_bytes(bytes))
Fingerprint::Id64(u64::from_be_bytes(bytes))
})
}
#[cfg(feature = "md5")]
Expand Down Expand Up @@ -909,7 +917,7 @@ impl Decoder {
/// use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm};
/// use arrow_avro::reader::ReaderBuilder;
///
/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
/// store.set(Fingerprint::Id(1234), AvroSchema::new(r#"{"type":"record","name":"E","fields":[]}"#.to_string()))?;
///
/// let decoder = ReaderBuilder::new()
Expand Down Expand Up @@ -1409,6 +1417,9 @@ mod test {
Fingerprint::Id(v) => {
panic!("make_prefix expects a Rabin fingerprint, got ({v})");
}
Fingerprint::Id64(v) => {
panic!("make_prefix expects a Rabin fingerprint, got ({v})");
}
#[cfg(feature = "md5")]
Fingerprint::MD5(v) => {
panic!("make_prefix expects a Rabin fingerprint, got ({v:?})");
Expand Down Expand Up @@ -1445,6 +1456,21 @@ mod test {
msg
}

fn make_id64_prefix(id: u64, additional: usize) -> Vec<u8> {
let capacity = CONFLUENT_MAGIC.len() + size_of::<u64>() + additional;
let mut out = Vec::with_capacity(capacity);
out.extend_from_slice(&CONFLUENT_MAGIC);
out.extend_from_slice(&id.to_be_bytes());
out
}

fn make_message_id64(id: u64, value: i64) -> Vec<u8> {
let encoded_value = encode_zigzag(value);
let mut msg = make_id64_prefix(id, encoded_value.len());
msg.extend_from_slice(&encoded_value);
msg
}

fn make_value_schema(pt: PrimitiveType) -> AvroSchema {
let json_schema = format!(
r#"{{"type":"record","name":"S","fields":[{{"name":"v","type":"{}"}}]}}"#,
Expand Down Expand Up @@ -2159,6 +2185,7 @@ mod test {
let long_bytes = match fp_long {
Fingerprint::Rabin(v) => v.to_le_bytes(),
Fingerprint::Id(id) => panic!("expected Rabin fingerprint, got ({id})"),
Fingerprint::Id64(id) => panic!("expected Rabin fingerprint, got ({id})"),
#[cfg(feature = "md5")]
Fingerprint::MD5(v) => panic!("expected Rabin fingerprint, got ({v:?})"),
#[cfg(feature = "sha256")]
Expand All @@ -2183,6 +2210,7 @@ mod test {
match fp_long {
Fingerprint::Rabin(v) => buf.extend_from_slice(&v.to_le_bytes()),
Fingerprint::Id(id) => panic!("expected Rabin fingerprint, got ({id})"),
Fingerprint::Id64(id) => panic!("expected Rabin fingerprint, got ({id})"),
#[cfg(feature = "md5")]
Fingerprint::MD5(v) => panic!("expected Rabin fingerprint, got ({v:?})"),
#[cfg(feature = "sha256")]
Expand Down Expand Up @@ -2269,7 +2297,7 @@ mod test {
let reader_schema = writer_schema.clone();
let id = 100u32;
// Set up store with None fingerprint algorithm and register schema by id
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
let _ = store
.set(Fingerprint::Id(id), writer_schema.clone())
.expect("set id schema");
Expand Down Expand Up @@ -2300,7 +2328,7 @@ mod test {
let writer_schema = make_value_schema(PrimitiveType::Int);
let id_known = 7u32;
let id_unknown = 9u32;
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
let _ = store
.set(Fingerprint::Id(id_known), writer_schema.clone())
.expect("set id schema");
Expand All @@ -2324,7 +2352,7 @@ mod test {
fn test_handle_prefix_id_incomplete_magic() {
let writer_schema = make_value_schema(PrimitiveType::Int);
let id = 5u32;
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
let _ = store
.set(Fingerprint::Id(id), writer_schema.clone())
.expect("set id schema");
Expand All @@ -2341,6 +2369,38 @@ mod test {
assert!(decoder.pending_schema.is_none());
}

#[test]
fn test_two_messages_same_schema_id64() {
let writer_schema = make_value_schema(PrimitiveType::Int);
let reader_schema = writer_schema.clone();
let id = 100u64;
// Set up store with None fingerprint algorithm and register schema by id
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
let _ = store
.set(Fingerprint::Id64(id), writer_schema.clone())
.expect("set id schema");
let msg1 = make_message_id64(id, 21);
let msg2 = make_message_id64(id, 22);
let input = [msg1.clone(), msg2.clone()].concat();
let mut decoder = ReaderBuilder::new()
.with_batch_size(8)
.with_reader_schema(reader_schema)
.with_writer_schema_store(store)
.with_active_fingerprint(Fingerprint::Id64(id))
.build_decoder()
.unwrap();
let _ = decoder.decode(&input).unwrap();
let batch = decoder.flush().unwrap().expect("batch");
assert_eq!(batch.num_rows(), 2);
let col = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(col.value(0), 21);
assert_eq!(col.value(1), 22);
}

#[test]
fn test_decode_stream_with_schema() {
struct TestCase<'a> {
Expand Down
Loading
Loading