Skip to content

Commit 785ed7a

Browse files
garyrussellartembilan
authored andcommitted
GH-501: Fix transactions with AckMode.RECORD
Fixes #501 With `AckMode.RECORD`, the container was committing the offsets on the consumer instead of sending them to the transaction.
1 parent ddde5e3 commit 785ed7a

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -934,12 +934,16 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
934934
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
935935
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
936936
new OffsetAndMetadata(record.offset() + 1));
937-
938-
if (this.containerProperties.isSyncCommits()) {
939-
this.consumer.commitSync(offsetsToCommit);
937+
if (producer == null) {
938+
if (this.containerProperties.isSyncCommits()) {
939+
this.consumer.commitSync(offsetsToCommit);
940+
}
941+
else {
942+
this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
943+
}
940944
}
941945
else {
942-
this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
946+
this.acks.add(record);
943947
}
944948
}
945949
else if (!this.isAnyManualAck && !this.autoCommit) {

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
6262
import org.springframework.kafka.core.KafkaTemplate;
6363
import org.springframework.kafka.core.ProducerFactory;
64+
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
6465
import org.springframework.kafka.listener.config.ContainerProperties;
6566
import org.springframework.kafka.test.rule.KafkaEmbedded;
6667
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -88,9 +89,18 @@ public class TransactionalContainerTests {
8889
@ClassRule
8990
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(3, true, topic1, topic2);
9091

91-
@SuppressWarnings({ "rawtypes", "unchecked" })
9292
@Test
93-
public void testConsumeAndProduceTransaction() throws Exception {
93+
public void testConsumeAndProduceTransactionBatch() throws Exception {
94+
testConsumeAndProduceTransactionGuts(AckMode.BATCH);
95+
}
96+
97+
@Test
98+
public void testConsumeAndProduceTransactionRecord() throws Exception {
99+
testConsumeAndProduceTransactionGuts(AckMode.RECORD);
100+
}
101+
102+
@SuppressWarnings({ "rawtypes", "unchecked" })
103+
private void testConsumeAndProduceTransactionGuts(AckMode ackMode) throws Exception {
94104
Consumer consumer = mock(Consumer.class);
95105
final TopicPartition topicPartition = new TopicPartition("foo", 0);
96106
willAnswer(i -> {
@@ -129,6 +139,7 @@ public void testConsumeAndProduceTransaction() throws Exception {
129139
props.setMessageListener((MessageListener) m -> {
130140
template.send("bar", "baz");
131141
});
142+
props.setAckMode(ackMode);
132143
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
133144
container.setBeanName("commit");
134145
container.start();

0 commit comments

Comments
 (0)