-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added support for offsets_for_times, beginning_offsets and end_offsets APIs. #1161
Conversation
test/test_consumer_integration.py
Outdated
consumer = self.kafka_consumer() | ||
tp = TopicPartition(self.topic, 0) | ||
|
||
with self.assertRaises(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to pass in the exception type expected here
Fixes #1036 |
@dpkp @jeffwidman Can I have another 2 eyes here, just in case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looked it over, looks fine to me, although I'm not super familiar with the timestamps functionality as we have yet to enable that at work.
How closely does this follow the Java implementation?
kafka/conn.py
Outdated
((0, 10, 1), MetadataRequest[2]) | ||
((0, 10, 1), MetadataRequest[2]), | ||
((0, 10, 2), OffsetFetchRequest[2]), | ||
((0, 11, 0), FetchRequest[5]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Afraid I don't understand the purpose of this? I thought that from 0.10 onwards, we were just going to query the ApiVersionsRequest
to identify the version and rely on that. I can't remember if there's a mapping of supported API calls somewhere, if not, is this just a hack to get around that?
Also, given that it checks in descending order, and returns the first one that works, shouldn't the highest broker version be listed first? Otherwise the comment should get tweaked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well it's not really something that was introduced in this PR, I just extended it with new checks. I did propose using ApiVersion response for min-max per-broker checks on protocol versions in #865. Here we just determine api_version
like we used to in versions before, only for v0.10 and above we don't do additional requests, only check that max version for a protocol is supported.
I'll change the comment to be clear on ordering.
kafka/consumer/fetcher.py
Outdated
def _offset(self, partition, timestamp): | ||
"""Fetch a single offset before the given timestamp for the partition. | ||
def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")): | ||
""" Fetch offset for each partition passed in ``timestamps`` map. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I more commonly see no space between """
and the first word, as in """Fetch
...
kafka/consumer/fetcher.py
Outdated
remaining_ms = timeout_ms - elapsed_ms | ||
|
||
raise Errors.KafkaTimeoutError( | ||
"Failed to get offsets by times in %s ms" % timeout_ms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about replacing "times" with "timestamp"?
The word "times" in English has several meanings, normally it's obvious from context, but here there's just a hint of ambiguity for those less familiar with Kafka...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I don't mind. But the API offsets_for_times
should probably be left as is to match offsetsForTimes
in Java client
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Completely agree to copy Java client.
kafka/consumer/group.py
Outdated
@@ -861,6 +861,48 @@ def metrics(self, raw=False): | |||
metrics[k.group][k.name] = v.value() | |||
return metrics | |||
|
|||
def offsets_for_times(self, timestamps): | |||
""" | |||
Look up the offsets for the given partitions by timestamp. The returned |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look up the offsets for the given partitions...
This confuses me because "given" implies a list of partitions is passed in, but that's not present in the method args. If the partitions are attached to the class, then maybe use wording other than "given" to describe the partitions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timestamps is a map {TopicPartition: int}
. The wording was taken from https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes(java.util.Map), I think it's OK...
kafka/consumer/group.py
Outdated
partition. | ||
|
||
Note: | ||
Notice that this method may block indefinitely if the partition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Notice that" is superfluous
brokers = '%s:%d' % (self.server.host, self.server.port) | ||
producer = KafkaProducer( | ||
bootstrap_servers=brokers, **configs) | ||
return producer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this producer should just get moved to a pytest
fixture. Not something that needs to be handled in this PR, but should probably create an issue saying we should clean it up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sadly we can't use pytest fixtures here, as it's a TestCase
class. We will need to refactor those classes to simple functions to support fixtures
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha. It is possible to apply pytest fixtures at the class level in pytest, but only if it's a generic class, aka designed for pytest and not unittest. I filed #1167 in case someone wants to migrate it at some point.
Pretty much 1 to 1. I did not move all changes ( |
Added |
Sorry... I created a merge conflict :( |
…y nodes and send in parallel.
… review issues
61668aa
to
55ded55
Compare
🍰 |
Thanks ~ |
Still needs to group by nodes and send in parallel.