Skip to content

Commit

Permalink
[fix] Avoid redelivering duplicated messages when batching is enabled (
Browse files Browse the repository at this point in the history
…apache#18486)

apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454.

The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations.

Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454.

`ConsumerRedeliveryTest#testAckNotSent` is added to verify it works.

The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

PR in forked repository: BewareMyPower#8

(cherry picked from commit be1d07e)
(cherry picked from commit 870b060)
  • Loading branch information
BewareMyPower authored and nicoloboschi committed Dec 6, 2022
1 parent 5e65536 commit 71c3b25
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;

import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -69,6 +70,19 @@ public Object[][] ackReceiptEnabled() {
return new Object[][] { { true }, { false } };
}

@DataProvider(name = "batchedMessageAck")
public Object[][] batchedMessageAck() {
// When batch index ack is disabled (by default), only after all single messages were sent would the pending
// ACK be added into the ACK tracker.
return new Object[][] {
// numAcked, batchSize, ack type
{ 3, 5, CommandAck.AckType.Individual },
{ 5, 5, CommandAck.AckType.Individual },
{ 3, 5, CommandAck.AckType.Cumulative },
{ 5, 5, CommandAck.AckType.Cumulative }
};
}

/**
* It verifies that redelivered messages are sorted based on the ledger-ids.
* <pre>
Expand Down Expand Up @@ -301,4 +315,57 @@ public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exce
consumer.close();
producer.close();
}

@Test(timeOut = 30000, dataProvider = "batchedMessageAck")
public void testAckNotSent(int numAcked, int batchSize, CommandAck.AckType ackType) throws Exception {
String topic = "persistent://my-property/my-ns/test-ack-not-sent-"
+ numAcked + "-" + batchSize + "-" + ackType.getValue();
@Cleanup Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.enableBatchIndexAcknowledgment(false)
.acknowledgmentGroupTime(1, TimeUnit.HOURS) // ACK won't be sent
.subscribe();
@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(true)
.batchingMaxMessages(batchSize)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.create();
for (int i = 0; i < batchSize; i++) {
String value = "msg-" + i;
producer.sendAsync(value).thenAccept(id -> log.info("{} was sent to {}", value, id));
}
List<Message<String>> messages = new ArrayList<>();
for (int i = 0; i < batchSize; i++) {
messages.add(consumer.receive());
}
if (ackType == CommandAck.AckType.Individual) {
for (int i = 0; i < numAcked; i++) {
consumer.acknowledge(messages.get(i));
}
} else {
consumer.acknowledgeCumulative(messages.get(numAcked - 1));
}

consumer.redeliverUnacknowledgedMessages();

messages.clear();
for (int i = 0; i < batchSize; i++) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
log.info("Received {} from {}", msg.getValue(), msg.getMessageId());
messages.add(msg);
}
List<String> values = messages.stream().map(Message::getValue).collect(Collectors.toList());
// All messages are redelivered because only if the whole batch are acknowledged would the message ID be
// added into the ACK tracker.
if (numAcked < batchSize) {
assertEquals(values, IntStream.range(0, batchSize).mapToObj(i -> "msg-" + i).collect(Collectors.toList()));
} else {
assertTrue(values.isEmpty());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,18 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
*/
@Override
public boolean isDuplicate(MessageId messageId) {
final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
if (!(messageId instanceof MessageIdImpl)) {
throw new IllegalArgumentException("isDuplicated cannot accept "
+ messageId.getClass().getName() + ": " + messageId);
}
if (lastCumulativeAck.compareTo(messageId) >= 0) {
// Already included in a cumulative ack
return true;
} else {
return pendingIndividualAcks.contains((MessageIdImpl) messageId);
final MessageIdImpl messageIdImpl = (messageId instanceof BatchMessageIdImpl)
? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
: (MessageIdImpl) messageId;
return pendingIndividualAcks.contains(messageIdImpl);
}
}

Expand Down Expand Up @@ -657,7 +663,7 @@ protected LastCumulativeAck initialValue() {
private boolean flushRequired = false;

public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
if (messageId.compareTo(this.messageId) > 0) {
if (compareTo(messageId) < 0) {
if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
this.bitSetRecyclable.recycle();
}
Expand Down Expand Up @@ -691,6 +697,24 @@ public synchronized void reset() {
flushRequired = false;
}

public synchronized int compareTo(MessageId messageId) {
if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) {
final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
final MessageIdImpl rhs = (MessageIdImpl) messageId;
return MessageIdImpl.messageIdCompare(
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(),
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE);
} else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){
final MessageIdImpl lhs = this.messageId;
final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
return MessageIdImpl.messageIdCompare(
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE,
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex());
} else {
return this.messageId.compareTo(messageId);
}
}

private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
this.messageId = messageId;
this.bitSetRecyclable = bitSetRecyclable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,24 @@ public void testFlush() {
assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
}

@Test
public void testCompareTo() {
LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
lastCumulativeAck.update(new MessageIdImpl(0L, 1L, -1), null);

assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
assertEquals(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)), 0);
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);

lastCumulativeAck = new LastCumulativeAck();
lastCumulativeAck.update(new BatchMessageIdImpl(0L, 1L, -1, 1), null);

assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 0L, -1)) > 0);
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 1L, -1)) < 0);
assertTrue(lastCumulativeAck.compareTo(new MessageIdImpl(0L, 2L, -1)) < 0);
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 0)) > 0);
assertTrue(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 2)) < 0);
assertEquals(lastCumulativeAck.compareTo(new BatchMessageIdImpl(0L, 1L, -1, 1)), 0);
}
}

0 comments on commit 71c3b25

Please sign in to comment.