From 878e4cefb696547d76f2eb0ad5b7afd2b1cfe044 Mon Sep 17 00:00:00 2001 From: Enrico Canzonieri Date: Tue, 6 Jan 2015 17:33:21 +0100 Subject: [PATCH 1/4] Implement offsets reset when OffsetOutOfRangeError This slightly changes the SimpleConsumer interface adding the default option use_latest_offsets. The fetch behaviour is also changed since it does not raise OffsetOutOfRangeError anymore. Resetting the offsets automatically is especially useful in MultiprocessConsumer, where an explicit seek call is not possible. --- kafka/consumer/simple.py | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index df975f4d9..5cd15b584 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -16,7 +16,9 @@ from kafka.common import ( FetchRequest, OffsetRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, + OffsetOutOfRangeError, check_error ) from .base import ( Consumer, @@ -98,7 +100,8 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, fetch_size_bytes=FETCH_MIN_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, - iter_timeout=None): + iter_timeout=None, + use_latest_offsets=True): super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, @@ -117,12 +120,26 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, self.fetch_min_bytes = fetch_size_bytes self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout + self.use_latest_offsets = use_latest_offsets self.queue = Queue() def __repr__(self): return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ (self.group, self.topic, str(self.offsets.keys())) + def reset_partition_offset(self, partition): + LATEST = -1 + EARLIEST = -2 + if self.use_latest_offsets: + req = OffsetRequest(self.topic, partition, LATEST, 1) + 
else: + req = OffsetRequest(self.topic, partition, EARLIEST, 1) + + resp = self.client.send_offset_request(req) + check_error(resp) + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -286,10 +303,25 @@ def _fetch(self): responses = self.client.send_fetch_request( requests, max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) + min_bytes=self.fetch_min_bytes, + fail_on_error=False + ) retry_partitions = {} for resp in responses: + + try: + check_error(resp) + except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): + self.client.reset_topic_metadata(resp.topic) + raise + except OffsetOutOfRangeError: + log.warning("OffsetOutOfRangeError for %s - %d. " + "Resetting partition offset...", + resp.topic, resp.partition) + self.reset_partition_offset(resp.partition) + continue + partition = resp.partition buffer_size = partitions[partition] try: From 6bc2c7aadac37c6e38c8a3c7be66013e9080aed7 Mon Sep 17 00:00:00 2001 From: Enrico Canzonieri Date: Wed, 14 Jan 2015 15:21:27 -0800 Subject: [PATCH 2/4] use a list in send_offset_request --- kafka/consumer/simple.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 5cd15b584..39103bd38 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -131,11 +131,12 @@ def reset_partition_offset(self, partition): LATEST = -1 EARLIEST = -2 if self.use_latest_offsets: - req = OffsetRequest(self.topic, partition, LATEST, 1) + reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] else: - req = OffsetRequest(self.topic, partition, EARLIEST, 1) + reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] - resp = self.client.send_offset_request(req) + # send_offset_request + (resp, ) = self.client.send_offset_request(reqs) check_error(resp) self.offsets[partition] = 
resp.offsets[0] self.fetch_offsets[partition] = resp.offsets[0] From f517ddf283a86947a15f95e5ec562e81f4c477e5 Mon Sep 17 00:00:00 2001 From: Enrico Canzonieri Date: Mon, 26 Jan 2015 14:40:49 -0800 Subject: [PATCH 3/4] Make SimpleConsumer auto_offset_reset more like KafkaConsumer --- kafka/consumer/simple.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 39103bd38..0593b5b72 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -8,6 +8,7 @@ import time import six +import sys try: from Queue import Empty, Queue @@ -87,6 +88,9 @@ class SimpleConsumer(Consumer): iter_timeout: default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever. + auto_offset_reset: default largest. Reset partition offsets upon + OffsetOutOfRangeError. Valid values are largest and smallest. + If None do not reset the offsets and raise OffsetOutOfRangeError. 
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -101,7 +105,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, iter_timeout=None, - use_latest_offsets=True): + auto_offset_reset='largest'): super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, @@ -120,7 +124,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, self.fetch_min_bytes = fetch_size_bytes self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout - self.use_latest_offsets = use_latest_offsets + self.auto_offset_reset = auto_offset_reset self.queue = Queue() def __repr__(self): @@ -130,10 +134,21 @@ def __repr__(self): def reset_partition_offset(self, partition): LATEST = -1 EARLIEST = -2 - if self.use_latest_offsets: + if self.auto_offset_reset == 'largest': reqs = [OffsetRequest(self.topic, partition, LATEST, 1)] - else: + elif self.auto_offset_reset == 'smallest': reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)] + else: + # Let's raise a reasonable exception type if user calls + # outside of an exception context + if sys.exc_info() == (None, None, None): + raise OffsetOutOfRangeError('Cannot reset partition offsets without a ' + 'valid auto_offset_reset setting ' + '(largest|smallest)') + # Otherwise we should re-raise the upstream exception + # b/c it typically includes additional data about + # the request that triggered it, and we do not want to drop that + raise # send_offset_request (resp, ) = self.client.send_offset_request(reqs) From 37d0b7f9dbac6c5165e6ea171a97be19c53c27f5 Mon Sep 17 00:00:00 2001 From: Enrico Canzonieri Date: Mon, 26 Jan 2015 16:24:20 -0800 Subject: [PATCH 4/4] Retry failed partitions and add integration tests --- kafka/consumer/simple.py | 2 ++ test/test_consumer_integration.py | 44 ++++++++++++++++++++++++++++++- 2 files changed, 45 
insertions(+), 1 deletion(-) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index 2ec99f235..4c835fe34 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -348,6 +348,8 @@ def _fetch(self): "Resetting partition offset...", resp.topic, resp.partition) self.reset_partition_offset(resp.partition) + # Retry this partition + retry_partitions[resp.partition] = partitions[resp.partition] continue partition = resp.partition diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 4723220ba..9c8919011 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -5,7 +5,7 @@ from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message from kafka.common import ( - ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout + ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError ) from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES @@ -85,6 +85,48 @@ def test_simple_consumer(self): consumer.stop() + @kafka_versions('all') + def test_simple_consumer_smallest_offset_reset(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + consumer = self.consumer(auto_offset_reset='smallest') + # Move fetch offset ahead of 300 message (out of range) + consumer.seek(300, 2) + # Since auto_offset_reset is set to smallest we should read all 200 + # messages from beginning. + self.assert_message_count([message for message in consumer], 200) + + @kafka_versions('all') + def test_simple_consumer_largest_offset_reset(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Default largest + consumer = self.consumer() + # Move fetch offset ahead of 300 message (out of range) + consumer.seek(300, 2) + # Since auto_offset_reset is set to largest we should not read any + # messages. 
+ self.assert_message_count([message for message in consumer], 0) + # Send 200 new messages to the queue + self.send_messages(0, range(200, 300)) + self.send_messages(1, range(300, 400)) + # Since the offset is set to largest we should read all the new messages. + self.assert_message_count([message for message in consumer], 200) + + @kafka_versions('all') + def test_simple_consumer_no_reset(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Default largest + consumer = self.consumer(auto_offset_reset=None) + # Move fetch offset ahead of 300 message (out of range) + consumer.seek(300, 2) + with self.assertRaises(OffsetOutOfRangeError): + consumer.get_message() + @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100))