Skip to content

Commit 8f26bfb

Browse files
garyrussellartembilan
authored andcommitted
GH-900: Use correct txId for initial commit
Fixes #900 Use the proper zombie-fencing `transational.id` when committing initial offsets after assignment. **cherry-pick to all, 1.3.x will probable need a backport** * Fix - set TL before invoking the transaction template.
1 parent c077b1d commit 8f26bfb

File tree

2 files changed

+48
-22
lines changed

2 files changed

+48
-22
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
490490
this.containerProperties.getClientId(),
491491
KafkaMessageListenerContainer.this.clientIdSuffix);
492492

493+
if (this.transactionManager != null) {
494+
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
495+
}
496+
else {
497+
this.transactionTemplate = null;
498+
}
493499
subscribeOrAssignTopics(this.consumer);
494500
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
495501
this.genericListener = listener;
@@ -525,12 +531,6 @@ else if (listener instanceof MessageListener) {
525531
this.batchErrorHandler = new BatchLoggingErrorHandler();
526532
}
527533
Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener");
528-
if (this.transactionManager != null) {
529-
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
530-
}
531-
else {
532-
this.transactionTemplate = null;
533-
}
534534
if (this.containerProperties.getScheduler() != null) {
535535
this.taskScheduler = this.containerProperties.getScheduler();
536536
this.taskSchedulerExplicitlySet = true;
@@ -1654,18 +1654,29 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
16541654
ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsetsToCommit);
16551655
if (ListenerConsumer.this.transactionTemplate != null &&
16561656
ListenerConsumer.this.kafkaTxManager != null) {
1657-
ListenerConsumer.this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
1658-
1659-
@SuppressWarnings({ "unchecked", RAWTYPES })
1660-
@Override
1661-
protected void doInTransactionWithoutResult(TransactionStatus status) {
1662-
((KafkaResourceHolder) TransactionSynchronizationManager
1663-
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
1664-
.getProducer().sendOffsetsToTransaction(offsetsToCommit, // NOSONAR
1665-
ListenerConsumer.this.consumerGroupId);
1666-
}
1667-
1668-
});
1657+
try {
1658+
offsetsToCommit.forEach((partition, offsetAndMetadata) -> {
1659+
TransactionSupport.setTransactionIdSuffix(
1660+
zombieFenceTxIdSuffix(partition.topic(), partition.partition()));
1661+
ListenerConsumer.this.transactionTemplate
1662+
.execute(new TransactionCallbackWithoutResult() {
1663+
1664+
@SuppressWarnings({ "unchecked", RAWTYPES })
1665+
@Override
1666+
protected void doInTransactionWithoutResult(TransactionStatus status) {
1667+
((KafkaResourceHolder) TransactionSynchronizationManager
1668+
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
1669+
.getProducer().sendOffsetsToTransaction(
1670+
Collections.singletonMap(partition, offsetAndMetadata),
1671+
ListenerConsumer.this.consumerGroupId);
1672+
}
1673+
1674+
});
1675+
});
1676+
}
1677+
finally {
1678+
TransactionSupport.clearTransactionIdSuffix();
1679+
}
16691680
}
16701681
else if (KafkaMessageListenerContainer.this.getContainerProperties().isSyncCommits()) {
16711682
ListenerConsumer.this.consumer.commitSync(offsetsToCommit);

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import static org.mockito.Mockito.spy;
3131
import static org.mockito.Mockito.times;
3232
import static org.mockito.Mockito.verify;
33+
import static org.mockito.Mockito.verifyNoMoreInteractions;
3334

3435
import java.time.Duration;
36+
import java.util.ArrayList;
3537
import java.util.Arrays;
3638
import java.util.Collection;
3739
import java.util.Collections;
@@ -73,6 +75,7 @@
7375
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
7476
import org.springframework.kafka.support.KafkaHeaders;
7577
import org.springframework.kafka.support.TopicPartitionInitialOffset;
78+
import org.springframework.kafka.support.TransactionSupport;
7679
import org.springframework.kafka.test.EmbeddedKafkaBroker;
7780
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
7881
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -153,14 +156,18 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
153156
ConsumerFactory cf = mock(ConsumerFactory.class);
154157
willReturn(consumer).given(cf).createConsumer("group", "", null);
155158
Producer producer = mock(Producer.class);
156-
final CountDownLatch closeLatch = new CountDownLatch(1);
159+
final CountDownLatch closeLatch = new CountDownLatch(2);
157160
willAnswer(i -> {
158161
closeLatch.countDown();
159162
return null;
160163
}).given(producer).close();
161164
ProducerFactory pf = mock(ProducerFactory.class);
162165
given(pf.transactionCapable()).willReturn(true);
163-
given(pf.createProducer()).willReturn(producer);
166+
final List<String> transactionalIds = new ArrayList<>();
167+
willAnswer(i -> {
168+
transactionalIds.add(TransactionSupport.getTransactionIdSuffix());
169+
return producer;
170+
}).given(pf).createProducer();
164171
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
165172
PlatformTransactionManager ptm = tm;
166173
if (chained) {
@@ -185,6 +192,11 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
185192
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
186193
InOrder inOrder = inOrder(producer);
187194
inOrder.verify(producer).beginTransaction();
195+
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
196+
new OffsetAndMetadata(0)), "group");
197+
inOrder.verify(producer).commitTransaction();
198+
inOrder.verify(producer).close();
199+
inOrder.verify(producer).beginTransaction();
188200
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
189201
inOrder.verify(producer).send(captor.capture(), any(Callback.class));
190202
assertThat(captor.getValue()).isEqualTo(new ProducerRecord("bar", "baz"));
@@ -193,7 +205,10 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
193205
inOrder.verify(producer).commitTransaction();
194206
inOrder.verify(producer).close();
195207
container.stop();
196-
verify(pf, times(1)).createProducer();
208+
verify(pf, times(2)).createProducer();
209+
verifyNoMoreInteractions(producer);
210+
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
211+
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
197212
}
198213

199214
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -433,7 +448,7 @@ public void testRollbackRecord() throws Exception {
433448
}
434449
});
435450

436-
@SuppressWarnings({ "rawtypes", "unchecked" })
451+
@SuppressWarnings({ "rawtypes" })
437452
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
438453
containerProps.setTransactionManager(tm);
439454
KafkaMessageListenerContainer<Integer, String> container =

0 commit comments

Comments
 (0)