[fix][client] Fix producer/consumer stop to reconnect or Pub/Sub due to IO thread race-condition #23499
Great work on addressing a hard problem, @poorbarcode! I added a few minor review comments.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
LGTM, great work @poorbarcode.
Issue 1:
It looks like `CompletableFuture.supplyAsync(() -> clientCnx, clientCnx.ctx().executor());` can fix this issue, but I don't understand how this fix works.
In the old code, isn't the callback thread already `clientCnx.ctx().executor()`? I didn't double-check this part of the code.
LGTM
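To make the question about the executor concrete, here is a minimal JDK-only sketch of what `CompletableFuture.supplyAsync(supplier, executor)` guarantees: the supplier itself runs on the given executor. The single named thread is a hypothetical stand-in for `clientCnx.ctx().executor()`; the class and method names are invented for illustration and are not part of the Pulsar client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorHopSketch {

    /** Returns the name of the thread on which the supplier was executed. */
    static String supplierThreadName() throws Exception {
        // Assumption: one named thread plays the role of the connection's IO executor.
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "io-thread-sketch");
            t.setDaemon(true);
            return t;
        });
        try {
            // The supplier is submitted to ioThread, regardless of which thread calls supplyAsync.
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), ioThread)
                    .get();
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(supplierThreadName()); // prints "io-thread-sketch"
    }
}
```

Note that non-async dependents such as `thenApply` run on whichever thread completes the future, or on the calling thread if the future is already complete, so the thread a callback runs on depends on timing unless the hop is made explicit.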
Issue 2:
You call a sync method on the async callback thread, which is incorrect usage. I don't suggest switching threads to complete the callback.
In the past, we have removed many sync calls from Pulsar.
@nodece please provide the exact code location that you are referring to. Thanks!
Indeed, it would be more consistent to use
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SimpleProduceConsumeIoTest.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
client.newProducer().topic("test").createAsync().thenAccept(producer -> {
    // Incorrect: a sync call here blocks the Pulsar client thread that runs this callback.
    // producer.send("hello-1".getBytes(StandardCharsets.UTF_8));
    // Correct: use the async variant inside the callback.
    producer.sendAsync("hello-1".getBytes(StandardCharsets.UTF_8));
});
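The stall described in this thread can be reproduced without Pulsar. The sketch below uses a plain single-thread executor as a hypothetical stand-in for the client IO thread: a callback running on that thread waits synchronously for an acknowledgement that can only be delivered by the same thread. All class and method names are invented for illustration, and the 200 ms timeout exists only so the sketch reports the stall instead of hanging.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingCallbackSketch {

    /** Returns true if a blocking wait inside the callback timed out. */
    static boolean blockingCallStalls() throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor(); // stand-in for the IO thread
        try {
            CompletableFuture<Boolean> result = CompletableFuture.supplyAsync(() -> {
                // The "ack" is delivered by a task queued on the same single thread.
                CompletableFuture<String> ack = new CompletableFuture<>();
                io.execute(() -> ack.complete("ack"));
                try {
                    ack.get(200, TimeUnit.MILLISECONDS); // sync wait on the IO thread
                    return false;
                } catch (TimeoutException e) {
                    return true; // the ack task could not run while this thread was waiting
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }, io);
            return result.get(5, TimeUnit.SECONDS);
        } finally {
            io.shutdownNow();
        }
    }

    /** Same flow, but the callback returns the future instead of blocking on it. */
    static String asyncCallCompletes() throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<CompletableFuture<String>> outer = CompletableFuture.supplyAsync(() -> {
                CompletableFuture<String> ack = new CompletableFuture<>();
                io.execute(() -> ack.complete("ack"));
                return ack; // the IO thread is released immediately
            }, io);
            CompletableFuture<String> flat = outer.thenCompose(f -> f);
            return flat.get(5, TimeUnit.SECONDS);
        } finally {
            io.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(blockingCallStalls());  // prints "true"
        System.out.println(asyncCallCompletes());  // prints "ack"
    }
}
```

Without the timeout, the first variant would wait forever, which matches the deadlock reported for a sync call made from the `createAsync` callback.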
+1
…to IO thread race-condition (apache#23499) (cherry picked from commit ff4a25e) (cherry picked from commit 8bb0df0)
Motivation
**Issue 1**: see the test `testUnstableNetWorkWhenCreatingProducer`
- The producer fails to start if the network is unstable.
- `Command.newProducer` to brokers
- `io.netty.channel.StacklessClosedChannelException: null`
**Issue 2**: see the test `testSendAfterCreateProducerAsync`
- Start a producer/consumer asynchronously.
- Call a sync method, such as `producer.send` or `consumer.receive`, in the callback of `createAsync`.
- The IO thread encounters a deadlock.

Modifications
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x