Skip to content

Commit

Permalink
GH-2588: support batch recoverable DefaultAfterRollbackProcessor
Browse files Browse the repository at this point in the history
deprecated transactionManager and add KafkaAwareTransactionManager at ContainerProperties, after remove transactionManager, change `KafkaMessageListenerContainer#transactionManager` type to PlatformTransactionManager

* modify toString in `ContainerProperties`
  • Loading branch information
Wzy19930507 committed Jan 18, 2024
1 parent 628b5da commit c37a616
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.transaction.PlatformTransactionManager;
Expand Down Expand Up @@ -256,8 +257,11 @@ public enum EOSMode {

private double idleBeforeDataMultiplier = DEFAULT_IDLE_BEFORE_DATA_MULTIPLIER;

@Deprecated(since = "3.1")
private PlatformTransactionManager transactionManager;

private KafkaAwareTransactionManager<?, ?> kafkaAwareTransactionManager;

private boolean batchRecoverAfterRollback = false;

private int monitorInterval = DEFAULT_MONITOR_INTERVAL;
Expand Down Expand Up @@ -524,6 +528,7 @@ public Long getIdlePartitionEventInterval() {
return this.idlePartitionEventInterval;
}

@Deprecated(since = "3.1")
@Nullable
public PlatformTransactionManager getTransactionManager() {
return this.transactionManager;
Expand All @@ -541,10 +546,26 @@ public PlatformTransactionManager getTransactionManager() {
* @since 1.3
* @see #setAckMode(AckMode)
*/
@Deprecated(since = "3.1")
public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}

@Nullable
public KafkaAwareTransactionManager<?, ?> getKafkaAwareTransactionManager() {
return this.kafkaAwareTransactionManager;
}

/**
* Set the transaction manager to start a transaction; replace {@link #setTransactionManager}.
* @param kafkaAwareTransactionManager the transaction manager.
* @since 3.1
*/
public void setKafkaAwareTransactionManager(@Nullable KafkaAwareTransactionManager<?, ?> kafkaAwareTransactionManager) {
this.kafkaAwareTransactionManager = kafkaAwareTransactionManager;
}


public boolean isBatchRecoverAfterRollback() {
return this.batchRecoverAfterRollback;
}
Expand Down Expand Up @@ -1060,6 +1081,9 @@ public String toString() {
+ (this.transactionManager != null
? "\n transactionManager=" + this.transactionManager
: "")
+ (this.kafkaAwareTransactionManager != null
? "\n kafkaAwareTransactionManager=" + this.kafkaAwareTransactionManager
: "")
+ "\n monitorInterval=" + this.monitorInterval
+ (this.scheduler != null ? "\n scheduler=" + this.scheduler : "")
+ "\n noPollThreshold=" + this.noPollThreshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,11 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final CommonErrorHandler commonErrorHandler;

private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager();
@SuppressWarnings("deprecation")
private final PlatformTransactionManager transactionManager =
this.containerProperties.getKafkaAwareTransactionManager() != null ?
this.containerProperties.getKafkaAwareTransactionManager() :
this.containerProperties.getTransactionManager();

@SuppressWarnings(RAWTYPES)
private final KafkaAwareTransactionManager kafkaTxManager =
Expand Down Expand Up @@ -2993,7 +2997,6 @@ private void sendOffsetsToTransaction() {
doSendOffsets(this.producer, commits);
}

@SuppressWarnings("deprecation")
private void doSendOffsets(Producer<?, ?> prod, Map<TopicPartition, OffsetAndMetadata> commits) {
prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata());
if (this.fixTxOffsets) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo("2");
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "deprecation"})
@Test
public void testNestedTxProducerIsCached() throws Exception {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
Expand Down Expand Up @@ -376,6 +376,55 @@ public void testNestedTxProducerIsCached() throws Exception {
}
}

@SuppressWarnings("unchecked")
@Test
public void testNestedTxProducerIsCachedByKWTM() throws Exception {
Map<String, Object> producerProps = KafkaTestUtils.producerProps(this.embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
DefaultKafkaProducerFactory<Integer, String> pfTx = new DefaultKafkaProducerFactory<>(producerProps);
pfTx.setTransactionIdPrefix("fooTx.");
KafkaOperations<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
cf.addPostProcessor(consumer -> {
ProxyFactory prox = new ProxyFactory();
prox.setTarget(consumer);
@SuppressWarnings("unchecked")
Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
wrapped.set(proxy);
return proxy;
});
ContainerProperties containerProps = new ContainerProperties("txCache1");
CountDownLatch latch = new CountDownLatch(1);
containerProps.setMessageListener((MessageListener<Integer, String>) r -> {
templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "bar"));
templateTx.executeInTransaction(t -> t.send("txCacheSendFromListener", "baz"));
latch.countDown();
});
KafkaTransactionManager<Integer, String> tm = new KafkaTransactionManager<>(pfTx);
containerProps.setKafkaAwareTransactionManager(tm);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
containerProps);
container.start();
try {
CompletableFuture<SendResult<Integer, String>> future = template.send("txCache1", "foo");
future.get(10, TimeUnit.SECONDS);
pf.getCache();
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", Map.class)).hasSize(0);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
assertThat(pfTx.getCache()).hasSize(1);
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get());
}
finally {
container.stop();
pf.destroy();
pfTx.destroy();
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void listener() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -105,7 +105,7 @@ void testLastOnly() throws InterruptedException {
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@Test
void testLatestOnlyTx() throws InterruptedException {
ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties();
Expand All @@ -125,7 +125,7 @@ void testLatestOnlyTx() throws InterruptedException {
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@Test
void testLatestOnlyNoTx() throws InterruptedException {
ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties();
Expand All @@ -135,13 +135,48 @@ void testLatestOnlyNoTx() throws InterruptedException {
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
Producer producer = mock(Producer.class);
given(pf.createProducer(any())).willReturn(producer);
CountDownLatch latch = new CountDownLatch(1);
props.setTransactionManager(tm);
this.registry.start();
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
verify(producer, never()).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void testLatestOnlyTxByKATM() throws InterruptedException {
ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties();
props.setAssignmentCommitOption(AssignmentCommitOption.LATEST_ONLY);
ProducerFactory pf = mock(ProducerFactory.class, withSettings().verboseLogging());
given(pf.transactionCapable()).willReturn(true);
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
Producer producer = mock(Producer.class);
given(pf.createProducer(any())).willReturn(producer);
CountDownLatch latch = new CountDownLatch(1);
willAnswer(inv -> {
latch.countDown();
return null;
}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
props.setKafkaAwareTransactionManager(tm);
this.registry.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void testLatestOnlyNoTxByKATM() throws InterruptedException {
ContainerProperties props = this.registry.getListenerContainer("foo").getContainerProperties();
props.setAssignmentCommitOption(AssignmentCommitOption.LATEST_ONLY_NO_TX);
ProducerFactory pf = mock(ProducerFactory.class, withSettings().verboseLogging());
given(pf.transactionCapable()).willReturn(true);
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
Producer producer = mock(Producer.class);
given(pf.createProducer(any())).willReturn(producer);
props.setKafkaAwareTransactionManager(tm);
this.registry.start();
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
verify(producer, never()).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
}

@Configuration
@EnableKafka
public static class Config {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ void testBatchInterceptBeforeTx1() throws InterruptedException {
testIntercept(true, null, true);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batch) throws InterruptedException {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = mock(Consumer.class);
Expand Down Expand Up @@ -662,7 +662,7 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu
}

@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
void testInterceptInTxNonKafkaTM() throws InterruptedException {
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
final Consumer consumer = mock(Consumer.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -240,7 +240,7 @@ public Consumer consumer() {
return consumer;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -241,7 +241,7 @@ public Consumer consumer() {
return consumer;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -132,6 +132,7 @@ void withFilter() throws Exception {
this.registry.stop();
}

@SuppressWarnings("deprecation")
@Test
void defaults() {
Map<String, Object> props = KafkaTestUtils.consumerProps("sbpp", "false", this.broker);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -214,7 +214,7 @@ public Consumer consumer() {
return consumer;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -200,7 +200,7 @@ public Consumer consumer() {
return consumer;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode a
assertThat(stopEvent.get().getSource()).isSameAs(container);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@Test
public void testConsumeAndProduceTransactionRollback() throws Exception {
Consumer consumer = mock(Consumer.class);
Expand Down Expand Up @@ -370,7 +370,7 @@ public void testConsumeAndProduceTransactionRollback() throws Exception {
assertThat(delivery.get()).isNotNull();
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@Test
public void testConsumeAndProduceTransactionRollbackBatch() throws Exception {
Consumer consumer = mock(Consumer.class);
Expand Down Expand Up @@ -619,7 +619,7 @@ public void testFixLagOtherTM() throws InterruptedException {
testFixLagGuts(topic7, 2);
}

@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "deprecation" })
private void testFixLagGuts(String topic, int whichTm) throws InterruptedException {
logger.info("Start testFixLag");
Map<String, Object> props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka);
Expand Down Expand Up @@ -676,7 +676,7 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti
pf.destroy();
}

@SuppressWarnings({ "unchecked"})
@SuppressWarnings({ "unchecked", "deprecation" })
@Test
public void testMaxFailures() throws Exception {
logger.info("Start testMaxFailures");
Expand Down Expand Up @@ -786,7 +786,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
logger.info("Stop testMaxAttempts");
}

@SuppressWarnings({ "unchecked"})
@SuppressWarnings({ "unchecked", "deprecation" })
@Test
public void testBatchListenerMaxFailuresOnRecover() throws Exception {

Expand Down Expand Up @@ -908,7 +908,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
logger.info("Stop testBatchListenerMaxFailures");
}

@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "deprecation" })
@Test
public void testRollbackProcessorCrash() throws Exception {
logger.info("Start testRollbackNoRetries");
Expand Down Expand Up @@ -971,7 +971,7 @@ public void testRollbackProcessorCrash() throws Exception {
logger.info("Stop testRollbackNoRetries");
}

@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "deprecation" })
@Test
public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Exception {
logger.info("Start testBatchListenerRollbackNoRetries");
Expand Down Expand Up @@ -999,7 +999,7 @@ public void testBatchListenerRecoverAfterRollbackProcessorCrash() throws Excepti
}
});

@SuppressWarnings({ "rawtypes" })
@SuppressWarnings({ "rawtypes"})
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
containerProps.setTransactionManager(tm);
KafkaMessageListenerContainer<Integer, String> container =
Expand Down

0 comments on commit c37a616

Please sign in to comment.