-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Broker] Fix race conditions in closing producers and consumers #13428
[Broker] Fix race conditions in closing producers and consumers #13428
Conversation
- closing ServerCnx while producers or consumers are created can lead to a producer or consumer never getting removed from the topic's list of producers
…he#13428) - closing ServerCnx while producers or consumers are created can lead to a producer or consumer never getting removed from the topic's list of producers upstream pull request apache#13428
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, other than a couple nits.
It's worth pointing out that the race is very subtle and comes here:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1302 to 1328 in 636d5b4
if (isActive()) { | |
if (producerFuture.complete(producer)) { | |
log.info("[{}] Created new producer: {}", remoteAddress, producer); | |
commandSender.sendProducerSuccessResponse(requestId, producerName, | |
producer.getLastSequenceId(), producer.getSchemaVersion(), | |
newTopicEpoch, true /* producer is ready now */); | |
if (getBrokerService().getInterceptor() != null) { | |
getBrokerService().getInterceptor(). | |
producerCreated(this, producer, metadata); | |
} | |
return; | |
} else { | |
// The producer's future was completed before by | |
// a close command | |
producer.closeNow(true); | |
log.info("[{}] Cleared producer created after" | |
+ " timeout on client side {}", | |
remoteAddress, producer); | |
} | |
} else { | |
producer.closeNow(true); | |
log.info("[{}] Cleared producer created after connection was closed: {}", | |
remoteAddress, producer); | |
producerFuture.completeExceptionally( | |
new IllegalStateException( | |
"Producer created after connection was closed")); | |
} |
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
- closing ServerCnx while producers or consumers are created can lead to a producer or consumer never getting removed from the topic's list of producers (cherry picked from commit 3316db5)
- closing ServerCnx while producers or consumers are created can lead to a producer or consumer never getting removed from the topic's list of producers (cherry picked from commit 3316db5)
…he#13428) - closing ServerCnx while producers or consumers are created can lead to a producer or consumer never getting removed from the topic's list of producers
Squash merge branch '2.8.1.4_LeastLongTermMessageRate' into 'release-2.8.1.4' Fixes #<xyz> ### Motivation --bug=101470445 Ledger元数据信息不一致和数据丢失问题 (merge request !90) --bug=105950701 【Pulsar】腾讯视频consumer泄露问题定位和修复 apache#13428 (merge request !163) --bug=106044205 Broker启动时,LeastLongTermMessageRate加载失败问题修复 apache#16096 (merge request !162) TAPD: --bug=106044205
Motivation
ServerCnx
while producers or consumers are created can lead to a producer or consumer never getting removed from the topic's list of producersModifications
If the future isn't completed by the time of closing the
ServerCnx
, complete it exceptionally. This will ensure that the existing logic in creating a producer or consumer doesn't leave the just created producer or consumer open.