Commit 5c0f150

garyrussell authored and artembilan committed
Use LogAccessor for logging
- use lambdas instead of testing logging level.
- add tracing to `CloseSafeProducer`
1 parent acc53af commit 5c0f150
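For readers of this diff, a minimal sketch of the pattern the commit applies throughout (the class name and message text below are illustrative, not taken from the changed files): `org.springframework.core.log.LogAccessor` wraps a Commons Logging `Log` and accepts `Supplier` message arguments, so the message string is only built when the corresponding level is enabled and the explicit `isDebugEnabled()`/`isTraceEnabled()` guards can be dropped.

    import org.apache.commons.logging.LogFactory;
    import org.springframework.core.log.LogAccessor;

    public class ExampleComponent {

        private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

        public void process(Object payload) {
            // Before: explicit guard plus eager string concatenation
            //     if (logger.isDebugEnabled()) {
            //         logger.debug("processing: " + payload);
            //     }
            // After: the lambda (a Supplier) is only invoked if DEBUG is enabled
            this.logger.debug(() -> "processing: " + payload);
        }

    }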

40 files changed: +382 −498 lines changed


spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 9 additions & 13 deletions
@@ -35,7 +35,6 @@

 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -50,6 +49,7 @@

 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
+import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.test.core.BrokerAddress;
 import org.springframework.retry.backoff.ExponentialBackOffPolicy;
 import org.springframework.retry.policy.SimpleRetryPolicy;
@@ -80,7 +80,7 @@
  */
 public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {

-    private static final Log logger = LogFactory.getLog(EmbeddedKafkaBroker.class); // NOSONAR
+    private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(EmbeddedKafkaBroker.class)); // NOSONAR

     public static final String BEAN_NAME = "embeddedKafka";

@@ -471,9 +471,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             @Override
             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                 assigned.set(true);
-                if (logger.isDebugEnabled()) {
-                    logger.debug("partitions assigned: " + partitions);
-                }
+                logger.debug(() -> "partitions assigned: " + partitions);
             }

         });
@@ -484,14 +482,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         }
         if (records != null && records.count() > 0) {
             final ConsumerRecords<?, ?> theRecords = records;
-            if (logger.isDebugEnabled()) {
-                logger.debug("Records received on initial poll for assignment; re-seeking to beginning; "
-                        + records.partitions().stream()
-                            .flatMap(p -> theRecords.records(p).stream())
-                            // map to same format as send metadata toString()
-                            .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
-                            .collect(Collectors.toList()));
-            }
+            logger.debug(() -> "Records received on initial poll for assignment; re-seeking to beginning; "
+                    + theRecords.partitions().stream()
+                        .flatMap(p -> theRecords.records(p).stream())
+                        // map to same format as send metadata toString()
+                        .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
+                        .collect(Collectors.toList()));
             consumer.seekToBeginning(records.partitions());
         }
         assertThat(assigned.get())

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 8 additions & 10 deletions
@@ -24,7 +24,6 @@
 import java.util.Properties;
 import java.util.stream.Collectors;

-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -37,6 +36,7 @@
 import org.apache.kafka.common.serialization.StringSerializer;

 import org.springframework.beans.DirectFieldAccessor;
+import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.util.Assert;

@@ -49,7 +49,7 @@
  */
 public final class KafkaTestUtils {

-    private static final Log logger = LogFactory.getLog(KafkaTestUtils.class); // NOSONAR
+    private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaTestUtils.class)); // NOSONAR

     private static Properties defaults;

@@ -196,14 +196,12 @@ public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) {
     public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout) {
         logger.debug("Polling...");
         ConsumerRecords<K, V> received = consumer.poll(Duration.ofMillis(timeout));
-        if (logger.isDebugEnabled()) {
-            logger.debug("Received: " + received.count() + ", "
-                    + received.partitions().stream()
-                        .flatMap(p -> received.records(p).stream())
-                        // map to same format as send metadata toString()
-                        .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
-                        .collect(Collectors.toList()));
-        }
+        logger.debug(() -> "Received: " + received.count() + ", "
+                + received.partitions().stream()
+                    .flatMap(p -> received.records(p).stream())
+                    // map to same format as send metadata toString()
+                    .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
+                    .collect(Collectors.toList()));
         assertThat(received).as("null received from consumer.poll()").isNotNull();
         return received;
     }

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 5 additions & 9 deletions
@@ -35,7 +35,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;

-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

 import org.springframework.aop.framework.Advised;
@@ -62,6 +61,7 @@
 import org.springframework.core.annotation.AnnotationUtils;
 import org.springframework.core.convert.converter.Converter;
 import org.springframework.core.convert.converter.GenericConverter;
+import org.springframework.core.log.LogAccessor;
 import org.springframework.format.Formatter;
 import org.springframework.format.FormatterRegistry;
 import org.springframework.format.support.DefaultFormattingConversionService;
@@ -138,7 +138,7 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>

     private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

-    private final Log logger = LogFactory.getLog(getClass());
+    private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

     private final ListenerScope listenerScope = new ListenerScope();

@@ -290,9 +290,7 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
         }
         if (annotatedMethods.isEmpty()) {
             this.nonAnnotatedClasses.add(bean.getClass());
-            if (this.logger.isTraceEnabled()) {
-                this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
-            }
+            this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
         }
         else {
             // Non-empty set of methods
@@ -302,10 +300,8 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
                     processKafkaListener(listener, method, bean, beanName);
                 }
             }
-            if (this.logger.isDebugEnabled()) {
-                this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+            this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                     + beanName + "': " + annotatedMethods);
-            }
         }
         if (hasClassLevelListeners) {
             processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
@@ -476,7 +472,7 @@ private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint,
                 properties.load(new StringReader(value));
             }
             catch (IOException e) {
-                this.logger.error("Failed to load property " + property + ", continuing...", e);
+                this.logger.error(e, () -> "Failed to load property " + property + ", continuing...");
             }
         }
     }
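One detail worth noting in the hunks above: with `LogAccessor`, the `Throwable` argument moves to the front of the call, ahead of the message or message supplier, which is the reverse of the plain Commons Logging `Log` signatures. A small sketch under the same assumptions as the earlier example (it reuses the `logger` field shown there, mirrors the changed `catch` block, and assumes imports of `java.io.IOException`, `java.io.StringReader`, and `java.util.Properties`):

    private void loadProperty(String property, String value) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(value));
        }
        catch (IOException e) {
            // Log (before):        logger.error("Failed to load property " + property + ", continuing...", e);
            // LogAccessor (after): cause first, then a lazily evaluated message
            this.logger.error(e, () -> "Failed to load property " + property + ", continuing...");
        }
    }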

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 2 additions & 2 deletions
@@ -21,13 +21,13 @@
 import java.util.Collection;
 import java.util.regex.Pattern;

-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.context.ApplicationEventPublisherAware;
+import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
@@ -62,7 +62,7 @@
 public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V>
         implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean {

-    protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR protected
+    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR protected

     private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.Properties;
2424
import java.util.regex.Pattern;
2525

26-
import org.apache.commons.logging.Log;
2726
import org.apache.commons.logging.LogFactory;
2827

2928
import org.springframework.beans.BeansException;
@@ -34,6 +33,7 @@
3433
import org.springframework.beans.factory.config.BeanExpressionResolver;
3534
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
3635
import org.springframework.context.expression.BeanFactoryResolver;
36+
import org.springframework.core.log.LogAccessor;
3737
import org.springframework.expression.BeanResolver;
3838
import org.springframework.kafka.core.KafkaTemplate;
3939
import org.springframework.kafka.listener.BatchMessageListener;
@@ -67,7 +67,7 @@
6767
public abstract class AbstractKafkaListenerEndpoint<K, V>
6868
implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean {
6969

70-
private final Log logger = LogFactory.getLog(getClass());
70+
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
7171

7272
private String id;
7373

@@ -458,10 +458,8 @@ private void setupMessageListener(MessageListenerContainer container, MessageCon
458458
if (this.recordFilterStrategy != null) {
459459
if (this.batchListener) {
460460
if (((MessagingMessageListenerAdapter<K, V>) messageListener).isConsumerRecords()) {
461-
if (this.logger.isWarnEnabled()) {
462-
this.logger.warn("Filter strategy ignored when consuming 'ConsumerRecords'"
461+
this.logger.warn(() -> "Filter strategy ignored when consuming 'ConsumerRecords'"
463462
+ (this.id != null ? " id: " + this.id : ""));
464-
}
465463
}
466464
else {
467465
messageListener = new FilteringBatchMessageListenerAdapter<>(

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 3 additions & 3 deletions
@@ -25,7 +25,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;

-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

 import org.springframework.beans.BeansException;
@@ -38,6 +37,7 @@
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.SmartLifecycle;
 import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
 import org.springframework.kafka.listener.MessageListenerContainer;
 import org.springframework.util.Assert;
@@ -68,7 +68,7 @@
 public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
         ApplicationListener<ContextRefreshedEvent> {

-    protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR
+    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR

     private final Map<String, MessageListenerContainer> listenerContainers =
             new ConcurrentHashMap<String, MessageListenerContainer>();
@@ -230,7 +230,7 @@ public void destroy() {
                 ((DisposableBean) listenerContainer).destroy();
             }
             catch (Exception ex) {
-                this.logger.warn("Failed to destroy message listener container", ex);
+                this.logger.warn(ex, "Failed to destroy message listener container");
             }
         }
     }

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 7 additions & 8 deletions
@@ -19,11 +19,11 @@
 import java.lang.reflect.Method;
 import java.util.Arrays;

-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

 import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
 import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.listener.KafkaListenerErrorHandler;
 import org.springframework.kafka.listener.MessageListenerContainer;
 import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter;
@@ -53,7 +53,7 @@
  */
 public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {

-    private final Log logger = LogFactory.getLog(getClass());
+    private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

     private Object bean;

@@ -112,12 +112,11 @@ private String getReplyTopic() {
         if (replyingMethod != null) {
             SendTo ann = AnnotationUtils.getAnnotation(replyingMethod, SendTo.class);
             if (ann != null) {
-                if (replyingMethod.getReturnType().equals(void.class)
-                        && this.logger.isWarnEnabled()) {
-                    this.logger.warn("Method "
-                            + replyingMethod
-                            + " has a void return type; @SendTo is ignored" +
-                            (this.errorHandler == null ? "" : " unless the error handler returns a result"));
+                if (replyingMethod.getReturnType().equals(void.class)) {
+                    this.logger.warn(() -> "Method "
+                            + replyingMethod
+                            + " has a void return type; @SendTo is ignored" +
+                            (this.errorHandler == null ? "" : " unless the error handler returns a result"));
                 }
                 String[] destinations = ann.value();
                 if (destinations.length > 1) {

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 4 additions & 6 deletions
@@ -20,7 +20,6 @@
 import java.util.Map;
 import java.util.Properties;

-import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
@@ -32,6 +31,7 @@

 import org.springframework.beans.factory.config.AbstractFactoryBean;
 import org.springframework.context.SmartLifecycle;
+import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.core.CleanupConfig;
 import org.springframework.lang.Nullable;
@@ -60,7 +60,7 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
  */
 public static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(10);

-    private static final Log LOGGER = LogFactory.getLog(StreamsBuilderFactoryBean.class);
+    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(StreamsBuilderFactoryBean.class));

     private static final String STREAMS_CONFIG_MUST_NOT_BE_NULL = "'streamsConfig' must not be null";

@@ -291,9 +291,7 @@ public synchronized void start() {
         Assert.state(this.streamsConfig != null || this.properties != null,
                 "'streamsConfig' or streams configuration properties must not be null");
         Topology topology = getObject().build(); // NOSONAR
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(topology.describe());
-        }
+        LOGGER.debug(() -> topology.describe().toString());
         if (this.properties != null) {
             this.kafkaStreams = new KafkaStreams(topology, this.properties, this.clientSupplier);
         }
@@ -331,7 +329,7 @@ public synchronized void stop() {
             }
         }
         catch (Exception e) {
-            LOGGER.error("Failed to stop streams", e);
+            LOGGER.error(e, "Failed to stop streams");
         }
         finally {
             this.running = false;
