Skip to content

Commit

Permalink
[KIP-54] Remove extra dependency on sortedcontainers
Browse files Browse the repository at this point in the history
  • Loading branch information
aynroot committed Sep 29, 2020
1 parent 53cc810 commit 94ad7a3
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 27 deletions.
63 changes: 63 additions & 0 deletions kafka/coordinator/assignors/sticky/sorted_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
class SortedSet(object):
    """A set that can report and remove its smallest/largest element.

    Elements are kept in a plain ``set``; their order is defined by ``key``
    (the identity function when no key is given). The smallest and largest
    elements are found by scanning the set and are then cached until a
    mutation makes the cached value unreliable.

    ``None`` doubles as the "nothing cached" marker and as the result of
    ``first()`` / ``last()`` on an empty set, so ``None`` itself should not
    be stored as an element.
    """

    def __init__(self, iterable=None, key=None):
        """Build the set from ``iterable`` (optional), ordered by ``key``."""
        self._key = key if key is not None else lambda x: x
        self._set = set(iterable) if iterable is not None else set()

        self._cached_last = None
        self._cached_first = None

    def first(self):
        """Return the element with the smallest key, or None if empty."""
        if self._cached_first is None and self._set:
            # min() keeps the first minimal element it meets, exactly like
            # a manual scan using a strict ">" comparison would.
            self._cached_first = min(self._set, key=self._key)
        return self._cached_first

    def last(self):
        """Return the element with the largest key, or None if empty."""
        if self._cached_last is None and self._set:
            # max() keeps the first maximal element it meets, exactly like
            # a manual scan using a strict "<" comparison would.
            self._cached_last = max(self._set, key=self._key)
        return self._cached_last

    def pop_last(self):
        """Remove and return the element with the largest key.

        Raises KeyError when the set is empty (``last()`` yields None,
        which is not a member of the underlying set).
        """
        value = self.last()
        # Go through remove() so that *both* caches are invalidated: the
        # popped element may also be the cached first element (e.g. in a
        # one-element set), and must not be handed out again by first().
        self.remove(value)
        return value

    def add(self, value):
        """Insert ``value``, keeping any cached extremes up to date."""
        if self._cached_last is not None and self._key(value) > self._key(self._cached_last):
            self._cached_last = value
        if self._cached_first is not None and self._key(value) < self._key(self._cached_first):
            self._cached_first = value

        return self._set.add(value)

    def remove(self, value):
        """Remove ``value``; raises KeyError if it is not in the set."""
        # Remove first: if this raises, the caches are still accurate and
        # are left untouched.
        self._set.remove(value)

        # A cached extreme equal to the removed value is no longer valid
        # and has to be recomputed on the next first()/last() call.
        if self._cached_last is not None and self._cached_last == value:
            self._cached_last = None
        if self._cached_first is not None and self._cached_first == value:
            self._cached_first = None

    def __contains__(self, value):
        return value in self._set

    def __iter__(self):
        """Iterate over a sorted snapshot of the elements (ascending key)."""
        return iter(sorted(self._set, key=self._key))

    def _bool(self):
        """Return True when the set holds at least one element."""
        return len(self._set) != 0

    # Truthiness hook for Python 2 (__nonzero__) and Python 3 (__bool__).
    __nonzero__ = _bool
    __bool__ = _bool
42 changes: 19 additions & 23 deletions kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
from collections import defaultdict, namedtuple
from copy import deepcopy

from sortedcontainers import SortedSet, SortedDict, SortedList

from kafka.cluster import ClusterMetadata
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.coordinator.protocol import Schema
from kafka.protocol.struct import Struct
Expand Down Expand Up @@ -82,7 +81,7 @@ def __init__(self, cluster, members):
# a mapping of all consumers to all potential topic partitions that can be assigned to them
self.consumer_to_all_potential_partitions = {}
# an ascending sorted set of consumers based on how many topic partitions are already assigned to them
self.sorted_current_subscriptions = set()
self.sorted_current_subscriptions = SortedSet()
# an ascending sorted list of topic partitions based on how many consumers can potentially use them
self.sorted_partitions = []
# all partitions that need to be assigned
Expand Down Expand Up @@ -154,9 +153,10 @@ def balance(self):
self._add_consumer_to_current_subscriptions_and_maintain_order(consumer)

def get_final_assignment(self, member_id):
assignment = defaultdict(lambda: SortedList())
assignment = defaultdict(list)
for topic_partition in self.current_assignment[member_id]:
assignment[topic_partition.topic].add(topic_partition.partition)
assignment[topic_partition.topic].append(topic_partition.partition)
assignment = {k: sorted(v) for k, v in six.iteritems(assignment)}
return six.viewitems(assignment)

def _initialize(self, cluster):
Expand Down Expand Up @@ -188,7 +188,7 @@ def _init_current_assignments(self, members):
# higher generations overwrite lower generations in case of a conflict
# note that a conflict could exist only if user data is for different generations

# for each partition we create a sorted map of its consumers by generation
# for each partition we create a map of its consumers by generation
sorted_partition_consumers_by_generation = {}
for consumer, member_metadata in six.iteritems(members):
for partitions in member_metadata.partitions:
Expand All @@ -204,14 +204,13 @@ def _init_current_assignments(self, members):
else:
consumers[member_metadata.generation] = consumer
else:
sorted_consumers = SortedDict()
sorted_consumers[member_metadata.generation] = consumer
sorted_consumers = {member_metadata.generation: consumer}
sorted_partition_consumers_by_generation[partitions] = sorted_consumers

# previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition
# current and previous consumers are the last two consumers of each partition in the above sorted map
for partitions, consumers in six.iteritems(sorted_partition_consumers_by_generation):
generations = list(reversed(consumers.keys()))
generations = sorted(consumers.keys(), reverse=True)
self.current_assignment[consumers[generations[0]]].append(partitions)
# now update previous assignment if any
if len(generations) > 1:
Expand All @@ -236,13 +235,10 @@ def _are_subscriptions_identical(self):
return has_identical_list_elements(list(six.itervalues(self.consumer_to_all_potential_partitions)))

def _populate_sorted_partitions(self):
# an ascending sorted set of topic partitions based on how many consumers can potentially use them
sorted_all_partitions = SortedSet(
iterable=[
(tp, tuple(consumers)) for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers)
],
key=partitions_comparator_key,
)
# set of topic partitions with their respective potential consumers
all_partitions = set((tp, tuple(consumers))
for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers))
partitions_sorted_by_num_of_potential_consumers = sorted(all_partitions, key=partitions_comparator_key)

self.sorted_partitions = []
if not self.is_fresh_assignment and self._are_subscriptions_identical():
Expand All @@ -266,7 +262,7 @@ def _populate_sorted_partitions(self):
# how many valid partitions are currently assigned to them
while sorted_consumers:
# take the consumer with the most partitions
consumer, _ = sorted_consumers.pop()
consumer, _ = sorted_consumers.pop_last()
# currently assigned partitions to this consumer
remaining_partitions = assignments[consumer]
# from partitions that had a different consumer before,
Expand All @@ -284,13 +280,13 @@ def _populate_sorted_partitions(self):
self.sorted_partitions.append(remaining_partitions.pop())
sorted_consumers.add((consumer, tuple(assignments[consumer])))

while sorted_all_partitions:
partition = sorted_all_partitions.pop(0)[0]
while partitions_sorted_by_num_of_potential_consumers:
partition = partitions_sorted_by_num_of_potential_consumers.pop(0)[0]
if partition not in self.sorted_partitions:
self.sorted_partitions.append(partition)
else:
while sorted_all_partitions:
self.sorted_partitions.append(sorted_all_partitions.pop(0)[0])
while partitions_sorted_by_num_of_potential_consumers:
self.sorted_partitions.append(partitions_sorted_by_num_of_potential_consumers.pop(0)[0])

def _populate_partitions_to_reassign(self):
self.unassigned_partitions = deepcopy(self.sorted_partitions)
Expand Down Expand Up @@ -334,10 +330,10 @@ def _initialize_current_subscriptions(self):
)

def _get_consumer_with_least_subscriptions(self):
return self.sorted_current_subscriptions[0][0]
return self.sorted_current_subscriptions.first()[0]

def _get_consumer_with_most_subscriptions(self):
return self.sorted_current_subscriptions[-1][0]
return self.sorted_current_subscriptions.last()[0]

def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer):
self.sorted_current_subscriptions.remove((consumer, tuple(self.current_assignment[consumer])))
Expand Down
1 change: 0 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,3 @@ pytest-mock==1.10.0
sphinx-rtd-theme==0.2.4
crc32c==1.7
py==1.8.0
sortedcontainers==2.1.0
3 changes: 0 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,5 @@ def run(cls):
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Software Development :: Libraries :: Python Modules",
],
install_requires=[
"sortedcontainers==2.1.0"
]
)

0 comments on commit 94ad7a3

Please sign in to comment.