-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][transaction] avoid too many ServiceUnitNotReadyException for transaction buffer handler #14894
[fix][transaction] avoid too many ServiceUnitNotReadyException for transaction buffer handler #14894
Conversation
@codelipenghui:Thanks for your contribution. For this PR, do we need to update docs? |
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! left some comments
@@ -87,7 +96,18 @@ public TransactionBufferHandlerImpl(PulsarClient pulsarClient, | |||
long requestId = requestIdGenerator.getAndIncrement(); | |||
ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits, | |||
topic, action, lowWaterMark); | |||
return endTxn(requestId, topic, cmd, cb); | |||
|
|||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this try catch can be deleted, below all
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lookupCache.get(topic) will throw ExecutionException which means some errors happened in the lookup cache, we need to catch the exception and invalidate the cache.
REQUEST_CREDITS_UPDATER.incrementAndGet(this); | ||
} | ||
} else { | ||
checkPendingRequests(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only continue;
is enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
response.getError()); | ||
cache.invalidate(op.topic); | ||
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage())); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does we need this try catch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to add an error log if there are some exceptions when completing the callback, the final block ensure the onResponse() can be complete.
515a987
to
45aed4e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
45aed4e
to
9b109db
Compare
…ansaction buffer handler 1.Added max outstanding requests limit 2.Add the request to pending request after reach the max outstanding requests 3.Avoid duplicated lookup cache invalidation
9c03a0f
to
b9f1b79
Compare
…ansaction buffer handler (#14894) ### Motivation 1. Added max concurrent request limitation for transaction buffer client 2. Add the request to the pending request queue after reaching the concurrent request limitation 3. Avoid duplicated lookup cache invalidation (cherry picked from commit 384c528)
…ansaction buffer handler (apache#14894) ### Motivation 1. Added max concurrent request limitation for transaction buffer client 2. Add the request to the pending request queue after reaching the concurrent request limitation 3. Avoid duplicated lookup cache invalidation
Motivation
Documentation
Check the box below or label this PR directly (if you have committer privilege).
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)