Skip to content

Commit

Permalink
[Transaction] Resolve the performance bottleneck of TransactionBuffer…
Browse files Browse the repository at this point in the history
…Handle (#13988)

### Motivation
Previously, synchronization locks were frequently used in TransactionBufferHandleImpl in order to achieve request timeouts. For this reason, the performance of TC has been affected.
### Modification
Each request uses a Timeout to do a timeout check

(cherry picked from commit f48b53d)
  • Loading branch information
liangyepianzhou authored and codelipenghui committed Jan 29, 2022
1 parent 137200b commit fa549f6
Showing 1 changed file with 15 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
Expand All @@ -49,12 +46,11 @@
import org.apache.pulsar.common.protocol.Commands;

@Slf4j
public class TransactionBufferHandlerImpl implements TransactionBufferHandler, TimerTask {
public class TransactionBufferHandlerImpl implements TransactionBufferHandler {

private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests;
private final AtomicLong requestIdGenerator = new AtomicLong();
private final long operationTimeoutInMills;
private Timeout requestTimeout;
private final HashedWheelTimer timer;
private final Semaphore semaphore;
private final boolean blockIfReachMaxPendingOps;
Expand Down Expand Up @@ -84,11 +80,10 @@ public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
this.semaphore = new Semaphore(10000);
this.blockIfReachMaxPendingOps = true;
this.timer = timer;
this.requestTimeout = timer.newTimeout(this, operationTimeoutInMills, TimeUnit.MILLISECONDS);
}

@Override
public synchronized CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits,
public CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits,
TxnAction action, long lowWaterMark) {
if (log.isDebugEnabled()) {
log.debug("[{}] endTxnOnTopic txnId: [{}], txnAction: [{}]",
Expand All @@ -105,7 +100,7 @@ public synchronized CompletableFuture<TxnID> endTxnOnTopic(String topic, long tx
}

@Override
public synchronized CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription,
public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription,
long txnIdMostBits, long txnIdLeastBits,
TxnAction action, long lowWaterMark) {
if (log.isDebugEnabled()) {
Expand All @@ -129,10 +124,16 @@ private CompletableFuture<TxnID> endTxn(long requestId, String topic, ByteBuf cm
if (throwable == null) {
if (clientCnx.ctx().channel().isActive()) {
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
synchronized (TransactionBufferHandlerImpl.this) {
pendingRequests.put(requestId, op);
cmd.retain();
}
pendingRequests.put(requestId, op);
timer.newTimeout(timeout -> {
OpRequestSend peek = pendingRequests.remove(requestId);
if (peek != null && !peek.cb.isDone() && !peek.cb.isCompletedExceptionally()) {
peek.cb.completeExceptionally(new TransactionBufferClientException
.RequestTimeoutException());
onResponse(peek);
}
}, operationTimeoutInMills, TimeUnit.MILLISECONDS);
cmd.retain();
clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
} else {
cache.invalidate(topic);
Expand All @@ -158,7 +159,7 @@ private CompletableFuture<TxnID> endTxn(long requestId, String topic, ByteBuf cm
}

@Override
public synchronized void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartitionResponse response) {
OpRequestSend op = pendingRequests.remove(requestId);
if (op == null) {
if (log.isDebugEnabled()) {
Expand All @@ -183,7 +184,7 @@ public synchronized void handleEndTxnOnTopicResponse(long requestId, CommandEndT
}

@Override
public synchronized void handleEndTxnOnSubscriptionResponse(long requestId,
public void handleEndTxnOnSubscriptionResponse(long requestId,
CommandEndTxnOnSubscriptionResponse response) {
OpRequestSend op = pendingRequests.remove(requestId);
if (op == null) {
Expand Down Expand Up @@ -227,32 +228,6 @@ private boolean canSendRequest(CompletableFuture<?> callback) {
return true;
}

public synchronized void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
long timeToWaitMs;
OpRequestSend peeked;
Map.Entry<Long, OpRequestSend> firstEntry = pendingRequests.firstEntry();
peeked = firstEntry == null ? null : firstEntry.getValue();
while (peeked != null && peeked.createdAt + operationTimeoutInMills - System.currentTimeMillis() <= 0) {
if (!peeked.cb.isDone()) {
peeked.cb.completeExceptionally(new TransactionBufferClientException.RequestTimeoutException());
onResponse(peeked);
} else {
break;
}
firstEntry = pendingRequests.firstEntry();
pendingRequests.remove(pendingRequests.firstKey());
peeked = firstEntry == null ? null : firstEntry.getValue();
}
if (peeked == null) {
timeToWaitMs = operationTimeoutInMills;
} else {
timeToWaitMs = (peeked.createdAt + operationTimeoutInMills) - System.currentTimeMillis();
}
requestTimeout = timer.newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
}

void onResponse(OpRequestSend op) {
ReferenceCountUtil.safeRelease(op.byteBuf);
Expand Down

0 comments on commit fa549f6

Please sign in to comment.