Skip to content

Commit 27a7690

Browse files
garyrussellartembilan
authored andcommitted
Improve container test coverage
From 78.5% to 83.8% and fix bugs found: - Deprecate unused ctor - Fix check for proper error handler for listener type * More coverage and polishing.
1 parent 764509d commit 27a7690

File tree

4 files changed

+207
-37
lines changed

4 files changed

+207
-37
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ public final void start() {
271271
checkGroupId();
272272
synchronized (this.lifecycleMonitor) {
273273
if (!isRunning()) {
274-
Assert.isTrue(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
274+
Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
275275
() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
276276
doStart();
277277
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.lang.reflect.Type;
2019
import java.time.Duration;
2120
import java.util.ArrayList;
2221
import java.util.Arrays;
@@ -145,7 +144,10 @@ public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consu
145144
* @param consumerFactory the consumer factory.
146145
* @param containerProperties the container properties.
147146
* @param topicPartitions the topics/partitions; duplicates are eliminated.
147+
* @deprecated - the topicPartitions should be provided in the
148+
* {@link ContainerProperties}.
148149
*/
150+
@Deprecated
149151
public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
150152
ContainerProperties containerProperties, TopicPartitionInitialOffset... topicPartitions) {
151153

@@ -237,7 +239,7 @@ else if (partitionsListenerConsumer.assignedPartitions != null) {
237239

238240
@Override
239241
public boolean isContainerPaused() {
240-
return isPaused() && this.listenerConsumer.consumerPaused;
242+
return isPaused() && this.listenerConsumer != null && this.listenerConsumer.consumerPaused;
241243
}
242244

243245
@Override
@@ -266,13 +268,11 @@ protected void doStart() {
266268
checkAckMode(containerProperties);
267269

268270
Object messageListener = containerProperties.getMessageListener();
269-
Assert.state(messageListener != null, "A MessageListener is required");
270271
if (containerProperties.getConsumerTaskExecutor() == null) {
271272
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
272273
(getBeanName() == null ? "" : getBeanName()) + "-C-");
273274
containerProperties.setConsumerTaskExecutor(consumerExecutor);
274275
}
275-
Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
276276
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
277277
ListenerType listenerType = determineListenerType(listener);
278278
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
@@ -480,7 +480,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
480480

481481
private boolean taskSchedulerExplicitlySet;
482482

483-
private boolean consumerPaused;
483+
volatile boolean consumerPaused;
484484

485485
private long lastReceive = System.currentTimeMillis();
486486

@@ -546,12 +546,11 @@ else if (listener instanceof MessageListener) {
546546
this.taskScheduler = threadPoolTaskScheduler;
547547
}
548548
this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer,
549-
this.containerProperties.getMonitorInterval() * 1000); // NOSONAR magic #
549+
Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
550550
if (this.containerProperties.isLogContainerConfig()) {
551551
this.logger.info(this);
552552
}
553-
Map<String, Object> props =
554-
KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
553+
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
555554
this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, false));
556555
this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, true));
557556
this.syncCommitTimeout = determineSyncCommitTimeout();
@@ -642,7 +641,8 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
642641
subscribingConsumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
643642
}
644643
else {
645-
subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
644+
subscribingConsumer.subscribe(Arrays.asList(this.containerProperties.getTopics()),
645+
rebalanceListener);
646646
}
647647
}
648648
else {
@@ -717,22 +717,14 @@ public void seekToEnd(String topic, int partition) {
717717

718718
private void validateErrorHandler(boolean batch) {
719719
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
720-
if (this.errorHandler == null) {
720+
if (errHandler == null) {
721721
return;
722722
}
723-
Type[] genericInterfaces = errHandler.getClass().getGenericInterfaces();
724-
boolean ok = false;
725-
for (Type t : genericInterfaces) {
726-
if (t.equals(ErrorHandler.class)) {
727-
ok = !batch;
728-
break;
729-
}
730-
else if (t.equals(BatchErrorHandler.class)) {
731-
ok = batch;
732-
break;
733-
}
734-
}
735-
Assert.state(ok, "Error handler is not compatible with the message listener, expecting an instance of "
723+
Class<?> clazz = errHandler.getClass();
724+
Assert.state(batch
725+
? BatchErrorHandler.class.isAssignableFrom(clazz)
726+
: ErrorHandler.class.isAssignableFrom(clazz),
727+
"Error handler is not compatible with the message listener, expecting an instance of "
736728
+ (batch ? "BatchErrorHandler" : "ErrorHandler") + " not " + errHandler.getClass().getName());
737729
}
738730

@@ -821,7 +813,7 @@ private void debugRecords(ConsumerRecords<K, V> records) {
821813
.flatMap(p -> records.records(p).stream())
822814
// map to same format as send metadata toString()
823815
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
824-
.collect(Collectors.toList()));
816+
.collect(Collectors.toList()).toString());
825817
}
826818
}
827819
}
@@ -1022,7 +1014,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
10221014
if (ListenerConsumer.this.kafkaTxManager != null) {
10231015
producer = ((KafkaResourceHolder) TransactionSynchronizationManager
10241016
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
1025-
.getProducer(); // NOSONAR nullable
1017+
.getProducer(); // NOSONAR nullable
10261018
}
10271019
RuntimeException aborted = doInvokeBatchListener(records, recordList, producer);
10281020
if (aborted != null) {
@@ -1097,7 +1089,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
10971089
this.logger.error("Error handler threw an exception", ee);
10981090
return ee;
10991091
}
1100-
catch (Error er) { //NOSONAR
1092+
catch (Error er) { // NOSONAR
11011093
this.logger.error("Error handler threw an error", er);
11021094
throw er;
11031095
}
@@ -1114,7 +1106,8 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, List<Cons
11141106
this.batchListener.onMessage(records,
11151107
this.isAnyManualAck
11161108
? new ConsumerBatchAcknowledgment(records)
1117-
: null, this.consumer);
1109+
: null,
1110+
this.consumer);
11181111
}
11191112
else {
11201113
doInvokeBatchOnMessage(records, recordList);
@@ -1202,7 +1195,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
12021195
if (ListenerConsumer.this.kafkaTxManager != null) {
12031196
producer = ((KafkaResourceHolder) TransactionSynchronizationManager
12041197
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
1205-
.getProducer(); // NOSONAR
1198+
.getProducer(); // NOSONAR
12061199
}
12071200
RuntimeException aborted = doInvokeRecordListener(record, producer, iterator);
12081201
if (aborted != null) {
@@ -1290,7 +1283,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
12901283
this.logger.error("Error handler threw an exception", ee);
12911284
return ee;
12921285
}
1293-
catch (Error er) { //NOSONAR
1286+
catch (Error er) { // NOSONAR
12941287
this.logger.error("Error handler threw an error", er);
12951288
throw er;
12961289
}
@@ -1777,7 +1770,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
17771770
ListenerConsumer.this.transactionTemplate
17781771
.execute(new TransactionCallbackWithoutResult() {
17791772

1780-
@SuppressWarnings({UNCHECKED, RAWTYPES})
1773+
@SuppressWarnings({ UNCHECKED, RAWTYPES })
17811774
@Override
17821775
protected void doInTransactionWithoutResult(TransactionStatus status) {
17831776
KafkaResourceHolder holder =

spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ else if (r.key() == null && t.getCause() instanceof DeserializationException) {
171171
public ConsumerFactory<String, String> cf() {
172172
Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC + ".g1", "false", embeddedKafka());
173173
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
174-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
174+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class.getName());
175175
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
176176
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, FailSometimesDeserializer.class);
177177
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, FailSometimesDeserializer.class.getName());

0 commit comments

Comments
 (0)