Moved entries filtering from consumer to dispatcher #4329
}
}

public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, String subscription,
It seems we need subscription and consumerId only for logging; the utility itself does not actually use them. Can we remove them from the signature and log at the dispatcher if getNumberOfMessagesInBatch(..) returns -1?
The concern is that the util method is called from multiple places, so logging inside it avoids duplicating that logging code at every call site.
 * @param subscription
 *            the subscription object
 */
public void filterEntriesForConsumer(List<Entry> entries, int[] batchSizes, SendMessageInfo sendMessageInfo) {
On what basis are we going to filter the entries? Do we plan to pass the Consumer so that a filtering function can be applied?
The current plan is to have a general way to filter out messages (corrupted messages, delayed messages, internal markers).
It could also be used in the future for other kinds of filtering, though generalizing for that is not the intention here.
// Message metadata was corrupted
entries.set(i, null);
entry.release();
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
Add a debug log, or maybe an info log, because this shouldn't happen often.
This is already logged in the metadata parsing method
Do we even need to do this on the subscription? The operation tells the managed ledger not to give us this message again, and dispatchers already have a handle on the managed ledger.
Most likely, yes, though technically that is only true for PersistentDispatcher. This abstract method is shared across all the types of dispatchers, hence it goes through the Subscription abstraction.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Looks good. Left some small comments.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
MESSAGE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
incrementUnackedMessages(sendMessageInfo.getTotalMessages());
msgOut.recordMultipleEvents(sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

ctx.channel().eventLoop().execute(() -> {
I would move this call to another method which is only given the entries, to avoid the temptation to touch batchSizes or the message Info in the callback.
Good point. There was actually a problem there already. Since I need to check the sizes from the loop anyway, I changed it to use a recyclable object instead of the thread local, which makes the code easier and cleaner.
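The thread does not spell out what the pre-existing problem was. As an illustration only, the sketch below shows one way a per-call pooled holder can be handed to a deferred callback safely. All names here (PooledSendInfo, DeferredSendSketch, the list standing in for an event loop) are hypothetical, and the hand-written pool is a simplified stand-in for a recyclable object, not Netty's Recycler API or the PR's actual code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pooled holder: each caller gets its own instance and returns it when done.
final class PooledSendInfo {
    private static final ArrayDeque<PooledSendInfo> POOL = new ArrayDeque<>();

    int totalMessages;

    // Take an instance from the pool, or allocate one if the pool is empty.
    static PooledSendInfo get() {
        synchronized (POOL) {
            PooledSendInfo info = POOL.poll();
            return info != null ? info : new PooledSendInfo();
        }
    }

    // Reset the state and hand the instance back for reuse.
    void recycle() {
        totalMessages = 0;
        synchronized (POOL) {
            POOL.push(this);
        }
    }
}

// Hypothetical sender whose callbacks run later, as they would on an event loop.
final class DeferredSendSketch {
    final List<Runnable> eventLoop = new ArrayList<>(); // queued callbacks
    final List<Integer> sent = new ArrayList<>();       // values observed by the callbacks

    void send(int messages) {
        PooledSendInfo info = PooledSendInfo.get();
        info.totalMessages = messages;
        // The callback owns this instance until it recycles it, so a later
        // send() cannot overwrite the value before the callback has run.
        eventLoop.add(() -> {
            sent.add(info.totalMessages);
            info.recycle();
        });
    }

    void runEventLoop() {
        for (Runnable task : eventLoop) {
            task.run();
        }
        eventLoop.clear();
    }
}
```

With a single shared holder (for example one per thread), two send() calls queued before the callbacks run would both observe the value written by the second call; with per-call instances each callback sees its own value.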
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
run cpp tests
@rdhabalia @ivankelly Please take another look
lgtm #shipit
Motivation
As discussed in #4062, there is a back and forth between the dispatcher and the consumer when passing the messages. The key points are:
With this PR, the dispatcher does the filtering and prefills the batch sizes for each entry. The consumer just sees the filtered list.
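A minimal sketch of that flow, under stated assumptions: SketchEntry, SketchSendInfo, and DispatcherFilterSketch are hypothetical stand-ins written for this example, not Pulsar's Entry, SendMessageInfo, or dispatcher classes. A batch count of -1 is assumed to mean the metadata could not be parsed, mirroring the -1 return value mentioned in the review, and acknowledging a corrupted entry is modeled by recording its position in a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a ledger entry.
final class SketchEntry {
    final long position;
    final int numMessagesInBatch; // -1 models metadata that could not be parsed
    final int sizeInBytes;
    boolean released;

    SketchEntry(long position, int numMessagesInBatch, int sizeInBytes) {
        this.position = position;
        this.numMessagesInBatch = numMessagesInBatch;
        this.sizeInBytes = sizeInBytes;
    }

    void release() {
        released = true;
    }
}

// Hypothetical accumulator for the totals the consumer needs later.
final class SketchSendInfo {
    int totalMessages;
    long totalBytes;
}

final class DispatcherFilterSketch {
    // Positions acknowledged because their metadata was corrupted.
    final List<Long> acked = new ArrayList<>();

    // Filters in place: dropped entries become null, and batchSizes[i] is
    // prefilled for every entry that survives.
    void filterEntriesForConsumer(List<SketchEntry> entries, int[] batchSizes, SketchSendInfo info) {
        for (int i = 0; i < entries.size(); i++) {
            SketchEntry entry = entries.get(i);
            if (entry == null) {
                continue;
            }
            if (entry.numMessagesInBatch < 0) {
                // Corrupted metadata: drop the entry and acknowledge it so it is not redelivered.
                entries.set(i, null);
                entry.release();
                acked.add(entry.position);
                batchSizes[i] = 0;
                continue;
            }
            batchSizes[i] = entry.numMessagesInBatch;
            info.totalMessages += entry.numMessagesInBatch;
            info.totalBytes += entry.sizeInBytes;
        }
    }
}
```

The consumer side can then iterate over the list, skip null slots, and read batchSizes and the totals without parsing metadata a second time.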