-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client]Fix client memory limit currentUsage leak and semaphore release duplicated in ProducerImpl #16837
[fix][client]Fix client memory limit currentUsage leak and semaphore release duplicated in ProducerImpl #16837
Conversation
A related one #16835 |
@Nicklee007 Could you please help add some unit tests? |
@codelipenghui +1 we need some tests to cover this change |
@codelipenghui @Shoothzj Add some unit test, PTAL Thx~ |
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java
Show resolved
Hide resolved
@codelipenghui @Jason918 @Shoothzj Also find producer semaphore release duplicated in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…release duplicated in ProducerImpl (apache#16837) ### Motivation Fix client memory limit `currentUsage` leak in `ProducerImpl`. When our pulsar cluster occur some error, producer send message fail and we find the `currentUsage` always keep high value like the leaked, and cause the producer send rate is slow. And find producer semaphore release duplicated when `createOpSendMsg` occur some excrption. Follow 1 point only release the message count semaphore, but not release the memory limit. **memory limit currentUsage leak point** https://github.com/apache/pulsar/blob/c217b8f559292fd34c6a4fb4b30aab213720d962/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2031-L2033 **producer semaphore release duplicated** https://github.com/apache/pulsar/blob/4d64e2e66689381ebbb94fbfc03eb4e1dfba0405/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2116-L2120 ``` After the exception the memory limit leak occured. org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer XXXX-366-15151 can not send message to the topic persistent://XXXX/XXXX/XXXX within given timeout : createdAt 30.005 seconds ago, firstSentAt 30.005 seconds ago, lastSentAt 30.005 seconds ago, retryCount 1 at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1287) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1826) at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889) at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1369) at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1816) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1848) at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834) ``` ### Modifications 1. add the `MemoryLimitController` release. ### Documentation - [X] `doc-not-needed`
@Nicklee007 Could you help to open another PR to branch-2.9? |
…release duplicated in ProducerImpl (#16837) ### Motivation Fix client memory limit `currentUsage` leak in `ProducerImpl`. When our pulsar cluster occur some error, producer send message fail and we find the `currentUsage` always keep high value like the leaked, and cause the producer send rate is slow. And find producer semaphore release duplicated when `createOpSendMsg` occur some excrption. Follow 1 point only release the message count semaphore, but not release the memory limit. **memory limit currentUsage leak point** https://github.com/apache/pulsar/blob/c217b8f559292fd34c6a4fb4b30aab213720d962/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2031-L2033 **producer semaphore release duplicated** https://github.com/apache/pulsar/blob/4d64e2e66689381ebbb94fbfc03eb4e1dfba0405/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2116-L2120 ``` After the exception the memory limit leak occured. org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer XXXX-366-15151 can not send message to the topic persistent://XXXX/XXXX/XXXX within given timeout : createdAt 30.005 seconds ago, firstSentAt 30.005 seconds ago, lastSentAt 30.005 seconds ago, retryCount 1 at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1287) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1826) at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889) at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1369) at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1816) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1848) at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834) ``` ### Modifications 1. add the `MemoryLimitController` release. ### Documentation - [X] `doc-not-needed` (cherry picked from commit 955dcd1)
Remove the |
@mattisonchao ok, I'll cherry-pick it to branch 2.9. |
Move this PR to the next release of 2.8.5. For the release manager of next 2.8.x release, this PR requires an extra import of @Nicklee007 If you want to include this PR in 2.8.4, please open a new PR to migrate this PR to branch-2.8. |
@mattisonchao cherry-pick to PR #16971 in branch 2.9. |
@BewareMyPower @Jason918 Created a new PR #16972 to branch-2.7 which only fix the semaphore release duplicated problem in ProducerImpl, PTAL. |
@BewareMyPower Created a new PR #16985 branch-2.8, PTAL. |
…release duplicated in ProducerImpl (#16837) ### Motivation Fix client memory limit `currentUsage` leak in `ProducerImpl`. When our pulsar cluster occur some error, producer send message fail and we find the `currentUsage` always keep high value like the leaked, and cause the producer send rate is slow. And find producer semaphore release duplicated when `createOpSendMsg` occur some excrption. Follow 1 point only release the message count semaphore, but not release the memory limit. **memory limit currentUsage leak point** https://github.com/apache/pulsar/blob/c217b8f559292fd34c6a4fb4b30aab213720d962/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2031-L2033 **producer semaphore release duplicated** https://github.com/apache/pulsar/blob/4d64e2e66689381ebbb94fbfc03eb4e1dfba0405/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2116-L2120 ``` After the exception the memory limit leak occured. org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer XXXX-366-15151 can not send message to the topic persistent://XXXX/XXXX/XXXX within given timeout : createdAt 30.005 seconds ago, firstSentAt 30.005 seconds ago, lastSentAt 30.005 seconds ago, retryCount 1 at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1287) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1826) at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889) at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1369) at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1816) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1848) at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834) ``` ### Modifications 1. add the `MemoryLimitController` release. ### Documentation - [X] `doc-not-needed` (cherry picked from commit 955dcd1)
…release duplicated in ProducerImpl (apache#16837) ### Motivation Fix client memory limit `currentUsage` leak in `ProducerImpl`. When our pulsar cluster occur some error, producer send message fail and we find the `currentUsage` always keep high value like the leaked, and cause the producer send rate is slow. And find producer semaphore release duplicated when `createOpSendMsg` occur some excrption. Follow 1 point only release the message count semaphore, but not release the memory limit. **memory limit currentUsage leak point** https://github.com/apache/pulsar/blob/c217b8f559292fd34c6a4fb4b30aab213720d962/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2031-L2033 **producer semaphore release duplicated** https://github.com/apache/pulsar/blob/4d64e2e66689381ebbb94fbfc03eb4e1dfba0405/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L2116-L2120 ``` After the exception the memory limit leak occured. org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer XXXX-366-15151 can not send message to the topic persistent://XXXX/XXXX/XXXX within given timeout : createdAt 30.005 seconds ago, firstSentAt 30.005 seconds ago, lastSentAt 30.005 seconds ago, retryCount 1 at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1287) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:1826) at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889) at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1369) at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:1816) at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:1848) at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834) ``` ### Modifications 1. add the `MemoryLimitController` release. ### Documentation - [X] `doc-not-needed` (cherry picked from commit 955dcd1) (cherry picked from commit 7c73269)
Motivation
Fix client memory limit
currentUsage
leak inProducerImpl
. When our pulsar cluster occur some error, producer send message fail and we find thecurrentUsage
always keep high value like the leaked, and cause the producer send rate is slow.And find producer semaphore release duplicated when
createOpSendMsg
occur some excrption.Follow 1 point only release the message count semaphore, but not release the memory limit.
memory limit currentUsage leak point
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Lines 2031 to 2033 in c217b8f
producer semaphore release duplicated
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Lines 2116 to 2120 in 4d64e2e
Modifications
MemoryLimitController
release.Documentation
doc-not-needed
(Please explain why)