diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java index cb74bf3bf2e14..552ee275dd543 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java @@ -28,14 +28,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionBufferClientException; -import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -52,8 +50,6 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler { private final AtomicLong requestIdGenerator = new AtomicLong(); private final long operationTimeoutInMills; private final HashedWheelTimer timer; - private final Semaphore semaphore; - private final boolean blockIfReachMaxPendingOps; private final PulsarClient pulsarClient; private final LoadingCache> cache = CacheBuilder.newBuilder() @@ -77,8 +73,6 @@ public TransactionBufferHandlerImpl(PulsarClient pulsarClient, this.pulsarClient = pulsarClient; this.pendingRequests = new ConcurrentSkipListMap<>(); this.operationTimeoutInMills = 3000L; - this.semaphore = new Semaphore(10000); - this.blockIfReachMaxPendingOps = true; this.timer = timer; } @@ -90,9 +84,6 @@ public CompletableFuture endTxnOnTopic(String topic, long txnIdMostBits, topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue()); } CompletableFuture cb = new CompletableFuture<>(); - if (!canSendRequest(cb)) { - return cb; - } long requestId = requestIdGenerator.getAndIncrement(); ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits, topic, action, lowWaterMark); @@ -108,9 +99,6 @@ public CompletableFuture endTxnOnSubscription(String topic, String subscr topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue()); } CompletableFuture cb = new CompletableFuture<>(); - if (!canSendRequest(cb)) { - return cb; - } long requestId = requestIdGenerator.getAndIncrement(); ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits, topic, subscription, action, lowWaterMark); @@ -210,29 +198,9 @@ public void handleEndTxnOnSubscriptionResponse(long requestId, onResponse(op); } - private boolean canSendRequest(CompletableFuture callback) { - try { - if (blockIfReachMaxPendingOps) { - semaphore.acquire(); - } else { - if (!semaphore.tryAcquire()) { - callback.completeExceptionally(new ReachMaxPendingOpsException("Reach max pending ops.")); - return false; - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - callback.completeExceptionally(TransactionBufferClientException.unwrap(e)); - return false; - } - return true; - } - - void onResponse(OpRequestSend op) { ReferenceCountUtil.safeRelease(op.byteBuf); op.recycle(); - semaphore.release(); } private static final class OpRequestSend { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java index 0082d81f5fe41..c25447bfb0915 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java @@ -30,7 +30,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -248,16 +247,6 @@ public void testTransactionBufferLookUp() throws Exception { @Test public void testTransactionBufferHandlerSemaphore() throws Exception { - - Field field = TransactionBufferClientImpl.class.getDeclaredField("tbHandler"); - field.setAccessible(true); - TransactionBufferHandlerImpl transactionBufferHandler = (TransactionBufferHandlerImpl) field.get(tbClient); - - field = TransactionBufferHandlerImpl.class.getDeclaredField("semaphore"); - field.setAccessible(true); - field.set(transactionBufferHandler, new Semaphore(2)); - - String topic = "persistent://" + namespace + "/testTransactionBufferHandlerSemaphore"; String subName = "test";