From bc0f7f772dbea2033e81a5bfee3da752f7f853a8 Mon Sep 17 00:00:00 2001
From: tomer
Date: Mon, 22 Oct 2018 10:29:19 +0300
Subject: [PATCH] create round-robin partitioner starting from a random partition

---
 kafka/partitioner/__init__.py       |  3 ++-
 kafka/partitioner/randroundrobin.py | 39 ++++++++++++++++++++++++++++
 test/test_partitioner.py            | 41 ++++++++++++++++++++++++++++-
 3 files changed, 81 insertions(+), 2 deletions(-)
 create mode 100644 kafka/partitioner/randroundrobin.py

diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py
index a9dbbdccb..feb18a4f8 100644
--- a/kafka/partitioner/__init__.py
+++ b/kafka/partitioner/__init__.py
@@ -3,8 +3,9 @@
 from kafka.partitioner.default import DefaultPartitioner
 from kafka.partitioner.hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner
 from kafka.partitioner.roundrobin import RoundRobinPartitioner
+from kafka.partitioner.randroundrobin import RandStartRoundRobinPartitioner
 
 __all__ = [
     'DefaultPartitioner', 'RoundRobinPartitioner', 'HashedPartitioner',
-    'Murmur2Partitioner', 'LegacyPartitioner'
+    'Murmur2Partitioner', 'LegacyPartitioner', 'RandStartRoundRobinPartitioner'
 ]
diff --git a/kafka/partitioner/randroundrobin.py b/kafka/partitioner/randroundrobin.py
new file mode 100644
index 000000000..30521e2dd
--- /dev/null
+++ b/kafka/partitioner/randroundrobin.py
@@ -0,0 +1,39 @@
+from __future__ import absolute_import
+
+import random
+
+from kafka.partitioner.roundrobin import CachedPartitionCycler, RoundRobinPartitioner
+
+
+class RandStartRoundRobinPartitioner(RoundRobinPartitioner):
+    """Round-robin partitioner that starts its cycle at a random position.
+
+    The first partition returned is chosen at random; later calls continue
+    the round-robin cycle from that point.
+    """
+
+    def __init__(self, partitions=None):
+        # Use the cycler that picks a random starting position.
+        self.partitions_iterable = CachedRandomPartitionCycler(partitions)
+        if partitions:
+            self._set_partitions(partitions)
+        else:
+            self.partitions = None
+
+
+class CachedRandomPartitionCycler(CachedPartitionCycler):
+    """Partition cycler whose first position is drawn at random."""
+
+    def next(self):
+        """Return the partition at the current position, then advance."""
+        assert self.partitions is not None
+        if self.cur_pos is None:
+            # cur_pos is an index into self.partitions, not a partition id,
+            # so draw a random index. An empty list keeps index 0 so the
+            # lookup below raises IndexError.
+            self.cur_pos = random.randrange(len(self.partitions)) if self.partitions else 0
+        if not self._index_available(self.cur_pos, self.partitions):
+            self.cur_pos = 0
+        cur_item = self.partitions[self.cur_pos]
+        self.cur_pos += 1
+        return cur_item
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
index 47470e1bd..896b9bfda 100644
--- a/test/test_partitioner.py
+++ b/test/test_partitioner.py
@@ -1,6 +1,6 @@
 from __future__ import absolute_import
 
-from kafka.partitioner import DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner
+from kafka.partitioner import DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner, RandStartRoundRobinPartitioner
 from kafka.partitioner.hashed import murmur2
 
 
@@ -68,3 +68,42 @@ def test_murmur2_not_ascii():
     # Verify no regression of murmur2() bug encoding py2 bytes that don't ascii encode
     murmur2(b'\xa4')
     murmur2(b'\x81' * 1000)
+
+
+def _get_expected_partitions(partitioner, all_partitions, available):
+    """Consume one partition and return the order expected from the following calls."""
+    max_partition = available[-1]
+    first_partition = partitioner(None, all_partitions, available)
+
+    # create expected partitions based on range of first_partition -> max
+    expected_partitions = list(range(first_partition, max_partition + 1)) + list(range(first_partition))
+
+    # rotate first_partition to the end: it was just consumed, so the next
+    # call to partitioner(...) is expected to return first_partition + 1
+    expected_partitions.append(expected_partitions.pop(0))
+    return expected_partitions
+
+
+def test_randstartroundrobin_partitioner():
+    partitioner = RandStartRoundRobinPartitioner()
+    all_partitions = list(range(100))
+
+    # partitioner should cycle between partitions - first partition is random
+    available = all_partitions
+
+    expected_partitions = _get_expected_partitions(partitioner, all_partitions, available)
+    for expected in expected_partitions:
+        assert expected == partitioner(None, all_partitions, available)
+
+    # test dynamic partition re-assignment
+    available = available[:-25]
+
+    expected_partitions = _get_expected_partitions(partitioner, all_partitions, available)
+    for expected in expected_partitions:
+        assert expected == partitioner(None, all_partitions, available)
+
+    all_partitions = list(range(200))
+    available = all_partitions
+    expected_partitions = _get_expected_partitions(partitioner, all_partitions, available)
+    for expected in expected_partitions:
+        assert expected == partitioner(None, all_partitions, available)