[fix] [client] Fix memory leak when publishing encountered a corner case error #23738
base: master
/pulsarbot rerun-failure-checks
    // To guarantee compatibility with customized "SendCallback", see detail the doc of this method.
    if ((callback instanceof ProducerImpl.DefaultSendMessageCallback
            || callback.getClass().getName().endsWith("NonPersistentReplicator$ProducerSendCallback")
            || callback.getClass().getName().endsWith("ProducerSendCallback$ProducerSendCallback"))
            && msg.getDataBuffer().capacity() > 0) {
        checkArgument(msg.getDataBuffer().refCnt() >= 2,
                "Message's data's refCnt is less than 2, see #23738");
    }
Such type assertions really should be avoided. I mean, the `checkArgument(message instanceof MessageImpl)` assertion before is also redundant.
The reference count validation should not be performed in `sendAsync`; instead, it should be validated before the caller calls `sendAsync` (e.g. in `GeoPersistentReplicator#replicateEntries`).
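A minimal sketch of what moving the check to the caller could look like. Everything here is a hypothetical stand-in so the example runs without Pulsar or Netty: `Payload` mimics a reference-counted buffer, `sendAsync` mimics the producer API, and `replicate` mimics a caller such as a replicator. Only the condition itself (non-empty buffer, `refCnt >= 2`) is taken from the diff above.

```java
public class CallerSideCheckSketch {
    /** Hypothetical stand-in for a reference-counted payload such as Netty's ByteBuf. */
    public static final class Payload {
        private final int capacity;
        private int refCnt = 1;

        public Payload(int capacity) { this.capacity = capacity; }
        public int capacity() { return capacity; }
        public int refCnt() { return refCnt; }
        public Payload retain() { refCnt++; return this; }
    }

    /** Number of messages handed to the stand-in producer. */
    public static int sent = 0;

    /** Stand-in for sendAsync: no callback-type or refCnt assertions in here. */
    public static void sendAsync(Payload payload) {
        sent++;
    }

    /** Stand-in for the caller: it validates its own precondition before sending. */
    public static void replicate(Payload payload) {
        if (payload.capacity() > 0 && payload.refCnt() < 2) {
            throw new IllegalArgumentException("Message's data's refCnt is less than 2, see #23738");
        }
        sendAsync(payload);
    }

    public static void main(String[] args) {
        replicate(new Payload(16).retain()); // refCnt is 2, accepted
        try {
            replicate(new Payload(16)); // refCnt is 1, rejected before sendAsync
        } catch (IllegalArgumentException e) {
            System.out.println("rejected by caller: " + e.getMessage());
        }
        System.out.println("messages sent: " + sent);
    }
}
```

With this split, the generic send path stays free of checks on the callback's class name, and only the caller that relies on the extra reference enforces it.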
    MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer);
    producer.close();
    msgBuilder.value("msg-1").sendAsync().exceptionally(ex -> {
        log.warn("expected error", ex);
If it's an expected error, don't print warn logs. You'd better use `assertThrows` or a trivial try-catch:

    try {
        msgBuilder.value("msg-1").send();
        fail();
    } catch (PulsarClientException ignored) {
        // or you can add the type assertion here
    }
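The `assertThrows` alternative could look roughly like the sketch below. To keep it runnable without a test framework or a Pulsar client, `expectThrows`, `ClosedProducerException` and `sendOnClosedProducer` are hypothetical stand-ins defined here; in the real test the framework's own helper and `PulsarClientException` would take their place.

```java
public class ExpectedFailureSketch {
    /** Hypothetical stand-in for the client exception thrown by a closed producer. */
    public static class ClosedProducerException extends Exception {
        public ClosedProducerException(String msg) { super(msg); }
    }

    /** A piece of code that may throw a checked exception. */
    public interface ThrowingRunnable {
        void run() throws Exception;
    }

    /** Hypothetical stand-in for send() on a closed producer: always fails. */
    public static void sendOnClosedProducer(String value) throws ClosedProducerException {
        throw new ClosedProducerException("producer already closed, cannot send " + value);
    }

    /** Returns the expected exception; fails if nothing or a different type is thrown. */
    public static <T extends Throwable> T expectThrows(Class<T> type, ThrowingRunnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (type.isInstance(t)) {
                return type.cast(t);
            }
            throw new AssertionError("unexpected exception type: " + t.getClass().getName(), t);
        }
        throw new AssertionError("expected " + type.getSimpleName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        ClosedProducerException ex = expectThrows(ClosedProducerException.class,
                () -> sendOnClosedProducer("msg-1"));
        System.out.println("got expected failure: " + ex.getMessage());
    }
}
```

Either form makes the test fail when the send unexpectedly succeeds, which a `log.warn` inside `exceptionally` does not.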
    try {
        sendFutureList.get(sendFutureList.size() - 1).join();
    } catch (Exception ex) {
        log.warn("", ex);
    }
Which case is expected? Should it succeed or fail?
            indexCalledSend++;
        }
    } catch (Exception ex) {
        log.warn("", ex);
I see many places calling `log.warn("", ex)`. If the exception is expected, please do type assertions (like `assertTrue(e instanceof xxx)` or `assertTrue(e.getMessage().contains("xxx"))`) rather than printing the whole exception stack.
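For the asynchronous case, a hedged sketch of asserting on the failure instead of logging it. `failingSendAsync` is a hypothetical stand-in for a `sendAsync()` call that fails, and `IllegalArgumentException` stands in for whichever client exception the test actually expects. The sketch relies only on the standard behaviour that `CompletableFuture.join()` wraps the failure in a `CompletionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class FutureFailureSketch {
    /** Hypothetical stand-in for a sendAsync() that fails. */
    public static CompletableFuture<Long> failingSendAsync() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalArgumentException("message size exceeded"));
        return future;
    }

    /** Joins the future and returns the failure cause; fails if it completed normally. */
    public static Throwable expectFailure(CompletableFuture<?> future) {
        try {
            future.join();
        } catch (CompletionException e) {
            return e.getCause();
        }
        throw new AssertionError("expected the future to fail, but it completed normally");
    }

    public static void main(String[] args) {
        Throwable cause = expectFailure(failingSendAsync());
        // Assert on the type and the message rather than printing the stack trace.
        if (!(cause instanceof IllegalArgumentException)) {
            throw new AssertionError("unexpected failure type: " + cause);
        }
        if (!cause.getMessage().contains("size exceeded")) {
            throw new AssertionError("unexpected failure message: " + cause.getMessage());
        }
        System.out.println("failure verified: " + cause.getMessage());
    }
}
```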
        msgBuilder.value("msg-1").sendAsync().get(3, TimeUnit.SECONDS);
    } catch (Exception ex) {
        log.warn("Intercept error", ex);
Yet another unnecessary exception stack log, which is noisy. And you should call `fail()` in the try block.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

    @@             Coverage Diff              @@
    ##             master   #23738      +/-   ##
    ============================================
    + Coverage     73.57%   74.42%   +0.85%
    - Complexity    32624    35114    +2490
    ============================================
      Files          1877     1945      +68
      Lines        139502   147540    +8038
      Branches      15299    16290     +991
    ============================================
    + Hits         102638   109812    +7174
    - Misses        28908    29261     +353
    - Partials       7956     8467     +511
Motivation
Issue 1: memory leak when an error occurs while publishing
Conditions:
max message size
ProducerInterceptor. eligible
`testSendQueueIsFull`, `testSendMessageSizeExceeded`, `testSendAfterClosedProducer`, and `testInterceptorError`
Modifications
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x