Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker]Fix deadlock of metadata store #20189

Merged
merged 10 commits into from
May 18, 2023

Conversation

poorbarcode
Copy link
Contributor

@poorbarcode poorbarcode commented Apr 26, 2023

Motivation

Note:

  • This issue is found in a scenario where KOP is used.
  • It is usually expressed as many lookup timeout logs are printed, and Pub and Sub are stuck.
  • If there are a few topics, the heartbeat responses nothing caused by the event thread of the ZK client is not working correctly. Eventually, the ZK client will be reconnected, and the problem will be alleviated.
  • If there are a lot of topics, there will be many lookup requests, and the deadlock will be triggered again, loop....

To reproduce

  • create a cluster(2.11.x) using KOP
  • create a partitioned topic(which have 200+ partitions)
  • do Pub & Sub.

[1]Task: load or create topics

"main-EventThread" #21 daemon prio=5 os_prio=0 cpu=1025.43ms elapsed=1113.10s tid=0x00007fb2b6514bf0 nid=0xd6 waiting on condition  [0x00007fb2785dc000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.6/Native Method)
	- parking to wait for  <0x0000000487fda1a8> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.6/LockSupport.java:211)
	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.6/CompletableFuture.java:1864)
	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.6/ForkJoinPool.java:3463)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.6/ForkJoinPool.java:3434)
	at java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.6/CompletableFuture.java:1898)
	at java.util.concurrent.CompletableFuture.get(java.base@17.0.6/CompletableFuture.java:2072)
	at com.github.benmanes.caffeine.cache.LocalAsyncCache$AbstractCacheView.resolve(LocalAsyncCache.java:515)
	at com.github.benmanes.caffeine.cache.LocalAsyncLoadingCache$LoadingCacheView.get(LocalAsyncLoadingCache.java:122)
	at org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundles(NamespaceBundleFactory.java:260)
	at org.apache.pulsar.broker.namespace.NamespaceService.getBundle(NamespaceService.java:219)
	at org.apache.pulsar.broker.namespace.NamespaceService.isServiceUnitActiveAsync(NamespaceService.java:1020)
	at org.apache.pulsar.broker.service.BrokerService.checkOwnershipAndCreatePersistentTopic(BrokerService.java:1423)
	at org.apache.pulsar.broker.service.BrokerService.lambda$loadOrCreatePersistentTopic$53(BrokerService.java:1398)
	at org.apache.pulsar.broker.service.BrokerService$$Lambda$1265/0x000000080141f980.run(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniRun.tryFire(java.base@17.0.6/CompletableFuture.java:787)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@17.0.6/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@17.0.6/CompletableFuture.java:2147)
	at org.apache.pulsar.metadata.coordination.impl.LockManagerImpl.lambda$acquireLock$1(LockManagerImpl.java:105)
	at org.apache.pulsar.metadata.coordination.impl.LockManagerImpl$$Lambda$604/0x00000008012086b8.run(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniRun.tryFire(java.base@17.0.6/CompletableFuture.java:787)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@17.0.6/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@17.0.6/CompletableFuture.java:2147)
	at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$acquire$2(ResourceLockImpl.java:128)
	at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl$$Lambda$602/0x0000000801208248.run(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniRun.tryFire(java.base@17.0.6/CompletableFuture.java:787)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@17.0.6/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@17.0.6/CompletableFuture.java:2147)
	at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl.lambda$acquireWithNoRevalidation$6(ResourceLockImpl.java:167)
	at org.apache.pulsar.metadata.coordination.impl.ResourceLockImpl$$Lambda$600/0x0000000801203400.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(java.base@17.0.6/CompletableFuture.java:718)
	at java.util.concurrent.CompletableFuture.postComplete(java.base@17.0.6/CompletableFuture.java:510)
	at java.util.concurrent.CompletableFuture.complete(java.base@17.0.6/CompletableFuture.java:2147)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.handlePutResult(ZKMetadataStore.java:225)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:182)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$224/0x0000000800ee9d20.processResult(Unknown Source)
	at org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:490)
	at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:722)
	at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:563)

[2] Task: split bundle

"pulsar-modular-load-manager-31-1" #95 prio=5 os_prio=0 cpu=14.24ms elapsed=1109.95s tid=0x00007fb218013a20 nid=0x124 waiting on condition  [0x00007fb164cfb000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.6/Native Method)
	- parking to wait for  <0x0000000488ec5020> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.6/LockSupport.java:211)
	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.6/CompletableFuture.java:1864)
	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.6/ForkJoinPool.java:3463)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.6/ForkJoinPool.java:3434)
	at java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.6/CompletableFuture.java:1898)
	at java.util.concurrent.CompletableFuture.join(java.base@17.0.6/CompletableFuture.java:2117)
	at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.deleteBundleDataFromMetadataStore(ModularLoadManagerImpl.java:1088)
	at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.checkNamespaceBundleSplit(ModularLoadManagerImpl.java:726)
	- locked <0x00000004867ecbd8> (a org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTask)
	at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.updateAll(ModularLoadManagerImpl.java:484)
	at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl$$Lambda$619/0x000000080120e440.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.6/Executors.java:539)
	at java.util.concurrent.FutureTask.run(java.base@17.0.6/FutureTask.java:264)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@17.0.6/ScheduledThreadPoolExecutor.java:304)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.6/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.6/ThreadPoolExecutor.java:635)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.6/Thread.java:833)

[3] Task: lookup

"pulsar-2-6" #131 prio=5 os_prio=0 cpu=9116.96ms elapsed=1109.04s tid=0x00007fb2100b9470 nid=0x145 waiting on condition  [0x00007fb0b1cc1000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.6/Native Method)
	- parking to wait for  <0x0000000488e81548> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.6/LockSupport.java:211)
	at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.6/CompletableFuture.java:1864)
	at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.6/ForkJoinPool.java:3463)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.6/ForkJoinPool.java:3434)
	at java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.6/CompletableFuture.java:1898)
	at java.util.concurrent.CompletableFuture.join(java.base@17.0.6/CompletableFuture.java:2117)
	at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.getBundleDataOrDefault(ModularLoadManagerImpl.java:394)
	at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.lambda$selectBrokerForAssignment$8(ModularLoadManagerImpl.java:791)
	at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl$$Lambda$1213/0x0000000801404000.apply(Unknown Source)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(java.base@17.0.6/ConcurrentHashMap.java:1708)
	- locked <0x0000000488e81260> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
	at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.selectBrokerForAssignment(ModularLoadManagerImpl.java:790)
	- locked <0x00000004867a4498> (a java.util.HashSet)
	at org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper.getLeastLoaded(ModularLoadManagerWrapper.java:68)
	at org.apache.pulsar.broker.namespace.NamespaceService.getLeastLoadedFromLoadManager(NamespaceService.java:700)
	at org.apache.pulsar.broker.namespace.NamespaceService.searchForCandidateBroker(NamespaceService.java:523)
	at org.apache.pulsar.broker.namespace.NamespaceService.lambda$findBrokerServiceUrl$8(NamespaceService.java:408)
	at org.apache.pulsar.broker.namespace.NamespaceService$$Lambda$1209/0x00000008013ff650.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.6/Executors.java:539)
	at java.util.concurrent.FutureTask.run(java.base@17.0.6/FutureTask.java:264)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@17.0.6/ScheduledThreadPoolExecutor.java:304)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.6/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.6/ThreadPoolExecutor.java:635)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.6/Thread.java:833)

Modifications

Since the method getBundle(topic) will eventually call the method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic) instead of getBundle(topic) to avoid blocking the thread.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

  • 1

@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Apr 26, 2023
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-required Your PR changes impact docs and you will update later. labels Apr 26, 2023
@poorbarcode poorbarcode force-pushed the fix/dl_of_meta_stroe branch from 354ef5f to 05e27e0 Compare April 26, 2023 19:08
@poorbarcode poorbarcode added this to the 3.1.0 milestone Apr 27, 2023
@poorbarcode poorbarcode self-assigned this Apr 27, 2023
@poorbarcode poorbarcode force-pushed the fix/dl_of_meta_stroe branch from c66ebb5 to a928f4a Compare May 9, 2023 08:04
@lhotari
Copy link
Member

lhotari commented May 9, 2023

@poorbarcode @Technoboy- is this related to #20130 changes and review comments in any way?

@poorbarcode
Copy link
Contributor Author

@lhotari

is this related to #20130 changes and review comments in any way?

Sorry, I didn't notice this PR before, but I sense that the two problems are very similar.

This PR is not related to #20130.

#20130 is used to solve the deadlock of the meta-store thread, and PR is used to solve the deadlock of the event-zk-client thread.

@poorbarcode
Copy link
Contributor Author

poorbarcode commented May 9, 2023

Now the current modification is problematic because the OwnershipCache.cache.executor uses the current thread. see

I will fix it tomorrow

@dave2wave dave2wave requested a review from eolivelli May 10, 2023 20:01
@codecov-commenter
Copy link

Codecov Report

Merging #20189 (f686e1f) into master (00f17e8) will increase coverage by 35.37%.
The diff coverage is 40.00%.

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #20189       +/-   ##
=============================================
+ Coverage     37.61%   72.98%   +35.37%     
- Complexity    12589    31971    +19382     
=============================================
  Files          1691     1868      +177     
  Lines        129028   138604     +9576     
  Branches      14066    15240     +1174     
=============================================
+ Hits          48530   101166    +52636     
+ Misses        74183    29399    -44784     
- Partials       6315     8039     +1724     
Flag Coverage Δ
inttests 24.12% <40.00%> (-0.06%) ⬇️
systests 25.00% <40.00%> (+0.23%) ⬆️
unittests 72.24% <40.00%> (+39.05%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...apache/pulsar/broker/namespace/OwnershipCache.java 85.26% <ø> (+10.52%) ⬆️
...ache/pulsar/broker/namespace/NamespaceService.java 69.75% <40.00%> (+25.99%) ⬆️

... and 1426 files with indirect coverage changes

/**
* @Deprecated This method is only used by test. call "isServiceUnitActiveAsync" is better.
*/
@Deprecated
public boolean isServiceUnitActive(TopicName topicName) {
try {
Copy link
Member

@lhotari lhotari May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to use something like isServiceUnitActiveAsync(topicName).get(conf.getMetadataStoreOperationTimeoutSeconds(), SECONDS);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari

I think it is a good suggestion. Already edit this method to make the test that calls the method works better.

Could you take a look again?

@poorbarcode poorbarcode requested a review from lhotari May 14, 2023 15:45
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@poorbarcode poorbarcode merged commit 4678c36 into apache:master May 18, 2023
poorbarcode added a commit that referenced this pull request May 18, 2023
Motivation: This task loadOrCreatePersistentTopic occupied the event thread of the ZK client so that other ZK tasks could not be finished anymore(Including the task itself), and it calls bundlesCache.synchronous().get(nsname) which is a blocking method.

Modification: Since the method getBundle(topic) will eventually call the method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic) instead of getBundle(topic) to avoid blocking the thread.
(cherry picked from commit 4678c36)
lhotari pushed a commit to datastax/pulsar that referenced this pull request May 22, 2023
Motivation: This task loadOrCreatePersistentTopic occupied the event thread of the ZK client so that other ZK tasks could not be finished anymore(Including the task itself), and it calls bundlesCache.synchronous().get(nsname) which is a blocking method.

Modification: Since the method getBundle(topic) will eventually call the method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic) instead of getBundle(topic) to avoid blocking the thread.
(cherry picked from commit 4678c36)
(cherry picked from commit 8a1a4be)
Technoboy- pushed a commit that referenced this pull request May 24, 2023
Motivation: This task loadOrCreatePersistentTopic occupied the event thread of the ZK client so that other ZK tasks could not be finished anymore(Including the task itself), and it calls bundlesCache.synchronous().get(nsname) which is a blocking method.

Modification: Since the method getBundle(topic) will eventually call the method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic) instead of getBundle(topic) to avoid blocking the thread.
poorbarcode added a commit that referenced this pull request May 30, 2023
Motivation: This task loadOrCreatePersistentTopic occupied the event thread of the ZK client so that other ZK tasks could not be finished anymore(Including the task itself), and it calls bundlesCache.synchronous().get(nsname) which is a blocking method.

Modification: Since the method getBundle(topic) will eventually call the method bundlesCache.synchronous().get(nsname), use getBundleAsync(topic) instead of getBundle(topic) to avoid blocking the thread.
(cherry picked from commit 4678c36)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants