@@ -165,6 +165,7 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
165165
166166 /**
167167 * Construct a factory with the provided configuration.
168+ *
168169 * @param configs the configuration.
169170 */
170171 public DefaultKafkaProducerFactory (Map <String , Object > configs ) {
@@ -178,11 +179,13 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
178179 * This config is going to be overridden with a suffix for target {@link Producer} instance.
179180 * The serializers' {@code configure()} methods will be called with the
180181 * configuration map.
182+ *
181183 * @param configs the configuration.
182184 * @param keySerializer the key {@link Serializer}.
183185 * @param valueSerializer the value {@link Serializer}.
184186 */
185- public DefaultKafkaProducerFactory (Map <String , Object > configs ,
187+ public DefaultKafkaProducerFactory (
188+ Map <String , Object > configs ,
186189 @ Nullable Serializer <K > keySerializer ,
187190 @ Nullable Serializer <V > valueSerializer ) {
188191
@@ -196,14 +199,17 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
196199 * be overridden with a suffix for target {@link Producer} instance. The serializers'
197200 * {@code configure()} methods will be called with the configuration map unless
198201 * {@code configureSerializers} is false..
202+ *
199203 * @param configs the configuration.
200204 * @param keySerializer the key {@link Serializer}.
201205 * @param valueSerializer the value {@link Serializer}.
202206 * @param configureSerializers set to false if serializers are already fully
203207 * configured.
208+ *
204209 * @since 2.8.7
205210 */
206- public DefaultKafkaProducerFactory (Map <String , Object > configs ,
211+ public DefaultKafkaProducerFactory (
212+ Map <String , Object > configs ,
207213 @ Nullable Serializer <K > keySerializer ,
208214 @ Nullable Serializer <V > valueSerializer , boolean configureSerializers ) {
209215
@@ -217,12 +223,15 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
217223 * be overridden with a suffix for target {@link Producer} instance. When the
218224 * suppliers are invoked to get an instance, the serializers' {@code configure()}
219225 * methods will be called with the configuration map.
226+ *
220227 * @param configs the configuration.
221228 * @param keySerializerSupplier the key {@link Serializer} supplier function.
222229 * @param valueSerializerSupplier the value {@link Serializer} supplier function.
230+ *
223231 * @since 2.3
224232 */
225- public DefaultKafkaProducerFactory (Map <String , Object > configs ,
233+ public DefaultKafkaProducerFactory (
234+ Map <String , Object > configs ,
226235 @ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
227236 @ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
228237
@@ -244,7 +253,8 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
244253 * configured.
245254 * @since 2.8.7
246255 */
247- public DefaultKafkaProducerFactory (Map <String , Object > configs ,
256+ public DefaultKafkaProducerFactory (
257+ Map <String , Object > configs ,
248258 @ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
249259 @ Nullable Supplier <Serializer <V >> valueSerializerSupplier , boolean configureSerializers ) {
250260
@@ -308,7 +318,9 @@ public void setBeanName(String name) {
308318 * Set a key serializer. The serializer will be configured using the producer
309319 * configuration, unless {@link #setConfigureSerializers(boolean)
310320 * configureSerializers} is false.
321+ *
311322 * @param keySerializer the key serializer.
323+ *
312324 * @see #setConfigureSerializers(boolean)
313325 */
314326 public void setKeySerializer (@ Nullable Serializer <K > keySerializer ) {
@@ -319,7 +331,9 @@ public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
319331 * Set a value serializer. The serializer will be configured using the producer
320332 * configuration, unless {@link #setConfigureSerializers(boolean)
321333 * configureSerializers} is false.
334+ *
322335 * @param valueSerializer the value serializer.
336+ *
323337 * @see #setConfigureSerializers(boolean)
324338 */
325339 public void setValueSerializer (@ Nullable Serializer <V > valueSerializer ) {
@@ -330,6 +344,7 @@ public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
330344 * Set a supplier to supply instances of the key serializer. The serializer will be
331345 * configured using the producer configuration, unless
332346 * {@link #setConfigureSerializers(boolean) configureSerializers} is false.
347+ *
333348 * @param keySerializerSupplier the supplier.
334349 * @since 2.8
335350 * @see #setConfigureSerializers(boolean)
@@ -340,6 +355,7 @@ public void setKeySerializerSupplier(Supplier<Serializer<K>> keySerializerSuppli
340355
341356 /**
342357 * Set a supplier to supply instances of the value serializer.
358+ *
343359 * @param valueSerializerSupplier the supplier. The serializer will be configured
344360 * using the producer configuration, unless {@link #setConfigureSerializers(boolean)
345361 * configureSerializers} is false.
@@ -354,12 +370,14 @@ public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSu
354370 * If true (default), programmatically provided serializers (via constructor or
355371 * setters) will be configured using the producer configuration. Set to false if the
356372 * serializers are already fully configured.
373+ *
357374 * @return true to configure.
358375 * @since 2.8.7
359376 * @see #setKeySerializer(Serializer)
360377 * @see #setKeySerializerSupplier(Supplier)
361378 * @see #setValueSerializer(Serializer)
362379 * @see #setValueSerializerSupplier(Supplier)
380+
363381 */
364382 public boolean isConfigureSerializers () {
365383 return this .configureSerializers ;
@@ -369,6 +387,7 @@ public boolean isConfigureSerializers() {
369387 * Set to false (default true) to prevent programmatically provided serializers (via
370388 * constructor or setters) from being configured using the producer configuration,
371389 * e.g. if the serializers are already fully configured.
390+ *
372391 * @param configureSerializers false to not configure.
373392 * @since 2.8.7
374393 * @see #setKeySerializer(Serializer)
@@ -385,6 +404,7 @@ public void setConfigureSerializers(boolean configureSerializers) {
385404 * closing the producer itself (when {@link #reset()}, {@link #destroy()
386405 * #closeProducerFor(String)}, or {@link #closeThreadBoundProducer()} are invoked).
387406 * Specified in seconds; default {@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
407+
388408 * @param physicalCloseTimeout the timeout in seconds.
389409 * @since 1.0.7
390410 */
@@ -425,6 +445,7 @@ public final void setTransactionIdPrefix(String transactionIdPrefix) {
425445 * all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to
426446 * physically close the producer when it is no longer needed. These producers will not
427447 * be closed by {@link #destroy()} or {@link #reset()}.
448+ *
428449 * @param producerPerThread true for a producer per thread.
429450 * @since 2.3
430451 * @see #closeThreadBoundProducer()
@@ -526,19 +547,23 @@ public int getPhase() {
526547 * you want to change the ID config, add a new
527548 * {@link org.apache.kafka.clients.producer.ProducerConfig#TRANSACTIONAL_ID_CONFIG}
528549 * key to the override config.</p>
550+ *
529551 * @param overrideProperties the properties to be applied to the new factory
552+ *
530553 * @return {@link org.springframework.kafka.core.DefaultKafkaProducerFactory} with
531- * properties applied
554+ * properties applied
532555 */
533556 @ Override
534557 public ProducerFactory <K , V > copyWithConfigurationOverride (Map <String , Object > overrideProperties ) {
535558 Map <String , Object > producerProperties = new HashMap <>(getConfigurationProperties ());
536559 producerProperties .putAll (overrideProperties );
537560 producerProperties = ensureExistingTransactionIdPrefixInProperties (producerProperties );
538- DefaultKafkaProducerFactory <K , V > newFactory = new DefaultKafkaProducerFactory <>(producerProperties ,
539- getKeySerializerSupplier (),
540- getValueSerializerSupplier (),
541- isConfigureSerializers ());
561+ DefaultKafkaProducerFactory <K , V > newFactory = new DefaultKafkaProducerFactory <>(
562+ producerProperties ,
563+ getKeySerializerSupplier (),
564+ getValueSerializerSupplier (),
565+ isConfigureSerializers ()
566+ );
542567 newFactory .setPhysicalCloseTimeout ((int ) getPhysicalCloseTimeout ().getSeconds ());
543568 newFactory .setProducerPerThread (isProducerPerThread ());
544569 for (ProducerPostProcessor <K , V > templatePostProcessor : getPostProcessors ()) {
@@ -559,7 +584,9 @@ public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> o
559584 * new factory, the transactionId has to be reinserted prior use.
560585 * The incoming properties are checked for a transactionId key. If none is
561586 * there, the one existing in the factory is added.
587+ *
562588 * @param producerProperties the properties to be used for the new factory
589+ *
563590 * @return the producerProperties or a copy with the transaction ID set
564591 */
565592 private Map <String , Object > ensureExistingTransactionIdPrefixInProperties (Map <String , Object > producerProperties ) {
@@ -576,7 +603,9 @@ private Map<String, Object> ensureExistingTransactionIdPrefixInProperties(Map<St
576603
577604 /**
578605 * Add a listener.
606+ *
579607 * @param listener the listener.
608+ *
580609 * @since 2.5
581610 */
582611 @ Override
@@ -587,8 +616,10 @@ public void addListener(Listener<K, V> listener) {
587616
588617 /**
589618 * Add a listener at a specific index.
619+ *
590620 * @param index the index (list position).
591621 * @param listener the listener.
622+ *
592623 * @since 2.5
593624 */
594625 @ Override
@@ -604,8 +635,11 @@ public void addListener(int index, Listener<K, V> listener) {
604635
605636 /**
606637 * Remove a listener.
638+ *
607639 * @param listener the listener.
640+ *
608641 * @return true if removed.
642+ *
609643 * @since 2.5
610644 */
611645 @ Override
@@ -633,7 +667,7 @@ public void updateConfigs(Map<String, Object> updates) {
633667 Assert .isTrue (this .transactionIdPrefix != null
634668 ? entry .getValue () != null
635669 : entry .getValue () == null ,
636- "Cannot change transactional capability" );
670+ "Cannot change transactional capability" );
637671 this .transactionIdPrefix = (String ) entry .getValue ();
638672 }
639673 else if (entry .getKey ().equals (ProducerConfig .CLIENT_ID_CONFIG )) {
@@ -723,6 +757,7 @@ public void onApplicationEvent(ContextStoppedEvent event) {
723757 /**
724758 * Close the {@link Producer}(s) and clear the cache of transactional
725759 * {@link Producer}(s).
760+ *
726761 * @since 2.2
727762 */
728763 @ Override
@@ -769,7 +804,8 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
769804 }
770805 if (this .producer == null ) {
771806 this .producer = new CloseSafeProducer <>(createKafkaProducer (), this ::removeProducer ,
772- this .physicalCloseTimeout , this .beanName , this .epoch .get ());
807+ this .physicalCloseTimeout , this .beanName , this .epoch .get ()
808+ );
773809 this .listeners .forEach (listener -> listener .producerAdded (this .producer .clientId , this .producer ));
774810 }
775811 return this .producer ;
@@ -787,7 +823,8 @@ private Producer<K, V> getOrCreateThreadBoundProducer() {
787823 }
788824 if (tlProducer == null ) {
789825 tlProducer = new CloseSafeProducer <>(createKafkaProducer (), this ::removeProducer ,
790- this .physicalCloseTimeout , this .beanName , this .epoch .get ());
826+ this .physicalCloseTimeout , this .beanName , this .epoch .get ()
827+ );
791828 for (Listener <K , V > listener : this .listeners ) {
792829 listener .producerAdded (tlProducer .clientId , tlProducer );
793830 }
@@ -798,6 +835,7 @@ private Producer<K, V> getOrCreateThreadBoundProducer() {
798835
799836 /**
800837 * Subclasses must return a raw producer which will be wrapped in a {@link CloseSafeProducer}.
838+ *
801839 * @return the producer.
802840 */
803841 protected Producer <K , V > createKafkaProducer () {
@@ -806,9 +844,12 @@ protected Producer<K, V> createKafkaProducer() {
806844
807845 /**
808846 * Remove the single shared producer and a thread-bound instance if present.
847+ *
809848 * @param producerToRemove the producer.
810849 * @param timeout the close timeout.
850+ *
811851 * @return true if the producer was closed.
852+ *
812853 * @since 2.2.13
813854 */
814855 protected final boolean removeProducer (CloseSafeProducer <K , V > producerToRemove , Duration timeout ) {
@@ -818,7 +859,9 @@ protected final boolean removeProducer(CloseSafeProducer<K, V> producerToRemove,
818859 /**
819860 * Subclasses must return a producer from the {@link #getCache()} or a
820861 * new raw producer wrapped in a {@link CloseSafeProducer}.
862+ *
821863 * @return the producer - cannot be null.
864+ *
822865 * @since 1.3
823866 */
824867 protected Producer <K , V > createTransactionalProducer () {
@@ -838,7 +881,11 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
838881 }
839882 }
840883 if (cachedProducer == null ) {
841- return doCreateTxProducer (txIdPrefix , "" + this .transactionIdSuffix .getAndIncrement (), this ::cacheReturner );
884+ return doCreateTxProducer (
885+ txIdPrefix ,
886+ "" + this .transactionIdSuffix .getAndIncrement (),
887+ this ::cacheReturner
888+ );
842889 }
843890 else {
844891 return cachedProducer ;
@@ -876,7 +923,8 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
876923 }
877924 }
878925
879- private CloseSafeProducer <K , V > doCreateTxProducer (String prefix , String suffix ,
926+ private CloseSafeProducer <K , V > doCreateTxProducer (
927+ String prefix , String suffix ,
880928 BiPredicate <CloseSafeProducer <K , V >, Duration > remover ) {
881929 Producer <K , V > newProducer = createRawProducer (getTxProducerConfigs (prefix + suffix ));
882930 try {
@@ -895,7 +943,8 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
895943 }
896944 CloseSafeProducer <K , V > closeSafeProducer =
897945 new CloseSafeProducer <>(newProducer , remover , prefix , this .physicalCloseTimeout , this .beanName ,
898- this .epoch .get ());
946+ this .epoch .get ()
947+ );
899948 this .listeners .forEach (listener -> listener .producerAdded (closeSafeProducer .clientId , closeSafeProducer ));
900949 return closeSafeProducer ;
901950 }
@@ -939,6 +988,7 @@ public void closeThreadBoundProducer() {
939988
940989 /**
941990 * Return the configuration of a producer.
991+ *
942992 * @return the configuration of a producer.
943993 * @since 2.8.3
944994 * @see #createKafkaProducer()
@@ -947,15 +997,19 @@ protected Map<String, Object> getProducerConfigs() {
947997 final Map <String , Object > newProducerConfigs = new HashMap <>(this .configs );
948998 checkBootstrap (newProducerConfigs );
949999 if (this .clientIdPrefix != null ) {
950- newProducerConfigs .put (ProducerConfig .CLIENT_ID_CONFIG ,
951- this .clientIdPrefix + "-" + this .clientIdCounter .incrementAndGet ());
1000+ newProducerConfigs .put (
1001+ ProducerConfig .CLIENT_ID_CONFIG ,
1002+ this .clientIdPrefix + "-" + this .clientIdCounter .incrementAndGet ()
1003+ );
9521004 }
9531005 return newProducerConfigs ;
9541006 }
9551007
9561008 /**
9571009 * Return the configuration of a transactional producer.
1010+ *
9581011 * @param transactionId the transactionId.
1012+ *
9591013 * @return the configuration of a transactional producer.
9601014 * @since 2.8.3
9611015 * @see #doCreateTxProducer(String, String, BiPredicate)
@@ -971,7 +1025,6 @@ protected Map<String, Object> getTxProducerConfigs(String transactionId) {
9711025 *
9721026 * @param <K> the key type.
9731027 * @param <V> the value type.
974- *
9751028 */
9761029 protected static class CloseSafeProducer <K , V > implements Producer <K , V > {
9771030
@@ -995,14 +1048,16 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
9951048
9961049 volatile boolean closed ; // NOSONAR
9971050
998- CloseSafeProducer (Producer <K , V > delegate ,
1051+ CloseSafeProducer (
1052+ Producer <K , V > delegate ,
9991053 BiPredicate <CloseSafeProducer <K , V >, Duration > removeConsumerProducer , Duration closeTimeout ,
10001054 String factoryName , int epoch ) {
10011055
10021056 this (delegate , removeConsumerProducer , null , closeTimeout , factoryName , epoch );
10031057 }
10041058
1005- CloseSafeProducer (Producer <K , V > delegate ,
1059+ CloseSafeProducer (
1060+ Producer <K , V > delegate ,
10061061 BiPredicate <CloseSafeProducer <K , V >, Duration > removeProducer , @ Nullable String txIdPrefix ,
10071062 Duration closeTimeout , String factoryName , int epoch ) {
10081063
@@ -1097,7 +1152,8 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
10971152 }
10981153
10991154 @ Override
1100- public void sendOffsetsToTransaction (Map <TopicPartition , OffsetAndMetadata > offsets ,
1155+ public void sendOffsetsToTransaction (
1156+ Map <TopicPartition , OffsetAndMetadata > offsets ,
11011157 ConsumerGroupMetadata groupMetadata ) throws ProducerFencedException {
11021158
11031159 LOGGER .trace (() -> toString () + " sendOffsetsToTransaction(" + offsets + ", " + groupMetadata + ")" );
0 commit comments