diff --git a/.gitignore b/.gitignore
index f5833c742..cb8ad9f7f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,4 +29,10 @@ hs_err_pid*
*.iml
target/
.DS_Store
-*.versionsBackup
\ No newline at end of file
+*.versionsBackup
+
+# JENV
+.java-version
+
+delombok/
+**/*.releaseBackup
\ No newline at end of file
diff --git a/.idea/runConfigurations/All_examples.xml b/.idea/runConfigurations/All_examples.xml
index ae2384feb..8c82d61b5 100644
--- a/.idea/runConfigurations/All_examples.xml
+++ b/.idea/runConfigurations/All_examples.xml
@@ -7,7 +7,7 @@
-
+
diff --git a/README.adoc b/README.adoc
index 837e65eac..a4bc0a20c 100644
--- a/README.adoc
+++ b/README.adoc
@@ -38,7 +38,10 @@ ifdef::env-github[]
:warning-caption: :warning:
endif::[]
-image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"] image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer]
+image:https://maven-badges.herokuapp.com/maven-central/io.confluent.parallelconsumer/parallel-consumer-parent/badge.svg?style=flat[link=https://mvnrepository.com/artifact/io.confluent.parallelconsumer/parallel-consumer-parent,Latest Parallel Consumer on Maven Central]
+
+// travis badges temporarily disabled as travis isn't running CI currently
+//image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"] image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer]
Parallel Apache Kafka client wrapper with client side queueing, a simpler consumer/producer API with *key concurrency* and *extendable non-blocking IO* processing.
@@ -113,10 +116,12 @@ This is effective in many situations, but falls short in a lot too.
* Primarily: You cannot use more consumers than you have partitions available to read from.
For example, if you have a topic with five partitions, you cannot use a group with more than five consumers to read from it.
-* Running more extra consumers has resource implications - each consumer takes up resources on both the client and broker side
+* Running more extra consumers has resource implications - each consumer takes up resources on both the client and broker side.
Each consumer adds a lot of overhead in terms of memory, CPU, and network bandwidth.
* Large consumer groups (especially many large groups) can cause a lot of strain on the consumer group coordination system, such as rebalance storms.
-* Even with several partitions, you cannot achieve the performance levels obtainable by *per-key* ordered or unordered concurrent processing
+* Even with several partitions, you cannot achieve the performance levels obtainable by *per-key* ordered or unordered concurrent processing.
+* A single slow or failing message will also still block all messages behind the problematic message, ie. the entire partition.
+The process may recover, but the latency of all the messages behind the problematic one will be negatively impacted severely.
Why not run more consumers __within__ your application instance?::
* This is in some respects a slightly easier way of running more consumer instances, and in others a more complicated way.
@@ -195,11 +200,12 @@ without operational burden or harming the clusters performance
* Vert.x non-blocking library integration (HTTP currently)
* Fair partition traversal
* Zero~ dependencies (`Slf4j` and `Lombok`) for the core module
+* Java 8 compatibility
* Throttle control and broker liveliness management
* Clean draining shutdown cycle
-image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer]
-image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"]
+//image:https://codecov.io/gh/astubbs/parallel-consumer/branch/master/graph/badge.svg["Coverage",https://codecov.io/gh/astubbs/parallel-consumer]
+//image:https://travis-ci.com/astubbs/parallel-consumer.svg?branch=master["Build Status", link="https://travis-ci.com/astubbs/parallel-consumer"]
And more <>!
@@ -342,6 +348,11 @@ Latest version can be seen https://search.maven.org/artifact/io.confluent.parall
Where `${project.version}` is the version to be used:
+* group ID: `io.confluent.parallelconsumer`
+* artifact ID: `parallel-consumer-core`
+* version: image:https://maven-badges.herokuapp.com/maven-central/io.confluent.parallelconsumer/parallel-consumer-parent/badge.svg?style=flat[link=https://mvnrepository.com/artifact/io.confluent.parallelconsumer/parallel-consumer-parent,Latest Parallel Consumer on Maven Central]
+
+
.Core Module Dependency
[source,xml,indent=0]
@@ -364,26 +375,31 @@ Where `${project.version}` is the version to be used:
.Setup the client
[source,java,indent=0]
----
- var options = ParallelConsumerOptions.builder()
- .ordering(KEY) // <1>
- .maxConcurrency(1000) // <2>
- .maxUncommittedMessagesToHandlePerPartition(10000) // <3>
+ Consumer kafkaConsumer = getKafkaConsumer(); // <1>
+ Producer kafkaProducer = getKafkaProducer();
+
+ var options = ParallelConsumerOptions.builder()
+ .ordering(KEY) // <2>
+ .maxMessagesToQueue(1000) // <3>
+ .maxNumberMessagesBeyondBaseCommitOffset(1000) // <4>
+ .consumer(kafkaConsumer)
+ .producer(kafkaProducer)
.build();
- Consumer kafkaConsumer = getKafkaConsumer(); // <4>
- if (!(kafkaConsumer instanceof MockConsumer)) {
- kafkaConsumer.subscribe(UniLists.of(inputTopic)); // <5>
- }
+ ParallelStreamProcessor eosStreamProcessor =
+ ParallelStreamProcessor.createEosStreamProcessor(options);
+
+ eosStreamProcessor.subscribe(of(inputTopic)); // <5>
- return ParallelStreamProcessor.createEosStreamProcessor(kafkaConsumer, getKafkaProducer(), options);
+ return eosStreamProcessor;
----
-<1> Choose your ordering type, `KEY` in this case.
+<1> Setup your clients as per normal. A Producer is only required if using the `produce` flows.
+<2> Choose your ordering type, `KEY` in this case.
This ensures maximum concurrency, while ensuring messages are processed and committed in `KEY` order, making sure no offset is committed unless all offsets before it in it's partition, are completed also.
-<2> The maximum number of concurrent processing operations to be performing at any given time
-<3> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time.
+<3> The maximum number of concurrent processing operations to be performing at any given time
+<4> Regardless of the level of concurrency, don't have more than this many messages uncommitted at any given time.
Also, because the library coordinates offsets, `enable.auto.commit` must be disabled in your consumer.
-<4> Setup your consumer client as per normal
-<5> Setup your topic subscriptions - (when using the `MockConsumer` you must use the `MockConsumer#assign` method)
+<5> Subscribe to your topics
NOTE: Because the library coordinates offsets, `enable.auto.commit` must be disabled.
@@ -433,15 +449,14 @@ You can also optionally provide a callback function to be run after the message(
.Usage - print message content out to the console in parallel
[source,java,indent=0]
- this.parallelConsumer.pollAndProduce(record -> {
+ parallelConsumer.pollAndProduce(record -> {
var result = processBrokerRecord(record);
- ProducerRecord produceRecord =
- new ProducerRecord<>(outputTopic, "a-key", result.payload);
- return UniLists.of(produceRecord);
- }, consumeProduceResult ->
- log.info("Message {} saved to broker at offset {}",
- consumeProduceResult.getOut(),
- consumeProduceResult.getMeta().offset())
+ return new ProducerRecord<>(outputTopic, record.key(), result.payload);
+ }, consumeProduceResult -> {
+ log.debug("Message {} saved to broker at offset {}",
+ consumeProduceResult.getOut(),
+ consumeProduceResult.getMeta().offset());
+ }
);
@@ -463,7 +478,7 @@ In future versions, we plan to look at supporting other streaming systems like h
var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> {
log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
Map params = UniMaps.of("recordKey", record.key(), "payload", record.value());
- return new RequestInfo("localhost", "/api", params); // <1>
+ return new RequestInfo("localhost", port, "/api", params); // <1>
});
----
<1> Simply return an object representing the request, the Vert.x HTTP engine will handle the rest, using it's non-blocking engine
@@ -504,7 +519,7 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb
}
void concurrentProcess() {
- setupConsumer();
+ setupParallelConsumer();
parallelConsumer.poll(record -> {
log.info("Concurrently processing a record: {}", record);
@@ -603,16 +618,33 @@ When your function is actually run, a result object will be streamed back to you
After your operation completes, you can also choose to publish a result message back to Kafka.
The message publishing metadata can be streamed back to your client code.
-== Apache Kafka EoS Transaction Model
+== Commit Mode
+
+The system gives you three choices for how to do offset commits.
+The simplest of the three are the two Consummer commits modes.
+They are of course, `sycnhronous` and `asynchronous` mode.
+The `transactional` mode is explained in the next section.
+
+`Asychornous` mode is faster, as it doesn't block the control loop.
+
+`Sycnhronous` will block the processing loop until a successful commit response is received, however, `asycnhronous` will still be capped by the max processing settings in the `Options` class.
-The system uses Kafka's Exactly Once Semantics (EoS) system.
+If you're used to using the auto commit mode in the normal Kafka consumer, you can think of the `Asychornous` mode being similar to this.
+We suggest starting with this mode, and it is the default.
+
+=== Apache Kafka EoS Transaction Model
+
+There is also the option to use Kafka's Exactly Once Semantics (EoS) system.
This causes all messages produced as a result of processing a message to be committed within a transaction, along with their source offset.
This means that even under failure, the results will exist exactly once in the Kafka output topic.
-If as a part of
-your processing, you create side effects in other systems, this pertains to the usual idempotency requirements when breaking of EoS Kafka boundaries.
+If as a part of your processing, you create side effects in other systems, this pertains to the usual idempotency requirements when breaking of EoS Kafka boundaries.
+
+NOTE:: As with the `synchronous` processing mode, this will also block the processing loop until a successful transaction completes
CAUTION: This cannot be true for any externally integrated third party system, unless that system is __idempotent__.
+For implementations details, see the <> section.
+
[[streams-usage]]
== Using with Kafka Streams
@@ -631,21 +663,20 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb
=== Short Term - What we're working on nowish ⏰
-* Producer is optional
-* Transactions optional
* Depth~ first or breadth first partition traversal
* JavaRX and other streaming modules
=== Medium Term - What's up next ⏲
-* Automatic fanout (automatic selection of concurrency level based on downstream back pressure)
+* https://github.com/confluentinc/parallel-consumer/issues/21[Automatic fanout] (automatic selection of concurrency level based on downstream back pressure) (https://github.com/confluentinc/parallel-consumer/pull/22[draft PR])
* Support for general Vert.x Verticles (non-blocking libraries)
* Dead Letter Queue (DLQ) handling
* Non-blocking I/O work management
** More customisable handling of HTTP interactions
** Chance to batch multiple consumer records into a single or multiple http request objects
-* Distributed tracing integration
-* Metrics
+* https://github.com/confluentinc/parallel-consumer/issues/28[Distributed tracing integration]
+* https://github.com/confluentinc/parallel-consumer/issues/24[Distributed rate limiting]
+* https://github.com/confluentinc/parallel-consumer/issues/27[Metrics]
=== Long Term - The future ☁️
@@ -799,6 +830,10 @@ Instead of the work thread pool count being the degree of concurrency, it is con
.Vert.x Architecture
image::https://lucid.app/publicSegments/view/509df410-5997-46be-98e7-ac7f241780b4/image.png[Vert.x Architecture, align="center"]
+=== Transactional System Architecture
+
+image::https://lucid.app/publicSegments/view/7480d948-ed7d-4370-a308-8ec12e6b453b/image.png[]
+
=== Offset Map
==== Storage
diff --git a/parallel-consumer-core/pom.xml b/parallel-consumer-core/pom.xml
index ae2be14c0..f2398e056 100644
--- a/parallel-consumer-core/pom.xml
+++ b/parallel-consumer-core/pom.xml
@@ -38,6 +38,12 @@
${junit.version}test
+
+ org.junit-pioneer
+ junit-pioneer
+ 1.0.0
+ test
+ org.testcontainerstestcontainers
diff --git a/parallel-consumer-core/src/main/java/io/confluent/csid/utils/StringUtils.java b/parallel-consumer-core/src/main/java/io/confluent/csid/utils/StringUtils.java
index f78ebf8d2..219066bd0 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/csid/utils/StringUtils.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/csid/utils/StringUtils.java
@@ -9,7 +9,11 @@
public class StringUtils {
public static String msg(String s, Object... args) {
- String message = MessageFormatter.basicArrayFormat(s, args);
- return message;
+ return MessageFormatter.basicArrayFormat(s, args);
+ }
+
+ public static boolean isBlank(final String property) {
+ if (property == null) return true;
+ else return property.trim().isEmpty(); // isBlank @since 11
}
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/AbstractOffsetCommitter.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/AbstractOffsetCommitter.java
new file mode 100644
index 000000000..4f0167987
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/AbstractOffsetCommitter.java
@@ -0,0 +1,59 @@
+package io.confluent.parallelconsumer;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+@Slf4j
+@RequiredArgsConstructor
+public abstract class AbstractOffsetCommitter implements OffsetCommitter {
+
+ protected final ConsumerManager consumerMgr;
+ protected final WorkManager wm;
+
+ /**
+ * Get offsets from {@link WorkManager} that are ready to commit
+ */
+ @Override
+ public void retrieveOffsetsAndCommit() {
+ log.debug("Commit starting - find completed work to commit offsets");
+ // todo shouldn't be removed until commit succeeds (there's no harm in committing the same offset twice)
+ preAcquireWork();
+ try {
+ Map offsetsToSend = wm.findCompletedEligibleOffsetsAndRemove();
+ if (offsetsToSend.isEmpty()) {
+ log.trace("No offsets ready");
+ } else {
+ log.debug("Will commit offsets for {} partition(s): {}", offsetsToSend.size(), offsetsToSend);
+ ConsumerGroupMetadata groupMetadata = consumerMgr.groupMetadata();
+
+ log.debug("Begin commit");
+ commitOffsets(offsetsToSend, groupMetadata);
+
+ log.debug("On commit success");
+ onOffsetCommitSuccess(offsetsToSend);
+ }
+ } finally {
+ postCommit();
+ }
+ }
+
+ protected void postCommit() {
+ // default noop
+ }
+
+ protected void preAcquireWork() {
+ // default noop
+ }
+
+ private void onOffsetCommitSuccess(final Map offsetsToSend) {
+ wm.onOffsetCommitSuccess(offsetsToSend);
+ }
+
+ protected abstract void commitOffsets(final Map offsetsToSend, final ConsumerGroupMetadata groupMetadata);
+
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BrokerPollSystem.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BrokerPollSystem.java
index aab44f9e7..9649c4824 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BrokerPollSystem.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BrokerPollSystem.java
@@ -4,14 +4,13 @@
* Copyright (C) 2020 Confluent, Inc.
*/
+import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import lombok.Getter;
import lombok.Setter;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import pl.tlinkowski.unij.api.UniMaps;
import java.time.Duration;
import java.util.Optional;
@@ -20,8 +19,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import static io.confluent.csid.utils.BackportUtils.toSeconds;
+import static io.confluent.parallelconsumer.ParallelEoSStreamProcessor.State.closed;
+import static io.confluent.parallelconsumer.ParallelEoSStreamProcessor.State.running;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
@@ -31,11 +33,11 @@
* @param
*/
@Slf4j
-public class BrokerPollSystem {
+public class BrokerPollSystem implements OffsetCommitter {
- private final org.apache.kafka.clients.consumer.Consumer consumer;
+ private final ConsumerManager consumerManager;
- public ParallelEoSStreamProcessor.State state = ParallelEoSStreamProcessor.State.running;
+ private ParallelEoSStreamProcessor.State state = ParallelEoSStreamProcessor.State.running;
private Optional> pollControlThreadFuture;
@@ -43,20 +45,47 @@ public class BrokerPollSystem {
private final ParallelEoSStreamProcessor pc;
+ private Optional> committer = Optional.empty();
+
+ /**
+ * Note how this relates to {@link BrokerPollSystem#getLongPollTimeout()} - if longPollTimeout is high and loading
+ * factor is low, there may not be enough messages queued up to satisfy demand.
+ */
@Setter
@Getter
private static Duration longPollTimeout = Duration.ofMillis(2000);
private final WorkManager wm;
- public BrokerPollSystem(Consumer consumer, WorkManager wm, ParallelEoSStreamProcessor pc) {
- this.consumer = consumer;
+ public BrokerPollSystem(ConsumerManager consumerMgr, WorkManager wm, ParallelEoSStreamProcessor pc, final ParallelConsumerOptions options) {
this.wm = wm;
this.pc = pc;
+
+ this.consumerManager = consumerMgr;
+ switch (options.getCommitMode()) {
+ case CONSUMER_SYNC, CONSUMER_ASYNCHRONOUS -> {
+ ConsumerOffsetCommitter consumerCommitter = new ConsumerOffsetCommitter<>(consumerMgr, wm, options);
+ committer = Optional.of(consumerCommitter);
+ }
+ }
}
public void start() {
- this.pollControlThreadFuture = Optional.of(Executors.newSingleThreadExecutor().submit(this::controlLoop));
+ Future submit = Executors.newSingleThreadExecutor().submit(this::controlLoop);
+ this.pollControlThreadFuture = Optional.of(submit);
+ }
+
+ public void supervise() {
+ if (pollControlThreadFuture.isPresent()) {
+ Future booleanFuture = pollControlThreadFuture.get();
+ if (booleanFuture.isCancelled() || booleanFuture.isDone()) {
+ try {
+ booleanFuture.get();
+ } catch (Exception e) {
+ throw new InternalError("Error in " + BrokerPollSystem.class.getSimpleName() + " system.", e);
+ }
+ }
+ }
}
/**
@@ -65,42 +94,70 @@ public void start() {
private boolean controlLoop() {
Thread.currentThread().setName("broker-poll");
log.trace("Broker poll control loop start");
- while (state != ParallelEoSStreamProcessor.State.closed) {
- log.trace("Loop: Poll broker");
- ConsumerRecords polledRecords = pollBrokerForRecords();
-
- if (!polledRecords.isEmpty()) {
- log.trace("Loop: Register work");
- wm.registerWork(polledRecords);
+ committer.ifPresent(x -> x.claim());
+ try {
+ while (state != closed) {
+ log.trace("Loop: Broker poller: ({})", state);
+ if (state == running) {
+ ConsumerRecords polledRecords = pollBrokerForRecords();
- // notify control work has been registered
- pc.notifyNewWorkRegistered();
- }
+ if (!polledRecords.isEmpty()) {
+ log.trace("Loop: Register work");
+ wm.registerWork(polledRecords);
- switch (state) {
- case draining -> {
- doPause();
- // transition to closing
- state = ParallelEoSStreamProcessor.State.closing;
+ // notify control work has been registered
+ pc.notifyNewWorkRegistered();
+ }
}
- case closing -> {
- if (polledRecords.isEmpty()) {
+
+ maybeDoCommit();
+
+ switch (state) {
+ case draining -> {
+ doPause();
+ transitionToCloseMaybe();
+ }
+ case closing -> {
doClose();
- } else {
- log.info("Subscriptions are paused, but records are still being drained (count: {})", polledRecords.count());
}
}
}
+ log.debug("Broker poll thread finished, returning true to future");
+ return true;
+ } catch (Exception e) {
+ log.error("Unknown error", e);
+ throw e;
+ }
+ }
+
+ private void transitionToCloseMaybe() {
+ // make sure everything is committed
+ if (isResponsibleForCommits() && !wm.isRecordsAwaitingToBeCommitted()) {
+ // transition to closing
+ state = ParallelEoSStreamProcessor.State.closing;
}
- log.trace("Broker poll thread returning true");
- return true;
}
private void doClose() {
- log.debug("Closing {}, first closing consumer...", this.getClass().getSimpleName());
- this.consumer.close(DrainingCloseable.DEFAULT_TIMEOUT);
- log.debug("Consumer closed.");
- state = ParallelEoSStreamProcessor.State.closed;
+ doPause();
+ maybeCloseConsumer();
+ state = closed;
+ }
+
+ /**
+ * To keep things simple, make sure the correct thread which can make a commit, is the one to close the consumer.
+ * This way, if partitions are revoked, the commit can be made inline.
+ */
+ private void maybeCloseConsumer() {
+ if (isResponsibleForCommits()) {
+ log.debug("Closing {}, first closing consumer...", this.getClass().getSimpleName());
+ this.consumerManager.close(DrainingCloseable.DEFAULT_TIMEOUT);
+ log.debug("Consumer closed.");
+ }
+ }
+
+ private boolean isResponsibleForCommits() {
+ return committer.isPresent();
}
private ConsumerRecords pollBrokerForRecords() {
@@ -108,16 +165,8 @@ private ConsumerRecords pollBrokerForRecords() {
Duration thisLongPollTimeout = (state == ParallelEoSStreamProcessor.State.running) ? BrokerPollSystem.longPollTimeout : Duration.ofMillis(1); // Can't use Duration.ZERO - this causes Object#wait to wait forever
- log.debug("Long polling broker with timeout {} seconds, might appear to sleep here if no data available on broker.", toSeconds(thisLongPollTimeout)); // java 8
- ConsumerRecords records;
- try {
- records = this.consumer.poll(thisLongPollTimeout);
- log.debug("Poll completed normally and returned {}...", records.count());
- } catch (WakeupException w) {
- log.warn("Awoken from poll. State? {}", state);
- records = new ConsumerRecords<>(UniMaps.of());
- }
- return records;
+ log.debug("Long polling broker with timeout {} seconds, might appear to sleep here if subs are paused, or no data available on broker.", toSeconds(thisLongPollTimeout)); // java 8
+ return consumerManager.poll(thisLongPollTimeout);
}
/**
@@ -126,9 +175,9 @@ private ConsumerRecords pollBrokerForRecords() {
public void drain() {
// idempotent
if (state != ParallelEoSStreamProcessor.State.draining) {
- log.debug("Poll system signaling to drain...");
+ log.debug("Signaling poll system to drain, waking up consumer...");
state = ParallelEoSStreamProcessor.State.draining;
- consumer.wakeup();
+ consumerManager.wakeup();
}
}
@@ -139,8 +188,8 @@ private void doPause() {
} else {
paused = true;
log.debug("Pausing subs");
- Set assignment = consumer.assignment();
- consumer.pause(assignment);
+ Set assignment = consumerManager.assignment();
+ consumerManager.pause(assignment);
}
}
@@ -151,7 +200,7 @@ public void closeAndWait() throws TimeoutException, ExecutionException {
log.debug("Wait for loop to finish ending...");
Future pollControlResult = pollControlThreadFuture.get();
boolean interrupted = true;
- while(interrupted) {
+ while (interrupted) {
try {
Boolean pollShutdownSuccess = pollControlResult.get(DrainingCloseable.DEFAULT_TIMEOUT.toMillis(), MILLISECONDS);
interrupted = false;
@@ -159,9 +208,9 @@ public void closeAndWait() throws TimeoutException, ExecutionException {
log.warn("Broker poll control thread not closed cleanly.");
}
} catch (InterruptedException e) {
- log.debug("Interrupted", e);
+ log.debug("Interrupted waiting for broker poller thread to finish", e);
} catch (ExecutionException | TimeoutException e) {
- log.error("Execution or timeout exception", e);
+ log.error("Execution or timeout exception waiting for broker poller thread to finish", e);
throw e;
}
}
@@ -170,8 +219,9 @@ public void closeAndWait() throws TimeoutException, ExecutionException {
}
private void transitionToClosing() {
+ log.debug("Poller transitioning to closing, waking up consumer");
state = ParallelEoSStreamProcessor.State.closing;
- consumer.wakeup();
+ consumerManager.wakeup();
}
/**
@@ -192,11 +242,11 @@ private void managePauseOfSubscription() {
private void resumeIfPaused() {
// idempotent
if (paused) {
- log.debug("Resuming consumer");
- Set pausedTopics = consumer.paused();
- consumer.resume(pausedTopics);
+ log.debug("Resuming consumer, waking up");
+ Set pausedTopics = consumerManager.paused();
+ consumerManager.resume(pausedTopics);
// trigger consumer to perform a new poll without the assignments paused, otherwise it will continue to long poll on nothing
- consumer.wakeup();
+ consumerManager.wakeup();
paused = false;
}
}
@@ -204,4 +254,34 @@ private void resumeIfPaused() {
private boolean shouldThrottle() {
return wm.shouldThrottle();
}
+
+ /**
+ * Optionally blocks. Threadsafe
+ *
+ * @see CommitMode
+ */
+ @SneakyThrows
+ @Override
+ public void retrieveOffsetsAndCommit() {
+ // {@link Optional#ifPresentOrElse} only @since 9
+ ConsumerOffsetCommitter comitter = committer.orElseThrow(() -> {
+ // shouldn't be here
+ throw new IllegalStateException("No committer configured");
+ });
+ comitter.commit();
+ }
+
+ /**
+ * Will silently skip if not configured with a committer
+ */
+ private void maybeDoCommit() {
+ committer.ifPresent(ConsumerOffsetCommitter::maybeDoCommit);
+ }
+
+ /**
+ * Wakeup if colling the broker
+ */
+ public void wakeup() {
+ consumerManager.wakeup();
+ }
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ConsumerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ConsumerManager.java
new file mode 100644
index 000000000..e96663d8b
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ConsumerManager.java
@@ -0,0 +1,114 @@
+package io.confluent.parallelconsumer;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import pl.tlinkowski.unij.api.UniMaps;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Delegate for {@link KafkaConsumer}
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class ConsumerManager {
+
+ private final Consumer consumer;
+
+ private final AtomicBoolean pollingBroker = new AtomicBoolean(false);
+
+ private int erroneousWakups = 0;
+ private int correctPollWakeups = 0;
+ private int noWakeups = 0;
+
+ ConsumerRecords poll(Duration thisLongPollTimeout) {
+ ConsumerRecords records;
+ try {
+ pollingBroker.set(true);
+ records = consumer.poll(thisLongPollTimeout);
+ log.debug("Poll completed normally and returned {}...", records.count());
+ } catch (WakeupException w) {
+ correctPollWakeups++;
+ log.debug("Awoken from broker poll", w);
+ records = new ConsumerRecords<>(UniMaps.of());
+ } finally {
+ pollingBroker.set(false);
+ }
+ return records;
+ }
+
+ /**
+ * Wakes up the consumer, but only if it's polling.
+ *
+ * Otherwise we can interrupt other operations like {@link KafkaConsumer#commitSync()}.
+ */
+ public void wakeup() {
+ // boolean reduces the chances of a mis-timed call to wakeup, but doesn't prevent all spurious wake up calls to other methods like #commit
+ // if the call to wakeup happens /after/ the check for a wake up state inside #poll, then the next call will through the wake up exception (i.e. #commit)
+ if (pollingBroker.get()) {
+ log.debug("Waking up consumer");
+ consumer.wakeup();
+ }
+ }
+
+ public void commitSync(final Map offsetsToSend) {
+ // we dont' want to be woken up during a commit, only polls
+ boolean inProgress = true;
+ noWakeups++;
+ while (inProgress) {
+ try {
+ consumer.commitSync(offsetsToSend);
+ inProgress = false;
+ } catch (WakeupException w) {
+ log.debug("Got woken up, retry. errors: " + erroneousWakups + " none: " + noWakeups + " correct:" + correctPollWakeups, w);
+ erroneousWakups++;
+ }
+ }
+ }
+
+ public void commitAsync(Map offsets, OffsetCommitCallback callback) {
+ // we dont' want to be woken up during a commit, only polls
+ boolean inProgress = true;
+ noWakeups++;
+ while (inProgress) {
+ try {
+ consumer.commitAsync(offsets, callback);
+ inProgress = false;
+ } catch (WakeupException w) {
+ log.debug("Got woken up, retry. errors: " + erroneousWakups + " none: " + noWakeups + " correct:" + correctPollWakeups, w);
+ erroneousWakups++;
+ }
+ }
+ }
+
+ public ConsumerGroupMetadata groupMetadata() {
+ return consumer.groupMetadata();
+ }
+
+ public void close(final Duration defaultTimeout) {
+ consumer.close(defaultTimeout);
+ }
+
+ public Set assignment() {
+ return consumer.assignment();
+ }
+
+ public void pause(final Set assignment) {
+ consumer.pause(assignment);
+ }
+
+ public Set paused() {
+ return consumer.paused();
+ }
+
+ public void resume(final Set pausedTopics) {
+ consumer.resume(pausedTopics);
+ }
+
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ConsumerOffsetCommitter.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ConsumerOffsetCommitter.java
new file mode 100644
index 000000000..bbe525851
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ConsumerOffsetCommitter.java
@@ -0,0 +1,203 @@
+package io.confluent.parallelconsumer;
+
+import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.CONSUMER_SYNC;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.TRANSACTIONAL_PRODUCER;
+
+/**
+ * Committer that uses the Kafka Consumer to commit either synchronously or asynchronously
+ *
+ * @see CommitMode
+ */
+@Slf4j
+public class ConsumerOffsetCommitter extends AbstractOffsetCommitter implements OffsetCommitter {
+
+ private final CommitMode commitMode;
+
+ /**
+ * Used to synchronise threads on changing {@link #commitCount}, {@link #commitPerformed} and constructing the
+ * {@link Condition} to wait on, that's all.
+ */
+ private final ReentrantLock commitLock = new ReentrantLock(true);
+
+ /**
+ * Used to signal to waiting threads, that their commit has been performed when requested.
+ *
+ * @see #commitPerformed
+ * @see #commitAndWaitForCondition()
+ */
+ private Condition commitPerformed = commitLock.newCondition();
+
+ /**
+ * The number of commits made. Use as a logical clock to sanity check expectations and synchronisation when commits
+ * are requested versus performed.
+ */
+ private final AtomicLong commitCount = new AtomicLong(0);
+
+ /**
+ * Set to true when a thread requests a commit to be performed, by the controling thread.
+ */
+ private final AtomicBoolean commitRequested = new AtomicBoolean(false);
+
+ private Optional owningThread = Optional.empty();
+
+ public ConsumerOffsetCommitter(final ConsumerManager newConsumer, final WorkManager newWorkManager, final ParallelConsumerOptions options) {
+ super(newConsumer, newWorkManager);
+ commitMode = options.getCommitMode();
+ if (commitMode.equals(TRANSACTIONAL_PRODUCER)) {
+ throw new IllegalArgumentException("Cannot use " + commitMode + " when using " + this.getClass().getSimpleName());
+ }
+ }
+
+ // todo abstraction leak - find another way
+ private boolean direct = false;
+
+ /**
+ * Might block if using {@link CommitMode#CONSUMER_SYNC}
+ *
+ * @see CommitMode
+ */
+ void commit() {
+ if (isOwner()) {
+ // if owning thread is asking, then perform the commit directly (this is the thread that controls the consumer)
+ // this can happen when the system is closing, using Consumer commit mode, and partitions are revoked and we want to commit
+ direct = true;
+ retrieveOffsetsAndCommit();
+ } else if (isSync()) {
+ log.debug("Sync commit");
+ commitAndWaitForCondition();
+ log.debug("Finished waiting");
+ } else {
+ // async
+ // we just request the commit and hope
+ log.debug("Async commit to be requested");
+ requestCommitInternal();
+ }
+ }
+
+ @Override
+ protected void commitOffsets(final Map offsetsToSend, final ConsumerGroupMetadata groupMetadata) {
+ if (offsetsToSend.isEmpty()) {
+ log.trace("Nothing to commit");
+ return;
+ }
+ switch (commitMode) {
+ case CONSUMER_SYNC -> {
+ log.debug("Committing offsets Sync");
+ consumerMgr.commitSync(offsetsToSend);
+ }
+ case CONSUMER_ASYNCHRONOUS -> {
+ //
+ log.debug("Committing offsets Async");
+ consumerMgr.commitAsync(offsetsToSend, (offsets, exception) -> {
+ if (exception != null) {
+ log.error("Error committing offsets", exception);
+ // todo keep work in limbo until async response is received?
+ }
+ });
+ }
+ default -> {
+ throw new IllegalArgumentException("Cannot use " + commitMode + " when using " + this.getClass().getSimpleName());
+ }
+ }
+ }
+
+ /**
+ * @see #commit()
+ */
+ @Override
+ protected void postCommit() {
+ // only signal if we are in sync mode, and current thread isn't the owner (if we are owner, then we are committing directly)
+ if (!direct && commitMode.equals(CONSUMER_SYNC))
+ signalCommitPerformed();
+ }
+
+ private boolean isOwner() {
+ return Thread.currentThread().equals(owningThread.orElse(null));
+ }
+
+ private void signalCommitPerformed() {
+ log.debug("Starting Signaling commit finished");
+ if (!commitLock.isHeldByCurrentThread())
+ throw new IllegalStateException("Lock already held");
+ commitLock.lock();
+ try {
+ commitCount.incrementAndGet();
+ log.debug("Signaling commit finished");
+ commitPerformed.signalAll();
+ log.debug("Finished Signaling commit finished");
+ } finally {
+ commitLock.unlock();
+ }
+ }
+
+ private void commitAndWaitForCondition() {
+ commitLock.lock();
+
+ try {
+ this.commitPerformed = commitLock.newCondition();
+ long currentCount = commitCount.get();
+ requestCommitInternal();
+ consumerMgr.wakeup();
+ while (currentCount == commitCount.get()) {
+ if (currentCount == commitCount.get()) {
+ log.debug("Requesting commit again");
+ requestCommitInternal();
+ } else {
+ commitRequested.set(false);
+ }
+ try {
+ log.debug("Waiting on commit");
+ commitPerformed.await();
+ } catch (InterruptedException e) {
+ log.debug("Interrupted waiting for commit condition", e);
+ }
+ }
+ log.debug("Signaled");
+ } finally {
+ commitLock.unlock();
+ }
+ }
+
+ private void requestCommitInternal() {
+ commitLock.lock();
+ try {
+ commitRequested.set(true);
+ consumerMgr.wakeup();
+ } finally {
+ commitLock.unlock();
+ }
+
+ }
+
+ void maybeDoCommit() {
+ commitLock.lock();
+ try {
+ if (commitRequested.get()) {
+ retrieveOffsetsAndCommit();
+ }
+ } finally {
+ commitLock.unlock();
+ }
+ }
+
+ public boolean isSync() {
+ return commitMode.equals(CONSUMER_SYNC);
+ }
+
+ public void claim() {
+ owningThread = Optional.of(Thread.currentThread());
+ }
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ErrorInUserFunctionException.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ErrorInUserFunctionException.java
new file mode 100644
index 000000000..d41b51594
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ErrorInUserFunctionException.java
@@ -0,0 +1,10 @@
+package io.confluent.parallelconsumer;
+
+/**
+ * This exception is only used when there is an exception thrown from code provided by the user.
+ */
+public class ErrorInUserFunctionException extends RuntimeException {
+ public ErrorInUserFunctionException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/InternalError.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/InternalError.java
new file mode 100644
index 000000000..71c15870d
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/InternalError.java
@@ -0,0 +1,20 @@
+package io.confluent.parallelconsumer;
+
+public class InternalError extends RuntimeException {
+
+ public InternalError(final String message) {
+ super(message);
+ }
+
+ public InternalError(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public InternalError(final Throwable cause) {
+ super(cause);
+ }
+
+ public InternalError(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessor.java
index c0d93c568..db76b98a2 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessor.java
@@ -7,7 +7,6 @@
import io.confluent.csid.utils.Java8StreamUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.List;
@@ -22,10 +21,8 @@ public class JStreamParallelEoSStreamProcessor extends ParallelEoSStreamPr
private final ConcurrentLinkedDeque> userProcessResultsStream;
- public JStreamParallelEoSStreamProcessor(org.apache.kafka.clients.consumer.Consumer consumer,
- Producer producer,
- ParallelConsumerOptions parallelConsumerOptions) {
- super(consumer, producer, parallelConsumerOptions);
+ public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions parallelConsumerOptions) {
+ super(parallelConsumerOptions);
this.userProcessResultsStream = new ConcurrentLinkedDeque<>();
@@ -34,7 +31,7 @@ public JStreamParallelEoSStreamProcessor(org.apache.kafka.clients.consumer.Consu
@Override
public Stream> pollProduceAndStream(Function, List>> userFunction) {
- super.pollAndProduce(userFunction, (result) -> {
+ super.pollAndProduceMany(userFunction, (result) -> {
log.trace("Wrapper callback applied, sending result to stream. Input: {}", result);
this.userProcessResultsStream.add(result);
});
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelStreamProcessor.java
index 0a27e3ef1..afd5424b0 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelStreamProcessor.java
@@ -13,15 +13,12 @@
public interface JStreamParallelStreamProcessor extends DrainingCloseable {
- static JStreamParallelStreamProcessor createJStreamEosStreamProcessor(
- org.apache.kafka.clients.consumer.Consumer consumer,
- org.apache.kafka.clients.producer.Producer producer,
- ParallelConsumerOptions options) {
- return new JStreamParallelEoSStreamProcessor<>(consumer, producer, options);
+ static JStreamParallelStreamProcessor createJStreamEosStreamProcessor(ParallelConsumerOptions options) {
+ return new JStreamParallelEoSStreamProcessor<>(options);
}
/**
- * Like {@link ParallelEoSStreamProcessor#pollAndProduce} but instead of callbacks, streams the results instead,
+ * Like {@link ParallelEoSStreamProcessor#pollAndProduceMany} but instead of callbacks, streams the results instead,
* after the produce result is ack'd by Kafka.
*
* @return a stream of results of applying the function to the polled records
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetCommitter.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetCommitter.java
new file mode 100644
index 000000000..19d40baa0
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetCommitter.java
@@ -0,0 +1,5 @@
+package io.confluent.parallelconsumer;
+
+public interface OffsetCommitter {
+ void retrieveOffsetsAndCommit();
+}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
index a1ea68039..361476010 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
@@ -6,14 +6,41 @@
import lombok.Builder;
import lombok.Getter;
+import lombok.ToString;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+
+import java.util.Objects;
+
+import static io.confluent.csid.utils.StringUtils.msg;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.TRANSACTIONAL_PRODUCER;
/**
* The options for the {@link ParallelEoSStreamProcessor} system.
+ *
+ * @see #builder()
+ * @see ParallelConsumerOptions.ParallelConsumerOptionsBuilder
*/
@Getter
-@Builder
-public class ParallelConsumerOptions {
+@Builder(toBuilder = true)
+@ToString
+public class ParallelConsumerOptions {
+
+ /**
+ * Required parameter for all use.
+ */
+ private final Consumer consumer;
+
+ /**
+ * Supplying a producer is only needed if using the produce flows.
+ *
+ * @see ParallelStreamProcessor
+ */
+ private final Producer producer;
+ /**
+ * The ordering guarantee to use.
+ */
public enum ProcessingOrder {
/**
* No ordering is guaranteed, not even partition order. Fastest. Concurrency is at most the max number of
@@ -21,8 +48,8 @@ public enum ProcessingOrder {
*/
UNORDERED,
/**
- * Process messages within a partition in order, but process multiple partitions in parallel. Similar to
- * running more consumer for a topic. Concurrency is at most the number of partitions.
+ * Process messages within a partition in order, but process multiple partitions in parallel. Similar to running
+ * more consumer for a topic. Concurrency is at most the number of partitions.
*/
PARTITION,
/**
@@ -32,27 +59,90 @@ public enum ProcessingOrder {
KEY
}
+ /**
+ * The type of commit to be made, with either a transactions configured Producer where messages produced are
+ * committed back to the Broker along with the offsets they originated from, or with the faster simpler Consumer
+ * offset system either synchronously or asynchronously
+ */
+ public enum CommitMode {
+ /**
+ * Commits through the Producer using transactions. Slowest fot he options, but no duplicates in Kafka
+ * guaranteed (message replay may cause duplicates in external systems which is unavoidable with Kafka).
+ *
+ * This is separate from using an IDEMPOTENT Producer, which can be used, along with {@link
+ * CommitMode#CONSUMER_SYNC} or {@link CommitMode#CONSUMER_ASYNCHRONOUS}.
+ */
+ TRANSACTIONAL_PRODUCER,
+ /**
+ * Synchronous commits with the Consumer. Much faster than {@link #TRANSACTIONAL_PRODUCER}. Slower but
+ * potentially less duplicates than {@link #CONSUMER_ASYNCHRONOUS} upon replay.
+ */
+ CONSUMER_SYNC,
+ /**
+ * Fastest option, under normal conditions will have few of no duplicates. Under failure revocery may have more
+ * duplicates than {@link #CONSUMER_SYNC}.
+ */
+ CONSUMER_ASYNCHRONOUS
+ }
+
/**
* The order type to use
*/
@Builder.Default
private final ProcessingOrder ordering = ProcessingOrder.UNORDERED;
+ @Builder.Default
+ private final CommitMode commitMode = CommitMode.CONSUMER_ASYNCHRONOUS;
+
/**
- * Don't have more than this many uncommitted messages in process
- * TODO change this to per topic? global?
+ * Total max number of messages to process beyond the base committed offsets.
+ *
+ * This acts as a sort sort of upper limit on the number of messages we should allow our system to handle, when
+ * working with large quantities of messages that haven't been included in the normal Broker offset commit protocol.
+ * I.e. if there is a single message that is failing to process, without this limit we will continue on forever with
+ * our system, with the actual (normal) committed offset never moving, and relying totally on our {@link
+ * OffsetMapCodecManager} to encode the process status of our records and store in metadata next to the committed
+ * offset.
+ *
+ * At the moment this is a sort of sanity check, and was chosen rather arbitriarly. However, one should consider
+ * that this is per client, and is a total across all assigned partitions.
*/
@Builder.Default
- private final int maxUncommittedMessagesToHandlePerPartition = 1000;
+ private final int maxNumberMessagesBeyondBaseCommitOffset = 1000;
/**
- * Don't process any more than this many messages concurrently
+ * Max number of messages to queue up in our execution system and attempt to process concurrently.
+ *
+ * In the core module, this will be constrained by the {@link #numberOfThreads} setting, as that is the max actual
+ * concurrency for processing the messages. To actually get this degree of concurrency, you would need to have a
+ * matching number of threads in the pool.
+ *
+ * However with the VertX engine, this will control how many messages at a time are being submitted to the Vert.x
+ * engine to process. As Vert.x isn't constrained by a thread count, this will be the actual degree of concurrency.
*/
@Builder.Default
- private final int maxConcurrency = 100;
+ private final int maxMessagesToQueue = 100;
+ /**
+ * Number of threads to use in the core's thread pool.
+ */
@Builder.Default
private final int numberOfThreads = 16;
+ public void validate() {
+ Objects.requireNonNull(consumer, "A consumer must be supplied");
+
+ if (isUsingTransactionalProducer() && producer == null) {
+ throw new IllegalArgumentException(msg("Wanting to use Transaction Producer mode ({}) without supplying a Producer instance",
+ commitMode));
+ }
+ }
+ protected boolean isUsingTransactionalProducer() {
+ return commitMode.equals(TRANSACTIONAL_PRODUCER);
+ }
+
+ public boolean isProducerSupplied() {
+ return getProducer() != null;
+ }
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
index 98ffa39e6..c54c6bd7a 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
@@ -10,7 +10,6 @@
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.producer.*;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.MDC;
@@ -32,6 +31,7 @@
import static io.confluent.csid.utils.BackportUtils.toSeconds;
import static io.confluent.csid.utils.Range.range;
import static io.confluent.csid.utils.StringUtils.msg;
+import static io.confluent.parallelconsumer.UserFunctions.carefullyRun;
import static java.time.Duration.ofSeconds;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -42,6 +42,8 @@
@Slf4j
public class ParallelEoSStreamProcessor implements ParallelStreamProcessor, ConsumerRebalanceListener, Closeable {
+ private final ParallelConsumerOptions options;
+
/**
* Injectable clock for testing
*/
@@ -54,9 +56,8 @@ public class ParallelEoSStreamProcessor implements ParallelStreamProcessor
private Instant lastCommit = Instant.now();
- private boolean inTransaction = false;
+ private final Optional> producerManager;
- private final org.apache.kafka.clients.producer.Producer producer;
private final org.apache.kafka.clients.consumer.Consumer consumer;
/**
@@ -95,13 +96,32 @@ public class ParallelEoSStreamProcessor implements ParallelStreamProcessor
*/
private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();
+ private final OffsetCommitter committer;
+
+ /**
+ * Used to request a commit asap
+ */
+ private final AtomicBoolean commitCommand = new AtomicBoolean(false);
+
/**
* The run state of the controller.
*
* @see #state
*/
enum State {
- unused, running, draining, closing, closed;
+ unused,
+ running,
+ /**
+ * When draining, the system will stop polling for more records, but will attempt to process all already
+ * downloaded records. Note that if you choose to close without draining, records already processed will still
+ * be committed first before closing.
+ *
+ * @see #closeDrainFirst()
+ * @see #close()
+ */
+ draining,
+ closing,
+ closed;
}
/**
@@ -122,47 +142,46 @@ enum State {
*
* @see ParallelConsumerOptions
*/
- public ParallelEoSStreamProcessor(org.apache.kafka.clients.consumer.Consumer consumer,
- org.apache.kafka.clients.producer.Producer producer,
- ParallelConsumerOptions options) {
- log.debug("Confluent async consumer initialise");
+ public ParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) {
+ Objects.requireNonNull(newOptions, "Options must be supplied");
+
+ log.info("Confluent Parallel Consumer initialise... Options: {}", newOptions);
- Objects.requireNonNull(consumer);
- Objects.requireNonNull(producer);
- Objects.requireNonNull(options);
+ options = newOptions;
+ options.validate();
+
+ this.consumer = options.getConsumer();
checkNotSubscribed(consumer);
checkAutoCommitIsDisabled(consumer);
- //
- this.producer = producer;
- this.consumer = consumer;
+ this.workerPool = Executors.newFixedThreadPool(newOptions.getNumberOfThreads());
- workerPool = Executors.newFixedThreadPool(options.getNumberOfThreads());
+ this.wm = new WorkManager<>(newOptions, consumer);
- //
- this.wm = new WorkManager<>(options, consumer);
+ ConsumerManager consumerMgr = new ConsumerManager<>(consumer);
- //
- this.brokerPollSubsystem = new BrokerPollSystem<>(consumer, wm, this);
+ this.brokerPollSubsystem = new BrokerPollSystem<>(consumerMgr, wm, this, newOptions);
- //
- try {
- log.debug("Initialising producer transaction session...");
- producer.initTransactions();
- } catch (KafkaException e) {
- log.error("Make sure your producer is setup for transactions - specifically make sure it's {} is set.", ProducerConfig.TRANSACTIONAL_ID_CONFIG, e);
- throw e;
+ if (options.isProducerSupplied()) {
+ this.producerManager = Optional.of(new ProducerManager<>(options.getProducer(), consumerMgr, this.wm, options));
+ if (options.isUsingTransactionalProducer())
+ this.committer = this.producerManager.get();
+ else
+ this.committer = this.brokerPollSubsystem;
+ } else {
+ this.producerManager = Optional.empty();
+ this.committer = this.brokerPollSubsystem;
}
}
- private void checkNotSubscribed(org.apache.kafka.clients.consumer.Consumer consumer) {
- if (consumer instanceof MockConsumer)
+ private void checkNotSubscribed(org.apache.kafka.clients.consumer.Consumer consumerToCheck) {
+ if (consumerToCheck instanceof MockConsumer)
// disabled for unit tests which don't test rebalancing
return;
- Set subscription = consumer.subscription();
- Set assignment = consumer.assignment();
- if (subscription.size() != 0 || assignment.size() != 0) {
+ Set subscription = consumerToCheck.subscription();
+ Set assignment = consumerToCheck.assignment();
+ if (!subscription.isEmpty() || !assignment.isEmpty()) {
throw new IllegalStateException("Consumer subscription must be managed by this class. Use " + this.getClass().getName() + "#subcribe methods instead.");
}
}
@@ -195,12 +214,19 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
/**
* Commit our offsets
+ *
+ * Make sure the calling thread is the thread which performs commit - i.e. is the {@link OffsetCommitter}.
*/
@Override
public void onPartitionsRevoked(Collection partitions) {
- commitOffsetsThatAreReady();
- wm.onPartitionsRevoked(partitions);
- usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsRevoked(partitions));
+ try {
+ log.debug("Partitions revoked (onPartitionsRevoked), state: {}", state);
+ commitOffsetsThatAreReady();
+ wm.onPartitionsRevoked(partitions);
+ usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsRevoked(partitions));
+ } catch (Exception e) {
+ throw new InternalError("onPartitionsRevoked event error", e);
+ }
}
/**
@@ -255,7 +281,10 @@ private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consume
public void poll(Consumer> usersVoidConsumptionFunction) {
Function, List