Skip to content

Commit

Permalink
Fix Duplicated messages are sent to dead letter topic apache#6960 (ap…
Browse files Browse the repository at this point in the history
…ache#7021)

Fixes apache#6960


### Motivation

bug fix

### Modifications

"UnAckedMessageTracker" will call "consumerBase.redeliverUnacknowledgedMessages". There will be 2 steps here: 
1) Filter out the messages that need to enter the DLQ
2) The remaining messages will be re-delivered via RedeliverUnacknowledgedMessages request

The problem appeared in the second step, when all messages were filtered out in the first step. If the MessageIdsList received by the broker is empty, it will trigger the reposting of all unackedMessages under this consumer.

Other consumers will consume messages that exceed maxRedeliveryCount again, and then the messages will repeatedly enter DLQ

Therefore, if all messages have been filtered out, the RedeliverUnacknowledgedMessages request should not be initiated. Just let "processPossibleToDLQ" send them to DLQ and ack.

### Verifying this change
unit test:
org.apache.pulsar.client.api.DeadLetterTopicTest#testDuplicatedMessageSendToDeadLetterTopic
  • Loading branch information
315157973 authored and cdbartholomew committed Jul 24, 2020
1 parent c8101bc commit 7e76a73
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@
*/
package org.apache.pulsar.client.api;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;

public class DeadLetterTopicTest extends ProducerConsumerBase {

Expand Down Expand Up @@ -116,6 +123,77 @@ public void testDeadLetterTopic() throws Exception {
newPulsarClient.close();
}

@Test(timeOut = 30000)
public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception {
final String topic = "persistent://my-property/my-ns/dead-letter-topic-DuplicatedMessage";
final int maxRedeliveryCount = 1;
final int messageCount = 10;
final int consumerCount = 3;
//1 start 3 parallel consumers
List<Consumer<String>> consumers = new ArrayList<>();
final AtomicInteger totalReceived = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(consumerCount);
for (int i = 0; i < consumerCount; i++) {
executor.execute(() -> {
try {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("my-subscription-DuplicatedMessage")
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1001, TimeUnit.MILLISECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).deadLetterTopic(topic + "-DLQ").build())
.negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
totalReceived.getAndIncrement();
//never ack
})
.subscribe();
consumers.add(consumer);
} catch (PulsarClientException e) {
fail();
}
});
}

//2 send messages
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
for (int i = 0; i < messageCount; i++) {
producer.send(String.format("Message [%d]", i));
}

//3 start a DLQ consumer
Consumer<String> deadLetterConsumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic + "-DLQ")
.subscriptionName("my-subscription-DuplicatedMessage-DLQ")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
int totalInDeadLetter = 0;
while (true) {
Message<String> message = deadLetterConsumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
}
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
}

//4 The number of messages that consumers can consume should be equal to messageCount * (maxRedeliveryCount + 1)
assertEquals(totalReceived.get(), messageCount * (maxRedeliveryCount + 1));

//5 The message in DLQ should be equal to messageCount
assertEquals(totalInDeadLetter, messageCount);

//6 clean up
executor.shutdownNow();
producer.close();
deadLetterConsumer.close();
for (Consumer<String> consumer : consumers) {
consumer.close();
}
}

/**
* The test is disabled {@link https://github.com/apache/pulsar/issues/2647}.
* @throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1676,10 +1676,11 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
builder.setEntryId(messageId.getEntryId());
return builder.build();
}).collect(Collectors.toList());

ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas);
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
messageIdDatas.forEach(MessageIdData::recycle);
if (!messageIdDatas.isEmpty()) {
ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas);
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
messageIdDatas.forEach(MessageIdData::recycle);
}
});
if (messagesFromQueue > 0) {
increaseAvailablePermits(cnx, messagesFromQueue);
Expand Down

0 comments on commit 7e76a73

Please sign in to comment.