-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race condition of OpSendMsgQueue when publishing messages #14231
Fix race condition of OpSendMsgQueue when publishing messages #14231
Conversation
10b88f5
to
dca2efa
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The queue implementation that backs pendingMessages isn't thread safe.
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Line 1426 in 58af7a5
* This queue is not thread safe. |
Isn't the correct solution to make getPendingQueueSize method synchronized? There's a similar thread safety issue in getDelayInMillis method which should be synchronized.
@lhotari Yes, I have tried to synchronize the And yes, getDelayInMillis also have the thread safety issue, but looks only affects the accuracy of replication delay. |
That is risky. It's undefined what the behavior of ArrayDeque would be if it's accessed without proper synchronization. Usually it would be "just" a data consistency issue. In the case of java.util.HashMap, concurrent modifications and access could lead to an infinite loop before Java 8. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer to use the synchronized
keyword to ensure correctness. If we don't synchronize our access and we continue to use the forEach
method, I think we need to modify the implementation of OpSendMsgQueue#forEach
. As it is currently implemented, concurrent calls to forEach
could lead to different views of the forEachDepth
variable, which might lead to a postponedOpSendMgs
that is never removed.
If we really want to avoid adding the synchronized keyword, we could use a LongAdder
in the OpSendMsgQueue
. Then, anytime an object is added or removed, we can increment/decrement accordingly, and the getPendingQueueSize
will return the sum of the LongAdder
.
One advantage of using the synchronized
keyword is that it only affects producers that are running with stats logging or applications that are calling the getPendingQueueSize
method.
I agree that it would be better to have a messageCount method in |
@codelipenghui @lhotari @michaeljmarshall Changed the logic to use counter. PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -1743,7 +1748,7 @@ private void resendMessages(ClientCnx cnx, long expectedEpoch) { | |||
cnx.channel().close(); | |||
return; | |||
} | |||
int messagesToResend = pendingMessages.size(); | |||
long messagesToResend = pendingMessages.messagesCount(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: technically, users might notice a change in the Re-Sending
log line below when running with batch messaging enabled. I don't think this is a problem, but I wanted to mention the behavior change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it was a left-over. fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, sorry, I was too vague. I just meant that the log line's content will be different. Instead of logging X batch messages, it'll log the total number of messages (regardless of batching). This is fine, but it's a change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, though the message is more accurate now compared to before :)
LGTM |
@michaeljmarshall We don't need this commit to 2.8.3, #12704 is only in branch-2.9. 2.8.x not affected by this problem |
@lhotari Please help review again, thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Elegant solution, very good!
* Add synchronized for getPendingQueueSize(); * Use iterator instead. * Use counter to keep track of messages count * Changed to int Co-authored-by: Matteo Merli <mmerli@apache.org>
@codelipenghui - I agree that the specific bug that led to this fix is not in 2.8. However, in 2.8, we access |
@codelipenghui - I discussed this with @lhotari offline, and he pointed out that we've used the current paradigm for a while. I will not cherry pick this commit to |
…#14231) * Add synchronized for getPendingQueueSize(); * Use iterator instead. * Use counter to keep track of messages count * Changed to int Co-authored-by: Matteo Merli <mmerli@apache.org>
Background
The issue is a race condition related to #11884 and #12704
When call
getPendingQueueSize()
which will callpendingMessages.foreach()
but without a lock protection. After the send receipt come back, peek from thependingMessages
but which might get null here during the foreachprocess. I have added some logs to confirm the issue, and to reproduce the issue better to use use
.statsInterval(1, TimeUnit.SECONDS)` to let the stats update more frequently.Here are the logs which can explain the bug:
From the logs, you can see a message with sequence ID 182801 add the
OpSendMsgQueue
first, but after the producer received the receipt, the log shows Got ack for timed out msg which means got null when peeking messages, and after, the message add back to the internal queue, but the producer side is blocked at this time.Documentation
Check the box below or label this PR directly (if you have committer privilege).
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)