diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
index ee732c426..08b812dcf 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
@@ -1,7 +1,7 @@
 package io.confluent.parallelconsumer.internal;
 
 /*-
- * Copyright (C) 2020-2022 Confluent, Inc.
+ * Copyright (C) 2020-2023 Confluent, Inc.
  */
 
 import io.confluent.parallelconsumer.ParallelConsumerOptions;
@@ -272,8 +272,8 @@ private void transitionToClosing() {
      */
     private void managePauseOfSubscription() {
         boolean throttle = shouldThrottle();
-        log.trace("Need to throttle: {}", throttle);
-        if (throttle) {
+        log.trace("Need to throttle: {}, state: {}", throttle, runState);
+        if (throttle || runState == DRAINING) {
             doPauseMaybe();
         } else {
             resumeIfPaused();
diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/DrainShutdownTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/DrainShutdownTest.java
new file mode 100644
index 000000000..4bb3b7f7c
--- /dev/null
+++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/DrainShutdownTest.java
@@ -0,0 +1,84 @@
+
+/*-
+ * Copyright (C) 2020-2023 Confluent, Inc.
+ */
+package io.confluent.parallelconsumer.integrationTests;
+
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
+import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import pl.tlinkowski.unij.api.UniSets;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.PARTITION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+import static org.testcontainers.shaded.org.hamcrest.Matchers.equalTo;
+import static org.testcontainers.shaded.org.hamcrest.Matchers.is;
+
+@Slf4j
+public class DrainShutdownTest extends BrokerIntegrationTest {
+
+    Consumer consumer;
+
+    ParallelConsumerOptions pcOpts;
+    ParallelEoSStreamProcessor pc;
+
+    @BeforeEach
+    void setUp() {
+        setupTopic();
+        consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);
+
+        pcOpts = ParallelConsumerOptions.builder()
+                .consumer(consumer)
+                .ordering(PARTITION)
+                .build();
+
+        pc = new ParallelEoSStreamProcessor<>(pcOpts);
+
+        pc.subscribe(UniSets.of(topic));
+    }
+
+    @Test
+    @SneakyThrows
+    void dontPollAfterSetToDraining() {
+        var recordsToProduce = 2L; // 1 in process + 1 waiting in shard queue
+        var recordsToProduceAfterClose = 10L;
+
+        var count = new AtomicLong();
+        var latch = new CountDownLatch(1);
+
+        getKcu().produceMessages(topic, recordsToProduce);
+        pc.poll(recordContexts -> {
+            count.getAndIncrement();
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            log.debug("Processed record, count now {} - offset: {}", count, recordContexts.offset());
+        });
+        await().untilAtomic(count, is(equalTo(1L)));
+
+        new Thread(() -> pc.closeDrainFirst(Duration.ofSeconds(30))).start();
+        Thread.sleep(2000);
+
+        getKcu().produceMessages(topic, recordsToProduceAfterClose);
+        Thread.sleep(5000);
+
+        latch.countDown();
+
+        await().until(() -> pc.isClosedOrFailed()
+                || count.get() == recordsToProduce + recordsToProduceAfterClose);
+        assertEquals(recordsToProduce, count.get());
+        log.debug("Test finished");
+    }
+}
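
Note on the behaviour this patch enforces, as a minimal caller-side sketch ('options' and 'handle' below are placeholder names, not part of the change): once closeDrainFirst() moves the run state to DRAINING, managePauseOfSubscription() now pauses the subscription, so records produced after the call are never fetched and only work already queued is drained.

    // Sketch only: 'options' and 'handle' are assumed placeholders.
    ParallelEoSStreamProcessor<String, String> pc = new ParallelEoSStreamProcessor<>(options);
    pc.subscribe(UniSets.of(topic));
    pc.poll(ctx -> handle(ctx));
    // With this change, entering DRAINING pauses the broker poll subscription:
    // records produced after this call are not consumed before close completes.
    pc.closeDrainFirst(Duration.ofSeconds(30));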