-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feature][broker] PIP 37: Support chunking with Shared subscription #16202
[feature][broker] PIP 37: Support chunking with Shared subscription #16202
Conversation
@BewareMyPower Please provide a correct documentation label for your PR. |
1 similar comment
@BewareMyPower Please provide a correct documentation label for your PR. |
a815c22
to
b60557b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me. Left some comments.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadata.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
long stickyKeyHash = getStickyKeyHash(entry); | ||
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); | ||
entry.release(); | ||
if (messagesForC < entryAndMetadataList.size()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When will messagesForC or consumer.getAvailablePermits()
be less than entryAndMetadataList.size()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure, maybe the consumer's permits might change in the loop. It's added here for safety programming.
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
Show resolved
Hide resolved
8479c9c
to
588c3c1
Compare
This PR will fixes #7645 |
@codelipenghui @merlimat @rdhabalia @Jason918 @eolivelli @315157973 Could you take a look at this PR? |
588c3c1
to
a89ef8a
Compare
/pulsarbot rerun-failure-checks |
89cc8ce
to
788c422
Compare
@merlimat @lhotari @Jason918 @rdhabalia @hangc0276 @eolivelli @codelipenghui @massakam @congbobo184 @315157973 Could you take a second look at this PR? |
788c422
to
43d7264
Compare
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java
Show resolved
Hide resolved
@BewareMyPower Please help resolve the conflicts. |
43d7264
to
76db14f
Compare
### Motivation https://github.com/apache/pulsar/wiki/PIP-37%3A-Large-message-size-handling-in-Pulsar#option-1-broker-caches-mapping-of-message-uuid-and-consumerid The cons of option 1 described in the original proposal don't exist for current code because broker keeps redelivered messages into **sorted** map now. ### Modifications First of all, to avoid too many code changes, an `EntryAndMetadata` class is introduced to bind the `Entry` with the associated `MessageMetadata` to avoid parsing the metadata repeatedly. It also implements the `Entry` interface, so this PR changes some `List<Entry>` parameters to `List<? extends Entry>` so that a `List<EntryAndMetadata>` argument can be accepted. Then, a `SharedConsumerAssignor` is introduced to assign a list of entries to all shared consumers. 1. Use a default selector to select the next consumer, like `PersistentDispatcherMultipleConsumers#getNextConsumer`, 2. Each time a consumer is chosen, assign the entries in range [i, i+permits) to the consumer except entries that have uuid: - If uuid is not cached, cache `uuid -> consumer` to indicate the chunked message of this uuid must be dispatched to this consumer. - Otherwise, assign this entry to the owner consumer of the uuid. The `assign` method returns a map that maps `Consumer` to `List<EntryAndMetadata>`. The following logic is similar to the Key_Shared dispatcher. Finally, cancel the limit in `ConsumerImpl`. ### Verifying this change `SharedConsumerAssignorTest` is added to show how the assignor works in detail. `MessageChunkingSharedTest` is added to verify the Shared dispatcher works on chunked messages, including: - Single producer sends chunked messages with various chunk count to a consumer has a limited permits. - Single producer sends chunked messages to two consumers to verify both they can receive chunked messages. - Produce interleaved chunks via `PersistentTopic` directly to simulate multiple producers, and verify the new consumer can receive all unacknowledged messages received by the old consumer. ### TODO We need to change the implementation of `ChunkMessageIdImpl` to make it possible for consumer to acknowledge all entries of a chunked message. Since this PR already includes many changes, I will do that later.
76db14f
to
6fcc4cf
Compare
…pache#16202) * [feature][broker] PIP 37: Support chunking with Shared subscription ### Motivation https://github.com/apache/pulsar/wiki/PIP-37%3A-Large-message-size-handling-in-Pulsar#option-1-broker-caches-mapping-of-message-uuid-and-consumerid The cons of option 1 described in the original proposal don't exist for current code because broker keeps redelivered messages into **sorted** map now. ### Modifications First of all, to avoid too many code changes, an `EntryAndMetadata` class is introduced to bind the `Entry` with the associated `MessageMetadata` to avoid parsing the metadata repeatedly. It also implements the `Entry` interface, so this PR changes some `List<Entry>` parameters to `List<? extends Entry>` so that a `List<EntryAndMetadata>` argument can be accepted. Then, a `SharedConsumerAssignor` is introduced to assign a list of entries to all shared consumers. 1. Use a default selector to select the next consumer, like `PersistentDispatcherMultipleConsumers#getNextConsumer`, 2. Each time a consumer is chosen, assign the entries in range [i, i+permits) to the consumer except entries that have uuid: - If uuid is not cached, cache `uuid -> consumer` to indicate the chunked message of this uuid must be dispatched to this consumer. - Otherwise, assign this entry to the owner consumer of the uuid. The `assign` method returns a map that maps `Consumer` to `List<EntryAndMetadata>`. The following logic is similar to the Key_Shared dispatcher. Finally, cancel the limit in `ConsumerImpl`. ### Verifying this change `SharedConsumerAssignorTest` is added to show how the assignor works in detail. `MessageChunkingSharedTest` is added to verify the Shared dispatcher works on chunked messages, including: - Single producer sends chunked messages with various chunk count to a consumer has a limited permits. - Single producer sends chunked messages to two consumers to verify both they can receive chunked messages. - Produce interleaved chunks via `PersistentTopic` directly to simulate multiple producers, and verify the new consumer can receive all unacknowledged messages received by the old consumer.
Motivation
https://github.com/apache/pulsar/wiki/PIP-37%3A-Large-message-size-handling-in-Pulsar#option-1-broker-caches-mapping-of-message-uuid-and-consumerid
The cons of option 1 described in the original proposal don't exist
for current code because broker keeps redelivered messages into
sorted map now.
Modifications
First of all, to avoid too many code changes, an
EntryAndMetadata
class is introduced to bind the
Entry
with the associatedMessageMetadata
to avoid parsing the metadata repeatedly. It alsoimplements the
Entry
interface, so this PR changes someList<Entry>
parameters toList<? extends Entry>
so that aList<EntryAndMetadata>
argument can be accepted.Then, a
SharedConsumerAssignor
is introduced to assign a list ofentries to all shared consumers.
PersistentDispatcherMultipleConsumers#getNextConsumer
,[i, i+permits) to the consumer except entries that have uuid:
uuid -> consumer
to indicate thechunked message of this uuid must be dispatched to this consumer.
The
assign
method returns a map that mapsConsumer
toList<EntryAndMetadata>
. The following logic is similar to theKey_Shared dispatcher.
Finally, cancel the limit in
ConsumerImpl
.Verifying this change
SharedConsumerAssignorTest
is added to show how the assignor worksin detail.
MessageChunkingSharedTest
is added to verify the Shared dispatcherworks on chunked messages, including:
consumer has a limited permits.
both they can receive chunked messages.
PersistentTopic
directly to simulatemultiple producers, and verify the new consumer can receive all
unacknowledged messages received by the old consumer.
Documentation
Check the box below or label this PR directly.
Need to update docs?
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
(Please explain why)
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)