11/*
2- * Copyright 2016-2021 the original author or authors.
2+ * Copyright 2016-2022 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
@@ -154,6 +154,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
154154
155155 private long maxAge ;
156156
157+ private boolean configureSerializers = true ;
158+
157159 private volatile String transactionIdPrefix ;
158160
159161 private volatile String clientIdPrefix ;
@@ -183,16 +185,37 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
183185 @ Nullable Serializer <K > keySerializer ,
184186 @ Nullable Serializer <V > valueSerializer ) {
185187
186- this (configs , () -> keySerializer , () -> valueSerializer );
188+ this (configs , () -> keySerializer , () -> valueSerializer , true );
187189 }
188190
189191 /**
190- * Construct a factory with the provided configuration and {@link Serializer} Suppliers.
191- * Also configures a {@link #transactionIdPrefix} as a value from the
192- * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
193- * This config is going to be overridden with a suffix for target {@link Producer} instance.
194- * When the suppliers are invoked to get an instance, the serializers'
195- * {@code configure()} methods will be called with the configuration map.
192+ * Construct a factory with the provided configuration and {@link Serializer}s. Also
193+ * configures a {@link #transactionIdPrefix} as a value from the
194+ * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
195+ * be overridden with a suffix for target {@link Producer} instance. The serializers'
196+ * {@code configure()} methods will be called with the configuration map unless
197+ * {@code configureSerializers} is false..
198+ * @param configs the configuration.
199+ * @param keySerializer the key {@link Serializer}.
200+ * @param valueSerializer the value {@link Serializer}.
201+ * @param configureSerializers set to false if serializers are already fully
202+ * configured.
203+ * @since 2.8.7
204+ */
205+ public DefaultKafkaProducerFactory (Map <String , Object > configs ,
206+ @ Nullable Serializer <K > keySerializer ,
207+ @ Nullable Serializer <V > valueSerializer , boolean configureSerializers ) {
208+
209+ this (configs , () -> keySerializer , () -> valueSerializer , configureSerializers );
210+ }
211+
212+ /**
213+ * Construct a factory with the provided configuration and {@link Serializer}
214+ * Suppliers. Also configures a {@link #transactionIdPrefix} as a value from the
215+ * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
216+ * be overridden with a suffix for target {@link Producer} instance. When the
217+ * suppliers are invoked to get an instance, the serializers' {@code configure()}
218+ * methods will be called with the configuration map.
196219 * @param configs the configuration.
197220 * @param keySerializerSupplier the key {@link Serializer} supplier function.
198221 * @param valueSerializerSupplier the value {@link Serializer} supplier function.
@@ -202,7 +225,30 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
202225 @ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
203226 @ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
204227
228+ this (configs , keySerializerSupplier , valueSerializerSupplier , true );
229+ }
230+
231+ /**
232+ * Construct a factory with the provided configuration and {@link Serializer}
233+ * Suppliers. Also configures a {@link #transactionIdPrefix} as a value from the
234+ * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
235+ * be overridden with a suffix for target {@link Producer} instance. When the
236+ * suppliers are invoked to get an instance, the serializers' {@code configure()}
237+ * methods will be called with the configuration map unless
238+ * {@code configureSerializers} is false.
239+ * @param configs the configuration.
240+ * @param keySerializerSupplier the key {@link Serializer} supplier function.
241+ * @param valueSerializerSupplier the value {@link Serializer} supplier function.
242+ * @param configureSerializers set to false if serializers are already fully
243+ * configured.
244+ * @since 2.8.7
245+ */
246+ public DefaultKafkaProducerFactory (Map <String , Object > configs ,
247+ @ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
248+ @ Nullable Supplier <Serializer <V >> valueSerializerSupplier , boolean configureSerializers ) {
249+
205250 this .configs = new ConcurrentHashMap <>(configs );
251+ this .configureSerializers = configureSerializers ;
206252 this .keySerializerSupplier = keySerializerSupplier (keySerializerSupplier );
207253 this .valueSerializerSupplier = valueSerializerSupplier (valueSerializerSupplier );
208254 if (this .clientIdPrefix == null && configs .get (ProducerConfig .CLIENT_ID_CONFIG ) instanceof String ) {
@@ -217,6 +263,9 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
217263
218264 private Supplier <Serializer <K >> keySerializerSupplier (@ Nullable Supplier <Serializer <K >> keySerializerSupplier ) {
219265 this .rawKeySerializerSupplier = keySerializerSupplier ;
266+ if (!this .configureSerializers ) {
267+ return keySerializerSupplier ;
268+ }
220269 return keySerializerSupplier == null
221270 ? () -> null
222271 : () -> {
@@ -230,6 +279,9 @@ private Supplier<Serializer<K>> keySerializerSupplier(@Nullable Supplier<Seriali
230279
231280 private Supplier <Serializer <V >> valueSerializerSupplier (@ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
232281 this .rawValueSerializerSupplier = valueSerializerSupplier ;
282+ if (!this .configureSerializers ) {
283+ return valueSerializerSupplier ;
284+ }
233285 return valueSerializerSupplier == null
234286 ? () -> null
235287 : () -> {
@@ -252,37 +304,79 @@ public void setBeanName(String name) {
252304 }
253305
254306 /**
255- * Set a key serializer.
307+ * Set a key serializer. The serializer will be configured using the producer
308+ * configuration, unless {@link #setConfigureSerializers(boolean)
309+ * configureSerializers} is false.
256310 * @param keySerializer the key serializer.
311+ * @see #setConfigureSerializers(boolean)
257312 */
258313 public void setKeySerializer (@ Nullable Serializer <K > keySerializer ) {
259314 this .keySerializerSupplier = keySerializerSupplier (() -> keySerializer );
260315 }
261316
262317 /**
263- * Set a value serializer.
318+ * Set a value serializer. The serializer will be configured using the producer
319+ * configuration, unless {@link #setConfigureSerializers(boolean)
320+ * configureSerializers} is false.
264321 * @param valueSerializer the value serializer.
322+ * @see #setConfigureSerializers(boolean)
265323 */
266324 public void setValueSerializer (@ Nullable Serializer <V > valueSerializer ) {
267325 this .valueSerializerSupplier = valueSerializerSupplier (() -> valueSerializer );
268326 }
269327
270328 /**
271- * Set a supplier to supply instances of the key serializer.
329+ * Set a supplier to supply instances of the key serializer. The serializer will be
330+ * configured using the producer configuration, unless
331+ * {@link #setConfigureSerializers(boolean) configureSerializers} is false.
272332 * @param keySerializerSupplier the supplier.
273333 * @since 2.8
334+ * @see #setConfigureSerializers(boolean)
274335 */
275336 public void setKeySerializerSupplier (Supplier <Serializer <K >> keySerializerSupplier ) {
276- this .keySerializerSupplier = keySerializerSupplier ;
337+ this .keySerializerSupplier = keySerializerSupplier ( keySerializerSupplier ) ;
277338 }
278339
279340 /**
280341 * Set a supplier to supply instances of the value serializer.
281- * @param valueSerializerSupplier the supplier.
342+ * @param valueSerializerSupplier the supplier. The serializer will be configured
343+ * using the producer configuration, unless {@link #setConfigureSerializers(boolean)
344+ * configureSerializers} is false.
282345 * @since 2.8
346+ * @see #setConfigureSerializers(boolean)
283347 */
284348 public void setValueSerializerSupplier (Supplier <Serializer <V >> valueSerializerSupplier ) {
285- this .valueSerializerSupplier = valueSerializerSupplier ;
349+ this .valueSerializerSupplier = valueSerializerSupplier (valueSerializerSupplier );
350+ }
351+
352+ /**
353+ * If true (default), programmatically provided serializers (via constructor or
354+ * setters) will be configured using the producer configuration. Set to false if the
355+ * serializers are already fully configured.
356+ * @return true to configure.
357+ * @since 2.8.7
358+ * @see #setKeySerializer(Serializer)
359+ * @see #setKeySerializerSupplier(Supplier)
360+ * @see #setValueSerializer(Serializer)
361+ * @see #setValueSerializerSupplier(Supplier)
362+ */
363+ public boolean isConfigureSerializers () {
364+ return this .configureSerializers ;
365+ }
366+
367+ /**
368+ * Set to false (default true) to prevent programmatically provided serializers (via
369+ * constructor or setters) from being configured using the producer configuration,
370+ * e.g. if the serializers are already fully configured.
371+ * @param configureSerializers false to not configure.
372+ * @since 2.8.7
373+ * @see #setKeySerializer(Serializer)
374+ * @see #setKeySerializerSupplier(Supplier)
375+ * @see #setValueSerializer(Serializer)
376+ * @see #setValueSerializerSupplier(Supplier)
377+ */
378+ public void setConfigureSerializers (boolean configureSerializers ) {
379+ this .configureSerializers = configureSerializers ;
286380 }
287381
288382 /**
@@ -441,10 +535,10 @@ public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> o
441535 Map <String , Object > producerProperties = new HashMap <>(getConfigurationProperties ());
442536 producerProperties .putAll (overrideProperties );
443537 producerProperties = ensureExistingTransactionIdPrefixInProperties (producerProperties );
444- DefaultKafkaProducerFactory <K , V > newFactory =
445- new DefaultKafkaProducerFactory <>(producerProperties ,
538+ DefaultKafkaProducerFactory <K , V > newFactory = new DefaultKafkaProducerFactory <>(producerProperties ,
446539 getKeySerializerSupplier (),
447- getValueSerializerSupplier ());
540+ getValueSerializerSupplier (),
541+ isConfigureSerializers ());
448542 newFactory .setPhysicalCloseTimeout ((int ) getPhysicalCloseTimeout ().getSeconds ());
449543 newFactory .setProducerPerConsumerPartition (isProducerPerConsumerPartition ());
450544 newFactory .setProducerPerThread (isProducerPerThread ());
0 commit comments