Commit a90f11c

docs: Fix code examples

astubbs committed Nov 23, 2020
1 parent f1463ae

Showing 5 changed files with 45 additions and 36 deletions.

README.adoc (25 changes: 15 additions & 10 deletions)

@@ -372,25 +372,30 @@ Where `${project.version}` is the version to be used:
.Setup the client
[source,java,indent=0]
----
-Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <4>
+Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <1>
Producer<String, String> kafkaProducer = getKafkaProducer();
-var options = getOptions().toBuilder().consumer(kafkaConsumer).producer(kafkaProducer).build();
+var options = ParallelConsumerOptions.<String, String>builder()
+        .ordering(KEY) // <2>
+        .maxMessagesToQueue(1000) // <3>
+        .maxNumberMessagesBeyondBaseCommitOffset(1000) // <4>
+        .consumer(kafkaConsumer)
+        .producer(kafkaProducer)
+        .build();
ParallelStreamProcessor<String, String> eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options);
-if (!(kafkaConsumer instanceof MockConsumer)) {
-    eosStreamProcessor.subscribe(UniLists.of(inputTopic)); // <5>
-}
+eosStreamProcessor.subscribe(of(inputTopic)); // <5>
return eosStreamProcessor;
----
-<1> Choose your ordering type, `KEY` in this case.
+<1> Set up your clients as per normal. A Producer is only required if using the `produce` flows.
+<2> Choose your ordering type, `KEY` in this case.
This ensures maximum concurrency, while ensuring messages are processed and committed in `KEY` order, making sure no offset is committed unless all offsets before it in its partition are also completed.
-<2> The maximum number of concurrent processing operations to be performing at any given time
-<3> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time.
+<3> The maximum number of concurrent processing operations to be performing at any given time
+<4> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time.
Also, because the library coordinates offsets, `enable.auto.commit` must be disabled in your consumer.
-<4> Set up your consumer client as per normal
-<5> Set up your topic subscriptions (when using the `MockConsumer` you must use the `MockConsumer#assign` method)
+<5> Subscribe to your topics

+NOTE: Because the library coordinates offsets, `enable.auto.commit` must be disabled.

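Since the library coordinates offsets, the consumer handed to it must have auto-commit disabled. As a minimal sketch of a compatible client setup, consistent with the footnotes above (the broker address and group id are placeholder assumptions; only the `enable.auto.commit` setting is actually mandated by the docs):

[source,java]
----
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "pc-example-group"); // assumed group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // required: the library commits offsets itself
Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
----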

@@ -136,6 +136,6 @@ public void subscribeWithRebalanceAndAssignment(final List<String> topics, int p
        beginningOffsets.put(tp, 0L);
    }
    super.updateBeginningOffsets(beginningOffsets);
-
}
+
}
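Only the tail of the changed helper is visible in this hunk. For orientation, a sketch of what a `subscribeWithRebalanceAndAssignment` helper on Kafka's `MockConsumer` plausibly does, assuming one `TopicPartition` per topic and partition index (only the `updateBeginningOffsets` tail is confirmed by the hunk above):

[source,java]
----
public void subscribeWithRebalanceAndAssignment(final List<String> topics, int partitions) {
    List<TopicPartition> topicPartitions = topics.stream()
            .flatMap(topic -> IntStream.range(0, partitions)
                    .mapToObj(i -> new TopicPartition(topic, i)))
            .collect(Collectors.toList());
    super.subscribe(topics); // register the subscription
    super.rebalance(topicPartitions); // assumed: simulate the broker assigning the partitions
    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
    for (TopicPartition tp : topicPartitions) {
        beginningOffsets.put(tp, 0L);
    }
    super.updateBeginningOffsets(beginningOffsets);
}
----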

parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java

@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
+import static pl.tlinkowski.unij.api.UniLists.of;

/**
* Basic core examples
@@ -50,46 +51,50 @@ Producer<String, String> getKafkaProducer() {
void run() {
    this.parallelConsumer = setupParallelConsumer();

+    postSetup();
+
    // tag::example[]
    parallelConsumer.poll(record ->
            log.info("Concurrently processing a record: {}", record)
    );
    // end::example[]
}

+protected void postSetup() {
+    // ignore
+}
+
@SuppressWarnings({"FeatureEnvy", "MagicNumber"})
ParallelStreamProcessor<String, String> setupParallelConsumer() {
    // tag::exampleSetup[]
-    Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <4>
+    Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <1>
    Producer<String, String> kafkaProducer = getKafkaProducer();

-    var options = getOptions().toBuilder().consumer(kafkaConsumer).producer(kafkaProducer).build();
+    var options = ParallelConsumerOptions.<String, String>builder()
+            .ordering(KEY) // <2>
+            .maxMessagesToQueue(1000) // <3>
+            .maxNumberMessagesBeyondBaseCommitOffset(1000) // <4>
+            .consumer(kafkaConsumer)
+            .producer(kafkaProducer)
+            .build();

    ParallelStreamProcessor<String, String> eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options);
-    if (!(kafkaConsumer instanceof MockConsumer)) {
-        eosStreamProcessor.subscribe(UniLists.of(inputTopic)); // <5>
-    }
-
+    eosStreamProcessor.subscribe(of(inputTopic)); // <5>
+
    return eosStreamProcessor;
    // end::exampleSetup[]
}

-ParallelConsumerOptions getOptions() {
-    var options = ParallelConsumerOptions.builder()
-            .ordering(KEY) // <1>
-            .maxMessagesToQueue(1000) // <2>
-            .maxNumberMessagesBeyondBaseCommitOffset(1000) // <3>
-            .build();
-    return options;
-}

void close() {
    this.parallelConsumer.close();
}

void runPollAndProduce() {
    this.parallelConsumer = setupParallelConsumer();

+    postSetup();
+
    // tag::exampleProduce[]
    this.parallelConsumer.pollAndProduce(record -> {
        var result = processBrokerRecord(record);
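The `exampleProduce` block is cut off above. A sketch of how such a consume-then-produce flow typically completes, assuming the mapping function returns the records to publish, and that `outputTopic`, `processBrokerRecord`, and the result's `payload` field come from the surrounding example class (this is not the repository's exact code):

[source,java]
----
this.parallelConsumer.pollAndProduce(record -> {
    var result = processBrokerRecord(record); // user processing step
    var produceRecord = new ProducerRecord<String, String>(outputTopic, record.key(), result.payload); // assumed result shape
    return UniLists.of(produceRecord);
});
----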

CoreAppTest.java

@@ -24,6 +24,7 @@

import static io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase.DEFAULT_GROUP_METADATA;
import static org.mockito.Mockito.when;
+import static pl.tlinkowski.unij.api.UniLists.of;

@Slf4j
public class CoreAppTest {
@@ -48,13 +49,11 @@ public void test() {
    coreApp.close();
}

-
@SneakyThrows
@Test
public void testPollAndProduce() {
    log.info("Test start");
    CoreAppUnderTest coreApp = new CoreAppUnderTest();
-    TopicPartition tp = new TopicPartition(coreApp.inputTopic, 0);

    coreApp.runPollAndProduce();

@@ -76,11 +75,7 @@ class CoreAppUnderTest extends CoreApp {

@Override
Consumer<String, String> getKafkaConsumer() {
-    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
-    beginningOffsets.put(tp, 0L);
-    mockConsumer.updateBeginningOffsets(beginningOffsets);
    when(mockConsumer.groupMetadata()).thenReturn(DEFAULT_GROUP_METADATA); // todo fix AK mock consumer
-    mockConsumer.assign(UniLists.of(tp));
    return mockConsumer;
}

@@ -89,5 +84,9 @@ Producer<String, String> getKafkaProducer() {
    return new MockProducer<>(true, null, null);
}

+@Override
+protected void postSetup() {
+    mockConsumer.subscribeWithRebalanceAndAssignment(of(inputTopic), 1);
+}
}
}
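The test assertions are elided above. A sketch of the usual pattern for driving such a test, with hypothetical `mockConsumer`/`mockProducer` fields, and assuming Awaitility and AssertJ for the wait-and-verify step (not the repository's actual assertions):

[source,java]
----
mockConsumer.addRecord(new ConsumerRecord<>(coreApp.inputTopic, 0, 0L, "a-key", "a-value")); // feed an input record
await().untilAsserted(() ->
        assertThat(mockProducer.history()).isNotEmpty()); // MockProducer records everything produced
----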

src/docs/README.adoc (10 changes: 5 additions & 5 deletions)

@@ -364,13 +364,13 @@ include::{project_root}/parallel-consumer-examples/parallel-consumer-example-ver
----
include::{project_root}/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java[tag=exampleSetup]
----
-<1> Choose your ordering type, `KEY` in this case.
+<1> Set up your clients as per normal. A Producer is only required if using the `produce` flows.
+<2> Choose your ordering type, `KEY` in this case.
This ensures maximum concurrency, while ensuring messages are processed and committed in `KEY` order, making sure no offset is committed unless all offsets before it in its partition are also completed.
-<2> The maximum number of concurrent processing operations to be performing at any given time
-<3> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time.
+<3> The maximum number of concurrent processing operations to be performing at any given time
+<4> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time.
Also, because the library coordinates offsets, `enable.auto.commit` must be disabled in your consumer.
-<4> Set up your consumer client as per normal
-<5> Set up your topic subscriptions (when using the `MockConsumer` you must use the `MockConsumer#assign` method)
+<5> Subscribe to your topics

NOTE: Because the library coordinates offsets, `enable.auto.commit` must be disabled.

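Taken together, the lifecycle both READMEs document is compact. A condensed sketch using only calls that appear in this commit:

[source,java]
----
ParallelStreamProcessor<String, String> processor = setupParallelConsumer(); // builds options and subscribes, as above
processor.poll(record ->
        log.info("Concurrently processing a record: {}", record));
// ... later, on shutdown:
processor.close();
----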