diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
index 6dfaf4a488..6272330601 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -2858,7 +2858,12 @@ private void commitOffsetsIfNeeded(final ConsumerRecord cRecord) {
 			}
 			if (this.remainingRecords == null
 					|| !cRecord.equals(this.remainingRecords.iterator().next())) {
-				ackCurrent(cRecord);
+				if (this.offsetsInThisBatch != null) { // NOSONAR (sync)
+					ackInOrder(cRecord);
+				}
+				else {
+					ackCurrent(cRecord);
+				}
 			}
 			if (this.isManualAck) {
 				this.commitRecovered = false;
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AsyncAckAfterHandleTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AsyncAckAfterHandleTests.java
new file mode 100644
index 0000000000..ab436e457e
--- /dev/null
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AsyncAckAfterHandleTests.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2023 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.LogIfLevelEnabled;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+/**
+ * @author Gary Russell
+ * @since 3.0
+ *
+ */
+@SpringJUnitConfig
+@DirtiesContext
+@EmbeddedKafka(topics = "asaah")
+public class AsyncAckAfterHandleTests {
+
+	@Test
+	void testAckAfterHandlerAsync(@Autowired Config config, @Autowired KafkaTemplate template)
+			throws InterruptedException {
+
+		for (int i = 0; i < 6; i++) {
+			template.send("asaah", 0, null, "message contents");
+		}
+		assertThat(config.latch.await(10, TimeUnit.SECONDS))
+				.describedAs("CountDownLatch.count=%d", config.latch.getCount())
+				.isTrue();
+	}
+
+	@Configuration
+	@EnableKafka
+	public static class Config {
+
+		private final CountDownLatch latch = new CountDownLatch(6);
+
+		@KafkaListener(id = "asaah.id", topics = "asaah")
+		public void onTestTopic(final ConsumerRecord record,
+				final Acknowledgment acknowledgment) {
+			accept(record, acknowledgment);
+		}
+
+		private void accept(final ConsumerRecord record,
+				final Acknowledgment acknowledgment) {
+			if (record.offset() == 1) {
+				throw new RuntimeException("Exception for error handler");
+			}
+			else {
+				this.latch.countDown();
+				acknowledgment.acknowledge();
+			}
+		}
+
+		@Bean
+		public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
+				ConsumerFactory consumerFactory) {
+
+			ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+			factory.setConsumerFactory(consumerFactory);
+			factory.setConcurrency(1);
+			factory.setCommonErrorHandler(new MreErrorHandler());
+			factory.getContainerProperties().setAsyncAcks(true);
+			factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
+			factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.TRACE);
+			return factory;
+		}
+
+		@Bean
+		ConsumerFactory consumerFactory(EmbeddedKafkaBroker broker) {
+			Map props = KafkaTestUtils.consumerProps("asaac.grp", "false", broker);
+			props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
+			return new DefaultKafkaConsumerFactory<>(
+					props);
+		}
+
+		@Bean
+		ProducerFactory producerFactory(EmbeddedKafkaBroker broker) {
+			Map props = KafkaTestUtils.producerProps(broker);
+			props.put(ProducerConfig.LINGER_MS_CONFIG, 100L);
+			return new DefaultKafkaProducerFactory<>(props);
+		}
+
+		@Bean
+		KafkaTemplate template(ProducerFactory pf) {
+			return new KafkaTemplate<>(pf);
+		}
+
+		public class MreErrorHandler implements CommonErrorHandler {
+
+			@Override
+			public boolean handleOne(Exception thrownException, ConsumerRecord record, Consumer consumer,
+					MessageListenerContainer container) {
+				Config.this.latch.countDown();
+				return true;
+			}
+		}
+
+	}
+
+}