Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace python-snappy and zstandard with cramjam #940

Merged
Merged 5 commits on Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ jobs:

strategy:
matrix:
# 3.11 excluded due to problems with python-snappy
python: ["3.8", "3.9", "3.10"]
python: ["3.8", "3.9", "3.10", "3.11"]
include:
- python: "3.8"
aiokafka_whl: dist/aiokafka-*-cp38-cp38-win_amd64.whl
Expand Down Expand Up @@ -139,9 +138,6 @@ jobs:
with:
python-version: ${{ matrix.python }}

- name: Install system dependencies
run: |
brew install snappy
- name: Install python dependencies
run: |
pip install --upgrade pip setuptools wheel
Expand Down Expand Up @@ -236,7 +232,7 @@ jobs:
source .env/bin/activate && \
yum install -y epel-release && \
yum-config-manager --enable epel && \
yum install -y snappy-devel libzstd-devel krb5-devel && \
yum install -y krb5-devel && \
pip install --upgrade pip setuptools wheel && \
pip install -r requirements-ci.txt && \
pip install ${{ matrix.aiokafka_whl }} && \
Expand Down
7 changes: 1 addition & 6 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ jobs:

strategy:
matrix:
# 3.11 excluded due to problems with python-snappy
python: ["3.8", "3.9", "3.10"]
python: ["3.8", "3.9", "3.10", "3.11"]

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -168,10 +167,6 @@ jobs:
restore-keys: |
${{ runner.os }}-py-${{ matrix.python }}-

- name: Install system dependencies
run: |
brew install snappy

- name: Install python dependencies
run: |
pip install --upgrade pip setuptools wheel
Expand Down
122 changes: 24 additions & 98 deletions aiokafka/codec.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,24 @@
import gzip
import io
import platform
import struct

_XERIAL_V1_HEADER = (-126, b"S", b"N", b"A", b"P", b"P", b"Y", 0, 1, 1)
_XERIAL_V1_FORMAT = "bccccccBii"
ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024

try:
import snappy
import cramjam
except ImportError:
snappy = None

try:
import zstandard as zstd
except ImportError:
zstd = None
cramjam = None

try:
import lz4.frame as lz4

def _lz4_compress(payload, **kwargs):
# Kafka does not support LZ4 dependent blocks
try:
# For lz4>=0.12.0
kwargs.pop("block_linked", None)
return lz4.compress(payload, block_linked=False, **kwargs)
except TypeError:
# For earlier versions of lz4
kwargs.pop("block_mode", None)
return lz4.compress(payload, block_mode=1, **kwargs)
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-57+-+Interoperable+LZ4+Framing
kwargs.pop("block_linked", None)
return lz4.compress(payload, block_linked=False, **kwargs)

except ImportError:
lz4 = None
Expand All @@ -44,24 +33,17 @@ def _lz4_compress(payload, **kwargs):
except ImportError:
lz4framed = None

try:
import xxhash
except ImportError:
xxhash = None

PYPY = bool(platform.python_implementation() == "PyPy")


def has_gzip():
return True


def has_snappy():
return snappy is not None
return cramjam is not None


def has_zstd():
return zstd is not None
return cramjam is not None


def has_lz4():
Expand Down Expand Up @@ -133,32 +115,22 @@ def snappy_encode(payload, xerial_compatible=True, xerial_blocksize=32 * 1024):
raise NotImplementedError("Snappy codec is not available")

if not xerial_compatible:
return snappy.compress(payload)
return cramjam.snappy.compress_raw(payload)

out = io.BytesIO()
for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER):
out.write(struct.pack("!" + fmt, dat))

# Chunk through buffers to avoid creating intermediate slice copies
if PYPY:
# on pypy, snappy.compress() on a sliced buffer consumes the entire
# buffer... likely a python-snappy bug, so just use a slice copy
def chunker(payload, i, size):
return payload[i:size + i]

else:
# snappy.compress does not like raw memoryviews, so we have to convert
# tobytes, which is a copy... oh well. it's the thought that counts.
# pylint: disable-msg=undefined-variable
def chunker(payload, i, size):
return memoryview(payload)[i:size + i].tobytes()
def chunker(payload, i, size):
return memoryview(payload)[i:size + i]

for chunk in (
chunker(payload, i, xerial_blocksize)
for i in range(0, len(payload), xerial_blocksize)
):

block = snappy.compress(chunk)
block = cramjam.snappy.compress_raw(chunk)
block_size = len(block)
out.write(struct.pack("!i", block_size))
out.write(block)
Expand Down Expand Up @@ -210,13 +182,13 @@ def snappy_decode(payload):
# Skip the block size
cursor += 4
end = cursor + block_size
out.write(snappy.decompress(byt[cursor:end]))
out.write(cramjam.snappy.decompress_raw(byt[cursor:end]))
cursor = end

out.seek(0)
return out.read()
else:
return snappy.decompress(payload)
return bytes(cramjam.snappy.decompress_raw(payload))


if lz4:
Expand Down Expand Up @@ -253,66 +225,20 @@ def lz4f_decode(payload):
lz4_decode = None


def lz4_encode_old_kafka(payload):
"""Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
assert xxhash is not None
data = lz4_encode(payload)
header_size = 7
flg = data[4]
if not isinstance(flg, int):
flg = ord(flg)

content_size_bit = (flg >> 3) & 1
if content_size_bit:
# Old kafka does not accept the content-size field
# so we need to discard it and reset the header flag
flg -= 8
data = bytearray(data)
data[4] = flg
data = bytes(data)
payload = data[header_size + 8:]
else:
payload = data[header_size:]

# This is the incorrect hc
hc = xxhash.xxh32(data[0:header_size - 1]).digest()[
-2:-1
] # pylint: disable-msg=no-member

return b"".join([data[0:header_size - 1], hc, payload])


def lz4_decode_old_kafka(payload):
assert xxhash is not None
# Kafka's LZ4 code has a bug in its header checksum implementation
header_size = 7
if isinstance(payload[4], int):
flg = payload[4]
else:
flg = ord(payload[4])
content_size_bit = (flg >> 3) & 1
if content_size_bit:
header_size += 8

# This should be the correct hc
hc = xxhash.xxh32(payload[4:header_size - 1]).digest()[-2:-1]

munged_payload = b"".join([payload[0:header_size - 1], hc, payload[header_size:]])
return lz4_decode(munged_payload)
def zstd_encode(payload, level=None):
if not has_zstd():
raise NotImplementedError("Zstd codec is not available")

if level is None:
# Default for kafka broker
# https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Support+Compression+Level
level = 3

def zstd_encode(payload):
if not zstd:
raise NotImplementedError("Zstd codec is not available")
return zstd.ZstdCompressor().compress(payload)
return bytes(cramjam.zstd.compress(payload, level=level))


def zstd_decode(payload):
if not zstd:
if not has_zstd():
raise NotImplementedError("Zstd codec is not available")
try:
return zstd.ZstdDecompressor().decompress(payload)
except zstd.ZstdError:
return zstd.ZstdDecompressor().decompress(
payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE
)

return bytes(cramjam.zstd.decompress(payload))
7 changes: 5 additions & 2 deletions aiokafka/protocol/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
snappy_decode,
zstd_decode,
lz4_decode,
lz4_decode_old_kafka,
)
from aiokafka.errors import UnsupportedCodecError
from aiokafka.util import WeakMethod

from .frame import KafkaBytes
Expand Down Expand Up @@ -156,7 +156,10 @@ def decompress(self):
elif codec == self.CODEC_LZ4:
assert has_lz4(), "LZ4 decompression unsupported"
if self.magic == 0:
raw_bytes = lz4_decode_old_kafka(self.value)
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
raw_bytes = lz4_decode(self.value)
elif codec == self.CODEC_ZSTD:
Expand Down
14 changes: 10 additions & 4 deletions aiokafka/record/_crecords/legacy_records.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import aiokafka.codec as codecs
from aiokafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode,
)
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from zlib import crc32 as py_crc32 # needed for windows macro
Expand Down Expand Up @@ -141,7 +141,10 @@ cdef class LegacyRecordBatch:
uncompressed = snappy_decode(value)
elif compression_type == _ATTR_CODEC_LZ4:
if self._magic == 0:
uncompressed = lz4_decode_old_kafka(value)
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
uncompressed = lz4_decode(value)

Expand Down Expand Up @@ -437,7 +440,10 @@ cdef class LegacyRecordBatchBuilder:
compressed = snappy_encode(self._buffer)
elif self._compression_type == _ATTR_CODEC_LZ4:
if self._magic == 0:
compressed = lz4_encode_old_kafka(bytes(self._buffer))
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
compressed = lz4_encode(bytes(self._buffer))
else:
Expand Down
14 changes: 10 additions & 4 deletions aiokafka/record/legacy_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

import aiokafka.codec as codecs
from aiokafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode,
)
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from aiokafka.util import NO_EXTENSIONS
Expand Down Expand Up @@ -159,7 +159,10 @@ def _decompress(self, key_offset):
uncompressed = snappy_decode(data.tobytes())
elif compression_type == self.CODEC_LZ4:
if self._magic == 0:
uncompressed = lz4_decode_old_kafka(data.tobytes())
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
uncompressed = lz4_decode(data.tobytes())
return uncompressed
Expand Down Expand Up @@ -415,7 +418,10 @@ def _maybe_compress(self):
compressed = snappy_encode(buf)
elif self._compression_type == self.CODEC_LZ4:
if self._magic == 0:
compressed = lz4_encode_old_kafka(bytes(buf))
# https://issues.apache.org/jira/browse/KAFKA-3160
raise UnsupportedCodecError(
"LZ4 is not supported for broker version 0.8/0.9"
)
else:
compressed = lz4_encode(bytes(buf))
compressed_size = len(compressed)
Expand Down
28 changes: 1 addition & 27 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,33 +110,7 @@ from http://landinghub.visualstudio.com/visual-cpp-build-tools
Optional Snappy install
+++++++++++++++++++++++

1. Download and build Snappy from http://google.github.io/snappy/

Ubuntu:

.. code:: bash

apt-get install libsnappy-dev

OSX:

.. code:: bash

brew install snappy

From Source:

.. code:: bash

wget https://github.com/google/snappy/tarball/master
tar xzvf google-snappy-X.X.X-X-XXXXXXXX.tar.gz
cd google-snappy-X.X.X-X-XXXXXXXX
./configure
make
sudo make install


1. Install **aiokafka** with :code:`snappy` extra option
To enable Snappy compression/decompression, install **aiokafka** with :code:`snappy` extra option

.. code:: bash

Expand Down
4 changes: 1 addition & 3 deletions requirements-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ pytest-mock==3.12.0
docker==6.1.2
chardet==4.0.0 # Until fixed requests is released
lz4==3.1.3
xxhash==2.0.2
python-snappy==0.6.1
docutils==0.17.1
Pygments==2.15.0
gssapi==1.8.2
async-timeout==4.0.1
zstandard==0.16.0
cramjam==2.7.0
4 changes: 1 addition & 3 deletions requirements-win-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,4 @@ pytest-mock==3.12.0
docker==6.0.1
chardet==4.0.0 # Until fixed requests is released
lz4==3.1.3
xxhash==2.0.2
python-snappy==0.6.1
zstandard==0.16.0
cramjam==2.7.0
Loading