Skip to content

Commit

Permalink
test: add test cases about the last sent position and individually se…
Browse files Browse the repository at this point in the history
…nt positions
  • Loading branch information
equanz committed Jan 23, 2024
1 parent 2a9fa2e commit 43a2dc3
Show file tree
Hide file tree
Showing 4 changed files with 934 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.admin;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -56,6 +58,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
Expand All @@ -75,6 +78,8 @@
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.testcontext.SpyConfig;
import org.apache.pulsar.client.admin.GetStatsOptions;
Expand Down Expand Up @@ -139,7 +144,10 @@
import org.apache.pulsar.common.policies.data.TopicHashPositions;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -3449,43 +3457,198 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception {
}

@Test
public void testGetReadPositionWhenJoining() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString();
public void testGetLastSentPositionWhenJoining() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPositionWhenJoining-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();

@Cleanup
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

final int messages = 10;
MessageIdImpl messageId = null;
for (int i = 0; i < messages; i++) {
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
consumer1.receive();
}

List<Consumer<byte[]>> consumers = new ArrayList<>();
for (int i = 0; i < 2; i++) {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();
consumers.add(consumer);
}
@Cleanup
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getConsumers().size(), 2);
ConsumerStats consumerStats = subStats.getConsumers().get(0);
Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId() + 1).toString());
ConsumerStats consumerStats = subStats.getConsumers().stream()
.filter(s -> s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get();
Assert.assertEquals(consumerStats.getLastSentPositionWhenJoining(),
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()).toString());
}

@Test
public void testGetLastSentPosition() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetLastSentPosition-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
final Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
final AtomicInteger counter = new AtomicInteger();
@Cleanup
final Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.messageListener((c, msg) -> {
try {
c.acknowledge(msg);
counter.getAndIncrement();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.subscribe();

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertNull(subStats.getLastSentPosition());

for (Consumer<byte[]> consumer : consumers) {
consumer.close();
final int messages = 10;
MessageIdImpl messageId = null;
for (int i = 0; i < messages; i++) {
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
}

Awaitility.await().untilAsserted(() -> assertEquals(counter.get(), messages));

stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getLastSentPosition(), PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()).toString());
}

@Test
public void testGetIndividuallySentPositions() throws Exception {
// The producer sends messages with two types of keys.
// The dispatcher sends keyA messages to consumer1.
// Consumer1 will not receive any messages. Its receiver queue size is 1.
// Consumer2 will receive and ack any messages immediately.

final String topic = "persistent://prop-xyz/ns1/testGetIndividuallySentPositions-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
final Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();

final String consumer1Name = "c1";
final String consumer2Name = "c2";

@Cleanup
final Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.consumerName(consumer1Name)
.receiverQueueSize(1)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();

final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
(PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topic).get().get().getSubscription(subName).getDispatcher();
final String keyA = "key-a";
final String keyB = "key-b";
final int hashA = Murmur3_32Hash.getInstance().makeHash(keyA.getBytes());

final Field selectorField = PersistentStickyKeyDispatcherMultipleConsumers.class.getDeclaredField("selector");
selectorField.setAccessible(true);
final StickyKeyConsumerSelector selector = spy((StickyKeyConsumerSelector) selectorField.get(dispatcher));
selectorField.set(dispatcher, selector);

// the selector returns consumer1 if keyA
doAnswer((invocationOnMock -> {
final int hash = invocationOnMock.getArgument(0);

final String consumerName = hash == hashA ? consumer1Name : consumer2Name;
return dispatcher.getConsumers().stream().filter(consumer -> consumer.consumerName().equals(consumerName)).findFirst().get();
})).when(selector).select(anyInt());

final AtomicInteger consumer2AckCounter = new AtomicInteger();
@Cleanup
final Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.consumerName(consumer2Name)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.messageListener((c, msg) -> {
try {
c.acknowledge(msg);
consumer2AckCounter.getAndIncrement();
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.subscribe();

final LongPairRangeSet.LongPairConsumer<PositionImpl> positionRangeConverter = PositionImpl::new;
final LongPairRangeSet<PositionImpl> expectedIndividuallySentPositions = new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter);

TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());

final Function<String, MessageIdImpl> sendFn = (key) -> {
try {
return (MessageIdImpl) producer.newMessage().key(key).value(("msg").getBytes()).send();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
};
final List<MessageIdImpl> messageIdList = new ArrayList<>();

// the dispatcher can send keyA message, but then consumer1's receiver queue will be full
messageIdList.add(sendFn.apply(keyA));

// the dispatcher can send messages other than keyA
messageIdList.add(sendFn.apply(keyA));
messageIdList.add(sendFn.apply(keyB));
messageIdList.add(sendFn.apply(keyA));
messageIdList.add(sendFn.apply(keyB));
messageIdList.add(sendFn.apply(keyB));

assertEquals(messageIdList.size(), 6);
Awaitility.await().untilAsserted(() -> assertEquals(consumer2AckCounter.get(), 3));

// set expected value
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(1).getLedgerId(), messageIdList.get(1).getEntryId(),
messageIdList.get(2).getLedgerId(), messageIdList.get(2).getEntryId());
expectedIndividuallySentPositions.addOpenClosed(messageIdList.get(3).getLedgerId(), messageIdList.get(3).getEntryId(),
messageIdList.get(5).getLedgerId(), messageIdList.get(5).getEntryId());

stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getIndividuallySentPositions(), expectedIndividuallySentPositions.toString());
}

@Test
Expand Down
Loading

0 comments on commit 43a2dc3

Please sign in to comment.