Skip to content

Commit

Permalink
Fixed ordering issue in KeyShared dispatcher when adding consumer (ap…
Browse files Browse the repository at this point in the history
…ache#7106)

### Motivation

Fixes:  apache#6554

Ordering is broken in KeyShared dispatcher if a new consumer `c2` comes in and an existing consumer `c1` goes out. 

This is because messages with keys previously assigned to `c1` may now route to `c2`. 

The solution here is to have new consumers joining in a "paused" state.

For example, consider this sequence:

 1. The subscription has consumers `c1` and `c2`.
 2. `c3` joins. Some of the keys are now supposed to go to `c3`.
 3. Instead of starting to deliver to `c3` right away, we record the cursor's current readPosition (let's call it `rp0_c3`) for `c3`.
 4. Any message that now hashes to `c3` and that has `messageId >= rp0_c3` will be deferred for later re-delivery.
 5. Any message that might get re-delivered to `c3` (e.g. originally sent to `c1`, but `c1` has failed) and that has `messageId < rp0_c3` will be sent to `c3`.
 6. Once the markDelete position of the cursor moves past `rp0_c3`, the restriction on `c3` is lifted.

Essentially, `c3` joins but can only receive old messages, until everything that was read before joining gets acked.
  • Loading branch information
merlimat authored and cdbartholomew committed Jul 24, 2020
1 parent 7e76a73 commit 752ce14
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,8 @@ default long getNumberOfDelayedMessages() {
/**
 * Optional hook with an empty default implementation; implementations may override it.
 *
 * NOTE(review): judging by the name, this is presumably invoked after the subscription's
 * cursor has been reset — confirm against the callers, which are not visible here.
 */
default void cursorIsReset() {
// No-op by default
}

/**
 * Signals the dispatcher that the subscription has finished processing an acknowledgement,
 * giving it a chance to take extra actions.
 *
 * The default implementation does nothing; dispatchers that need to react to acknowledgements
 * override this method.
 */
default void acknowledgementWasProcessed() {
// No-op by default
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.ReadType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,6 +49,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi

private boolean isDispatcherStuckOnReplays = false;

/**
* When a consumer joins, it is added to this map together with the cursor's read position at that moment.
* In order to preserve ordering, such a new consumer can only receive messages older than that position
* until the mark-delete position moves past it.
*/
private final Map<Consumer, PositionImpl> recentlyJoinedConsumers = new HashMap<>();

PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, StickyKeyConsumerSelector selector) {
super(topic, cursor, subscription);
Expand All @@ -60,12 +66,20 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
    // Register the consumer with the base dispatcher first, then with the sticky-key selector.
    super.addConsumer(consumer);
    selector.addConsumer(consumer);

    // The very first consumer has nobody to conflict with, so it is never held back.
    if (consumerList.size() <= 1) {
        return;
    }

    // Nothing special is needed either when the cursor reports no backlog of entries
    // since the first not-yet-acknowledged message.
    if (cursor.getNumberOfEntriesSinceFirstNotAckedMessage() <= 1) {
        return;
    }

    // Otherwise remember where the cursor was reading when this consumer joined: until the
    // mark-delete position catches up, the consumer may only be given older messages.
    PositionImpl readPositionAtJoin = (PositionImpl) cursor.getReadPosition();
    recentlyJoinedConsumers.put(consumer, readPositionAtJoin);
}

/**
 * Removes a consumer from this dispatcher.
 *
 * The consumer is removed from the base dispatcher, from the sticky-key selector and,
 * finally, from the set of recently joined consumers so that no stale join position is
 * kept for a consumer that is gone.
 *
 * @param consumer the consumer to remove
 * @throws BrokerServiceException propagated from the underlying removal calls
 */
@Override
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
super.removeConsumer(consumer);
selector.removeConsumer(consumer);

// Drop any pending "recently joined" restriction tracked for this consumer
recentlyJoinedConsumers.remove(consumer);
}

private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
Expand Down Expand Up @@ -109,7 +123,8 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
List<Entry> entriesWithSameKey = current.getValue();
int entriesWithSameKeyCount = entriesWithSameKey.size();

int messagesForC = Math.min(entriesWithSameKeyCount, consumer.getAvailablePermits());
int maxMessagesForC = Math.min(entriesWithSameKeyCount, consumer.getAvailablePermits());
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC);
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} with messages num {}, read type is {}",
name, consumer.consumerName(), messagesForC, readType);
Expand Down Expand Up @@ -165,19 +180,66 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}
}

if (totalMessagesSent == 0) {
if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) {
// This means, that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise the read would not have been triggered)
// 2. All keys in the current set of messages are routing to consumers that are currently busy
//
// The solution here is to move on and read next batch of messages which might hopefully contain
// also keys meant for other consumers.
//
// We do it unless there are "recently joined consumers". In that case, we would be looking
// ahead in the stream while the new consumers are not ready to accept the new messages,
// and would therefore most likely only increase the distance between read-position and mark-delete
// position.
isDispatcherStuckOnReplays = true;
readMoreEntries();
}
}

/**
 * Computes how many of the leading {@code entries} may be dispatched to {@code consumer},
 * taking into account the restriction applied to recently joined consumers.
 *
 * @param consumer    the consumer selected for these entries
 * @param entries     the entries routed to the consumer; only the first {@code maxMessages}
 *                    elements are inspected
 * @param maxMessages the upper bound already computed by the caller
 * @return {@code maxMessages} when the consumer is unrestricted, otherwise the number of
 *         leading entries whose position is strictly before the consumer's join position
 */
private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages) {
    if (maxMessages == 0) {
        return 0;
    }

    final PositionImpl joinPosition = recentlyJoinedConsumers.get(consumer);
    if (joinPosition != null) {
        final PositionImpl afterMarkDelete = ((PositionImpl) cursor.getMarkDeletedPosition()).getNext();

        if (joinPosition.compareTo(afterMarkDelete) > 0) {
            // Still restricted: only messages positioned before the join position may be
            // sent. Count the leading entries that qualify and stop at the first one that
            // is at or beyond the divider line.
            int allowed = 0;
            while (allowed < maxMessages
                    && ((PositionImpl) entries.get(allowed).getPosition()).compareTo(joinPosition) < 0) {
                allowed++;
            }
            return allowed;
        }

        // The mark-delete position has caught up with the join position, so every older
        // message was consumed and the restriction on this consumer can be lifted.
        recentlyJoinedConsumers.remove(consumer);
    }

    // Either the consumer was never restricted or the restriction has just been lifted.
    return maxMessages;
}

/**
 * Reacts to a processed acknowledgement by triggering another read while there are
 * recently joined consumers being held back.
 */
@Override
public synchronized void acknowledgementWasProcessed() {
    if (recentlyJoinedConsumers.isEmpty()) {
        // No consumer is being held back, so an ack cannot unblock anything here.
        return;
    }

    // The ack may have advanced the mark-delete position, which can lift the restriction on
    // a recently joined consumer and let us finally read more messages. It's safe to call
    // readMoreEntries() multiple times.
    readMoreEntries();
}

protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (isDispatcherStuckOnReplays) {
// If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -403,6 +405,9 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
// Notify all consumer that the end of topic was reached
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
}

// Signal the dispatchers to give chance to take extra actions
dispatcher.acknowledgementWasProcessed();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,30 @@
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.Cleanup;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
Expand All @@ -33,23 +52,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class KeySharedSubscriptionTest extends ProducerConsumerBase {

private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
Expand Down Expand Up @@ -463,6 +465,102 @@ public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exc
}
}

/**
 * Verifies that message ordering is preserved when a second consumer joins a subscription
 * and the first consumer is then closed.
 *
 * Twenty messages with values 0..19 and random keys are published: ten before c2 joins
 * and ten after. Once c1 is closed, c2 must receive all twenty values in publish order.
 *
 * NOTE(review): createConsumer(topic) is assumed to subscribe with the Key_Shared
 * subscription type — confirm against the helper, which is not visible here.
 */
@Test
public void testOrderingWhenAddingConsumers() throws Exception {
this.conf.setSubscriptionKeySharedEnable(true);
// Random suffix so that every run works on a fresh topic
String topic = "testOrderingWhenAddingConsumers-" + UUID.randomUUID();

@Cleanup
Producer<Integer> producer = createProducer(topic, false);

@Cleanup
Consumer<Integer> c1 = createConsumer(topic);

// First batch: values 0..9, published while c1 is the only consumer
for (int i = 0; i < 10; i++) {
producer.newMessage()
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
.value(i)
.send();
}

// All the already published messages will be pre-fetched by c1.

// Adding a new consumer.
@Cleanup
Consumer<Integer> c2 = createConsumer(topic);

// Second batch: values 10..19, published after c2 has joined
for (int i = 10; i < 20; i++) {
producer.newMessage()
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
.value(i)
.send();
}

// Closing c1 without acknowledging anything triggers all messages to go to c2
c1.close();

// c2 must observe every value exactly in the order it was published
for (int i = 0; i < 20; i++) {
Message<Integer> msg = c2.receive();
assertEquals(msg.getValue().intValue(), i);

c2.acknowledge(msg);
}
}

/**
 * Verifies that the dispatcher does not keep reading ahead in the topic while a recently
 * joined consumer is still blocked behind messages prefetched by an older consumer.
 *
 * c1 (receiver queue of 10) is given ten messages and never acknowledges them. c2 then
 * joins and 990 more messages are published. After a short wait, the cursor's read
 * position must still be below entry id 1000.
 */
@Test
public void testReadAheadWhenAddingConsumers() throws Exception {
this.conf.setSubscriptionKeySharedEnable(true);
// Random suffix so that every run works on a fresh topic
String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();

@Cleanup
Producer<Integer> producer = createProducer(topic, false);

@Cleanup
Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.receiverQueueSize(10)
.subscribe();

// First batch: values 0..9, published synchronously while c1 is the only consumer
for (int i = 0; i < 10; i++) {
producer.newMessage()
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
.value(i)
.send();
}

// All the already published messages will be pre-fetched by c1.

// Adding a new consumer.
@Cleanup
Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.receiverQueueSize(10)
.subscribe();

// c2 will not be able to receive any messages until c1 is done processing whatever it has prefetched

// Second batch: values 10..999, published asynchronously
for (int i = 10; i < 1000; i++) {
producer.newMessage()
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
.value(i)
.sendAsync();
}

// Wait for all pending sends to complete, then give the broker time to dispatch
producer.flush();
Thread.sleep(1000);

Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
PersistentSubscription sub = (PersistentSubscription) t.getSubscription("key_shared");

// We need to ensure that the dispatcher does not keep looking ahead in the topic.
// NOTE(review): the bound of 1000 assumes one entry per published message (the producer
// is created with enableBatch = false) — confirm.
PositionImpl readPosition = (PositionImpl) sub.getCursor().getReadPosition();
assertTrue(readPosition.getEntryId() < 1000);
}

private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
Producer<Integer> producer = null;
if (enableBatch) {
Expand Down

0 comments on commit 752ce14

Please sign in to comment.