17 changes: 17 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
@@ -1245,6 +1245,19 @@ The actual sleep time, and its resolution, depends on the container's `pollTimeout`.
The minimum sleep time is equal to the `pollTimeout` and all sleep times will be a multiple of it.
For small sleep times or, to increase its accuracy, consider reducing the container's `pollTimeout`.

Starting with version 3.0.10, batch listeners can commit the offsets of parts of the batch, using `acknowledge(index)` on the `Acknowledgment` argument.
When this method is called, the offsets of the record at that index, and of all previous records in the batch, are committed.
Calling `acknowledge()` after a partial batch commit commits the offsets of the remainder of the batch.
The following limitations apply:

* `AckMode.MANUAL_IMMEDIATE` is required
* The method must be called on the listener thread
* The listener must consume a `List` rather than the raw `ConsumerRecords`
* The index must be in the range of the list's elements
* The index must be greater than that used in any previous call

These restrictions are enforced and the method will throw an `IllegalArgumentException` or `IllegalStateException`, depending on the violation.
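
The following listener is a minimal sketch of this technique; the listener id, topic, and payload type are illustrative, `processFirstPart()` and `processRemainder()` are hypothetical helpers, and the container factory is assumed to be configured for batch listeners with `AckMode.MANUAL_IMMEDIATE` and batches of at least three records:

====
[source, java]
----
@KafkaListener(id = "partialBatch", topics = "someTopic")
public void listen(List<String> data, Acknowledgment ack) {
    processFirstPart(data); // hypothetical helper that handles records 0..2
    ack.acknowledge(2); // commits the offsets of records 0, 1 and 2
    processRemainder(data); // hypothetical helper that handles the rest
    ack.acknowledge(); // commits the offsets of the remainder of the batch
}
----
====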

[[container-auto-startup]]
====== Listener Container Auto Startup

@@ -2443,6 +2456,10 @@ When creating the `TopicPartitionOffset` s for the request, only positive, absolute
|Whether or not to commit the initial position on assignment; by default, the initial offset will be committed only if the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is `latest`, and the commit will not run in a transaction even if a transaction manager is present.
See the javadocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options.

|[[asyncAcks]]<<asyncAcks,`asyncAcks`>>
|`false`
|Enable out-of-order commits (see <<ooo-commits>>); the consumer is paused and commits are deferred until gaps are filled.

|[[authExceptionRetryInterval]]<<authExceptionRetryInterval,`authExceptionRetryInterval`>>
|`null`
|When not null, a `Duration` to sleep between polls when an `AuthenticationException` or `AuthorizationException` is thrown by the Kafka client.
3 changes: 3 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
@@ -88,6 +88,9 @@ See <<replying-template>> and <<exchanging-messages>>.
You can now use a custom correlation header which will be echoed in any reply message.
See the note at the end of <<replying-template>> for more information.

You can now manually commit parts of a batch before the entire batch is processed.
See <<committing-offsets>> for more information.

[[x30-headers]]
==== `KafkaHeaders` Changes

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
@@ -2446,7 +2446,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> records
if (this.wantsFullRecords) {
this.batchListener.onMessage(records, // NOSONAR
this.isAnyManualAck
? new ConsumerBatchAcknowledgment(records)
? new ConsumerBatchAcknowledgment(records, recordList)
: null,
this.consumer);
}
@@ -2456,19 +2456,19 @@
}

private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
List<ConsumerRecord<K, V>> recordList) {
@Nullable List<ConsumerRecord<K, V>> recordList) {

try {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE ->
this.batchListener.onMessage(recordList,
this.isAnyManualAck
? new ConsumerBatchAcknowledgment(records)
? new ConsumerBatchAcknowledgment(records, recordList)
: null, this.consumer);
case ACKNOWLEDGING ->
this.batchListener.onMessage(recordList,
this.isAnyManualAck
? new ConsumerBatchAcknowledgment(records)
? new ConsumerBatchAcknowledgment(records, recordList)
: null);
case CONSUMER_AWARE -> this.batchListener.onMessage(recordList, this.consumer);
case SIMPLE -> this.batchListener.onMessage(recordList);
@@ -3429,14 +3429,25 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {

private final ConsumerRecords<K, V> records;

private final List<ConsumerRecord<K, V>> recordList;

private volatile boolean acked;

ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records) {
private volatile int partial = -1;

ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records,
@Nullable List<ConsumerRecord<K, V>> recordList) {

this.records = records;
this.recordList = recordList;
}

@Override
public void acknowledge() {
if (this.partial >= 0) {
// a partial commit was performed earlier; commit the offsets of the remainder of the batch
acknowledge(this.recordList.size() - 1);
return;
}
Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
if (!this.acked) {
Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
@@ -3451,6 +3462,32 @@ public void acknowledge() {
}
}

@Override
public void acknowledge(int index) {
Assert.isTrue(index > this.partial,
() -> String.format("index (%d) must be greater than the previous partial commit (%d)", index,
this.partial));
Assert.state(ListenerConsumer.this.isManualImmediateAck,
"Partial batch acknowledgment is only supported with AckMode.MANUAL_IMMEDIATE");
Assert.state(this.recordList != null,
"Listener must receive a List of records to use partial batch acknowledgment");
Assert.isTrue(index >= 0 && index < this.recordList.size(),
() -> String.format("index (%d) is out of range (%d-%d)", index, 0,
this.recordList.size() - 1));
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
"Partial batch acknowledgment is only supported on the consumer thread");
Map<TopicPartition, List<ConsumerRecord<K, V>>> offsetsToCommit = new LinkedHashMap<>();
for (int i = this.partial + 1; i <= index; i++) {
ConsumerRecord<K, V> record = this.recordList.get(i);
offsetsToCommit.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
tp -> new ArrayList<>()).add(record);
}
if (!offsetsToCommit.isEmpty()) {
processAcks(new ConsumerRecords<>(offsetsToCommit));
}
this.partial = index;
}

@Override
public void nack(int index, Duration sleep) {
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@

import java.time.Duration;

import org.springframework.kafka.listener.ContainerProperties.AckMode;

/**
* Handle for acknowledging the processing of a
* {@link org.apache.kafka.clients.consumer.ConsumerRecord}. Recipients can store the
@@ -49,6 +51,17 @@ default void nack(Duration sleep) {
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
}

/**
* Acknowledge the record at an index in the batch - commit the offset(s) of records in the batch
* up to and including the index. Requires {@link AckMode#MANUAL_IMMEDIATE}. The index must be
* greater than any previous partial batch acknowledgment index.
* @param index the index of the record to acknowledge.
* @since 3.0.10
*/
default void acknowledge(int index) {
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
}

/**
* Negatively acknowledge the record at an index in a batch - commit the offset(s) of
* records before the index and re-seek the partitions so that the record at the index
spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAckPartialBatchTests.java
@@ -0,0 +1,229 @@
/*
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
* @author Gary Russell
* @since 3.0.10
*
*/
@SpringJUnitConfig
@DirtiesContext
public class ManualAckPartialBatchTests {

private static final String CONTAINER_ID = "container";

protected static AckMode ackMode;

static {
ackMode = AckMode.MANUAL_IMMEDIATE;
}

@SuppressWarnings("rawtypes")
@Autowired
private Consumer consumer;

@Autowired
private Config config;

@Autowired
private KafkaListenerEndpointRegistry registry;

/*
* Deliver 6 records from three partitions.
* Two partial commits, then a final commit for the remainder.
*/
@SuppressWarnings("unchecked")
@Test
public void partialBatchCommitsAndFinalCommit() throws Exception {
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
this.registry.stop();
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
InOrder inOrder = inOrder(this.consumer);
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
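// Committed offset = next offset to consume: acknowledge(2) commits foo-0 up to offset 2 and foo-1 up to offset 1;
// acknowledge(4) commits foo-1 up to offset 2 and foo-2 up to offset 1; acknowledge() commits the remainder (foo-2 up to offset 2).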
Map<TopicPartition, OffsetAndMetadata> commit1 = Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L),
new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
Map<TopicPartition, OffsetAndMetadata> commit2 = Map.of(new TopicPartition("foo", 1), new OffsetAndMetadata(2L),
new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
Map<TopicPartition, OffsetAndMetadata> commit3 = Map.of(
new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
inOrder.verify(this.consumer).commitSync(commit1, Duration.ofSeconds(60));
inOrder.verify(this.consumer).commitSync(commit2, Duration.ofSeconds(60));
inOrder.verify(this.consumer).commitSync(commit3, Duration.ofSeconds(60));
assertThat(this.config.contents.toArray()).isEqualTo(new String[]
{ "foo", "bar", "baz", "qux", "fiz", "buz" });
}

@Configuration
@EnableKafka
public static class Config {

final List<String> contents = new ArrayList<>();

final CountDownLatch pollLatch = new CountDownLatch(3);

final CountDownLatch deliveryLatch = new CountDownLatch(1);

final CountDownLatch commitLatch = new CountDownLatch(3);

final CountDownLatch closeLatch = new CountDownLatch(1);

@KafkaListener(id = CONTAINER_ID, topics = "foo")
public void foo(List<String> in, Acknowledgment ack) {
contents.addAll(in);
this.deliveryLatch.countDown();
try {
ack.acknowledge(2);
ack.acknowledge(4);
ack.acknowledge();
}
catch (Exception ex) {
LogFactory.getLog(getClass()).error("Ack failed", ex);
}
}

@SuppressWarnings({ "rawtypes" })
@Bean
public ConsumerFactory consumerFactory() {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = consumer();
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
.willReturn(consumer);
return consumerFactory;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Consumer consumer() {
final Consumer consumer = mock(Consumer.class);
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
willAnswer(i -> {
((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned(
Collections.singletonList(topicPartition1));
return null;
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
records1.put(topicPartition0, Arrays.asList(
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
new RecordHeaders(), Optional.empty()),
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
new RecordHeaders(), Optional.empty())));
records1.put(topicPartition1, Arrays.asList(
new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
new RecordHeaders(), Optional.empty()),
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
new RecordHeaders(), Optional.empty())));
records1.put(topicPartition2, Arrays.asList(
new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
new RecordHeaders(), Optional.empty()),
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
new RecordHeaders(), Optional.empty())));
final AtomicInteger which = new AtomicInteger();
willAnswer(i -> {
this.pollLatch.countDown();
switch (which.getAndIncrement()) {
case 0:
return new ConsumerRecords(records1);
default:
try {
Thread.sleep(100);
}
catch (@SuppressWarnings("unused") InterruptedException e) {
Thread.currentThread().interrupt();
}
return ConsumerRecords.empty();
}
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
willAnswer(i -> {
return Collections.emptySet();
}).given(consumer).paused();
willAnswer(i -> {
this.commitLatch.countDown();
return null;
}).given(consumer).commitSync(anyMap(), any());
willAnswer(i -> {
this.closeLatch.countDown();
return null;
}).given(consumer).close();
return consumer;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ackMode);
return factory;
}

}

}