Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept received-msg-ack from different consumer than received-consumer on shared-subscription #52

Merged
merged 1 commit into from
Oct 7, 2016

Conversation

rdhabalia
Copy link
Contributor

Motivation

  • Right now, on shared-subscription: consumer which receives message can only acknowledge that message at broker. If other consumer acks this message then broker may redeliver same message to earlier consumer which received message as it still considers that message is not acked.
  • To support REST/SQS consumer interface : need a feature where for any consumer under a same subscription should be able to ack messages.

Modifications

  • find a consumer which delivered ack-msg and remove it from that consumer's pendingMessage list (so, msg won't be redelievered in anycase) and mark msg-acked.

Result

any consumer under a given subscription can ack message without any side-effect.

@yahoocla
Copy link

yahoocla commented Oct 6, 2016

CLA is valid!

2 similar comments
@yahoocla
Copy link

yahoocla commented Oct 6, 2016

CLA is valid!

@yahoocla
Copy link

yahoocla commented Oct 6, 2016

CLA is valid!

@rdhabalia rdhabalia added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Oct 6, 2016
@rdhabalia rdhabalia added this to the 1.15 milestone Oct 6, 2016
@rdhabalia rdhabalia self-assigned this Oct 6, 2016
@merlimat merlimat merged commit bb35716 into apache:master Oct 7, 2016
@rdhabalia rdhabalia deleted the consumer-ack branch November 11, 2016 23:05
sijie pushed a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Adhere to rest semantics

* Renamed FunctionState to FunctionMetaData
codelipenghui pushed a commit that referenced this pull request Apr 14, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. 
This happened in v2.4.2 and master branch. 

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
``` 
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. 
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. 
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
addisonj pushed a commit to instructure/pulsar that referenced this pull request May 7, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. 
This happened in v2.4.2 and master branch. 

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" apache#52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" apache#1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
``` 
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. 
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. 
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
jiazhai pushed a commit that referenced this pull request May 8, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time.
This happened in v2.4.2 and master branch.

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" #52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" #1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
```
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`.
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`.
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
(cherry picked from commit 6d30414)
jerrypeng pushed a commit to jerrypeng/incubator-pulsar that referenced this pull request May 15, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. 
This happened in v2.4.2 and master branch. 

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" apache#52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" apache#1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
``` 
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. 
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. 
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
massakam pushed a commit to massakam/pulsar that referenced this pull request Aug 6, 2020
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
### Motivation

Broker servers were not able to connect clients when consumers and readers connected to broker servers at almost the same time. 
This happened in v2.4.2 and master branch. 

As the following threaddump at that time:
```
"bookkeeper-ml-workers-OrderedExecutor-5-0" apache#52 prio=5 os_prio=0 tid=0x00007ff425fd0800 nid=0x28bf waiting on condition [0x00007ff3478f6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000750c51a00> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:245)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$2.openCursorComplete(PersistentTopic.java:638)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncOpenCursor(ManagedLedgerImpl.java:712)
        - locked <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getDurableSubscription(PersistentTopic.java:631)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:578)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$2.lambda$openLedgerComplete$1(BrokerService.java:687)
        at org.apache.pulsar.broker.service.BrokerService$2$$Lambda$229/1013432130.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.BrokerService$2.openLedgerComplete(BrokerService.java:680)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$7(ManagedLedgerFactoryImpl.java:328)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$$Lambda$184/272111809.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:316)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3$1.operationComplete(ManagedLedgerImpl.java:464)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:276)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$1.operationComplete(ManagedCursorImpl.java:249)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$7(MetaStoreImplZookeeper.java:241)
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper$$Lambda$584/1125537287.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

...

"ForkJoinPool.commonPool-worker-36" apache#1043 daemon prio=5 os_prio=0 tid=0x00007ff34c0ce800 nid=0x26f2 waiting for monitor entry [0x00007ff32d2eb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.newNonDurableCursor(ManagedLedgerImpl.java:856)
        - waiting to lock <0x0000000750c53a00> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$getNonDurableSubscription$13(PersistentTopic.java:684)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$572/174683985.apply(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.put(ConcurrentOpenHashMap.java:274)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.computeIfAbsent(ConcurrentOpenHashMap.java:129)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.getNonDurableSubscription(PersistentTopic.java:667)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:579)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$10(ServerCnx.java:699)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$459/848410492.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:682)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$458/375938934.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
``` 
`PersistentTopic#getDurableSubscription` locked  `ConcurrentOpenHashMap` after locking `ManagedLedgerImpl`. 
( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)

On the other hand, `PersistentTopic#getNonDurableSubscription` tried to lock `ManagedLedgerImpl` after trying to lock `ConcurrentOpenHashMap`. 
( `ConcurrentOpenHashMap` => `ManagedLedgerImpl`)

So, it seems that deadlock happens.

### Modifications
Fixed as `PersistentTopic#getNonDurableSubscription` try to lock `ConcurrentOpenHashMap` after trying to lock `ManagedLedgerImpl`. ( `ManagedLedgerImpl` => `ConcurrentOpenHashMap`)
hangc0276 pushed a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
fix Issue apache#32  
Make Consumer Group work distributed.
Add unit test for multi-brokers.
Pass tests  for 2 brokers with `bin/kop kafka-broker`
cbornet added a commit to cbornet/pulsar that referenced this pull request Apr 5, 2022
…pache#52)

* Add FULL_MESSAGE_IN_JSON_EXPAND_VALUE message format to Kinesis sink

* Add doc for FULL_MESSAGE_IN_JSON_EXPAND_VALUE format for Kinesis sink

* Fix missing dependencies

* Rename record value field to payload
to prevent confusion with KeyValue value field

* Add option to flatten JSON with FULL_MESSAGE_IN_JSON_EXPAND_VALUE format
nicoloboschi referenced this pull request in nicoloboschi/pulsar Apr 12, 2022
…52)

* Add FULL_MESSAGE_IN_JSON_EXPAND_VALUE message format to Kinesis sink

* Add doc for FULL_MESSAGE_IN_JSON_EXPAND_VALUE format for Kinesis sink

* Fix missing dependencies

* Rename record value field to payload
to prevent confusion with KeyValue value field

* Add option to flatten JSON with FULL_MESSAGE_IN_JSON_EXPAND_VALUE format

(cherry picked from commit a2e12a8)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants