Skip to content

Commit

Permalink
[consumer] Revert "Remove consumer unnecessary locks (#9261)" (#10240)
Browse files Browse the repository at this point in the history
  • Loading branch information
linlinnn authored Apr 27, 2021
1 parent bbd4e5d commit e95d6f0
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
Message<T> message = null;
lock.writeLock().lock();
try {
message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
if (message == null) {
Expand All @@ -422,6 +423,8 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.completeExceptionally(e);
} finally {
lock.writeLock().unlock();
}

if (message != null) {
Expand Down Expand Up @@ -472,8 +475,8 @@ protected Messages<T> internalBatchReceive() throws PulsarClientException {
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
lock.writeLock().lock();
try {
lock.writeLock().lock();
if (pendingBatchReceives == null) {
pendingBatchReceives = Queues.newConcurrentLinkedQueue();
}
Expand All @@ -498,7 +501,6 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
} finally {
lock.writeLock().unlock();
}

return result;
}

Expand Down Expand Up @@ -561,8 +563,8 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
delayTime = 0;
}
if (retryLetterProducer == null) {
createProducerLock.writeLock().lock();
try {
createProducerLock.writeLock().lock();
if (retryLetterProducer == null) {
retryLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy.getRetryLetterTopic())
Expand Down Expand Up @@ -957,9 +959,14 @@ private void closeConsumerTasks() {
}

private void failPendingReceive() {
if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
failPendingReceives(this.pendingReceives);
failPendingBatchReceives(this.pendingBatchReceives);
lock.readLock().lock();
try {
if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
failPendingReceives(this.pendingReceives);
failPendingBatchReceives(this.pendingBatchReceives);
}
} finally {
lock.readLock().unlock();
}
}

Expand Down Expand Up @@ -1058,18 +1065,23 @@ uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redelive
poolMessages);
uncompressedPayload.release();

// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
Collections.singletonList(message));
}
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
lock.readLock().lock();
try {
// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
Collections.singletonList(message));
}
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
} finally {
lock.readLock().unlock();
}
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
Expand Down Expand Up @@ -1280,11 +1292,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}

if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
lock.readLock().lock();
try {
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
} finally {
lock.readLock().unlock();
}
singleMessagePayload.release();
}
Expand Down Expand Up @@ -1706,8 +1722,8 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl messageId)

private void initDeadLetterProducerIfNeeded() {
if (deadLetterProducer == null) {
createProducerLock.writeLock().lock();
try {
createProducerLock.writeLock().lock();
if (deadLetterProducer == null) {
deadLetterProducer = client.newProducer(schema)
.topic(this.deadLetterPolicy.getDeadLetterTopic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,13 +602,17 @@ private ConsumerConfigurationData<T> getInternalConsumerConfig() {

@Override
public void redeliverUnacknowledgedMessages() {
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
clearIncomingMessages();
unAckedMessageTracker.clear();

lock.writeLock().lock();
try {
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
clearIncomingMessages();
unAckedMessageTracker.clear();
} finally {
lock.writeLock().unlock();
}
resumeReceivingFromPausedConsumersIfNeeded();
}

Expand Down

0 comments on commit e95d6f0

Please sign in to comment.