From 0331ddc464b85b577cca3018dafbeca01c3dddb4 Mon Sep 17 00:00:00 2001 From: William Barnhart Date: Tue, 26 Mar 2024 19:26:20 -0400 Subject: [PATCH] Add zstd support for legacy records and ensure no variable is referenced before definition (#138) * fix if statement logic and add zstd check * fix if statement logic and add zstd decompression * fix imports * avoid variables being used before definition * Remove unused import from legacy_records.py --------- Co-authored-by: Alexandre Souza --- kafka/record/default_records.py | 4 ++++ kafka/record/legacy_records.py | 13 +++++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 91eb5c8a0..8b630cc8b 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -115,6 +115,8 @@ def _assert_has_codec(self, compression_type: int) -> None: checker, name = codecs.has_lz4, "lz4" elif compression_type == self.CODEC_ZSTD: checker, name = codecs.has_zstd, "zstd" + else: + checker, name = lambda: False, "Unknown" if not checker(): raise UnsupportedCodecError( f"Libraries for {name} compression codec not found") @@ -525,6 +527,8 @@ def _maybe_compress(self) -> bool: compressed = lz4_encode(data) elif self._compression_type == self.CODEC_ZSTD: compressed = zstd_encode(data) + else: + compressed = '' # unknown compressed_size = len(compressed) if len(data) <= compressed_size: # We did not get any benefit from compression, lets send diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py index b77799f4d..4439462f6 100644 --- a/kafka/record/legacy_records.py +++ b/kafka/record/legacy_records.py @@ -49,8 +49,8 @@ from kafka.record.util import calc_crc32 from kafka.codec import ( - gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka, - gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka, + gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka, zstd_encode, + gzip_decode, snappy_decode, 
lz4_decode, lz4_decode_old_kafka, zstd_decode ) import kafka.codec as codecs from kafka.errors import CorruptRecordException, UnsupportedCodecError @@ -110,6 +110,7 @@ class LegacyRecordBase: CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 CODEC_LZ4 = 0x03 + CODEC_ZSTD = 0x04 TIMESTAMP_TYPE_MASK = 0x08 LOG_APPEND_TIME = 1 @@ -124,6 +125,10 @@ def _assert_has_codec(self, compression_type: int) -> None: checker, name = codecs.has_snappy, "snappy" elif compression_type == self.CODEC_LZ4: checker, name = codecs.has_lz4, "lz4" + elif compression_type == self.CODEC_ZSTD: + checker, name = codecs.has_zstd, "zstd" + else: + checker, name = lambda: False, "Unknown" if not checker(): raise UnsupportedCodecError( f"Libraries for {name} compression codec not found") @@ -195,6 +200,10 @@ def _decompress(self, key_offset: int) -> bytes: uncompressed = lz4_decode_old_kafka(data.tobytes()) else: uncompressed = lz4_decode(data.tobytes()) + elif compression_type == self.CODEC_ZSTD: + uncompressed = zstd_decode(data) + else: + raise ValueError("Unknown Compression Type - %s" % compression_type) return uncompressed def _read_header(self, pos: int) -> Union[Tuple[int, int, int, int, int, None], Tuple[int, int, int, int, int, int]]: