Skip to content

Commit

Permalink
Cleanup TransactionalContainerTests
Browse files Browse the repository at this point in the history
* remove Test `testConsumeAndProduceTransactionKTM_BETA` because it same as testConsumeAndProduceTransactionKTM.
* remove logger
  • Loading branch information
Wzy19930507 authored Feb 6, 2024
1 parent 630dbb2 commit addfaf0
Showing 1 changed file with 4 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -53,7 +52,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
Expand All @@ -74,7 +72,6 @@
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
Expand Down Expand Up @@ -119,8 +116,6 @@
brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
public class TransactionalContainerTests {

private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

public static final String topic1 = "txTopic1";

public static final String topic2 = "txTopic2";
Expand Down Expand Up @@ -165,24 +160,18 @@ public void testConsumeAndProduceTransactionKTMManual() throws Exception {
testConsumeAndProduceTransactionGuts(false, AckMode.MANUAL_IMMEDIATE, EOSMode.V2);
}

@Test
public void testConsumeAndProduceTransactionKTM_BETA() throws Exception {
testConsumeAndProduceTransactionGuts(false, AckMode.RECORD, EOSMode.V2);
}

@Test
public void testConsumeAndProduceTransactionStopWhenFenced() throws Exception {
testConsumeAndProduceTransactionGuts(false, AckMode.RECORD, EOSMode.V2, true);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode ackMode,
EOSMode eosMode) throws Exception {

testConsumeAndProduceTransactionGuts(handleError, ackMode, eosMode, false);
}

@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@SuppressWarnings({ "rawtypes", "unchecked" })
private void testConsumeAndProduceTransactionGuts(boolean handleError, AckMode ackMode,
EOSMode eosMode, boolean stopWhenFenced) throws Exception {

Expand Down Expand Up @@ -439,16 +428,11 @@ public void testConsumeAndProduceTransactionRollbackBatch() throws Exception {
verify(pf, times(1)).createProducer(isNull());
}

@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testConsumeAndProduceTransactionExternalTM() throws Exception {
Consumer consumer = mock(Consumer.class);
final TopicPartition topicPartition = new TopicPartition("foo", 0);
willAnswer(i -> {
((ConsumerRebalanceListener) i.getArgument(1))
.onPartitionsAssigned(Collections.singletonList(topicPartition));
return null;
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
final ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(topicPartition,
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value"))));
final AtomicBoolean done = new AtomicBoolean();
Expand Down Expand Up @@ -476,7 +460,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
final ProducerFactory pf = mock(ProducerFactory.class);
given(pf.transactionCapable()).willReturn(true);
given(pf.createProducer(isNull())).willReturn(producer);
ContainerProperties props = new ContainerProperties("foo");
ContainerProperties props = new ContainerProperties(new TopicPartitionOffset("foo", 0));
props.setGroupId("group");
props.setTransactionManager(new SomeOtherTransactionManager());
final KafkaTemplate template = new KafkaTemplate(pf);
Expand Down Expand Up @@ -508,7 +492,6 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
@SuppressWarnings({ "unchecked"})
@Test
public void testRollbackRecord() throws Exception {
logger.info("Start testRollbackRecord");
Map<String, Object> props = KafkaTestUtils.consumerProps("txTest1", "false", embeddedKafka);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
Expand Down Expand Up @@ -576,7 +559,7 @@ public void testRollbackRecord() throws Exception {
container.stop();
Consumer<Integer, String> consumer = cf.createConsumer();
final CountDownLatch subsLatch = new CountDownLatch(1);
consumer.subscribe(Arrays.asList(topic1), new ConsumerRebalanceListener() {
consumer.subscribe(List.of(topic1), new ConsumerRebalanceListener() {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
Expand All @@ -598,7 +581,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
assertThat(records.count()).isEqualTo(0);
assertThat(consumer.position(partition0)).isEqualTo(2L);
assertThat(transactionalId.get()).startsWith("rr.");
logger.info("Stop testRollbackRecord");
pf.destroy();
consumer.close();
}
Expand All @@ -620,7 +602,6 @@ public void testFixLagOtherTM() throws InterruptedException {

@SuppressWarnings("unchecked")
private void testFixLagGuts(String topic, int whichTm) throws InterruptedException {
logger.info("Start testFixLag");
Map<String, Object> props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
Expand Down Expand Up @@ -678,7 +659,6 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti
@SuppressWarnings({ "unchecked"})
@Test
public void testMaxFailures() throws Exception {
logger.info("Start testMaxFailures");
String group = "groupInARBP";
Map<String, Object> props = KafkaTestUtils.consumerProps(group, "false", embeddedKafka);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
Expand Down Expand Up @@ -782,7 +762,6 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
verify(dlTemplate).sendOffsetsToTransaction(
eq(Collections.singletonMap(new TopicPartition(topic3, 0), new OffsetAndMetadata(1L))),
any(ConsumerGroupMetadata.class));
logger.info("Stop testMaxAttempts");
}

@SuppressWarnings({ "unchecked"})
Expand Down Expand Up @@ -907,7 +886,6 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
@SuppressWarnings("unchecked")
@Test
public void testRollbackProcessorCrash() throws Exception {
logger.info("Start testRollbackNoRetries");
Map<String, Object> props = KafkaTestUtils.consumerProps("testRollbackNoRetries", "false", embeddedKafka);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
Expand Down Expand Up @@ -964,7 +942,6 @@ public void testRollbackProcessorCrash() throws Exception {
container.stop();
pf.destroy();
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
logger.info("Stop testRollbackNoRetries");
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit addfaf0

Please sign in to comment.