Skip to content

Commit

Permalink
[improve][client] Optimize code for MultiTopicsConsumerImpl (#18748)
Browse files Browse the repository at this point in the history
Co-authored-by: huangzegui <huangzegui@didiglobal.com>
  • Loading branch information
Pomelongan and huangzegui authored Dec 8, 2022
1 parent 816acc1 commit da87e40
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -929,15 +929,15 @@ public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(
"Should have only 1 topic for partitioned consumer");

// get topic name, then remove it from conf, so constructor will create a consumer with no topic.
ConsumerConfigurationData cloneConf = conf.clone();
ConsumerConfigurationData<T> cloneConf = conf.clone();
String topicName = cloneConf.getSingleTopic();
cloneConf.getTopicNames().remove(topicName);

CompletableFuture<Consumer> future = new CompletableFuture<>();
MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, executorProvider,
future, schema, interceptors, true /* createTopicIfDoesNotExist */);
CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
MultiTopicsConsumerImpl<T> consumer = new MultiTopicsConsumerImpl<T>(client, topicName, cloneConf,
executorProvider, future, schema, interceptors, true /* createTopicIfDoesNotExist */);

future.thenCompose(c -> ((MultiTopicsConsumerImpl) c).subscribeAsync(topicName, numPartitions))
future.thenCompose(c -> ((MultiTopicsConsumerImpl<T>) c).subscribeAsync(topicName, numPartitions))
.thenRun(()-> subscribeFuture.complete(consumer))
.exceptionally(e -> {
log.warn("Failed subscription for createPartitionedConsumer: {} {}, e:{}",
Expand Down

0 comments on commit da87e40

Please sign in to comment.