-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed deadlock on txn semaphore permit exhaustion #14131
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me.
+1
Also 10000 should have been made configurable.
BTW as you say this semaphore brings more trouble than benefit, it is better to drop it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
I remember the semaphore is for limiting the transaction buffer calls while reply transaction logs. @congbobo184 Please help double confirm. |
### Motivation Removing semaphore on the end of transactions operations. The semaphore is not very useful here as we are already closing the transactions (backpressure should eventually be applied at the starting of the transactions). The semaphore here is being acquired from a BK callback thread and it causes a deadlock in broker when the semaphore is full, because the response that will release the permits on the semaphore are coming from either the same thread or a thread in the same condition. ``` sun.misc.Unsafe.park(Unsafe.java) java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) java.util.concurrent.Semaphore.acquire(Semaphore.java:312) org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.canSendRequest(TransactionBufferHandlerImpl.java:216) org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.endTxnOnTopic(TransactionBufferHandlerImpl.java:93) org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl.commitTxnOnTopic(TransactionBufferClientImpl.java:50) org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$null$23(TransactionMetadataStoreService.java:484) org.apache.pulsar.broker.TransactionMetadataStoreService$$Lambda$1253.accept() java.util.ArrayList.forEach(ArrayList.java:1257) org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$endTxnInTransactionBuffer$25(TransactionMetadataStoreService.java:481) org.apache.pulsar.broker.TransactionMetadataStoreService$$Lambda$1251.accept() java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) org.apache.pulsar.broker.TransactionMetadataStoreService.endTxnInTransactionBuffer(TransactionMetadataStoreService.java:458) org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$null$11(TransactionMetadataStoreService.java:349) org.apache.pulsar.broker.TransactionMetadataStoreService$$Lambda$1309.accept() java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl$3.addComplete(MLTransactionLogImpl.java:160) org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:228) org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) ``` (cherry picked from commit bea5bb8)
### Motivation Removing semaphore on the end of transactions operations. The semaphore is not very useful here as we are already closing the transactions (backpressure should eventually be applied at the starting of the transactions). The semaphore here is being acquired from a BK callback thread and it causes a deadlock in broker when the semaphore is full, because the response that will release the permits on the semaphore are coming from either the same thread or a thread in the same condition. ``` sun.misc.Unsafe.park(Unsafe.java) java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) java.util.concurrent.Semaphore.acquire(Semaphore.java:312) org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.canSendRequest(TransactionBufferHandlerImpl.java:216) org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.endTxnOnTopic(TransactionBufferHandlerImpl.java:93) org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl.commitTxnOnTopic(TransactionBufferClientImpl.java:50) org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$null$23(TransactionMetadataStoreService.java:484) org.apache.pulsar.broker.TransactionMetadataStoreService$$Lambda$1253.accept() java.util.ArrayList.forEach(ArrayList.java:1257) org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$endTxnInTransactionBuffer$25(TransactionMetadataStoreService.java:481) org.apache.pulsar.broker.TransactionMetadataStoreService$$Lambda$1251.accept() java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) org.apache.pulsar.broker.TransactionMetadataStoreService.endTxnInTransactionBuffer(TransactionMetadataStoreService.java:458) org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$null$11(TransactionMetadataStoreService.java:349) org.apache.pulsar.broker.TransactionMetadataStoreService$$Lambda$1309.accept() java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl$3.addComplete(MLTransactionLogImpl.java:160) org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:228) org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) ```
Motivation
Removing semaphore on the end of transactions operations. The semaphore is not very useful here as we are already closing the transactions (backpressure should eventually be applied at the starting of the transactions).
The semaphore here is being acquired from a BK callback thread and it causes a deadlock in broker when the semaphore is full, because the response that will release the permits on the semaphore are coming from either the same thread or a thread in the same condition.