Skip to content

Commit

Permalink
[improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (apache#22245)
Browse files Browse the repository at this point in the history

(cherry picked from commit e34ea62)
(cherry picked from commit 5b37e84)
poorbarcode authored and srinath-ctds committed Apr 23, 2024
1 parent ddf77be commit 3eb5c2d
Showing 5 changed files with 470 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -95,6 +95,14 @@ private void removeFromHashBlocker(long ledgerId, long entryId) {
}
}

/**
 * Returns the sticky-key hash recorded for the given (ledgerId, entryId) position,
 * or {@code null} if no hash was tracked for it.
 */
public Long getHash(long ledgerId, long entryId) {
    final LongPair pair = hashesToBeBlocked.get(ledgerId, entryId);
    return pair == null ? null : pair.first;
}

public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
if (!allowOutOfOrderDelivery) {
List<LongPair> keysToRemove = new ArrayList<>();
Original file line number Diff line number Diff line change
@@ -309,24 +309,25 @@ public synchronized void readMoreEntries() {
}

NavigableSet<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

if (!messagesToReplayNow.isEmpty()) {
NavigableSet<PositionImpl> messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
if (!messagesToReplayFiltered.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
consumerList.size());
log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
messagesToReplayFiltered.size(), consumerList.size());
}

havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.first();
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
? asyncReplayEntriesInOrder(messagesToReplayFiltered)
: asyncReplayEntries(messagesToReplayFiltered);
// clear already acked positions from replay bucket

deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
((PositionImpl) position).getEntryId()));
// if all the entries are acked-entries and cleared up from redeliveryMessages, try to read
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntriesAsync();
}
@@ -335,7 +336,7 @@ public synchronized void readMoreEntries() {
log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
}
} else if (!havePendingRead) {
} else if (!havePendingRead && hasConsumersNeededNormalRead()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
consumerList.size());
@@ -364,7 +365,16 @@ public synchronized void readMoreEntries() {
topic.getMaxReadPosition());
}
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
if (log.isDebugEnabled()) {
if (!messagesToReplayNow.isEmpty()) {
log.debug("[{}] [{}] Skipping read for the topic: because all entries in replay queue were"
+ " filtered out due to the mechanism of Key_Shared mode, and the left consumers have"
+ " no permits now",
topic.getName(), getSubscriptionName());
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
}
}
}
} else {
if (log.isDebugEnabled()) {
@@ -1106,6 +1116,27 @@ protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int max
}
}

/**
 * Hook method designed for the Key_Shared mode dispatcher.
 * Filters out the entries that will be discarded due to the order-guarantee mechanism of Key_Shared mode.
 * It exists to avoid the wasteful sequence below:
 * - Get positions from the replay queue.
 * - Read entries from BK.
 * - The order-guarantee mechanism of Key_Shared mode filters out all the entries.
 * - Deliver no entry to the client, even though a BK read was performed.
 * The default implementation is a no-op; the Key_Shared dispatcher overrides it.
 */
protected NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
    return src;
}

/**
 * Hook method designed for the Key_Shared mode dispatcher, to avoid unnecessarily getting stuck.
 * See detail {@link PersistentStickyKeyDispatcherMultipleConsumers#hasConsumersNeededNormalRead}.
 * The default implementation always allows a normal read.
 */
protected boolean hasConsumersNeededNormalRead() {
    return true;
}

protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries();
}
Original file line number Diff line number Diff line change
@@ -30,13 +30,16 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
@@ -165,6 +168,14 @@ protected Map<Consumer, List<Entry>> initialValue() throws Exception {
}
};

// Thread-local scratch map used to group replay positions by the consumer their
// sticky-key hash selects; it is cleared and reused on each call to avoid
// allocating a new map per read.
private static final FastThreadLocal<Map<Consumer, List<PositionImpl>>> localGroupedPositions =
        new FastThreadLocal<Map<Consumer, List<PositionImpl>>>() {
            @Override
            protected Map<Consumer, List<PositionImpl>> initialValue() throws Exception {
                return new HashMap<>();
            }
        };

@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
@@ -248,15 +259,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
assert consumer != null; // checked when added to groupedEntries
List<Entry> entriesWithSameKey = current.getValue();
int entriesWithSameKeyCount = entriesWithSameKey.size();
int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
if (consumer.getMaxUnackedMessages() > 0) {
int remainUnAckedMessages =
// Avoid negative number
Math.max(consumer.getMaxUnackedMessages() - consumer.getUnackedMessages(), 0);
availablePermits = Math.min(availablePermits, remainUnAckedMessages);
}
int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
int availablePermits = getAvailablePermits(consumer);
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer,
entriesWithSameKey.stream().map(Entry::getPosition).collect(Collectors.toList()), availablePermits,
readType, consumerStickyKeyHashesMap.get(consumer));
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} with messages num {}, read type is {}",
@@ -289,7 +294,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
totalEntries += filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);

consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
@@ -332,8 +336,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
return false;
}

private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages,
ReadType readType, Set<Integer> stickyKeyHashes) {
private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? extends Position> entries,
int availablePermits, ReadType readType, Set<Integer> stickyKeyHashes) {
int maxMessages = Math.min(entries.size(), availablePermits);
if (maxMessages == 0) {
return 0;
}
@@ -378,7 +383,7 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
// Here, the consumer is one that has recently joined, so we can only send messages that were
// published before it has joined.
for (int i = 0; i < maxMessages; i++) {
if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
if (((PositionImpl) entries.get(i)).compareTo(maxReadPosition) >= 0) {
// We have already crossed the divider line. All messages in the list are now
// newer than what we can currently dispatch to this consumer
return i;
@@ -405,6 +410,9 @@ && removeConsumersFromRecentJoinedConsumers()) {
}

private boolean removeConsumersFromRecentJoinedConsumers() {
if (MapUtils.isEmpty(recentlyJoinedConsumers)) {
return false;
}
Iterator<Map.Entry<Consumer, PositionImpl>> itr = recentlyJoinedConsumers.entrySet().iterator();
boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
@@ -437,6 +445,76 @@ protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int max
}
}

/**
 * Computes how many messages can be pushed to the given consumer right now: its
 * available permits, additionally capped by the remaining un-acked-message
 * allowance when {@code maxUnackedMessages} is configured (greater than 0).
 */
private int getAvailablePermits(Consumer c) {
    int permits = Math.max(c.getAvailablePermits(), 0);
    final int maxUnacked = c.getMaxUnackedMessages();
    if (maxUnacked > 0) {
        // Avoid negative number
        final int remainingUnackedAllowance = Math.max(maxUnacked - c.getUnackedMessages(), 0);
        permits = Math.min(permits, remainingUnackedAllowance);
    }
    return permits;
}

/**
 * Filters the replay positions down to those that can actually be dispatched now under
 * Key_Shared ordering: positions whose selected consumer has available permits and is
 * allowed to receive them by the recently-joined-consumers rule. Positions with no
 * recorded sticky-key hash cannot be pre-filtered and are kept as-is.
 */
@Override
protected synchronized NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {
    if (src.isEmpty()) {
        return src;
    }
    NavigableSet<PositionImpl> res = new TreeSet<>();
    // Group positions by the consumer their sticky-key hash selects, reusing the
    // thread-local scratch map to avoid per-call allocation.
    final Map<Consumer, List<PositionImpl>> groupedPositions = localGroupedPositions.get();
    groupedPositions.clear();
    for (PositionImpl pos : src) {
        Long stickyKeyHash = redeliveryMessages.getHash(pos.getLedgerId(), pos.getEntryId());
        if (stickyKeyHash == null) {
            // No hash recorded for this position: keep it for the replay read.
            res.add(pos);
            continue;
        }
        Consumer c = selector.select(stickyKeyHash.intValue());
        if (c == null) {
            // Maybe using HashRangeExclusiveStickyKeyConsumerSelector.
            continue;
        }
        groupedPositions.computeIfAbsent(c, k -> new ArrayList<>()).add(pos);
    }
    // Filter positions by the Recently Joined Position rule.
    for (Map.Entry<Consumer, List<PositionImpl>> item : groupedPositions.entrySet()) {
        int availablePermits = getAvailablePermits(item.getKey());
        if (availablePermits == 0) {
            // Consumer cannot accept anything right now; skip its positions for this read.
            continue;
        }
        int posCountToRead = getRestrictedMaxEntriesForConsumer(item.getKey(), item.getValue(), availablePermits,
                ReadType.Replay, null);
        if (posCountToRead > 0) {
            res.addAll(item.getValue().subList(0, posCountToRead));
        }
    }
    return res;
}

/**
 * In Key_Shared mode, a consumer included in {@link #recentlyJoinedConsumers} will not receive
 * any entries from a normal read; it can only receive entries from replay reads.
 * If all entries in {@link #redeliveryMessages} have been filtered out due to the order
 * guarantee mechanism, the broker needs a normal read so that the consumers NOT included in
 * {@link #recentlyJoinedConsumers} do not get stuck. See https://github.com/apache/pulsar/pull/7105.
 */
@Override
protected boolean hasConsumersNeededNormalRead() {
    for (Consumer consumer : consumerList) {
        // A consumer needs a normal read when it is usable (non-null, not blocked),
        // not restricted to replay reads, and actually has permits.
        boolean wantsNormalRead = consumer != null
                && !consumer.isBlocked()
                && !recentlyJoinedConsumers.containsKey(consumer)
                && consumer.getAvailablePermits() > 0;
        if (wantsNormalRead) {
            return true;
        }
    }
    return false;
}

@Override
public SubType getType() {
return SubType.Key_Shared;
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -48,19 +49,25 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -1630,4 +1637,263 @@ public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
log.info("Got {} other messages...", sum);
Assert.assertEquals(sum, delayedMessages + messages);
}

/**
 * Replaces the cursor named {@code cursorName} with a Mockito spy and returns a counter that is
 * incremented for every replay (non-"Normal") read issued through the cursor. Used by tests to
 * verify the broker does not perform repeated read-and-discard replay reads.
 */
private AtomicInteger injectReplayReadCounter(String topicName, String cursorName) throws Exception {
    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
    ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
    ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.openCursor(cursorName);
    // Swap the real cursor for a spy: remove the original from both cursor registries,
    // then register the spy in its place.
    managedLedger.getCursors().removeCursor(cursor.getName());
    managedLedger.getActiveCursors().removeCursor(cursor.getName());
    ManagedCursorImpl spyCursor = Mockito.spy(cursor);
    managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
    managedLedger.getActiveCursors().add(spyCursor, PositionImpl.EARLIEST);
    AtomicInteger replyReadCounter = new AtomicInteger();
    // Count both overloads of asyncReplayEntries.
    // NOTE(review): assumes arguments[2] is the context whose string form is "Normal"
    // for normal reads — confirm against ManagedCursor.asyncReplayEntries signatures.
    Mockito.doAnswer(invocation -> {
        if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
            replyReadCounter.incrementAndGet();
        }
        return invocation.callRealMethod();
    }).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), Mockito.any());
    Mockito.doAnswer(invocation -> {
        if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
            replyReadCounter.incrementAndGet();
        }
        return invocation.callRealMethod();
    }).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
    admin.topics().createSubscription(topicName, cursorName, MessageId.earliest);
    return replyReadCounter;
}

/**
 * Verifies that when all replay-queue entries are filtered out by the Key_Shared
 * order-guarantee mechanism, the broker does not keep re-reading and discarding them.
 */
@Test
public void testNoRepeatedReadAndDiscard() throws Exception {
    int delayedMessages = 100;
    final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
    final String subName = "my-sub";
    admin.topics().createNonPartitionedTopic(topic);
    AtomicInteger replyReadCounter = injectReplayReadCounter(topic, subName);

    // Send messages.
    @Cleanup
    Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
    for (int i = 0; i < delayedMessages; i++) {
        MessageId messageId = producer.newMessage()
                .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                .value(100 + i)
                .send();
        log.info("Published message :{}", messageId);
    }
    producer.close();

    // Make ack holes.
    Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
            .topic(topic)
            .subscriptionName(subName)
            .receiverQueueSize(10)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
            .topic(topic)
            .subscriptionName(subName)
            .receiverQueueSize(10)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    // Fixed raw types: List<Message> -> List<Message<Integer>>.
    List<Message<Integer>> msgList1 = new ArrayList<>();
    List<Message<Integer>> msgList2 = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Message<Integer> msg1 = consumer1.receive(1, TimeUnit.SECONDS);
        if (msg1 != null) {
            msgList1.add(msg1);
        }
        Message<Integer> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
        if (msg2 != null) {
            msgList2.add(msg2);
        }
    }
    // Ack everything one consumer got; the other becomes the consumer whose messages
    // will be redelivered when it closes.
    Consumer<Integer> redeliverConsumer = null;
    if (!msgList1.isEmpty()) {
        msgList1.forEach(msg -> consumer1.acknowledgeAsync(msg));
        redeliverConsumer = consumer2;
    } else {
        msgList2.forEach(msg -> consumer2.acknowledgeAsync(msg));
        redeliverConsumer = consumer1;
    }

    // consumer3 will be added to the "recentJoinedConsumers".
    Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
            .topic(topic)
            .subscriptionName(subName)
            .receiverQueueSize(1000)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    redeliverConsumer.close();

    // Verify: no repeated Read-and-discard.
    Thread.sleep(5 * 1000);
    int maxReplayCount = delayedMessages * 2;
    log.info("Reply read count: {}", replyReadCounter.get());
    assertTrue(replyReadCounter.get() < maxReplayCount);

    // cleanup.
    consumer1.close();
    consumer2.close();
    consumer3.close();
    admin.topics().delete(topic, false);
}

/**
* This test is in order to guarantee the feature added by https://github.com/apache/pulsar/pull/7105.
* 1. Start 3 consumers:
* - consumer1 will be closed and trigger a messages redeliver.
* - consumer2 will not ack any messages to make the new consumer joined late will be stuck due
* to the mechanism "recentlyJoinedConsumers".
* - consumer3 will always receive and ack messages.
* 2. Add consumer4 after consumer1 was close, and consumer4 will be stuck due to the mechanism
* "recentlyJoinedConsumers".
* 3. Verify:
* - (Main purpose) consumer3 can still receive messages util the cursor.readerPosition is larger than LAC.
* - no repeated Read-and-discard.
* - at last, all messages will be received.
*/
/**
 * This test is in order to guarantee the feature added by https://github.com/apache/pulsar/pull/7105.
 * 1. Start 3 consumers:
 *    - consumer1 will be closed and trigger a messages redeliver.
 *    - consumer2 will not ack any messages to make the new consumer joined late will be stuck due
 *      to the mechanism "recentlyJoinedConsumers".
 *    - consumer3 will always receive and ack messages.
 * 2. Add consumer4 after consumer1 was closed; consumer4 will be stuck due to the mechanism
 *    "recentlyJoinedConsumers".
 * 3. Verify:
 *    - (Main purpose) consumer3 can still receive messages until the cursor.readerPosition is larger than LAC.
 *    - no repeated Read-and-discard.
 *    - at last, all messages will be received.
 */
@Test(timeOut = 180 * 1000) // the test will be finished in 60s.
public void testRecentJoinedPosWillNotStuckOtherConsumer() throws Exception {
    final int messagesSentPerTime = 100;
    final Set<Integer> totalReceivedMessages = new TreeSet<>();
    final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
    final String subName = "my-sub";
    admin.topics().createNonPartitionedTopic(topic);
    AtomicInteger replyReadCounter = injectReplayReadCounter(topic, subName);

    // Send messages.
    @Cleanup
    Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
    for (int i = 0; i < messagesSentPerTime; i++) {
        MessageId messageId = producer.newMessage()
                .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                .value(100 + i)
                .send();
        log.info("Published message :{}", messageId);
    }

    // 1. Start 3 consumers and make ack holes.
    //    - one consumer will be closed and trigger a messages redeliver.
    //    - one consumer will not ack any messages to make the new consumer joined late will be stuck due to the
    //      mechanism "recentlyJoinedConsumers".
    //    - one consumer will always receive and ack messages.
    Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
            .topic(topic)
            .subscriptionName(subName)
            .receiverQueueSize(10)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
            .topic(topic)
            .subscriptionName(subName)
            .receiverQueueSize(10)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
            .topic(topic)
            .subscriptionName(subName)
            .receiverQueueSize(10)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    // Fixed raw types: List<Message> -> List<Message<Integer>>.
    List<Message<Integer>> msgList1 = new ArrayList<>();
    List<Message<Integer>> msgList2 = new ArrayList<>();
    List<Message<Integer>> msgList3 = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Message<Integer> msg1 = consumer1.receive(1, TimeUnit.SECONDS);
        if (msg1 != null) {
            totalReceivedMessages.add(msg1.getValue());
            msgList1.add(msg1);
        }
        Message<Integer> msg2 = consumer2.receive(1, TimeUnit.SECONDS);
        if (msg2 != null) {
            totalReceivedMessages.add(msg2.getValue());
            msgList2.add(msg2);
        }
        Message<Integer> msg3 = consumer3.receive(1, TimeUnit.SECONDS);
        // BUG FIX: the original guarded this branch with "msg2 != null", which could
        // NPE on msg3 (or drop msg3 values) when msg2 and msg3 nullability differ.
        if (msg3 != null) {
            totalReceivedMessages.add(msg3.getValue());
            msgList3.add(msg3);
        }
    }
    Consumer<Integer> consumerWillBeClose = null;
    Consumer<Integer> consumerAlwaysAck = null;
    Consumer<Integer> consumerStuck = null;
    if (!msgList1.isEmpty()) {
        msgList1.forEach(msg -> consumer1.acknowledgeAsync(msg));
        consumerAlwaysAck = consumer1;
        consumerWillBeClose = consumer2;
        consumerStuck = consumer3;
    } else if (!msgList2.isEmpty()) {
        msgList2.forEach(msg -> consumer2.acknowledgeAsync(msg));
        consumerAlwaysAck = consumer2;
        consumerWillBeClose = consumer3;
        consumerStuck = consumer1;
    } else {
        msgList3.forEach(msg -> consumer3.acknowledgeAsync(msg));
        consumerAlwaysAck = consumer3;
        consumerWillBeClose = consumer1;
        consumerStuck = consumer2;
    }

    // 2. Add consumer4 after "consumerWillBeClose" was closed; consumer4 will be stuck due to the mechanism
    //    "recentlyJoinedConsumers".
    Consumer<Integer> consumer4 = pulsarClient.newConsumer(Schema.INT32)
            .topic(topic)
            .subscriptionName(subName)
            .receiverQueueSize(1000)
            .subscriptionType(SubscriptionType.Key_Shared)
            .subscribe();
    consumerWillBeClose.close();

    Thread.sleep(2000);

    // Send messages again.
    for (int i = messagesSentPerTime; i < messagesSentPerTime * 2; i++) {
        MessageId messageId = producer.newMessage()
                .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                .value(100 + i)
                .send();
        log.info("Published message :{}", messageId);
    }

    // Verify: "consumerAlwaysAck" can receive messages until the cursor.readerPosition is larger than LAC.
    while (true) {
        Message<Integer> msg = consumerAlwaysAck.receive(2, TimeUnit.SECONDS);
        if (msg == null) {
            break;
        }
        totalReceivedMessages.add(msg.getValue());
        consumerAlwaysAck.acknowledge(msg);
    }
    PersistentTopic persistentTopic =
            (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
    ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
    ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.openCursor(subName);
    log.info("cursor_readPosition {}, LAC {}", cursor.getReadPosition(), managedLedger.getLastConfirmedEntry());
    assertTrue(((PositionImpl) cursor.getReadPosition())
            .compareTo((PositionImpl) managedLedger.getLastConfirmedEntry()) > 0);

    // Make all consumers to start to read and acknowledge messages.
    // Verify: no repeated Read-and-discard.
    Thread.sleep(5 * 1000);
    int maxReplayCount = messagesSentPerTime * 2;
    log.info("Reply read count: {}", replyReadCounter.get());
    assertTrue(replyReadCounter.get() < maxReplayCount);
    // Verify: at last, all messages will be received.
    ReceivedMessages<Integer> receivedMessages = ackAllMessages(consumerAlwaysAck, consumerStuck, consumer4);
    totalReceivedMessages.addAll(receivedMessages.messagesReceived.stream().map(p -> p.getRight()).collect(
            Collectors.toList()));
    assertEquals(totalReceivedMessages.size(), messagesSentPerTime * 2);

    // cleanup.
    consumer1.close();
    consumer2.close();
    consumer3.close();
    consumer4.close();
    producer.close();
    admin.topics().delete(topic, false);
}
}
Original file line number Diff line number Diff line change
@@ -21,9 +21,14 @@
import com.google.common.collect.Sets;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;

import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -69,4 +74,65 @@ protected String newTopicName() {
return "my-property/my-ns/topic-" + Long.toHexString(random.nextLong());
}

/**
 * Drains messages from all the given consumers until none of them receives anything within the
 * per-receive timeout, recording every message and acknowledging each one for which
 * {@code ackPredicate} returns {@code true}.
 *
 * @param ackPredicate decides, from (messageId, value), whether a message is acknowledged
 * @param consumers the consumers to drain
 * @return the received and acked (messageId, value) pairs
 */
protected <T> ReceivedMessages<T> receiveAndAckMessages(
        BiFunction<MessageId, T, Boolean> ackPredicate,
        Consumer<T>...consumers) throws Exception {
    // Fixed raw type: construct and declare ReceivedMessages with its type parameter.
    ReceivedMessages<T> receivedMessages = new ReceivedMessages<>();
    while (true) {
        int receivedMsgCount = 0;
        for (Consumer<T> consumer : consumers) {
            while (true) {
                Message<T> msg = consumer.receive(2, TimeUnit.SECONDS);
                if (msg == null) {
                    break;
                }
                receivedMsgCount++;
                T v = msg.getValue();
                MessageId messageId = msg.getMessageId();
                receivedMessages.messagesReceived.add(Pair.of(messageId, v));
                if (ackPredicate.apply(messageId, v)) {
                    consumer.acknowledge(msg);
                    receivedMessages.messagesAcked.add(Pair.of(messageId, v));
                }
            }
        }
        // Because of the possibility of consumers getting stuck with each other, only jump out of the loop if all
        // consumers could not receive messages.
        if (receivedMsgCount == 0) {
            break;
        }
    }
    return receivedMessages;
}

/**
 * Receives and acknowledges every message from the given consumers; a convenience wrapper
 * around {@link #receiveAndAckMessages} with an always-true ack predicate.
 */
protected <T> ReceivedMessages<T> ackAllMessages(Consumer<T>...consumers) throws Exception {
    return receiveAndAckMessages((msgId, msgV) -> true, consumers);
}

/**
 * Records the messages a test received and the subset it acknowledged, as
 * (messageId, value) pairs in arrival order.
 */
protected static class ReceivedMessages<T> {

    // All received (messageId, value) pairs, in arrival order.
    final List<Pair<MessageId, T>> messagesReceived = new ArrayList<>();

    // The subset of received pairs that were acknowledged.
    final List<Pair<MessageId, T>> messagesAcked = new ArrayList<>();

    /** Returns true when a message with the given value was received. */
    public boolean hasReceivedMessage(T v) {
        return containsValue(messagesReceived, v);
    }

    /** Returns true when a message with the given value was acknowledged. */
    public boolean hasAckedMessage(T v) {
        return containsValue(messagesAcked, v);
    }

    // Shared linear search by value; extracted to remove the duplicated loop
    // the two has*Message methods previously carried.
    private static <T> boolean containsValue(List<Pair<MessageId, T>> pairs, T v) {
        for (Pair<MessageId, T> pair : pairs) {
            if (pair.getRight().equals(v)) {
                return true;
            }
        }
        return false;
    }
}
}

0 comments on commit 3eb5c2d

Please sign in to comment.