-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
2.8.7
Describe the bug
java.lang.IllegalStateException: Another endpoint is already registered with id on bean instantiation using code below caused by races in KafkaListenerAnnotationBeanPostProcessor.
KafkaListenerAnnotationBeanPostProcessor keeps state in private final ListenerScope listenerScope = new ListenerScope(); instance field, but the same instance of KafkaListenerAnnotationBeanPostProcessor can be called from multiple threads as shown below.
That leads to java.lang.IllegalStateException: Another endpoint is already registered with id exception to be thrown by KafkaListenerEndpointRegistry. And potentially to misconfigured kafka listener.
To Reproduce
Run code from the sample.
Expected behavior
No races during KafkaListenerEndpoint instantiation.
Sample
static class MyListener<K, V> {
final String id;
final String[] topics;
MyListener(String id, String[] topics) {
this.id = id;
this.topics = topics;
}
@KafkaListener(
id = "#{__listener.id}",
topics = "#{__listener.topics}"
)
public void onMessage(@NonNull ConsumerRecord<K, V> data) {
System.out.println(data);
}
public String getId() {
return id;
}
public String[] getTopics() {
return topics;
}
}
@Configuration
static class MyConfig {
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener<?, ?> myListener(String id, String[] topics) {
return new MyListener<>(id, topics);
}
}
@Test
public void test() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<MyListener<?, ?>>> futureList = new LinkedList<>();
for (int i = 0; i < 5; i++) {
futureList.add(executorService.submit(
() -> (MyListener<?, ?>) applicationContext.getBean(
"myListener",
UUID.randomUUID().toString(),
new String[]{"foo"}
)
));
}
for (Future<MyListener<?, ?>> future : futureList) {
future.get();
}
}