Skip to content

Commit

Permalink
[fix][client] Fix subscription topic name error. (#16719)
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored Jul 23, 2022
1 parent 3e68fa4 commit 15a0013
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,6 @@ public void testStartEmptyPatternConsumer() throws Exception {
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
.create();

List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
NamespaceService nss = pulsar.getNamespaceService();
doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
.getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));

// 5. call recheckTopics to subscribe each added topics above
log.debug("recheck topics change");
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
Expand Down Expand Up @@ -514,6 +509,40 @@ public void testStartEmptyPatternConsumer() throws Exception {
producer3.close();
}

@Test(timeOut = testTimeout)
public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception {
String key = "AutoSubscribePatternConsumer";
String subscriptionName = "my-ex-subscription-" + key;

Pattern pattern = Pattern.compile("persistent://my-property/my-ns/pattern-topic.*");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topicsPattern(pattern)
// Disable automatic discovery.
.patternAutoDiscoveryPeriod(1000)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
.receiverQueueSize(4)
.subscribe();

// 1. create partition
String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key;
TenantInfoImpl tenantInfo = createDefaultTenantInfo();
admin.tenants().createTenant("prop", tenantInfo);
admin.topics().createPartitionedTopic(topicName, 4);

// 2. verify consumer get methods. There is no need to trigger discovery, because the broker will push the
// changes to update(CommandWatchTopicUpdate).
assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern());
Awaitility.await().untilAsserted(() -> {
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitions().size(), 4);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 4);
assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 1);
});

consumer.close();
}

// simulate subscribe a pattern which has 3 topics, but then matched topic added in.
@Test(timeOut = testTimeout)
public void testAutoSubscribePatternConsumer() throws Exception {
Expand Down Expand Up @@ -590,11 +619,6 @@ public void testAutoSubscribePatternConsumer() throws Exception {
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
.create();

List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
NamespaceService nss = pulsar.getNamespaceService();
doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
.getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));

// 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4
log.debug("recheck topics change");
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
Expand Down Expand Up @@ -203,9 +205,14 @@ public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
return addFuture;
}

Set<String> addTopicPartitionedName = addedTopics.stream()
.map(addTopicName -> TopicName.get(addTopicName).getPartitionedTopicName())
.collect(Collectors.toSet());

List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(partitionedTopics.size());
addedTopics.stream().forEach(topic -> futures.add(
subscribeAsync(topic, false /* createTopicIfDoesNotExist */)));
addTopicPartitionedName.forEach(partitionedTopic -> futures.add(
subscribeAsync(partitionedTopic,
false /* createTopicIfDoesNotExist */)));
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> addFuture.complete(null))
.exceptionally(ex -> {
Expand Down

0 comments on commit 15a0013

Please sign in to comment.