From 35ca84254192a1b8220a78abaa79a9b4343790fa Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 7 Mar 2022 17:23:55 -0500 Subject: [PATCH] GH-2132: Deprecate ListenerUtils.recordToString Resolves https://github.com/spring-projects/spring-kafka/pull/2133 Apply to 2.8.x, and 2.7.x. In 3.0 they will be removed and `KafkaUtils` called directly instead. --- .../src/main/asciidoc/kafka.adoc | 9 +++ .../listener/CommonLoggingErrorHandler.java | 4 +- .../kafka/listener/ConsumerProperties.java | 9 ++- .../DeadLetterPublishingRecoverer.java | 7 +- .../kafka/listener/ErrorHandlingUtils.java | 3 +- .../kafka/listener/FailedRecordProcessor.java | 4 +- .../kafka/listener/FailedRecordTracker.java | 1 + .../KafkaMessageListenerContainer.java | 13 +++- .../kafka/listener/ListenerUtils.java | 16 +++-- .../kafka/listener/SeekUtils.java | 3 +- .../AggregatingReplyingKafkaTemplate.java | 2 +- .../requestreply/ReplyingKafkaTemplate.java | 6 +- .../retrytopic/RetryTopicConfigurer.java | 1 + .../kafka/support/KafkaUtils.java | 68 +++++++------------ ...erTemplateTransactionIntegrationTests.java | 4 +- .../DefaultErrorHandlerRecordTests.java | 3 +- .../FallbackBatchErrorHandlerTests.java | 3 + 17 files changed, 91 insertions(+), 65 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index b6458bc6a3..4ac92aeb00 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -2560,6 +2560,8 @@ See `monitorInterval`. |[[onlyLogRecordMetadata]]<> |`false` |Set to false to log the complete consumer record (in error, debug logs etc) instead of just `topic-partition@offset`. +Deprecated. +Replaced by `KafkaUtils.setConsumerRecordFormatter`. |[[pollTimeout]]<> |5000 @@ -5720,3 +5722,10 @@ public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException { } ---- ==== + +[[record-logging]] +==== Producer and Consumer Record Logging + +Starting with versions 2.7.12, 2.8.4, you can determine how these records will be rendered in debug logs, etc. + +See `KafkaUtils.setProducerRecordFormatter()` and `KafkaUtils.setProducerRecordFormatter()` for more information. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java index 0eb059e928..3b9f138fd4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,6 +46,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) { this.ackAfterHandle = ackAfterHandle; } + @SuppressWarnings("deprecation") @Override public void handleRecord(Exception thrownException, ConsumerRecord record, Consumer consumer, MessageListenerContainer container) { @@ -53,6 +54,7 @@ public void handleRecord(Exception thrownException, ConsumerRecord record, LOGGER.error(thrownException, () -> "Error occured while processing: " + ListenerUtils.recordToString(record)); } + @SuppressWarnings("deprecation") @Override public void handleBatch(Exception thrownException, ConsumerRecords data, Consumer consumer, MessageListenerContainer container, Runnable invokeListener) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java index 6591f9dca8..55a34cb039 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.LogIfLevelEnabled; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.lang.Nullable; @@ -431,16 +432,20 @@ public void setCommitRetries(int commitRetries) { this.commitRetries = commitRetries; } + @Deprecated public boolean isOnlyLogRecordMetadata() { return this.onlyLogRecordMetadata; } /** - * Set to false to log {@code record.toString()} in log messages instead - * of {@code topic-partition@offset}. + * Set to false to log {@code record.toString()} in log messages instead of + * {@code topic-partition@offset}. * @param onlyLogRecordMetadata false to log the entire record. * @since 2.2.14 + * @deprecated in favor of + * {@link KafkaUtils#setConsumerRecordFormatter(java.util.function.Function)}. */ + @Deprecated public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) { this.onlyLogRecordMetadata = onlyLogRecordMetadata; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java index 3a2965f2a1..f80cda4251 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -339,7 +339,7 @@ public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions this.skipSameTopicFatalExceptions = skipSameTopicFatalExceptions; } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) @Override public void accept(ConsumerRecord record, @Nullable Consumer consumer, Exception exception) { TopicPartition tp = this.destinationResolver.apply(record, exception); @@ -406,6 +406,7 @@ private void sendOrThrow(ProducerRecord outRecord, } private void maybeThrow(ConsumerRecord record, Exception exception) { + @SuppressWarnings("deprecation") String message = String.format("No destination returned for record %s and exception %s. " + "failIfNoDestinationReturned: %s", ListenerUtils.recordToString(record), exception, this.throwIfNoDestinationReturned); @@ -518,6 +519,7 @@ protected ProducerRecord createProducerRecord(ConsumerRecord outRecord, KafkaOperations kafkaTemplate, ConsumerRecord inRecord) { @@ -559,6 +561,7 @@ private void verifySendResult(KafkaOperations kafkaTemplate, } } + @SuppressWarnings("deprecation") private String pubFailMessage(ProducerRecord outRecord, ConsumerRecord inRecord) { return "Dead-letter publication to " + outRecord.topic() + "failed for: " + ListenerUtils.recordToString(inRecord, true); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 52cc61ba12..d9d160da7b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -105,6 +105,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r * @param records the records. * @return the String. */ + @SuppressWarnings("deprecation") public static String recordsToString(ConsumerRecords records) { StringBuffer sb = new StringBuffer(); records.spliterator().forEachRemaining(rec -> sb diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java index d66face873..c7c05acc1b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -126,6 +126,7 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) { * @return the {@link BiPredicate}. * @deprecated in favor of {@link #getRecoveryStrategy(List, Exception)}. */ + @SuppressWarnings("deprecation") @Deprecated protected BiPredicate, Exception> getSkipPredicate(List> records, Exception thrownException) { @@ -169,6 +170,7 @@ protected RecoveryStrategy getRecoveryStrategy(List> record * @return the {@link RecoveryStrategy}. * @since 2.8.4 */ + @SuppressWarnings("deprecation") protected RecoveryStrategy getRecoveryStrategy(List> records, @Nullable Consumer recoveryConsumer, Exception thrownException) { if (getClassifier().classify(thrownException)) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index 0aaa60c2d5..bc7d99403b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -61,6 +61,7 @@ class FailedRecordTracker implements RecoveryStrategy { private boolean resetStateOnExceptionChange; + @SuppressWarnings("deprecation") FailedRecordTracker(@Nullable BiConsumer, Exception> recoverer, BackOff backOff, LogAccessor logger) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 5281a21d60..e8daa13193 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1225,6 +1225,7 @@ public boolean isLongLived() { return true; } + @SuppressWarnings("deprecation") @Override // NOSONAR complexity public void run() { ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata()); @@ -1797,6 +1798,7 @@ record = this.acks.poll(); } } + @SuppressWarnings("deprecation") private void traceAck(ConsumerRecord record) { this.logger.trace(() -> "Ack: " + ListenerUtils.recordToString(record, true)); } @@ -1871,6 +1873,7 @@ private void processAcks(ConsumerRecords records) { } } + @SuppressWarnings("deprecation") private synchronized void ackInOrder(ConsumerRecord record) { TopicPartition part = new TopicPartition(record.topic(), record.partition()); List offs = this.offsetsInThisBatch.get(part); @@ -2289,7 +2292,7 @@ private void invokeRecordListener(final ConsumerRecords records) { * Invoke the listener with each record in a separate transaction. * @param records the records. */ - @SuppressWarnings(RAW_TYPES) // NOSONAR complexity + @SuppressWarnings("deprecation") // NOSONAR complexity private void invokeRecordListenerInTx(final ConsumerRecords records) { Iterator> iterator = records.iterator(); while (iterator.hasNext()) { @@ -2391,6 +2394,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { } } + @SuppressWarnings("deprecation") private void doInvokeWithRecords(final ConsumerRecords records) { Iterator> iterator = records.iterator(); while (iterator.hasNext()) { @@ -2426,6 +2430,7 @@ private ConsumerRecords checkEarlyIntercept(ConsumerRecords nextArg) return next; } + @SuppressWarnings("deprecation") @Nullable private ConsumerRecord checkEarlyIntercept(ConsumerRecord recordArg) { internalHeaders(recordArg); @@ -2574,14 +2579,15 @@ private void invokeOnMessage(final ConsumerRecord record) { } } + @SuppressWarnings("deprecation") private void doInvokeOnMessage(final ConsumerRecord recordArg) { ConsumerRecord record = recordArg; if (this.recordInterceptor != null) { record = this.recordInterceptor.intercept(record, this.consumer); } if (record == null) { - this.logger.debug(() -> "RecordInterceptor returned null, skipping: " - + ListenerUtils.recordToString(recordArg)); + this.logger.debug(() -> ("RecordInterceptor returned null, skipping: " + + ListenerUtils.recordToString(recordArg))); } else { try { @@ -3139,6 +3145,7 @@ public void nack(long sleep) { } @Override + @SuppressWarnings("deprecation") public String toString() { return "Acknowledgment for " + ListenerUtils.recordToString(this.record, true); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index d14f6fc34c..7624d3e774 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,6 +49,8 @@ public final class ListenerUtils { private ListenerUtils() { } + private static final ThreadLocal LOG_METADATA_ONLY = new ThreadLocal<>(); + private static final int DEFAULT_SLEEP_INTERVAL = 100; private static final int SMALL_SLEEP_INTERVAL = 10; @@ -150,10 +152,12 @@ protected Class resolveClass(ObjectStreamClass desc) throws IOException, Clas * Set to true to only log record metadata. * @param onlyMeta true to only log record metadata. * @since 2.2.14 + * @deprecated in favor of {@link KafkaUtils#format(ConsumerRecord)}. * @see #recordToString(ConsumerRecord) */ + @Deprecated public static void setLogOnlyMetadata(boolean onlyMeta) { - KafkaUtils.setLogOnlyMetadata(onlyMeta); + LOG_METADATA_ONLY.set(onlyMeta); } /** @@ -162,10 +166,12 @@ public static void setLogOnlyMetadata(boolean onlyMeta) { * @param record the record. * @return the rendered record. * @since 2.2.14 + * @deprecated in favor of {@link KafkaUtils#format(ConsumerRecord)}. * @see #setLogOnlyMetadata(boolean) */ + @Deprecated public static String recordToString(ConsumerRecord record) { - return KafkaUtils.recordToString(record); + return recordToString(record, Boolean.TRUE.equals(LOG_METADATA_ONLY.get())); } /** @@ -175,9 +181,11 @@ public static String recordToString(ConsumerRecord record) { * @param meta true to log just the metadata. * @return the rendered record. * @since 2.5.4 + * @deprecated in favor of {@link KafkaUtils#format(ConsumerRecord)}. */ + @Deprecated public static String recordToString(ConsumerRecord record, boolean meta) { - return KafkaUtils.recordToString(record, meta); + return KafkaUtils.format(record, !meta); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java index aa7f7aa9d6..9da5d5ac5e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -92,6 +92,7 @@ public static boolean doSeeks(List> records, Consumer * @param logger a {@link LogAccessor} for seek errors. * @return true if the failed record was skipped. */ + @SuppressWarnings("deprecation") public static boolean doSeeks(List> records, Consumer consumer, Exception exception, boolean recoverable, RecoveryStrategy recovery, @Nullable MessageListenerContainer container, LogAccessor logger) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java index bf745fbcf4..61182c4ee2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java @@ -127,7 +127,7 @@ public void onMessage(List>>> data.forEach(record -> { Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID); if (correlation == null) { - this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record) + this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record) + " - to use request/reply semantics, the responding server must return the correlation id " + " in the '" + KafkaHeaders.CORRELATION_ID + "' header"); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java index 04f26cb9e8..db43e5a664 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java @@ -457,7 +457,7 @@ public void onMessage(List> data) { correlationId = new CorrelationKey(correlationHeader.value()); } if (correlationId == null) { - this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.recordToString(record) + this.logger.error(() -> "No correlationId found in reply: " + KafkaUtils.format(record) + " - to use request/reply semantics, the responding server must return the correlation id " + " in the '" + this.correlationHeaderName + "' header"); } @@ -475,7 +475,7 @@ public void onMessage(List> data) { future.setException(exception); } if (ok) { - this.logger.debug(() -> "Received: " + KafkaUtils.recordToString(record) + this.logger.debug(() -> "Received: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + correlationKey); future.set(record); } @@ -543,7 +543,7 @@ protected void logLateArrival(ConsumerRecord record, CorrelationKey correl } private String missingCorrelationLogMessage(ConsumerRecord record, CorrelationKey correlationId) { - return "No pending reply: " + KafkaUtils.recordToString(record) + WITH_CORRELATION_ID + return "No pending reply: " + KafkaUtils.format(record) + WITH_CORRELATION_ID + correlationId + ", perhaps timed out, or using a shared reply topic"; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java index 876502e534..9a0b772401 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java @@ -438,6 +438,7 @@ static class LoggingDltListenerHandlerMethod { public static final String DEFAULT_DLT_METHOD_NAME = "logMessage"; + @SuppressWarnings("deprecation") public void logMessage(Object message) { if (message instanceof ConsumerRecord) { LOGGER.info(() -> "Received message in dlt listener: " diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java index e7454ae1b3..610802e0f5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java @@ -42,8 +42,6 @@ */ public final class KafkaUtils { - private static final ThreadLocal LOG_METADATA_ONLY = new ThreadLocal<>(); - private static Function, String> prFormatter = rec -> rec.toString(); private static Function, String> crFormatter = @@ -146,48 +144,10 @@ else if (dt instanceof String) { } /** - * Set to true to only log record metadata. - * @param onlyMeta true to only log record metadata. - * @since 2.7.12 - * @see #recordToString(ConsumerRecord) - */ - public static void setLogOnlyMetadata(boolean onlyMeta) { - LOG_METADATA_ONLY.set(onlyMeta); - } - - /** - * Return the {@link ConsumerRecord} as a String; either {@code toString()} or - * {@code topic-partition@offset}. - * @param record the record. - * @return the rendered record. - * @since 2.7.12 - * @see #setLogOnlyMetadata(boolean) - */ - public static String recordToString(ConsumerRecord record) { - return recordToString(record, Boolean.TRUE.equals(LOG_METADATA_ONLY.get())); - } - - /** - * Return the {@link ConsumerRecord} as a String; either {@code toString()} or + * Set a formatter for logging {@link ConsumerRecord}s; default is * {@code topic-partition@offset}. - * @param record the record. - * @param meta true to log just the metadata. - * @return the rendered record. - * @since 2.7.12 - */ - public static String recordToString(ConsumerRecord record, boolean meta) { - if (meta) { - return crFormatter.apply(record); - } - else { - return record.toString(); - } - } - - /** - * Set a formatter for logging {@link ConsumerRecord}s. * @param formatter a function to format the record as a String - * @since 2.7.11 + * @since 2.7.12 */ public static void setConsumerRecordFormatter(Function, String> formatter) { Assert.notNull(formatter, "'formatter' cannot be null"); @@ -195,9 +155,10 @@ public static void setConsumerRecordFormatter(Function, Str } /** - * Set a formatter for logging {@link ProducerRecord}s. + * Set a formatter for logging {@link ProducerRecord}s; default is + * {@link ProducerRecord#toString()}. * @param formatter a function to format the record as a String - * @since 2.7.11 + * @since 2.7.12 */ public static void setProducerRecordFormatter(Function, String> formatter) { Assert.notNull(formatter, "'formatter' cannot be null"); @@ -209,16 +170,35 @@ public static void setProducerRecordFormatter(Function, Str * {@code topic-partition@offset}. * @param record the record to format. * @return the formatted String. + * @since 2.7.12 */ public static String format(ConsumerRecord record) { return crFormatter.apply(record); } + /** + * Format the {@link ConsumerRecord} for logging; default + * {@code topic-partition@offset}. Provided for backwards compatibility only. + * @param record the record to format. + * @param full use {@link ConsumerRecord#toString()}. + * @return the formatted String. + * @since 2.7.12 + * @deprecated in favor of {@link #format(ConsumerRecord)}. + */ + @Deprecated + public static String format(ConsumerRecord record, boolean full) { + if (full) { + return record.toString(); + } + return crFormatter.apply(record); + } + /** * Format the {@link ProducerRecord} for logging; default * {@link ProducerRecord}{@link #toString()}. * @param record the record to format. * @return the formatted String. + * @since 2.7.12 */ public static String format(ProducerRecord record) { return prFormatter.apply(record); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java index fad4db56cb..1e2b4a9491 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplateTransactionIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -281,6 +281,7 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv .verify(DEFAULT_VERIFY_TIMEOUT); } + @SuppressWarnings("deprecation") @LogLevels(categories = "reactor.kafka.receiver.internals.ConsumerEventLoop", level = "TRACE", classes = { JUnitUtils.class, LogLevelsCondition.class, DefaultKafkaSender.class, @@ -343,6 +344,7 @@ private Publisher> doThrowKafka throw new KafkaException(); } + @SuppressWarnings("deprecation") private SenderRecord toSenderRecord(ConsumerRecord record) { logger.info(ListenerUtils.recordToString(record, true)); return SenderRecord.create(REACTIVE_INT_KEY_TOPIC, record.partition(), null, record.key(), diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerRecordTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerRecordTests.java index 060d202b96..5699bf7097 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerRecordTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerRecordTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ */ public class DefaultErrorHandlerRecordTests { + @SuppressWarnings("deprecation") @Test public void testClassifier() { ListenerUtils.setLogOnlyMetadata(true); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java index f320f74d2e..ca7a4845a4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java @@ -47,6 +47,7 @@ public class FallbackBatchErrorHandlerTests { private int invoked; + @SuppressWarnings("deprecation") @Test void recover() { this.invoked = 0; @@ -75,6 +76,7 @@ void recover() { verifyNoMoreInteractions(consumer); } + @SuppressWarnings("deprecation") @Test void successOnRetry() { this.invoked = 0; @@ -100,6 +102,7 @@ void successOnRetry() { verifyNoMoreInteractions(consumer); } + @SuppressWarnings("deprecation") @Test void recoveryFails() { this.invoked = 0;