
Commit a27ab88

Author: Tincu Gabriel
Add support for zstd compression (#2021)
1 parent: 08ea211

File tree

10 files changed: +75 -23 lines changed


.travis.yml (+1)

@@ -22,6 +22,7 @@ addons:
   apt:
     packages:
     - libsnappy-dev
+    - libzstd-dev
     - openjdk-8-jdk
 
 cache:

docs/index.rst (+5 -4)

@@ -122,11 +122,12 @@ multiprocessing is recommended.
 Compression
 ***********
 
-kafka-python supports gzip compression/decompression natively. To produce or
-consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
-To enable snappy, install python-snappy (also requires snappy library).
-See `Installation <install.html#optional-snappy-install>`_ for more information.
+kafka-python supports multiple compression types:
 
+ - gzip : supported natively
+ - lz4 : requires `python-lz4 <https://pypi.org/project/lz4/>`_ installed
+ - snappy : requires the `python-snappy <https://pypi.org/project/python-snappy/>`_ package (which requires the snappy C library)
+ - zstd : requires the `python-zstandard <https://github.com/indygreg/python-zstandard>`_ package installed
 
 Protocol
 ********

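As a quick sanity check that the optional codec packages are importable, the `has_*` helpers in kafka/codec.py (including the `has_zstd` added by this commit) can be queried directly; a minimal sketch:

```python
# pip install lz4 python-snappy zstandard
from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd

# gzip ships with the Python standard library, so it is always available;
# the other three report whether the optional package imported successfully.
for name, check in [('gzip', has_gzip), ('snappy', has_snappy),
                    ('lz4', has_lz4), ('zstd', has_zstd)]:
    print(name, 'available' if check() else 'missing')
```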
kafka/codec.py (+25)

@@ -10,12 +10,18 @@
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
+ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
 
 try:
     import snappy
 except ImportError:
     snappy = None
 
+try:
+    import zstandard as zstd
+except ImportError:
+    zstd = None
+
 try:
     import lz4.frame as lz4
 
@@ -58,6 +64,10 @@ def has_snappy():
     return snappy is not None
 
 
+def has_zstd():
+    return zstd is not None
+
+
 def has_lz4():
     if lz4 is not None:
         return True
@@ -299,3 +309,18 @@ def lz4_decode_old_kafka(payload):
         payload[header_size:]
     ])
     return lz4_decode(munged_payload)
+
+
+def zstd_encode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    return zstd.ZstdCompressor().compress(payload)
+
+
+def zstd_decode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    try:
+        return zstd.ZstdDecompressor().decompress(payload)
+    except zstd.ZstdError:
+        return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)

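The fallback in `zstd_decode` exists because `ZstdDecompressor.decompress()` can only decompress in one shot when the frame header embeds the decompressed size; frames written by streaming compressors omit it and raise `ZstdError`, so the retry supplies an explicit output-size cap. A minimal round-trip sketch against the `zstandard` package (the streaming-writer part is illustrative, not code from this commit):

```python
import zstandard as zstd

payload = b'some batch of kafka messages' * 10

# One-shot compress embeds the content size in the frame header,
# so one-shot decompress works without any hint:
frame = zstd.ZstdCompressor().compress(payload)
assert zstd.ZstdDecompressor().decompress(frame) == payload

# A streaming compressor does not know the final size up front, so the
# frame lacks an embedded content size and one-shot decompress raises
# ZstdError unless max_output_size is given -- the except-branch above:
cobj = zstd.ZstdCompressor().compressobj()
frame_no_size = cobj.compress(payload) + cobj.flush()
restored = zstd.ZstdDecompressor().decompress(
    frame_no_size, max_output_size=1024 * 1024)
assert restored == payload
```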
kafka/producer/kafka.py (+6 -2)

@@ -12,7 +12,7 @@
 
 import kafka.errors as Errors
 from kafka.client_async import KafkaClient, selectors
-from kafka.codec import has_gzip, has_snappy, has_lz4
+from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
 from kafka.metrics import MetricConfig, Metrics
 from kafka.partitioner.default import DefaultPartitioner
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
@@ -119,7 +119,7 @@ class KafkaProducer(object):
             available guarantee.
             If unset, defaults to acks=1.
         compression_type (str): The compression type for all data generated by
-            the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+            the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None.
             Compression is of full batches of data, so the efficacy of batching
             will also impact the compression ratio (more batching means better
             compression). Default: None.
@@ -339,6 +339,7 @@ class KafkaProducer(object):
         'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
         'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
         'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+        'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD),
         None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
     }
 
@@ -388,6 +389,9 @@ def __init__(self, **configs):
         if self.config['compression_type'] == 'lz4':
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
 
+        if self.config['compression_type'] == 'zstd':
+            assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
+
         # Check compression_type for library support
         ct = self.config['compression_type']
         if ct not in self._COMPRESSORS:

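Putting the producer-side changes together, enabling zstd from user code looks roughly like this (a sketch: the broker address and topic name are placeholders, and per the assertion above the brokers must be on Kafka 2.1.0 or newer):

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # placeholder address
    compression_type='zstd',             # requires api_version >= (2, 1, 0)
    api_version=(2, 1, 0),
)
producer.send('my-topic', b'zstd-compressed payload')  # placeholder topic
producer.flush()
```

Note that 'zstd' maps to `DefaultRecordBatchBuilder.CODEC_ZSTD` rather than the legacy builder, since zstd only exists in the v2 record batch format.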
kafka/protocol/message.py (+7 -3)

@@ -3,8 +3,8 @@
 import io
 import time
 
-from kafka.codec import (has_gzip, has_snappy, has_lz4,
-                         gzip_decode, snappy_decode,
+from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd,
+                         gzip_decode, snappy_decode, zstd_decode,
                          lz4_decode, lz4_decode_old_kafka)
 from kafka.protocol.frame import KafkaBytes
 from kafka.protocol.struct import Struct
@@ -35,6 +35,7 @@ class Message(Struct):
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     HEADER_SIZE = 22  # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
 
@@ -119,7 +120,7 @@ def is_compressed(self):
 
     def decompress(self):
         codec = self.attributes & self.CODEC_MASK
-        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
+        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD)
         if codec == self.CODEC_GZIP:
             assert has_gzip(), 'Gzip decompression unsupported'
             raw_bytes = gzip_decode(self.value)
@@ -132,6 +133,9 @@ def decompress(self):
                 raw_bytes = lz4_decode_old_kafka(self.value)
             else:
                 raw_bytes = lz4_decode(self.value)
+        elif codec == self.CODEC_ZSTD:
+            assert has_zstd(), "ZSTD decompression unsupported"
+            raw_bytes = zstd_decode(self.value)
         else:
             raise Exception('This should be impossible')

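The codec id occupies the low bits of the message attributes byte, which is why `decompress()` masks with `CODEC_MASK` before comparing against the constants; a small illustration (assuming `CODEC_MASK` is 0x07, covering the three codec bits):

```python
CODEC_MASK = 0x07   # assumption: low three attribute bits carry the codec id
CODEC_ZSTD = 0x04   # new constant added by this commit

attributes = 0x0C   # e.g. timestamp-type bit (0x08) set, codec bits = 0x04
codec = attributes & CODEC_MASK
assert codec == CODEC_ZSTD  # message payload is zstd-compressed
```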
kafka/record/default_records.py (+9 -2)

@@ -62,8 +62,8 @@
 )
 from kafka.errors import CorruptRecordException, UnsupportedCodecError
 from kafka.codec import (
-    gzip_encode, snappy_encode, lz4_encode,
-    gzip_decode, snappy_decode, lz4_decode
+    gzip_encode, snappy_encode, lz4_encode, zstd_encode,
+    gzip_decode, snappy_decode, lz4_decode, zstd_decode
 )
 import kafka.codec as codecs
 
@@ -97,6 +97,7 @@ class DefaultRecordBase(object):
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     TRANSACTIONAL_MASK = 0x10
     CONTROL_MASK = 0x20
@@ -111,6 +112,8 @@ def _assert_has_codec(self, compression_type):
             checker, name = codecs.has_snappy, "snappy"
         elif compression_type == self.CODEC_LZ4:
             checker, name = codecs.has_lz4, "lz4"
+        elif compression_type == self.CODEC_ZSTD:
+            checker, name = codecs.has_zstd, "zstd"
         if not checker():
             raise UnsupportedCodecError(
                 "Libraries for {} compression codec not found".format(name))
@@ -185,6 +188,8 @@ def _maybe_uncompress(self):
                 uncompressed = snappy_decode(data.tobytes())
             if compression_type == self.CODEC_LZ4:
                 uncompressed = lz4_decode(data.tobytes())
+            if compression_type == self.CODEC_ZSTD:
+                uncompressed = zstd_decode(data.tobytes())
             self._buffer = bytearray(uncompressed)
             self._pos = 0
             self._decompressed = True
@@ -517,6 +522,8 @@ def _maybe_compress(self):
                 compressed = snappy_encode(data)
             elif self._compression_type == self.CODEC_LZ4:
                 compressed = lz4_encode(data)
+            elif self._compression_type == self.CODEC_ZSTD:
+                compressed = zstd_encode(data)
             compressed_size = len(compressed)
             if len(data) <= compressed_size:
                 # We did not get any benefit from compression, lets send

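`_maybe_compress` keeps the compressed bytes only when they are actually smaller than the raw batch; a toy version of that size check (hypothetical helper name, using the `zstandard` package directly rather than the builder internals):

```python
import zstandard as zstd

def maybe_compress(data):
    # Mirror the size check above: fall back to the uncompressed bytes
    # when compression does not shrink the batch.
    compressed = zstd.ZstdCompressor().compress(data)
    if len(data) <= len(compressed):
        return data, False   # send uncompressed
    return compressed, True  # send compressed and set the codec bits

body, used = maybe_compress(b'abc' * 1000)  # repetitive data compresses well
assert used
body, used = maybe_compress(b'x')  # tiny payloads lose to frame overhead
assert not used
```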
kafka/record/memory_records.py (+1 -1)

@@ -117,7 +117,7 @@ class MemoryRecordsBuilder(object):
 
     def __init__(self, magic, compression_type, batch_size):
         assert magic in [0, 1, 2], "Not supported magic"
-        assert compression_type in [0, 1, 2, 3], "Not valid compression type"
+        assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
         if magic >= 2:
             self._builder = DefaultRecordBatchBuilder(
                 magic=magic, compression_type=compression_type,

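For reference, the integer `compression_type` accepted here is the raw codec id, lining up with the `CODEC_*` constants above (a summary, not code from the commit):

```python
# Codec ids accepted by MemoryRecordsBuilder after this commit:
COMPRESSION_IDS = {
    None: 0,      # CODEC_NONE
    'gzip': 1,    # CODEC_GZIP
    'snappy': 2,  # CODEC_SNAPPY
    'lz4': 3,     # CODEC_LZ4
    'zstd': 4,    # CODEC_ZSTD (new)
}
```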
test/test_codec.py (+10 -1)

@@ -7,11 +7,12 @@
 from kafka.vendor.six.moves import range
 
 from kafka.codec import (
-    has_snappy, has_lz4,
+    has_snappy, has_lz4, has_zstd,
     gzip_encode, gzip_decode,
     snappy_encode, snappy_decode,
     lz4_encode, lz4_decode,
     lz4_encode_old_kafka, lz4_decode_old_kafka,
+    zstd_encode, zstd_decode,
 )
 
 from test.testutil import random_string
@@ -113,3 +114,11 @@ def test_lz4_incremental():
     b2 = lz4_decode(lz4_encode(b1))
     assert len(b1) == len(b2)
     assert b1 == b2
+
+
+@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
+def test_zstd():
+    for _ in range(1000):
+        b1 = random_string(100).encode('utf-8')
+        b2 = zstd_decode(zstd_encode(b1))
+        assert b1 == b2

test/test_producer.py (+10 -10)

@@ -23,16 +23,16 @@ def test_buffer_pool():
 
 
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_end_to_end(kafka_broker, compression):
-
     if compression == 'lz4':
-        # LZ4 requires 0.8.2
         if env_kafka_version() < (0, 8, 2):
-            return
-        # python-lz4 crashes on older versions of pypy
+            pytest.skip('LZ4 requires 0.8.2')
         elif platform.python_implementation() == 'PyPy':
-            return
+            pytest.skip('python-lz4 crashes on older versions of pypy')
+
+    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+        pytest.skip('zstd requires kafka 2.1.0 or newer')
 
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
@@ -81,8 +81,10 @@ def test_kafka_producer_gc_cleanup():
 
 
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+        pytest.skip('zstd requires 2.1.0 or more')
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
@@ -124,10 +126,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     if headers:
         assert record.serialized_header_size == 22
 
-    # generated timestamp case is skipped for broker 0.9 and below
     if magic == 0:
-        return
-
+        pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
     send_time = time.time() * 1000
     future = producer.send(
         topic,

tox.ini (+1)

@@ -15,6 +15,7 @@ deps =
     pytest-mock
    mock
     python-snappy
+    zstandard
     lz4
     xxhash
     crc32c
