Skip to content

Commit c0398ab

Browse files
garyrussellartembilan
authored andcommitted
GH-942: KafkaAdmin Improvements
Resolves #942 Suppress topic/partition already exist exceptions - possible race when multiple instances of the same app start concurrently.
1 parent 32063e4 commit c0398ab

File tree

2 files changed

+47
-7
lines changed

2 files changed

+47
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2018 the original author or authors.
2+
* Copyright 2017-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,8 @@
3636
import org.apache.kafka.clients.admin.NewPartitions;
3737
import org.apache.kafka.clients.admin.NewTopic;
3838
import org.apache.kafka.clients.admin.TopicDescription;
39+
import org.apache.kafka.common.errors.InvalidPartitionsException;
40+
import org.apache.kafka.common.errors.TopicExistsException;
3941
import org.apache.kafka.common.errors.UnsupportedVersionException;
4042

4143
import org.springframework.beans.BeansException;
@@ -206,6 +208,7 @@ private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> top
206208

207209
private Map<String, NewPartitions> checkPartitions(Map<String, NewTopic> topicNameToTopic,
208210
DescribeTopicsResult topicInfo, List<NewTopic> topicsToAdd) {
211+
209212
Map<String, NewPartitions> topicsToModify = new HashMap<>();
210213
topicInfo.values().forEach((n, f) -> {
211214
NewTopic topic = topicNameToTopic.get(n);
@@ -254,8 +257,13 @@ private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
254257
throw new KafkaException("Timed out waiting for create topics results", e);
255258
}
256259
catch (ExecutionException e) {
257-
logger.error("Failed to create topics", e.getCause());
258-
throw new KafkaException("Failed to create topics", e.getCause()); // NOSONAR
260+
if (e.getCause() instanceof TopicExistsException) { // Possible race with another app instance
261+
logger.debug("Failed to create topics", e.getCause());
262+
}
263+
else {
264+
logger.error("Failed to create topics", e.getCause());
265+
throw new KafkaException("Failed to create topics", e.getCause()); // NOSONAR
266+
}
259267
}
260268
}
261269

@@ -272,9 +280,14 @@ private void modifyTopics(AdminClient adminClient, Map<String, NewPartitions> to
272280
throw new KafkaException("Timed out waiting for create partitions results", e);
273281
}
274282
catch (ExecutionException e) {
275-
logger.error("Failed to create partitions", e.getCause());
276-
if (!(e.getCause() instanceof UnsupportedVersionException)) {
277-
throw new KafkaException("Failed to create partitions", e.getCause()); // NOSONAR
283+
if (e.getCause() instanceof InvalidPartitionsException) { // Possible race with another app instance
284+
logger.debug("Failed to create partitions", e.getCause());
285+
}
286+
else {
287+
logger.error("Failed to create partitions", e.getCause());
288+
if (!(e.getCause() instanceof UnsupportedVersionException)) {
289+
throw new KafkaException("Failed to create partitions", e.getCause()); // NOSONAR
290+
}
278291
}
279292
}
280293
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2018 the original author or authors.
2+
* Copyright 2017-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,14 +18,18 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.lang.reflect.Method;
2122
import java.util.Arrays;
23+
import java.util.Collections;
2224
import java.util.HashMap;
2325
import java.util.Map;
2426
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicReference;
2528

2629
import org.apache.kafka.clients.admin.AdminClient;
2730
import org.apache.kafka.clients.admin.AdminClientConfig;
2831
import org.apache.kafka.clients.admin.DescribeTopicsResult;
32+
import org.apache.kafka.clients.admin.NewPartitions;
2933
import org.apache.kafka.clients.admin.NewTopic;
3034
import org.apache.kafka.clients.admin.TopicDescription;
3135
import org.junit.Test;
@@ -38,6 +42,7 @@
3842
import org.springframework.kafka.test.EmbeddedKafkaBroker;
3943
import org.springframework.test.annotation.DirtiesContext;
4044
import org.springframework.test.context.junit4.SpringRunner;
45+
import org.springframework.util.ReflectionUtils;
4146
import org.springframework.util.StringUtils;
4247

4348
/**
@@ -72,6 +77,28 @@ public void testAddTopics() throws Exception {
7277
adminClient.close(10, TimeUnit.SECONDS);
7378
}
7479

80+
@Test
81+
public void alreadyExists() throws Exception {
82+
AtomicReference<Method> addTopics = new AtomicReference<>();
83+
AtomicReference<Method> modifyTopics = new AtomicReference<>();
84+
ReflectionUtils.doWithMethods(KafkaAdmin.class, m -> {
85+
m.setAccessible(true);
86+
if (m.getName().equals("addTopics")) {
87+
addTopics.set(m);
88+
}
89+
else if (m.getName().equals("modifyTopics")) {
90+
modifyTopics.set(m);
91+
}
92+
}, m -> {
93+
return m.getName().endsWith("Topics");
94+
});
95+
try (AdminClient adminClient = AdminClient.create(this.admin.getConfig())) {
96+
addTopics.get().invoke(this.admin, adminClient, Collections.singletonList(this.topic1));
97+
modifyTopics.get().invoke(this.admin, adminClient, Collections.singletonMap(
98+
this.topic1.name(), NewPartitions.increaseTo(this.topic1.numPartitions())));
99+
}
100+
}
101+
75102
@Configuration
76103
public static class Config {
77104

0 commit comments

Comments
 (0)