From 43dc20364fb2b4fc15c3b0ed13a697d3668ea536 Mon Sep 17 00:00:00 2001 From: Yuri Mizushima Date: Tue, 25 Apr 2023 13:42:37 +0900 Subject: [PATCH 1/2] fix: change the definition of recentlyJoinedConsumers position --- .../mledger/impl/PositionImplRecyclable.java | 8 + .../pulsar/broker/service/Consumer.java | 10 +- ...PersistentDispatcherMultipleConsumers.java | 6 +- ...tStickyKeyDispatcherMultipleConsumers.java | 340 +++++++++++------- .../persistent/PersistentSubscription.java | 12 +- .../common/policies/data/ConsumerStats.java | 4 +- .../policies/data/SubscriptionStats.java | 4 +- .../data/stats/ConsumerStatsImpl.java | 10 +- .../data/stats/SubscriptionStatsImpl.java | 10 +- 9 files changed, 261 insertions(+), 143 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java index eb2b33e858d63..7bcb6522b88ca 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java @@ -38,6 +38,14 @@ private PositionImplRecyclable(Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; } + public void setLedgerId(final long ledgerId) { + this.ledgerId = ledgerId; + } + + public void setEntryId(final long entryId) { + this.entryId = entryId; + } + public static PositionImplRecyclable create() { return RECYCLER.get(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index a3f9da41e6b35..c1dc50e2c3250 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -134,7 +134,7 @@ public class Consumer { private static final double avgPercent = 0.9; private boolean preciseDispatcherFlowControl; - private PositionImpl readPositionWhenJoining; + private String lastSentPositionsWhenJoiningString; private final String clientAddress; // IP address only, no port number included private final MessageId startMessageId; private final boolean isAcknowledgmentAtBatchIndexLevelEnabled; @@ -867,8 +867,8 @@ public ConsumerStatsImpl getStats() { stats.unackedMessages = unackedMessages; stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs; stats.avgMessagesPerEntry = getAvgMessagesPerEntry(); - if (readPositionWhenJoining != null) { - stats.readPositionWhenJoining = readPositionWhenJoining.toString(); + if (lastSentPositionsWhenJoiningString != null) { + stats.lastSentPositionsWhenJoining = lastSentPositionsWhenJoiningString; } return stats; } @@ -1088,8 +1088,8 @@ public boolean isPreciseDispatcherFlowControl() { return preciseDispatcherFlowControl; } - public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) { - this.readPositionWhenJoining = readPositionWhenJoining; + public void setLastSentPositionsWhenJoiningString(String lastSentPositionsWhenJoiningString) { + this.lastSentPositionsWhenJoiningString = lastSentPositionsWhenJoiningString; } public int getMaxUnackedMessages() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 81adda053e8cd..5c4fb6d0bebc0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1188,7 +1188,11 @@ public ManagedCursor getCursor() { } protected int getStickyKeyHash(Entry entry) { - return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer())); + return getStickyKeyHash(peekStickyKey(entry.getDataBuffer())); + } + + protected int getStickyKeyHash(byte[] stickyKey) { + return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); } private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 1a8c6e180a2a2..138655cd850f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.FastThreadLocal; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -32,11 +33,12 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.PositionImplRecyclable; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector; @@ -65,14 +67,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi private final KeySharedMode keySharedMode; /** - * When a consumer joins, it will be added to this map with the current read position. - * This means that, in order to preserve ordering, new consumers can only receive old - * messages, until the mark-delete position will move past this point. + * When a consumer joins, it will be added to this map with the current last sent position per the message key. + * This means that, in order to preserve ordering per the message key, new consumers can only receive old + * messages, until the mark-delete position will move past this point in the key. New consumers can receive + * any new messages with the message key that is not in the last sent position. */ - private final LinkedHashMap recentlyJoinedConsumers; - - private final Set stuckConsumers; - private final Set nextStuckConsumers; + private final LinkedHashMap recentlyJoinedConsumers; + // The lastSentPosition is not thread-safe + private final LastSentPositions lastSentPositions; PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { @@ -80,8 +82,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery(); this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); - this.stuckConsumers = new HashSet<>(); - this.nextStuckConsumers = new HashSet<>(); + lastSentPositions = allowOutOfOrderDelivery ? null : new LastSentPositions(); this.keySharedMode = ksm.getKeySharedMode(); switch (this.keySharedMode) { case AUTO_SPLIT: @@ -102,6 +103,128 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi } } + protected static class LastSentPositions implements Cloneable, AutoCloseable { + /** + * Should hold not sticky key hashes but sticky keys as map key, + * because the dispatcher guarantees message ordering by not sticky key hashes but sticky keys. + */ + private Map positionMap; + + public LastSentPositions() { + positionMap = new HashMap<>(); + } + + public synchronized boolean removePreviousPositionsFromHash(final long ledgerId, final long entryId) { + if (positionMap == null) { + return false; + } + final Iterator> itr = positionMap.entrySet().iterator(); + boolean hasPositionRemovedFromLastSentPosition = false; + while (itr.hasNext()) { + final Map.Entry e = itr.next(); + if (e.getValue().compareTo(ledgerId, entryId) <= 0) { + hasPositionRemovedFromLastSentPosition = true; + e.getValue().recycle(); + itr.remove(); + } + } + return hasPositionRemovedFromLastSentPosition; + } + + public synchronized void updateOrPutPosition(final ByteBuffer stickyKey, + final long ledgerId, final long entryId) { + if (positionMap == null) { + return; + } + positionMap.compute(stickyKey, (k, currentPosition) -> { + if (currentPosition == null) { + final PositionImplRecyclable newPosition = PositionImplRecyclable.create(); + newPosition.setLedgerId(ledgerId); + newPosition.setEntryId(entryId); + return newPosition; + } + if (currentPosition.compareTo(ledgerId, entryId) < 0) { + currentPosition.setLedgerId(ledgerId); + currentPosition.setEntryId(entryId); + } + return currentPosition; + }); + } + + public synchronized int compareToLastSentPosition(final ByteBuffer stickyKey, + final long ledgerId, final long entryId) { + final PositionImplRecyclable lastSentPosition = positionMap == null ? null : positionMap.get(stickyKey); + if (lastSentPosition == null) { + // The dispatcher can dispatch any messages that is not contained in the last sent position. + return 1; + } + return lastSentPosition.compareTo(ledgerId, entryId); + } + + public synchronized boolean isEmpty() { + return positionMap == null || positionMap.isEmpty(); + } + + @Override + public synchronized LastSentPositions clone() { + final LastSentPositions clone = new LastSentPositions(); + if (positionMap != null) { + positionMap.forEach((k, v) -> { + final PositionImplRecyclable position = PositionImplRecyclable.create(); + position.setLedgerId(v.getLedgerId()); + position.setEntryId(v.getEntryId()); + clone.positionMap.put(k, position); + }); + } + return clone; + } + + @Override + public synchronized String toString() { + final StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("{"); + if (positionMap != null) { + stringBuilder.append(positionMap.entrySet().stream() + .map(e -> new String(e.getKey().array()) + "=" + e.getValue()) + .collect(Collectors.joining(", "))); + } + stringBuilder.append("}"); + return stringBuilder.toString(); + } + + public synchronized String toPositionSetString() { + return positionMap == null ? null : positionMap.values().toString(); + } + + @Override + public synchronized void close() { + if (positionMap != null && !positionMap.isEmpty()) { + positionMap.values().forEach(PositionImplRecyclable::recycle); + positionMap.clear(); + } + positionMap = null; + } + } + + @Override + public CompletableFuture close() { + final CompletableFuture future = super.close(); + if (!allowOutOfOrderDelivery) { + future.thenRun(() -> { + // Make sure all PositionImplRecyclable instances are recycled. + synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { + if (lastSentPositions != null) { + lastSentPositions.close(); + } + if (recentlyJoinedConsumers != null) { + recentlyJoinedConsumers.values().forEach(LastSentPositions::close); + } + } + }); + } + return future; + } + @VisibleForTesting public StickyKeyConsumerSelector getSelector() { return selector; @@ -126,16 +249,22 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { return result; }) ).thenRun(() -> { - synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { - PositionImpl readPositionWhenJoining = (PositionImpl) cursor.getReadPosition(); - consumer.setReadPositionWhenJoining(readPositionWhenJoining); - // If this was the 1st consumer, or if all the messages are already acked, then we - // don't need to do anything special - if (!allowOutOfOrderDelivery - && recentlyJoinedConsumers != null - && consumerList.size() > 1 - && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { - recentlyJoinedConsumers.put(consumer, readPositionWhenJoining); + if (lastSentPositions != null) { + synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { + final PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition(); + if (mdp != null) { + lastSentPositions.removePreviousPositionsFromHash(mdp.getLedgerId(), mdp.getEntryId()); + } + final LastSentPositions lastSentPositionsWhenJoining = lastSentPositions.clone(); + consumer.setLastSentPositionsWhenJoiningString(lastSentPositionsWhenJoining.toPositionSetString()); + + // If this was the 1st consumer, or if all the messages are already acked, then we + // don't need to do anything special + if (recentlyJoinedConsumers != null + && consumerList.size() > 1 + && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) { + recentlyJoinedConsumers.put(consumer, lastSentPositionsWhenJoining); + } } } }); @@ -152,8 +281,12 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE selector.removeConsumer(consumer); super.removeConsumer(consumer); if (recentlyJoinedConsumers != null) { - recentlyJoinedConsumers.remove(consumer); + final LastSentPositions lastSentPositionsForC = recentlyJoinedConsumers.remove(consumer); + if (lastSentPositionsForC != null) { + lastSentPositionsForC.close(); + } if (consumerList.size() == 1) { + recentlyJoinedConsumers.values().forEach(LastSentPositions::close); recentlyJoinedConsumers.clear(); } if (removeConsumersFromRecentJoinedConsumers() || !redeliveryMessages.isEmpty()) { @@ -163,12 +296,19 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE } private static final FastThreadLocal>> localGroupedEntries = - new FastThreadLocal>>() { + new FastThreadLocal<>() { @Override protected Map> initialValue() throws Exception { return new HashMap<>(); } }; + private static final FastThreadLocal>> localConsumerStickyKeyHashesMap = + new FastThreadLocal<>() { + @Override + protected Map> initialValue() throws Exception { + return new HashMap<>(); + } + }; @Override protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { @@ -226,16 +366,25 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } } - nextStuckConsumers.clear(); + if (recentlyJoinedConsumers != null) { + removeConsumersFromRecentJoinedConsumers(); + } final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); - final Map> consumerStickyKeyHashesMap = new HashMap<>(); + final Map> consumerStickyKeyHashesMap = localConsumerStickyKeyHashesMap.get(); + consumerStickyKeyHashesMap.clear(); for (Entry entry : entries) { - int stickyKeyHash = getStickyKeyHash(entry); + final ByteBuffer stickyKey = ByteBuffer.wrap(peekStickyKey(entry.getDataBuffer())); + int stickyKeyHash = getStickyKeyHash(stickyKey.array()); Consumer c = selector.select(stickyKeyHash); - if (c != null) { + + final LastSentPositions lastSentPositions = recentlyJoinedConsumers == null + ? null : recentlyJoinedConsumers.get(c); + if (c != null && (lastSentPositions == null + || lastSentPositions + .compareToLastSentPosition(stickyKey, entry.getLedgerId(), entry.getEntryId()) >= 0)) { groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry); consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash); } else { @@ -253,8 +402,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis for (Map.Entry> current : groupedEntries.entrySet()) { Consumer consumer = current.getKey(); assert consumer != null; // checked when added to groupedEntries - List entriesWithSameKey = current.getValue(); - int entriesWithSameKeyCount = entriesWithSameKey.size(); + List entriesWithinSameRange = current.getValue(); + int entriesWithinSameRangeCount = entriesWithinSameRange.size(); int availablePermits = Math.max(consumer.getAvailablePermits(), 0); if (consumer.getMaxUnackedMessages() > 0) { int remainUnAckedMessages = @@ -262,23 +411,24 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis Math.max(consumer.getMaxUnackedMessages() - consumer.getUnackedMessages(), 0); availablePermits = Math.min(availablePermits, remainUnAckedMessages); } - int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits); - int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, - readType, consumerStickyKeyHashesMap.get(consumer)); + int maxMessagesForC = Math.min(entriesWithinSameRangeCount, availablePermits); + int messagesForC = getRestrictedMaxEntriesForConsumer(maxMessagesForC, readType, + consumerStickyKeyHashesMap.get(consumer)); + if (log.isDebugEnabled()) { log.debug("[{}] select consumer {} with messages num {}, read type is {}", name, consumer.consumerName(), messagesForC, readType); } - if (messagesForC < entriesWithSameKeyCount) { + if (messagesForC < entriesWithinSameRangeCount) { // We are not able to push all the messages with given key to its consumer, // so we discard for now and mark them for later redelivery - for (int i = messagesForC; i < entriesWithSameKeyCount; i++) { - Entry entry = entriesWithSameKey.get(i); + for (int i = messagesForC; i < entriesWithinSameRangeCount; i++) { + Entry entry = entriesWithinSameRange.get(i); long stickyKeyHash = getStickyKeyHash(entry); addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); - entriesWithSameKey.set(i, null); + entriesWithinSameRange.set(i, null); } } @@ -286,7 +436,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis // remove positions first from replay list first : sendMessages recycles entries if (readType == ReadType.Replay) { for (int i = 0; i < messagesForC; i++) { - Entry entry = entriesWithSameKey.get(i); + Entry entry = entriesWithinSameRange.get(i); redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); } } @@ -294,10 +444,20 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC); EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC); - totalEntries += filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, + totalEntries += filterEntriesForConsumer(entriesWithinSameRange, batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay, consumer); - consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, + if (lastSentPositions != null) { + entriesWithinSameRange.forEach(e -> { + if (e == null) { + return; + } + final ByteBuffer stickyKey = ByteBuffer.wrap(peekStickyKey(e.getDataBuffer())); + lastSentPositions.updateOrPutPosition(stickyKey, e.getLedgerId(), e.getEntryId()); + }); + } + + consumer.sendMessages(entriesWithinSameRange, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker()).addListener(future -> { @@ -305,7 +465,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis readMoreEntries(); } }); - TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount())); totalMessagesSent += sendMessageInfo.getTotalMessages(); @@ -318,94 +477,27 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis // acquire message-dispatch permits for already delivered messages acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); - stuckConsumers.clear(); - - if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) { + if (totalMessagesSent == 0) { // This means, that all the messages we've just read cannot be dispatched right now. // This condition can only happen when: // 1. We have consumers ready to accept messages (otherwise the would not haven been triggered) // 2. All keys in the current set of messages are routing to consumers that are currently busy - // and stuck is not caused by stuckConsumers // // The solution here is to move on and read next batch of messages which might hopefully contain // also keys meant for other consumers. - // - // We do it unless that are "recently joined consumers". In that case, we would be looking - // ahead in the stream while the new consumers are not ready to accept the new messages, - // therefore would be most likely only increase the distance between read-position and mark-delete - // position. - if (!nextStuckConsumers.isEmpty()) { - isDispatcherStuckOnReplays = true; - stuckConsumers.addAll(nextStuckConsumers); - } - return true; - } else if (currentThreadKeyNumber == 0) { + isDispatcherStuckOnReplays = true; return true; } - return false; + return currentThreadKeyNumber == 0; } - private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, int maxMessages, - ReadType readType, Set stickyKeyHashes) { - if (maxMessages == 0) { - // the consumer was stuck - nextStuckConsumers.add(consumer); - return 0; - } + private int getRestrictedMaxEntriesForConsumer(int maxMessages, ReadType readType, Set stickyKeyHashes) { if (readType == ReadType.Normal && stickyKeyHashes != null && redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) { // If redeliveryMessages contains messages that correspond to the same hash as the messages // that the dispatcher is trying to send, do not send those messages for order guarantee return 0; } - if (recentlyJoinedConsumers == null) { - return maxMessages; - } - removeConsumersFromRecentJoinedConsumers(); - PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer); - // At this point, all the old messages were already consumed and this consumer - // is now ready to receive any message - if (maxReadPosition == null) { - // stop to dispatch by stuckConsumers - if (stuckConsumers.contains(consumer)) { - if (log.isDebugEnabled()) { - log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer); - } - return 0; - } - // The consumer has not recently joined, so we can send all messages - return maxMessages; - } - - // If the read type is Replay, we should avoid send messages that hold by other consumer to the new consumers, - // For example, we have 10 messages [0,1,2,3,4,5,6,7,8,9] - // If the consumer0 get message 0 and 1, and does not acked message 0, then consumer1 joined, - // when consumer1 get message 2,3, the broker will not dispatch messages to consumer1 - // because of the mark delete position did not move forward. - // So message 2,3 will stored in the redeliver tracker. - // Now, consumer2 joined, it will read new messages from the cursor, - // so the recentJoinedPosition is 4 for consumer2 - // Because of there are messages need to redeliver, so the broker will read the redelivery message first [2,3] - // message [2,3] is lower than the recentJoinedPosition 4, - // so the message [2,3] will dispatched to the consumer2 - // But the message [2,3] should not dispatch to consumer2. - - if (readType == ReadType.Replay) { - PositionImpl minReadPositionForRecentJoinedConsumer = recentlyJoinedConsumers.values().iterator().next(); - if (minReadPositionForRecentJoinedConsumer != null - && minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) { - maxReadPosition = minReadPositionForRecentJoinedConsumer; - } - } - // Here, the consumer is one that has recently joined, so we can only send messages that were - // published before it has joined. - for (int i = 0; i < maxMessages; i++) { - if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) { - // We have already crossed the divider line. All messages in the list are now - // newer than what we can currently dispatch to this consumer - return i; - } - } return maxMessages; } @@ -416,6 +508,12 @@ public void markDeletePositionMoveForward() { // from the delete operation that was completed topic.getBrokerService().getTopicOrderedExecutor().execute(() -> { synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { + if (lastSentPositions != null) { + final PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition(); + if (mdp != null) { + lastSentPositions.removePreviousPositionsFromHash(mdp.getLedgerId(), mdp.getEntryId()); + } + } if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty() && removeConsumersFromRecentJoinedConsumers()) { // After we process acks, we need to check whether the mark-delete position was advanced and we @@ -427,23 +525,23 @@ && removeConsumersFromRecentJoinedConsumers()) { } private boolean removeConsumersFromRecentJoinedConsumers() { - Iterator> itr = recentlyJoinedConsumers.entrySet().iterator(); - boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false; - PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition(); + Iterator> itr = recentlyJoinedConsumers.entrySet().iterator(); + boolean hasPositionRemovedFromLastSentPositionForTheConsumer = false; + final PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition(); if (mdp != null) { - PositionImpl nextPositionOfTheMarkDeletePosition = - ((ManagedLedgerImpl) cursor.getManagedLedger()).getNextValidPosition(mdp); while (itr.hasNext()) { - Map.Entry entry = itr.next(); - if (entry.getValue().compareTo(nextPositionOfTheMarkDeletePosition) <= 0) { - itr.remove(); - hasConsumerRemovedFromTheRecentJoinedConsumers = true; + LastSentPositions lastSentPositionsForC = itr.next().getValue(); + if (lastSentPositionsForC.removePreviousPositionsFromHash(mdp.getLedgerId(), mdp.getEntryId())) { + hasPositionRemovedFromLastSentPositionForTheConsumer = true; + if (lastSentPositionsForC.isEmpty()) { + itr.remove(); + } } else { break; } } } - return hasConsumerRemovedFromTheRecentJoinedConsumers; + return hasPositionRemovedFromLastSentPositionForTheConsumer; } @Override @@ -482,7 +580,7 @@ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) { && ksm.isAllowOutOfOrderDelivery() == this.allowOutOfOrderDelivery); } - public LinkedHashMap getRecentlyJoinedConsumers() { + public LinkedHashMap getRecentlyJoinedConsumers() { return recentlyJoinedConsumers; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index d283cac77c7b6..4f2401157f0a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1186,11 +1186,19 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri subStats.allowOutOfOrderDelivery = keySharedDispatcher.isAllowOutOfOrderDelivery(); subStats.keySharedMode = keySharedDispatcher.getKeySharedMode().toString(); - LinkedHashMap recentlyJoinedConsumers = keySharedDispatcher + LinkedHashMap + recentlyJoinedConsumers = keySharedDispatcher .getRecentlyJoinedConsumers(); if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) { recentlyJoinedConsumers.forEach((k, v) -> { - subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString()); + // Dispatchers allows same name consumers + final StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("consumerName=").append(k.consumerName()) + .append(", consumerId=").append(k.consumerId()); + if (k.cnx() != null) { + stringBuilder.append(", address=").append(k.cnx().clientAddress()); + } + subStats.recentlyJoinedConsumers.put(stringBuilder.toString(), v.toPositionSetString()); }); } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index 8c9a615d6d01c..dc9675b3aafa8 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -69,8 +69,8 @@ public interface ConsumerStats { /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */ boolean isBlockedConsumerOnUnackedMsgs(); - /** The read position of the cursor when the consumer joining. */ - String getReadPositionWhenJoining(); + /** Last sent positions per sticky key of the cursor when the consumer joining. */ + String getLastSentPositionsWhenJoining(); /** Address of this consumer. */ String getAddress(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index 9ff94a2952ea3..d89f590fa82b9 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -115,8 +115,8 @@ public interface SubscriptionStats { /** Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY. */ String getKeySharedMode(); - /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ - Map getConsumersAfterMarkDeletePosition(); + /** This is for Key_Shared subscription to get the recentlyJoinedConsumers in the Key_Shared subscription. */ + Map getRecentlyJoinedConsumers(); /** SubscriptionProperties (key/value strings) associated with this subscribe. */ Map getSubscriptionProperties(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java index ddae2e7135695..5b80a7a582a27 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java @@ -75,8 +75,8 @@ public class ConsumerStatsImpl implements ConsumerStats { /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */ public boolean blockedConsumerOnUnackedMsgs; - /** The read position of the cursor when the consumer joining. */ - public String readPositionWhenJoining; + /** Last sent positions per sticky key of the cursor when the consumer joining. */ + public String lastSentPositionsWhenJoining; /** Address of this consumer. */ @JsonIgnore @@ -129,7 +129,7 @@ public ConsumerStatsImpl add(ConsumerStatsImpl stats) { this.availablePermits += stats.availablePermits; this.unackedMessages += stats.unackedMessages; this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs; - this.readPositionWhenJoining = stats.readPositionWhenJoining; + this.lastSentPositionsWhenJoining = stats.lastSentPositionsWhenJoining; return this; } @@ -177,8 +177,8 @@ public void setClientVersion(String clientVersion) { this.stringBuffer.append(clientVersion); } - public String getReadPositionWhenJoining() { - return readPositionWhenJoining; + public String getLastSentPositionsWhenJoining() { + return lastSentPositionsWhenJoining; } public String getLastAckedTime() { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index 7ae439829b319..90b610664e4c0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -122,8 +122,8 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY. */ public String keySharedMode; - /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ - public Map consumersAfterMarkDeletePosition; + /** This is for Key_Shared subscription to get the recentlyJoinedConsumers in the Key_Shared subscription. */ + public Map recentlyJoinedConsumers; /** The number of non-contiguous deleted messages ranges. */ public int nonContiguousDeletedMessagesRanges; @@ -149,7 +149,7 @@ public class SubscriptionStatsImpl implements SubscriptionStats { public SubscriptionStatsImpl() { this.consumers = new ArrayList<>(); - this.consumersAfterMarkDeletePosition = new LinkedHashMap<>(); + this.recentlyJoinedConsumers = new LinkedHashMap<>(); this.subscriptionProperties = new HashMap<>(); this.bucketDelayedIndexStats = new HashMap<>(); } @@ -171,7 +171,7 @@ public void reset() { lastExpireTimestamp = 0L; lastMarkDeleteAdvancedTimestamp = 0L; consumers.clear(); - consumersAfterMarkDeletePosition.clear(); + recentlyJoinedConsumers.clear(); nonContiguousDeletedMessagesRanges = 0; nonContiguousDeletedMessagesRangesSerializedSize = 0; delayedMessageIndexSizeInBytes = 0; @@ -214,7 +214,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) { } } this.allowOutOfOrderDelivery |= stats.allowOutOfOrderDelivery; - this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition); + this.recentlyJoinedConsumers.putAll(stats.recentlyJoinedConsumers); this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges; this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize; this.delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes; From 27880e42c03eabd02d3d5f93f97988f6d03a5d90 Mon Sep 17 00:00:00 2001 From: Yuri Mizushima Date: Tue, 25 Apr 2023 13:42:48 +0900 Subject: [PATCH 2/2] test: fix to address new definition of recently joined consumers --- .../pulsar/broker/admin/AdminApiTest.java | 39 ++-- .../broker/loadbalance/LoadBalancerTest.java | 4 +- .../broker/stats/SubscriptionStatsTest.java | 2 +- .../client/api/KeySharedSubscriptionTest.java | 213 ++++++++++++++++-- 4 files changed, 222 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index e33108203cc81..b02b8727ac860 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -3286,14 +3286,21 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception { } @Test - public void testGetReadPositionWhenJoining() throws Exception { - final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString(); + public void testGetLastSentPositionsWhenJoining() throws Exception { + final String topic = "persistent://prop-xyz/ns1/testGetLastSentPositionsWhenJoining-" + UUID.randomUUID().toString(); final String subName = "my-sub"; @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .enableBatching(false) .create(); + @Cleanup + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .consumerName("c1") + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subName) + .subscribe(); final int messages = 10; MessageIdImpl messageId = null; @@ -3301,28 +3308,26 @@ public void testGetReadPositionWhenJoining() throws Exception { messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes()); } - List> consumers = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - Consumer consumer = pulsarClient.newConsumer() - .topic(topic) - .subscriptionType(SubscriptionType.Key_Shared) - .subscriptionName(subName) - .subscribe(); - consumers.add(consumer); + for (int i = 0; i < messages; i++) { + assertNotNull(consumer1.receive(100, TimeUnit.MILLISECONDS)); } + @Cleanup + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .consumerName("c2") + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subName) + .subscribe(); + TopicStats stats = admin.topics().getStats(topic); Assert.assertEquals(stats.getSubscriptions().size(), 1); SubscriptionStats subStats = stats.getSubscriptions().get(subName); Assert.assertNotNull(subStats); Assert.assertEquals(subStats.getConsumers().size(), 2); - ConsumerStats consumerStats = subStats.getConsumers().get(0); - Assert.assertEquals(consumerStats.getReadPositionWhenJoining(), - PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId() + 1).toString()); - - for (Consumer consumer : consumers) { - consumer.close(); - } + ConsumerStats consumer2Stats = subStats.getConsumers().stream().filter(s -> s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get(); + Assert.assertEquals(consumer2Stats.getLastSentPositionsWhenJoining(), + Set.of(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).toString()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 68902c73e5717..17ec58eb21e88 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -154,7 +154,9 @@ void shutdown() throws Exception { executor.shutdownNow(); for (int i = 0; i < BROKER_COUNT; i++) { - pulsarAdmins[i].close(); + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + } if (pulsarServices[i] != null) { pulsarServices[i].close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index d5e0066a86f15..5c65cc5aeebdd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -116,7 +116,7 @@ public void testConsumersAfterMarkDelete() throws PulsarClientException, PulsarA TopicStats stats = admin.topics().getStats(topicName); Assert.assertEquals(stats.getSubscriptions().size(), 1); Assert.assertEquals(stats.getSubscriptions().entrySet().iterator().next().getValue() - .getConsumersAfterMarkDeletePosition().size(), 1); + .getRecentlyJoinedConsumers().size(), 1); consumer1.close(); consumer2.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 7b617a6a192d8..dc16356d023ff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -20,11 +20,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -455,7 +457,7 @@ public void testDisableKeySharedSubscription() throws PulsarClientException { String topic = "persistent://public/default/key_shared_disabled"; try { @Cleanup - Consumer c = pulsarClient.newConsumer() + Consumer c = pulsarClient.newConsumer() .topic(topic) .subscriptionName("key_shared") .subscriptionType(SubscriptionType.Key_Shared) @@ -499,7 +501,7 @@ public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exc String slowKey = "slowKey"; List clients = new ArrayList<>(); - List consumers = new ArrayList<>(); + List> consumers = new ArrayList<>(); try { AtomicInteger receivedMessages = new AtomicInteger(); @@ -509,7 +511,7 @@ public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exc .build(); clients.add(client); - Consumer c = client.newConsumer(Schema.INT32) + Consumer c = client.newConsumer(Schema.INT32) .topic(topic) .subscriptionName("key_shared") .subscriptionType(SubscriptionType.Key_Shared) @@ -556,7 +558,7 @@ public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exc assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3); }); - for (Consumer c : consumers) { + for (Consumer c : consumers) { c.close(); } } finally { @@ -576,14 +578,19 @@ public void testOrderingWhenAddingConsumers() throws Exception { @Cleanup Consumer c1 = createConsumer(topic); + final List keys = new ArrayList<>(); for (int i = 0; i < 10; i++) { + final String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS)); + keys.add(key); producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(key) .value(i) .send(); } // All the already published messages will be pre-fetched by C1. + Awaitility.await().untilAsserted(() -> + assertEquals(((ConsumerImpl) c1).getTotalIncomingMessages(), 10)); // Adding a new consumer. @Cleanup @@ -591,7 +598,7 @@ public void testOrderingWhenAddingConsumers() throws Exception { for (int i = 10; i < 20; i++) { producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(keys.get(i % 10)) .value(i) .send(); } @@ -630,6 +637,8 @@ public void testReadAheadWhenAddingConsumers() throws Exception { } // All the already published messages will be pre-fetched by C1. + Awaitility.await().untilAsserted(() -> + assertEquals(((ConsumerImpl) c1).getTotalIncomingMessages(), 10)); // Adding a new consumer. @Cleanup @@ -676,9 +685,12 @@ public void testRemoveFirstConsumer() throws Exception { .consumerName("c1") .subscribe(); + final List keys = new ArrayList<>(); for (int i = 0; i < 10; i++) { + final String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS)); + keys.add(key); producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(key) .value(i) .send(); } @@ -699,7 +711,7 @@ public void testRemoveFirstConsumer() throws Exception { for (int i = 10; i < 20; i++) { producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(keys.get(i % 10)) .value(i) .send(); } @@ -838,6 +850,7 @@ public void testAttachKeyToMessageMetadata() throws PulsarClientException { public void testContinueDispatchMessagesWhenMessageTTL() throws Exception { int defaultTTLSec = 3; int totalMessages = 1000; + int numOfKeys = totalMessages / 10; this.conf.setTtlDurationDefaultInSeconds(defaultTTLSec); final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID(); final String subName = "my-sub"; @@ -846,7 +859,7 @@ public void testContinueDispatchMessagesWhenMessageTTL() throws Exception { Consumer consumer1 = pulsarClient.newConsumer(Schema.INT32) .topic(topic) .subscriptionName(subName) - .receiverQueueSize(10) + .receiverQueueSize(numOfKeys) .subscriptionType(SubscriptionType.Key_Shared) .subscribe(); @@ -855,19 +868,24 @@ public void testContinueDispatchMessagesWhenMessageTTL() throws Exception { .topic(topic) .create(); + final Map keys = new HashMap<>(); for (int i = 0; i < totalMessages; i++) { + // all type of keys are sent to consumer1 + final int keyIndex = i % numOfKeys; + keys.computeIfAbsent(keyIndex, k -> String.valueOf(random.nextInt(NUMBER_OF_KEYS))); producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(keys.get(keyIndex)) .value(i) .send(); } // don't ack the first message + Awaitility.await().untilAsserted(() -> + assertEquals(((ConsumerImpl) consumer1).getTotalIncomingMessages(), numOfKeys)); consumer1.receive(); consumer1.acknowledge(consumer1.receive()); - // The consumer1 and consumer2 should be stuck because of the mark delete position did not move forward. - + // The consumer2 and consumer3 should be stuck because of the mark delete position did not move forward. @Cleanup Consumer consumer2 = pulsarClient.newConsumer(Schema.INT32) .topic(topic) @@ -1190,6 +1208,167 @@ public void testCheckConsumersWithSameName() throws Exception { l.await(); } + @Test(timeOut = 30_000) + public void testCheckRecentlyJoinedConsumers() throws Exception { + conf.setSubscriptionKeySharedUseConsistentHashing(true); + conf.setSubscriptionKeySharedConsistentHashingReplicaPoints(100); + + final String topicName = "persistent://public/default/recently-joined-consumers-" + UUID.randomUUID(); + final String subName = "my-sub"; + final String consumerName = "name"; + + final ConsumerBuilder cb = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared); + + // Create 2 consumers + @Cleanup + final Consumer c1 = cb.consumerName("c1").subscribe(); + final Map>> c1Msgs = new HashMap<>(); + @Cleanup + final Consumer c2 = cb.consumerName("c2").subscribe(); + final Map>> c2Msgs = new HashMap<>(); + + @Cleanup + final Producer p = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .create(); + for (int i = 0; i < 100; i++) { + p.newMessage() + .key(Integer.toString(i % 10)) + .value("msg-" + i) + .send(); + } + + final Set c1Keys1 = Set.of("1", "3", "4", "5", "9"); + final Set c2Keys1 = Set.of("0", "2", "6", "7", "8"); + for (int i = 0; i < 100; i++) { + final Message msg1 = c1.receive(10, TimeUnit.MILLISECONDS); + if (msg1 != null) { + c1Msgs.computeIfAbsent(msg1.getKey(), k -> new ArrayList<>()); + c1Msgs.get(msg1.getKey()).add(msg1); + c1.acknowledge(msg1); + } + } + assertEquals(c1Msgs.values().stream().mapToInt(List::size).sum(), c1Keys1.size() * 10); + assertEquals(c1Msgs.keySet(), c1Keys1); + assertNotEquals(c1Msgs.keySet(), c2Keys1); + c1Msgs.clear(); + + @Cleanup + final Consumer c3 = cb.consumerName("c3").subscribe(); + final Map>> c3Msgs = new HashMap<>(); + + for (int i = 100; i < 200; i++) { + p.newMessage() + .key(Integer.toString(i % 10)) + .value("msg-" + i) + .send(); + } + + final Set c1Keys2 = Set.of("3", "4", "5", "9"); + final Set c2Keys2 = Set.of("0", "8"); + final Set c2RemovedKeys = Sets.difference(c2Keys1, c2Keys2); + final Set c3Keys2 = Set.of("1", "2", "6", "7"); + for (int i = 0; i < 100; i++) { + final Message msg1 = c1.receive(10, TimeUnit.MILLISECONDS); + if (msg1 != null) { + c1Msgs.computeIfAbsent(msg1.getKey(), k -> new ArrayList<>()); + c1Msgs.get(msg1.getKey()).add(msg1); + c1.acknowledge(msg1); + } + + final Message msg3 = c3.receive(10, TimeUnit.MILLISECONDS); + if (msg3 != null) { + fail(); + } + } + assertEquals(c1Msgs.values().stream().mapToInt(List::size).sum(), c1Keys2.size() * 10); + assertEquals(c1Msgs.keySet(), c1Keys2); + assertNotEquals(c1Msgs.keySet(), c2Keys2); + assertNotEquals(c1Msgs.keySet(), c3Keys2); + c1Msgs.clear(); + + for (int i = 200; i < 300; i++) { + p.newMessage() + .key(Integer.toString((i % 10) + 10)) + .value("msg-" + i) + .send(); + } + + final Set c1Keys3 = Set.of("11", "16"); + final Set c2Keys3 = Set.of("13", "19"); + final Set c3Keys3 = Set.of("10", "12", "14", "15", "17", "18"); + for (int i = 0; i < 100; i++) { + final Message msg1 = c1.receive(10, TimeUnit.MILLISECONDS); + if (msg1 != null) { + c1Msgs.computeIfAbsent(msg1.getKey(), k -> new ArrayList<>()); + c1Msgs.get(msg1.getKey()).add(msg1); + c1.acknowledge(msg1); + } + + final Message msg3 = c3.receive(10, TimeUnit.MILLISECONDS); + if (msg3 != null) { + c3Msgs.computeIfAbsent(msg3.getKey(), k -> new ArrayList<>()); + c3Msgs.get(msg3.getKey()).add(msg3); + c3.acknowledge(msg3); + } + } + assertEquals(c1Msgs.values().stream().mapToInt(List::size).sum(), c1Keys3.size() * 10); + assertEquals(c1Msgs.keySet(), c1Keys3); + c1Msgs.clear(); + + assertEquals(c3Msgs.values().stream().mapToInt(List::size).sum(), c3Keys3.size() * 10); + assertEquals(c3Msgs.keySet(), c3Keys3); + c3Msgs.clear(); + + for (int i = 0; i < 300; i++) { + final Message msg2 = c2.receive(10, TimeUnit.MILLISECONDS); + if (msg2 != null) { + final String key2 = msg2.getKey(); + c2Msgs.computeIfAbsent(key2, k -> new ArrayList<>()); + c2Msgs.get(key2).add(msg2); + if (c2Keys2.contains(key2) || c2Keys3.contains(key2)) { + c2.acknowledge(msg2); + } + } + + final Message msg3 = c3.receive(10, TimeUnit.MILLISECONDS); + if (msg3 != null) { + fail(); + } + } + assertEquals(c2Msgs.values().stream().mapToInt(List::size).sum(), + (c2Keys1.size() + c2Keys2.size() + c2Keys3.size()) * 10); + assertEquals(c2Msgs.keySet(), Sets.union(c2Keys1, c2Keys3)); + + for (final String key : c2RemovedKeys) { + final List> msgs = c2Msgs.get(key); + final int numOfMsgs = msgs.size(); + for (int i = 0; i < numOfMsgs; i++) { + c2.acknowledge(msgs.get(i)); + if (i < numOfMsgs - 1) { + assertNull(c3.receive(10, TimeUnit.MILLISECONDS)); + } + } + } + c2Msgs.clear(); + + for (int i = 0; i < 300; i++) { + final Message msg3 = c3.receive(10, TimeUnit.MILLISECONDS); + if (msg3 != null) { + c3Msgs.computeIfAbsent(msg3.getKey(), k -> new ArrayList<>()); + c3Msgs.get(msg3.getKey()).add(msg3); + c3.acknowledge(msg3); + } + } + assertEquals(c3Msgs.values().stream().mapToInt(List::size).sum(), c3Keys2.size() * 10); + assertEquals(c3Msgs.keySet(), c3Keys2); + c3Msgs.clear(); + + assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 0); + } private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) { if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) { @@ -1323,7 +1502,7 @@ private void receiveAndCheckDistribution(List> consumers, int expect } private void receiveAndCheck(List, Integer>> checkList) throws PulsarClientException { - Map> consumerKeys = new HashMap<>(); + Map, Set> consumerKeys = new HashMap<>(); for (KeyValue, Integer> check : checkList) { if (check.getValue() % 2 != 0) { throw new IllegalArgumentException(); @@ -1371,7 +1550,7 @@ private void receiveAndCheck(List, Integer>> checkLis } lastMessageForKey.put(key, message); } - Message noMessages = null; + Message noMessages = null; try { noMessages = check.getKey().receive(100, TimeUnit.MILLISECONDS); } catch (PulsarClientException ignore) { @@ -1479,7 +1658,7 @@ public void testStickyKeyRangesRestartConsumers() throws Exception { }) .subscribe(); - Future producerFuture = pulsar.getExecutor().submit(() -> { + Future producerFuture = pulsar.getExecutor().submit(() -> { try { try (Producer producer = pulsarClient.newProducer(Schema.STRING) @@ -1513,7 +1692,7 @@ public void testStickyKeyRangesRestartConsumers() throws Exception { // start consuming again... @Cleanup - Consumer consumer3 = pulsarClient.newConsumer(Schema.STRING) + Consumer consumer3 = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subscriptionName) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) @@ -1531,7 +1710,7 @@ public void testStickyKeyRangesRestartConsumers() throws Exception { }) .subscribe(); @Cleanup - Consumer consumer4 = pulsarClient.newConsumer(Schema.STRING) + Consumer consumer4 = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subscriptionName) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)