feature: Poll Context object for API (#223)
- Provides an organised view per TopicPartition and includes failure counts for records
- Unify the batch and non-batch APIs; record failure history
- Migrate to threeten's MutableClock
astubbs committed Mar 23, 2022
1 parent e96281c commit 7c5ac01
Showing 47 changed files with 907 additions and 642 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.adoc
@@ -10,6 +10,20 @@ ifndef::github_name[]
toc::[]
endif::[]

== Next Version

=== Features

* PollContext API - provides central access to result set with various convenience methods as well as metadata about records, such as failure count
* Batching - all API methods now support batching.
See the Options class's batch size setting for more information.

=== Fixes and Improvements

* Event system - better CPU usage in control thread
* Concurrency stability improvements
* Update dependencies

== v0.4.0.0
// https://github.com/confluentinc/parallel-consumer/releases/tag/0.4.0.0

50 changes: 32 additions & 18 deletions README.adoc
@@ -236,12 +236,12 @@ without operational burden or harming the cluster's performance
* Solution for the https://en.wikipedia.org/wiki/Head-of-line_blocking["head of line"] blocking problem, where continued failure of a single message prevents progress for messages behind it in the queue
* Per `key` concurrent processing, per partition and unordered message processing
* Offsets committed correctly, in order, of only processed messages, regardless of concurrency level or retries
* Batch version fo the API to process batches of messages in parallel instead of single messages.
* Batch support in all versions of the API to process batches of messages in parallel instead of single messages.
** Particularly useful when your processing function can work with more than a single record at a time - e.g. sending records to an API which has a batch version, like Elasticsearch
* Vert.x and Reactor.io non-blocking library integration
** Non-blocking I/O work management
** Vert.x's WebClient and general Vert.x Future support
** Reactor.io Publisher (Mono/Flux) and Java's CompletableFuture (through `Mono#fromFuture`)
* Reactor non-blocking library integration
* Fair partition traversal
* Zero~ dependencies (`Slf4j` and `Lombok`) for the core module
* Java 8 compatibility
@@ -491,9 +491,10 @@ You can also optionally provide a callback function to be run after the message(s)

.Usage - print message content out to the console in parallel
[source,java,indent=0]
parallelConsumer.pollAndProduce(record -> {
var result = processBrokerRecord(record);
return new ProducerRecord<>(outputTopic, record.key(), result.payload);
parallelConsumer.pollAndProduce(context -> {
var consumerRecord = context.getSingleRecord().getConsumerRecord();
var result = processBrokerRecord(consumerRecord);
return new ProducerRecord<>(outputTopic, consumerRecord.key(), result.payload);
}, consumeProduceResult -> {
log.debug("Message {} saved to broker at offset {}",
consumeProduceResult.getOut(),
@@ -513,10 +514,12 @@ In future versions, we plan to look at supporting other streaming systems like h
[[batching]]
=== Batching

The library also support a batch version of the API.
Using this, you can process batches of messages at once.
The library also supports sending a batch of records as input to the user's processing function in parallel.
Using this, you can process several records in your function at once.

To use it, use one of the `batch` versions instead.
To use it, set a `batch size` in the options class.

There are then various access methods for the batch of records - see the `PollContext` object for more information.

IMPORTANT: If an exception is thrown while processing the batch, all messages in the batch will be returned to the queue, to be retried with the standard retry system.
There is no guarantee that the messages will be retried again in the same batch.
@@ -531,10 +534,10 @@ There is no guarantee that the messages will be retried again in the same batch.
.maxConcurrency(100)
.batchSize(5) // <1>
.build());
parallelConsumer.pollBatch(batchOfRecords -> {
parallelConsumer.poll(context -> {
// convert the batch into the payload for our processing
List<String> payload = batchOfRecords.stream()
.map(this::pareparePayload)
List<String> payload = context.stream()
.map(this::preparePayload)
.collect(Collectors.toList());
// process the entire batch payload at once
processBatchPayload(payload);
@@ -554,9 +557,10 @@ There is no guarantee that the messages will be retried again in the same batch.
.Usage - call an HTTP endpoint for each message
[source,java,indent=0]
----
var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> {
log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
Map<String, String> params = UniMaps.of("recordKey", record.key(), "payload", record.value());
var resultStream = parallelConsumer.vertxHttpReqInfoStream(context -> {
var consumerRecord = context.getSingleConsumerRecord();
log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
Map<String, String> params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value());
return new RequestInfo("localhost", port, "/api", params); // <1>
});
----
@@ -633,6 +637,14 @@ From the `Clients` view, get the connection information customized to your clust
.. Use these settings presented to https://docs.confluent.io/clients-kafka-java/current/overview.html[configure your clients].
. Use these clients for steps outlined in the <<common_preparation>> section.

[[upgrading]]
== Upgrading

=== From 0.4 to 0.5

This version has a breaking change in the API - instead of passing in `ConsumerRecord` instances, it passes in a `PollContext` object which has extra information and utility methods.
See the `PollContext` class for more information.
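
As a minimal migration sketch, assuming a processor instance `pc` (as used in the examples below) and a hypothetical user method `processRecord`, a 0.4 handler that received the `ConsumerRecord` directly:

[source,java,indent=0]
----
// 0.4 style - the user function received the ConsumerRecord itself
pc.poll(consumerRecord -> processRecord(consumerRecord));
----

becomes a 0.5 handler that unwraps the record from the `PollContext`:

[source,java,indent=0]
----
// 0.5 style - the user function receives a PollContext, and unwraps the record
pc.poll(context -> {
    var consumerRecord = context.getSingleRecord().getConsumerRecord();
    processRecord(consumerRecord);
});
----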

[[ordering-guarantees]]
== Ordering Guarantees

@@ -734,8 +746,8 @@ You can access the retry count of a record through its wrapped `WorkContainer`
final int baseDelaySecond = 1;
ParallelConsumerOptions.<String, String>builder()
.retryDelayProvider(workContainer -> {
int numberOfFailedAttempts = workContainer.getNumberOfFailedAttempts();
.retryDelayProvider(recordContext -> {
int numberOfFailedAttempts = recordContext.getNumberOfFailedAttempts();
long delayMillis = (long) (baseDelaySecond * Math.pow(multiplier, numberOfFailedAttempts) * 1000);
return Duration.ofMillis(delayMillis);
});
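
For reference, a self-contained sketch of the back-off above might look like the following - note that `multiplier` is an arbitrary choice here, and the built options still need to be passed to the consumer as usual:

[source,java,indent=0]
----
final double multiplier = 2.0; // double the delay after each failed attempt
final int baseDelaySecond = 1;

var options = ParallelConsumerOptions.<String, String>builder()
        .retryDelayProvider(recordContext -> {
            int numberOfFailedAttempts = recordContext.getNumberOfFailedAttempts();
            // delays of 1s, 2s, 4s, 8s ... for attempts 0, 1, 2, 3 ...
            long delayMillis = (long) (baseDelaySecond * Math.pow(multiplier, numberOfFailedAttempts) * 1000);
            return Duration.ofMillis(delayMillis);
        })
        .build();
----

With a multiplier of 2 and a base delay of one second, a record that has failed three times would next be retried after roughly 8 seconds.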
@@ -757,7 +769,8 @@ Implementing a https://github.com/confluentinc/parallel-consumer/issues/196[max
final int maxRetries = 10;
final Map<ConsumerRecord<String, String>, Long> retriesCount = new ConcurrentHashMap<>();
pc.poll(consumerRecord -> {
pc.poll(context -> {
var consumerRecord = context.getSingleRecord().getConsumerRecord();
Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L);
if (retryCount < maxRetries) {
processRecord(consumerRecord);
@@ -783,7 +796,8 @@ This will put the message back in the queue.
----
final Map<String, Boolean> upMap = new ConcurrentHashMap<>();
pc.poll(consumerRecord -> {
pc.poll(context -> {
var consumerRecord = context.getSingleRecord().getConsumerRecord();
String serverId = extractServerId(consumerRecord);
boolean up = upMap.computeIfAbsent(serverId, ignore -> true);
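
A complete sketch of the circuit breaker pattern above, with `extractServerId`, `updateStatusOfServer` and `processRecord` as hypothetical user methods, could read:

[source,java,indent=0]
----
final Map<String, Boolean> upMap = new ConcurrentHashMap<>();

pc.poll(context -> {
    var consumerRecord = context.getSingleRecord().getConsumerRecord();
    String serverId = extractServerId(consumerRecord);
    boolean up = upMap.computeIfAbsent(serverId, ignore -> true);

    if (up) {
        up = updateStatusOfServer(serverId); // refresh health state
        upMap.put(serverId, up);             // cache the result for later records
    }

    if (up) {
        processRecord(consumerRecord);
    } else {
        // throwing marks the record as failed, putting it back in the queue to be retried
        throw new RuntimeException("Server " + serverId + " is down, record will be retried");
    }
});
----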
6 changes: 6 additions & 0 deletions parallel-consumer-core/pom.xml
@@ -100,6 +100,12 @@
<groupId>com.google.flogger</groupId>
<artifactId>flogger-slf4j-backend</artifactId>
</dependency>
<dependency>
<groupId>org.threeten</groupId>
<artifactId>threeten-extra</artifactId>
<version>1.7.0</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -1,20 +1,25 @@
package io.confluent.csid.utils;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.SneakyThrows;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.Callable;

@Slf4j
@UtilityClass
public class TimeUtils {

public Clock getClock() {
return Clock.systemUTC();
}

@SneakyThrows
public static <RESULT> RESULT time(final Callable<RESULT> func) {
long start = System.currentTimeMillis();
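
The `threeten-extra` test dependency added in the pom above provides `MutableClock`, which the commit message says the tests migrate to. As an illustrative sketch (not the project's actual test code), a mutable clock lets a test advance time deterministically instead of sleeping:

[source,java,indent=0]
----
import java.time.Duration;
import java.time.Instant;
import org.threeten.extra.MutableClock;

class MutableClockExample {
    public static void main(String[] args) {
        // a mutable clock fixed at the epoch, in UTC
        MutableClock clock = MutableClock.epochUTC();

        Instant before = clock.instant();
        clock.add(Duration.ofSeconds(30)); // advance the clock without sleeping
        Instant after = clock.instant();

        System.out.println(Duration.between(before, after)); // PT30S
    }
}
----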

This file was deleted.

@@ -1,12 +1,11 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.utils.Java8StreamUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.List;
@@ -21,7 +20,7 @@ public class JStreamParallelEoSStreamProcessor<K, V> extends ParallelEoSStreamPr

private final ConcurrentLinkedDeque<ConsumeProduceResult<K, V, K, V>> userProcessResultsStream;

public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions parallelConsumerOptions) {
public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> parallelConsumerOptions) {
super(parallelConsumerOptions);

this.userProcessResultsStream = new ConcurrentLinkedDeque<>();
@@ -30,7 +29,7 @@ public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions parallelConsume
}

@Override
public Stream<ConsumeProduceResult<K, V, K, V>> pollProduceAndStream(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction) {
public Stream<ConsumeProduceResult<K, V, K, V>> pollProduceAndStream(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction) {
super.pollAndProduceMany(userFunction, (result) -> {
log.trace("Wrapper callback applied, sending result to stream. Input: {}", result);
this.userProcessResultsStream.add(result);
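
A hypothetical usage sketch of the new `pollProduceAndStream` signature shown above (`processor`, `outputTopic` and `transform` are assumed names):

[source,java,indent=0]
----
Stream<ConsumeProduceResult<String, String, String, String>> results =
        processor.pollProduceAndStream(context -> {
            var consumerRecord = context.getSingleRecord().getConsumerRecord();
            // produce one output record per input record
            return Collections.singletonList(new ProducerRecord<>(outputTopic,
                    consumerRecord.key(), transform(consumerRecord.value())));
        });
----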
@@ -1,11 +1,11 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.List;
@@ -14,7 +14,7 @@

public interface JStreamParallelStreamProcessor<K, V> extends DrainingCloseable {

static <KK, VV> JStreamParallelStreamProcessor<KK, VV> createJStreamEosStreamProcessor(ParallelConsumerOptions options) {
static JStreamParallelStreamProcessor createJStreamEosStreamProcessor(ParallelConsumerOptions<?, ?> options) {
return new JStreamParallelEoSStreamProcessor<>(options);
}

@@ -24,6 +24,7 @@ static <KK, VV> JStreamParallelStreamProcessor<KK, VV> createJStreamEosStreamPro
*
* @return a stream of results of applying the function to the polled records
*/
Stream<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> pollProduceAndStream(Function<ConsumerRecord<K, V>,
List<ProducerRecord<K, V>>> userFunction);
Stream<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> pollProduceAndStream(
Function<PollContext<K, V>,
List<ProducerRecord<K, V>>> userFunction);
}
@@ -15,7 +15,6 @@

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

import static io.confluent.csid.utils.StringUtils.msg;
@@ -145,6 +144,7 @@ public enum CommitMode {
*/
@Builder.Default
private final int maxConcurrency = DEFAULT_MAX_CONCURRENCY;

public static final int DEFAULT_MAX_CONCURRENCY = 16;

/**
@@ -159,14 +159,7 @@ public enum CommitMode {
* <p>
* Overrides {@link #defaultMessageRetryDelay}, even if it's set.
*/
@Builder.Default
private final Function<WorkContainer, Duration> retryDelayProvider;

/**
* Dirty global access to the {@link #retryDelayProvider}.
*/
// TODO remove need for writeable global access
public static Function<WorkContainer, Duration> retryDelayProviderStatic;
private final Function<RecordContext<K, V>, Duration> retryDelayProvider;

/**
* Controls how long to block while waiting for the {@link Producer#send} to complete for any ProducerRecords
@@ -189,8 +182,16 @@ public enum CommitMode {
private final Duration offsetCommitTimeout = Duration.ofSeconds(10);

/**
* The maximum number of messages to attempt to pass into the {@code batch} versions of the user function. Batch
* sizes may sometimes be less than this size, but will never be more.
* The maximum number of messages to attempt to pass into the user functions.
* <p>
* Batch sizes may sometimes be less than this size, but will never be more.
* <p>
* The system will treat the messages as a set, so if an error is thrown by the user code, then all messages will be
* marked as failed and be retried (Note that when they are retried, there is no guarantee they will all be in the
* same batch again). So if you're going to process messages individually, then don't set a batch size.
* <p>
* Otherwise, if you're going to process messages in subsets from this batch, it's better to adjust the
* {@link ParallelConsumerOptions#getBatchSize()} to the actual desired size, and process them as a whole.
* <p>
* Note that there is no relationship between the {@link ConsumerConfig} setting of {@link
* ConsumerConfig#MAX_POLL_RECORDS_CONFIG} and this configured batch size, as this library introduces a large layer
@@ -202,31 +203,31 @@
* <p>
* If we have enough, then we actively manage pausing our subscription so that we can continue calling {@code poll}
* without pulling in even more messages.
* <p>
*
* @see ParallelConsumerOptions#getBatchSize()
*/
private final Integer batchSize;
@Builder.Default
private final Integer batchSize = 1;

/**
* Configure the amount of delay a record experiences, before a warning is logged.
*/
@Builder.Default
private final Duration thresholdForTimeSpendInQueueWarning = Duration.ofSeconds(10);

/**
* @see #batchSize
*/
public Optional<Integer> getBatchSize() {
return Optional.ofNullable(batchSize);
}

public boolean isUsingBatching() {
return this.getBatchSize().isPresent();
return getBatchSize() > 1;
}

@Builder.Default
private final int maxFailureHistory = 10;

/**
* @return the combined target of the desired concurrency by the configured batch size
*/
public int getTargetAmountOfRecordsInFlight() {
return getMaxConcurrency() * getBatchSize().orElse(1);
return getMaxConcurrency() * getBatchSize();
}

public void validate() {
@@ -239,7 +240,6 @@ public void validate() {

//
WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());
ParallelConsumerOptions.retryDelayProviderStatic = getRetryDelayProvider();
}

public boolean isUsingTransactionalProducer() {
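
Taken together, the options changes in this file can be sketched as follows (the `maxConcurrency` and `batchSize` values are arbitrary):

[source,java,indent=0]
----
var options = ParallelConsumerOptions.<String, String>builder()
        .maxConcurrency(100)
        .batchSize(5)
        .build();

// batchSize now defaults to 1 and is no longer wrapped in an Optional
options.isUsingBatching();                  // true, since batchSize > 1
options.getTargetAmountOfRecordsInFlight(); // 100 * 5 = 500
----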
