Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement/Feature Issue #47] Added support for KeySharedPolicy for the consumer when in KeyShared mode. #109

Merged
merged 5 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ wheelhouse
vcpkg_installed/
*.pyd
*.lib


lib_pulsar.so
tests/test.log
.tests-container-id.txt

80 changes: 78 additions & 2 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@
"""

import logging
from typing import List, Tuple, Optional

import _pulsar

from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
LoggerLevel, BatchReceivePolicy # noqa: F401
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa: F401

from pulsar.__about__ import __version__

Expand Down Expand Up @@ -689,7 +691,8 @@ def subscribe(self, topic, subscription_name,
max_pending_chunked_message=10,
auto_ack_oldest_chunked_message_on_queue_full=False,
start_message_id_inclusive=False,
batch_receive_policy=None
batch_receive_policy=None,
key_shared_policy=None
):
"""
Subscribe to the given topic and subscription combination.
Expand Down Expand Up @@ -774,6 +777,8 @@ def my_listener(consumer, message):
Set the consumer to include the given position of any reset operation like Consumer::seek.
batch_receive_policy: class ConsumerBatchReceivePolicy
Set the batch collection policy for batch receiving.
key_shared_policy: class ConsumerKeySharedPolicy
Set the key shared policy for use when the ConsumerType is KeyShared.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
Expand All @@ -794,6 +799,7 @@ def my_listener(consumer, message):
_check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full')
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
_check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')

conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
Expand Down Expand Up @@ -826,6 +832,9 @@ def my_listener(consumer, message):
if batch_receive_policy:
conf.batch_receive_policy(batch_receive_policy.policy())

if key_shared_policy:
conf.key_shared_policy(key_shared_policy.policy())

c = Consumer()
if isinstance(topic, str):
# Single topic
Expand Down Expand Up @@ -1447,6 +1456,73 @@ def policy(self):
"""
return self._policy

class ConsumerKeySharedPolicy:
"""
Consumer key shared policy is used to configure the consumer behaviour when the ConsumerType is KeyShared.
"""
def __init__(
self,
key_shared_mode: KeySharedMode = KeySharedMode.AutoSplit,
allow_out_of_order_delivery: bool = False,
sticky_ranges: Optional[List[Tuple[int, int]]] = None,
):
"""
Wrapper KeySharedPolicy.

Parameters
----------

key_shared_mode: KeySharedMode, optional
Set the key shared mode. eg: KeySharedMode.Sticky or KeysharedMode.AutoSplit

allow_out_of_order_delivery: bool, optional
Set whether to allow for out of order delivery
If it is enabled, it relaxes the ordering requirement and allows the broker to send out-of-order
messages in case of failures. This makes it faster for new consumers to join without being stalled by
an existing slow consumer.

If this is True, a single consumer still receives all keys, but they may come in different orders.

sticky_ranges: List[Tuple[int, int]], optional
Set the ranges used with sticky mode. The integers can be from 0 to 2^16 (0 <= val < 65,536)
"""
if key_shared_mode == KeySharedMode.Sticky and sticky_ranges is None:
raise ValueError("When using key_shared_mode = KeySharedMode.Sticky you must also provide sticky_ranges")

self._policy = KeySharedPolicy()
self._policy.set_key_shared_mode(key_shared_mode)
self._policy.set_allow_out_of_order_delivery(allow_out_of_order_delivery)

if sticky_ranges is not None:
self._policy.set_sticky_ranges(sticky_ranges)

@property
def key_shared_mode(self) -> KeySharedMode:
"""
Returns the key shared mode
"""
return self._policy.get_key_shared_mode()

@property
def allow_out_of_order_delivery(self) -> bool:
"""
Returns whether out of order delivery is enabled
"""
return self._policy.is_allow_out_of_order_delivery()

@property
def sticky_ranges(self) -> List[Tuple[int, int]]:
"""
Returns the actual sticky ranges
"""
return self._policy.get_sticky_ranges()

def policy(self):
"""
Returns the actual KeySharedPolicy.
"""
return self._policy

class Reader:
"""
Pulsar topic reader.
Expand Down
13 changes: 13 additions & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include <pulsar/ConsoleLoggerFactory.h>
#include <pulsar/ConsumerConfiguration.h>
#include <pulsar/ProducerConfiguration.h>
#include <pulsar/KeySharedPolicy.h>
#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <memory>

namespace py = pybind11;
Expand Down Expand Up @@ -121,6 +123,15 @@ static ClientConfiguration& ClientConfiguration_setFileLogger(ClientConfiguratio
void export_config(py::module_& m) {
using namespace py;

class_<KeySharedPolicy, std::shared_ptr<KeySharedPolicy>>(m, "KeySharedPolicy")
.def(init<>())
.def("set_key_shared_mode", &KeySharedPolicy::setKeySharedMode, return_value_policy::reference)
.def("get_key_shared_mode", &KeySharedPolicy::getKeySharedMode)
.def("set_allow_out_of_order_delivery", &KeySharedPolicy::setAllowOutOfOrderDelivery, return_value_policy::reference)
.def("is_allow_out_of_order_delivery", &KeySharedPolicy::isAllowOutOfOrderDelivery)
.def("set_sticky_ranges", static_cast<KeySharedPolicy& (KeySharedPolicy::*)(const StickyRanges&)>(&KeySharedPolicy::setStickyRanges), return_value_policy::reference)
.def("get_sticky_ranges", &KeySharedPolicy::getStickyRanges);

class_<CryptoKeyReader, std::shared_ptr<CryptoKeyReader>>(m, "AbstractCryptoKeyReader")
.def("getPublicKey", &CryptoKeyReader::getPublicKey)
.def("getPrivateKey", &CryptoKeyReader::getPrivateKey);
Expand Down Expand Up @@ -222,6 +233,8 @@ void export_config(py::module_& m) {

class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m, "ConsumerConfiguration")
.def(init<>())
.def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
.def("key_shared_policy", &ConsumerConfiguration::setKeySharedPolicy, return_value_policy::reference)
.def("consumer_type", &ConsumerConfiguration::getConsumerType)
.def("consumer_type", &ConsumerConfiguration::setConsumerType, return_value_policy::reference)
.def("schema", &ConsumerConfiguration::getSchema, return_value_policy::copy)
Expand Down
5 changes: 5 additions & 0 deletions src/enums.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <pulsar/CompressionType.h>
#include <pulsar/ConsumerConfiguration.h>
#include <pulsar/ProducerConfiguration.h>
#include <pulsar/KeySharedPolicy.h>
#include <pybind11/pybind11.h>

using namespace pulsar;
Expand All @@ -28,6 +29,10 @@ namespace py = pybind11;
void export_enums(py::module_& m) {
using namespace py;

enum_<KeySharedMode>(m, "KeySharedMode")
.value("AutoSplit", AUTO_SPLIT)
.value("Sticky", STICKY);

enum_<ProducerConfiguration::PartitionsRoutingMode>(m, "PartitionsRoutingMode")
.value("UseSinglePartition", ProducerConfiguration::UseSinglePartition)
.value("RoundRobinDistribution", ProducerConfiguration::RoundRobinDistribution)
Expand Down
131 changes: 131 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
MessageId,
CompressionType,
ConsumerType,
KeySharedMode,
ConsumerKeySharedPolicy,
PartitionsRoutingMode,
AuthenticationBasic,
AuthenticationTLS,
Expand Down Expand Up @@ -1437,6 +1439,134 @@ def send_callback(res, msg):
producer.flush()
client.close()

def test_keyshare_policy(self):
with self.assertRaises(ValueError):
# Raise error because sticky ranges are not provided.
pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=False,
)

expected_key_shared_mode = pulsar.KeySharedMode.Sticky
expected_allow_out_of_order_delivery = True
expected_sticky_ranges = [(0, 100), (101,200)]
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=expected_key_shared_mode,
allow_out_of_order_delivery=expected_allow_out_of_order_delivery,
sticky_ranges=expected_sticky_ranges
)

self.assertEqual(consumer_key_shared_policy.key_shared_mode, expected_key_shared_mode)
self.assertEqual(consumer_key_shared_policy.allow_out_of_order_delivery, expected_allow_out_of_order_delivery)
self.assertEqual(consumer_key_shared_policy.sticky_ranges, expected_sticky_ranges)

def test_keyshared_invalid_sticky_ranges(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-keyshare-invalid-" + str(time.time())
with self.assertRaises(ValueError):
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=False,
sticky_ranges=[(0,65536)]
)
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
start_message_id_inclusive=True,
key_shared_policy=consumer_key_shared_policy)

with self.assertRaises(ValueError):
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=False,
sticky_ranges=[(0, 100), (50, 150)]
)
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
start_message_id_inclusive=True,
key_shared_policy=consumer_key_shared_policy)

def test_keyshared_autosplit(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-keyshare-autosplit-" + str(time.time())
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.AutoSplit,
allow_out_of_order_delivery=True,
)
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-1',
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-2',
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
producer = client.create_producer(topic)

for i in range(10):
if i > 0:
time.sleep(0.02)
Comment on lines +1500 to +1501
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to sleep 2 milliseconds before each send?

Copy link
Contributor Author

@hyperevo hyperevo May 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the sleep to keep in line with the other tests. You can see them here

time.sleep(0.02)
and here
time.sleep(0.02)
and here
time.sleep(0.02)
. I am not sure if they are necessary or not. But since they were there in the other tests I assumed there must have been a reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will leave the sleeps in for now assuming there is a reason for them in the other tests. But if that is not the case then I can remove them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see. Most of the other sleep calls should be removed IMO. In this PR, it's better to remove them. I think we should remove all unnecessary sleep calls in other tests in a separated PR.

producer.send(b"hello-%d" % i)

msgs = []
while True:
try:
msg = consumer.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer.acknowledge(msg)

while True:
try:
msg = consumer2.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer2.acknowledge(msg)

self.assertEqual(len(msgs), 10)
client.close()

def test_sticky_autosplit(self):
client = Client(self.serviceUrl)
topic = "my-python-topic-keyshare-sticky-" + str(time.time())
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=True,
sticky_ranges=[(0,30000)],
)

consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-1',
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)

consumer2_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
key_shared_mode=pulsar.KeySharedMode.Sticky,
allow_out_of_order_delivery=True,
sticky_ranges=[(30001, 65535)],
)
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-2',
start_message_id_inclusive=True, key_shared_policy=consumer2_key_shared_policy)
producer = client.create_producer(topic)

for i in range(10):
if i > 0:
time.sleep(0.02)
producer.send(b"hello-%d" % i)

msgs = []
while True:
try:
msg = consumer.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer.acknowledge(msg)

while True:
try:
msg = consumer2.receive(100)
except pulsar.Timeout:
break
msgs.append(msg)
consumer2.acknowledge(msg)

self.assertEqual(len(msgs), 10)
client.close()

def test_acknowledge_failed(self):
client = Client(self.serviceUrl)
topic = 'test_acknowledge_failed'
Expand All @@ -1461,5 +1591,6 @@ def test_acknowledge_failed(self):
client.close()



if __name__ == "__main__":
main()