feature: Choose between consumer or producer commit modes #31

Merged · 11 commits · Nov 23, 2020
8 changes: 7 additions & 1 deletion .gitignore
@@ -29,4 +29,10 @@ hs_err_pid*
*.iml
target/
.DS_Store
*.versionsBackup

# JENV
.java-version

delombok/
**/*.releaseBackup
2 changes: 1 addition & 1 deletion .idea/runConfigurations/All_examples.xml


111 changes: 73 additions & 38 deletions README.adoc
@@ -38,7 +38,10 @@ ifdef::env-github[]
:warning-caption: :warning:
endif::[]

image:https://maven-badges.herokuapp.com/maven-central/io.confluent.parallelconsumer/parallel-consumer-parent/badge.svg?style=flat[link=https://mvnrepository.com/artifact/io.confluent.parallelconsumer/parallel-consumer-parent,Latest Parallel Consumer on Maven Central]

// travis badges temporarily disabled as travis isn't running CI currently
//image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"] image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer]

Parallel Apache Kafka client wrapper with client side queueing, a simpler consumer/producer API with *key concurrency* and *extendable non-blocking IO* processing.

@@ -113,10 +116,12 @@ This is effective in many situations, but falls short in many others.

* Primarily: You cannot use more consumers than you have partitions available to read from.
For example, if you have a topic with five partitions, you cannot use a group with more than five consumers to read from it.
* Running extra consumers has resource implications - each consumer takes up resources on both the client and broker side.
Each consumer adds a lot of overhead in terms of memory, CPU, and network bandwidth.
* Large consumer groups (especially many large groups) can cause a lot of strain on the consumer group coordination system, such as rebalance storms.
* Even with several partitions, you cannot achieve the performance levels obtainable by *per-key* ordered or unordered concurrent processing.
* A single slow or failing message will also still block all messages behind the problematic message, i.e. the entire partition.
The process may recover, but the latency of all the messages behind the problematic one will be severely impacted.

Why not run more consumers __within__ your application instance?::
* This is in some respects a slightly easier way of running more consumer instances, and in others a more complicated way.
@@ -195,11 +200,12 @@ without operational burden or harming the cluster's performance
* Vert.x non-blocking library integration (HTTP currently)
* Fair partition traversal
* Zero~ dependencies (`Slf4j` and `Lombok`) for the core module
* Java 8 compatibility
* Throttle control and broker liveliness management
* Clean draining shutdown cycle

//image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer]
//image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"]

And more <<roadmap,to come>>!

@@ -342,6 +348,11 @@ Latest version can be seen https://search.maven.org/artifact/io.confluent.parall

Where `${project.version}` is the version to be used:

* group ID: `io.confluent.parallelconsumer`
* artifact ID: `parallel-consumer-core`
* version: image:https://maven-badges.herokuapp.com/maven-central/io.confluent.parallelconsumer/parallel-consumer-parent/badge.svg?style=flat[link=https://mvnrepository.com/artifact/io.confluent.parallelconsumer/parallel-consumer-parent,Latest Parallel Consumer on Maven Central]


.Core Module Dependency
[source,xml,indent=0]
<dependency>
@@ -364,26 +375,31 @@ Where `${project.version}` is the version to be used:
.Setup the client
[source,java,indent=0]
----
Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <1>
Producer<String, String> kafkaProducer = getKafkaProducer();

var options = ParallelConsumerOptions.<String, String>builder()
.ordering(KEY) // <2>
.maxMessagesToQueue(1000) // <3>
.maxNumberMessagesBeyondBaseCommitOffset(1000) // <4>
.consumer(kafkaConsumer)
.producer(kafkaProducer)
.build();

ParallelStreamProcessor<String, String> eosStreamProcessor =
ParallelStreamProcessor.createEosStreamProcessor(options);

eosStreamProcessor.subscribe(of(inputTopic)); // <5>

return eosStreamProcessor;
----
<1> Set up your clients as per normal. A Producer is only required if using the `produce` flows.
<2> Choose your ordering type, `KEY` in this case.
This ensures maximum concurrency, while ensuring messages are processed and committed in `KEY` order, making sure no offset is committed unless all offsets before it in its partition are also completed.
<3> The maximum number of concurrent processing operations to be performing at any given time.
<4> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time.
Also, because the library coordinates offsets, `enable.auto.commit` must be disabled in your consumer.
<5> Subscribe to your topics.

NOTE: Because the library coordinates offsets, `enable.auto.commit` must be disabled.
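A minimal sketch of a conforming consumer configuration - the servers, group id, and deserializers are illustrative values, not prescribed by the library:

[source,java,indent=0]
----
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // required: the library coordinates offsets
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
----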

@@ -433,15 +449,14 @@ You can also optionally provide a callback function to be run after the message(

.Usage - print message content out to the console in parallel
[source,java,indent=0]
parallelConsumer.pollAndProduce(record -> {
var result = processBrokerRecord(record);
return new ProducerRecord<>(outputTopic, record.key(), result.payload);
}, consumeProduceResult -> {
log.debug("Message {} saved to broker at offset {}",
consumeProduceResult.getOut(),
consumeProduceResult.getMeta().offset());
}
);


@@ -463,7 +478,7 @@ In future versions, we plan to look at supporting other streaming systems like h
var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> {
log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
Map<String, String> params = UniMaps.of("recordKey", record.key(), "payload", record.value());
return new RequestInfo("localhost", "/api", params); // <1>
return new RequestInfo("localhost", port, "/api", params); // <1>
});
----
<1> Simply return an object representing the request; the Vert.x HTTP engine will handle the rest, using its non-blocking engine
@@ -504,7 +519,7 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb
}

void concurrentProcess() {
setupParallelConsumer();

parallelConsumer.poll(record -> {
log.info("Concurrently processing a record: {}", record);
@@ -603,16 +618,33 @@ When your function is actually run, a result object will be streamed back to you
After your operation completes, you can also choose to publish a result message back to Kafka.
The message publishing metadata can be streamed back to your client code.

== Commit Mode

The system gives you three choices for how to do offset commits.
The simplest of the three are the two consumer commit modes.
They are of course, `synchronous` and `asynchronous` mode.
The `transactional` mode is explained in the next section.

`Asynchronous` mode is faster, as it doesn't block the control loop.

`Synchronous` mode will block the processing loop until a successful commit response is received; `asynchronous` mode will not, but is still capped by the max processing settings in the `Options` class.

If you're used to using the auto commit mode in the normal Kafka consumer, you can think of the `asynchronous` mode as being similar to this.
We suggest starting with this mode, and it is the default.
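As a sketch, choosing between the two consumer commit modes might look like the following - the `commitMode` builder option and the `CommitMode` constant names are assumptions based on this PR's description, and may differ from the final API:

[source,java,indent=0]
----
var options = ParallelConsumerOptions.<String, String>builder()
        .commitMode(CommitMode.CONSUMER_ASYNCHRONOUS) // assumed constant: the faster, default mode
        //.commitMode(CommitMode.CONSUMER_SYNC)       // assumed constant: blocks until the broker acknowledges
        .consumer(kafkaConsumer)
        .build();
----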

=== Apache Kafka EoS Transaction Model

There is also the option to use Kafka's Exactly Once Semantics (EoS) system.
This causes all messages produced as a result of processing a message to be committed within a transaction, along with their source offset.
This means that even under failure, the results will exist exactly once in the Kafka output topic.
If, as a part of your processing, you create side effects in other systems, the usual idempotency requirements apply when breaking out of Kafka's EoS boundaries.

NOTE: As with the `synchronous` commit mode, this will also block the processing loop until a successful transaction completes.

CAUTION: This cannot be true for any externally integrated third party system, unless that system is __idempotent__.

For implementation details, see the <<Transactional System Architecture>> section.
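For illustration, a transactional setup might look like this sketch - again, the `TRANSACTIONAL_PRODUCER` constant name is an assumption, and the producer must be configured for transactions:

[source,java,indent=0]
----
var options = ParallelConsumerOptions.<String, String>builder()
        .commitMode(CommitMode.TRANSACTIONAL_PRODUCER) // assumed constant name
        .consumer(kafkaConsumer)
        .producer(kafkaProducer) // requires a `transactional.id` configured on the producer
        .build();
----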

[[streams-usage]]
== Using with Kafka Streams

@@ -631,21 +663,20 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb

=== Short Term - What we're working on nowish ⏰

* Depth~ first or breadth first partition traversal
* JavaRX and other streaming modules

=== Medium Term - What's up next ⏲

* https://github.com/confluentinc/parallel-consumer/issues/21[Automatic fanout] (automatic selection of concurrency level based on downstream back pressure) (https://github.com/confluentinc/parallel-consumer/pull/22[draft PR])
* Support for general Vert.x Verticles (non-blocking libraries)
* Dead Letter Queue (DLQ) handling
* Non-blocking I/O work management
** More customisable handling of HTTP interactions
** Chance to batch multiple consumer records into a single or multiple http request objects
* https://github.com/confluentinc/parallel-consumer/issues/28[Distributed tracing integration]
* https://github.com/confluentinc/parallel-consumer/issues/24[Distributed rate limiting]
* https://github.com/confluentinc/parallel-consumer/issues/27[Metrics]

=== Long Term - The future ☁️

@@ -799,6 +830,10 @@ Instead of the work thread pool count being the degree of concurrency, it is con
.Vert.x Architecture
image::https://lucid.app/publicSegments/view/509df410-5997-46be-98e7-ac7f241780b4/image.png[Vert.x Architecture, align="center"]

=== Transactional System Architecture

image::https://lucid.app/publicSegments/view/7480d948-ed7d-4370-a308-8ec12e6b453b/image.png[]

=== Offset Map

==== Storage
6 changes: 6 additions & 0 deletions parallel-consumer-core/pom.xml
@@ -38,6 +38,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit-pioneer</groupId>
<artifactId>junit-pioneer</artifactId>
<version>1.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
@@ -9,7 +9,11 @@
public class StringUtils {

public static String msg(String s, Object... args) {
return MessageFormatter.basicArrayFormat(s, args);
}

public static boolean isBlank(final String property) {
if (property == null) return true;
else return property.trim().isEmpty(); // String#isBlank exists only @since Java 11; this keeps Java 8 compatibility
}
}
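A usage sketch for the helpers above - the message and argument values are illustrative:

[source,java,indent=0]
----
String message = StringUtils.msg("Committing {} offsets for group {}", 3, "my-group");
// -> "Committing 3 offsets for group my-group"
boolean blank = StringUtils.isBlank("   "); // -> true
----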
@@ -0,0 +1,59 @@
package io.confluent.parallelconsumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

@Slf4j
@RequiredArgsConstructor
public abstract class AbstractOffsetCommitter<K, V> implements OffsetCommitter {

protected final ConsumerManager<K, V> consumerMgr;
protected final WorkManager<K, V> wm;

/**
* Get offsets from {@link WorkManager} that are ready to commit
*/
@Override
public void retrieveOffsetsAndCommit() {
log.debug("Commit starting - find completed work to commit offsets");
// todo shouldn't be removed until commit succeeds (there's no harm in committing the same offset twice)
preAcquireWork();
try {
Map<TopicPartition, OffsetAndMetadata> offsetsToSend = wm.findCompletedEligibleOffsetsAndRemove();
if (offsetsToSend.isEmpty()) {
log.trace("No offsets ready");
} else {
log.debug("Will commit offsets for {} partition(s): {}", offsetsToSend.size(), offsetsToSend);
ConsumerGroupMetadata groupMetadata = consumerMgr.groupMetadata();

log.debug("Begin commit");
commitOffsets(offsetsToSend, groupMetadata);

log.debug("On commit success");
onOffsetCommitSuccess(offsetsToSend);
}
} finally {
postCommit();
}
}

protected void postCommit() {
// default noop
}

protected void preAcquireWork() {
// default noop
}

private void onOffsetCommitSuccess(final Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
wm.onOffsetCommitSuccess(offsetsToSend);
}

protected abstract void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsToSend, final ConsumerGroupMetadata groupMetadata);

}
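A hypothetical minimal subclass, to illustrate the template-method contract above - the real committers in this PR may differ, and `ConsumerManager#commitSync` is an assumed delegate method:

[source,java,indent=0]
----
@Slf4j
public class SyncConsumerOffsetCommitter<K, V> extends AbstractOffsetCommitter<K, V> {

    public SyncConsumerOffsetCommitter(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm) {
        super(consumerMgr, wm); // constructor generated on the parent by @RequiredArgsConstructor
    }

    @Override
    protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToSend,
                                 ConsumerGroupMetadata groupMetadata) {
        // Block until the broker acknowledges the commit - the synchronous commit mode
        consumerMgr.commitSync(offsetsToSend);
    }
}
----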