diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc index 9ea0f20391..777f46589a 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc @@ -562,6 +562,27 @@ It is disabled by default to avoid the (small) overhead of looking up the state The `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` support this feature. +[[delivery-attempts-header-for-batch-listener]] +== Delivery Attempts Header for batch listener + +When processing `ConsumerRecord` with the `BatchListener`, the `KafkaHeaders.DELIVERY_ATTEMPT` header can be present in a different way compared to `SingleRecordListener`. + +Starting with version 3.3, if you want to inject the `KafkaHeaders.DELIVERY_ATTEMPT` header into the `ConsumerRecord` when using the `BatchListener`, set the `DeliveryAttemptAwareRetryListener` as the `RetryListener` in the `ErrorHandler`. + +Please refer to the code below. +[source, java] +---- +final FixedBackOff fixedBackOff = new FixedBackOff(1, 10); +final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); +errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); + +ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); +factory.setConsumerFactory(consumerFactory); +factory.setCommonErrorHandler(errorHandler); +---- + +Then, whenever a batch fails to complete, the `DeliveryAttemptAwareRetryListener` will inject a `KafkaHeaders.DELIVERY_ATTMPT` header into the `ConsumerRecord`. + [[li-header]] == Listener Info Header @@ -796,4 +817,3 @@ DefaultErrorHandler handler() { ---- This will retry after `1, 2, 4, 8, 10, 10` seconds, before calling the recoverer. - diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index aa37535b38..0bac8459a3 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -56,3 +56,8 @@ When extending `KafkaAdmin`, user applications may override the `createAdmin` me When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method. +[[x33-kafka-headers-for-batch-listeners]] +=== KafkaHeaders.DELIVERY_ATTEMPT for batch listeners +When using a `BatchListener`, the `ConsumerRecord` can have the `KafkaHeaders.DELIVERY_ATTMPT` header in its headers fields. +If the `DeliveryAttemptAwareRetryListener` is set to error handler as retry listener, each `ConsumerRecord` has delivery attempt header. +For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempts-header-for-batch-listener[kafka-headers-for-batch-listener]. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java new file mode 100644 index 0000000000..b35fbb371d --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListener.java @@ -0,0 +1,64 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.nio.ByteBuffer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.internals.RecordHeader; + +import org.springframework.kafka.support.KafkaHeaders; + +/** + * The DeliveryAttemptAwareRetryListener class for {@link RetryListener} implementations. + * The DeliveryAttemptAwareRetryListener adds the {@link KafkaHeaders}.DELIVERY_ATTEMPT header + * to the record's headers when batch records fail and are retried. + * Note that DeliveryAttemptAwareRetryListener modifies the headers of the original record. + * + * @author Sanghyeok An + * @since 3.3 + */ + +public class DeliveryAttemptAwareRetryListener implements RetryListener { + + @Override + public void failedDelivery(ConsumerRecord record, Exception ex, int deliveryAttempt) { + // Pass + } + + /** + * Invoke after delivery failure for batch records. + * If the {@link KafkaHeaders}.DELIVERY_ATTEMPT header already exists in the {@link ConsumerRecord}'s headers, + * it will be removed. Then, the provided `deliveryAttempt` is added to the {@link ConsumerRecord}'s headers. + * @param records the records. + * @param ex the exception. + * @param deliveryAttempt the delivery attempt, if available. + */ + @Override + public void failedDelivery(ConsumerRecords records, Exception ex, int deliveryAttempt) { + for (ConsumerRecord record : records) { + record.headers().remove(KafkaHeaders.DELIVERY_ATTEMPT); + + byte[] buff = new byte[4]; // NOSONAR (magic #) + ByteBuffer bb = ByteBuffer.wrap(buff); + bb.putInt(deliveryAttempt); + record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff)); + } + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java new file mode 100644 index 0000000000..20b7cea8fb --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerIntegrationTests.java @@ -0,0 +1,269 @@ +/* + * Copyright 2019-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.backoff.FixedBackOff; + +/** + * @author Sanghyeok An + * @since 3.3.0 + */ + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka +class DeliveryAttemptAwareRetryListenerIntegrationTests { + + static final String MAIN_TOPIC_CONTAINER_FACTORY0 = "deliveryMyTestKafkaListenerContainerFactory0"; + + static final String TEST_TOPIC0 = "myBatchDeliveryAttemptTopic0"; + + static final int MAX_ATTEMPT_COUNT0 = 3; + + static final CountDownLatch latch0 = new CountDownLatch(MAX_ATTEMPT_COUNT0 + 1); + + static final String MAIN_TOPIC_CONTAINER_FACTORY1 = "deliveryMyTestKafkaListenerContainerFactory1"; + + static final String TEST_TOPIC1 = "myBatchDeliveryAttemptTopic1"; + + static final int MAX_ATTEMPT_COUNT1 = 10; + + static final CountDownLatch latch1 = new CountDownLatch(MAX_ATTEMPT_COUNT1 + 1); + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Test + void should_have_delivery_attempt_header_in_each_consumer_record(@Autowired TestTopicListener0 listener) { + + // Given + String msg1 = "1"; + String msg2 = "2"; + String msg3 = "3"; + + // When + kafkaTemplate.send(TEST_TOPIC0, msg1); + kafkaTemplate.send(TEST_TOPIC0, msg2); + kafkaTemplate.send(TEST_TOPIC0, msg3); + + // Then + assertThat(awaitLatch(latch0)).isTrue(); + + Map deliveryAttemptCountMap = convertToMap(listener.receivedHeaders); + + for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT0; attemptCnt++) { + assertThat(deliveryAttemptCountMap.get(attemptCnt)).isGreaterThan(0); + } + } + + @Test + void should_have_delivery_attempt_header_in_each_consumer_record_with_more_bigger_max_attempt(@Autowired TestTopicListener1 listener) { + // Given + String msg1 = "1"; + String msg2 = "2"; + String msg3 = "3"; + + // When + kafkaTemplate.send(TEST_TOPIC1, msg1); + kafkaTemplate.send(TEST_TOPIC1, msg2); + kafkaTemplate.send(TEST_TOPIC1, msg3); + + // Then + assertThat(awaitLatch(latch1)).isTrue(); + + Map deliveryAttemptCountMap = convertToMap(listener.receivedHeaders); + + for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT1; attemptCnt++) { + assertThat(deliveryAttemptCountMap.get(attemptCnt)).isGreaterThan(0); + } + } + + private Map convertToMap(List
headers) { + Map map = new HashMap<>(); + for (Header header : headers) { + int attemptCount = ByteBuffer.wrap(header.value()).getInt(); + Integer cnt = map.getOrDefault(attemptCount, 0); + map.put(attemptCount, cnt + 1); + } + return map; + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(60, TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + } + + private static CommonErrorHandler createErrorHandler(int interval, int maxAttemptCount) { + FixedBackOff fixedBackOff = new FixedBackOff(interval, maxAttemptCount); + DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff); + errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener()); + return errorHandler; + } + + private static ConcurrentKafkaListenerContainerFactory createListenerContainerFactory( + ConsumerFactory consumerFactory, CommonErrorHandler errorHandler) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setCommonErrorHandler(errorHandler); + + ContainerProperties containerProperties = factory.getContainerProperties(); + containerProperties.setDeliveryAttemptHeader(true); + return factory; + } + + static class TestTopicListener0 { + final List
receivedHeaders = new ArrayList<>(); + + @KafkaListener( + topics = TEST_TOPIC0, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY0, + batch = "true") + public void listen(List> records) { + for (ConsumerRecord record : records) { + Iterable
headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); + for (Header header : headers) { + receivedHeaders.add(header); + } + } + latch0.countDown(); + throw new RuntimeException("Failed."); + } + } + + static class TestTopicListener1 { + final List
receivedHeaders = new ArrayList<>(); + + @KafkaListener( + topics = TEST_TOPIC1, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY1, + batch = "true") + public void listen(List> records) { + for (ConsumerRecord record : records) { + Iterable
headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); + for (Header header : headers) { + receivedHeaders.add(header); + } + } + latch1.countDown(); + throw new RuntimeException("Failed."); + } + } + + @Configuration + static class TestConfiguration { + + @Bean + TestTopicListener0 testTopicListener0() { + return new TestTopicListener0(); + } + + @Bean + TestTopicListener1 testTopicListener1() { + return new TestTopicListener1(); + } + } + + @Configuration + static class KafkaProducerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ProducerFactory producerFactory() { + Map props = KafkaTestUtils.producerProps( + this.broker.getBrokersAsString()); + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean("customKafkaTemplate") + KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + } + + @EnableKafka + @Configuration + static class KafkaConsumerConfig { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ConsumerFactory consumerFactory() { + Map props = KafkaTestUtils.consumerProps( + this.broker.getBrokersAsString(), + "DeliveryAttemptAwareRetryListenerIntegrationTestsGroupId", + "true"); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + ConcurrentKafkaListenerContainerFactory + deliveryMyTestKafkaListenerContainerFactory0(ConsumerFactory consumerFactory) { + CommonErrorHandler errorHandler = createErrorHandler(1, MAX_ATTEMPT_COUNT0); + return createListenerContainerFactory(consumerFactory, errorHandler); + } + + @Bean + ConcurrentKafkaListenerContainerFactory + deliveryMyTestKafkaListenerContainerFactory1(ConsumerFactory consumerFactory) { + CommonErrorHandler errorHandler = createErrorHandler(1, MAX_ATTEMPT_COUNT1); + return createListenerContainerFactory(consumerFactory, errorHandler); + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTests.java new file mode 100644 index 0000000000..6d79218e2a --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeliveryAttemptAwareRetryListenerTests.java @@ -0,0 +1,139 @@ +/* + * Copyright 2019-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.support.KafkaHeaders; + +/** + * @author Sanghyeok An + * @since 3.3 + */ + +class DeliveryAttemptAwareRetryListenerTests { + + @Test + void should_have_single_header_and_header_value_should_be_1() { + // Given + TopicPartition tpForTopicA = new TopicPartition("topicA", 1); + TopicPartition tpForTopicB = new TopicPartition("topicB", 1); + + ConsumerRecord record1 = new ConsumerRecord<>("topicA", 1, 1, "key", "value1"); + ConsumerRecord record2 = new ConsumerRecord<>("topicA", 1, 2, "key", "value2"); + ConsumerRecord record3 = new ConsumerRecord<>("topicA", 1, 3, "key", "value3"); + + ConsumerRecord record4 = new ConsumerRecord<>("topicB", 1, 1, "key", "value4"); + ConsumerRecord record5 = new ConsumerRecord<>("topicB", 1, 2, "key", "value5"); + ConsumerRecord record6 = new ConsumerRecord<>("topicB", 1, 3, "key", "value6"); + + Map>> map = new HashMap<>(); + + List> topicARecords = List.of(record1, record2, record3); + List> topicBRecords = List.of(record4, record5, record6); + + map.put(tpForTopicA, topicARecords); + map.put(tpForTopicB, topicBRecords); + + ConsumerRecords consumerRecords = new ConsumerRecords<>(map); + final DeliveryAttemptAwareRetryListener listener = new DeliveryAttemptAwareRetryListener(); + Exception ex = new RuntimeException("Dummy Exception"); + + // Given : Expected Value + int expectedDeliveryAttemptInHeader = 1; + + // When + listener.failedDelivery(consumerRecords, ex, 1); + + // Then + for (ConsumerRecord consumerRecord : consumerRecords) { + int deliveryAttemptHeaderCount = 0; + Iterable
headers = consumerRecord.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); + + for (Header header : headers) { + int deliveryAttempt = ByteBuffer.wrap(header.value()).getInt(); + deliveryAttemptHeaderCount++; + + // Assertion + assertThat(deliveryAttempt).isEqualTo(expectedDeliveryAttemptInHeader); + assertThat(deliveryAttemptHeaderCount).isEqualTo(1); + } + } + } + + @Test + void should_have_single_header_and_header_value_should_be_4() { + // Given + TopicPartition tpForTopicA = new TopicPartition("topicA", 1); + TopicPartition tpForTopicB = new TopicPartition("topicB", 1); + + ConsumerRecord record1 = new ConsumerRecord<>("topicA", 1, 1, "key", "value1"); + ConsumerRecord record2 = new ConsumerRecord<>("topicA", 1, 2, "key", "value2"); + ConsumerRecord record3 = new ConsumerRecord<>("topicA", 1, 3, "key", "value3"); + + ConsumerRecord record4 = new ConsumerRecord<>("topicB", 1, 1, "key", "value4"); + ConsumerRecord record5 = new ConsumerRecord<>("topicB", 1, 2, "key", "value5"); + ConsumerRecord record6 = new ConsumerRecord<>("topicB", 1, 3, "key", "value6"); + + Map>> map = new HashMap<>(); + + List> topicARecords = List.of(record1, record2, record3); + List> topicBRecords = List.of(record4, record5, record6); + + map.put(tpForTopicA, topicARecords); + map.put(tpForTopicB, topicBRecords); + + ConsumerRecords consumerRecords = new ConsumerRecords<>(map); + final DeliveryAttemptAwareRetryListener listener = new DeliveryAttemptAwareRetryListener(); + Exception ex = new RuntimeException("Dummy Exception"); + + // Given : Expected Value + int expectedDeliveryAttemptInHeader = 4; + + // When + for (int deliveryAttempt = 1; deliveryAttempt < 5; deliveryAttempt++) { + listener.failedDelivery(consumerRecords, ex, deliveryAttempt); + } + + // Then + for (ConsumerRecord consumerRecord : consumerRecords) { + int deliveryAttemptHeaderCount = 0; + Iterable
headers = consumerRecord.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT); + for (Header header : headers) { + int deliveryAttempt = ByteBuffer.wrap(header.value()).getInt(); + deliveryAttemptHeaderCount++; + + // Assertion + assertThat(deliveryAttempt).isEqualTo(expectedDeliveryAttemptInHeader); + assertThat(deliveryAttemptHeaderCount).isEqualTo(1); + } + } + + } + +}