feature: Choose between Consumer commit or Producer transactional commits

- Choose either Consumer sync or async commits
- Fixes confluentinc#25:
-- Sometimes a transaction error occurs - Cannot call send in state COMMITTING_TRANSACTION
- A ReentrantReadWriteLock protects the non-thread-safe transactional producer from incorrect multithreaded use (see the sketch after this list)
- Wider lock scope to prevent transactions from containing produced messages that they shouldn't
- Implement non-transactional synchronous commits properly
- Selected tests adapted to the non-transactional modes as well
- Must start tx in MockProducer as well
- Adds supervision to poller
- Fixes a performance issue with the async committer not being woken up
- Enhances tests to run under multiple commit modes
- Fixes example app tests - they were testing the wrong thing, and MockProducer was not configured to auto-complete
- Make committer thread revoke partitions and commit
- Have onPartitionsRevoked be responsible for committing on close, instead of an explicit call to commit by the controller
- Make sure Broker Poller now drains properly, committing any waiting work
- Add missing revoke flow to MockConsumer wrapper
- Add missing latch timeout check
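
A minimal sketch of the locking approach mentioned above, assuming hypothetical names (`ProducerGuard`, `produce`, and `commitTransaction` are illustrative wrappers, not the actual classes or methods in this change):

[source,java]
----
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Illustrative sketch only: many worker threads may produce concurrently (read lock),
// while a transaction commit runs exclusively (write lock), so no produce call can
// slip into a transaction it shouldn't belong to.
class ProducerGuard {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    <T> T produce(Supplier<T> produceCall) {
        lock.readLock().lock();
        try {
            return produceCall.get(); // e.g. wraps producer.send(...)
        } finally {
            lock.readLock().unlock();
        }
    }

    void commitTransaction(Runnable commitCall) {
        lock.writeLock().lock(); // waits for all in-flight produce calls to finish
        try {
            commitCall.run(); // e.g. wraps producer.commitTransaction()
        } finally {
            lock.writeLock().unlock();
        }
    }
}
----
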
JorgenRingen authored and astubbs committed Nov 20, 2020
1 parent 0b96d5e commit ac393f2
Showing 41 changed files with 2,036 additions and 533 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -29,4 +29,7 @@ hs_err_pid*
*.iml
target/
.DS_Store
*.versionsBackup

# JENV
.java-version
2 changes: 1 addition & 1 deletion .idea/runConfigurations/All_examples.xml


59 changes: 39 additions & 20 deletions README.adoc
@@ -313,18 +313,17 @@ Where `${project.version}` is the version to be used:
.Setup the client
[source,java,indent=0]
----
var options = ParallelConsumerOptions.builder()
.ordering(KEY) // <1>
.maxConcurrency(1000) // <2>
.maxUncommittedMessagesToHandlePerPartition(10000) // <3>
.build();
ParallelConsumerOptions options = getOptions();
Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <4>
Producer<String, String> kafkaProducer = getKafkaProducer();
ParallelStreamProcessor<String, String> eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(kafkaConsumer, kafkaProducer, options);
if (!(kafkaConsumer instanceof MockConsumer)) {
kafkaConsumer.subscribe(UniLists.of(inputTopic)); // <5>
eosStreamProcessor.subscribe(UniLists.of(inputTopic)); // <5>
}
return ParallelStreamProcessor.createEosStreamProcessor(kafkaConsumer, getKafkaProducer(), options);
return eosStreamProcessor;
----
<1> Choose your ordering type, `KEY` in this case.
This ensures maximum concurrency, while ensuring messages are processed and committed in `KEY` order, making sure no offset is committed unless all offsets before it in its partition are also completed.
@@ -384,13 +383,12 @@ You can also optionally provide a callback function to be run after the message(
[source,java,indent=0]
this.parallelConsumer.pollAndProduce(record -> {
var result = processBrokerRecord(record);
ProducerRecord<String, String> produceRecord =
new ProducerRecord<>(outputTopic, "a-key", result.payload);
return UniLists.of(produceRecord);
}, consumeProduceResult ->
log.info("Message {} saved to broker at offset {}",
consumeProduceResult.getOut(),
consumeProduceResult.getMeta().offset())
return new ProducerRecord<>(outputTopic, record.key(), result.payload);
}, consumeProduceResult -> {
log.debug("Message {} saved to broker at offset {}",
consumeProduceResult.getOut(),
consumeProduceResult.getMeta().offset());
}
);


@@ -412,7 +410,7 @@ In future versions, we plan to look at supporting other streaming systems like h
var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> {
log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
Map<String, String> params = UniMaps.of("recordKey", record.key(), "payload", record.value());
return new RequestInfo("localhost", "/api", params); // <1>
return new RequestInfo("localhost", port, "/api", params); // <1>
});
----
<1> Simply return an object representing the request; the Vert.x HTTP engine will handle the rest, using its non-blocking engine.
@@ -453,7 +451,7 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb
}
void concurrentProcess() {
setupConsumer();
setupParallelConsumer();
parallelConsumer.poll(record -> {
log.info("Concurrently processing a record: {}", record);
@@ -552,16 +550,33 @@ When your function is actually run, a result object will be streamed back to you
After your operation completes, you can also choose to publish a result message back to Kafka.
The message publishing metadata can be streamed back to your client code.

== Commit Mode

The system gives you three choices for how to do offset commits.
The simplest two of the three are the Consumer commit modes: `synchronous` and `asynchronous`.
The `transactional` mode is explained in the next section.

`Asynchronous` mode is faster, as it doesn't block the control loop.

`Synchronous` mode will block the processing loop until a successful commit response is received; however, `asynchronous` mode will still be capped by the max processing settings in the `Options` class.

If you're used to using the auto commit mode in the normal Kafka consumer, you can think of the `asynchronous` mode as being similar to that.
We suggest starting with this mode, and it is the default.
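
As a sketch, selecting a mode through the options builder might look like the following - the `commitMode` option and the enum constant name shown here are assumptions for illustration and may not match the released API exactly:

[source,java,indent=0]
----
// Illustrative only: the commitMode option and constant names are assumed, not confirmed.
var options = ParallelConsumerOptions.builder()
        .ordering(KEY)
        .commitMode(CommitMode.CONSUMER_ASYNCHRONOUS) // the asynchronous Consumer mode described above (the default)
        .build();
----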

=== Apache Kafka EoS Transaction Model

There is also the option to use Kafka's Exactly Once Semantics (EoS) system.
This causes all messages produced as a result of processing a message to be committed within a transaction, along with their source offset.
This means that even under failure, the results will exist exactly once in the Kafka output topic.
If, as a part of your processing, you create side effects in other systems, the usual idempotency requirements apply when breaking out of the EoS Kafka boundary.

NOTE: As with the `synchronous` commit mode, this will also block the processing loop until a successful transaction completes.

CAUTION: This cannot be true for any externally integrated third party system, unless that system is __idempotent__.

For implementation details, see the <<Transactional System Architecture>> section.
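
For reference, here is a minimal sketch of the raw Kafka transactional pattern this builds on, using plain `KafkaProducer`/`KafkaConsumer` calls rather than this library's API - `producer`, `consumer`, `outputTopic`, the `running` flag and the `process` function are assumed to be defined elsewhere:

[source,java,indent=0]
----
// Plain-Kafka sketch of the EoS pattern described above, independent of this library.
producer.initTransactions();

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>(outputTopic, record.key(), process(record))); // process is a placeholder
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    // The source offsets are committed inside the same transaction as the produced results,
    // so they either all become visible together, or not at all.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}
----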

[[streams-usage]]
== Using with Kafka Streams

@@ -748,6 +763,10 @@ Instead of the work thread pool count being the degree of concurrency, it is con
.Vert.x Architecture
image::https://lucid.app/publicSegments/view/509df410-5997-46be-98e7-ac7f241780b4/image.png[Vert.x Architecture, align="center"]

=== Transactional System Architecture

image::https://lucid.app/publicSegments/view/7480d948-ed7d-4370-a308-8ec12e6b453b/image.png[]

=== Offset Map

==== Storage
@@ -9,7 +9,11 @@
public class StringUtils {

public static String msg(String s, Object... args) {
String message = MessageFormatter.basicArrayFormat(s, args);
return message;
return MessageFormatter.basicArrayFormat(s, args);
}

public static boolean isBlank(final String property) {
if (property == null) return true;
else return property.trim().isEmpty(); // isBlank @since 11
}
}
@@ -0,0 +1,59 @@
package io.confluent.parallelconsumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

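/**
 * Shared template for committing the offsets of completed work: finds the eligible offsets via the
 * {@link WorkManager}, then delegates the actual commit mechanism (Consumer sync/async commit, or
 * Producer transactional commit) to subclasses through {@link #commitOffsets}.
 */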
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractOffsetCommitter<K, V> implements OffsetCommitter {

protected final ConsumerManager<K, V> consumerMgr;
protected final WorkManager<K, V> wm;

/**
* Get offsets from {@link WorkManager} that are ready to commit
*/
@Override
public void retrieveOffsetsAndCommit() {
log.debug("Commit starting - find completed work to commit offsets");
// todo shouldn't be removed until commit succeeds (there's no harm in committing the same offset twice)
preAcquireWork();
try {
Map<TopicPartition, OffsetAndMetadata> offsetsToSend = wm.findCompletedEligibleOffsetsAndRemove();
if (offsetsToSend.isEmpty()) {
log.trace("No offsets ready");
} else {
log.debug("Will commit offsets for {} partition(s): {}", offsetsToSend.size(), offsetsToSend);
ConsumerGroupMetadata groupMetadata = consumerMgr.groupMetadata();

log.debug("Begin commit");
commitOffsets(offsetsToSend, groupMetadata);

log.debug("On commit success");
onOffsetCommitSuccess(offsetsToSend);
}
} finally {
postCommit();
}
}

protected void postCommit() {
// default noop
}

protected void preAcquireWork() {
// default noop
}

private void onOffsetCommitSuccess(final Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
wm.onOffsetCommitSuccess(offsetsToSend);
}

protected abstract void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsToSend, final ConsumerGroupMetadata groupMetadata);

}
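
A hedged sketch of how a concrete subclass of the template above could plug in a consumer-based commit - the class name and the `commitSync` delegate on `ConsumerManager` are assumptions for illustration, not the project's actual implementation:

[source,java]
----
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

// Illustrative sketch only - not the real committer shipped in this change.
public class SketchConsumerOffsetCommitter<K, V> extends AbstractOffsetCommitter<K, V> {

    public SketchConsumerOffsetCommitter(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm) {
        super(consumerMgr, wm);
    }

    @Override
    protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToSend,
                                 ConsumerGroupMetadata groupMetadata) {
        // A synchronous Consumer commit blocks here until the broker acknowledges, matching the
        // `synchronous` commit mode described in the README. The commitSync delegate below is an
        // assumed method on ConsumerManager, shown only to indicate where the real call would go.
        consumerMgr.commitSync(offsetsToSend);
    }
}
----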