Skip to content

Commit 4ed8023

Browse files
authored
GH-2066: Partial Batch Acknowledgments
Resolves #2066 Allow apps to commit partial batches. * Remove async ack check - not applicable to batch listeners.
1 parent 5407673 commit 4ed8023

File tree

5 files changed

+305
-6
lines changed

5 files changed

+305
-6
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1245,6 +1245,19 @@ The actual sleep time, and its resolution, depends on the container's `pollTimeo
12451245
The minimum sleep time is equal to the `pollTimeout` and all sleep times will be a multiple of it.
12461246
For small sleep times or, to increase its accuracy, consider reducing the container's `pollTimeout`.
12471247

1248+
Starting with version 3.0.10, batch listeners can commit the offsets of parts of the batch, using `acknowledge(index)` on the `Acknowledgment` argument.
1249+
When this method is called, the offset of the record at the index (as well as all previous records) will be committed.
1250+
Calling `acknowledge()` after a partial batch commit is performed will commit the offsets of the remainder of the batch.
1251+
The following limitations apply:
1252+
1253+
* `AckMode.MANUAL_IMMEDIATE` is required
1254+
* The method must be called on the listener thread
1255+
* The listener must consume a `List` rather than the raw `ConsumerRecords`
1256+
* The index must be in the range of the list's elements
1257+
* The index must be larger than that used in a previous call
1258+
1259+
These restrictions are enforced and the method will throw an `IllegalArgumentException` or `IllegalStateException`, depending on the violation.
1260+
12481261
[[container-auto-startup]]
12491262
====== Listener Container Auto Startup
12501263

@@ -2443,6 +2456,10 @@ When creating the `TopicPartitionOffset` s for the request, only positive, absol
24432456
|Whether or not to commit the initial position on assignment; by default, the initial offset will only be committed if the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is `latest` and it won't run in a transaction even if there is a transaction manager present.
24442457
See the javadocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options.
24452458

2459+
|[[asyncAcks]]<<asyncAcks,`asyncAcks`>>
2460+
|false
2461+
|Enable out-of-order commits (see <<ooo-commits>>); the consumer is paused and commits are deferred until gaps are filled.
2462+
24462463
|[[authExceptionRetryInterval]]<<authExceptionRetryInterval,`authExceptionRetryInterval`>>
24472464
|`null`
24482465
|When not null, a `Duration` to sleep between polls when an `AuthenticationException` or `AuthorizationException` is thrown by the Kafka client.

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ See <<replying-template>> and <<exchanging-messages>>.
8888
You can now use a custom correlation header which will be echoed in any reply message.
8989
See the note at the end of <<replying-template>> for more information.
9090

91+
You can now manually commit parts of a batch before the entire batch is processed.
92+
See <<committing-offsets>> for more information.
93+
9194
[[x30-headers]]
9295
==== `KafkaHeaders` Changes
9396

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2446,7 +2446,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
24462446
if (this.wantsFullRecords) {
24472447
this.batchListener.onMessage(records, // NOSONAR
24482448
this.isAnyManualAck
2449-
? new ConsumerBatchAcknowledgment(records)
2449+
? new ConsumerBatchAcknowledgment(records, recordList)
24502450
: null,
24512451
this.consumer);
24522452
}
@@ -2456,19 +2456,19 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
24562456
}
24572457

24582458
private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
2459-
List<ConsumerRecord<K, V>> recordList) {
2459+
@Nullable List<ConsumerRecord<K, V>> recordList) {
24602460

24612461
try {
24622462
switch (this.listenerType) {
24632463
case ACKNOWLEDGING_CONSUMER_AWARE ->
24642464
this.batchListener.onMessage(recordList,
24652465
this.isAnyManualAck
2466-
? new ConsumerBatchAcknowledgment(records)
2466+
? new ConsumerBatchAcknowledgment(records, recordList)
24672467
: null, this.consumer);
24682468
case ACKNOWLEDGING ->
24692469
this.batchListener.onMessage(recordList,
24702470
this.isAnyManualAck
2471-
? new ConsumerBatchAcknowledgment(records)
2471+
? new ConsumerBatchAcknowledgment(records, recordList)
24722472
: null);
24732473
case CONSUMER_AWARE -> this.batchListener.onMessage(recordList, this.consumer);
24742474
case SIMPLE -> this.batchListener.onMessage(recordList);
@@ -3429,14 +3429,25 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {
34293429

34303430
private final ConsumerRecords<K, V> records;
34313431

3432+
private final List<ConsumerRecord<K, V>> recordList;
3433+
34323434
private volatile boolean acked;
34333435

3434-
ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records) {
3436+
private volatile int partial = -1;
3437+
3438+
ConsumerBatchAcknowledgment(ConsumerRecords<K, V> records,
3439+
@Nullable List<ConsumerRecord<K, V>> recordList) {
3440+
34353441
this.records = records;
3442+
this.recordList = recordList;
34363443
}
34373444

34383445
@Override
34393446
public void acknowledge() {
3447+
if (this.partial >= 0) {
3448+
acknowledge(this.partial + 1);
3449+
return;
3450+
}
34403451
Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
34413452
if (!this.acked) {
34423453
Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
@@ -3451,6 +3462,32 @@ public void acknowledge() {
34513462
}
34523463
}
34533464

3465+
@Override
3466+
public void acknowledge(int index) {
3467+
Assert.isTrue(index > this.partial,
3468+
() -> String.format("index (%d) must be greater than the previous partial commit (%d)", index,
3469+
this.partial));
3470+
Assert.state(ListenerConsumer.this.isManualImmediateAck,
3471+
"Partial batch acknowledgment is only supported with AckMode.MANUAL_IMMEDIATE");
3472+
Assert.state(this.recordList != null,
3473+
"Listener must receive a List of records to use partial batch acknowledgment");
3474+
Assert.isTrue(index >= 0 && index < this.recordList.size(),
3475+
() -> String.format("index (%d) is out of range (%d-%d)", index, 0,
3476+
this.recordList.size() - 1));
3477+
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
3478+
"Partial batch acknowledgment is only supported on the consumer thread");
3479+
Map<TopicPartition, List<ConsumerRecord<K, V>>> offsetsToCommit = new LinkedHashMap<>();
3480+
for (int i = this.partial + 1; i <= index; i++) {
3481+
ConsumerRecord<K, V> record = this.recordList.get(i);
3482+
offsetsToCommit.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
3483+
tp -> new ArrayList<>()).add(record);
3484+
}
3485+
if (!offsetsToCommit.isEmpty()) {
3486+
processAcks(new ConsumerRecords<>(offsetsToCommit));
3487+
}
3488+
this.partial = index;
3489+
}
3490+
34543491
@Override
34553492
public void nack(int index, Duration sleep) {
34563493
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),

spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
1818

1919
import java.time.Duration;
2020

21+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
22+
2123
/**
2224
* Handle for acknowledging the processing of a
2325
* {@link org.apache.kafka.clients.consumer.ConsumerRecord}. Recipients can store the
@@ -49,6 +51,17 @@ default void nack(Duration sleep) {
4951
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
5052
}
5153

54+
/**
55+
* Acknowledge the record at an index in the batch - commit the offset(s) of records in the batch
56+
* up to and including the index. Requires {@link AckMode#MANUAL_IMMEDIATE}. The index must be
57+
* greater than any previous partial batch acknowledgment index.
58+
* @param index the index of the record to acknowledge.
59+
* @since 3.0.10
60+
*/
61+
default void acknowledge(int index) {
62+
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
63+
}
64+
5265
/**
5366
* Negatively acknowledge the record at an index in a batch - commit the offset(s) of
5467
* records before the index and re-seek the partitions so that the record at the index
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright 2017-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyMap;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.BDDMockito.willAnswer;
24+
import static org.mockito.Mockito.inOrder;
25+
import static org.mockito.Mockito.mock;
26+
27+
import java.time.Duration;
28+
import java.util.ArrayList;
29+
import java.util.Arrays;
30+
import java.util.Collection;
31+
import java.util.Collections;
32+
import java.util.LinkedHashMap;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Optional;
36+
import java.util.concurrent.CountDownLatch;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.atomic.AtomicInteger;
39+
40+
import org.apache.commons.logging.LogFactory;
41+
import org.apache.kafka.clients.consumer.Consumer;
42+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
43+
import org.apache.kafka.clients.consumer.ConsumerRecord;
44+
import org.apache.kafka.clients.consumer.ConsumerRecords;
45+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
46+
import org.apache.kafka.common.TopicPartition;
47+
import org.apache.kafka.common.header.internals.RecordHeaders;
48+
import org.apache.kafka.common.record.TimestampType;
49+
import org.junit.jupiter.api.Test;
50+
import org.mockito.InOrder;
51+
52+
import org.springframework.beans.factory.annotation.Autowired;
53+
import org.springframework.context.annotation.Bean;
54+
import org.springframework.context.annotation.Configuration;
55+
import org.springframework.kafka.annotation.EnableKafka;
56+
import org.springframework.kafka.annotation.KafkaListener;
57+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
58+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
59+
import org.springframework.kafka.core.ConsumerFactory;
60+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
61+
import org.springframework.kafka.support.Acknowledgment;
62+
import org.springframework.kafka.test.utils.KafkaTestUtils;
63+
import org.springframework.test.annotation.DirtiesContext;
64+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
65+
66+
/**
67+
* @author Gary Russell
68+
* @since 2.3
69+
*
70+
*/
71+
@SpringJUnitConfig
72+
@DirtiesContext
73+
public class ManualAckPartialBatchTests {
74+
75+
private static final String CONTAINER_ID = "container";
76+
77+
protected static AckMode ackMode;
78+
79+
static {
80+
ackMode = AckMode.MANUAL_IMMEDIATE;
81+
}
82+
83+
@SuppressWarnings("rawtypes")
84+
@Autowired
85+
private Consumer consumer;
86+
87+
@Autowired
88+
private Config config;
89+
90+
@Autowired
91+
private KafkaListenerEndpointRegistry registry;
92+
93+
/*
94+
* Deliver 6 records from three partitions.
95+
* 2 partial commits and final commit.
96+
*/
97+
@SuppressWarnings("unchecked")
98+
@Test
99+
public void discardRemainingRecordsFromPollAndSeek() throws Exception {
100+
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
101+
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
102+
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
103+
this.registry.stop();
104+
assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
105+
InOrder inOrder = inOrder(this.consumer);
106+
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
107+
inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
108+
Map<TopicPartition, OffsetAndMetadata> commit1 = Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L),
109+
new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
110+
Map<TopicPartition, OffsetAndMetadata> commit2 = Map.of(new TopicPartition("foo", 1), new OffsetAndMetadata(2L),
111+
new TopicPartition("foo", 2), new OffsetAndMetadata(1L));
112+
Map<TopicPartition, OffsetAndMetadata> commit3 = Map.of(
113+
new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
114+
inOrder.verify(this.consumer).commitSync(commit1, Duration.ofSeconds(60));
115+
inOrder.verify(this.consumer).commitSync(commit2, Duration.ofSeconds(60));
116+
inOrder.verify(this.consumer).commitSync(commit3, Duration.ofSeconds(60));
117+
assertThat(this.config.contents.toArray()).isEqualTo(new String[]
118+
{ "foo", "bar", "baz", "qux", "fiz", "buz" });
119+
}
120+
121+
@Configuration
122+
@EnableKafka
123+
public static class Config {
124+
125+
final List<String> contents = new ArrayList<>();
126+
127+
final CountDownLatch pollLatch = new CountDownLatch(3);
128+
129+
final CountDownLatch deliveryLatch = new CountDownLatch(1);
130+
131+
final CountDownLatch commitLatch = new CountDownLatch(3);
132+
133+
final CountDownLatch closeLatch = new CountDownLatch(1);
134+
135+
@KafkaListener(id = CONTAINER_ID, topics = "foo")
136+
public void foo(List<String> in, Acknowledgment ack) {
137+
contents.addAll(in);
138+
this.deliveryLatch.countDown();
139+
try {
140+
ack.acknowledge(2);
141+
ack.acknowledge(4);
142+
ack.acknowledge();
143+
}
144+
catch (Exception ex) {
145+
LogFactory.getLog(getClass()).error("Ack failed", ex);
146+
}
147+
}
148+
149+
@SuppressWarnings({ "rawtypes" })
150+
@Bean
151+
public ConsumerFactory consumerFactory() {
152+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
153+
final Consumer consumer = consumer();
154+
given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
155+
.willReturn(consumer);
156+
return consumerFactory;
157+
}
158+
159+
@SuppressWarnings({ "rawtypes", "unchecked" })
160+
@Bean
161+
public Consumer consumer() {
162+
final Consumer consumer = mock(Consumer.class);
163+
final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
164+
final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
165+
final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
166+
willAnswer(i -> {
167+
((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned(
168+
Collections.singletonList(topicPartition1));
169+
return null;
170+
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
171+
Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
172+
records1.put(topicPartition0, Arrays.asList(
173+
new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
174+
new RecordHeaders(), Optional.empty()),
175+
new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
176+
new RecordHeaders(), Optional.empty())));
177+
records1.put(topicPartition1, Arrays.asList(
178+
new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
179+
new RecordHeaders(), Optional.empty()),
180+
new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
181+
new RecordHeaders(), Optional.empty())));
182+
records1.put(topicPartition2, Arrays.asList(
183+
new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
184+
new RecordHeaders(), Optional.empty()),
185+
new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
186+
new RecordHeaders(), Optional.empty())));
187+
final AtomicInteger which = new AtomicInteger();
188+
willAnswer(i -> {
189+
this.pollLatch.countDown();
190+
switch (which.getAndIncrement()) {
191+
case 0:
192+
return new ConsumerRecords(records1);
193+
default:
194+
try {
195+
Thread.sleep(100);
196+
}
197+
catch (@SuppressWarnings("unused") InterruptedException e) {
198+
Thread.currentThread().interrupt();
199+
}
200+
return ConsumerRecords.empty();
201+
}
202+
}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
203+
willAnswer(i -> {
204+
return Collections.emptySet();
205+
}).given(consumer).paused();
206+
willAnswer(i -> {
207+
this.commitLatch.countDown();
208+
return null;
209+
}).given(consumer).commitSync(anyMap(), any());
210+
willAnswer(i -> {
211+
this.closeLatch.countDown();
212+
return null;
213+
}).given(consumer).close();
214+
return consumer;
215+
}
216+
217+
@SuppressWarnings({ "rawtypes", "unchecked" })
218+
@Bean
219+
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
220+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
221+
factory.setConsumerFactory(consumerFactory());
222+
factory.setBatchListener(true);
223+
factory.getContainerProperties().setAckMode(ackMode);
224+
return factory;
225+
}
226+
227+
}
228+
229+
}

0 commit comments

Comments
 (0)