From ce1961bb90e36a19c54e0a9683a31b3c28fa9228 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Tue, 23 Apr 2024 21:50:05 +0900 Subject: [PATCH 1/5] fix ackDiscarded. --- .../annotation/KafkaListenerAnnotationBeanPostProcessor.java | 4 ++-- .../kafka/config/AbstractKafkaListenerContainerFactory.java | 4 ++-- .../kafka/config/AbstractKafkaListenerEndpoint.java | 5 +++++ 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 1876283145..aa850276ed 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -702,12 +702,12 @@ private void resolveContentTypeConverter(MethodKafkaListenerEndpoint endpo private void resolveFilter(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener) { Object filter = resolveExpression(kafkaListener.filter()); if (filter instanceof RecordFilterStrategy rfs) { - endpoint.setRecordFilterStrategy(rfs); + endpoint.useRecordFilterStrategy(rfs); } else { String filterBeanName = resolveExpressionAsString(kafkaListener.filter(), "filter"); if (StringUtils.hasText(filterBeanName)) { - endpoint.setRecordFilterStrategy( + endpoint.useRecordFilterStrategy( this.beanFactory.getBean(filterBeanName, RecordFilterStrategy.class)); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index 95f10a0fda..0643fef90f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -373,7 +373,7 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) { private void configureEndpoint(AbstractKafkaListenerEndpoint aklEndpoint) { if (aklEndpoint.getRecordFilterStrategy() == null) { JavaUtils.INSTANCE - .acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy); + .acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::useRecordFilterStrategy); } JavaUtils.INSTANCE .acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 608ffd84d7..e345eb14e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -340,6 +340,11 @@ public void setAckDiscarded(boolean ackDiscarded) { this.ackDiscarded = ackDiscarded; } + public void useRecordFilterStrategy(RecordFilterStrategy recordFilterStrategy) { + setRecordFilterStrategy(recordFilterStrategy); + setAckDiscarded(true); + } + @Nullable @Override public String getClientIdPrefix() { From 22483e85f3c61b6babadd9da7095154e359a37ae Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Tue, 23 Apr 2024 22:53:46 +0900 Subject: [PATCH 2/5] apply review --- .../KafkaListenerAnnotationBeanPostProcessor.java | 6 ++++-- .../kafka/config/AbstractKafkaListenerContainerFactory.java | 2 +- .../kafka/config/AbstractKafkaListenerEndpoint.java | 5 ----- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index aa850276ed..f2c9545898 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -702,13 +702,15 @@ private void resolveContentTypeConverter(MethodKafkaListenerEndpoint endpo private void resolveFilter(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener) { Object filter = resolveExpression(kafkaListener.filter()); if (filter instanceof RecordFilterStrategy rfs) { - endpoint.useRecordFilterStrategy(rfs); + endpoint.setRecordFilterStrategy(rfs); + endpoint.setAckDiscarded(true); } else { String filterBeanName = resolveExpressionAsString(kafkaListener.filter(), "filter"); if (StringUtils.hasText(filterBeanName)) { - endpoint.useRecordFilterStrategy( + endpoint.setRecordFilterStrategy( this.beanFactory.getBean(filterBeanName, RecordFilterStrategy.class)); + endpoint.setAckDiscarded(true); } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index 0643fef90f..1b9ba2bed5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -373,7 +373,7 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) { private void configureEndpoint(AbstractKafkaListenerEndpoint aklEndpoint) { if (aklEndpoint.getRecordFilterStrategy() == null) { JavaUtils.INSTANCE - .acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::useRecordFilterStrategy); + .acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy); } JavaUtils.INSTANCE .acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index e345eb14e0..608ffd84d7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -340,11 +340,6 @@ public void setAckDiscarded(boolean ackDiscarded) { this.ackDiscarded = ackDiscarded; } - public void useRecordFilterStrategy(RecordFilterStrategy recordFilterStrategy) { - setRecordFilterStrategy(recordFilterStrategy); - setAckDiscarded(true); - } - @Nullable @Override public String getClientIdPrefix() { From 99f5430bfd562d86abfeda96dc5631fdc532b360 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Tue, 23 Apr 2024 22:57:05 +0900 Subject: [PATCH 3/5] apply review. --- .../annotation/KafkaListenerAnnotationBeanPostProcessor.java | 2 -- .../kafka/config/AbstractKafkaListenerEndpoint.java | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index f2c9545898..1876283145 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -703,14 +703,12 @@ private void resolveFilter(MethodKafkaListenerEndpoint endpoint, KafkaList Object filter = resolveExpression(kafkaListener.filter()); if (filter instanceof RecordFilterStrategy rfs) { endpoint.setRecordFilterStrategy(rfs); - endpoint.setAckDiscarded(true); } else { String filterBeanName = resolveExpressionAsString(kafkaListener.filter(), "filter"); if (StringUtils.hasText(filterBeanName)) { endpoint.setRecordFilterStrategy( this.beanFactory.getBean(filterBeanName, RecordFilterStrategy.class)); - endpoint.setAckDiscarded(true); } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 608ffd84d7..2ac668e538 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -326,6 +326,7 @@ public void setReplyTemplate(KafkaTemplate replyTemplate) { @SuppressWarnings("unchecked") public void setRecordFilterStrategy(RecordFilterStrategy recordFilterStrategy) { this.recordFilterStrategy = (RecordFilterStrategy) recordFilterStrategy; + setAckDiscarded(true); } protected boolean isAckDiscarded() { From e6612cfa1914f48750d6df44be9ead7a973e9253 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Tue, 23 Apr 2024 23:03:08 +0900 Subject: [PATCH 4/5] Add author --- .../kafka/config/AbstractKafkaListenerEndpoint.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 2ac668e538..5486861a9a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -62,6 +62,7 @@ * @author Gary Russell * @author Artem Bilan * @author Wang Zhiyang + * @author Sanghyeok An * * @see MethodKafkaListenerEndpoint */ From 974231d2d89130742f8769dd6392b922c7dc20fc Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Tue, 23 Apr 2024 23:30:25 +0900 Subject: [PATCH 5/5] apply review --- .../kafka/config/AbstractKafkaListenerEndpoint.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 5486861a9a..4b7cf8c97c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -327,7 +327,6 @@ public void setReplyTemplate(KafkaTemplate replyTemplate) { @SuppressWarnings("unchecked") public void setRecordFilterStrategy(RecordFilterStrategy recordFilterStrategy) { this.recordFilterStrategy = (RecordFilterStrategy) recordFilterStrategy; - setAckDiscarded(true); } protected boolean isAckDiscarded() { @@ -335,7 +334,7 @@ protected boolean isAckDiscarded() { } /** - * Set to true if the {@link #setRecordFilterStrategy(RecordFilterStrategy)} is in use. + * Set to true if the {@link #setRecordFilterStrategy(RecordFilterStrategy)} should ack discarded messages. * @param ackDiscarded the ackDiscarded. */ public void setAckDiscarded(boolean ackDiscarded) {