diff --git a/README.adoc b/README.adoc index ce4dc9c10..7f67b86bd 100644 --- a/README.adoc +++ b/README.adoc @@ -375,25 +375,31 @@ Where `${project.version}` is the version to be used: .Setup the client [source,java,indent=0] ---- - Consumer kafkaConsumer = getKafkaConsumer(); // <4> + Consumer kafkaConsumer = getKafkaConsumer(); // <1> Producer kafkaProducer = getKafkaProducer(); - var options = getOptions().toBuilder().consumer(kafkaConsumer).producer(kafkaProducer).build(); + var options = ParallelConsumerOptions.builder() + .ordering(KEY) // <2> + .maxMessagesToQueue(1000) // <3> + .maxNumberMessagesBeyondBaseCommitOffset(1000) // <4> + .consumer(kafkaConsumer) + .producer(kafkaProducer) + .build(); - ParallelStreamProcessor eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options); - if (!(kafkaConsumer instanceof MockConsumer)) { - eosStreamProcessor.subscribe(UniLists.of(inputTopic)); // <5> - } + ParallelStreamProcessor eosStreamProcessor = + ParallelStreamProcessor.createEosStreamProcessor(options); + + eosStreamProcessor.subscribe(of(inputTopic)); // <5> return eosStreamProcessor; ---- -<1> Choose your ordering type, `KEY` in this case. +<1> Setup your clients as per normal. A Producer is only required if using the `produce` flows. +<2> Choose your ordering type, `KEY` in this case. This ensures maximum concurrency, while ensuring messages are processed and committed in `KEY` order, making sure no offset is committed unless all offsets before it in it's partition, are completed also. -<2> The maximum number of concurrent processing operations to be performing at any given time -<3> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time. +<3> The maximum number of concurrent processing operations to be performing at any given time +<4> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time. Also, because the library coordinates offsets, `enable.auto.commit` must be disabled in your consumer. -<4> Setup your consumer client as per normal -<5> Setup your topic subscriptions - (when using the `MockConsumer` you must use the `MockConsumer#assign` method) +<5> Subscribe to your topics NOTE: Because the library coordinates offsets, `enable.auto.commit` must be disabled. diff --git a/parallel-consumer-core/src/test/java/io/confluent/csid/utils/LongPollingMockConsumer.java b/parallel-consumer-core/src/test/java/io/confluent/csid/utils/LongPollingMockConsumer.java index edb7a05e2..798ff44c6 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/csid/utils/LongPollingMockConsumer.java +++ b/parallel-consumer-core/src/test/java/io/confluent/csid/utils/LongPollingMockConsumer.java @@ -136,6 +136,6 @@ public void subscribeWithRebalanceAndAssignment(final List topics, int p beginningOffsets.put(tp, 0L); } super.updateBeginningOffsets(beginningOffsets); - } + } \ No newline at end of file diff --git a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java index 36973da8e..dc97817a0 100644 --- a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java +++ b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY; +import static pl.tlinkowski.unij.api.UniLists.of; /** * Basic core examples @@ -50,6 +51,8 @@ Producer getKafkaProducer() { void run() { this.parallelConsumer = setupParallelConsumer(); + postSetup(); + // tag::example[] parallelConsumer.poll(record -> log.info("Concurrently processing a record: {}", record) @@ -57,32 +60,33 @@ void run() { // end::example[] } + protected void postSetup() { + // ignore + } + @SuppressWarnings({"FeatureEnvy", "MagicNumber"}) ParallelStreamProcessor setupParallelConsumer() { // tag::exampleSetup[] - Consumer kafkaConsumer = getKafkaConsumer(); // <4> + Consumer kafkaConsumer = getKafkaConsumer(); // <1> Producer kafkaProducer = getKafkaProducer(); - var options = getOptions().toBuilder().consumer(kafkaConsumer).producer(kafkaProducer).build(); + var options = ParallelConsumerOptions.builder() + .ordering(KEY) // <2> + .maxMessagesToQueue(1000) // <3> + .maxNumberMessagesBeyondBaseCommitOffset(1000) // <4> + .consumer(kafkaConsumer) + .producer(kafkaProducer) + .build(); + + ParallelStreamProcessor eosStreamProcessor = + ParallelStreamProcessor.createEosStreamProcessor(options); - ParallelStreamProcessor eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options); - if (!(kafkaConsumer instanceof MockConsumer)) { - eosStreamProcessor.subscribe(UniLists.of(inputTopic)); // <5> - } + eosStreamProcessor.subscribe(of(inputTopic)); // <5> return eosStreamProcessor; // end::exampleSetup[] } - ParallelConsumerOptions getOptions() { - var options = ParallelConsumerOptions.builder() - .ordering(KEY) // <1> - .maxMessagesToQueue(1000) // <2> - .maxNumberMessagesBeyondBaseCommitOffset(1000) // <3> - .build(); - return options; - } - void close() { this.parallelConsumer.close(); } @@ -90,6 +94,8 @@ void close() { void runPollAndProduce() { this.parallelConsumer = setupParallelConsumer(); + postSetup(); + // tag::exampleProduce[] this.parallelConsumer.pollAndProduce(record -> { var result = processBrokerRecord(record); diff --git a/parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/CoreAppTest.java b/parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/CoreAppTest.java index 49f13cbc9..f44ba7159 100644 --- a/parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/CoreAppTest.java +++ b/parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/CoreAppTest.java @@ -24,6 +24,7 @@ import static io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase.DEFAULT_GROUP_METADATA; import static org.mockito.Mockito.when; +import static pl.tlinkowski.unij.api.UniLists.of; @Slf4j public class CoreAppTest { @@ -48,13 +49,11 @@ public void test() { coreApp.close(); } - @SneakyThrows @Test public void testPollAndProduce() { log.info("Test start"); CoreAppUnderTest coreApp = new CoreAppUnderTest(); - TopicPartition tp = new TopicPartition(coreApp.inputTopic, 0); coreApp.runPollAndProduce(); @@ -76,11 +75,7 @@ class CoreAppUnderTest extends CoreApp { @Override Consumer getKafkaConsumer() { - HashMap beginningOffsets = new HashMap<>(); - beginningOffsets.put(tp, 0L); - mockConsumer.updateBeginningOffsets(beginningOffsets); when(mockConsumer.groupMetadata()).thenReturn(DEFAULT_GROUP_METADATA); // todo fix AK mock consumer - mockConsumer.assign(UniLists.of(tp)); return mockConsumer; } @@ -89,5 +84,9 @@ Producer getKafkaProducer() { return new MockProducer<>(true, null, null); } + @Override + protected void postSetup() { + mockConsumer.subscribeWithRebalanceAndAssignment(of(inputTopic), 1); + } } } diff --git a/src/docs/README.adoc b/src/docs/README.adoc index 30e4d533a..82f8034d4 100644 --- a/src/docs/README.adoc +++ b/src/docs/README.adoc @@ -367,13 +367,13 @@ include::{project_root}/parallel-consumer-examples/parallel-consumer-example-ver ---- include::{project_root}/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java[tag=exampleSetup] ---- -<1> Choose your ordering type, `KEY` in this case. +<1> Setup your clients as per normal. A Producer is only required if using the `produce` flows. +<2> Choose your ordering type, `KEY` in this case. This ensures maximum concurrency, while ensuring messages are processed and committed in `KEY` order, making sure no offset is committed unless all offsets before it in it's partition, are completed also. -<2> The maximum number of concurrent processing operations to be performing at any given time -<3> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time. +<3> The maximum number of concurrent processing operations to be performing at any given time +<4> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time. Also, because the library coordinates offsets, `enable.auto.commit` must be disabled in your consumer. -<4> Setup your consumer client as per normal -<5> Setup your topic subscriptions - (when using the `MockConsumer` you must use the `MockConsumer#assign` method) +<5> Subscribe to your topics NOTE: Because the library coordinates offsets, `enable.auto.commit` must be disabled.