diff --git a/Cargo.lock b/Cargo.lock index 7de2c31534..565d222777 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1831,6 +1831,7 @@ dependencies = [ "itertools 0.10.5", "kafka-protocol", "labels", + "lz4_flex", "md5", "metrics", "metrics-prometheus", @@ -3293,9 +3294,8 @@ dependencies = [ [[package]] name = "kafka-protocol" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3201cc84e70fc4e24b0ebf4dac3eb2a8d3460ec3f2b29fd197fa8051fc0db41c" +version = "0.12.0" +source = "git+https://github.com/tychedelia/kafka-protocol-rs.git?rev=cabe835#cabe835a9cc87fe8ea64649ff599d96a61e1fb66" dependencies = [ "anyhow", "bytes", @@ -3565,19 +3565,18 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "lz4" -version = "1.26.0" +version = "1.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "958b4caa893816eea05507c20cfe47574a43d9a697138a7872990bba8a0ece68" +checksum = "a231296ca742e418c43660cb68e082486ff2538e8db432bc818580f3965025ed" dependencies = [ - "libc", "lz4-sys", ] [[package]] name = "lz4-sys" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868" +checksum = "fcb44a01837a858d47e5a630d2ccf304c8efcc4b83b8f9f75b7a9ee4fcc6e57d" dependencies = [ "cc", "libc", diff --git a/Cargo.toml b/Cargo.toml index 1449c35b93..ea190b75a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ flate2 = "1.0" futures = "0.3" futures-core = "0.3" futures-util = "0.3" -fxhash = "0.2" # Used in `json` crate. Replace with xxhash. +fxhash = "0.2" # Used in `json` crate. Replace with xxhash. hex = "0.4.3" hexdump = "0.1" humantime = "2.1" @@ -72,7 +72,9 @@ jemalloc-ctl = "0.3" json-patch = "0.3" jsonwebtoken = { version = "9", default-features = false } js-sys = "0.3.60" -kafka-protocol = "0.11.0" +# TODO(jshearer): Swap back after 0.13.0 is released, which includes +# https://github.com/tychedelia/kafka-protocol-rs/pull/81 +kafka-protocol = { git = "https://github.com/tychedelia/kafka-protocol-rs.git", rev = "cabe835" } lazy_static = "1.4" libc = "0.2" librocksdb-sys = { version = "0.16.0", default-features = false, features = [ @@ -80,6 +82,7 @@ librocksdb-sys = { version = "0.16.0", default-features = false, features = [ "rtti", ] } lz4 = "1.24.0" +lz4_flex = "0.11.0" mime = "0.3" memchr = "2.5" metrics = "0.23.0" diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index 9213f0750b..a303162283 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -37,6 +37,7 @@ hex = { workspace = true } hexdump = { workspace = true } itertools = { workspace = true } kafka-protocol = { workspace = true } +lz4_flex = { workspace = true } md5 = { workspace = true } metrics = { workspace = true } metrics-prometheus = { workspace = true } diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 7a70fe904d..5f4c528485 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -1,10 +1,12 @@ use super::{Collection, Partition}; use anyhow::bail; -use bytes::BufMut; +use bytes::{Buf, BufMut, BytesMut}; use doc::AsNode; use futures::StreamExt; use gazette::journal::{ReadJsonLine, ReadJsonLines}; use gazette::{broker, journal, uuid}; +use kafka_protocol::records::Compression; +use lz4_flex::frame::BlockMode; use std::time::Duration; pub struct Read { @@ -255,7 +257,7 @@ impl Read { compression: Compression::None, version: 2, }; - RecordBatchEncoder::encode(&mut buf, records.iter(), &opts) + RecordBatchEncoder::encode(&mut buf, records.iter(), &opts, Some(compressor)) .expect("record encoding cannot fail"); tracing::debug!( @@ -276,3 +278,28 @@ impl Read { Ok((self, buf.freeze())) } } + +fn compressor( + input: &mut BytesMut, + output: &mut Output, + c: Compression, +) -> anyhow::Result<()> { + match c { + Compression::None => output.put(input), + Compression::Lz4 => { + let mut frame_info = lz4_flex::frame::FrameInfo::default(); + // This breaks Go lz4 decoding + // frame_info.block_checksums = true; + frame_info.block_mode = BlockMode::Independent; + + let mut encoder = + lz4_flex::frame::FrameEncoder::with_frame_info(frame_info, output.writer()); + + std::io::copy(&mut input.reader(), &mut encoder)?; + + encoder.finish()?; + } + unsupported @ _ => bail!("Unsupported compression type {unsupported:?}"), + }; + Ok(()) +}