2
2
from collections import defaultdict , namedtuple
3
3
from copy import deepcopy
4
4
5
- from sortedcontainers import SortedSet , SortedDict , SortedList
6
-
7
5
from kafka .cluster import ClusterMetadata
8
6
from kafka .coordinator .assignors .abstract import AbstractPartitionAssignor
9
7
from kafka .coordinator .assignors .sticky .partition_movements import PartitionMovements
8
+ from kafka .coordinator .assignors .sticky .sorted_set import SortedSet
10
9
from kafka .coordinator .protocol import ConsumerProtocolMemberMetadata , ConsumerProtocolMemberAssignment
11
10
from kafka .coordinator .protocol import Schema
12
11
from kafka .protocol .struct import Struct
@@ -82,7 +81,7 @@ def __init__(self, cluster, members):
82
81
# a mapping of all consumers to all potential topic partitions that can be assigned to them
83
82
self .consumer_to_all_potential_partitions = {}
84
83
# an ascending sorted set of consumers based on how many topic partitions are already assigned to them
85
- self .sorted_current_subscriptions = set ()
84
+ self .sorted_current_subscriptions = SortedSet ()
86
85
# an ascending sorted list of topic partitions based on how many consumers can potentially use them
87
86
self .sorted_partitions = []
88
87
# all partitions that need to be assigned
@@ -154,9 +153,10 @@ def balance(self):
154
153
self ._add_consumer_to_current_subscriptions_and_maintain_order (consumer )
155
154
156
155
def get_final_assignment (self , member_id ):
157
- assignment = defaultdict (lambda : SortedList () )
156
+ assignment = defaultdict (list )
158
157
for topic_partition in self .current_assignment [member_id ]:
159
- assignment [topic_partition .topic ].add (topic_partition .partition )
158
+ assignment [topic_partition .topic ].append (topic_partition .partition )
159
+ assignment = {k : sorted (v ) for k , v in six .iteritems (assignment )}
160
160
return six .viewitems (assignment )
161
161
162
162
def _initialize (self , cluster ):
@@ -188,7 +188,7 @@ def _init_current_assignments(self, members):
188
188
# higher generations overwrite lower generations in case of a conflict
189
189
# note that a conflict could exists only if user data is for different generations
190
190
191
- # for each partition we create a sorted map of its consumers by generation
191
+ # for each partition we create a map of its consumers by generation
192
192
sorted_partition_consumers_by_generation = {}
193
193
for consumer , member_metadata in six .iteritems (members ):
194
194
for partitions in member_metadata .partitions :
@@ -204,14 +204,13 @@ def _init_current_assignments(self, members):
204
204
else :
205
205
consumers [member_metadata .generation ] = consumer
206
206
else :
207
- sorted_consumers = SortedDict ()
208
- sorted_consumers [member_metadata .generation ] = consumer
207
+ sorted_consumers = {member_metadata .generation : consumer }
209
208
sorted_partition_consumers_by_generation [partitions ] = sorted_consumers
210
209
211
210
# previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition
212
211
# current and previous consumers are the last two consumers of each partition in the above sorted map
213
212
for partitions , consumers in six .iteritems (sorted_partition_consumers_by_generation ):
214
- generations = list ( reversed ( consumers .keys ()) )
213
+ generations = sorted ( consumers .keys (), reverse = True )
215
214
self .current_assignment [consumers [generations [0 ]]].append (partitions )
216
215
# now update previous assignment if any
217
216
if len (generations ) > 1 :
@@ -236,13 +235,10 @@ def _are_subscriptions_identical(self):
236
235
return has_identical_list_elements (list (six .itervalues (self .consumer_to_all_potential_partitions )))
237
236
238
237
def _populate_sorted_partitions (self ):
239
- # an ascending sorted set of topic partitions based on how many consumers can potentially use them
240
- sorted_all_partitions = SortedSet (
241
- iterable = [
242
- (tp , tuple (consumers )) for tp , consumers in six .iteritems (self .partition_to_all_potential_consumers )
243
- ],
244
- key = partitions_comparator_key ,
245
- )
238
+ # set of topic partitions with their respective potential consumers
239
+ all_partitions = set ((tp , tuple (consumers ))
240
+ for tp , consumers in six .iteritems (self .partition_to_all_potential_consumers ))
241
+ partitions_sorted_by_num_of_potential_consumers = sorted (all_partitions , key = partitions_comparator_key )
246
242
247
243
self .sorted_partitions = []
248
244
if not self .is_fresh_assignment and self ._are_subscriptions_identical ():
@@ -266,7 +262,7 @@ def _populate_sorted_partitions(self):
266
262
# how many valid partitions are currently assigned to them
267
263
while sorted_consumers :
268
264
# take the consumer with the most partitions
269
- consumer , _ = sorted_consumers .pop ()
265
+ consumer , _ = sorted_consumers .pop_last ()
270
266
# currently assigned partitions to this consumer
271
267
remaining_partitions = assignments [consumer ]
272
268
# from partitions that had a different consumer before,
@@ -284,13 +280,13 @@ def _populate_sorted_partitions(self):
284
280
self .sorted_partitions .append (remaining_partitions .pop ())
285
281
sorted_consumers .add ((consumer , tuple (assignments [consumer ])))
286
282
287
- while sorted_all_partitions :
288
- partition = sorted_all_partitions .pop (0 )[0 ]
283
+ while partitions_sorted_by_num_of_potential_consumers :
284
+ partition = partitions_sorted_by_num_of_potential_consumers .pop (0 )[0 ]
289
285
if partition not in self .sorted_partitions :
290
286
self .sorted_partitions .append (partition )
291
287
else :
292
- while sorted_all_partitions :
293
- self .sorted_partitions .append (sorted_all_partitions .pop (0 )[0 ])
288
+ while partitions_sorted_by_num_of_potential_consumers :
289
+ self .sorted_partitions .append (partitions_sorted_by_num_of_potential_consumers .pop (0 )[0 ])
294
290
295
291
def _populate_partitions_to_reassign (self ):
296
292
self .unassigned_partitions = deepcopy (self .sorted_partitions )
@@ -334,10 +330,10 @@ def _initialize_current_subscriptions(self):
334
330
)
335
331
336
332
def _get_consumer_with_least_subscriptions (self ):
337
- return self .sorted_current_subscriptions [ 0 ] [0 ]
333
+ return self .sorted_current_subscriptions . first () [0 ]
338
334
339
335
def _get_consumer_with_most_subscriptions (self ):
340
- return self .sorted_current_subscriptions [ - 1 ] [0 ]
336
+ return self .sorted_current_subscriptions . last () [0 ]
341
337
342
338
def _remove_consumer_from_current_subscriptions_and_maintain_order (self , consumer ):
343
339
self .sorted_current_subscriptions .remove ((consumer , tuple (self .current_assignment [consumer ])))
0 commit comments