Skip to content

Commit

Permalink
Fix NPE of cumulative ack mode and incorrect unack message count (#14021
Browse files Browse the repository at this point in the history
)

link #13383
## Motivation
#13383 has fixed  the batch message ack does not decrease the unacked-msg count, but in cumulative ack mode also decrease, it will use pendingAcks, but in cumulative ack, this will not init.

![image](https://user-images.githubusercontent.com/39078850/151622041-7fb0acc5-32fd-4140-82d7-8c75d2a6aef5.png)
![image](https://user-images.githubusercontent.com/39078850/151622106-bf75f3fa-84d5-4099-99f4-50f4dddd43a2.png)

If ack the batch index one by one, the last ack of a batch will decrease unack message with `batchSize`
```
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 9
================ batch size -> 10
```

### Modifications
add judge `Subscription.isIndividualAckMode(subType)` when get ackCount.
If the ack from consumer don't have ackset, we should treat it as empty ackset to calculate the ack count with the currently ackset.

(cherry picked from commit 618f17c)
  • Loading branch information
congbobo184 authored and zymap committed Jan 30, 2022
1 parent fc271ad commit 40579c5
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public class Consumer {
private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry");
private volatile int avgMessagesPerEntry = 1000;
private static final long [] EMPTY_ACK_SET = new long[0];

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
Expand Down Expand Up @@ -413,10 +414,10 @@ private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,
}
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
if (isAcknowledgmentAtBatchIndexLevelEnabled) {
if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
ackedCount = batchSize - BitSet.valueOf(cursorAckSet).cardinality();
ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
} else {
ackedCount = batchSize;
}
Expand Down Expand Up @@ -521,7 +522,7 @@ private long getBatchSize(MessageIdData msgId) {

private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) {
long ackedCount = 0;
if (isAcknowledgmentAtBatchIndexLevelEnabled) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,27 @@
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;

Expand Down Expand Up @@ -105,4 +111,66 @@ public void testBatchMessageAck() {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
});
}

@DataProvider(name = "testSubTypeAndEnableBatch")
public Object[][] testSubTypeAndEnableBatch() {
return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE },
{ SubscriptionType.Failover, Boolean.TRUE },
{ SubscriptionType.Shared, Boolean.FALSE },
{ SubscriptionType.Failover, Boolean.FALSE }};
}


@Test(dataProvider="testSubTypeAndEnableBatch")
private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType,
boolean enableBatch) throws Exception {

final int messageCount = 50;
final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
final String subscriptionName = "sub-batch-1";
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
.newConsumer(Schema.BYTES)
.topic(topicName)
.isAckReceiptEnabled(true)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.enableBatching(enableBatch)
.topic(topicName)
.batchingMaxMessages(10)
.create();

CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown);
}

countDownLatch.await();

for (int i = 0; i < messageCount; i++) {
Message<byte[]> message = consumer.receive();
// wait for receipt
if (i < messageCount / 2) {
consumer.acknowledgeAsync(message.getMessageId()).get();
}
}

String topic = TopicName.get(topicName).toString();
PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopic(topic, false).get().get().getSubscription(subscriptionName);

Awaitility.await().untilAsserted(() -> {
if (subType == SubscriptionType.Shared) {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0);
}
});
}
}

0 comments on commit 40579c5

Please sign in to comment.