[fix] [broker] fix mismatch between dispatcher.consumerList and dispatcher.consumerSet #22283
Is this the same issue as #20583?
.../src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java
It seems that
public synchronized boolean canUnsubscribe(Consumer consumer) {
    return consumerList.size() == 1 && consumerSet.contains(consumer);
}
However, I think it's over-designed. When the
I see the purpose of
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
After checking in more detail, I believe the root cause of this issue is that the new subscribe request removes the failed future of the previous subscribe request, which it should not do. Removing it is the responsibility of the previous subscribe request.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1226 to 1232 in 74585b5
} else if (existingConsumerFuture.isCompletedExceptionally()){
    ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
            String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
                    remoteAddress, subscriptionName));
    consumers.remove(consumerId, existingConsumerFuture);
    commandSender.sendErrorResponse(requestId, error,
            "Consumer that failed is already present on the connection");
We use a failed future to ensure that a new subscribe request from the same consumer is accepted only once the old subscribe request is done. So I think the reasonable fix is to prevent the new subscribe request from removing the failed future left by the old subscribe request.
We already have a mechanism by which the previous subscribe request eventually removes the failed future itself.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1312 to 1322 in 74585b5
try {
    consumer.close();
    log.info("[{}] Cleared consumer created after timeout on client side {}",
            remoteAddress, consumer);
} catch (BrokerServiceException e) {
    log.warn(
            "[{}] Error closing consumer created"
                    + " after timeout on client side {}: {}",
            remoteAddress, consumer, e.getMessage());
}
consumers.remove(consumerId, consumerFuture);
Good suggestion. I have already changed the solution: a subsequent subscribe request will always fail until the previous one is done, even if the previous one has timed out from the client's point of view. This guarantees that there is no race condition between the two subscribe requests.
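The gating behavior described in this thread can be illustrated with a minimal, self-contained sketch. It is not the actual ServerCnx code: the class name SubscribeGateSketch, the methods trySubscribe and finish, and the use of CompletableFuture<String> are all hypothetical and chosen for illustration only. The sketch assumes a map keyed by consumer id, where a later request never removes an earlier request's future (even a failed one) and the owning request cleans up with the two-argument remove.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class SubscribeGateSketch {
    // consumerId -> future of the in-flight (or failed) subscribe request.
    private final ConcurrentHashMap<Long, CompletableFuture<String>> consumers =
            new ConcurrentHashMap<>();

    /** Returns the future owned by this request, or null if the request is rejected. */
    public CompletableFuture<String> trySubscribe(long consumerId) {
        CompletableFuture<String> mine = new CompletableFuture<>();
        CompletableFuture<String> existing = consumers.putIfAbsent(consumerId, mine);
        if (existing == null) {
            return mine; // this request now owns the slot
        }
        // An earlier request still owns the slot, even if its future has failed.
        // Reject without touching the map: cleanup belongs to the owner.
        return null;
    }

    /** Called by the owning request once it is completely done. */
    public void finish(long consumerId, CompletableFuture<String> owned) {
        // Two-argument remove: removes only if the slot still maps to this exact future.
        consumers.remove(consumerId, owned);
    }

    public static void main(String[] args) {
        SubscribeGateSketch cnx = new SubscribeGateSketch();
        CompletableFuture<String> first = cnx.trySubscribe(1L);
        first.completeExceptionally(new RuntimeException("timed out on client side"));
        System.out.println(cnx.trySubscribe(1L) == null); // true: second request rejected
        cnx.finish(1L, first);
        System.out.println(cnx.trySubscribe(1L) != null); // true: accepted after cleanup
    }
}
```

Because only the owner calls finish, a retry that arrives while the first request is still being torn down cannot register a second consumer under the same id.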
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Added.
/pulsarbot run-failure-checks
…tcher.consumerSet (apache#22283) (cherry picked from commit a52945b) (cherry picked from commit bec3be2)
…tcher.consumerSet (apache#22283) (cherry picked from commit a52945b)
Motivation
CloseConsumer, then remove consumerFuture(1st) from ServerCnx, then the mechanism to prevent registering a consumer with the same id is invalid now
- dispatcher.consumerSet.size: 1
- dispatcher.consumerList.size: 1
- dispatcher.consumerSet.size: 1
- dispatcher.consumerList.size: 2
- dispatcher.consumerSet.size: 0
- dispatcher.consumerList.size: 1
This issue may cause dispatcher closing/unsubscribing and topic closing/deleting to get stuck. See more context: #22270
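The size mismatch listed above can be reproduced with a small standalone sketch. It is not the dispatcher implementation: the class DispatcherMismatchSketch, the use of String as the consumer type, the ArrayList/HashSet pair, and the removeConsumer logic are assumptions made for illustration. Only canUnsubscribe mirrors the expression quoted in the review thread.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DispatcherMismatchSketch {
    final List<String> consumerList = new ArrayList<>();
    final Set<String> consumerSet = new HashSet<>();

    void addConsumer(String consumer) {
        consumerList.add(consumer); // a list keeps duplicates
        consumerSet.add(consumer);  // a set does not
    }

    void removeConsumer(String consumer) {
        // Assumed removal logic: touch the list only if the set contained the consumer.
        if (consumerSet.remove(consumer)) {
            consumerList.remove(consumer); // removes the first occurrence only
        }
    }

    boolean canUnsubscribe(String consumer) {
        return consumerList.size() == 1 && consumerSet.contains(consumer);
    }

    String sizes() {
        return consumerSet.size() + "/" + consumerList.size();
    }

    public static void main(String[] args) {
        DispatcherMismatchSketch d = new DispatcherMismatchSketch();
        d.addConsumer("c1");
        System.out.println(d.sizes()); // 1/1
        d.addConsumer("c1");           // same consumer registered a second time
        System.out.println(d.sizes()); // 1/2
        d.removeConsumer("c1");
        System.out.println(d.sizes()); // 0/1
        d.removeConsumer("c1");        // no effect: the set no longer contains c1
        System.out.println(d.canUnsubscribe("c1")); // false
    }
}
```

Under these assumptions the leftover list entry can never be removed, and canUnsubscribe stays false, which is consistent with the stuck unsubscribe and close operations described above.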
BTW, producer registration does not have this issue, because producers with the same ServerCnx + producerId override each other.
Modifications
Documentation
doc
doc-required
doc-not-needed
doc-complete