-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix NPE of cumulative ack mode and incorrect unack message count #14021
Fix NPE of cumulative ack mode and incorrect unack message count #14021
Conversation
@congbobo184:Thanks for your contribution. For this PR, do we need to update docs? |
@congbobo184:Thanks for providing doc info! |
@congbobo184:Thanks for providing doc info! |
@Technoboy- PTAL :) |
Unack messages for Failover and Exclusive subscription is meaningless. |
...oker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
Outdated
Show resolved
Hide resolved
…rease_broker_consumer_unackMessageCount_throw_npe
…unt_throw_npe' of https://github.com/congbobo184/pulsar into congbobo184_fix_decrease_broker_consumer_unackMessageCount_throw_npe
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
) link #13383 ## Motivation #13383 has fixed the batch message ack does not decrease the unacked-msg count, but in cumulative ack mode also decrease, it will use pendingAcks, but in cumulative ack, this will not init. ![image](https://user-images.githubusercontent.com/39078850/151622041-7fb0acc5-32fd-4140-82d7-8c75d2a6aef5.png) ![image](https://user-images.githubusercontent.com/39078850/151622106-bf75f3fa-84d5-4099-99f4-50f4dddd43a2.png) If ack the batch index one by one, the last ack of a batch will decrease unack message with `batchSize` ``` ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 9 ================ batch size -> 10 ``` ### Modifications add judge `Subscription.isIndividualAckMode(subType)` when get ackCount. If the ack from consumer don't have ackset, we should treat it as empty ackset to calculate the ack count with the currently ackset. (cherry picked from commit 618f17c)
) link #13383 ## Motivation #13383 has fixed the batch message ack does not decrease the unacked-msg count, but in cumulative ack mode also decrease, it will use pendingAcks, but in cumulative ack, this will not init. ![image](https://user-images.githubusercontent.com/39078850/151622041-7fb0acc5-32fd-4140-82d7-8c75d2a6aef5.png) ![image](https://user-images.githubusercontent.com/39078850/151622106-bf75f3fa-84d5-4099-99f4-50f4dddd43a2.png) If ack the batch index one by one, the last ack of a batch will decrease unack message with `batchSize` ``` ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 9 ================ batch size -> 10 ``` ### Modifications add judge `Subscription.isIndividualAckMode(subType)` when get ackCount. If the ack from consumer don't have ackset, we should treat it as empty ackset to calculate the ack count with the currently ackset. (cherry picked from commit 618f17c)
…che#14021) link apache#13383 ## Motivation apache#13383 has fixed the batch message ack does not decrease the unacked-msg count, but in cumulative ack mode also decrease, it will use pendingAcks, but in cumulative ack, this will not init. ![image](https://user-images.githubusercontent.com/39078850/151622041-7fb0acc5-32fd-4140-82d7-8c75d2a6aef5.png) ![image](https://user-images.githubusercontent.com/39078850/151622106-bf75f3fa-84d5-4099-99f4-50f4dddd43a2.png) If ack the batch index one by one, the last ack of a batch will decrease unack message with `batchSize` ``` ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 9 ================ batch size -> 10 ``` ### Modifications add judge `Subscription.isIndividualAckMode(subType)` when get ackCount. If the ack from consumer don't have ackset, we should treat it as empty ackset to calculate the ack count with the currently ackset.
link #13383
Motivation
#13383 has fixed the batch message ack does not decrease the unacked-msg count, but in cumulative ack mode also decrease, it will use pendingAcks, but in cumulative ack, this will not init.
If ack the batch index one by one, the last ack of a batch will decrease unack message with
batchSize
Modifications
add judge
Subscription.isIndividualAckMode(subType)
when get ackCount.If the ack from consumer don't have ackset, we should treat it as empty ackset to calculate the ack count with the currently ackset.
Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)