-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Fix consumer stops receiving messages when with large backlogs processing #22454
Conversation
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
Show resolved
Hide resolved
Great work on this fix @Technoboy- ! Just one minor comment in #22454 (comment) . |
...ar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
Outdated
Show resolved
Hide resolved
...ar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
Outdated
Show resolved
Hide resolved
...ar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Good work @Technoboy
…acklogs processing (apache#22454)
…acklogs processing (#22454)
@@ -308,7 +308,6 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor | |||
|
|||
if (dispatcher != null && dispatcher.getConsumers().isEmpty()) { | |||
deactivateCursor(); | |||
topic.getManagedLedger().removeWaitingCursor(cursor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just wondering about this removal now after the PR has been merged. It seems that whatever isResetCursor
means that it would be skipped in that case. @Technoboy- is that a problem?
…acklogs processing (apache#22454) (cherry picked from commit 40329ee)
…acklogs processing (apache#22454) (cherry picked from commit 40329ee)
…acklogs processing (apache#22454) (cherry picked from commit 40329ee)
…acklogs processing (apache#22454) (cherry picked from commit 40329ee)
…acklogs processing (apache#22454) (cherry picked from commit 40329ee)
Fixes #22435
Motivation
When the backlog size >
managedLedgerCursorBackloggedThreshold
(default 1000), the cursor will be set inactive first. so this will cause the non-durable cursor not to add to thewaiting cursor.
This leads to the consumer stopping to receive messages.pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Lines 3082 to 3090 in 2469b97
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
Lines 3816 to 3824 in 2469b97
Documentation
doc
doc-required
doc-not-needed
doc-complete