Support for consumer-aware OffsetFetchRequest and OffsetCommitRequest #164
Conversation
Add support for consumer metadata requests, required for broker-stored offsets
I'd like to get #158 merged first (fixing tests) and then have you include some integration tests.
Consumer metadata is only supported in Kafka trunk. Older versions will send an error back, which needs to be caught and the request needs to be resent using _send_broker_aware_request
Once the CI is fixed for #158, I'll work on getting it pulled into my fork and add some tests, because tests are good. In the meantime, I added fallback support for older versions of Kafka that do not support consumer metadata.
#158 has been merged and the CI is passing. I'm working on some other features, but will be glad to help you with the merge if you need it.
@@ -373,7 +444,10 @@ def send_offset_commit_request(self, group, payloads=[],
         encoder = partial(KafkaProtocol.encode_offset_commit_request,
                           group=group)
         decoder = KafkaProtocol.decode_offset_commit_response
-        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+        try:
+            resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
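A minimal, self-contained sketch of the fallback the hunk above hints at: try the consumer-aware path first, and drop back to the broker-aware path when the broker rejects the consumer-metadata API. The class and exception names here are illustrative stand-ins, not kafka-python's actual types, and the transport is stubbed out.

```python
# Hypothetical sketch of the try/except fallback; names are illustrative.

class UnsupportedRequestError(Exception):
    """Stand-in for the broker rejecting a ConsumerMetadataRequest."""

class ClientSketch:
    def _send_consumer_aware_request(self, group, payloads):
        # Simulate a pre-0.8.2 broker that lacks consumer metadata.
        raise UnsupportedRequestError("broker does not support this API")

    def _send_broker_aware_request(self, payloads):
        # Old code path: route each payload by partition leader (stubbed).
        return ["resp:%s" % p for p in payloads]

    def send_offset_commit_request(self, group, payloads):
        try:
            return self._send_consumer_aware_request(group, payloads)
        except UnsupportedRequestError:
            # Fall back to the pre-0.8.2 behavior.
            return self._send_broker_aware_request(payloads)

client = ClientSketch()
print(client.send_offset_commit_request("my-group", ["p0", "p1"]))
# prints ['resp:p0', 'resp:p1']
```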
It seems like this should be a flag. If it fails once, won't it fail every time?
There is the situation where the client survives a cluster upgrade. It could be considered a corner case, and not worth supporting, however. I'm more concerned about the behavior in _send_broker_aware_request that will cause this request to get sent to every broker even if it's unsupported.
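The reviewer's flag idea could look something like the sketch below: remember that the cluster lacks the consumer-metadata API so only the first call pays the failed round trip, while still allowing a reset if the cluster is upgraded. Everything here (class, method, and exception names) is hypothetical.

```python
# Illustrative sketch of caching "consumer-aware unsupported" in a flag.

class UnsupportedRequestError(Exception):
    pass

class ClientWithFlag:
    def __init__(self):
        self._consumer_aware_supported = True  # optimistic default

    def _try_consumer_aware(self, group, payloads):
        raise UnsupportedRequestError  # simulate an old broker

    def _broker_aware(self, payloads):
        return list(payloads)

    def send(self, group, payloads):
        if self._consumer_aware_supported:
            try:
                return self._try_consumer_aware(group, payloads)
            except UnsupportedRequestError:
                # Flip the flag so later calls skip the doomed attempt.
                self._consumer_aware_supported = False
        return self._broker_aware(payloads)

c = ClientWithFlag()
c.send("g", ["a"])               # first call fails over and sets the flag
print(c._consumer_aware_supported)   # prints False
print(c.send("g", ["a", "b"]))       # prints ['a', 'b'] with no retry
```

A client surviving a cluster upgrade (the corner case mentioned above) could be handled by resetting the flag whenever broker metadata is refreshed.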
Won't it always require two round trips if you aren't using a Kafka server that supports this API request (e.g., any production Kafka cluster outside LinkedIn)?
This PR will require a 0.8.1.1 and/or 0.8.2 checkout of Kafka for integration testing.
@wizzat I don't believe 0.8.1.1 will allow any real testing (other than verifying that it doesn't work and fails gracefully). And the more I think about it, the more I think this pull probably needs to wait until 0.8.2 drops officially. I have a need to support a fork of this library that works against Kafka trunk (as that's what we're using internally at LinkedIn), but I think it's best for me to do all of that work on the fork, then branch it when 0.8.2 is released and submit a pull request for all changes together at that point, bringing the main lib up to supporting 0.8.2. If you agree, I'll close this pull request and go back to work on my fork, including adding the integration testing.
I'm completely in favor of supporting new features in upcoming Kafka versions, because not supporting them fully is what landed us in the mess where 0.8.1 wasn't fully supported for well over a month after release. I'm just of the opinion that the pull request needs to:
Trunk testing should be easier after #193. Perhaps now is a good time to create a new branch for working on 0.8.2 / trunk support?
@@ -63,6 +63,9 @@
     10 : 'MESSAGE_SIZE_TOO_LARGE',
     11 : 'STALE_CONTROLLER_EPOCH',
     12 : 'OFFSET_METADATA_TOO_LARGE',
+    14 : 'OFFSETS_LOAD_IN_PROGRESS',
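To show how a caller might use the new entry in this error-code map: code 14 signals a transient condition (the coordinator is still loading offsets from the offsets topic), so it is a natural candidate for retry, unlike a hard error such as code 12. The numeric codes below come from the Kafka protocol; the retry policy and function names are illustrative only.

```python
# Sketch: distinguishing retriable from fatal error codes.
# Numeric codes follow the Kafka protocol; the policy is illustrative.
ERROR_CODES = {
    10: 'MESSAGE_SIZE_TOO_LARGE',
    11: 'STALE_CONTROLLER_EPOCH',
    12: 'OFFSET_METADATA_TOO_LARGE',
    14: 'OFFSETS_LOAD_IN_PROGRESS',
    16: 'NOT_COORDINATOR_FOR_CONSUMER',
}

# Transient conditions worth retrying after a short backoff.
RETRIABLE = {'OFFSETS_LOAD_IN_PROGRESS', 'NOT_COORDINATOR_FOR_CONSUMER'}

def should_retry(error_code):
    # Unknown codes are treated as fatal.
    return ERROR_CODES.get(error_code) in RETRIABLE

print(should_retry(14))  # prints True: coordinator still loading offsets
print(should_retry(12))  # prints False: metadata too large is a hard error
```

Note that retrying on error 16 would also mean re-fetching the coordinator, since it indicates the request went to the wrong broker.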
Possible typo? Should this say 'OFFSET_LOAD_IN_PROGRESS' to be consistent with protocol.py, line 452?
This pull request is pretty old, and was made against the dev branch. It might have changed names since then.
Yeah, it’s really old and needs to be rebased. I haven’t had a chance to do it yet, however.
-Todd
Ah right. I checked this out hoping to resolve some offset commit issues against 0.8.2b and got an exception due to this instead; didn't look at the date :).
Closing this. It's already fixed via other commits. There are still some issues against trunk, but those should be handled separately.
thanks, todd.
The OffsetFetchRequest and OffsetCommitRequest requests to the broker, which are used for broker storage of consumer offsets (replacing Zookeeper storage of offsets), require a ConsumerMetadataRequest. Similar to the topic metadata request, this returns the coordinator for a given consumer group, which is the broker that the offset fetch and commit requests must be sent to. If you do not do this, you will more often than not get error code 16, which is NotCoordinatorForConsumer.
I've added support for the ConsumerMetadataRequest, as well as a _send_consumer_aware_request method that gets the coordinator and then sends the request to that broker. I have updated the OffsetFetchRequest and OffsetCommitRequest to use this method instead of _send_broker_aware_request. I've also added error handling for the metadata request, in case it fails.
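The coordinator-lookup flow described above can be sketched as follows. This is not kafka-python's actual implementation: the broker objects and transport are stubbed, and the method names mirror the PR's naming only for readability. The key point is the two-step protocol: ask any broker who the group's coordinator is (ConsumerMetadataRequest), then send the offset fetch/commit to that specific broker.

```python
# Minimal sketch of the consumer-aware send path; all classes are stubs.

class Broker:
    def __init__(self, node_id):
        self.node_id = node_id
        self.received = []  # payloads this broker has been sent

class ClusterSketch:
    def __init__(self, brokers, coordinator_for):
        self.brokers = {b.node_id: b for b in brokers}
        self._coordinator_for = coordinator_for  # group name -> node_id

    def _get_coordinator_for_group(self, group):
        # Stands in for sending ConsumerMetadataRequest to any broker
        # and reading the coordinator from the response.
        return self.brokers[self._coordinator_for[group]]

    def _send_consumer_aware_request(self, group, payloads):
        coordinator = self._get_coordinator_for_group(group)
        # Sending these payloads to any other broker would risk error 16
        # (NOT_COORDINATOR_FOR_CONSUMER), as the PR description notes.
        coordinator.received.extend(payloads)
        return ["ok:%s" % p for p in payloads]

brokers = [Broker(0), Broker(1)]
cluster = ClusterSketch(brokers, {"my-group": 1})
print(cluster._send_consumer_aware_request("my-group", ["commit-p0"]))
# prints ['ok:commit-p0']; only broker 1 (the coordinator) saw the payload
```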