diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java index b8ea87ab4016e..315ce378d6953 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java @@ -76,6 +76,11 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) { + clientBuilder.ioThreads(4).connectionsPerBroker(4); + } + // test that reproduces the issue https://github.com/apache/pulsar/issues/12024 // where closing the consumer leads to an endless receive loop @Test @@ -351,4 +356,19 @@ public int choosePartition(Message msg, TopicMetadata metadata) { } consumer.close(); } + + @Test(invocationCount = 10, timeOut = 30000) + public void testMultipleIOThreads() throws PulsarAdminException, PulsarClientException { + final var topic = TopicName.get(newTopicName()).toString(); + final var numPartitions = 100; + admin.topics().createPartitionedTopic(topic, numPartitions); + for (int i = 0; i < 100; i++) { + admin.topics().createNonPartitionedTopic(topic + "-" + i); + } + @Cleanup + final var consumer = pulsarClient.newConsumer(Schema.INT32).topicsPattern(topic + ".*") + .subscriptionName("sub").subscribe(); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); + assertTrue(consumer.isConnected()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index d0607b97c1893..fb7be3c5a5ea2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -92,7 +92,7 @@ public class MultiTopicsConsumerImpl extends ConsumerBase { // sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. AtomicInteger allTopicPartitionsNumber; - private boolean paused = false; + private volatile boolean paused = false; private final Object pauseMutex = new Object(); // timeout related to auto check and subscribe partition increasement private volatile Timeout partitionsAutoUpdateTimeout = null; @@ -1059,29 +1059,28 @@ private void doSubscribeTopicPartitions(Schema schema, CompletableFuture> subFuture = new CompletableFuture<>(); - consumers.compute(topicName, (key, existingValue) -> { - if (existingValue != null) { - String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. " - + "Topic is already being subscribed for in other thread.", topic, topicName); - log.warn(errorMessage); - subscribeResult.completeExceptionally(new PulsarClientException(errorMessage)); - return existingValue; - } else { - internalConfig.setStartPaused(paused); - ConsumerImpl newConsumer = createInternalConsumer(internalConfig, topicName, - -1, subFuture, createIfDoesNotExist, schema); - - synchronized (pauseMutex) { + synchronized (pauseMutex) { + consumers.compute(topicName, (key, existingValue) -> { + if (existingValue != null) { + String errorMessage = + String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. " + + "Topic is already being subscribed for in other thread.", topic, topicName); + log.warn(errorMessage); + subscribeResult.completeExceptionally(new PulsarClientException(errorMessage)); + return existingValue; + } else { + internalConfig.setStartPaused(paused); + ConsumerImpl newConsumer = createInternalConsumer(internalConfig, topicName, + -1, subFuture, createIfDoesNotExist, schema); if (paused) { newConsumer.pause(); } else { newConsumer.resume(); } + return newConsumer; } - return newConsumer; - } - }); - + }); + } futureList = Collections.singletonList(subFuture); } @@ -1409,7 +1408,7 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa } if (log.isDebugEnabled()) { log.debug("[{}] create consumer {} for partitionName: {}", - topicName, newConsumer.getTopic(), partitionName); + topicName, newConsumer.getTopic(), partitionName); } return subFuture; })