Skip to content

Commit 1043cb1

Browse files
izeyeartembilan
authored andcommitted
Introduce an intermediate variable for topicNameToTopic.get(n)
1 parent fa2e023 commit 1043cb1

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,23 +193,24 @@ private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> top
193193
List<NewTopic> topicsToAdd = new ArrayList<>();
194194
Map<String, NewPartitions> topicsToModify = new HashMap<>();
195195
topicInfo.values().forEach((n, f) -> {
196+
NewTopic topic = topicNameToTopic.get(n);
196197
try {
197198
TopicDescription topicDescription = f.get(this.operationTimeout, TimeUnit.SECONDS);
198-
if (topicNameToTopic.get(n).numPartitions() < topicDescription.partitions().size()) {
199+
if (topic.numPartitions() < topicDescription.partitions().size()) {
199200
if (logger.isInfoEnabled()) {
200201
logger.info(String.format(
201202
"Topic '%s' exists but has a different partition count: %d not %d", n,
202-
topicDescription.partitions().size(), topicNameToTopic.get(n).numPartitions()));
203+
topicDescription.partitions().size(), topic.numPartitions()));
203204
}
204205
}
205-
else if (topicNameToTopic.get(n).numPartitions() > topicDescription.partitions().size()) {
206+
else if (topic.numPartitions() > topicDescription.partitions().size()) {
206207
if (logger.isInfoEnabled()) {
207208
logger.info(String.format(
208209
"Topic '%s' exists but has a different partition count: %d not %d, increasing "
209210
+ "if the broker supports it", n,
210-
topicDescription.partitions().size(), topicNameToTopic.get(n).numPartitions()));
211+
topicDescription.partitions().size(), topic.numPartitions()));
211212
}
212-
topicsToModify.put(n, NewPartitions.increaseTo(topicNameToTopic.get(n).numPartitions()));
213+
topicsToModify.put(n, NewPartitions.increaseTo(topic.numPartitions()));
213214
}
214215
}
215216
catch (InterruptedException e) {
@@ -219,7 +220,7 @@ else if (topicNameToTopic.get(n).numPartitions() > topicDescription.partitions()
219220
throw new KafkaException("Timed out waiting to get existing topics", e);
220221
}
221222
catch (ExecutionException e) {
222-
topicsToAdd.add(topicNameToTopic.get(n));
223+
topicsToAdd.add(topic);
223224
}
224225
});
225226
if (topicsToAdd.size() > 0) {

0 commit comments

Comments
 (0)