From a51d02c8e3104cc65facd878692a921c27e9fb4f Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 28 Nov 2024 16:41:16 +0800 Subject: [PATCH] fix deadlock of negative tracker. --- .../client/impl/NegativeAcksTracker.java | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index e1724ebb85cda..5256ebf04f43c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -68,36 +68,38 @@ public NegativeAcksTracker(ConsumerBase consumer, ConsumerConfigurationData messagesToRedeliver = new HashSet<>(); - long now = System.nanoTime(); - nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> { - if (timestamp < now) { - MessageId msgId = new MessageIdImpl(ledgerId, entryId, - // need to covert non-partitioned topic partition index to -1 - (int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex)); - addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); - messagesToRedeliver.add(msgId); + synchronized (this) { + if (nackedMessages.isEmpty()) { + this.timeout = null; + return; } - }); - if (!messagesToRedeliver.isEmpty()) { + long now = System.nanoTime(); + nackedMessages.forEach((ledgerId, entryId, partitionIndex, timestamp) -> { + if (timestamp < now) { + MessageId msgId = new MessageIdImpl(ledgerId, entryId, + // need to covert non-partitioned topic partition index to -1 + (int) (partitionIndex == NON_PARTITIONED_TOPIC_PARTITION_INDEX ? -1 : partitionIndex)); + addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); + messagesToRedeliver.add(msgId); + } + }); for (MessageId messageId : messagesToRedeliver) { nackedMessages.remove(((MessageIdImpl) messageId).getLedgerId(), ((MessageIdImpl) messageId).getEntryId()); } + this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); + } + + // release the lock of NegativeAcksTracker before calling consumer.redeliverUnacknowledgedMessages, + // in which we may acquire the lock of consumer, leading to potential deadlock. + if (!messagesToRedeliver.isEmpty()) { consumer.onNegativeAcksSend(messagesToRedeliver); log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size()); consumer.redeliverUnacknowledgedMessages(messagesToRedeliver); } - - this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); } public synchronized void add(MessageId messageId) {