Skip to content

Commit

Permalink
Fix message not dispatch for key_shared sub type in non-persistent su… (
Browse files Browse the repository at this point in the history
apache#9826)

Fixes apache#9703

### Motivation

With a non-persistent topic, I see messages published in the topic stats, but consumers do not consume them if they use Key_Shared. Other consumer modes work fine.

### Verifying this change

Covered by existing test case, verified manually.
  • Loading branch information
MarvinCai authored and Miguelez committed Mar 16, 2021
1 parent 6367903 commit 748ea65
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ protected Map<Consumer, List<Entry>> initialValue() throws Exception {

@Override
public void sendMessages(List<Entry> entries) {
if (!entries.isEmpty()) {
if (entries.isEmpty()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

@PrepareForTest({ DispatchRateLimiter.class })
@PowerMockIgnore({"org.apache.logging.log4j.*"})
public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {

private PulsarService pulsarMock;
private BrokerService brokerMock;
private NonPersistentTopic topicMock;
private NonPersistentSubscription subscriptionMock;
private ServiceConfiguration configMock;
private ChannelPromise channelMock;

private NonPersistentStickyKeyDispatcherMultipleConsumers nonpersistentDispatcher;

final String topicName = "non-persistent://public/default/testTopic";

@ObjectFactory
public IObjectFactory getObjectFactory() {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}

@BeforeMethod
public void setup() throws Exception {
configMock = mock(ServiceConfiguration.class);
doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled();
doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();

pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();

brokerMock = mock(BrokerService.class);
doReturn(pulsarMock).when(brokerMock).pulsar();

topicMock = mock(NonPersistentTopic.class);
doReturn(brokerMock).when(topicMock).getBrokerService();
doReturn(topicName).when(topicMock).getName();

channelMock = mock(ChannelPromise.class);
subscriptionMock = mock(NonPersistentSubscription.class);

PowerMockito.mockStatic(DispatchRateLimiter.class);
PowerMockito.when(DispatchRateLimiter.isDispatchRateNeeded(
any(BrokerService.class),
any(Optional.class),
anyString(),
any(DispatchRateLimiter.Type.class))
).thenReturn(false);

nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
topicMock, subscriptionMock,
new HashRangeAutoSplitStickyKeyConsumerSelector());
}

@Test(timeOut = 10000)
public void testSendMessage() throws BrokerServiceException {
Consumer consumerMock = mock(Consumer.class);
nonpersistentDispatcher.addConsumer(consumerMock);

List<Entry> entries = new ArrayList<>();
entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
doAnswer(invocationOnMock -> {
ChannelPromise mockPromise = mock(ChannelPromise.class);
List<Entry> receivedEntries = invocationOnMock.getArgument(0, List.class);
for (int index = 1; index <= receivedEntries.size(); index++) {
Entry entry = receivedEntries.get(index - 1);
assertEquals(entry.getLedgerId(), 1);
assertEquals(entry.getEntryId(), index);
ByteBuf byteBuf = entry.getDataBuffer();
MessageMetadata messageMetadata = Commands.parseMessageMetadata(byteBuf);
assertEquals(byteBuf.toString(UTF_8), "message" + index);
};
return mockPromise;
}).when(consumerMock).sendMessages(any(List.class), any(EntryBatchSizes.class), any(),
anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
try {
nonpersistentDispatcher.sendMessages(entries);
} catch (Exception e) {
fail("Failed to sendMessages.", e);
}
verify(consumerMock, times(1)).sendMessages(any(List.class), any(EntryBatchSizes.class),
eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
}

private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}

private ByteBuf createMessage(String message, int sequenceId, String key) {
MessageMetadata messageMetadata = new MessageMetadata()
.setSequenceId(sequenceId)
.setProducerName("testProducer")
.setPartitionKey(key)
.setPartitionKeyB64Encoded(false)
.setPublishTime(System.currentTimeMillis());
return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, Unpooled.copiedBuffer(message.getBytes(UTF_8)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -134,12 +135,12 @@ public void testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(St
.send();
}

receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000);
}

@Test(dataProvider = "data")
public void testSendAndReceiveWithBatching(String topicType, boolean enableBatch)
throws PulsarClientException {
throws Exception {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = topicType + "://public/default/key_shared-" + UUID.randomUUID();

Expand All @@ -155,23 +156,33 @@ public void testSendAndReceiveWithBatching(String topicType, boolean enableBatch
@Cleanup
Producer<Integer> producer = createProducer(topic, enableBatch);

CompletableFuture<MessageId> future;

for (int i = 0; i < 1000; i++) {
// Send the same key twice so that we'll have a batch message
String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS));
producer.newMessage()
future = producer.newMessage()
.key(key)
.value(i)
.sendAsync();

producer.newMessage()
// If not batching, need to wait for message to be persisted
if (!enableBatch) {
future.get();
}

future = producer.newMessage()
.key(key)
.value(i)
.sendAsync();
if (!enableBatch) {
future.get();
}
}

// If batching, flush buffered messages
producer.flush();

receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000 * 2);
}

@Test(dataProvider = "batch")
Expand Down Expand Up @@ -251,7 +262,7 @@ public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsum
.send();
}

receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000);

// wait for consumer grouping acking send.
Thread.sleep(1000);
Expand All @@ -266,7 +277,7 @@ public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsum
.send();
}

receiveAndCheckDistribution(Lists.newArrayList(consumer3));
receiveAndCheckDistribution(Lists.newArrayList(consumer3), 10);
}

@Test(dataProvider = "data")
Expand Down Expand Up @@ -362,7 +373,7 @@ public void testOrderingKeyWithHashRangeAutoSplitStickyKeyConsumerSelector(boole
.send();
}

receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000);
}

@Test(dataProvider = "batch")
Expand Down Expand Up @@ -792,7 +803,7 @@ public void testAttachKeyToMessageMetadata()
.send();
}

receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3), 1000);
}

@Test
Expand Down Expand Up @@ -1015,11 +1026,13 @@ private Producer<Integer> createProducer(String topic, boolean enableBatch) thro
producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(true)
.maxPendingMessages(2001)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
} else {
producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.maxPendingMessages(2001)
.enableBatching(false)
.create();
}
Expand Down Expand Up @@ -1074,7 +1087,7 @@ private void receive(List<Consumer<?>> consumers) throws PulsarClientException {
/**
* Check that every consumer receives a fair number of messages and that same key is delivered to only 1 consumer
*/
private void receiveAndCheckDistribution(List<Consumer<?>> consumers) throws PulsarClientException {
private void receiveAndCheckDistribution(List<Consumer<?>> consumers, int expectedTotalMessage) throws PulsarClientException {
// Add a key so that we know this key was already assigned to one consumer
Map<String, Consumer<?>> keyToConsumer = new HashMap<>();
Map<Consumer<?>, Integer> messagesPerConsumer = new HashMap<>();
Expand Down Expand Up @@ -1112,8 +1125,7 @@ private void receiveAndCheckDistribution(List<Consumer<?>> consumers) throws Pul
final double PERCENT_ERROR = 0.40; // 40 %

double expectedMessagesPerConsumer = totalMessages / consumers.size();

System.err.println(messagesPerConsumer);
Assert.assertEquals(expectedTotalMessage, totalMessages);
for (int count : messagesPerConsumer.values()) {
Assert.assertEquals(count, expectedMessagesPerConsumer, expectedMessagesPerConsumer * PERCENT_ERROR);
}
Expand Down

0 comments on commit 748ea65

Please sign in to comment.