Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #412: ClassCastException with retryDelayProvider #417

Merged
merged 21 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ endif::[]
** Introduces lock acquisition timeouts.
** Fixes a potential issue where records could be removed from the retry queue incorrectly, caused by an inconsistency between compareTo and equals in the retry TreeMap.

=== Fixes

* Fixes #412: ClassCastException with retryDelayProvider (#417)

== v0.5.2.3

=== Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
Expand Down Expand Up @@ -178,7 +177,7 @@ public enum CommitMode {

/**
* Periodically synchronous commits with the Consumer. Much faster than
* {@link #PERIODIC_TRANSACTIONAL_PRODUCER}. Slower but potentially less duplicates than
* {@link #PERIODIC_TRANSACTIONAL_PRODUCER}. Slower but potentially fewer duplicates than
* {@link #PERIODIC_CONSUMER_ASYNCHRONOUS} upon replay.
*/
PERIODIC_CONSUMER_SYNC,
Expand Down Expand Up @@ -287,15 +286,21 @@ public void setCommitInterval(Duration commitInterval) {

public static final int DEFAULT_MAX_CONCURRENCY = 16;

public static final Duration DEFAULT_STATIC_RETRY_DELAY = Duration.ofSeconds(1);

/**
* When a message fails, how long the system should wait before trying that message again. Note that this will not
* be exact, and is just a target.
*
* @deprecated will be renamed to static retry delay
*/
@Deprecated
@Builder.Default
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);
private final Duration defaultMessageRetryDelay = DEFAULT_STATIC_RETRY_DELAY;

/**
* When present, use this to generate the retry delay, instead of {@link #getDefaultMessageRetryDelay()}.
* When present, use this to generate a dynamic retry delay, instead of a static one with
* {@link #getDefaultMessageRetryDelay()}.
* <p>
* Overrides {@link #defaultMessageRetryDelay}, even if it's set.
*/
Expand Down Expand Up @@ -375,9 +380,6 @@ public void validate() {
Objects.requireNonNull(consumer, "A consumer must be supplied");

transactionsValidation();

//
WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());
}

private void transactionsValidation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

import java.time.Clock;

/**
* Minimum dependency injection system, modeled on how Dagger works.
* <p>
Expand Down Expand Up @@ -69,11 +71,12 @@ protected ConsumerManager<K, V> consumerManager() {
return consumerManager;
}

@Setter
private WorkManager<K, V> workManager;

public WorkManager<K, V> workManager() {
if (workManager == null) {
workManager = new WorkManager<>(this, dynamicExtraLoadFactor(), TimeUtils.getClock());
workManager = new WorkManager<>(this, dynamicExtraLoadFactor());
}
return workManager;
}
Expand All @@ -100,4 +103,7 @@ protected BrokerPollSystem<K, V> brokerPoller(AbstractParallelEoSStreamProcessor
return brokerPollSystem;
}

/**
* Supplies the {@link Clock} for components wired through this module.
*
* @return whatever {@code TimeUtils.getClock()} returns at the time of the call
*/
public Clock clock() {
return TimeUtils.getClock();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.internal.*;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -22,7 +17,6 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Clock;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
Expand All @@ -38,10 +32,10 @@
* @see PartitionState
*/
@Slf4j
@RequiredArgsConstructor
public class PartitionStateManager<K, V> implements ConsumerRebalanceListener {

public static final double USED_PAYLOAD_THRESHOLD_MULTIPLIER_DEFAULT = 0.75;

/**
* Best efforts attempt to prevent usage of offset payload beyond X% - as encoding size test is currently only done
* per batch, we need to leave some buffer for the required space to overrun before hitting the hard limit where we
Expand All @@ -56,8 +50,6 @@ public class PartitionStateManager<K, V> implements ConsumerRebalanceListener {

private final ShardManager<K, V> sm;

private final ParallelConsumerOptions<K, V> options;

/**
* Hold the tracking state for each of our managed partitions.
*/
Expand All @@ -74,7 +66,13 @@ public class PartitionStateManager<K, V> implements ConsumerRebalanceListener {
*/
private final Map<TopicPartition, Long> partitionsAssignmentEpochs = new ConcurrentHashMap<>();

private final Clock clock;
private final PCModule<K, V> module;

/**
* Builds the manager from the module and the shard manager.
*
* @param module supplies the consumer via {@code module.consumer()} and is itself retained on this instance
* @param sm     the shard manager stored on this instance
*/
public PartitionStateManager(PCModule<K, V> module, ShardManager<K, V> sm) {
// the consumer is resolved from the module once, at construction time
this.consumer = module.consumer();
this.sm = sm;
this.module = module;
}

public PartitionState<K, V> getPartitionState(TopicPartition tp) {
return partitionStates.get(tp);
Expand Down Expand Up @@ -358,7 +356,7 @@ private void maybeRegisterNewRecordAsWork(@NonNull Long epochOfInboundRecords, @
if (isRecordPreviouslyCompleted(rec)) {
log.trace("Record previously completed, skipping. offset: {}", rec.offset());
} else {
var work = new WorkContainer<>(epochOfInboundRecords, rec, options.getRetryDelayProvider(), clock);
var work = new WorkContainer<>(epochOfInboundRecords, rec, module);

sm.addWorkContainer(work);
addWorkContainer(work);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void logSlowWork(Set<WorkContainer<?, ?>> slowWork) {
private void addToSlowWorkMaybe(Set<WorkContainer<?, ?>> slowWork, WorkContainer<?, ?> workContainer) {
var msgTemplate = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.";
Duration timeInFlight = workContainer.getTimeInFlight();
var msg = msg(msgTemplate, workContainer, workContainer.hasDelayPassed(), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight);
var msg = msg(msgTemplate, workContainer, workContainer.isDelayPassed(), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight);
Duration slowThreshold = options.getThresholdForTimeSpendInQueueWarning();
if (isGreaterThan(timeInFlight, slowThreshold)) {
slowWork.add(workContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;

import java.time.Clock;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static lombok.AccessLevel.PRIVATE;

/**
* Shards are local queues of work to be processed.
Expand All @@ -42,9 +40,6 @@ public class ShardManager<K, V> {

private final WorkManager<K, V> wm;

@Getter(PRIVATE)
private final Clock clock;

/**
* Map of Object keys to Shard
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,41 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.RecordContext;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.ProducerManager;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Function;

import static io.confluent.csid.utils.KafkaUtils.toTopicPartition;
import static java.util.Optional.of;

/**
* Model object for metadata around processing state of {@link ConsumerRecord}s.
*/
@Slf4j
@EqualsAndHashCode
public class WorkContainer<K, V> implements Comparable<WorkContainer<K, V>> {

static final String DEFAULT_TYPE = "DEFAULT";

/**
* Instance reference to otherwise static state, for access to the instance type parameters of WorkContainer as
* static fields cannot access them.
*/
@NonNull
private final PCModule<K, V> module;

/**
* Assignment generation this record comes from. Used for fencing messages after partition loss, for work lingering
* in the system or in flight.
Expand All @@ -53,8 +57,6 @@ public class WorkContainer<K, V> implements Comparable<WorkContainer<K, V>> {
@Getter
private final ConsumerRecord<K, V> cr;

private final Clock clock;

@Getter
private int numberOfFailedAttempts = 0;

Expand All @@ -72,45 +74,30 @@ public class WorkContainer<K, V> implements Comparable<WorkContainer<K, V>> {
@Getter
private Optional<Boolean> maybeUserFunctionSucceeded = Optional.empty();

/**
* @see ParallelConsumerOptions#getDefaultMessageRetryDelay()
*/
@Setter
static Duration defaultRetryDelay = Duration.ofSeconds(1);

@Getter
@Setter(AccessLevel.PUBLIC)
private Future<List<?>> future;

private Optional<Long> timeTakenAsWorkMs = Optional.empty();

// static instance so can't access generics - but don't need them as Options class ensures type is correct
private static Function<Object, Duration> retryDelayProvider;

public WorkContainer(long epoch, ConsumerRecord<K, V> cr, Function<RecordContext<K, V>, Duration> retryDelayProvider, String workType, Clock clock) {
Objects.requireNonNull(workType);

public WorkContainer(long epoch, ConsumerRecord<K, V> cr, @NonNull PCModule<K, V> module, @NonNull String workType) {
this.epoch = epoch;
this.cr = cr;
this.workType = workType;
this.clock = clock;

if (WorkContainer.retryDelayProvider == null) { // only set once
// static instance so can't access generics - but don't need them as Options class ensures type is correct
WorkContainer.retryDelayProvider = (Function) retryDelayProvider;
}
this.module = module;
}

public WorkContainer(long epoch, ConsumerRecord<K, V> cr, Function<RecordContext<K, V>, Duration> retryDelayProvider, Clock clock) {
this(epoch, cr, retryDelayProvider, DEFAULT_TYPE, clock);
public WorkContainer(long epoch, ConsumerRecord<K, V> cr, PCModule<K, V> module) {
this(epoch, cr, module, DEFAULT_TYPE);
}

/**
* Marks this work as no longer in flight: logs the transition at trace level, then clears the
* {@code inFlight} flag.
*/
public void endFlight() {
log.trace("Ending flight {}", this);
inFlight = false;
}

public boolean hasDelayPassed() {
public boolean isDelayPassed() {
if (!hasPreviouslyFailed()) {
// if never failed, there is no artificial delay, so "delay" has always passed
return true;
Expand All @@ -124,7 +111,7 @@ public boolean hasDelayPassed() {
* @return time until it should be retried
*/
public Duration getDelayUntilRetryDue() {
Instant now = clock.instant();
Instant now = module.clock().instant();
Temporal nextAttemptAt = getRetryDueAt();
return Duration.between(now, nextAttemptAt);
}
Expand All @@ -147,10 +134,12 @@ public Instant getRetryDueAt() {
* @return the delay between retries e.g. retry after 1 second
*/
public Duration getRetryDelayConfig() {
var options = module.options();
var retryDelayProvider = options.getRetryDelayProvider();
if (retryDelayProvider != null) {
return retryDelayProvider.apply(this);
return retryDelayProvider.apply(new RecordContext<>(this));
} else {
return defaultRetryDelay;
return options.getDefaultMessageRetryDelay();
}
}

Expand Down Expand Up @@ -181,7 +170,7 @@ public TopicPartition getTopicPartition() {
}

public void onUserFunctionSuccess() {
this.succeededAt = of(clock.instant());
this.succeededAt = of(module.clock().instant());
this.maybeUserFunctionSucceeded = of(true);
}

Expand All @@ -195,7 +184,7 @@ public void onUserFunctionFailure(Throwable cause) {

private void updateFailureHistory(Throwable cause) {
numberOfFailedAttempts++;
lastFailedAt = of(Instant.now(clock));
lastFailedAt = of(Instant.now(module.clock()));
lastFailureReason = Optional.ofNullable(cause);
}

Expand Down Expand Up @@ -236,7 +225,7 @@ public boolean hasPreviouslyFailed() {
* {@link PartitionStateManager#isAllowedMoreRecords(WorkContainer)}.
*/
public boolean isAvailableToTakeAsWork() {
return isNotInFlight() && !isUserFunctionSucceeded() && hasDelayPassed();
return isNotInFlight() && !isUserFunctionSucceeded() && isDelayPassed();
}

/**
Expand Down
Loading