Skip to content

Commit

Permalink
Revert "Remove consumer unnecessary locks (apache#9261)"
Browse files Browse the repository at this point in the history
This reverts commit 9d08f64
  • Loading branch information
linjunhua committed Apr 26, 2021
1 parent 8fed161 commit 0b8288e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
Message<T> message = null;
try {
lock.writeLock().lock();
message = incomingMessages.poll(0, TimeUnit.MILLISECONDS);
if (message == null) {
pendingReceives.add(result);
Expand All @@ -422,6 +423,8 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result.completeExceptionally(e);
} finally {
lock.writeLock().unlock();
}

if (message != null) {
Expand Down Expand Up @@ -498,7 +501,6 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
} finally {
lock.writeLock().unlock();
}

return result;
}

Expand Down Expand Up @@ -957,9 +959,14 @@ private void closeConsumerTasks() {
}

private void failPendingReceive() {
if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
failPendingReceives(this.pendingReceives);
failPendingBatchReceives(this.pendingBatchReceives);
lock.readLock().lock();
try {
if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
failPendingReceives(this.pendingReceives);
failPendingBatchReceives(this.pendingBatchReceives);
}
} finally {
lock.readLock().unlock();
}
}

Expand Down Expand Up @@ -1058,18 +1065,23 @@ uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redelive
poolMessages);
uncompressedPayload.release();

// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
Collections.singletonList(message));
}
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
lock.readLock().lock();
try {
// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(),
Collections.singletonList(message));
}
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
} finally {
lock.readLock().unlock();
}
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
Expand Down Expand Up @@ -1280,11 +1292,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
}

if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
lock.readLock().lock();
try {
if (peekPendingReceive() != null) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
} finally {
lock.readLock().unlock();
}
singleMessagePayload.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,13 +602,17 @@ private ConsumerConfigurationData<T> getInternalConsumerConfig() {

@Override
public void redeliverUnacknowledgedMessages() {
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
clearIncomingMessages();
unAckedMessageTracker.clear();

lock.writeLock().lock();
try {
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunkedMessageIdSequenceMap.clear();
});
clearIncomingMessages();
unAckedMessageTracker.clear();
} finally {
lock.writeLock().unlock();
}
resumeReceivingFromPausedConsumersIfNeeded();
}

Expand Down

0 comments on commit 0b8288e

Please sign in to comment.