@@ -3080,6 +3080,9 @@ else if (!this.autoCommit && (!this.isAnyManualAck || commitRecovered)) {
}

private void sendOffsetsToTransaction() {
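// Only send offsets when a Kafka transaction is actually active: a KafkaTransactionManager must be
// configured and its transactional producer must be bound to the current thread.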
if (this.kafkaTxManager == null || TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory()) == null) {
return;
}
handleAcks();
Map<TopicPartition, OffsetAndMetadata> commits = buildCommits();
this.commitLogger.log(() -> "Sending offsets to transaction: " + commits);
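For context, the lookup in the new guard works because KafkaTransactionManager binds its transactional producer holder to the current thread under the ProducerFactory key for the duration of a transaction, so a null result means no Kafka transaction is in progress and the offsets must not be sent. A minimal standalone sketch of that same check (the KafkaTxCheck helper and its method name are hypothetical, not part of this change):

import org.springframework.kafka.core.ProducerFactory;
import org.springframework.transaction.support.TransactionSynchronizationManager;

final class KafkaTxCheck {

    private KafkaTxCheck() {
    }

    // True only while a Kafka transaction started by KafkaTransactionManager is active on the
    // calling thread; outside of one, getResource(...) returns null.
    static boolean isKafkaTransactionActive(ProducerFactory<?, ?> producerFactory) {
        return TransactionSynchronizationManager.getResource(producerFactory) != null;
    }
}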
@@ -47,6 +47,7 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.jspecify.annotations.Nullable;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -75,6 +76,9 @@
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.MessageHeaders;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionExecution;
import org.springframework.transaction.TransactionExecutionListener;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.backoff.FixedBackOff;

@@ -140,6 +144,8 @@ public class TransactionalContainerTests {

public static final String topic10 = "txTopic10";

public static final String topic11 = "txTopic11";

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
@@ -1148,4 +1154,82 @@ public void onMessage(List<ConsumerRecord<Integer, String>> data) {
container.stop();
}

@Test
void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException {
// init producer
Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(producerProperties);
pf.setTransactionIdPrefix("testSendOffsetOnlyOnActiveTransaction.recordListener");
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);

// init consumer
String group = "testSendOffsetOnlyOnActiveTransaction";
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka, group, false);
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties);
ContainerProperties containerProps = new ContainerProperties(topic11);
containerProps.setPollTimeout(10_000);
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Transactional("testSendOffsetOnlyOnActiveTransaction")
@Override
public void onMessage(ConsumerRecord<Integer, String> data) {
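// Intentionally a no-op; the test only observes whether a Kafka transaction is committed per record.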
}
});

// init container
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
AtomicInteger txCount = new AtomicInteger(0);
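// Count committed Kafka transactions; the assertions below expect exactly one commit, for the first record only.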
tm.addListener(new TransactionExecutionListener() {
@Override
public void afterCommit(TransactionExecution transaction, @Nullable Throwable commitFailure) {
txCount.incrementAndGet();
TransactionExecutionListener.super.afterCommit(transaction, commitFailure);
}
});
containerProps.setKafkaAwareTransactionManager(tm);

KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testSendOffsetOnlyOnActiveTransaction");
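// afterRecord() counts this latch down once the container has finished with each record, so the test can
// wait for the delivered record as well as the filtered one; the latch is replaced between the two sends.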
final var interceptorLatch = new AtomicReference<>(new CountDownLatch(1));
container.setRecordInterceptor(new RecordInterceptor<Integer, String>() {
boolean isFirst = true;

@Override
public @Nullable ConsumerRecord<Integer, String> intercept(
ConsumerRecord<Integer, String> record,
Consumer<Integer, String> consumer) {
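// Let only the first record reach the listener; returning null filters every later record.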
if (isFirst) {
isFirst = false;
return record;
}
return null;
}

@Override
public void afterRecord(
ConsumerRecord<Integer, String> record,
Consumer<Integer, String> consumer) {
interceptorLatch.get().countDown();
}
});
container.start();

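// First record: it passes the interceptor, the listener runs in a Kafka transaction, and one commit is expected.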
template.executeInTransaction(t -> {
template.send(new ProducerRecord<>(topic11, 0, 0, "bar1"));
return null;
});
assertThat(interceptorLatch.get().await(30, TimeUnit.SECONDS)).isTrue();
assertThat(txCount.get()).isEqualTo(1);

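// Second record: the interceptor filters it, so no further Kafka transaction commit is expected and txCount must stay at 1.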
interceptorLatch.set(new CountDownLatch(1));
template.executeInTransaction(t -> {
template.send(new ProducerRecord<>(topic11, 0, 0, "bar2"));
return null;
});
assertThat(interceptorLatch.get().await(30, TimeUnit.SECONDS)).isTrue();
assertThat(txCount.get()).isEqualTo(1);

container.stop();
pf.destroy();
}
}