From a31afbbb32ba12820e0e70cb62ec390c2f0f075b Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Fri, 7 Jul 2023 10:29:19 +0800 Subject: [PATCH 1/7] fix negative message re-delivery twice issue --- .../apache/pulsar/client/impl/NegativeAcksTest.java | 3 +++ .../org/apache/pulsar/client/impl/ConsumerBase.java | 7 +++++++ .../org/apache/pulsar/client/impl/ConsumerImpl.java | 4 ---- .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 2 ++ .../pulsar/client/impl/NegativeAcksTracker.java | 12 +++++++++--- .../pulsar/client/impl/UnAckedMessageTracker.java | 8 ++++++++ 6 files changed, 29 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 0ae36b4ca9045..876fa98bce414 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -146,6 +146,9 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti consumer.negativeAcknowledge(msg); } + assertTrue(consumer instanceof ConsumerBase); + assertEquals(((ConsumerBase) consumer).getUnAckedMessageTracker().size(), 0); + Set receivedMessages = new HashSet<>(); // All the messages should be received again diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index e933005f2d6ea..4896bc6bef6b9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -20,6 +20,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import io.netty.util.Timeout; import java.nio.charset.StandardCharsets; @@ -193,6 +195,11 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat initReceiverQueueSize(); } + @VisibleForTesting + public UnAckedMessageTracker getUnAckedMessageTracker() { + return unAckedMessageTracker; + } + protected void triggerBatchReceiveTimeoutTask() { if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) { batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b0d7d3a0f8b3a..2ea24c1ef135d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -385,10 +385,6 @@ public ConnectionHandler getConnectionHandler() { return connectionHandler; } - public UnAckedMessageTracker getUnAckedMessageTracker() { - return unAckedMessageTracker; - } - @VisibleForTesting NegativeAcksTracker getNegativeAcksTracker() { return negativeAcksTracker; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index fb7be3c5a5ea2..2026a8b406804 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -546,6 +546,7 @@ public void negativeAcknowledge(MessageId messageId) { checkArgument(messageId instanceof TopicMessageId); ConsumerImpl consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic()); consumer.negativeAcknowledge(messageId); + unAckedMessageTracker.remove(messageId); } @Override @@ -554,6 +555,7 @@ public void negativeAcknowledge(Message message) { checkArgument(messageId instanceof TopicMessageId); ConsumerImpl consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic()); consumer.negativeAcknowledge(message); + unAckedMessageTracker.remove(messageId); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 37f58a0218091..d6b86e3593dc2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -32,8 +32,11 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class NegativeAcksTracker implements Closeable { + private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class); private HashMap nackedMessages = null; @@ -79,9 +82,12 @@ private synchronized void triggerRedelivery(Timeout t) { } }); - messagesToRedeliver.forEach(nackedMessages::remove); - consumer.onNegativeAcksSend(messagesToRedeliver); - consumer.redeliverUnacknowledgedMessages(messagesToRedeliver); + if (!messagesToRedeliver.isEmpty()) { + messagesToRedeliver.forEach(nackedMessages::remove); + consumer.onNegativeAcksSend(messagesToRedeliver); + log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size()); + consumer.redeliverUnacknowledgedMessages(messagesToRedeliver); + } this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 534f33350267d..3073bd64fd941 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -189,6 +189,10 @@ public void clear() { } public boolean add(MessageId messageId) { + if (messageId == null) { + return false; + } + messageId = MessageIdAdvUtils.discardBatch(messageId); writeLock.lock(); try { HashSet partition = timePartitions.peekLast(); @@ -217,6 +221,10 @@ boolean isEmpty() { } public boolean remove(MessageId messageId) { + if (messageId == null) { + return false; + } + messageId = MessageIdAdvUtils.discardBatch(messageId); writeLock.lock(); try { boolean removed = false; From 492b388cf1480b876432b9c0c9c2e23e4f58472e Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Thu, 13 Jul 2023 13:02:35 +0800 Subject: [PATCH 2/7] fix checkstyle --- .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 4896bc6bef6b9..056ba8e434f2c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; - import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import io.netty.util.Timeout; From cf5ee58fb0286b4625a0ae98e603186732b083e7 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Thu, 13 Jul 2023 14:46:01 +0800 Subject: [PATCH 3/7] remove annotation --- .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 1 - .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 4 ---- 2 files changed, 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 056ba8e434f2c..579a40c1b5011 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -194,7 +194,6 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat initReceiverQueueSize(); } - @VisibleForTesting public UnAckedMessageTracker getUnAckedMessageTracker() { return unAckedMessageTracker; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 2026a8b406804..add11e7cf8d47 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -854,10 +854,6 @@ public synchronized ConsumerStats getStats() { return stats; } - public UnAckedMessageTracker getUnAckedMessageTracker() { - return unAckedMessageTracker; - } - private void removeExpiredMessagesFromQueue(Set messageIds) { Message peek = incomingMessages.peek(); if (peek != null) { From 6126a5a48bf961c975749fb516229547e943a36f Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Thu, 13 Jul 2023 16:19:30 +0800 Subject: [PATCH 4/7] remove useless import --- .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 579a40c1b5011..7d0053b7de462 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; import io.netty.util.Timeout; import java.nio.charset.StandardCharsets; From 47d87b8bb63f981766ad048457a3d63fef180734 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Thu, 13 Jul 2023 18:44:42 +0800 Subject: [PATCH 5/7] public to protect --- .../main/java/org/apache/pulsar/client/impl/ConsumerBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 7d0053b7de462..fec428824c205 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -193,7 +193,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat initReceiverQueueSize(); } - public UnAckedMessageTracker getUnAckedMessageTracker() { + protected UnAckedMessageTracker getUnAckedMessageTracker() { return unAckedMessageTracker; } From 4a6a1ef202eb4fd65e34ac375e6097602754a0b5 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Sat, 15 Jul 2023 07:45:42 +0800 Subject: [PATCH 6/7] resolve comments --- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 5 +++++ .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2ea24c1ef135d..1b04babea9b98 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -385,6 +385,11 @@ public ConnectionHandler getConnectionHandler() { return connectionHandler; } + @Override + public UnAckedMessageTracker getUnAckedMessageTracker() { + return unAckedMessageTracker; + } + @VisibleForTesting NegativeAcksTracker getNegativeAcksTracker() { return negativeAcksTracker; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index add11e7cf8d47..8a515a9f9b8d7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -854,6 +854,11 @@ public synchronized ConsumerStats getStats() { return stats; } + @Override + public UnAckedMessageTracker getUnAckedMessageTracker() { + return unAckedMessageTracker; + } + private void removeExpiredMessagesFromQueue(Set messageIds) { Message peek = incomingMessages.peek(); if (peek != null) { From 305ac2e244d7c4717c7f59be02715a08311585eb Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Sun, 16 Jul 2023 19:25:45 +0800 Subject: [PATCH 7/7] Keep consistent type for message id adding to and removing from the UnAckedMessageTracker --- .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +- .../org/apache/pulsar/client/impl/UnAckedMessageTracker.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 1b04babea9b98..a929fe9aa6bb2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -757,7 +757,7 @@ public void negativeAcknowledge(Message message) { negativeAcksTracker.add(message); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" - unAckedMessageTracker.remove(message.getMessageId()); + unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId())); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 3073bd64fd941..69f86a1a89f2c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -192,7 +192,7 @@ public boolean add(MessageId messageId) { if (messageId == null) { return false; } - messageId = MessageIdAdvUtils.discardBatch(messageId); + writeLock.lock(); try { HashSet partition = timePartitions.peekLast(); @@ -224,7 +224,7 @@ public boolean remove(MessageId messageId) { if (messageId == null) { return false; } - messageId = MessageIdAdvUtils.discardBatch(messageId); + writeLock.lock(); try { boolean removed = false;