Skip to content

Commit

Permalink
skip the whole confluent schema header in proto deser (#763)
Browse files Browse the repository at this point in the history
  • Loading branch information
emef authored Oct 23, 2024
1 parent 843e327 commit e5395fb
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 3 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/arroyo-formats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ prost-build = { workspace = true }
prost-types = { workspace = true}
base64 = "0.22.1"
uuid = { version = "1.10.0", features = ["v4"] }
regex = "1.10.6"
regex = "1.10.6"
integer-encoding = "4.0.2"
30 changes: 29 additions & 1 deletion crates/arroyo-formats/src/proto/de.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::float_to_json;
use anyhow::anyhow;
use arroyo_rpc::formats::ProtobufFormat;
use arroyo_types::SourceError;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use integer_encoding::VarInt;
use prost_reflect::{DescriptorPool, DynamicMessage, FieldDescriptor, Kind, MapKey, Value};
use serde_json::Value as JsonValue;

Expand All @@ -12,7 +14,9 @@ pub(crate) fn deserialize_proto(
mut msg: &[u8],
) -> Result<serde_json::Value, SourceError> {
if proto.confluent_schema_registry {
msg = &msg[5..];
skip_confluent_header(&mut msg).map_err(|e| {
SourceError::bad_data(format!("invalid confluent schema header: {:?}", e))
})?;
}

let message = proto.message_name.as_ref().expect("no message name");
Expand Down Expand Up @@ -85,3 +89,27 @@ fn proto_value_to_json(field: &FieldDescriptor, value: &Value) -> JsonValue {
}
}
}

// see: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
fn skip_confluent_header(msg: &mut &[u8]) -> anyhow::Result<()> {
// skip magic byte + schema ID
*msg = &msg[5..];

// skip message indexes array
if msg[0] == 0 {
*msg = &msg[1..];
} else {
let count = read_varint(msg)?;
for _ in 0..count {
read_varint(msg)?;
}
}
Ok(())
}

/// Decodes a single `i32` varint from the front of `msg` using
/// `i32::decode_var`, then advances `msg` past the bytes that were consumed.
///
/// # Errors
///
/// Returns an error when `decode_var` yields `None`, i.e. no varint could be
/// decoded from the current position.
fn read_varint(msg: &mut &[u8]) -> anyhow::Result<i32> {
    match i32::decode_var(msg) {
        Some((decoded, consumed)) => {
            // drop the bytes that made up this varint
            *msg = &msg[consumed..];
            Ok(decoded)
        }
        None => Err(anyhow!("could not read varint")),
    }
}

0 comments on commit e5395fb

Please sign in to comment.