diff --git a/.gitignore b/.gitignore index 72f931b..b01979f 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,9 @@ wheelhouse vcpkg_installed/ *.pyd *.lib + + +lib_pulsar.so +tests/test.log +.tests-container-id.txt + diff --git a/pulsar/__init__.py b/pulsar/__init__.py index c85c6e3..423ed0e 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -43,10 +43,12 @@ """ import logging +from typing import List, Tuple, Optional + import _pulsar from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ - LoggerLevel, BatchReceivePolicy # noqa: F401 + LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa: F401 from pulsar.__about__ import __version__ @@ -689,7 +691,8 @@ def subscribe(self, topic, subscription_name, max_pending_chunked_message=10, auto_ack_oldest_chunked_message_on_queue_full=False, start_message_id_inclusive=False, - batch_receive_policy=None + batch_receive_policy=None, + key_shared_policy=None ): """ Subscribe to the given topic and subscription combination. @@ -774,6 +777,8 @@ def my_listener(consumer, message): Set the consumer to include the given position of any reset operation like Consumer::seek. batch_receive_policy: class ConsumerBatchReceivePolicy Set the batch collection policy for batch receiving. + key_shared_policy: class ConsumerKeySharedPolicy + Set the key shared policy for use when the ConsumerType is KeyShared. """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -794,6 +799,7 @@ def my_listener(consumer, message): _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full') _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy') + _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -826,6 +832,9 @@ def my_listener(consumer, message): if batch_receive_policy: conf.batch_receive_policy(batch_receive_policy.policy()) + if key_shared_policy: + conf.key_shared_policy(key_shared_policy.policy()) + c = Consumer() if isinstance(topic, str): # Single topic @@ -1447,6 +1456,73 @@ def policy(self): """ return self._policy +class ConsumerKeySharedPolicy: + """ + Consumer key shared policy is used to configure the consumer behaviour when the ConsumerType is KeyShared. + """ + def __init__( + self, + key_shared_mode: KeySharedMode = KeySharedMode.AutoSplit, + allow_out_of_order_delivery: bool = False, + sticky_ranges: Optional[List[Tuple[int, int]]] = None, + ): + """ + Wrapper KeySharedPolicy. + + Parameters + ---------- + + key_shared_mode: KeySharedMode, optional + Set the key shared mode. eg: KeySharedMode.Sticky or KeysharedMode.AutoSplit + + allow_out_of_order_delivery: bool, optional + Set whether to allow for out of order delivery + If it is enabled, it relaxes the ordering requirement and allows the broker to send out-of-order + messages in case of failures. This makes it faster for new consumers to join without being stalled by + an existing slow consumer. + + If this is True, a single consumer still receives all keys, but they may come in different orders. + + sticky_ranges: List[Tuple[int, int]], optional + Set the ranges used with sticky mode. The integers can be from 0 to 2^16 (0 <= val < 65,536) + """ + if key_shared_mode == KeySharedMode.Sticky and sticky_ranges is None: + raise ValueError("When using key_shared_mode = KeySharedMode.Sticky you must also provide sticky_ranges") + + self._policy = KeySharedPolicy() + self._policy.set_key_shared_mode(key_shared_mode) + self._policy.set_allow_out_of_order_delivery(allow_out_of_order_delivery) + + if sticky_ranges is not None: + self._policy.set_sticky_ranges(sticky_ranges) + + @property + def key_shared_mode(self) -> KeySharedMode: + """ + Returns the key shared mode + """ + return self._policy.get_key_shared_mode() + + @property + def allow_out_of_order_delivery(self) -> bool: + """ + Returns whether out of order delivery is enabled + """ + return self._policy.is_allow_out_of_order_delivery() + + @property + def sticky_ranges(self) -> List[Tuple[int, int]]: + """ + Returns the actual sticky ranges + """ + return self._policy.get_sticky_ranges() + + def policy(self): + """ + Returns the actual KeySharedPolicy. + """ + return self._policy + class Reader: """ Pulsar topic reader. diff --git a/src/config.cc b/src/config.cc index 71795dd..7e2d38d 100644 --- a/src/config.cc +++ b/src/config.cc @@ -21,8 +21,10 @@ #include #include #include +#include #include #include +#include #include namespace py = pybind11; @@ -121,6 +123,15 @@ static ClientConfiguration& ClientConfiguration_setFileLogger(ClientConfiguratio void export_config(py::module_& m) { using namespace py; + class_>(m, "KeySharedPolicy") + .def(init<>()) + .def("set_key_shared_mode", &KeySharedPolicy::setKeySharedMode, return_value_policy::reference) + .def("get_key_shared_mode", &KeySharedPolicy::getKeySharedMode) + .def("set_allow_out_of_order_delivery", &KeySharedPolicy::setAllowOutOfOrderDelivery, return_value_policy::reference) + .def("is_allow_out_of_order_delivery", &KeySharedPolicy::isAllowOutOfOrderDelivery) + .def("set_sticky_ranges", static_cast(&KeySharedPolicy::setStickyRanges), return_value_policy::reference) + .def("get_sticky_ranges", &KeySharedPolicy::getStickyRanges); + class_>(m, "AbstractCryptoKeyReader") .def("getPublicKey", &CryptoKeyReader::getPublicKey) .def("getPrivateKey", &CryptoKeyReader::getPrivateKey); @@ -222,6 +233,8 @@ void export_config(py::module_& m) { class_>(m, "ConsumerConfiguration") .def(init<>()) + .def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy) + .def("key_shared_policy", &ConsumerConfiguration::setKeySharedPolicy, return_value_policy::reference) .def("consumer_type", &ConsumerConfiguration::getConsumerType) .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_value_policy::reference) .def("schema", &ConsumerConfiguration::getSchema, return_value_policy::copy) diff --git a/src/enums.cc b/src/enums.cc index f61011f..8dacc54 100644 --- a/src/enums.cc +++ b/src/enums.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include using namespace pulsar; @@ -28,6 +29,10 @@ namespace py = pybind11; void export_enums(py::module_& m) { using namespace py; + enum_(m, "KeySharedMode") + .value("AutoSplit", AUTO_SPLIT) + .value("Sticky", STICKY); + enum_(m, "PartitionsRoutingMode") .value("UseSinglePartition", ProducerConfiguration::UseSinglePartition) .value("RoundRobinDistribution", ProducerConfiguration::RoundRobinDistribution) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index eeb2a6a..3ec89a7 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -32,6 +32,8 @@ MessageId, CompressionType, ConsumerType, + KeySharedMode, + ConsumerKeySharedPolicy, PartitionsRoutingMode, AuthenticationBasic, AuthenticationTLS, @@ -1437,6 +1439,134 @@ def send_callback(res, msg): producer.flush() client.close() + def test_keyshare_policy(self): + with self.assertRaises(ValueError): + # Raise error because sticky ranges are not provided. + pulsar.ConsumerKeySharedPolicy( + key_shared_mode=pulsar.KeySharedMode.Sticky, + allow_out_of_order_delivery=False, + ) + + expected_key_shared_mode = pulsar.KeySharedMode.Sticky + expected_allow_out_of_order_delivery = True + expected_sticky_ranges = [(0, 100), (101,200)] + consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy( + key_shared_mode=expected_key_shared_mode, + allow_out_of_order_delivery=expected_allow_out_of_order_delivery, + sticky_ranges=expected_sticky_ranges + ) + + self.assertEqual(consumer_key_shared_policy.key_shared_mode, expected_key_shared_mode) + self.assertEqual(consumer_key_shared_policy.allow_out_of_order_delivery, expected_allow_out_of_order_delivery) + self.assertEqual(consumer_key_shared_policy.sticky_ranges, expected_sticky_ranges) + + def test_keyshared_invalid_sticky_ranges(self): + client = Client(self.serviceUrl) + topic = "my-python-topic-keyshare-invalid-" + str(time.time()) + with self.assertRaises(ValueError): + consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy( + key_shared_mode=pulsar.KeySharedMode.Sticky, + allow_out_of_order_delivery=False, + sticky_ranges=[(0,65536)] + ) + client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, + start_message_id_inclusive=True, + key_shared_policy=consumer_key_shared_policy) + + with self.assertRaises(ValueError): + consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy( + key_shared_mode=pulsar.KeySharedMode.Sticky, + allow_out_of_order_delivery=False, + sticky_ranges=[(0, 100), (50, 150)] + ) + client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, + start_message_id_inclusive=True, + key_shared_policy=consumer_key_shared_policy) + + def test_keyshared_autosplit(self): + client = Client(self.serviceUrl) + topic = "my-python-topic-keyshare-autosplit-" + str(time.time()) + consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy( + key_shared_mode=pulsar.KeySharedMode.AutoSplit, + allow_out_of_order_delivery=True, + ) + consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-1', + start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy) + consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-2', + start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy) + producer = client.create_producer(topic) + + for i in range(10): + if i > 0: + time.sleep(0.02) + producer.send(b"hello-%d" % i) + + msgs = [] + while True: + try: + msg = consumer.receive(100) + except pulsar.Timeout: + break + msgs.append(msg) + consumer.acknowledge(msg) + + while True: + try: + msg = consumer2.receive(100) + except pulsar.Timeout: + break + msgs.append(msg) + consumer2.acknowledge(msg) + + self.assertEqual(len(msgs), 10) + client.close() + + def test_sticky_autosplit(self): + client = Client(self.serviceUrl) + topic = "my-python-topic-keyshare-sticky-" + str(time.time()) + consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy( + key_shared_mode=pulsar.KeySharedMode.Sticky, + allow_out_of_order_delivery=True, + sticky_ranges=[(0,30000)], + ) + + consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-1', + start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy) + + consumer2_key_shared_policy = pulsar.ConsumerKeySharedPolicy( + key_shared_mode=pulsar.KeySharedMode.Sticky, + allow_out_of_order_delivery=True, + sticky_ranges=[(30001, 65535)], + ) + consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-2', + start_message_id_inclusive=True, key_shared_policy=consumer2_key_shared_policy) + producer = client.create_producer(topic) + + for i in range(10): + if i > 0: + time.sleep(0.02) + producer.send(b"hello-%d" % i) + + msgs = [] + while True: + try: + msg = consumer.receive(100) + except pulsar.Timeout: + break + msgs.append(msg) + consumer.acknowledge(msg) + + while True: + try: + msg = consumer2.receive(100) + except pulsar.Timeout: + break + msgs.append(msg) + consumer2.acknowledge(msg) + + self.assertEqual(len(msgs), 10) + client.close() + def test_acknowledge_failed(self): client = Client(self.serviceUrl) topic = 'test_acknowledge_failed' @@ -1461,5 +1591,6 @@ def test_acknowledge_failed(self): client.close() + if __name__ == "__main__": main()