diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index b083deb1a..1888d38bf 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -612,7 +612,8 @@ def _handle_list_offsets_response(self, future, response): def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() # do not fetch a partition if we have a pending fetch response to process - discard = {fetch.topic_partition for fetch in self._completed_fetches} + # use copy.copy to avoid runtimeerror on mutation from different thread + discard = {fetch.topic_partition for fetch in self._completed_fetches.copy()} current = self._next_partition_records if current: discard.add(current.topic_partition)