diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 42e2d660c..b083deb1a 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -178,6 +178,9 @@ def reset_offsets_if_needed(self):
         Arguments:
             partitions ([TopicPartition]): the partitions that need offsets reset
 
+        Returns:
+            bool: True if any partitions need reset; otherwise False (no reset pending)
+
         Raises:
             NoOffsetForPartitionError: if no offset reset strategy is defined
             KafkaTimeoutError if timeout_ms provided
@@ -189,7 +192,8 @@ def reset_offsets_if_needed(self):
 
         partitions = self._subscriptions.partitions_needing_reset()
         if not partitions:
-            return
+            return False
+
         log.debug('Resetting offsets for %s', partitions)
         offset_resets = dict()
         for tp in partitions:
@@ -198,6 +202,7 @@ def reset_offsets_if_needed(self):
                 offset_resets[tp] = ts
 
         self._reset_offsets_async(offset_resets)
+        return True
 
     def offsets_by_times(self, timestamps, timeout_ms=None):
         """Fetch offset for each partition passed in ``timestamps`` map.
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index ce3cf9203..d966ea009 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1124,7 +1124,7 @@ def _update_fetch_positions(self, timeout_ms=None):
             partitions (List[TopicPartition]): The partitions that need
                 updating fetch positions.
 
-        Returns True if fetch positions updated, False if timeout
+        Returns True if fetch positions updated, False if timeout or async reset is pending
 
         Raises:
             NoOffsetForPartitionError: If no offset is stored for a given
@@ -1135,15 +1135,13 @@ def _update_fetch_positions(self, timeout_ms=None):
 
         if (self.config['api_version'] >= (0, 8, 1)
                 and self.config['group_id'] is not None):
-            try:
-                # If there are any partitions which do not have a valid position and are not
-                # awaiting reset, then we need to fetch committed offsets. We will only do a
-                # coordinator lookup if there are partitions which have missing positions, so
-                # a consumer with manually assigned partitions can avoid a coordinator dependence
-                # by always ensuring that assigned partitions have an initial position.
-                self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
-            except KafkaTimeoutError:
-                pass
+            # If there are any partitions which do not have a valid position and are not
+            # awaiting reset, then we need to fetch committed offsets. We will only do a
+            # coordinator lookup if there are partitions which have missing positions, so
+            # a consumer with manually assigned partitions can avoid a coordinator dependence
+            # by always ensuring that assigned partitions have an initial position.
+            if not self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms):
+                return False
 
         # If there are partitions still needing a position and a reset policy is defined,
         # request reset using the default policy. If no reset strategy is defined and there
@@ -1152,8 +1150,7 @@ def _update_fetch_positions(self, timeout_ms=None):
 
         # Finally send an asynchronous request to lookup and update the positions of any
         # partitions which are awaiting reset.
-        self._fetcher.reset_offsets_if_needed()
-        return False
+        return not self._fetcher.reset_offsets_if_needed()
 
     def _message_generator_v2(self):
         timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index ddd413b82..d6fc802d9 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -427,7 +427,8 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
         future_key = frozenset(partitions)
         timer = Timer(timeout_ms)
         while True:
-            self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms)
+            if not self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms):
+                timer.maybe_raise()
 
             # contact coordinator to fetch committed offsets
             if future_key in self._offset_fetch_futures: