-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order #12456
[Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order #12456
Conversation
…orrect message order
@michaeljmarshall:Thanks for your contribution. For this PR, do we need to update docs? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch!
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a test failure related to this change
org.apache.pulsar.client.impl.MultiTopicsConsumerImplTest.testReceiveAsyncCanBeCancelled(org.apache.pulsar.client.impl.MultiTopicsConsumerImplTest)
[INFO] Run 1: PASS
Error: Run 2: MultiTopicsConsumerImplTest.testReceiveAsyncCanBeCancelled:168 expected [true] but found [false]
@michaeljmarshall:Thanks for providing doc info! |
@michaeljmarshall the test should be fixed. This line pulsar/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java Line 168 in bd942e1
should be wrapped with Awaitility Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasNextPendingReceive())); |
Good catch @michaeljmarshall! Yes that's true that the other consumer will be another thread and thenAcceptAsync is needed to fix another potential race. |
@lhotari - thank you for the solution. I ran the test locally without then with your proposed fix, and it works as expected. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I'm testing the changes and running into some problems, possibly performance issues.
@@ -269,7 +269,7 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) { | |||
// recursion and stack overflow | |||
internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the thenAcceptAsync
now makes this code run on the internalPinnedExecutor, this extra scheduling to the internalPinnedExecutor should be removed. There isn't a risk of the stack growing infinitely since there is no direct recursive call chain.
This change improved the throughput performance by about 8% in a simple local benchmark.
# create topic with 10 partitions
./bin/pulsar-admin topics create-partitioned-topic -p 10 parttest
# run producer (a lot of small messages to stress test the logic)
./bin/pulsar-perf produce -ioThreads 4 -s 10 -o 6000 -p 200000 -r 350000 parttest
# run consumer (in another terminal)
./bin/pulsar-perf consume -q 6000 -p 200000 parttest
Throughput performance went from 290k msgs/s to about 315k msgs/s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lhotari - great catch. I just added a commit to remove the extra scheduling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please address the comment about line 270.
@michaeljmarshall I created a repro test case for this PR in commit lhotari@1b0ad24b . This is the repro case: https://github.com/lhotari/pulsar/blob/1b0ad24b0a07a637c874d0aa76e06929ecea597f/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java#L133-L196 |
@michaeljmarshall can you allow edits for maintainers for this PR? I'd like to push the test case (lhotari@1b0ad24b) to be part of this PR. |
@lhotari - done, and I’ll address the rest of your comments soon. |
thanks @michaeljmarshall , I pushed a commit with the test to this PR now. Please pull the changes to your local PR branch. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…orrect message order (#12456) * [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order * Fix test * Return the checkState method call to keep original behavior * Reproduce out-of-order delivery issue in PR 12456 * Remove unnecessary scheduling of receiveMessageFromConsumer Co-authored-by: Lari Hotari <lhotari@apache.org> (cherry picked from commit 6a2e3a1)
* up/master: [C++] Fixed connection read error logging (apache#12492) [Pulsar SQL] Pulsar SQL support query big entry data (apache#12448) [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order (apache#12456) Allow to have different instances LocalMemoryMetadataStore that share the same state (apache#12390) Remove unused ConsumerImpl.isTxnMessage (apache#12472)
…orrect message order (apache#12456) * [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order * Fix test * Return the checkState method call to keep original behavior * Reproduce out-of-order delivery issue in PR 12456 * Remove unnecessary scheduling of receiveMessageFromConsumer Co-authored-by: Lari Hotari <lhotari@apache.org> (cherry picked from commit 6a2e3a1) (cherry picked from commit cfec2c9)
@codelipenghui - would you please also cherry pick this to |
@michaeljmarshall We can only cherry-pick it after 2.9.0 released. |
- see apache/pulsar#12456 for details. - the bug applies to the use of receiveAsync and the workaround is to avoid the use of receiveAsync until the fix is included in the client - receiveAsync was introduced previously to workaround the bug apache/pulsar#9921 - that issue isn't critical
…orrect message order (apache#12456) * [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order * Fix test * Return the checkState method call to keep original behavior * Reproduce out-of-order delivery issue in PR 12456 * Remove unnecessary scheduling of receiveMessageFromConsumer Co-authored-by: Lari Hotari <lhotari@apache.org>
…orrect message order (#12456) * [Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order * Fix test * Return the checkState method call to keep original behavior * Reproduce out-of-order delivery issue in PR 12456 * Remove unnecessary scheduling of receiveMessageFromConsumer Co-authored-by: Lari Hotari <lhotari@apache.org> (cherry picked from commit 6a2e3a1)
Motivation
@lhotari and I discovered a race condition in the
MultiTopicsConsumerImpl<T>
class. The race allows for messages to be delivered out of order.We discovered the bug using the following steps:
However, the race can happen with a single message and on a topic with a single partition.
The race comes in these two code blocks:
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Lines 412 to 416 in 7ad46c8
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Lines 294 to 301 in 7ad46c8
The first block is executed on the application's calling thread. The second block is executed in either the
internalPinnedExecutor
or the topic partition consumer'sinternalPinnedExecutor
(these threads are not necessarily the same). As such, if the two blocks are called at the same time, it is possible forincomingMessages.poll();
to return anull
result whilenextPendingReceive();
also returns anull
result. If this happens, the next state of theMultiTopicsConsumerImpl
will be to have a single message in theincomingMessages
queue and a single pending receive in thependingReceives
queue. Then, a message will deliver out of order.This proposed solution follows the paradigm used by @Vanlightly in #11691. Essentially, the places where we need to inspect both the
pendingReceives
and theincomingMessages
queues must be updated from a single thread:internalPinnedExecutor
.Modifications
consumer.receiveAsync()
on theinternalPinnedExecutor
. I chose to run the whole callback on theinternalPinnedExecutor
instead of just themessageReceived
method. If we left the callback usingthenAccept
and ranmessageReceived
on theinternalPinnedExecutor
, there is a chance that the callback will run on the calling thread, which is always theinternalPinnedExecutor
. That would mean that themessageReceived
logic would actually run after the remaining callback logic that inspects theincomingMessages.size()
and decides whether or not to pause the consumer. By scheduling the callback on theinternalPinnedExecutor
usingthenAcceptAsync
, we guarantee that the code is run together without the data race we're fixing in this PR.MultiTopicsConsumerImpl#internalReceiveAsync
on theinternalPinnedExecutor
.Remove thecheckState(message instanceof TopicMessageImpl);
method call from theMultiTopicsConsumerImpl#internalReceiveAsync
method. This decision may be controversial. I removed the check because we only ever addTopicMessageImpl
to theMultiTopicsConsumerImpl
'sincomingMessages
queue. If it is a necessary check, we could complete the future exceptionally when the message is not of typeTopicMessageImpl
.Verifying this change
Since this is a fix for a data race, it is hard to test the change. I think the change is small enough that we don't need to add new tests for it, but please let me know if you think otherwise.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
We should document this in release notes. No other docs need to be updated.