Skip to content

Commit 2dd2265

Browse files
garyrussellartembilan
authored andcommitted
GH-950: Always wrap listener exception in LEFE
Fixes #950 Provide access to the `group.id` for all listener types, including simple listeners. * Fix tests and KafkaStreams.close() deprecation.
1 parent 64cff66 commit 2dd2265

File tree

4 files changed

+17
-16
lines changed

4 files changed

+17
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2018 the original author or authors.
2+
* Copyright 2017-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.config;
1818

19+
import java.time.Duration;
1920
import java.util.Map;
2021
import java.util.Properties;
2122
import java.util.concurrent.TimeUnit;
@@ -60,7 +61,7 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
6061

6162
private static final String CLEANUP_CONFIG_MUST_NOT_BE_NULL = "'cleanupConfig' must not be null";
6263

63-
private static final int DEFAULT_CLOSE_TIMEOUT = 10;
64+
private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(10);
6465

6566
private KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
6667

@@ -82,7 +83,7 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
8283

8384
private int phase = Integer.MAX_VALUE - 1000; // NOSONAR magic #
8485

85-
private int closeTimeout = DEFAULT_CLOSE_TIMEOUT;
86+
private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT;
8687

8788
private KafkaStreams kafkaStreams;
8889

@@ -241,7 +242,7 @@ public void setStateRestoreListener(StateRestoreListener stateRestoreListener) {
241242
* @see KafkaStreams#close(long, TimeUnit)
242243
*/
243244
public void setCloseTimeout(int closeTimeout) {
244-
this.closeTimeout = closeTimeout; // NOSONAR (sync)
245+
this.closeTimeout = Duration.ofSeconds(closeTimeout); // NOSONAR (sync)
245246
}
246247

247248
@Override
@@ -319,7 +320,7 @@ public synchronized void stop() {
319320
if (this.running) {
320321
try {
321322
if (this.kafkaStreams != null) {
322-
this.kafkaStreams.close(this.closeTimeout, TimeUnit.SECONDS);
323+
this.kafkaStreams.close(this.closeTimeout);
323324
if (this.cleanupConfig.cleanupOnStop()) {
324325
this.kafkaStreams.cleanUp();
325326
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
* @author Yang Qiju
113113
* @author Tom van den Berge
114114
*/
115-
public class KafkaMessageListenerContainer<K, V> // NOSONAR comment density
115+
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
116116
extends AbstractMessageListenerContainer<K, V> {
117117

118118
private static final int DEFAULT_ACK_TIME = 5000;
@@ -1271,13 +1271,9 @@ private Exception decorateException(RuntimeException e) {
12711271
toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
12721272
toHandle.getCause());
12731273
}
1274-
// else {
1275-
// /*
1276-
// * TODO: in 2.3, wrap all exceptions (e.g. thrown by user implementations
1277-
// * of MessageListener) in LEFE with groupId. @KafkaListeners always throw
1278-
// * LEFE.
1279-
// */
1280-
// }
1274+
else {
1275+
toHandle = new ListenerExecutionFailedException("Listener failed", this.consumerGroupId, toHandle);
1276+
}
12811277
return toHandle;
12821278
}
12831279

spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,11 @@ private ConcurrentKafkaListenerContainerFactory<String, String> factory(Consumer
155155
new ConcurrentKafkaListenerContainerFactory<>();
156156
factory.setConsumerFactory(cf);
157157
factory.setErrorHandler((t, r) -> {
158-
if (r.value() == null && t instanceof DeserializationException) {
158+
if (r.value() == null && t.getCause() instanceof DeserializationException) {
159159
this.valueErrorCount.incrementAndGet();
160-
this.headers = ((DeserializationException) t).getHeaders();
160+
this.headers = ((DeserializationException) t.getCause()).getHeaders();
161161
}
162-
else if (r.key() == null && t instanceof DeserializationException) {
162+
else if (r.key() == null && t.getCause() instanceof DeserializationException) {
163163
this.keyErrorCount.incrementAndGet();
164164
}
165165
this.latch.countDown();

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,8 @@ This section covers the changes made from version 2.2 to version 2.3.
77

88
This version requires the 2.1.0 `kafka-clients` or higher.
99

10+
==== Listener Contaienr Changes
1011

12+
Previously, error handlers received `ListenerExectionFailedException` (with the actual listener exception as the `cause`) when the listener was invoked using a listener adapter (such as `@KafkaListener` s).
13+
Exceptions thrown by native `GenericMessageListener` s were passed to the error handler unchanged.
14+
Now a `ListenerExecutionFailedException` is always the argument (with the actual listener exception as the `cause`), which provides access to the container's `group.id` property.

0 commit comments

Comments
 (0)