diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index 9940c9e69d..2a4d831bf8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -545,9 +545,9 @@ public ProducerFactory copyWithConfigurationOverride(Map o producerProperties.putAll(overrideProperties); producerProperties = ensureExistingTransactionIdPrefixInProperties(producerProperties); DefaultKafkaProducerFactory newFactory = new DefaultKafkaProducerFactory<>(producerProperties, - getKeySerializerSupplier(), - getValueSerializerSupplier(), - isConfigureSerializers()); + getKeySerializerSupplier(), + getValueSerializerSupplier(), + isConfigureSerializers()); newFactory.setPhysicalCloseTimeout((int) getPhysicalCloseTimeout().getSeconds()); newFactory.setProducerPerThread(isProducerPerThread()); for (ProducerPostProcessor templatePostProcessor : getPostProcessors()) { @@ -867,7 +867,7 @@ protected Producer createTransactionalProducer(String txIdPrefix) { private boolean expire(CloseSafeProducer producer) { boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge; if (expired) { - closeTransactionProducer(producer, this.physicalCloseTimeout, this.listeners); + producer.closeDelegate(this.physicalCloseTimeout, this.listeners); } return expired; } @@ -880,14 +880,10 @@ boolean cacheReturner(CloseSafeProducer producerToRemove, Duration timeout else { this.globalLock.lock(); try { - if (producerToRemove.epoch != this.epoch.get()) { - producerToRemove.closeDelegate(timeout, this.listeners); - return true; - } - BlockingQueue> txIdCache = getCache(producerToRemove.txIdPrefix); - if (txIdCache != null && !txIdCache.contains(producerToRemove) - && !txIdCache.offer(producerToRemove)) { + if (producerToRemove.epoch != this.epoch.get() + || (txIdCache != null && !txIdCache.contains(producerToRemove) + && !txIdCache.offer(producerToRemove))) { closeTransactionProducer(producerToRemove, timeout, this.listeners); return true; } @@ -900,7 +896,7 @@ boolean cacheReturner(CloseSafeProducer producerToRemove, Duration timeout } private void closeTransactionProducer(CloseSafeProducer producer, Duration timeout, - List> listeners) { + List> listeners) { try { producer.closeDelegate(timeout, listeners); } @@ -1084,12 +1080,16 @@ public Future send(ProducerRecord record) { @Override public Future send(ProducerRecord record, Callback callback) { LOGGER.trace(() -> toString() + " send(" + record + ")"); - return this.delegate.send(record, (metadata, exception) -> { - if (exception instanceof OutOfOrderSequenceException) { - CloseSafeProducer.this.producerFailed = exception; - close(CloseSafeProducer.this.closeTimeout); + return this.delegate.send(record, new Callback() { + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception instanceof OutOfOrderSequenceException) { + CloseSafeProducer.this.producerFailed = exception; + close(CloseSafeProducer.this.closeTimeout); + } + callback.onCompletion(metadata, exception); } - callback.onCompletion(metadata, exception); }); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategy.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategy.java index ec7c6c86ad..09733e1d41 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategy.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultTransactionIdSuffixStrategy.java @@ -56,7 +56,7 @@ public class DefaultTransactionIdSuffixStrategy implements TransactionIdSuffixSt */ @Override public String acquireSuffix(String txIdPrefix) { - Assert.notNull(txIdPrefix, "'txIdPrefix' must not be null"); + Assert.notNull(txIdPrefix, () -> "'txIdPrefix' must not be null"); BlockingQueue cache = getSuffixCache(txIdPrefix); if (cache == null) { return String.valueOf(this.transactionIdSuffix.getAndIncrement()); @@ -71,8 +71,8 @@ public String acquireSuffix(String txIdPrefix) { @Override public void releaseSuffix(String txIdPrefix, String suffix) { - Assert.notNull(txIdPrefix, "'txIdPrefix' must not be null"); - Assert.notNull(suffix, "'suffix' must not be null"); + Assert.notNull(txIdPrefix, () -> "'txIdPrefix' must not be null"); + Assert.notNull(suffix, () -> "'suffix' must not be null"); if (this.maxCache <= 0) { return; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index ad475b1279..906fd4a1fa 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -513,7 +513,7 @@ public Producer createProducer(@Nullable String txIdPrefixArg) { } KafkaTestUtils.getPropertyValue(this, "cache", Map.class).put("foo", cache); CloseSafeProducer closeSafeProducer = new CloseSafeProducer<>(producer, - this::cacheReturner, "foo", Duration.ofSeconds(1), "factory", 0); + this::cacheReturner, "foo", "1", Duration.ofSeconds(1), "factory", 0); return closeSafeProducer; }