[pulsar-broker] Stop dispatching when messages are temporarily skipped because a Key_Shared consumer is stuck on delivery (#7553)

### Motivation
In some cases with Key_Shared consumers, message ordering was broken.
Here is how to reproduce it (I think this is one of several ways to trigger the issue); a client-side sketch follows the list.

1. Connect Consumer1 to the Key_Shared subscription `sub` and do not start receiving
   - receiverQueueSize: 500
2. Connect a producer and publish 500 messages with key `(i % 10)`
3. Connect Consumer2 to the same subscription and start receiving
   - receiverQueueSize: 1
   - since #7106, Consumer2 can't receive anything yet (expected)
4. The producer publishes 500 more messages with the same key generation algorithm
5. After that, Consumer1 starts receiving
6. Check Consumer2's message ordering
   - sometimes message ordering was broken within the same key
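
A minimal client-side sketch of these steps (the topic name is taken from the log below; the broker URL is an assumption, and the receive loops and ordering check of the original harness are omitted):

```java
import org.apache.pulsar.client.api.*;

public class KeySharedReproSketch {
    public static void main(String[] args) throws Exception {
        String topic = "persistent://public/default/key-shared-test";
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // assumed broker URL
                .build();

        // 1. Consumer1 joins the Key_Shared subscription `sub` but does not call receive() yet
        Consumer<byte[]> consumer1 = client.newConsumer()
                .topic(topic)
                .subscriptionName("sub")
                .subscriptionType(SubscriptionType.Key_Shared)
                .receiverQueueSize(500)
                .subscribe();

        // 2. Publish 500 messages keyed by (i % 10)
        Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create();
        for (int i = 0; i < 500; i++) {
            producer.newMessage().key(String.valueOf(i % 10)).value("my-message-" + i).send();
        }

        // 3. Consumer2 joins the same subscription with receiverQueueSize = 1 and starts
        //    receiving in another thread; since #7106 it should not get anything yet
        Consumer<byte[]> consumer2 = client.newConsumer()
                .topic(topic)
                .subscriptionName("sub")
                .subscriptionType(SubscriptionType.Key_Shared)
                .receiverQueueSize(1)
                .subscribe();

        // 4. Publish 500 more messages with the same key generation algorithm
        for (int i = 500; i < 1000; i++) {
            producer.newMessage().key(String.valueOf(i % 10)).value("my-message-" + i).send();
        }

        // 5. Consumer1 now starts receiving; 6. Consumer2's per-key ordering is checked
    }
}
```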

Consumer1:
```
Connected: Tue Jul 14 09:36:39 JST 2020
[pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/key-shared-test] [sub0] [820f0] Prefetched messages: 499 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
Received: my-message-0 PublishTime: 1594687006203 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-1 PublishTime: 1594687006243 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-2 PublishTime: 1594687006247 Date: Tue Jul 14 09:37:46 JST 2020
...
Received: my-message-498 PublishTime: 1594687008727 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-499 PublishTime: 1594687008731 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-500 PublishTime: 1594687038742 Date: Tue Jul 14 09:37:46 JST 2020
...
Received: my-message-990 PublishTime: 1594687040094 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-994 PublishTime: 1594687040103 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-995 PublishTime: 1594687040105 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-997 PublishTime: 1594687040113 Date: Tue Jul 14 09:37:46 JST 2020
```

Consumer2:
```
Connected: Tue Jul 14 09:37:03 JST 2020
[pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
Received: my-message-501 MessageId: 4:1501:-1 PublishTime: 1594687038753 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-502 MessageId: 4:1502:-1 PublishTime: 1594687038755 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-503 MessageId: 4:1503:-1 PublishTime: 1594687038759 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-506 MessageId: 4:1506:-1 PublishTime: 1594687038785 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-508 MessageId: 4:1508:-1 PublishTime: 1594687038812 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-901 MessageId: 4:1901:-1 PublishTime: 1594687039871 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-509 MessageId: 4:1509:-1 PublishTime: 1594687038815 Date: Tue Jul 14 09:37:46 JST 2020
ordering was broken, key: 1 oldNum: 901 newNum: 511
Received: my-message-511 MessageId: 4:1511:-1 PublishTime: 1594687038826 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-512 MessageId: 4:1512:-1 PublishTime: 1594687038830 Date: Tue Jul 14 09:37:46 JST 2020
...
```
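
The "ordering was broken" line in the Consumer2 output comes from the receiving harness, which is not part of this commit. A minimal sketch of such a per-key check, assuming the payload is `my-message-<n>` and `<n % 10>` is used as the key:

```java
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;

public class OrderingCheckSketch {
    // Receives messages and flags any message whose number is lower than the
    // previous number seen for the same key.
    static void receiveAndCheck(Consumer<byte[]> consumer) throws Exception {
        Map<String, Integer> lastNumPerKey = new HashMap<>();
        while (true) {
            Message<byte[]> msg = consumer.receive();
            String body = new String(msg.getData(), StandardCharsets.UTF_8); // e.g. "my-message-901"
            int num = Integer.parseInt(body.substring(body.lastIndexOf('-') + 1));
            Integer oldNum = lastNumPerKey.put(msg.getKey(), num);
            System.out.println("Received: " + body + " MessageId: " + msg.getMessageId()
                    + " PublishTime: " + msg.getPublishTime() + " Date: " + new Date());
            if (oldNum != null && oldNum > num) {
                System.out.println("ordering was broken, key: " + msg.getKey()
                        + " oldNum: " + oldNum + " newNum: " + num);
            }
            consumer.acknowledge(msg);
        }
    }
}
```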

I think this issue is caused by #7105.
Here is an example (a toy model follows the list).
1. Dispatch messages.
2. Consumer2 was stuck and `totalMessagesSent=0`
   - Consumer2's availablePermits was 0
3. Skip the redelivered messages temporarily
   - Consumer2's availablePermits went back to 1
4. Dispatch new messages
   - a new message was dispatched to Consumer2
5. Go back to the redelivered messages
6. Dispatch those messages
   - ordering was broken
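
A toy model of this interleaving (illustration only, not broker code; the message numbers are taken from the Consumer2 log above):

```java
import java.util.ArrayList;
import java.util.List;

public class SkipReplayOrderingModel {
    public static void main(String[] args) {
        int consumer2Permits = 0;                  // step 2: Consumer2 is stuck
        List<String> deliveredToConsumer2 = new ArrayList<>();

        String replayMessage = "my-message-511";   // older message waiting to be redelivered (key 1)
        String newMessage = "my-message-901";      // newer message with the same key

        // step 3: the replay batch is skipped because Consumer2 has no permits;
        // meanwhile Consumer2 grants one more permit
        if (consumer2Permits == 0) {
            consumer2Permits = 1;
        }

        // step 4: a normal read dispatches the newer message first
        deliveredToConsumer2.add(newMessage);

        // steps 5-6: the dispatcher goes back to the replay batch and sends the older message
        deliveredToConsumer2.add(replayMessage);

        // prints [my-message-901, my-message-511] -> ordering within the key is broken
        System.out.println(deliveredToConsumer2);
    }
}
```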

### Modifications
Stop dispatching when messages are temporarily skipped because a Key_Shared consumer is stuck on delivery.
equanz authored Sep 2, 2020
1 parent 3ac98d8 commit c7ac08b
Showing 2 changed files with 127 additions and 4 deletions.
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -64,12 +65,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
*/
private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;

private final Set<Consumer> stuckConsumers;
private final Set<Consumer> nextStuckConsumers;

PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
super(topic, cursor, subscription);

this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? Collections.emptyMap() : new HashMap<>();
this.stuckConsumers = new HashSet<>();
this.nextStuckConsumers = new HashSet<>();

switch (ksm.getKeySharedMode()) {
case AUTO_SPLIT:
@@ -143,6 +149,8 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
return;
}

nextStuckConsumers.clear();

final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
groupedEntries.clear();

@@ -217,11 +225,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}
}

stuckConsumers.clear();

if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) {
// This means, that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise the read would not have been triggered)
// 2. All keys in the current set of messages are routing to consumers that are currently busy
// and the stall is not caused by stuckConsumers
//
// The solution here is to move on and read next batch of messages which might hopefully contain
// also keys meant for other consumers.
@@ -230,18 +241,31 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
// ahead in the stream while the new consumers are not ready to accept the new messages,
// therefore would be most likely only increase the distance between read-position and mark-delete
// position.
isDispatcherStuckOnReplays = true;
if (!nextStuckConsumers.isEmpty()) {
isDispatcherStuckOnReplays = true;
stuckConsumers.addAll(nextStuckConsumers);
}
// readMoreEntries should run regardless of whether the stall is caused by stuckConsumers, to avoid stopping dispatch.
readMoreEntries();
}
}

private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages) {
if (maxMessages == 0) {
// the consumer was stuck
nextStuckConsumers.add(consumer);
return 0;
}

PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
if (maxReadPosition == null) {
// stop dispatching if the consumer is in stuckConsumers
if (stuckConsumers.contains(consumer)) {
if (log.isDebugEnabled()) {
log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer);
}
return 0;
}
// The consumer has not recently joined, so we can send all messages
return maxMessages;
}
@@ -41,6 +41,7 @@
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -49,6 +50,7 @@
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

@PrepareForTest({ DispatchRateLimiter.class })
@@ -79,6 +81,8 @@ public void setup() throws Exception {
configMock = mock(ServiceConfiguration.class);
doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled();
doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();

pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -96,6 +100,7 @@ public void setup() throws Exception {

consumerMock = mock(Consumer.class);
channelMock = mock(ChannelPromise.class);
doReturn("consumer1").when(consumerMock).consumerName();
doReturn(1000).when(consumerMock).getAvailablePermits();
doReturn(true).when(consumerMock).isWritable();
doReturn(channelMock).when(consumerMock).sendMessages(
@@ -120,12 +125,17 @@

persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock, KeySharedMeta.getDefaultInstance());
persistentDispatcher.addConsumer(consumerMock);
persistentDispatcher.consumerFlow(consumerMock, 1000);
}

@Test
public void testSendMarkerMessage() {
try {
persistentDispatcher.addConsumer(consumerMock);
persistentDispatcher.consumerFlow(consumerMock, 1000);
} catch (Exception e) {
fail("Failed to add mock consumer", e);
}

List<Entry> entries = new ArrayList<>();
ByteBuf markerMessage = Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster");
entries.add(EntryImpl.create(1, 1, markerMessage));
@@ -156,11 +166,100 @@ public void testSendMarkerMessage() {
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
}

@Test
public void testSkipRedeliverTemporally() {
final Consumer slowConsumerMock = mock(Consumer.class);
final ChannelPromise slowChannelMock = mock(ChannelPromise.class);
// add entries to redeliver and read target
final List<Entry> redeliverEntries = new ArrayList<>();
redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1")));
final List<Entry> readEntries = new ArrayList<>();
readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2")));

try {
Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
totalAvailablePermitsField.setAccessible(true);
totalAvailablePermitsField.set(persistentDispatcher, 1000);

doAnswer(invocationOnMock -> {
((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
.readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
return null;
}).when(cursorMock).asyncReadEntriesOrWait(
anyInt(), anyLong(), any(PersistentStickyKeyDispatcherMultipleConsumers.class),
eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal));
} catch (Exception e) {
fail("Failed to set to field", e);
}

// Create 2 consumers
try {
doReturn("consumer2").when(slowConsumerMock).consumerName();
// Change slowConsumer availablePermits to 0 and back to normal
when(slowConsumerMock.getAvailablePermits())
.thenReturn(0)
.thenReturn(1);
doReturn(true).when(slowConsumerMock).isWritable();
doReturn(slowChannelMock).when(slowConsumerMock).sendMessages(
anyList(),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
anyInt(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);

persistentDispatcher.addConsumer(consumerMock);
persistentDispatcher.addConsumer(slowConsumerMock);
} catch (Exception e) {
fail("Failed to add mock consumer", e);
}

// run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers
// run readMoreEntries internally (and skip internally)
// Change slowConsumer availablePermits to 1
// run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers internally
// and then stop dispatching to slowConsumer
persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries);

verify(consumerMock, times(1)).sendMessages(
argThat(arg -> {
assertEquals(arg.size(), 1);
Entry entry = arg.get(0);
assertEquals(entry.getLedgerId(), 1);
assertEquals(entry.getEntryId(), 3);
return true;
}),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
anyInt(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);
verify(slowConsumerMock, times(0)).sendMessages(
anyList(),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
anyInt(),
anyLong(),
anyLong(),
any(RedeliveryTracker.class)
);
}

private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}

private ByteBuf createMessage(String message, int sequenceId, String key) {
PulsarApi.MessageMetadata.Builder messageMetadata = PulsarApi.MessageMetadata.newBuilder();
messageMetadata.setSequenceId(sequenceId);
messageMetadata.setProducerName("testProducer");
messageMetadata.setPartitionKey("testKey");
messageMetadata.setPartitionKey(key);
messageMetadata.setPartitionKeyB64Encoded(false);
messageMetadata.setPublishTime(System.currentTimeMillis());
return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata.build(), Unpooled.copiedBuffer(message.getBytes(UTF_8)));
}
