diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 7b789e6da04bc..e141cf7d40a3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -111,4 +111,8 @@ default long getNumberOfDelayedMessages() {
     default void cursorIsReset() {
         //No-op
     }
+
+    default void acknowledgementWasProcessed() {
+        // No-op
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 9e93ff8a302ce..5c757c1ba6054 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -39,7 +39,6 @@
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
-import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.ReadType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +49,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     private boolean isDispatcherStuckOnReplays = false;
 
+    /**
+     * When a consumer joins, it will be added to this map with the current read position.
+     * This means that, in order to preserve ordering, new consumers can only receive old
+     * messages, until the mark-delete position moves past this point.
+     */
+    private final Map<Consumer, PositionImpl> recentlyJoinedConsumers = new HashMap<>();
+
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
             Subscription subscription, StickyKeyConsumerSelector selector) {
         super(topic, cursor, subscription);
@@ -60,12 +66,20 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
         super.addConsumer(consumer);
         selector.addConsumer(consumer);
+
+        // If this was the 1st consumer, or if all the messages are already acked, then we
+        // don't need to do anything special
+        if (consumerList.size() > 1 && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+            recentlyJoinedConsumers.put(consumer, (PositionImpl) cursor.getReadPosition());
+        }
     }
 
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
         super.removeConsumer(consumer);
         selector.removeConsumer(consumer);
+
+        recentlyJoinedConsumers.remove(consumer);
     }
 
     private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
@@ -109,7 +123,8 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
             List<Entry> entriesWithSameKey = current.getValue();
             int entriesWithSameKeyCount = entriesWithSameKey.size();
-            int messagesForC = Math.min(entriesWithSameKeyCount, consumer.getAvailablePermits());
+            int maxMessagesForC = Math.min(entriesWithSameKeyCount, consumer.getAvailablePermits());
+            int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC);
 
             if (log.isDebugEnabled()) {
                 log.debug("[{}] select consumer {} with messages num {}, read type is {}",
                         name, consumer.consumerName(), messagesForC, readType);
@@ -165,7 +180,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
             }
         }
 
-        if (totalMessagesSent == 0) {
+        if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) {
             // This means that all the messages we've just read cannot be dispatched right now.
             // This condition can only happen when:
             // 1. We have consumers ready to accept messages (otherwise the read would not have been triggered)
@@ -173,11 +188,58 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
             //
             // The solution here is to move on and read the next batch of messages, which might
             // hopefully also contain keys meant for other consumers.
+            //
+            // We do this unless there are "recently joined consumers". In that case, we would be
+            // looking ahead in the stream while the new consumers are not yet ready to accept
+            // those messages, and would therefore most likely only increase the distance between
+            // the read position and the mark-delete position.
             isDispatcherStuckOnReplays = true;
             readMoreEntries();
         }
     }
 
+    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages) {
+        if (maxMessages == 0) {
+            return 0;
+        }
+
+        PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
+        if (maxReadPosition == null) {
+            // The consumer has not recently joined, so we can send all messages
+            return maxMessages;
+        }
+
+        PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();
+
+        if (maxReadPosition.compareTo(markDeletePosition.getNext()) <= 0) {
+            // At this point, all the old messages were already consumed and this consumer
+            // is now ready to receive any message
+            recentlyJoinedConsumers.remove(consumer);
+            return maxMessages;
+        }
+
+        // Here, the consumer is one that has recently joined, so we can only send messages that were
+        // published before it joined.
+        for (int i = 0; i < maxMessages; i++) {
+            if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
+                // We have already crossed the divider line. All messages in the list are now
+                // newer than what we can currently dispatch to this consumer
+                return i;
+            }
+        }
+
+        return maxMessages;
+    }
+
+    @Override
+    public synchronized void acknowledgementWasProcessed() {
+        if (!recentlyJoinedConsumers.isEmpty()) {
+            // After we process acks, we need to check whether the mark-delete position was advanced
+            // and we can finally read more messages. It's safe to call readMoreEntries() multiple times.
+            readMoreEntries();
+        }
+    }
+
     protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         if (isDispatcherStuckOnReplays) {
             // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 1ba4ce73a1881..c20da49c1442d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -20,6 +20,8 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -403,6 +405,9 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
+
+        // Signal the dispatcher to give it a chance to take extra actions
+        dispatcher.acknowledgementWasProcessed();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ ... @@
+    @Test
+    public void testOrderingWhenAddingConsumers() throws Exception {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "testOrderingWhenAddingConsumers-" + UUID.randomUUID();
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, false);
+
+        @Cleanup
+        Consumer<Integer> c1 = createConsumer(topic);
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        // All the already published messages will be pre-fetched by C1.
+
+        // Adding a new consumer.
+        @Cleanup
+        Consumer<Integer> c2 = createConsumer(topic);
+
+        for (int i = 10; i < 20; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        // Closing c1 will make all messages go to c2
+        c1.close();
+
+        for (int i = 0; i < 20; i++) {
+            Message<Integer> msg = c2.receive();
+            assertEquals(msg.getValue().intValue(), i);
+
+            c2.acknowledge(msg);
+        }
+    }
+
+    @Test
+    public void testReadAheadWhenAddingConsumers() throws Exception {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, false);
+
+        @Cleanup
+        Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .receiverQueueSize(10)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .send();
+        }
+
+        // All the already published messages will be pre-fetched by C1.
+
+        // Adding a new consumer.
+        @Cleanup
+        Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .receiverQueueSize(10)
+                .subscribe();
+
+        // C2 will not be able to receive any messages until C1 is done processing whatever it has prefetched
+
+        for (int i = 10; i < 1000; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(i)
+                    .sendAsync();
+        }
+
+        producer.flush();
+        Thread.sleep(1000);
+
+        Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+        PersistentSubscription sub = (PersistentSubscription) t.getSubscription("key_shared");
+
+        // We need to ensure that the dispatcher does not keep reading ahead in the topic
+        // while the new consumer is not yet able to receive the messages that were already read
+        PositionImpl readPosition = (PositionImpl) sub.getCursor().getReadPosition();
+        assertTrue(readPosition.getEntryId() < 1000);
+    }
+
     private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
         Producer<Integer> producer = null;
         if (enableBatch) {
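
For review purposes, here is a minimal, self-contained sketch of the gating rule that the new getRestrictedMaxEntriesForConsumer() applies. This is an illustration, not code from the patch: the Pos record and the plain List<Pos> batch are hypothetical stand-ins for the broker's PositionImpl and List<Entry>. The rule it demonstrates: a recently joined consumer may only be dispatched entries positioned strictly before the read position captured at join time.

import java.util.List;

public class RecentlyJoinedGatingSketch {

    // Hypothetical stand-in for PositionImpl: (ledgerId, entryId), ordered lexicographically.
    record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
        @Override
        public int compareTo(Pos o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    // Mirrors the loop in getRestrictedMaxEntriesForConsumer(): count how many of the first
    // maxMessages entries sit strictly before the consumer's join-time read position.
    static int restrict(List<Pos> batch, Pos joinReadPosition, int maxMessages) {
        for (int i = 0; i < maxMessages; i++) {
            // Entries arrive in position order, so the first entry at or past the join
            // position marks the cut-off for this consumer.
            if (batch.get(i).compareTo(joinReadPosition) >= 0) {
                return i;
            }
        }
        return maxMessages;
    }

    public static void main(String[] args) {
        // The consumer joined while the cursor's read position was (1, 5).
        Pos joinedAt = new Pos(1, 5);
        List<Pos> batch = List.of(new Pos(1, 3), new Pos(1, 4), new Pos(1, 5), new Pos(1, 6));
        // Only the two entries published before (1, 5) may be dispatched: prints 2.
        System.out.println(restrict(batch, joinedAt, batch.size()));
    }
}

Once acknowledgements advance the mark-delete position up to the join-time position, the dispatcher drops the consumer from recentlyJoinedConsumers and the restriction disappears; that re-check is exactly what the new acknowledgementWasProcessed() hook triggers after each ack.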