-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Enhancement/Feature Issue #47] Added support for KeySharedPolicy for the consumer when in KeyShared mode. #109
Conversation
@BewareMyPower Would you be able to help review this PR along with the dependent PR on the C++ client? I see that you are a contributor to both repos. |
Sure. |
Thanks! |
8eff634
to
bec4cce
Compare
Now that the pulsar cpp client included the code I added in the newest 3.2.0 release, the fixes in this PR will be compatable with it. |
The |
if i > 0: | ||
time.sleep(0.02) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to sleep 2 milliseconds before each send?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the sleep to keep in line with the other tests. You can see them here
pulsar-client-python/tests/pulsar_test.py
Line 861 in 39ac8f8
time.sleep(0.02) |
pulsar-client-python/tests/pulsar_test.py
Line 931 in 39ac8f8
time.sleep(0.02) |
pulsar-client-python/tests/pulsar_test.py
Line 1077 in 39ac8f8
time.sleep(0.02) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will leave the sleeps in for now assuming there is a reason for them in the other tests. But if that is not the case then I can remove them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see. Most of the other sleep
calls should be removed IMO. In this PR, it's better to remove them. I think we should remove all unnecessary sleep
calls in other tests in a separated PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are wrong.
BTW, I think these tests are meaningless that maybe we can remove them. Because they only test two consumers can consume all the messages from a topic when key_shared_mode
, allow_out_of_order_delivery
, sticky_ranges
are configured.
@hyperevo I've left my comments here. Since we're going to release 3.2.0 soon (https://lists.apache.org/thread/4ryrobppjl93gcml63hcnpnjkrn6lrtn), please address the comments ASAP if you want this PR to be included in 3.2.0. |
@BewareMyPower All the comments should be resolved now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approve it. We can open another PR to remove the sleep calls
Motivation
The pulsar python client lacks support for defining KeyShared behaviour like out of order message delivery and sticky-hash, auto-hash for consumers in KeyShared mode. This PR adds full support. The user can now provide a KeySharedPolicy when starting a consumer with client.subscribe() #47
The ConsumerConfiguration::KeySharedPolicy and related setter/getter are now exposed to the Python client in this PR.
Modifications
In order to add this functionality I also had to modify the C++ client slightly. The setter function for setting the sticky ranges in the C++ client takes a parameter of the type std::initializer_list as seen here https://github.com/apache/pulsar-client-cpp/blob/86d66bdb09bdb596a2a5c25b0a09f3d0182d683e/include/pulsar/KeySharedPolicy.h#L99. But std::initializer_list types are not supported and probably won't ever be supported by pybind11. See pybind/pybind11#1302 (comment). So I added an overloaded setter function that takes a parameter of the type std::vector. This modification enables us to have python bindings for KeySharedPolicy. I have submitted a PR to the C++ client here apache/pulsar-client-cpp#242.
This PR won't work until that referenced C++ client PR is merged.
Verifying this change
Added 4 new unit tests. Ran them to ensure they pass.
Documentation
Added documentation in the form of comments and docstrings to the new classes and parameters that were added.