Add v2 record batch format for FetchResponse V4 support. #1185
Conversation
Wow, that was a bit rough... Ended up with 2-3 hacks to support old versions of clients, but this should do for now. I plan to refactor the LegacyRecord part to not use Message/MessageSet constructs, but for now I need a break. In the meantime, feedback would be great.
Do you mean the old SimpleClient stacks? Or just so the new KafkaClient can support old brokers? If the former, then I started working on #1193 last night; if I get that finished up, it would make your life simpler now and future code maintenance more straightforward...
@jeffwidman Yea, I meant the SimpleClient stacks. If we remove those, I could also remove some hacks from this PR.
Then let's remove those first so that we don't have to worry about maintaining/cleaning up those hacks later. I started working on removing them last night, will try to push up a PR later tonight...
@dpkp I'm satisfied with the changes, but I want your feedback on this. Also, we need to decide where this change will be merged: is it going into 1.3.X or 2.X.X? I'm more for the second, as we can clean up the hacks for old clients.
Very impressive! I may have a few style suggestions that I'll write up tomorrow, but generally it looks very good. Re: merge location, I prefer just landing it on master and deferring choices about what to label the next release (1.4 / 2.0 / etc).
kafka/protocol/fetch.py
Outdated
class Records(Bytes):

This class seems out of place in fetch.py. Why not put it in message.py?
kafka/protocol/fetch.py
Outdated
@classmethod
def encode(cls, buffer):
    # For old producer, backward compat

What other alternatives do we have here? Will we need to maintain MessageSet, or is there a way to transition the old code off of it entirely?
Now that I think of it, let's just leave the field as Bytes in the protocol and wrap it into respectable classes in the Fetcher and the old client. No hacks that way.
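A minimal sketch of that "wrap at the edge" approach, assuming the MemoryRecords wrapper this PR introduces (treat the exact method names as illustrative):

```python
from kafka.record import MemoryRecords

def iter_fetched_records(raw_bytes):
    # The protocol layer hands us an opaque Bytes payload; we wrap it here
    # rather than teaching the protocol schema about record batches.
    records = MemoryRecords(raw_bytes)
    while records.has_next():
        batch = records.next_batch()
        for record in batch:
            yield record
```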
kafka/protocol/fetch.py
Outdated
from .types import Array, Int8, Int16, Int32, Int64, Schema, String, Bytes

class Records(Bytes):

Worth a docstring explaining how this class / struct fits in compared with Bytes and MemoryRecords.
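For example, something along these lines (suggested wording, not the PR's actual docstring):

```python
class Records(Bytes):
    """Protocol type for an opaque record-set payload.

    On the wire this decodes exactly like Bytes; the buffer contains
    MessageSet (v0/v1) or RecordBatch (v2) data that higher layers are
    expected to wrap in MemoryRecords for iteration.
    """
```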
kafka/protocol/legacy.py
Outdated
@@ -216,6 +216,7 @@ def decode_fetch_response(cls, response):
@classmethod
def decode_message_set(cls, messages):

Should document what type we expect here for messages. It seems like this changes the signature and could cause breakage.
kafka/record/util.py
Outdated
return ord(memview[pos])

def encode_varint(num):

Are VARINTs used elsewhere in the new Kafka protocol APIs? Perhaps these functions belong under kafka.protocol.types?
No, currently only the Record structure uses it. If other APIs start using it we can restructure later. I want this module to be very isolated behind a good facade, because I want to cythonize this part later. I already did some work on that in aiokafka, and it gives very good results so far.
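For context, the v2 record format stores lengths and deltas as zigzag-encoded varints (the same scheme protobuf uses). A minimal pure-Python sketch of the encoder, close in shape to what kafka/record/util.py does (the write-callback signature is an assumption):

```python
def encode_varint(value, write):
    """Zigzag-encode a signed 64-bit int and emit base-128 varint bytes.

    write: a callable taking a single byte value, e.g. bytearray.append.
    """
    value = (value << 1) ^ (value >> 63)  # zigzag: small negatives stay small
    while value & ~0x7F:
        write((value & 0x7F) | 0x80)      # low 7 payload bits + continuation bit
        value >>= 7
    write(value)

buf = bytearray()
encode_varint(-1, buf.append)  # -1 zigzags to 1, encoding as a single byte
assert bytes(buf) == b'\x01'
```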
@dpkp Say, now that I look at it, the PR is massive. Instead of merging the whole thing, let me split it into two parts:
That way we can first make sure we don't have a regression from the LegacyRecords classes, and only afterwards merge the v2 records. And the changeset will be more suited for review.
+1
kafka/record/legacy_records.py
Outdated
if self._compression_type == self.CODEC_GZIP:
    compressed = gzip_encode(data)
elif self._compression_type == self.CODEC_SNAPPY:
    compressed = snappy_encode(data)

When I test locally and the snappy libraries are not available, this raises a NotImplementedError and crashes the producer IO thread:
ERROR:kafka.producer.sender:Uncaught error in kafka producer I/O thread
But this is only logged, and producer.send() still returns a future. If you try to resolve the future with .get(), it hangs forever. We should probably improve this so that the exception is either raised to the user or at least does not crash the IO thread and is instead used to resolve the future. My preference is the former.
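A short reproduction of that failure mode, assuming python-snappy is not installed (topic name and broker address are placeholders):

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='snappy')
future = producer.send('test-topic', b'payload')
# The NotImplementedError from snappy_encode kills the sender thread; it is
# logged but never attached to the future, so this blocks until the timeout.
record_metadata = future.get(timeout=10)
```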
kafka/producer/kafka.py
Outdated
@@ -353,7 +354,7 @@ def __init__(self, **configs):
if self.config['compression_type'] == 'lz4':

Since compression_type is a producer configuration, we could put the check for compression library imports in __init__ here.
Should also note that this PR changes the error raised when compression libraries are not available -- it was AssertionError, now NotImplementedError. Related to #1168
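A sketch of such an __init__-time check, assuming the availability helpers in kafka.codec (has_gzip, has_snappy, has_lz4); the dict layout is illustrative, not the PR's actual code:

```python
from kafka.codec import has_gzip, has_snappy, has_lz4

_COMPRESSION_AVAILABLE = {
    None: lambda: True,
    'gzip': has_gzip,
    'snappy': has_snappy,
    'lz4': has_lz4,
}

def _check_compression(compression_type):
    # Fail fast in KafkaProducer.__init__ instead of crashing the IO thread
    # on the first send with an unavailable codec.
    check = _COMPRESSION_AVAILABLE.get(compression_type)
    if check is None or not check():
        raise NotImplementedError(
            'Compression libraries for %r not installed' % (compression_type,))
```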
kafka/consumer/fetcher.py
Outdated
# we did not read a single message from a non-empty
# buffer because that message's size is larger than
# fetch size, in this case record this exception
self._record_too_large_partitions[tp] = fetch_offset

According to the Java consumer docs, this is actually now a soft limit so that the consumer can make progress. That change is likely outside the scope of this PR, so I filed #1232 to track it.
Running quick benchmarks suggests that KafkaProducer performance is reduced significantly when using the new record format. My laptop shows a drop from 9K records/sec to 5.5 records/sec. On the other hand, KafkaConsumer performance improved from 25K/sec to 35K/sec. This was against a local, 3-broker cluster running 0.11.0.0:
When testing against 0.10.2.1 (the old message format), this PR performed basically the same as master (perhaps slightly better). My gut says that this is ok for now; folks that care about consumer performance will benefit, folks that care about producer performance will be sad. But since we allow users to select the api version they want, they can always pin to 0.10 in their producers.
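For reference, pinning looks like this with the existing api_version config (broker address is a placeholder):

```python
from kafka import KafkaProducer

# Force the 0.10 wire protocol so messages are produced in the old
# message format rather than the slower pure-Python v2 path.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         api_version=(0, 10))
```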
Is that truly only 5.5 records/sec or did you mean 5.5K/sec? A 50% reduction is very different than a 99% reduction in throughput... I'm also curious if the Java producer saw such a drastic throughput drop... ?
Ha, yea. 5.5K records / sec. From about 1.2MB/sec to 600KB/sec |
@dpkp I do expect a drop in the V2 format, I did describe the problems in #1119 (comment). For now, don't worry about those, I will do heavy benchmarking on that part before the merge. The main problem is probably the pure-Python varint implementation; it's a bad fit for the language.
@jeffwidman The JVM's JIT probably made it barely visible ) Java's fast for those types of problems, that's all. I'm positive that PyPy will also yield quite a good number even on the current implementation.
I'm going to cut a release this weekend but would like to not include this PR yet. Mostly this is because memoryview does not work on python 2.6. And although we say that we do not support 2.6, it still does work -- it is just not actively tested. I think if we're going to break python2.6 affirmatively we should do it loudly / with a large version bump.
@dpkp Seems like there's a bug here and the reader falls back to v1 on read, because the read speed should also have dropped on v2. I probably know where the bug is )
(force-pushed from ba9b11c to 20405e5)
Latest microbenchmarks for the V2 format:
Will need to get it down to at most 2x slower...
And hereby I declare myself as a magician and thus will perform magic on the latest commit.
Very impressive. FYI, since you're a collaborator on the project, you can manually re-trigger the travis builds for any PR yourself within the Travis GUI, no need to push commits to force them: https://stackoverflow.com/questions/17606874/trigger-a-travis-ci-rebuild-without-pushing-a-commit
@jeffwidman Yea, but it bugged out on me. It did not want to restart from the interface. Had to create another job) |
(force-pushed from 1a95cb7 to e5fce34)
Is this ready to be merged? I see merge conflicts...
(force-pushed from e5fce34 to 4d0c387)
…ilder. Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
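For context on the optimization named in the commit message: CPython compiles references to module-level and builtin names into LOAD_GLOBAL opcodes (a dict lookup on every execution), while local names use the cheaper array-indexed LOAD_FAST. Binding a global as a default argument is the classic workaround. A generic illustration, not the PR's actual code:

```python
import dis

def hot_loop_slow(values):
    total = 0
    for v in values:
        total += abs(v)  # LOAD_GLOBAL 'abs' on every iteration
    return total

def hot_loop_fast(values, abs=abs):  # bind abs as a local via a default arg
    total = 0
    for v in values:
        total += abs(v)  # LOAD_FAST 'abs' instead
    return total

dis.dis(hot_loop_slow)  # shows LOAD_GLOBAL in the loop body
dis.dis(hot_loop_fast)  # shows LOAD_FAST in the loop body
```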
(force-pushed from c7fc65b to 3af66bc)
@dpkp @jeffwidman I'll go straight to the point: I want to merge this. This PR is quite big even now, and dancing around it will not bring better performance. To get better performance it needs a C extension, which should not be part of this PR. Any objections?
self.config['fetch_min_bytes'],
self.config['fetch_max_bytes'],
partition_data)
if version == 3:

I think this should be <= 3?
No, versions 0, 1 and 2 are handled above.
Yep -- I missed that!
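A hedged reconstruction of the dispatch being discussed (based on the public FetchRequest schemas: v0-v2 share one signature, v3 adds max_bytes, v4 adds isolation_level):

```python
if version <= 2:
    request = FetchRequest[version](
        -1,  # replica_id: -1 identifies an ordinary consumer
        self.config['fetch_max_wait_ms'],
        self.config['fetch_min_bytes'],
        partition_data)
elif version == 3:  # the branch the review settled on
    request = FetchRequest[version](
        -1,
        self.config['fetch_max_wait_ms'],
        self.config['fetch_min_bytes'],
        self.config['fetch_max_bytes'],
        partition_data)
else:  # v4 adds isolation_level; 0 = read_uncommitted
    request = FetchRequest[version](
        -1,
        self.config['fetch_max_wait_ms'],
        self.config['fetch_min_bytes'],
        self.config['fetch_max_bytes'],
        0,
        partition_data)
```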
I'm +1 for merge after fixing the FetchRequest api version check. Performance hit is ok with me in order to support newer features, especially headers. I'm not super excited about C-extensions. pypy JIT is quite good.
+1 from me. Agree with @dpkp, at the end of the day core features like this trump throughput concerns. Plus, in the majority of consumers that I support, most of the time is spent processing the contents of the message, not in the client itself. While the speed of C-extensions is nice, I'd rather direct those who want that to PyPy.
Great work!
Changed the layout of record parsing to allow V2 integration with the batch concept. For now I only moved the abstract implementation and wrapped LegacyBatch using the existing MessageSet and Message constructs. What I want to do next is:
@dpkp What do you think? It's quite a hard change, but I did not find a good way to abstract batches using the existing constructs. Moreover, this is based on the current Java client version, though with not as many classes.
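A sketch of the batch abstraction being described; the names mirror what the record module exposes, but treat the exact interface as illustrative:

```python
import abc

class ABCRecordBatch(abc.ABC):
    """One batch: a legacy MessageSet (v0/v1) or a v2 RecordBatch."""

    @abc.abstractmethod
    def __iter__(self):
        """Iterate over the records contained in this batch."""

class ABCRecords(abc.ABC):
    """A buffer of batches, e.g. the raw bytes of one fetched partition."""

    @abc.abstractmethod
    def has_next(self):
        """True if another batch can be read from the buffer."""

    @abc.abstractmethod
    def next_batch(self):
        """Parse and return the next ABCRecordBatch, or None if exhausted."""
```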