
Add support for zstd compression #501

Closed
mikmatko opened this issue Apr 25, 2019 · 10 comments

@mikmatko

Zstandard support was added in Kafka 2.1.0 (KIP-110)

Would it be possible for aiokafka to support producing with zstd compression?

Thanks!

Related discussion in kafka-python: dpkp/kafka-python#1791
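
[Editor's note: for reference, a minimal sketch of what producing with zstd could look like in aiokafka once supported, assuming "zstd" joins the existing compression_type values; the broker address and topic name are placeholders.]

import asyncio
from aiokafka import AIOKafkaProducer

async def produce():
    # "zstd" assumed to join "gzip"/"snappy"/"lz4" as a compression_type
    # value; it requires the optional zstandard package at runtime.
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        compression_type="zstd",
    )
    await producer.start()
    try:
        await producer.send_and_wait("my-topic", b"zstd-compressed payload")
    finally:
        await producer.stop()

asyncio.run(produce())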

@tvoinarovskyi
Member

It will probably be easy once kafka-python supports it.

@rjjanuary

I'm new to aiokafka (and, to be frank, to aio in general), but I'm also running into the need to use zstd. I certainly don't mind helping out where I can. Would anyone mind steering me in the right direction?

I'm in the process of trying to understand the code base, but a gentle nudge would certainly be appreciated.

@sycured sycured mentioned this issue Jan 13, 2021
@sycured

sycured commented Jan 13, 2021

@rjjanuary my PR gives you a base to work from… I tried to add zstd, but something fails on my MacBook Pro and I can't run the tests (an issue with the Docker API).

@rjjanuary

Thank you, I'll start digging in this evening.

@sycured

sycured commented Jan 14, 2021

@rjjanuary I have a big question for myself about a possible change to my PR to match kafka-python on this point. This is the possible change:

diff --git a/aiokafka/record/_crecords/default_records.pyx b/aiokafka/record/_crecords/default_records.pyx
index 2cc653e..eaffaa4 100644
--- a/aiokafka/record/_crecords/default_records.pyx
+++ b/aiokafka/record/_crecords/default_records.pyx
@@ -224,7 +224,7 @@ cdef class DefaultRecordBatch:
                 if compression_type == _ATTR_CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
                 if compression_type == _ATTR_CODEC_ZSTD:
-                    uncompressed = zstd_decode(data)
+                    uncompressed = zstd_decode(data.tobytes())
         
                 PyBuffer_Release(&self._buffer)
                 PyObject_GetBuffer(uncompressed, &self._buffer, PyBUF_SIMPLE)
diff --git a/aiokafka/record/_crecords/legacy_records.pyx b/aiokafka/record/_crecords/legacy_records.pyx
index 0b56dd1..aaa31ea 100644
--- a/aiokafka/record/_crecords/legacy_records.pyx
+++ b/aiokafka/record/_crecords/legacy_records.pyx
@@ -425,7 +425,7 @@ cdef class LegacyRecordBatchBuilder:
                 else:
                     compressed = lz4_encode(bytes(self._buffer))
             elif self._compression_type == _ATTR_CODEC_ZSTD:
-                    compressed = zstd_encode(self._buffer)
+                    compressed = zstd_encode(bytes(self._buffer))
             else:
                 return 0
             size = _size_in_bytes(self._magic, key=None, value=compressed)
diff --git a/aiokafka/record/default_records.py b/aiokafka/record/default_records.py
index 33aaf0d..6eb4e65 100644
--- a/aiokafka/record/default_records.py
+++ b/aiokafka/record/default_records.py
@@ -186,7 +186,7 @@ class _DefaultRecordBatchPy(DefaultRecordBase):
                 if compression_type == self.CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
                 if compression_type == self.CODEC_ZSTD:
-                    uncompressed = zstd_decode(data)
+                    uncompressed = zstd_decode(data.tobytes())
                 self._buffer = bytearray(uncompressed)
                 self._pos = 0
         self._decompressed = True
diff --git a/aiokafka/record/legacy_records.py b/aiokafka/record/legacy_records.py
index c75aa11..44c6b32 100644
--- a/aiokafka/record/legacy_records.py
+++ b/aiokafka/record/legacy_records.py
@@ -146,7 +146,7 @@ class _LegacyRecordBatchPy(LegacyRecordBase):
             else:
                 uncompressed = lz4_decode(data.tobytes())
         elif compression_type == self.CODEC_ZSTD:
-            uncompressed = zstd_decode(data)
+            uncompressed = zstd_decode(data.tobytes())
         return uncompressed
 
     def _read_header(self, pos):
@@ -404,7 +404,7 @@ class _LegacyRecordBatchBuilderPy(LegacyRecordBase):
                 else:
                     compressed = lz4_encode(bytes(buf))
             elif self._compression_type == self.CODEC_ZSTD:
-                compressed = zstd_encode(buf)
+                compressed = zstd_encode(bytes(buf))
             compressed_size = len(compressed)
             size = self._size_in_bytes(key_size=0, value_size=compressed_size)
             if size > len(self._buffer):

This is what I don't understand: in my tests on my laptop, I get the same result with and without this patch (which makes the code exactly like kafka-python's), so I don't know whether we need this patch inside aiokafka.

edit: I tried it, and the difference comes down to json.dumps and whether you need to add .encode('utf-8') or not. I prefer to force the encoding, but my big question is: which is better, a pure copy/paste of kafka-python so it stays a drop-in replacement, or not?
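
[Editor's note: for context, a standalone sketch of the two points above, bytes vs. memoryview and json.dumps needing .encode('utf-8'), assuming the python-zstandard package; the payload is hypothetical.]

import json
import zstandard

# json.dumps returns str; Kafka message values must be bytes, hence the
# explicit .encode("utf-8").
payload = json.dumps({"event": "demo"}).encode("utf-8")
compressed = zstandard.ZstdCompressor().compress(payload)

# The record parsers hand the codec a memoryview over the batch buffer;
# calling .tobytes() first, as in the patch, guarantees the codec always
# receives plain bytes.
view = memoryview(compressed)
assert zstandard.ZstdDecompressor().decompress(view.tobytes()) == payload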

@rjjanuary

Personally, I see no problems with your approach. Unfortunately I'm dealing with some issues in my own project as well, which are getting in the way of testing.

I feel I'm still working to truly comprehend some of the basic concepts of aio in Python. To me the programming paradigms are different enough that I struggle to understand what it means to be a drop-in replacement.

@sycured

sycured commented Jan 15, 2021

I had a small problem with a virtual env; when I recreated it and ran source .venv/bin/activate && pip install -Ue /Volumes/Macintosh\ HD\ -\ Case\ Sensitive/git_repositories/public/aiokafka, it was ready to use. I'm sure I broke my older env with something else.

About the drop-in replacement: it means moving from kafka-python to aiokafka without a lot of refactoring, just replacing the library's name. That's what you can see in my patch (my other comment).

If you can use it, that's perfect.
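
[Editor's note: to make that concrete, a minimal sketch of how a kafka-python consumer maps onto aiokafka; topic name and broker address are placeholders.]

# kafka-python (sync):
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer("my-topic", bootstrap_servers="localhost:9092")
#   for msg in consumer:
#       print(msg.value)
#
# aiokafka: largely the same constructor arguments, with the class
# renamed and the iteration made async.
import asyncio
from aiokafka import AIOKafkaConsumer

async def consume():
    consumer = AIOKafkaConsumer("my-topic", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.offset, msg.value)
    finally:
        await consumer.stop()

asyncio.run(consume())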

@wijagels

Looks like this is done in #801. I would appreciate a release to make this available on PyPI.

@ods ods closed this as completed Jul 5, 2022
@Rohit-Singh3

I'm getting the error below when trying to consume a zstd-compressed record from Kafka.

Error log:

Traceback (most recent call last):
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 25, in <module>
    consume_from_topic(topic_to_consume)
  File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 14, in consume_from_topic
    for message in consumer:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1193, in __next__
    return self.next_v2()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1201, in next_v2
    return next(self._iterator)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
    record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 708, in _poll_once
    records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 818, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
    for record in batch:
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 276, in __iter__
    self._maybe_uncompress()
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 183, in _maybe_uncompress
    self._assert_has_codec(compression_type)
  File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 118, in _assert_has_codec
    raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Process finished with exit code 1

My code for consuming a Kafka topic:

from kafka import KafkaConsumer


def consume_from_topic(topic):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        group_id='zstd-11-consumer-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True
    )
    try:
        for message in consumer:
            v = message.value
            k = message.key.decode("utf-8")
            log = "key={}, offset={}, partition={}, value={}".format(k, message.offset, message.partition, v)
            print(log)
    except KeyboardInterrupt:
        consumer.close()


if __name__ == "__main__":
    topic_to_consume = "Integrate-Package-Zstd-ESP.info"
    consume_from_topic(topic_to_consume)
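
[Editor's note: this error usually means the optional zstd codec dependency is missing from the environment rather than anything being wrong in the consumer code; recent kafka-python releases back their zstd codec with the zstandard package (an assumption about this install, not something visible in the traceback). A quick check:]

# Assumed fix: install the optional codec dependency, e.g.
#   pip install zstandard
# and verify it imports in the same environment as kafka-python:
try:
    import zstandard  # noqa: F401
except ImportError:
    raise SystemExit("zstd codec missing: run `pip install zstandard`")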
