From caa82c66ce97f6365b6cad95ad31f5db7047ba0c Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 4 Aug 2023 18:02:08 -0400 Subject: [PATCH 1/2] GH-2066: Partial Batch Acknowledgments Resolves https://github.com/spring-projects/spring-kafka/issues/2066 Allow apps to commit partial batches. --- .../src/main/asciidoc/kafka.adoc | 18 ++ .../src/main/asciidoc/whats-new.adoc | 3 + .../KafkaMessageListenerContainer.java | 49 +++- .../kafka/support/Acknowledgment.java | 15 +- .../listener/ManualAckPartialBatchTests.java | 229 ++++++++++++++++++ 5 files changed, 308 insertions(+), 6 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAckPartialBatchTests.java diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 17b84636fe..760cb3cf6c 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -1245,6 +1245,20 @@ The actual sleep time, and its resolution, depends on the container's `pollTimeo The minimum sleep time is equal to the `pollTimeout` and all sleep times will be a multiple of it. For small sleep times or, to increase its accuracy, consider reducing the container's `pollTimeout`. +Starting with version 3.0.10, batch listeners can commit the offsets of parts of the batch, using `acknowledge(index)` on the `Acknowledgment` argument. +When this method is called, the offset of the record at the index (as well as all previous records) will be committed. +Calling `acknowledge()` after a partial batch commit is performed will commit the offsets of the remainder of the batch. 
+The following limitations apply: + +* `AckMode.MANUAL_IMMEDIATE` is required +* The method must be called on the listener thread +* The listener must consume a `List` rather than the raw `ConsumerRecords` +* The index must be in the range of the list's elements +* The index must be larger than that used in a previous call +* Out of order commits (<>) are not supported + +These restrictions are enforced and the method will throw an `IllegalArgumentException` or `IllegalStateException`, depending on the violation. + [[container-auto-startup]] ====== Listener Container Auto Startup @@ -2443,6 +2457,10 @@ When creating the `TopicPartitionOffset` s for the request, only positive, absol |Whether or not to commit the initial position on assignment; by default, the initial offset will only be committed if the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is `latest` and it won't run in a transaction even if there is a transaction manager present. See the javadocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options. +|[[asyncAcks]]<> +|false +|Enable out-of-order commits (see <>); the consumer is paused and commits are deferred until gaps are filled. + |[[authExceptionRetryInterval]]<> |`null` |When not null, a `Duration` to sleep between polls when an `AuthenticationException` or `AuthorizationException` is thrown by the Kafka client. diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index d23847357e..3aa9ac46a1 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -88,6 +88,9 @@ See <> and <>. You can now use a custom correlation header which will be echoed in any reply message. See the note at the end of <> for more information. +You can now manually commit parts of a batch before the entire batch is processed. +See <> for more information. 
+ [[x30-headers]] ==== `KafkaHeaders` Changes diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 289a30db6c..8d6eefd041 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2446,7 +2446,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords r if (this.wantsFullRecords) { this.batchListener.onMessage(records, // NOSONAR this.isAnyManualAck - ? new ConsumerBatchAcknowledgment(records) + ? new ConsumerBatchAcknowledgment(records, recordList) : null, this.consumer); } @@ -2456,19 +2456,19 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords r } private void doInvokeBatchOnMessage(final ConsumerRecords records, - List> recordList) { + @Nullable List> recordList) { try { switch (this.listenerType) { case ACKNOWLEDGING_CONSUMER_AWARE -> this.batchListener.onMessage(recordList, this.isAnyManualAck - ? new ConsumerBatchAcknowledgment(records) + ? new ConsumerBatchAcknowledgment(records, recordList) : null, this.consumer); case ACKNOWLEDGING -> this.batchListener.onMessage(recordList, this.isAnyManualAck - ? new ConsumerBatchAcknowledgment(records) + ? 
new ConsumerBatchAcknowledgment(records, recordList) : null); case CONSUMER_AWARE -> this.batchListener.onMessage(recordList, this.consumer); case SIMPLE -> this.batchListener.onMessage(recordList); @@ -3429,14 +3429,25 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment { private final ConsumerRecords records; + private final List> recordList; + private volatile boolean acked; - ConsumerBatchAcknowledgment(ConsumerRecords records) { + private volatile int partial = -1; + + ConsumerBatchAcknowledgment(ConsumerRecords records, + @Nullable List> recordList) { + this.records = records; + this.recordList = recordList; } @Override public void acknowledge() { + if (this.partial >= 0) { + acknowledge(this.recordList.size() - 1); /* commit everything left in the batch, not just the next record; recordList is non-null once a partial ack has succeeded */ + return; + } Map> offs = ListenerConsumer.this.offsetsInThisBatch; if (!this.acked) { Map>> deferred = ListenerConsumer.this.deferredOffsets; @@ -3451,6 +3462,34 @@ public void acknowledge() { } } + @Override + public void acknowledge(int index) { + Assert.isTrue(index > this.partial, + () -> String.format("index (%d) must be greater than the previous partial commit (%d)", index, + this.partial)); + Assert.state(ListenerConsumer.this.isManualImmediateAck, + "Partial batch acknowledgment is only supported with AckMode.MANUAL_IMMEDIATE"); + Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(), + "Partial batch acknowledgment is not supported with out-of-order commits (asyncAcks=true)"); + Assert.state(this.recordList != null, + "Listener must receive a List of records to use partial batch acknowledgment"); + Assert.isTrue(index >= 0 && index < this.recordList.size(), + () -> String.format("index (%d) is out of range (%d-%d)", index, 0, + this.recordList.size() - 1)); + Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), + "Partial batch acknowledgment is only supported on the consumer thread"); + Map>> offsetsToCommit = new LinkedHashMap<>(); + for (int i = this.partial + 1; i <= index; 
i++) { + ConsumerRecord record = this.recordList.get(i); + offsetsToCommit.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), + tp -> new ArrayList<>()).add(record); + } + if (!offsetsToCommit.isEmpty()) { + processAcks(new ConsumerRecords<>(offsetsToCommit)); + } + this.partial = index; + } + @Override public void nack(int index, Duration sleep) { Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread), diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index f22be223da..58c3033aa7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.time.Duration; +import org.springframework.kafka.listener.ContainerProperties.AckMode; + /** * Handle for acknowledging the processing of a * {@link org.apache.kafka.clients.consumer.ConsumerRecord}. Recipients can store the @@ -49,6 +51,17 @@ default void nack(Duration sleep) { throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment"); } + /** + * Acknowledge the record at an index in the batch - commit the offset(s) of records in the batch + * up to and including the index. Requires {@link AckMode#MANUAL_IMMEDIATE}. The index must be + * greater than any previous partial batch acknowledgment index. + * @param index the index of the record to acknowledge. 
+ * @since 3.0.10 + */ + default void acknowledge(int index) { + throw new UnsupportedOperationException("acknowledge(index) is not supported by this Acknowledgment"); + } + /** * Negatively acknowledge the record at an index in a batch - commit the offset(s) of * records before the index and re-seek the partitions so that the record at the index diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAckPartialBatchTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAckPartialBatchTests.java new file mode 100644 index 0000000000..10a3e4e979 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAckPartialBatchTests.java @@ -0,0 +1,229 @@ +/* + * Copyright 2017-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.LogFactory; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import 
org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Gary Russell + * @since 3.0.10 + * + */ +@SpringJUnitConfig +@DirtiesContext +public class ManualAckPartialBatchTests { + + private static final String CONTAINER_ID = "container"; + + protected static AckMode ackMode; + + static { + ackMode = AckMode.MANUAL_IMMEDIATE; + } + + @SuppressWarnings("rawtypes") + @Autowired + private Consumer consumer; + + @Autowired + private Config config; + + @Autowired + private KafkaListenerEndpointRegistry registry; + + /* + * Deliver 6 records from three partitions. + * 2 partial commits and final commit. + */ + @SuppressWarnings("unchecked") + @Test + public void discardRemainingRecordsFromPollAndSeek() throws Exception { + assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue(); + this.registry.stop(); + assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + InOrder inOrder = inOrder(this.consumer); + inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + Map commit1 = Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L), + new TopicPartition("foo", 1), new OffsetAndMetadata(1L)); + Map commit2 = Map.of(new TopicPartition("foo", 1), new OffsetAndMetadata(2L), + new TopicPartition("foo", 2), new OffsetAndMetadata(1L)); + Map commit3 = Map.of( + new TopicPartition("foo", 2), new OffsetAndMetadata(2L)); + inOrder.verify(this.consumer).commitSync(commit1, Duration.ofSeconds(60)); + inOrder.verify(this.consumer).commitSync(commit2, 
Duration.ofSeconds(60)); + inOrder.verify(this.consumer).commitSync(commit3, Duration.ofSeconds(60)); + assertThat(this.config.contents.toArray()).isEqualTo(new String[] + { "foo", "bar", "baz", "qux", "fiz", "buz" }); + } + + @Configuration + @EnableKafka + public static class Config { + + final List contents = new ArrayList<>(); + + final CountDownLatch pollLatch = new CountDownLatch(3); + + final CountDownLatch deliveryLatch = new CountDownLatch(1); + + final CountDownLatch commitLatch = new CountDownLatch(3); + + final CountDownLatch closeLatch = new CountDownLatch(1); + + @KafkaListener(id = CONTAINER_ID, topics = "foo") + public void foo(List in, Acknowledgment ack) { + contents.addAll(in); + this.deliveryLatch.countDown(); + try { + ack.acknowledge(2); + ack.acknowledge(4); + ack.acknowledge(); + } + catch (Exception ex) { + LogFactory.getLog(getClass()).error("Ack failed", ex); + } + } + + @SuppressWarnings({ "rawtypes" }) + @Bean + public ConsumerFactory consumerFactory() { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + final Consumer consumer = consumer(); + given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides())) + .willReturn(consumer); + return consumerFactory; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public Consumer consumer() { + final Consumer consumer = mock(Consumer.class); + final TopicPartition topicPartition0 = new TopicPartition("foo", 0); + final TopicPartition topicPartition1 = new TopicPartition("foo", 1); + final TopicPartition topicPartition2 = new TopicPartition("foo", 2); + willAnswer(i -> { + ((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned( + Collections.singletonList(topicPartition1)); + return null; + }).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + Map> records1 = new LinkedHashMap<>(); + records1.put(topicPartition0, Arrays.asList( + new ConsumerRecord("foo", 0, 0L, 0L, 
TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar", + new RecordHeaders(), Optional.empty()))); + records1.put(topicPartition1, Arrays.asList( + new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", + new RecordHeaders(), Optional.empty()))); + records1.put(topicPartition2, Arrays.asList( + new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz", + new RecordHeaders(), Optional.empty()))); + final AtomicInteger which = new AtomicInteger(); + willAnswer(i -> { + this.pollLatch.countDown(); + switch (which.getAndIncrement()) { + case 0: + return new ConsumerRecords(records1); + default: + try { + Thread.sleep(100); + } + catch (@SuppressWarnings("unused") InterruptedException e) { + Thread.currentThread().interrupt(); + } + return ConsumerRecords.empty(); + } + }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + willAnswer(i -> { + return Collections.emptySet(); + }).given(consumer).paused(); + willAnswer(i -> { + this.commitLatch.countDown(); + return null; + }).given(consumer).commitSync(anyMap(), any()); + willAnswer(i -> { + this.closeLatch.countDown(); + return null; + }).given(consumer).close(); + return consumer; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(true); + 
factory.getContainerProperties().setAckMode(ackMode); + return factory; + } + + } + +} From dab7fbd95c40987c506768b39cdde5a2fe8da224 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 4 Aug 2023 19:08:48 -0400 Subject: [PATCH 2/2] Remove async ack check - not applicable to batch listeners. --- spring-kafka-docs/src/main/asciidoc/kafka.adoc | 1 - .../kafka/listener/KafkaMessageListenerContainer.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 760cb3cf6c..f2f2a07224 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -1255,7 +1255,6 @@ The following limitations apply: * The listener must consume a `List` rather than the raw `ConsumerRecords` * The index must be in the range of the list's elements * The index must be larger than that used in a previous call -* Out of order commits (<>) are not supported These restrictions are enforced and the method will throw an `IllegalArgumentException` or `IllegalStateException`, depending on the violation. 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 8d6eefd041..861147167c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -3469,8 +3469,6 @@ public void acknowledge(int index) { this.partial)); Assert.state(ListenerConsumer.this.isManualImmediateAck, "Partial batch acknowledgment is only supported with AckMode.MANUAL_IMMEDIATE"); - Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(), - "Partial batch acknowledgment is not supported with out-of-order commits (asyncAcks=true)"); Assert.state(this.recordList != null, "Listener must receive a List of records to use partial batch acknowledgment"); Assert.isTrue(index >= 0 && index < this.recordList.size(),