diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java index 532be3298d..2bb3486444 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java @@ -169,42 +169,8 @@ public DefaultKafkaConsumerFactory(Map configs, this.configs = new ConcurrentHashMap<>(configs); this.configureDeserializers = configureDeserializers; - this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier); - this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier); - } - - private Supplier> keyDeserializerSupplier( - @Nullable Supplier> keyDeserializerSupplier) { - - if (!this.configureDeserializers) { - return keyDeserializerSupplier; - } - return keyDeserializerSupplier == null - ? () -> null - : () -> { - Deserializer deserializer = keyDeserializerSupplier.get(); - if (deserializer != null) { - deserializer.configure(this.configs, true); - } - return deserializer; - }; - } - - private Supplier> valueDeserializerSupplier( - @Nullable Supplier> valueDeserializerSupplier) { - - if (!this.configureDeserializers) { - return valueDeserializerSupplier; - } - return valueDeserializerSupplier == null - ? () -> null - : () -> { - Deserializer deserializer = valueDeserializerSupplier.get(); - if (deserializer != null) { - deserializer.configure(this.configs, false); - } - return deserializer; - }; + this.keyDeserializerSupplier = keyDeserializerSupplier; + this.valueDeserializerSupplier = valueDeserializerSupplier; } @Override @@ -219,7 +185,7 @@ public void setBeanName(String name) { * @param keyDeserializer the deserializer. */ public void setKeyDeserializer(@Nullable Deserializer keyDeserializer) { - this.keyDeserializerSupplier = keyDeserializerSupplier(() -> keyDeserializer); + this.keyDeserializerSupplier = () -> keyDeserializer; } /** @@ -229,7 +195,7 @@ public void setKeyDeserializer(@Nullable Deserializer keyDeserializer) { * @param valueDeserializer the value deserializer. */ public void setValueDeserializer(@Nullable Deserializer valueDeserializer) { - this.valueDeserializerSupplier = valueDeserializerSupplier(() -> valueDeserializer); + this.valueDeserializerSupplier = () -> valueDeserializer; } /** @@ -240,7 +206,7 @@ public void setValueDeserializer(@Nullable Deserializer valueDeserializer) { * @since 2.8 */ public void setKeyDeserializerSupplier(Supplier> keyDeserializerSupplier) { - this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier); + this.keyDeserializerSupplier = keyDeserializerSupplier; } /** @@ -251,7 +217,7 @@ public void setKeyDeserializerSupplier(Supplier> keyDeserializer * @since 2.8 */ public void setValueDeserializerSupplier(Supplier> valueDeserializerSupplier) { - this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier); + this.valueDeserializerSupplier = valueDeserializerSupplier; } /** @@ -499,14 +465,36 @@ public void setApplicationContext(ApplicationContext applicationContext) throws this.applicationContext = applicationContext; } + @Nullable + private Deserializer keyDeserializer(Map configs) { + Deserializer deserializer = + this.keyDeserializerSupplier != null + ? this.keyDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, true); + } + return deserializer; + } + + @Nullable + private Deserializer valueDeserializer(Map configs) { + Deserializer deserializer = + this.valueDeserializerSupplier != null + ? this.valueDeserializerSupplier.get() + : null; + if (deserializer != null && this.configureDeserializers) { + deserializer.configure(configs, false); + } + return deserializer; + } + protected class ExtendedKafkaConsumer extends KafkaConsumer { private String idForListeners; protected ExtendedKafkaConsumer(Map configProps) { - super(configProps, - DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(), - DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get()); + super(configProps, keyDeserializer(configProps), valueDeserializer(configProps)); if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) { Iterator metricIterator = metrics().keySet().iterator(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java index a8463ea3c2..5cdc3cf8ab 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java @@ -503,10 +503,11 @@ public void consumerRemoved(String id, Consumer consumer) { void configDeserializer() { Deserializer key = mock(Deserializer.class); Deserializer value = mock(Deserializer.class); - Map config = new HashMap<>(); + Map config = KafkaTestUtils.consumerProps("mockGroup", "false", this.embeddedKafka); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(config, key, value); Deserializer keyDeserializer = cf.getKeyDeserializer(); assertThat(keyDeserializer).isSameAs(key); + cf.createKafkaConsumer(config); verify(key).configure(config, true); Deserializer valueDeserializer = cf.getValueDeserializer(); assertThat(valueDeserializer).isSameAs(value);