Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix the publish latency spike from the contention of MessageDeduplication #20647

Merged
merged 6 commits into from
Jun 29, 2023

Conversation

codelipenghui
Copy link
Contributor

@codelipenghui codelipenghui commented Jun 26, 2023

Motivation

This issue has occurred with a topic that has many producers, where the P99 publish latency of the broker will increase to hundreds of milliseconds when many producers connect or disconnect from the topic. This level of latency is unacceptable for a messaging system.

In this case, each producer add or remove operation goes to MessageDeduplication to update a map for inactive producers, regardless of whether message deduplication is enabled or disabled. My initial impression is that if message deduplication is disabled, it should not go to MessageDeduplication. However, users can enable message deduplication for an active topic, which may be the reason why this is happening. @merlimat, do you have any additional context on this matter? We may also need to find a solution to avoid any operations to MessageDeduplication if message deduplication is disabled in the future.

This PR provides a fix without introducing any changes in behavior, which will give us more confidence to cherry-pick it to release branches.

image

broker_lock_0621_1.html.txt

Modifications

Use ConcurrentHashMap instead of synchronized HashMap to reduce the contention between IO threads.

Here is the benchmark result from benchmark test
CLHM_CON (ConcurrentOpenHashMap)
CHM_CON(ConcurrentHashMap)
HM_CON(SynchronizedHashMap)

CLHM_CON::Thread-3::put: 422ms
CLHM_CON::Thread-5::put: 422ms
CLHM_CON::Thread-4::put: 422ms
CLHM_CON::Thread-0::put: 423ms
CLHM_CON::Thread-2::put: 422ms
CLHM_CON::Thread-1::put: 423ms
CLHM_CON::Thread-0::get: 786ms
CLHM_CON::Thread-1::get: 805ms
CLHM_CON::Thread-4::get: 839ms
CLHM_CON::Thread-5::get: 840ms
CLHM_CON::Thread-3::get: 854ms
CLHM_CON::Thread-2::get: 855ms
CLHM_CON::Thread-0::remove: 112ms
CLHM_CON::Thread-1::remove: 123ms
CLHM_CON::Thread-4::remove: 97ms
CLHM_CON::Thread-5::remove: 97ms
CLHM_CON::Thread-2::remove: 84ms
CLHM_CON::Thread-3::remove: 85ms
CLHM_CON: 1367 ms
CHM_CON::Thread-7::put: 33ms
CHM_CON::Thread-6::put: 35ms
CHM_CON::Thread-9::put: 37ms
CHM_CON::Thread-8::put: 38ms
CHM_CON::Thread-11::put: 38ms
CHM_CON::Thread-10::put: 39ms
CHM_CON::Thread-9::get: 630ms
CHM_CON::Thread-10::get: 629ms
CHM_CON::Thread-7::get: 635ms
CHM_CON::Thread-11::get: 630ms
CHM_CON::Thread-8::get: 631ms
CHM_CON::Thread-6::get: 635ms
CHM_CON::Thread-10::remove: 11ms
CHM_CON::Thread-6::remove: 9ms
CHM_CON::Thread-7::remove: 11ms
CHM_CON::Thread-8::remove: 10ms
CHM_CON::Thread-11::remove: 11ms
CHM_CON::Thread-9::remove: 12ms
CHM_CON: 680 ms
HM_CON::Thread-16::put: 643ms
HM_CON::Thread-15::put: 664ms
HM_CON::Thread-14::put: 670ms
HM_CON::Thread-13::put: 674ms
HM_CON::Thread-17::put: 687ms
HM_CON::Thread-12::put: 715ms
HM_CON::Thread-16::get: 61876ms
HM_CON::Thread-15::get: 62214ms
HM_CON::Thread-13::get: 62312ms
HM_CON::Thread-17::get: 62309ms
HM_CON::Thread-14::get: 62450ms
HM_CON::Thread-16::remove: 638ms
HM_CON::Thread-15::remove: 555ms
HM_CON::Thread-13::remove: 521ms
HM_CON::Thread-17::remove: 522ms
HM_CON::Thread-14::remove: 411ms
HM_CON::Thread-12::get: 62816ms
HM_CON::Thread-12::remove: 8ms
HM_CON: 63538 ms

Verifying this change

The existing tests can cover the new changes.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@codelipenghui codelipenghui self-assigned this Jun 26, 2023
@codelipenghui codelipenghui added this to the 3.1.0 milestone Jun 26, 2023
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 26, 2023
@codelipenghui codelipenghui added area/broker ready-to-test and removed doc-not-needed Your PR changes do not impact docs labels Jun 26, 2023
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 26, 2023
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mattisonchao
Copy link
Member

@@ -455,23 +455,17 @@ public synchronized void producerRemoved(String producerName) {
public synchronized void purgeInactiveProducers() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to keep this 'synchronized' ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need it, and I also want to remove it in this PR initially. But considering it is not related to the publish latency spike issue, maybe we'd better just keep this PR only for the publish latency spike issue. And I can create the following PR to remove this synchronized, but no need to cherry-pick to release branches.

@codelipenghui
Copy link
Contributor Author

After doing a benchmark test
We should use ConcurrentHashMap here, it performed super better than ConcurrentOpenHashMap and synchronized HashMap for put/remove operations.

CLHM_CON::Thread-3::put: 422ms
CLHM_CON::Thread-5::put: 422ms
CLHM_CON::Thread-4::put: 422ms
CLHM_CON::Thread-0::put: 423ms
CLHM_CON::Thread-2::put: 422ms
CLHM_CON::Thread-1::put: 423ms
CLHM_CON::Thread-0::get: 786ms
CLHM_CON::Thread-1::get: 805ms
CLHM_CON::Thread-4::get: 839ms
CLHM_CON::Thread-5::get: 840ms
CLHM_CON::Thread-3::get: 854ms
CLHM_CON::Thread-2::get: 855ms
CLHM_CON::Thread-0::remove: 112ms
CLHM_CON::Thread-1::remove: 123ms
CLHM_CON::Thread-4::remove: 97ms
CLHM_CON::Thread-5::remove: 97ms
CLHM_CON::Thread-2::remove: 84ms
CLHM_CON::Thread-3::remove: 85ms
CLHM_CON: 1367 ms
CHM_CON::Thread-7::put: 33ms
CHM_CON::Thread-6::put: 35ms
CHM_CON::Thread-9::put: 37ms
CHM_CON::Thread-8::put: 38ms
CHM_CON::Thread-11::put: 38ms
CHM_CON::Thread-10::put: 39ms
CHM_CON::Thread-9::get: 630ms
CHM_CON::Thread-10::get: 629ms
CHM_CON::Thread-7::get: 635ms
CHM_CON::Thread-11::get: 630ms
CHM_CON::Thread-8::get: 631ms
CHM_CON::Thread-6::get: 635ms
CHM_CON::Thread-10::remove: 11ms
CHM_CON::Thread-6::remove: 9ms
CHM_CON::Thread-7::remove: 11ms
CHM_CON::Thread-8::remove: 10ms
CHM_CON::Thread-11::remove: 11ms
CHM_CON::Thread-9::remove: 12ms
CHM_CON: 680 ms
HM_CON::Thread-16::put: 643ms
HM_CON::Thread-15::put: 664ms
HM_CON::Thread-14::put: 670ms
HM_CON::Thread-13::put: 674ms
HM_CON::Thread-17::put: 687ms
HM_CON::Thread-12::put: 715ms
HM_CON::Thread-16::get: 61876ms
HM_CON::Thread-15::get: 62214ms
HM_CON::Thread-13::get: 62312ms
HM_CON::Thread-17::get: 62309ms
HM_CON::Thread-14::get: 62450ms
HM_CON::Thread-16::remove: 638ms
HM_CON::Thread-15::remove: 555ms
HM_CON::Thread-13::remove: 521ms
HM_CON::Thread-17::remove: 522ms
HM_CON::Thread-14::remove: 411ms
HM_CON::Thread-12::get: 62816ms
HM_CON::Thread-12::remove: 8ms
HM_CON: 63538 ms

@lhotari
Copy link
Member

lhotari commented Jun 26, 2023

@codelipenghui

https://github.com/apache/pulsar/actions/runs/5373765736/jobs/9748615177?pr=20647 Please help check this failed job.

@codelipenghui exception is

  Error:  Tests run: 4, Failures: 1, Errors: 0, Skipped: 3, Time elapsed: 0.665 s <<< FAILURE! - in org.apache.pulsar.broker.service.persistent.MessageDuplicationTest
  Error:  org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.testInactiveProducerRemove  Time elapsed: 0.056 s  <<< FAILURE!
  java.lang.ClassCastException: class org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap cannot be cast to class java.util.Map (org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap is in unnamed module of loader 'app'; java.util.Map is in module java.base of loader 'bootstrap')
  	at org.apache.pulsar.broker.service.persistent.MessageDuplicationTest.testInactiveProducerRemove(MessageDuplicationTest.java:177)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  	at java.base/java.lang.Thread.run(Thread.java:833)

@mattisonchao
Copy link
Member

mattisonchao commented Jun 26, 2023

After doing a benchmark test
We should use ConcurrentHashMap here, it performed super better than ConcurrentOpenHashMap and synchronized HashMap for put/remove operations.

@codelipenghui
You have to avoid using remove in for-each If you chose ConcurrentHashMap . because there is a .ConcurrentModificationException

@codelipenghui
Copy link
Contributor Author

You have to avoid using remove in for-each If you chose ConcurrentHashMap . because there is a .ConcurrentModificationException

@mattisonchao I need to use iterator if the ConcurrentHashMap is the final decision.

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

@hangc0276 hangc0276 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch!

@codecov-commenter
Copy link

Codecov Report

Merging #20647 (b0cebfd) into master (2b01f83) will increase coverage by 39.54%.
The diff coverage is 100.00%.

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #20647       +/-   ##
=============================================
+ Coverage     33.58%   73.12%   +39.54%     
- Complexity    12127    32016    +19889     
=============================================
  Files          1613     1867      +254     
  Lines        126241   138683    +12442     
  Branches      13770    15240     +1470     
=============================================
+ Hits          42396   101410    +59014     
+ Misses        78331    29249    -49082     
- Partials       5514     8024     +2510     
Flag Coverage Δ
inttests 24.18% <50.00%> (-0.01%) ⬇️
systests 25.02% <50.00%> (?)
unittests 72.36% <100.00%> (+40.32%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...roker/service/persistent/MessageDeduplication.java 82.53% <100.00%> (+42.35%) ⬆️

... and 1520 files with indirect coverage changes

@Technoboy- Technoboy- merged commit 31c5b4d into apache:master Jun 29, 2023
Technoboy- pushed a commit that referenced this pull request Jun 29, 2023
Technoboy- pushed a commit that referenced this pull request Jun 29, 2023
@codelipenghui codelipenghui deleted the penghui/contention-dedup branch June 30, 2023 10:34
codelipenghui added a commit that referenced this pull request Jun 30, 2023
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jul 3, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants