From 94ad7a3d879f33012c00799d1691c79a57387881 Mon Sep 17 00:00:00 2001 From: Valeria Chernenko Date: Wed, 23 Sep 2020 15:59:06 +0200 Subject: [PATCH] [KIP-54] Remove extra dependency on sortedcontainers --- .../assignors/sticky/sorted_set.py | 63 +++++++++++++++++++ .../assignors/sticky/sticky_assignor.py | 42 ++++++------- requirements-dev.txt | 1 - setup.py | 3 - 4 files changed, 82 insertions(+), 27 deletions(-) create mode 100644 kafka/coordinator/assignors/sticky/sorted_set.py diff --git a/kafka/coordinator/assignors/sticky/sorted_set.py b/kafka/coordinator/assignors/sticky/sorted_set.py new file mode 100644 index 000000000..6a454a42d --- /dev/null +++ b/kafka/coordinator/assignors/sticky/sorted_set.py @@ -0,0 +1,63 @@ +class SortedSet: + def __init__(self, iterable=None, key=None): + self._key = key if key is not None else lambda x: x + self._set = set(iterable) if iterable is not None else set() + + self._cached_last = None + self._cached_first = None + + def first(self): + if self._cached_first is not None: + return self._cached_first + + first = None + for element in self._set: + if first is None or self._key(first) > self._key(element): + first = element + self._cached_first = first + return first + + def last(self): + if self._cached_last is not None: + return self._cached_last + + last = None + for element in self._set: + if last is None or self._key(last) < self._key(element): + last = element + self._cached_last = last + return last + + def pop_last(self): + value = self.last() + self._set.remove(value) + self._cached_last = None + return value + + def add(self, value): + if self._cached_last is not None and self._key(value) > self._key(self._cached_last): + self._cached_last = value + if self._cached_first is not None and self._key(value) < self._key(self._cached_first): + self._cached_first = value + + return self._set.add(value) + + def remove(self, value): + if self._cached_last is not None and self._cached_last == value: + self._cached_last = None + if self._cached_first is not None and self._cached_first == value: + self._cached_first = None + + return self._set.remove(value) + + def __contains__(self, value): + return value in self._set + + def __iter__(self): + return iter(sorted(self._set, key=self._key)) + + def _bool(self): + return len(self._set) != 0 + + __nonzero__ = _bool + __bool__ = _bool diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 11a1cbfe2..782708686 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -2,11 +2,10 @@ from collections import defaultdict, namedtuple from copy import deepcopy -from sortedcontainers import SortedSet, SortedDict, SortedList - from kafka.cluster import ClusterMetadata from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements +from kafka.coordinator.assignors.sticky.sorted_set import SortedSet from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment from kafka.coordinator.protocol import Schema from kafka.protocol.struct import Struct @@ -82,7 +81,7 @@ def __init__(self, cluster, members): # a mapping of all consumers to all potential topic partitions that can be assigned to them self.consumer_to_all_potential_partitions = {} # an ascending sorted set of consumers based on how many topic partitions are already assigned to them - self.sorted_current_subscriptions = set() + self.sorted_current_subscriptions = SortedSet() # an ascending sorted list of topic partitions based on how many consumers can potentially use them self.sorted_partitions = [] # all partitions that need to be assigned @@ -154,9 +153,10 @@ def balance(self): self._add_consumer_to_current_subscriptions_and_maintain_order(consumer) def get_final_assignment(self, member_id): - assignment = defaultdict(lambda: SortedList()) + assignment = defaultdict(list) for topic_partition in self.current_assignment[member_id]: - assignment[topic_partition.topic].add(topic_partition.partition) + assignment[topic_partition.topic].append(topic_partition.partition) + assignment = {k: sorted(v) for k, v in six.iteritems(assignment)} return six.viewitems(assignment) def _initialize(self, cluster): @@ -188,7 +188,7 @@ def _init_current_assignments(self, members): # higher generations overwrite lower generations in case of a conflict # note that a conflict could exists only if user data is for different generations - # for each partition we create a sorted map of its consumers by generation + # for each partition we create a map of its consumers by generation sorted_partition_consumers_by_generation = {} for consumer, member_metadata in six.iteritems(members): for partitions in member_metadata.partitions: @@ -204,14 +204,13 @@ def _init_current_assignments(self, members): else: consumers[member_metadata.generation] = consumer else: - sorted_consumers = SortedDict() - sorted_consumers[member_metadata.generation] = consumer + sorted_consumers = {member_metadata.generation: consumer} sorted_partition_consumers_by_generation[partitions] = sorted_consumers # previous_assignment holds the prior ConsumerGenerationPair (before current) of each partition # current and previous consumers are the last two consumers of each partition in the above sorted map for partitions, consumers in six.iteritems(sorted_partition_consumers_by_generation): - generations = list(reversed(consumers.keys())) + generations = sorted(consumers.keys(), reverse=True) self.current_assignment[consumers[generations[0]]].append(partitions) # now update previous assignment if any if len(generations) > 1: @@ -236,13 +235,10 @@ def _are_subscriptions_identical(self): return has_identical_list_elements(list(six.itervalues(self.consumer_to_all_potential_partitions))) def _populate_sorted_partitions(self): - # an ascending sorted set of topic partitions based on how many consumers can potentially use them - sorted_all_partitions = SortedSet( - iterable=[ - (tp, tuple(consumers)) for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers) - ], - key=partitions_comparator_key, - ) + # set of topic partitions with their respective potential consumers + all_partitions = set((tp, tuple(consumers)) + for tp, consumers in six.iteritems(self.partition_to_all_potential_consumers)) + partitions_sorted_by_num_of_potential_consumers = sorted(all_partitions, key=partitions_comparator_key) self.sorted_partitions = [] if not self.is_fresh_assignment and self._are_subscriptions_identical(): @@ -266,7 +262,7 @@ def _populate_sorted_partitions(self): # how many valid partitions are currently assigned to them while sorted_consumers: # take the consumer with the most partitions - consumer, _ = sorted_consumers.pop() + consumer, _ = sorted_consumers.pop_last() # currently assigned partitions to this consumer remaining_partitions = assignments[consumer] # from partitions that had a different consumer before, @@ -284,13 +280,13 @@ def _populate_sorted_partitions(self): self.sorted_partitions.append(remaining_partitions.pop()) sorted_consumers.add((consumer, tuple(assignments[consumer]))) - while sorted_all_partitions: - partition = sorted_all_partitions.pop(0)[0] + while partitions_sorted_by_num_of_potential_consumers: + partition = partitions_sorted_by_num_of_potential_consumers.pop(0)[0] if partition not in self.sorted_partitions: self.sorted_partitions.append(partition) else: - while sorted_all_partitions: - self.sorted_partitions.append(sorted_all_partitions.pop(0)[0]) + while partitions_sorted_by_num_of_potential_consumers: + self.sorted_partitions.append(partitions_sorted_by_num_of_potential_consumers.pop(0)[0]) def _populate_partitions_to_reassign(self): self.unassigned_partitions = deepcopy(self.sorted_partitions) @@ -334,10 +330,10 @@ def _initialize_current_subscriptions(self): ) def _get_consumer_with_least_subscriptions(self): - return self.sorted_current_subscriptions[0][0] + return self.sorted_current_subscriptions.first()[0] def _get_consumer_with_most_subscriptions(self): - return self.sorted_current_subscriptions[-1][0] + return self.sorted_current_subscriptions.last()[0] def _remove_consumer_from_current_subscriptions_and_maintain_order(self, consumer): self.sorted_current_subscriptions.remove((consumer, tuple(self.current_assignment[consumer]))) diff --git a/requirements-dev.txt b/requirements-dev.txt index ace1f8c93..d2830905b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,4 +15,3 @@ pytest-mock==1.10.0 sphinx-rtd-theme==0.2.4 crc32c==1.7 py==1.8.0 -sortedcontainers==2.1.0 diff --git a/setup.py b/setup.py index 35c0e754a..391226042 100644 --- a/setup.py +++ b/setup.py @@ -59,8 +59,5 @@ def run(cls): "Programming Language :: Python :: 3.7", "Programming Language :: Python :: Implementation :: PyPy", "Topic :: Software Development :: Libraries :: Python Modules", - ], - install_requires=[ - "sortedcontainers==2.1.0" ] )