I was trying to use MultiProcessConsumer with kafka-python 0.9.3 and noticed that it doesn't work with any of my topics that have multiple partitions, even if I only use one process. After digging a bit, I can reproduce the same problem using SimpleConsumer directly:
from kafka import SimpleConsumer
# `kafka` is an already-connected KafkaClient instance
simpconsumer = SimpleConsumer(kafka, "my-group2", "statsv")
print(simpconsumer.get_message())
OffsetOutOfRangeError Traceback (most recent call last)
<ipython-input-85-372890462560> in <module>()
----> 1 print(simpconsumer.get_message())
/usr/lib/python2.7/dist-packages/kafka/consumer/simple.py in get_message(self, block, timeout, get_partition_info)
232
233 def get_message(self, block=True, timeout=0.1, get_partition_info=None):
--> 234 return self._get_message(block, timeout, get_partition_info)
235
236 def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
/usr/lib/python2.7/dist-packages/kafka/consumer/simple.py in _get_message(self, block, timeout, get_partition_info, update_offset)
245 # We're out of messages, go grab some more.
246 with FetchContext(self, block, timeout):
--> 247 self._fetch()
248 try:
249 partition, message = self.queue.get_nowait()
/usr/lib/python2.7/dist-packages/kafka/consumer/simple.py in _fetch(self)
298 requests,
299 max_wait_time=int(self.fetch_max_wait_time),
--> 300 min_bytes=self.fetch_min_bytes)
301
302 retry_partitions = {}
/usr/lib/python2.7/dist-packages/kafka/client.py in send_fetch_request(self, payloads, fail_on_error, callback, max_wait_time, min_bytes)
442 for resp in resps:
443 if fail_on_error is True:
--> 444 self._raise_on_response_error(resp)
445
446 if callback is not None:
/usr/lib/python2.7/dist-packages/kafka/client.py in _raise_on_response_error(self, resp)
210 def _raise_on_response_error(self, resp):
211 try:
--> 212 kafka.common.check_error(resp)
213 except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
214 self.reset_topic_metadata(resp.topic)
/usr/lib/python2.7/dist-packages/kafka/common.py in check_error(response)
213 if response.error:
214 error_class = kafka_errors.get(response.error, UnknownError)
--> 215 raise error_class(response)
OffsetOutOfRangeError: FetchResponse(topic='statsv', partition=0, error=1, highwaterMark=-1, messages=<generator object _decode_message_set_iter at 0x7f7a7c267b40>)
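(For reference, error=1 in that FetchResponse is the Kafka protocol's OffsetOutOfRange error code.) For completeness, the MultiProcessConsumer usage that hits this for me looks roughly like the sketch below; the broker address is a placeholder and I'm going from the 0.9.3 constructor signature:
from kafka import KafkaClient, MultiProcessConsumer

kafka = KafkaClient("localhost:9092")  # broker address is a placeholder
consumer = MultiProcessConsumer(kafka, "my-group2", "statsv", num_procs=1)

# Each worker process builds its own SimpleConsumer internally, so the same
# fetch/offset path as above is exercised even with a single process.
for message in consumer:
    print(message)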
When using SimpleConsumer directly, I can work around the issue by first seeking to a real position in the stream:
# SimpleConsumer.seek(offset, whence): whence=0 is relative to the earliest
# available offset, 1 to the current offset, 2 to the latest known offset.
# or simpconsumer.seek(2, 0)
# or simpconsumer.seek(1, 0)
simpconsumer.seek(0, 0)  # rewind to the earliest available offset
print(simpconsumer.get_message())
I'm not exactly sure what is going on here, but maybe SimpleConsumer should do a seek before it tries to consume messages? A caller-side version of that idea is sketched below.
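In the meantime, something along these lines could paper over it in caller code — a minimal sketch, assuming falling back to the earliest available offset is acceptable (the helper is hypothetical, not part of kafka-python):
from kafka.common import OffsetOutOfRangeError

def get_message_with_reset(consumer, **kwargs):
    # Hypothetical helper: if the stored offset is out of range,
    # rewind to the earliest available offset and retry once.
    try:
        return consumer.get_message(**kwargs)
    except OffsetOutOfRangeError:
        consumer.seek(0, 0)  # whence=0: relative to the earliest available offset
        return consumer.get_message(**kwargs)

print(get_message_with_reset(simpconsumer))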