diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 06d91867ce668..48ccd474af7ef 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -929,15 +929,15 @@ public static MultiTopicsConsumerImpl createPartitionedConsumer( "Should have only 1 topic for partitioned consumer"); // get topic name, then remove it from conf, so constructor will create a consumer with no topic. - ConsumerConfigurationData cloneConf = conf.clone(); + ConsumerConfigurationData cloneConf = conf.clone(); String topicName = cloneConf.getSingleTopic(); cloneConf.getTopicNames().remove(topicName); - CompletableFuture future = new CompletableFuture<>(); - MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, executorProvider, - future, schema, interceptors, true /* createTopicIfDoesNotExist */); + CompletableFuture> future = new CompletableFuture<>(); + MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, + executorProvider, future, schema, interceptors, true /* createTopicIfDoesNotExist */); - future.thenCompose(c -> ((MultiTopicsConsumerImpl) c).subscribeAsync(topicName, numPartitions)) + future.thenCompose(c -> ((MultiTopicsConsumerImpl) c).subscribeAsync(topicName, numPartitions)) .thenRun(()-> subscribeFuture.complete(consumer)) .exceptionally(e -> { log.warn("Failed subscription for createPartitionedConsumer: {} {}, e:{}",