From be57d9252d247a72f411a54767e1df75e3298d8f Mon Sep 17 00:00:00 2001 From: Jorgen Ringen Date: Thu, 12 Nov 2020 21:35:35 +0100 Subject: [PATCH] fixed bug in test, added more tests and better verification --- .../examples/core/CoreApp.java | 16 ++-- .../examples/core/Bug25AppTest.java | 85 ++++++++++++++----- 2 files changed, 74 insertions(+), 27 deletions(-) diff --git a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java index 06c028ed7..42abd65f1 100644 --- a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java +++ b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java @@ -17,6 +17,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import pl.tlinkowski.unij.api.UniLists; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; @@ -28,6 +31,7 @@ @Slf4j public class CoreApp { + String inputTopic = "input-topic-" + RandomUtils.nextInt(); String outputTopic = "output-topic-" + RandomUtils.nextInt(); @@ -41,8 +45,9 @@ Producer getKafkaProducer() { ParallelStreamProcessor parallelConsumer; - public AtomicInteger messagesProcessed = new AtomicInteger(0); - public AtomicInteger messagesProduced = new AtomicInteger(0); + public List processedAndProducedKeys = Collections.synchronizedList(new ArrayList<>()); + public AtomicInteger processedCount = new AtomicInteger(0); + public AtomicInteger producedCount = new AtomicInteger(0); @SuppressWarnings("UnqualifiedFieldAccess") void run() { @@ -93,12 +98,13 @@ void runPollAndProduce() { this.parallelConsumer.pollAndProduce(record -> { var result = processBrokerRecord(record); ProducerRecord produceRecord = - new ProducerRecord<>(outputTopic, "a-key", result.payload); + new ProducerRecord<>(outputTopic, record.key(), result.payload); - messagesProcessed.incrementAndGet(); + processedCount.incrementAndGet(); return UniLists.of(produceRecord); }, consumeProduceResult -> { - messagesProduced.incrementAndGet(); + producedCount.incrementAndGet(); + processedAndProducedKeys.add(consumeProduceResult.getIn().key()); log.info("Message {} saved to broker at offset {}", consumeProduceResult.getOut(), consumeProduceResult.getMeta().offset()); diff --git a/parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/Bug25AppTest.java b/parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/Bug25AppTest.java index 0df9e2a28..25a009e0f 100644 --- a/parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/Bug25AppTest.java +++ b/parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/Bug25AppTest.java @@ -7,7 +7,6 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.integrationTests.KafkaTest; import lombok.AllArgsConstructor; -import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -15,69 +14,112 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.assertj.core.api.Assertions; -import org.awaitility.Awaitility; +import org.awaitility.core.ConditionTimeoutException; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.waitAtMost; @Slf4j public class Bug25AppTest extends KafkaTest { - int DEAFULT_MAX_POLL_RECORDS_CONFIG = 500; + int LOW_MAX_POLL_RECORDS_CONFIG = 1; + int DEFAULT_MAX_POLL_RECORDS_CONFIG = 500; + int HIGH_MAX_POLL_RECORDS_CONFIG = 10_000; - @Test + @RepeatedTest(5) public void testTransactionalDefaultMaxPoll() { boolean tx = true; - runTest(tx, DEAFULT_MAX_POLL_RECORDS_CONFIG); + runTest(tx, DEFAULT_MAX_POLL_RECORDS_CONFIG); } @Test public void testNonTransactionalDefaultMaxPoll() { boolean tx = false; - runTest(tx, DEAFULT_MAX_POLL_RECORDS_CONFIG); + runTest(tx, DEFAULT_MAX_POLL_RECORDS_CONFIG); } @Test - public void testTransactional() { + public void testTransactionalLowMaxPoll() { boolean tx = true; - runTest(tx, 1); // Sometimes causes test to fail (default 500) + runTest(tx, LOW_MAX_POLL_RECORDS_CONFIG); } @Test - public void testNonTransactional() { + public void testNonTransactionalLowMaxPoll() { boolean tx = false; - runTest(tx, 1); // Sometimes causes test to fail (default 500) + runTest(tx, LOW_MAX_POLL_RECORDS_CONFIG); + } + + @Test + public void testTransactionalHighMaxPoll() { + boolean tx = true; + runTest(tx, HIGH_MAX_POLL_RECORDS_CONFIG); + } + + @Test + public void testNonTransactionalHighMaxPoll() { + boolean tx = false; + runTest(tx, HIGH_MAX_POLL_RECORDS_CONFIG); } @SneakyThrows private void runTest(boolean tx, int maxPoll) { - AppUnderTest coreApp = new AppUnderTest(tx, ParallelConsumerOptions.builder().ordering(KEY).usingTransactionalProducer(tx).build(), maxPoll); + AppUnderTest coreApp = new AppUnderTest(tx, ParallelConsumerOptions.builder() + .ordering(KEY) + .usingTransactionalProducer(tx) + .build(), + maxPoll); ensureTopic(coreApp.inputTopic, 1); ensureTopic(coreApp.outputTopic, 1); - log.info("Producing 1000 messages before starting application"); + // pre-produce messages to input-topic + List expectedKeys = new ArrayList<>(); + int expectedMessageCount = 1000; + log.info("Producing {} messages before starting application", expectedMessageCount); try (Producer kafkaProducer = kcu.createNewProducer(false)) { - for (int i = 0; i < 1000; i++) { - kafkaProducer.send(new ProducerRecord<>(coreApp.inputTopic, "key-" + i, "value-" + i)); + for (int i = 0; i < expectedMessageCount; i++) { + String key = "key-" + i; + kafkaProducer.send(new ProducerRecord<>(coreApp.inputTopic, key, "value-" + i)); + expectedKeys.add(key); } } + // run parallel-consumer log.info("Starting application..."); coreApp.runPollAndProduce(); - waitAtMost(Duration.ofSeconds(30)).untilAsserted(() -> { - log.info("Processed-count: " + coreApp.messagesProcessed.get()); - log.info("Produced-count: " + coreApp.messagesProduced.get()); - assertThat(coreApp.messagesProcessed.get()).isEqualTo(1000); - assertThat(coreApp.messagesProduced.get()).isEqualTo(1000); - }); + // wait for all pre-produced messages to be processed and produced + try { + waitAtMost(Duration.ofSeconds(30)).untilAsserted(() -> { + log.debug("Processed-count: " + coreApp.processedCount.get()); + log.debug("Produced-count: " + coreApp.producedCount.get()); + List processedAndProducedKeys = new ArrayList<>(coreApp.processedAndProducedKeys); // avoid concurrent-modification in assert + assertThat(processedAndProducedKeys).contains(expectedKeys.toArray(new String[0])); + }); + } catch (ConditionTimeoutException e) { + String failureMessage = "All keys sent to input-topic should be processed and produced"; + log.warn(failureMessage); + log.debug("Expected keys=" + expectedKeys + ""); + log.debug("Processed and produced keys=" + coreApp.processedAndProducedKeys + ""); + log.debug("Missing keys=" + expectedKeys.removeAll(coreApp.processedAndProducedKeys)); + fail(failureMessage); + } + + + assertThat(coreApp.processedCount.get()) + .as("messages processed and produced by parallel-consumer should be equal") + .isEqualTo(coreApp.producedCount.get()); + coreApp.close(); } @@ -93,7 +135,6 @@ class AppUnderTest extends CoreApp { Consumer getKafkaConsumer() { Properties props = kcu.props; props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS_CONFIG); - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000); return new KafkaConsumer<>(props); }