diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 000fcd97b..4c835fe34 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -8,6 +8,7 @@
 import time
 
 import six
+import sys
 
 try:
     from Queue import Empty, Queue
@@ -16,7 +17,9 @@
 
 from kafka.common import (
     FetchRequest, OffsetRequest,
-    ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+    ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+    UnknownTopicOrPartitionError, NotLeaderForPartitionError,
+    OffsetOutOfRangeError, check_error
 )
 from .base import (
     Consumer,
@@ -94,6 +97,10 @@ class SimpleConsumer(Consumer):
         message in the iterator before exiting. None means no timeout, so
         it will wait forever.
 
+    auto_offset_reset: default largest. Reset partition offsets upon
+        OffsetOutOfRangeError. Valid values are largest and smallest;
+        any other value leaves offsets unchanged and re-raises the error.
+
     Auto commit details:
     If both auto_commit_every_n and auto_commit_every_t are set, they will
     reset one another when one is triggered. These triggers simply call the
@@ -106,7 +113,8 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  fetch_size_bytes=FETCH_MIN_BYTES,
                  buffer_size=FETCH_BUFFER_SIZE_BYTES,
                  max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
-                 iter_timeout=None):
+                 iter_timeout=None,
+                 auto_offset_reset='largest'):
         super(SimpleConsumer, self).__init__(
             client, group, topic,
             partitions=partitions,
@@ -125,12 +133,38 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_offsets = self.offsets.copy()
         self.iter_timeout = iter_timeout
+        self.auto_offset_reset = auto_offset_reset
         self.queue = Queue()
 
     def __repr__(self):
         return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
             (self.group, self.topic, str(self.offsets.keys()))
 
+    def reset_partition_offset(self, partition):
+        LATEST = -1
+        EARLIEST = -2
+        if self.auto_offset_reset == 'largest':
+            reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
+        elif self.auto_offset_reset == 'smallest':
+            reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
+        else:
+            # Raise a reasonable exception type if the user calls this
+            # outside of an exception context
+            if sys.exc_info() == (None, None, None):
+                raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
+                                            'valid auto_offset_reset setting '
+                                            '(largest|smallest)')
+            # Otherwise we should re-raise the upstream exception
+            # because it typically includes additional data about
+            # the request that triggered it, and we do not want to drop that
+            raise
+
+        # Send the offset request and update both offset caches
+        (resp, ) = self.client.send_offset_request(reqs)
+        check_error(resp)
+        self.offsets[partition] = resp.offsets[0]
+        self.fetch_offsets[partition] = resp.offsets[0]
+
     def provide_partition_info(self):
         """
         Indicates that partition info must be returned by the consumer
@@ -297,10 +331,27 @@ def _fetch(self):
         responses = self.client.send_fetch_request(
             requests,
             max_wait_time=int(self.fetch_max_wait_time),
-            min_bytes=self.fetch_min_bytes)
+            min_bytes=self.fetch_min_bytes,
+            fail_on_error=False
+        )
 
         retry_partitions = {}
         for resp in responses:
+
+            try:
+                check_error(resp)
+            except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
+                self.client.reset_topic_metadata(resp.topic)
+                raise
+            except OffsetOutOfRangeError:
+                log.warning("OffsetOutOfRangeError for %s - %d. "
+                            "Resetting partition offset...",
+                            resp.topic, resp.partition)
+                self.reset_partition_offset(resp.partition)
+                # Retry this partition
+                retry_partitions[resp.partition] = partitions[resp.partition]
+                continue
+
             partition = resp.partition
             buffer_size = partitions[partition]
             try:
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 4723220ba..9c8919011 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -5,7 +5,7 @@
 
 from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
 from kafka.common import (
-    ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+    ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError
 )
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
 
@@ -85,6 +85,48 @@ def test_simple_consumer(self):
 
         consumer.stop()
 
+    @kafka_versions('all')
+    def test_simple_consumer_smallest_offset_reset(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        consumer = self.consumer(auto_offset_reset='smallest')
+        # Move fetch offset 300 messages past the end of the log (out of range)
+        consumer.seek(300, 2)
+        # Since auto_offset_reset is set to smallest we should read all 200
+        # messages from the beginning.
+        self.assert_message_count([message for message in consumer], 200)
+
+    @kafka_versions('all')
+    def test_simple_consumer_largest_offset_reset(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Default is largest
+        consumer = self.consumer()
+        # Move fetch offset 300 messages past the end of the log (out of range)
+        consumer.seek(300, 2)
+        # Since auto_offset_reset is set to largest we should not read any
+        # messages.
+        self.assert_message_count([message for message in consumer], 0)
+        # Send 200 new messages to the topic
+        self.send_messages(0, range(200, 300))
+        self.send_messages(1, range(300, 400))
+        # Since the offset is set to largest we should read all the new messages.
+        self.assert_message_count([message for message in consumer], 200)
+
+    @kafka_versions('all')
+    def test_simple_consumer_no_reset(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # auto_offset_reset=None: out-of-range fetches should raise
+        consumer = self.consumer(auto_offset_reset=None)
+        # Move fetch offset 300 messages past the end of the log (out of range)
+        consumer.seek(300, 2)
+        with self.assertRaises(OffsetOutOfRangeError):
+            consumer.get_message()
+
     @kafka_versions("all")
     def test_simple_consumer__seek(self):
         self.send_messages(0, range(0, 100))