Skip to content

Commit 348ae91

Browse files
jecsand838 and mbrobbel
authored
Add support for 64-bit Schema Registry IDs (Id64) in arrow-avro (#8575)
# Which issue does this PR close? We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. - Part of #4886 # Rationale for this change Many Kafka deployments use **Confluent Schema Registry** (4‑byte, big‑endian ID) and/or **Apicurio Registry** (commonly 8‑byte, big‑endian *global* ID). `arrow-avro` already supported the Confluent wire format; this PR adds first‑class support for Apicurio’s 64‑bit ID, enabling seamless decode/encode of streams that carry an 8‑byte ID after the magic byte. This improves interoperability with ecosystems that standardize on Apicurio or Red Hat Event Streams. # What changes are included in this PR? * **New 64‑bit ID support** * Add `Fingerprint::Id64(u64)` and `FingerprintAlgorithm::Id64`. * Add `FingerprintStrategy::Id64(u64)` and helper `Fingerprint::load_fingerprint_id64`. * Extend `Fingerprint::serialized_prefix` to emit/read an 8‑byte big‑endian ID after the `0x00` magic byte. * **Clarify/align numeric‑ID algorithm names** * Replace the prior `FingerprintAlgorithm::None` (numeric ID) with `FingerprintAlgorithm::Id` (4‑byte) and introduce `Id64` (8‑byte). All examples and call sites updated accordingly (e.g., `SchemaStore::new_with_type(FingerprintAlgorithm::Id)`). * **Reader/Writer plumbing** * `Decoder` now understands both `Id` (4‑byte) and `Id64` (8‑byte) prefixes. * `WriterBuilder` accepts `FingerprintStrategy::Id64` to write frames with a 64‑bit ID. * **SchemaStore behavior** * `SchemaStore::register` now errors for `Id`/`Id64` algorithms (as those IDs come from a registry); callers should use `set(Fingerprint::Id(_)|Id64(_), ...)` to associate schemas by registry ID. * **Docs & examples** * Reader docs expanded to call out Confluent (4‑byte) and Apicurio (8‑byte) formats; examples switched to `FingerprintAlgorithm::Id`. Bench and example updates reflect the new variants. 
# Are these changes tested? Yes. This PR adds/updates unit tests that exercise the new path end‑to‑end, including: * `test_stream_writer_with_id64_fingerprint_rt` (writer round‑trip with 64‑bit ID). * `test_two_messages_same_schema_id64` (decoder round‑trip with 64‑bit ID). * Adjustments to existing tests and benches to use `FingerprintAlgorithm::Id` instead of `None`. # Are there any user-facing changes? N/A because `arrow-avro` isn't public yet. --------- Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>
1 parent 8e669e7 commit 348ae91

File tree

6 files changed

+195
-30
lines changed

6 files changed

+195
-30
lines changed

arrow-avro/benches/decoder.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ fn make_prefix(fp: Fingerprint) -> Vec<u8> {
4848
buf.extend_from_slice(&id.to_be_bytes()); // big-endian
4949
buf
5050
}
51+
Fingerprint::Id64(id) => {
52+
let mut buf = Vec::with_capacity(CONFLUENT_MAGIC.len() + size_of::<u64>());
53+
buf.extend_from_slice(&CONFLUENT_MAGIC); // 00
54+
buf.extend_from_slice(&id.to_be_bytes()); // big-endian
55+
buf
56+
}
5157
#[cfg(feature = "md5")]
5258
Fingerprint::MD5(val) => {
5359
let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of_val(&val));
@@ -366,7 +372,7 @@ fn new_decoder_id(
366372
id: u32,
367373
) -> arrow_avro::reader::Decoder {
368374
let schema = AvroSchema::new(schema_json.parse().unwrap());
369-
let mut store = arrow_avro::schema::SchemaStore::new_with_type(FingerprintAlgorithm::None);
375+
let mut store = arrow_avro::schema::SchemaStore::new_with_type(FingerprintAlgorithm::Id);
370376
// Register the schema with a provided Confluent-style ID
371377
store
372378
.set(Fingerprint::Id(id), schema.clone())

arrow-avro/examples/decode_kafka_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
171171
let id_v0: u32 = 0;
172172
let id_v1: u32 = 1;
173173

174-
// Confluent SchemaStore keyed by integer IDs (FingerprintAlgorithm::None)
175-
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
174+
// Confluent SchemaStore keyed by integer IDs (FingerprintAlgorithm::Id)
175+
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
176176
store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
177177
store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;
178178

arrow-avro/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
//! "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]
9696
//! }"#;
9797
//!
98-
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
98+
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
9999
//! let id: u32 = 1;
100100
//! store.set(Fingerprint::Id(id), AvroSchema::new(avro_json.to_string()))?;
101101
//!

arrow-avro/src/reader/mod.rs

Lines changed: 71 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,14 @@
6060
//! <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
6161
//! * **Confluent Schema Registry wire format**: A 1‑byte magic `0x00`, a **4‑byte big‑endian**
6262
//! schema ID, then the Avro‑encoded body. Use `Decoder` with a `SchemaStore` configured
63-
//! for `FingerprintAlgorithm::None` and entries keyed by `Fingerprint::Id`. See
63+
//! for `FingerprintAlgorithm::Id` and entries keyed by `Fingerprint::Id`. See
6464
//! Confluent’s “Wire format” documentation.
6565
//! <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
66+
//! * **Apicurio Schema Registry wire format**: A 1‑byte magic `0x00`, an **8‑byte big‑endian**
67+
//! global schema ID, then the Avro‑encoded body. Use `Decoder` with a `SchemaStore` configured
68+
//! for `FingerprintAlgorithm::Id64` and entries keyed by `Fingerprint::Id64`. See
69+
//! Apicurio’s “Avro SerDe” documentation.
70+
//! <https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry>
6671
//!
6772
//! ## Basic file usage (OCF)
6873
//!
@@ -99,7 +104,7 @@
99104
//! # Ok(()) }
100105
//! ```
101106
//!
102-
//! ## Streaming usage (single‑object / Confluent)
107+
//! ## Streaming usage (single‑object / Confluent / Apicurio)
103108
//!
104109
//! The `Decoder` lets you integrate Avro decoding with **any** source of bytes by
105110
//! periodically calling `Decoder::decode` with new data and calling `Decoder::flush`
@@ -220,7 +225,7 @@
220225
//!
221226
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
222227
//! // Set up a store keyed by numeric IDs (Confluent).
223-
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
228+
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
224229
//! let schema_id = 7u32;
225230
//! let avro_schema = AvroSchema::new(r#"{"type":"record","name":"User","fields":[
226231
//! {"name":"id","type":"long"}, {"name":"name","type":"string"}]}"#.to_string());
@@ -380,7 +385,7 @@
380385
//! let id_v0: u32 = 0;
381386
//! let id_v1: u32 = 1;
382387
//!
383-
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None); // integer IDs
388+
//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id); // integer IDs
384389
//! store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
385390
//! store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;
386391
//!
@@ -591,7 +596,7 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
591596
/// use arrow_avro::reader::ReaderBuilder;
592597
///
593598
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
594-
/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
599+
/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
595600
/// store.set(Fingerprint::Id(1234), AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()))?;
596601
///
597602
/// // --- Hidden: encode two Confluent-framed messages {x:1} and {x:2} ---
@@ -713,9 +718,12 @@ impl Decoder {
713718
Fingerprint::Rabin(u64::from_le_bytes(bytes))
714719
})
715720
}
716-
FingerprintAlgorithm::None => {
721+
FingerprintAlgorithm::Id => self.handle_prefix_common(buf, &CONFLUENT_MAGIC, |bytes| {
722+
Fingerprint::Id(u32::from_be_bytes(bytes))
723+
}),
724+
FingerprintAlgorithm::Id64 => {
717725
self.handle_prefix_common(buf, &CONFLUENT_MAGIC, |bytes| {
718-
Fingerprint::Id(u32::from_be_bytes(bytes))
726+
Fingerprint::Id64(u64::from_be_bytes(bytes))
719727
})
720728
}
721729
#[cfg(feature = "md5")]
@@ -909,7 +917,7 @@ impl Decoder {
909917
/// use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm};
910918
/// use arrow_avro::reader::ReaderBuilder;
911919
///
912-
/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
920+
/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
913921
/// store.set(Fingerprint::Id(1234), AvroSchema::new(r#"{"type":"record","name":"E","fields":[]}"#.to_string()))?;
914922
///
915923
/// let decoder = ReaderBuilder::new()
@@ -1409,6 +1417,9 @@ mod test {
14091417
Fingerprint::Id(v) => {
14101418
panic!("make_prefix expects a Rabin fingerprint, got ({v})");
14111419
}
1420+
Fingerprint::Id64(v) => {
1421+
panic!("make_prefix expects a Rabin fingerprint, got ({v})");
1422+
}
14121423
#[cfg(feature = "md5")]
14131424
Fingerprint::MD5(v) => {
14141425
panic!("make_prefix expects a Rabin fingerprint, got ({v:?})");
@@ -1445,6 +1456,21 @@ mod test {
14451456
msg
14461457
}
14471458

1459+
fn make_id64_prefix(id: u64, additional: usize) -> Vec<u8> {
1460+
let capacity = CONFLUENT_MAGIC.len() + size_of::<u64>() + additional;
1461+
let mut out = Vec::with_capacity(capacity);
1462+
out.extend_from_slice(&CONFLUENT_MAGIC);
1463+
out.extend_from_slice(&id.to_be_bytes());
1464+
out
1465+
}
1466+
1467+
fn make_message_id64(id: u64, value: i64) -> Vec<u8> {
1468+
let encoded_value = encode_zigzag(value);
1469+
let mut msg = make_id64_prefix(id, encoded_value.len());
1470+
msg.extend_from_slice(&encoded_value);
1471+
msg
1472+
}
1473+
14481474
fn make_value_schema(pt: PrimitiveType) -> AvroSchema {
14491475
let json_schema = format!(
14501476
r#"{{"type":"record","name":"S","fields":[{{"name":"v","type":"{}"}}]}}"#,
@@ -2159,6 +2185,7 @@ mod test {
21592185
let long_bytes = match fp_long {
21602186
Fingerprint::Rabin(v) => v.to_le_bytes(),
21612187
Fingerprint::Id(id) => panic!("expected Rabin fingerprint, got ({id})"),
2188+
Fingerprint::Id64(id) => panic!("expected Rabin fingerprint, got ({id})"),
21622189
#[cfg(feature = "md5")]
21632190
Fingerprint::MD5(v) => panic!("expected Rabin fingerprint, got ({v:?})"),
21642191
#[cfg(feature = "sha256")]
@@ -2183,6 +2210,7 @@ mod test {
21832210
match fp_long {
21842211
Fingerprint::Rabin(v) => buf.extend_from_slice(&v.to_le_bytes()),
21852212
Fingerprint::Id(id) => panic!("expected Rabin fingerprint, got ({id})"),
2213+
Fingerprint::Id64(id) => panic!("expected Rabin fingerprint, got ({id})"),
21862214
#[cfg(feature = "md5")]
21872215
Fingerprint::MD5(v) => panic!("expected Rabin fingerprint, got ({v:?})"),
21882216
#[cfg(feature = "sha256")]
@@ -2269,7 +2297,7 @@ mod test {
22692297
let reader_schema = writer_schema.clone();
22702298
let id = 100u32;
22712299
// Set up store with the Id fingerprint algorithm and register schema by id
2272-
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
2300+
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
22732301
let _ = store
22742302
.set(Fingerprint::Id(id), writer_schema.clone())
22752303
.expect("set id schema");
@@ -2300,7 +2328,7 @@ mod test {
23002328
let writer_schema = make_value_schema(PrimitiveType::Int);
23012329
let id_known = 7u32;
23022330
let id_unknown = 9u32;
2303-
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
2331+
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
23042332
let _ = store
23052333
.set(Fingerprint::Id(id_known), writer_schema.clone())
23062334
.expect("set id schema");
@@ -2324,7 +2352,7 @@ mod test {
23242352
fn test_handle_prefix_id_incomplete_magic() {
23252353
let writer_schema = make_value_schema(PrimitiveType::Int);
23262354
let id = 5u32;
2327-
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
2355+
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
23282356
let _ = store
23292357
.set(Fingerprint::Id(id), writer_schema.clone())
23302358
.expect("set id schema");
@@ -2341,6 +2369,38 @@ mod test {
23412369
assert!(decoder.pending_schema.is_none());
23422370
}
23432371

2372+
#[test]
2373+
fn test_two_messages_same_schema_id64() {
2374+
let writer_schema = make_value_schema(PrimitiveType::Int);
2375+
let reader_schema = writer_schema.clone();
2376+
let id = 100u64;
2377+
// Set up store with the Id64 fingerprint algorithm and register schema by 64-bit id
2378+
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
2379+
let _ = store
2380+
.set(Fingerprint::Id64(id), writer_schema.clone())
2381+
.expect("set id schema");
2382+
let msg1 = make_message_id64(id, 21);
2383+
let msg2 = make_message_id64(id, 22);
2384+
let input = [msg1.clone(), msg2.clone()].concat();
2385+
let mut decoder = ReaderBuilder::new()
2386+
.with_batch_size(8)
2387+
.with_reader_schema(reader_schema)
2388+
.with_writer_schema_store(store)
2389+
.with_active_fingerprint(Fingerprint::Id64(id))
2390+
.build_decoder()
2391+
.unwrap();
2392+
let _ = decoder.decode(&input).unwrap();
2393+
let batch = decoder.flush().unwrap().expect("batch");
2394+
assert_eq!(batch.num_rows(), 2);
2395+
let col = batch
2396+
.column(0)
2397+
.as_any()
2398+
.downcast_ref::<Int32Array>()
2399+
.unwrap();
2400+
assert_eq!(col.value(0), 21);
2401+
assert_eq!(col.value(1), 22);
2402+
}
2403+
23442404
#[test]
23452405
fn test_decode_stream_with_schema() {
23462406
struct TestCase<'a> {

0 commit comments

Comments
 (0)