Skip to content

Commit

Permalink
Update producer test to use pytest.skip instead of return statements
Browse files Browse the repository at this point in the history
Harden zstd decompression for missing frame size information (can happen when the sender is not under our control)
  • Loading branch information
Gabriel Tincu committed Mar 19, 2020
1 parent ca756d3 commit 7ca9851
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
6 changes: 5 additions & 1 deletion kafka/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3

try:
import snappy
Expand Down Expand Up @@ -319,4 +320,7 @@ def zstd_encode(payload):
def zstd_decode(payload):
if not zstd:
raise NotImplementedError("Zstd codec is not available")
return zstd.ZstdDecompressor().decompress(payload)
try:
return zstd.ZstdDecompressor().decompress(payload)
except zstd.ZstdError:
return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
14 changes: 5 additions & 9 deletions test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ def test_buffer_pool():
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_end_to_end(kafka_broker, compression):
if compression == 'lz4':
# LZ4 requires 0.8.2
if env_kafka_version() < (0, 8, 2):
return
# python-lz4 crashes on older versions of pypy
pytest.skip('LZ4 requires 0.8.2')
elif platform.python_implementation() == 'PyPy':
return
pytest.skip('python-lz4 crashes on older versions of pypy')

if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
return
pytest.skip('zstd requires kafka 2.1.0 or newer')

connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
Expand Down Expand Up @@ -87,7 +85,7 @@ def test_kafka_producer_gc_cleanup():
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
return
pytest.skip('zstd requires 2.1.0 or more')
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
retries=5,
Expand Down Expand Up @@ -130,10 +128,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
if headers:
assert record.serialized_header_size == 22

# generated timestamp case is skipped for broker 0.9 and below
if magic == 0:
return

pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
send_time = time.time() * 1000
future = producer.send(
topic,
Expand Down

0 comments on commit 7ca9851

Please sign in to comment.