diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1888d38bf..1689b23f1 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -613,7 +613,8 @@ def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() # do not fetch a partition if we have a pending fetch response to process # use copy.copy to avoid runtimeerror on mutation from different thread - discard = {fetch.topic_partition for fetch in self._completed_fetches.copy()} + # TODO: switch to deque.copy() with py3 + discard = {fetch.topic_partition for fetch in copy.copy(self._completed_fetches)} current = self._next_partition_records if current: discard.add(current.topic_partition) diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index c8e4f7cac..4785b1b75 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -68,10 +68,10 @@ def receive(self, auth_bytes): # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed # by the server client_flags = self.SASL_QOP_AUTH - server_flags = msg[0] + server_flags = struct.Struct('>b').unpack(msg[0:1])[0] message_parts = [ struct.Struct('>b').pack(client_flags & server_flags), - msg[1:], + msg[1:], # always agree to max message size from server self.auth_id.encode('utf-8'), ] # add authorization identity to the response, and GSS-wrap