
Commit 5002d67

Author: Gabriel Tincu

PR change requests

- Update readme
- zstd decompress will transform memoryview objects to bytes before decompressing, to keep the same behavior as the other decompression strategies
- Decrease the fallback max message size due to OOM concerns
- Rewrite the 1MB message limit
- Test producer constructor makes use of broker version inference again; also add logic for zstd decompression

1 parent: c9b5ba2

5 files changed: +11, -8

docs/index.rst (+2, -1)

@@ -122,7 +122,8 @@ multiprocessing is recommended.
 Compression
 ***********
 
-kafka-python supports multiple compression types. To produce or
+kafka-python supports multiple compression types:
+
 - gzip : supported natively
 - lz4 : requires `python-lz4 <https://pypi.org/project/lz4/>`_ installed
 - snappy : requires the `python-snappy <https://pypi.org/project/python-snappy/>`_ package (which requires the snappy C library)
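Elsewhere in this PR, zstd joins this list as a fourth codec, requiring the python-zstandard package on the client and a broker running Kafka 2.1 or newer. A minimal produce-side sketch; the broker address and topic name are hypothetical:

from kafka import KafkaProducer

# compression_type='zstd' assumes the PR's producer-side zstd support and
# the python-zstandard package; 'localhost:9092' and the topic are made up.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='zstd')
producer.send('example-topic', b'hello, zstd')
producer.flush()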

kafka/codec.py (+1, -1)

@@ -10,7 +10,7 @@
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
-ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3
+ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
 
 try:
     import snappy
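ZSTD_MAX_OUTPUT_SIZE caps how far a zstd frame without an embedded content size may expand, and shrinking it from 1 GiB (1024 ** 3) to 1 MiB (1024 * 1024) is the OOM fix the commit message mentions: 1 MiB matches Kafka's default maximum message size, so a larger fallback buffer is rarely useful. A sketch of how such a constant is typically consumed with python-zstandard; the body of zstd_decode is not shown in this diff, so treat the exact shape as an assumption:

import zstandard as zstd

ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024

def zstd_decode(payload):
    # Frames that record their decompressed size decode directly; frames
    # that do not make zstandard raise ZstdError rather than guess, so the
    # retry passes an explicit ceiling instead of an unbounded buffer.
    try:
        return zstd.ZstdDecompressor().decompress(payload)
    except zstd.ZstdError:
        return zstd.ZstdDecompressor().decompress(
            payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)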

kafka/protocol/message.py (+7, -3)

@@ -3,8 +3,8 @@
 import io
 import time
 
-from kafka.codec import (has_gzip, has_snappy, has_lz4,
-                         gzip_decode, snappy_decode,
+from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd,
+                         gzip_decode, snappy_decode, zstd_decode,
                          lz4_decode, lz4_decode_old_kafka)
 from kafka.protocol.frame import KafkaBytes
 from kafka.protocol.struct import Struct

@@ -35,6 +35,7 @@ class Message(Struct):
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     HEADER_SIZE = 22  # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)

@@ -119,7 +120,7 @@ def is_compressed(self):
 
     def decompress(self):
         codec = self.attributes & self.CODEC_MASK
-        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
+        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD)
         if codec == self.CODEC_GZIP:
             assert has_gzip(), 'Gzip decompression unsupported'
             raw_bytes = gzip_decode(self.value)

@@ -132,6 +133,9 @@ def decompress(self):
                 raw_bytes = lz4_decode_old_kafka(self.value)
             else:
                 raw_bytes = lz4_decode(self.value)
+        elif codec == self.CODEC_ZSTD:
+            assert has_zstd(), "ZSTD decompression unsupported"
+            raw_bytes = zstd_decode(self.value)
         else:
             raise Exception('This should be impossible')
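has_zstd and zstd_decode live in kafka/codec.py, which this commit view shows only partially. A plausible shape for the availability check, mirroring the try/except import guard the module already uses for snappy; the python-zstandard import is an assumption:

try:
    import zstandard as zstd
except ImportError:
    zstd = None

def has_zstd():
    # Same contract as has_gzip/has_snappy/has_lz4: report whether the
    # optional dependency imported successfully.
    return zstd is not None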

kafka/record/default_records.py (+1, -1)

@@ -189,7 +189,7 @@ def _maybe_uncompress(self):
                 if compression_type == self.CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
                 if compression_type == self.CODEC_ZSTD:
-                    uncompressed = zstd_decode(data)
+                    uncompressed = zstd_decode(data.tobytes())
                 self._buffer = bytearray(uncompressed)
                 self._pos = 0
         self._decompressed = True
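Here data is a memoryview sliced out of the record batch buffer; calling .tobytes() hands the decoder a plain bytes copy, exactly as the snappy and lz4 branches above already do, which is the behavioral consistency the commit message describes. A standalone illustration with made-up values:

batch = bytearray(b'header' + b'compressed-payload')
view = memoryview(batch)[6:]   # zero-copy slice into the batch buffer
payload = view.tobytes()       # independent bytes copy for the decoder

assert isinstance(payload, bytes)
assert payload == b'compressed-payload'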

test/test_producer.py (-2)

@@ -38,7 +38,6 @@ def test_end_to_end(kafka_broker, compression):
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
                              max_block_ms=30000,
-                             api_version=env_kafka_version(),
                              compression_type=compression,
                              value_serializer=str.encode)
     consumer = KafkaConsumer(bootstrap_servers=connect_str,

@@ -89,7 +88,6 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
-                             api_version=env_kafka_version(),
                              max_block_ms=30000,
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()
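Dropping the api_version pins restores the constructor default of api_version=None, which is the "broker version inference" the commit message refers to: the client probes the broker during bootstrap instead of trusting the environment variable behind env_kafka_version(). Roughly:

from kafka import KafkaProducer

# With api_version omitted (default None), kafka-python negotiates the
# protocol version against the broker at connect time. The address is
# hypothetical.
producer = KafkaProducer(bootstrap_servers='localhost:9092')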
