From 906f49fea1bf7d1229b5045466bd82e5fc04bfb7 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Tue, 27 Sep 2022 14:55:44 +0100 Subject: [PATCH 01/20] START: CCE in retry delay provider --- .../ParallelConsumerOptions.java | 6 +-- .../parallelconsumer/state/WorkContainer.java | 3 ++ .../parallelconsumer/state/ModelUtils.java | 7 ++- .../state/WorkContainerTest.java | 44 ++++++++++++++++++- 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 771e3e52d..a1fb73a88 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -96,9 +96,9 @@ public enum CommitMode { PERIODIC_TRANSACTIONAL_PRODUCER, /** - * Periodically synchronous commits with the Consumer. Much faster than {@link - * #PERIODIC_TRANSACTIONAL_PRODUCER}. Slower but potentially less duplicates than {@link - * #PERIODIC_CONSUMER_ASYNCHRONOUS} upon replay. + * Periodically synchronous commits with the Consumer. Much faster than + * {@link #PERIODIC_TRANSACTIONAL_PRODUCER}. Slower but potentially fewer duplicates than + * {@link #PERIODIC_CONSUMER_ASYNCHRONOUS} upon replay. */ PERIODIC_CONSUMER_SYNC, diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index a14ed5285..868465140 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -83,8 +83,10 @@ public class WorkContainer implements Comparable> { private Optional timeTakenAsWorkMs = Optional.empty(); // static instance so can't access generics - but don't need them as Options class ensures type is correct +// private static Function retryDelayProvider; private static Function retryDelayProvider; + public WorkContainer(long epoch, ConsumerRecord cr, Function, Duration> retryDelayProvider, String workType, Clock clock) { Objects.requireNonNull(workType); @@ -143,6 +145,7 @@ private Temporal tryAgainAt() { */ public Duration getRetryDelayConfig() { if (retryDelayProvider != null) { +// return retryDelayProvider.apply(new RecordContext<>(this)); return retryDelayProvider.apply(this); } else { return defaultRetryDelay; diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java index ab24b16ee..73773d706 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java @@ -10,15 +10,18 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.mockito.Mockito; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + @RequiredArgsConstructor public class ModelUtils { private final PCModuleTestEnv module; public WorkContainer createWorkFor(long offset) { - //noinspection unchecked ConsumerRecord mockCr = Mockito.mock(ConsumerRecord.class); - WorkContainer workContainer = new WorkContainer<>(0, mockCr, null, TimeUtils.getClock()); + WorkContainer workContainer = new WorkContainer<>(0, mockCr, mock(Function.class), TimeUtils.getClock()); Mockito.doReturn(offset).when(mockCr).offset(); return workContainer; } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java index bbaebeb91..d6db32684 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java @@ -4,15 +4,55 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.parallelconsumer.ManagedTruth; +import io.confluent.parallelconsumer.FakeRuntimeError; +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.RecordContext; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; +import org.threeten.extra.MutableClock; + +import java.time.Duration; +import java.util.function.Function; + +import static io.confluent.parallelconsumer.ManagedTruth.assertThat; +import static org.mockito.Mockito.mock; class WorkContainerTest { @Test void basics() { var workContainer = new ModelUtils(new PCModuleTestEnv()).createWorkFor(0); - ManagedTruth.assertThat(workContainer).getDelayUntilRetryDue().isNotNegative(); + assertThat(workContainer).getDelayUntilRetryDue().isNotNegative(); + } + + @Test + void retryDelayProvider() { + Function, Duration> retryDelayProvider = context -> { + int numberOfFailedAttempts = context.getNumberOfFailedAttempts(); + return Duration.ofSeconds(numberOfFailedAttempts); + }; + + // + var opts = ParallelConsumerOptions.builder() + .retryDelayProvider(retryDelayProvider) + .build(); + + WorkContainer wc = new WorkContainer(0, + mock(ConsumerRecord.class), + opts.getRetryDelayProvider(), + MutableClock.epochUTC()); + + // + int numberOfFailures = 3; + wc.onUserFunctionFailure(new FakeRuntimeError("")); + wc.onUserFunctionFailure(new FakeRuntimeError("")); + wc.onUserFunctionFailure(new FakeRuntimeError("")); + + // + Duration retryDelayConfig = wc.getRetryDelayConfig(); + + // + assertThat(retryDelayConfig).getSeconds().isEqualTo(numberOfFailures); } } From 259f406eb61ea86a05b2436cec46965f3cea104a Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Tue, 27 Sep 2022 15:03:51 +0100 Subject: [PATCH 02/20] fix --- .../io/confluent/parallelconsumer/state/WorkContainer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 868465140..679c9dc04 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -145,8 +145,7 @@ private Temporal tryAgainAt() { */ public Duration getRetryDelayConfig() { if (retryDelayProvider != null) { -// return retryDelayProvider.apply(new RecordContext<>(this)); - return retryDelayProvider.apply(this); + return retryDelayProvider.apply(new RecordContext<>(this)); } else { return defaultRetryDelay; } From 021378d3157e754d0bdc4e29c4c7fcae4a31703d Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Tue, 27 Sep 2022 15:45:52 +0100 Subject: [PATCH 03/20] step --- .../io/confluent/parallelconsumer/state/WorkContainer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 679c9dc04..abb424a61 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -72,8 +72,10 @@ public class WorkContainer implements Comparable> { /** * @see ParallelConsumerOptions#getDefaultMessageRetryDelay() + * @see ParallelConsumerOptions#validate() */ @Setter + // todo remove unused default field init static Duration defaultRetryDelay = Duration.ofSeconds(1); @Getter @@ -83,8 +85,7 @@ public class WorkContainer implements Comparable> { private Optional timeTakenAsWorkMs = Optional.empty(); // static instance so can't access generics - but don't need them as Options class ensures type is correct -// private static Function retryDelayProvider; - private static Function retryDelayProvider; + private static Function retryDelayProvider; public WorkContainer(long epoch, ConsumerRecord cr, Function, Duration> retryDelayProvider, String workType, Clock clock) { From 930bb7e547275d01fc6380ffeb2bff64d2b857bb Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Tue, 27 Sep 2022 15:48:33 +0100 Subject: [PATCH 04/20] step --- .../java/io/confluent/parallelconsumer/state/WorkContainer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index abb424a61..be048bf8f 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -98,6 +98,7 @@ public WorkContainer(long epoch, ConsumerRecord cr, Function Date: Tue, 27 Sep 2022 15:11:13 +0100 Subject: [PATCH 05/20] save --- .../parallelconsumer/state/WorkContainer.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index be048bf8f..f51fc5b94 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -6,6 +6,7 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.RecordContext; +import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -85,8 +86,7 @@ public class WorkContainer implements Comparable> { private Optional timeTakenAsWorkMs = Optional.empty(); // static instance so can't access generics - but don't need them as Options class ensures type is correct - private static Function retryDelayProvider; - + private static Function, Duration> retryDelayProvider; public WorkContainer(long epoch, ConsumerRecord cr, Function, Duration> retryDelayProvider, String workType, Clock clock) { Objects.requireNonNull(workType); @@ -97,9 +97,9 @@ public WorkContainer(long epoch, ConsumerRecord cr, Function(this)); } else { From 0c0d80ff6b9d700669a20c4ed05fd5e588b74360 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Tue, 27 Sep 2022 15:42:49 +0100 Subject: [PATCH 06/20] save --- .../parallelconsumer/ParallelConsumerOptions.java | 3 ++- .../parallelconsumer/state/WorkContainer.java | 11 +++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index a1fb73a88..993cdecd3 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -238,8 +238,9 @@ public void validate() { commitMode)); } - // + // set static work container settings WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay()); + WorkContainer.setRetryDelayProvider((Function) retryDelayProvider); } public boolean isUsingTransactionalProducer() { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index f51fc5b94..83c5ff635 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -6,7 +6,6 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.RecordContext; -import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -77,7 +76,7 @@ public class WorkContainer implements Comparable> { */ @Setter // todo remove unused default field init - static Duration defaultRetryDelay = Duration.ofSeconds(1); + static Duration defaultRetryDelay; @Getter @Setter(AccessLevel.PUBLIC) @@ -86,7 +85,11 @@ public class WorkContainer implements Comparable> { private Optional timeTakenAsWorkMs = Optional.empty(); // static instance so can't access generics - but don't need them as Options class ensures type is correct - private static Function, Duration> retryDelayProvider; + @Setter + private static Function, Duration> retryDelayProvider; + // private static Function retryDelayProvider; + private static ParallelConsumerOptions opts; + public WorkContainer(long epoch, ConsumerRecord cr, Function, Duration> retryDelayProvider, String workType, Clock clock) { Objects.requireNonNull(workType); @@ -97,7 +100,7 @@ public WorkContainer(long epoch, ConsumerRecord cr, Function Date: Tue, 27 Sep 2022 16:28:23 +0100 Subject: [PATCH 07/20] START: PARENT approach: remove by using single reference to parent (not static, but only one reference) --- .../ParallelConsumerOptions.java | 5 -- .../state/PartitionStateManager.java | 5 +- .../parallelconsumer/state/WorkContainer.java | 60 +++++++------------ .../state/WorkContainerTest.java | 8 ++- .../state/WorkManagerTest.java | 3 +- 5 files changed, 34 insertions(+), 47 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 993cdecd3..7af8ab03c 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -5,7 +5,6 @@ */ import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; -import io.confluent.parallelconsumer.state.WorkContainer; import lombok.Builder; import lombok.Getter; import lombok.ToString; @@ -237,10 +236,6 @@ public void validate() { throw new IllegalArgumentException(msg("Wanting to use Transaction Producer mode ({}) without supplying a Producer instance", commitMode)); } - - // set static work container settings - WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay()); - WorkContainer.setRetryDelayProvider((Function) retryDelayProvider); } public boolean isUsingTransactionalProducer() { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java index 1ec416567..8a093c760 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java @@ -11,6 +11,7 @@ import io.confluent.parallelconsumer.internal.EpochAndRecordsMap; import io.confluent.parallelconsumer.internal.InternalRuntimeError; import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager; +import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; @@ -55,6 +56,7 @@ public class PartitionStateManager implements ConsumerRebalanceListener { private final ShardManager sm; + @Getter(AccessLevel.PACKAGE) private final ParallelConsumerOptions options; /** @@ -73,6 +75,7 @@ public class PartitionStateManager implements ConsumerRebalanceListener { */ private final Map partitionsAssignmentEpochs = new ConcurrentHashMap<>(); + @Getter(AccessLevel.PACKAGE) private final Clock clock; public PartitionState getPartitionState(TopicPartition tp) { @@ -356,7 +359,7 @@ private void maybeRegisterNewRecordAsWork(Long epochOfInboundRecords, ConsumerRe if (isRecordPreviouslyCompleted(rec)) { log.trace("Record previously completed, skipping. offset: {}", rec.offset()); } else { - var work = new WorkContainer<>(epochOfInboundRecords, rec, options.getRetryDelayProvider(), clock); + var work = new WorkContainer<>(epochOfInboundRecords, rec, this); sm.addWorkContainer(work); addWorkContainer(work); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 83c5ff635..4dcefc5b4 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -4,17 +4,12 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.RecordContext; -import lombok.AccessLevel; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; +import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; -import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.time.temporal.Temporal; @@ -22,7 +17,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.Future; -import java.util.function.Function; import static io.confluent.csid.utils.KafkaUtils.toTopicPartition; import static java.util.Optional.of; @@ -33,6 +27,18 @@ public class WorkContainer implements Comparable> { static final String DEFAULT_TYPE = "DEFAULT"; + /** + * Reference to parent for memory efficient static object access with generic parameters. + *

+ * Not static, but only a single reference - replacing previous single reference, but allows for access to several + * global state instances and simplifies the architecture. + * + * @see PartitionStateManager#getClock + * @see PartitionStateManager#getOptions + */ + @NonNull + private final PartitionStateManager partitionStateManagerParent; + /** * Assignment generation this record comes from. Used for fencing messages after partition loss, for work lingering * in the system of in flight. @@ -51,8 +57,6 @@ public class WorkContainer implements Comparable> { @Getter private final ConsumerRecord cr; - private final Clock clock; - @Getter private int numberOfFailedAttempts = 0; @@ -70,44 +74,25 @@ public class WorkContainer implements Comparable> { @Getter private Optional maybeUserFunctionSucceeded = Optional.empty(); - /** - * @see ParallelConsumerOptions#getDefaultMessageRetryDelay() - * @see ParallelConsumerOptions#validate() - */ - @Setter - // todo remove unused default field init - static Duration defaultRetryDelay; - @Getter @Setter(AccessLevel.PUBLIC) private Future> future; private Optional timeTakenAsWorkMs = Optional.empty(); - // static instance so can't access generics - but don't need them as Options class ensures type is correct - @Setter - private static Function, Duration> retryDelayProvider; - // private static Function retryDelayProvider; - private static ParallelConsumerOptions opts; - - public WorkContainer(long epoch, ConsumerRecord cr, Function, Duration> retryDelayProvider, String workType, Clock clock) { + public WorkContainer(long epoch, ConsumerRecord cr, String workType, PartitionStateManager psm) { Objects.requireNonNull(workType); this.epoch = epoch; this.cr = cr; this.workType = workType; - this.clock = clock; - if (WorkContainer.retryDelayProvider == null) { // only set once -// AbstractParallelEoSStreamProcessor. - // todo this case removes any type safety - which got bit in issue #412 - WorkContainer.retryDelayProvider = retryDelayProvider; - } + this.partitionStateManagerParent = psm; } - public WorkContainer(long epoch, ConsumerRecord cr, Function, Duration> retryDelayProvider, Clock clock) { - this(epoch, cr, retryDelayProvider, DEFAULT_TYPE, clock); + public WorkContainer(long epoch, ConsumerRecord cr, PartitionStateManager psm) { + this(epoch, cr, DEFAULT_TYPE, psm); } public void endFlight() { @@ -129,7 +114,7 @@ public boolean hasDelayPassed() { * @return time until it should be retried */ public Duration getDelayUntilRetryDue() { - Instant now = clock.instant(); + Instant now = partitionStateManagerParent.getClock().instant(); Temporal nextAttemptAt = tryAgainAt(); return Duration.between(now, nextAttemptAt); } @@ -149,11 +134,12 @@ private Temporal tryAgainAt() { * @return the delay between retries e.g. retry after 1 second */ public Duration getRetryDelayConfig() { - this. + var options = partitionStateManagerParent.getOptions(); + var retryDelayProvider = options.getRetryDelayProvider(); if (retryDelayProvider != null) { return retryDelayProvider.apply(new RecordContext<>(this)); } else { - return defaultRetryDelay; + return options.getDefaultMessageRetryDelay(); } } @@ -184,7 +170,7 @@ public TopicPartition getTopicPartition() { } public void onUserFunctionSuccess() { - this.succeededAt = of(clock.instant()); + this.succeededAt = of(partitionStateManagerParent.getClock().instant()); this.maybeUserFunctionSucceeded = of(true); } @@ -198,7 +184,7 @@ public void onUserFunctionFailure(Throwable cause) { private void updateFailureHistory(Throwable cause) { numberOfFailedAttempts++; - lastFailedAt = of(Instant.now(clock)); + lastFailedAt = of(Instant.now(partitionStateManagerParent.getClock())); lastFailureReason = Optional.ofNullable(cause); } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java index d6db32684..19fcac85a 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java @@ -8,10 +8,11 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.RecordContext; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; -import org.threeten.extra.MutableClock; +import java.time.Clock; import java.time.Duration; import java.util.function.Function; @@ -38,10 +39,11 @@ void retryDelayProvider() { .retryDelayProvider(retryDelayProvider) .build(); + PartitionStateManager psm = new PartitionStateManager<>(mock(Consumer.class), mock(ShardManager.class), opts, mock(Clock.class)); + WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class), - opts.getRetryDelayProvider(), - MutableClock.epochUTC()); + psm); // int numberOfFailures = 3; diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index ce4467126..e503d08fe 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -43,6 +43,7 @@ import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.*; import static java.time.Duration.ofSeconds; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import static pl.tlinkowski.unij.api.UniLists.of; /** @@ -329,7 +330,7 @@ void testOrderedAndDelayed() { @Test void containerDelay() { - var wc = new WorkContainer(0, null, null, WorkContainer.DEFAULT_TYPE, this.time); + var wc = new WorkContainer(0, null, WorkContainer.DEFAULT_TYPE, mock(PartitionStateManager.class)); assertThat(wc.hasDelayPassed()).isTrue(); // when new, there's no delay wc.onUserFunctionFailure(new FakeRuntimeError("")); assertThat(wc.hasDelayPassed()).isFalse(); From b9fe49ee4b26fcf76d5767f395b2ca9910c5279c Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Tue, 27 Sep 2022 16:57:42 +0100 Subject: [PATCH 08/20] step --- .../io/confluent/parallelconsumer/state/WorkContainer.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 4dcefc5b4..41e0a0aa1 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -21,6 +21,9 @@ import static io.confluent.csid.utils.KafkaUtils.toTopicPartition; import static java.util.Optional.of; +/** + * TODO docs + */ @Slf4j @EqualsAndHashCode public class WorkContainer implements Comparable> { @@ -30,8 +33,8 @@ public class WorkContainer implements Comparable> { /** * Reference to parent for memory efficient static object access with generic parameters. *

- * Not static, but only a single reference - replacing previous single reference, but allows for access to several - * global state instances and simplifies the architecture. + * Not static, but only a single reference - replacing previous single reference field, but allows for access to + * several global state instances and simplifies the architecture. * * @see PartitionStateManager#getClock * @see PartitionStateManager#getOptions From 4edefedd02b12d1643a6cc99776368df06a2e0d6 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Tue, 27 Sep 2022 18:30:57 +0100 Subject: [PATCH 09/20] START: use context class --- .../internal/WorkContainerContext.java | 17 +++++++++++++ .../state/PartitionStateManager.java | 23 ++++++++++-------- .../parallelconsumer/state/WorkContainer.java | 24 +++++++++---------- .../state/WorkContainerTest.java | 7 ++---- 4 files changed, 43 insertions(+), 28 deletions(-) create mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/WorkContainerContext.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/WorkContainerContext.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/WorkContainerContext.java new file mode 100644 index 000000000..552252420 --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/WorkContainerContext.java @@ -0,0 +1,17 @@ +package io.confluent.parallelconsumer.internal; + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import lombok.Value; + +import java.time.Clock; + +/** + * todo docs + * + * @author Antony Stubbs + */ +@Value +public class WorkContainerContext { + ParallelConsumerOptions options; + Clock clock; +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java index 8a093c760..e5e111dff 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java @@ -6,14 +6,9 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; -import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; -import io.confluent.parallelconsumer.internal.BrokerPollSystem; -import io.confluent.parallelconsumer.internal.EpochAndRecordsMap; -import io.confluent.parallelconsumer.internal.InternalRuntimeError; +import io.confluent.parallelconsumer.internal.*; import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager; -import lombok.AccessLevel; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -38,10 +33,10 @@ * @see PartitionState */ @Slf4j -@RequiredArgsConstructor public class PartitionStateManager implements ConsumerRebalanceListener { public static final double USED_PAYLOAD_THRESHOLD_MULTIPLIER_DEFAULT = 0.75; + /** * Best efforts attempt to prevent usage of offset payload beyond X% - as encoding size test is currently only done * per batch, we need to leave some buffer for the required space to overrun before hitting the hard limit where we @@ -56,7 +51,6 @@ public class PartitionStateManager implements ConsumerRebalanceListener { private final ShardManager sm; - @Getter(AccessLevel.PACKAGE) private final ParallelConsumerOptions options; /** @@ -75,9 +69,18 @@ public class PartitionStateManager implements ConsumerRebalanceListener { */ private final Map partitionsAssignmentEpochs = new ConcurrentHashMap<>(); - @Getter(AccessLevel.PACKAGE) private final Clock clock; + private final WorkContainerContext context; + + public PartitionStateManager(Consumer consumer, ShardManager sm, ParallelConsumerOptions options, Clock clock) { + this.consumer = consumer; + this.sm = sm; + this.options = options; + this.clock = clock; + this.context = new WorkContainerContext<>(options, clock); + } + public PartitionState getPartitionState(TopicPartition tp) { return partitionStates.get(tp); } @@ -359,7 +362,7 @@ private void maybeRegisterNewRecordAsWork(Long epochOfInboundRecords, ConsumerRe if (isRecordPreviouslyCompleted(rec)) { log.trace("Record previously completed, skipping. offset: {}", rec.offset()); } else { - var work = new WorkContainer<>(epochOfInboundRecords, rec, this); + var work = new WorkContainer<>(epochOfInboundRecords, rec, context); sm.addWorkContainer(work); addWorkContainer(work); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 41e0a0aa1..9b24f3732 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -5,6 +5,7 @@ */ import io.confluent.parallelconsumer.RecordContext; +import io.confluent.parallelconsumer.internal.WorkContainerContext; import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -31,16 +32,13 @@ public class WorkContainer implements Comparable> { static final String DEFAULT_TYPE = "DEFAULT"; /** - * Reference to parent for memory efficient static object access with generic parameters. + * Memory efficient static object access with generic parameters. *

* Not static, but only a single reference - replacing previous single reference field, but allows for access to * several global state instances and simplifies the architecture. - * - * @see PartitionStateManager#getClock - * @see PartitionStateManager#getOptions */ @NonNull - private final PartitionStateManager partitionStateManagerParent; + private final WorkContainerContext context; /** * Assignment generation this record comes from. Used for fencing messages after partition loss, for work lingering @@ -84,18 +82,18 @@ public class WorkContainer implements Comparable> { private Optional timeTakenAsWorkMs = Optional.empty(); - public WorkContainer(long epoch, ConsumerRecord cr, String workType, PartitionStateManager psm) { + public WorkContainer(long epoch, ConsumerRecord cr, WorkContainerContext context, String workType) { Objects.requireNonNull(workType); this.epoch = epoch; this.cr = cr; this.workType = workType; - this.partitionStateManagerParent = psm; + this.context = context; } - public WorkContainer(long epoch, ConsumerRecord cr, PartitionStateManager psm) { - this(epoch, cr, DEFAULT_TYPE, psm); + public WorkContainer(long epoch, ConsumerRecord cr, WorkContainerContext context) { + this(epoch, cr, context, DEFAULT_TYPE); } public void endFlight() { @@ -117,7 +115,7 @@ public boolean hasDelayPassed() { * @return time until it should be retried */ public Duration getDelayUntilRetryDue() { - Instant now = partitionStateManagerParent.getClock().instant(); + Instant now = context.getClock().instant(); Temporal nextAttemptAt = tryAgainAt(); return Duration.between(now, nextAttemptAt); } @@ -137,7 +135,7 @@ private Temporal tryAgainAt() { * @return the delay between retries e.g. retry after 1 second */ public Duration getRetryDelayConfig() { - var options = partitionStateManagerParent.getOptions(); + var options = context.getOptions(); var retryDelayProvider = options.getRetryDelayProvider(); if (retryDelayProvider != null) { return retryDelayProvider.apply(new RecordContext<>(this)); @@ -173,7 +171,7 @@ public TopicPartition getTopicPartition() { } public void onUserFunctionSuccess() { - this.succeededAt = of(partitionStateManagerParent.getClock().instant()); + this.succeededAt = of(context.getClock().instant()); this.maybeUserFunctionSucceeded = of(true); } @@ -187,7 +185,7 @@ public void onUserFunctionFailure(Throwable cause) { private void updateFailureHistory(Throwable cause) { numberOfFailedAttempts++; - lastFailedAt = of(Instant.now(partitionStateManagerParent.getClock())); + lastFailedAt = of(Instant.now(context.getClock())); lastFailureReason = Optional.ofNullable(cause); } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java index 19fcac85a..6607fe04c 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java @@ -8,11 +8,10 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.RecordContext; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; -import org.apache.kafka.clients.consumer.Consumer; +import io.confluent.parallelconsumer.internal.WorkContainerContext; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; -import java.time.Clock; import java.time.Duration; import java.util.function.Function; @@ -39,11 +38,9 @@ void retryDelayProvider() { .retryDelayProvider(retryDelayProvider) .build(); - PartitionStateManager psm = new PartitionStateManager<>(mock(Consumer.class), mock(ShardManager.class), opts, mock(Clock.class)); - WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class), - psm); + mock(WorkContainerContext.class)); // int numberOfFailures = 3; From 806b505ef5d5c8096d98d74eb9d56f2a16674d0a Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Tue, 27 Sep 2022 19:56:41 +0100 Subject: [PATCH 10/20] START: use pcmodule injector --- .../parallelconsumer/internal/PCModule.java | 7 ++++- .../internal/WorkContainerContext.java | 17 ------------ .../state/PartitionStateManager.java | 18 ++++--------- .../parallelconsumer/state/ShardManager.java | 5 ---- .../parallelconsumer/state/WorkContainer.java | 27 +++++++------------ .../parallelconsumer/state/WorkManager.java | 19 +++++-------- .../parallelconsumer/state/ModelUtils.java | 6 ++--- .../state/WorkContainerTest.java | 4 +-- .../state/WorkManagerTest.java | 2 +- 9 files changed, 32 insertions(+), 73 deletions(-) delete mode 100644 parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/WorkContainerContext.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java index 5961ee073..a63a7fdc3 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java @@ -12,6 +12,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; +import java.time.Clock; + /** * Minimum dependency injection system, modled on how Dagger works. *

@@ -73,7 +75,7 @@ protected ConsumerManager consumerManager() { public WorkManager workManager() { if (workManager == null) { - workManager = new WorkManager<>(this, dynamicExtraLoadFactor(), TimeUtils.getClock()); + workManager = new WorkManager<>(this, dynamicExtraLoadFactor()); } return workManager; } @@ -100,4 +102,7 @@ protected BrokerPollSystem brokerPoller(AbstractParallelEoSStreamProcessor return brokerPollSystem; } + public Clock clock() { + return TimeUtils.getClock(); + } } \ No newline at end of file diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/WorkContainerContext.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/WorkContainerContext.java deleted file mode 100644 index 552252420..000000000 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/WorkContainerContext.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.confluent.parallelconsumer.internal; - -import io.confluent.parallelconsumer.ParallelConsumerOptions; -import lombok.Value; - -import java.time.Clock; - -/** - * todo docs - * - * @author Antony Stubbs - */ -@Value -public class WorkContainerContext { - ParallelConsumerOptions options; - Clock clock; -} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java index e5e111dff..7c16458d0 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java @@ -4,7 +4,6 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; import io.confluent.parallelconsumer.internal.*; import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager; @@ -17,7 +16,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import java.time.Clock; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -51,8 +49,6 @@ public class PartitionStateManager implements ConsumerRebalanceListener { private final ShardManager sm; - private final ParallelConsumerOptions options; - /** * Hold the tracking state for each of our managed partitions. */ @@ -69,16 +65,12 @@ public class PartitionStateManager implements ConsumerRebalanceListener { */ private final Map partitionsAssignmentEpochs = new ConcurrentHashMap<>(); - private final Clock clock; - - private final WorkContainerContext context; + private final PCModule module; - public PartitionStateManager(Consumer consumer, ShardManager sm, ParallelConsumerOptions options, Clock clock) { - this.consumer = consumer; + public PartitionStateManager(PCModule module, ShardManager sm) { + this.consumer = module.consumer(); this.sm = sm; - this.options = options; - this.clock = clock; - this.context = new WorkContainerContext<>(options, clock); + this.module = module; } public PartitionState getPartitionState(TopicPartition tp) { @@ -362,7 +354,7 @@ private void maybeRegisterNewRecordAsWork(Long epochOfInboundRecords, ConsumerRe if (isRecordPreviouslyCompleted(rec)) { log.trace("Record previously completed, skipping. offset: {}", rec.offset()); } else { - var work = new WorkContainer<>(epochOfInboundRecords, rec, context); + var work = new WorkContainer<>(epochOfInboundRecords, rec, module); sm.addWorkContainer(work); addWorkContainer(work); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java index 9aba6f4b7..4c37ed3fc 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java @@ -14,7 +14,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.TopicPartition; -import java.time.Clock; import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -22,7 +21,6 @@ import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY; import static java.util.Optional.empty; import static java.util.Optional.of; -import static lombok.AccessLevel.PRIVATE; /** * Shards are local queues of work to be processed. @@ -42,9 +40,6 @@ public class ShardManager { private final WorkManager wm; - @Getter(PRIVATE) - private final Clock clock; - /** * Map of Object keys to Shard *

diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 9b24f3732..a5bba4fff 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -5,7 +5,7 @@ */ import io.confluent.parallelconsumer.RecordContext; -import io.confluent.parallelconsumer.internal.WorkContainerContext; +import io.confluent.parallelconsumer.internal.PCModule; import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -31,14 +31,8 @@ public class WorkContainer implements Comparable> { static final String DEFAULT_TYPE = "DEFAULT"; - /** - * Memory efficient static object access with generic parameters. - *

- * Not static, but only a single reference - replacing previous single reference field, but allows for access to - * several global state instances and simplifies the architecture. - */ @NonNull - private final WorkContainerContext context; + private final PCModule module; /** * Assignment generation this record comes from. Used for fencing messages after partition loss, for work lingering @@ -82,18 +76,17 @@ public class WorkContainer implements Comparable> { private Optional timeTakenAsWorkMs = Optional.empty(); - public WorkContainer(long epoch, ConsumerRecord cr, WorkContainerContext context, String workType) { + public WorkContainer(long epoch, ConsumerRecord cr, PCModule module, String workType) { Objects.requireNonNull(workType); this.epoch = epoch; this.cr = cr; this.workType = workType; - - this.context = context; + this.module = module; } - public WorkContainer(long epoch, ConsumerRecord cr, WorkContainerContext context) { - this(epoch, cr, context, DEFAULT_TYPE); + public WorkContainer(long epoch, ConsumerRecord cr, PCModule module) { + this(epoch, cr, module, DEFAULT_TYPE); } public void endFlight() { @@ -115,7 +108,7 @@ public boolean hasDelayPassed() { * @return time until it should be retried */ public Duration getDelayUntilRetryDue() { - Instant now = context.getClock().instant(); + Instant now = module.clock().instant(); Temporal nextAttemptAt = tryAgainAt(); return Duration.between(now, nextAttemptAt); } @@ -135,7 +128,7 @@ private Temporal tryAgainAt() { * @return the delay between retries e.g. retry after 1 second */ public Duration getRetryDelayConfig() { - var options = context.getOptions(); + var options = module.options(); var retryDelayProvider = options.getRetryDelayProvider(); if (retryDelayProvider != null) { return retryDelayProvider.apply(new RecordContext<>(this)); @@ -171,7 +164,7 @@ public TopicPartition getTopicPartition() { } public void onUserFunctionSuccess() { - this.succeededAt = of(context.getClock().instant()); + this.succeededAt = of(module.clock().instant()); this.maybeUserFunctionSucceeded = of(true); } @@ -185,7 +178,7 @@ public void onUserFunctionFailure(Throwable cause) { private void updateFailureHistory(Throwable cause) { numberOfFailedAttempts++; - lastFailedAt = of(Instant.now(context.getClock())); + lastFailedAt = of(Instant.now(module.clock())); lastFailureReason = Optional.ofNullable(cause); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java index 8f02453cc..2dc7eedf3 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java @@ -4,7 +4,6 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.csid.utils.TimeUtils; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.internal.*; import lombok.Getter; @@ -14,7 +13,6 @@ import org.apache.kafka.common.TopicPartition; import pl.tlinkowski.unij.api.UniLists; -import java.time.Clock; import java.time.Duration; import java.util.*; import java.util.function.Consumer; @@ -68,24 +66,19 @@ public class WorkManager implements ConsumerRebalanceListener { @Getter(PUBLIC) private final List>> successfulWorkListeners = new ArrayList<>(); - public WorkManager(PCModule module) { - this(module, new DynamicLoadFactor(), TimeUtils.getClock()); - } - /** * Use a private {@link DynamicLoadFactor}, useful for testing. */ - public WorkManager(PCModule module, Clock clock) { - this(module, new DynamicLoadFactor(), clock); + public WorkManager(PCModule module) { + this(module, new DynamicLoadFactor()); } - public WorkManager(final PCModule module, - final DynamicLoadFactor dynamicExtraLoadFactor, - final Clock clock) { + public WorkManager(PCModule module, + DynamicLoadFactor dynamicExtraLoadFactor) { this.options = module.options(); this.dynamicLoadFactor = dynamicExtraLoadFactor; - this.sm = new ShardManager<>(options, this, clock); - this.pm = new PartitionStateManager<>(module.consumer(), sm, options, clock); + this.sm = new ShardManager<>(options, this); + this.pm = new PartitionStateManager<>(module, sm); } /** diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java index 73773d706..92eeec428 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java @@ -4,14 +4,12 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.csid.utils.TimeUtils; +import io.confluent.parallelconsumer.internal.PCModule; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.mockito.Mockito; -import java.util.function.Function; - import static org.mockito.Mockito.mock; @RequiredArgsConstructor @@ -21,7 +19,7 @@ public class ModelUtils { public WorkContainer createWorkFor(long offset) { ConsumerRecord mockCr = Mockito.mock(ConsumerRecord.class); - WorkContainer workContainer = new WorkContainer<>(0, mockCr, mock(Function.class), TimeUtils.getClock()); + WorkContainer workContainer = new WorkContainer<>(0, mockCr, mock(PCModule.class)); Mockito.doReturn(offset).when(mockCr).offset(); return workContainer; } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java index 6607fe04c..976cdbe2c 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java @@ -7,8 +7,8 @@ import io.confluent.parallelconsumer.FakeRuntimeError; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.RecordContext; +import io.confluent.parallelconsumer.internal.PCModule; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; -import io.confluent.parallelconsumer.internal.WorkContainerContext; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -40,7 +40,7 @@ void retryDelayProvider() { WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class), - mock(WorkContainerContext.class)); + mock(PCModule.class)); // int numberOfFailures = 3; diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index e503d08fe..b05598b1e 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -75,7 +75,7 @@ private void setupWorkManager(ParallelConsumerOptions options) { var mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); var optsOverride = options.toBuilder().consumer(mockConsumer).build(); - wm = new WorkManager<>(new PCModule(optsOverride), time); + wm = new WorkManager<>(new PCModule(optsOverride)); // inject time wm.getSuccessfulWorkListeners().add((work) -> { log.debug("Heard some successful work: {}", work); successfulWork.add(work); From ed6494858303f6d8dfdb5d74909dc70ac9fe1759 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 29 Sep 2022 13:31:35 +0100 Subject: [PATCH 11/20] review --- .../io/confluent/parallelconsumer/state/WorkContainer.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index af773a5be..2318df544 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -4,11 +4,10 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.PollContextInternal; import io.confluent.parallelconsumer.RecordContext; -import io.confluent.parallelconsumer.internal.ProducerManager; import io.confluent.parallelconsumer.internal.PCModule; +import io.confluent.parallelconsumer.internal.ProducerManager; import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -26,7 +25,7 @@ import static java.util.Optional.of; /** - * TODO docs + * Model object for metadata around processing state of {@link ConsumerRecord}s. */ @Slf4j @EqualsAndHashCode From 3c223563fb8154d25d8433b45598c97d7f5a9a04 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 29 Sep 2022 13:50:09 +0100 Subject: [PATCH 12/20] another version: cast static reference of module to instance type params - currently guaranteed to be the same --- .../ParallelConsumerOptions.java | 13 ++++++---- .../parallelconsumer/internal/PCModule.java | 7 +++++ .../state/PartitionStateManager.java | 6 +---- .../parallelconsumer/state/WorkContainer.java | 26 ++++++++++++------- .../OffsetEncodingBackPressureTest.java | 13 +++++----- .../OffsetEncodingBackPressureUnitTest.java | 6 ----- .../WorkManagerOffsetMapCodecManagerTest.java | 3 +-- .../parallelconsumer/state/ModelUtils.java | 5 +--- .../state/WorkContainerTest.java | 5 +--- .../state/WorkManagerTest.java | 10 +++---- 10 files changed, 46 insertions(+), 48 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 79f996f37..3105a291a 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -286,15 +286,21 @@ public void setCommitInterval(Duration commitInterval) { public static final int DEFAULT_MAX_CONCURRENCY = 16; + public static final Duration DEFAULT_STATIC_RETRY_DELAY = Duration.ofSeconds(1); + /** * When a message fails, how long the system should wait before trying that message again. Note that this will not * be exact, and is just a target. + * + * @deprecated will be renamed to static retry delay */ + @Deprecated @Builder.Default - private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1); + private final Duration defaultMessageRetryDelay = DEFAULT_STATIC_RETRY_DELAY; /** - * When present, use this to generate the retry delay, instead of {@link #getDefaultMessageRetryDelay()}. + * When present, use this to generate a dynamic retry delay, instead of a static one with + * {@link #getDefaultMessageRetryDelay()}. *

* Overrides {@link #defaultMessageRetryDelay}, even if it's set. */ @@ -374,9 +380,6 @@ public void validate() { Objects.requireNonNull(consumer, "A consumer must be supplied"); transactionsValidation(); - - // - WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay()); } private void transactionsValidation() { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java index a63a7fdc3..5ee2e4d58 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java @@ -7,6 +7,7 @@ import io.confluent.csid.utils.TimeUtils; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; +import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import lombok.Setter; import org.apache.kafka.clients.consumer.Consumer; @@ -30,6 +31,12 @@ public class PCModule { public PCModule(ParallelConsumerOptions options) { this.optionsInstance = options; + + setStaticReferences(); + } + + private void setStaticReferences() { + WorkContainer.setStaticModule(this); } public ParallelConsumerOptions options() { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java index 0ade3bf57..c7badcede 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java @@ -9,7 +9,6 @@ import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager; import lombok.Getter; import lombok.NonNull; -import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -67,12 +66,9 @@ public class PartitionStateManager implements ConsumerRebalanceListener { */ private final Map partitionsAssignmentEpochs = new ConcurrentHashMap<>(); - private final PCModule module; - public PartitionStateManager(PCModule module, ShardManager sm) { this.consumer = module.consumer(); this.sm = sm; - this.module = module; } public PartitionState getPartitionState(TopicPartition tp) { @@ -357,7 +353,7 @@ private void maybeRegisterNewRecordAsWork(@NonNull Long epochOfInboundRecords, @ if (isRecordPreviouslyCompleted(rec)) { log.trace("Record previously completed, skipping. offset: {}", rec.offset()); } else { - var work = new WorkContainer<>(epochOfInboundRecords, rec, module); + var work = new WorkContainer<>(epochOfInboundRecords, rec); sm.addWorkContainer(work); addWorkContainer(work); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 2318df544..43d74fa8d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -33,8 +33,17 @@ public class WorkContainer implements Comparable> { static final String DEFAULT_TYPE = "DEFAULT"; + /** + * @see PCModule#setStaticReferences() + */ + @Setter @NonNull - private final PCModule module; + private static PCModule staticModule; + + private PCModule getModule() { + // Cast the type parameters of WorkContainer as static fields cannot access them, however as built they are guaranteed to match. + return (PCModule) staticModule; + } /** * Assignment generation this record comes from. Used for fencing messages after partition loss, for work lingering @@ -78,17 +87,16 @@ public class WorkContainer implements Comparable> { private Optional timeTakenAsWorkMs = Optional.empty(); - public WorkContainer(long epoch, ConsumerRecord cr, PCModule module, String workType) { + public WorkContainer(long epoch, ConsumerRecord cr, String workType) { Objects.requireNonNull(workType); this.epoch = epoch; this.cr = cr; this.workType = workType; - this.module = module; } - public WorkContainer(long epoch, ConsumerRecord cr, PCModule module) { - this(epoch, cr, module, DEFAULT_TYPE); + public WorkContainer(long epoch, ConsumerRecord cr) { + this(epoch, cr, DEFAULT_TYPE); } public void endFlight() { @@ -110,7 +118,7 @@ public boolean hasDelayPassed() { * @return time until it should be retried */ public Duration getDelayUntilRetryDue() { - Instant now = module.clock().instant(); + Instant now = getModule().clock().instant(); Temporal nextAttemptAt = getRetryDueAt(); return Duration.between(now, nextAttemptAt); } @@ -133,7 +141,7 @@ public Instant getRetryDueAt() { * @return the delay between retries e.g. retry after 1 second */ public Duration getRetryDelayConfig() { - var options = module.options(); + var options = getModule().options(); var retryDelayProvider = options.getRetryDelayProvider(); if (retryDelayProvider != null) { return retryDelayProvider.apply(new RecordContext<>(this)); @@ -169,7 +177,7 @@ public TopicPartition getTopicPartition() { } public void onUserFunctionSuccess() { - this.succeededAt = of(module.clock().instant()); + this.succeededAt = of(getModule().clock().instant()); this.maybeUserFunctionSucceeded = of(true); } @@ -183,7 +191,7 @@ public void onUserFunctionFailure(Throwable cause) { private void updateFailureHistory(Throwable cause) { numberOfFailedAttempts++; - lastFailedAt = of(Instant.now(module.clock())); + lastFailedAt = of(Instant.now(getModule().clock())); lastFailureReason = Optional.ofNullable(cause); } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureTest.java index 885e137ea..7dd7cb96d 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureTest.java @@ -7,9 +7,13 @@ import com.google.common.truth.Truth; import com.google.common.truth.Truth8; import io.confluent.parallelconsumer.FakeRuntimeError; +import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase; import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.HighestOffsetAndIncompletes; -import io.confluent.parallelconsumer.state.*; +import io.confluent.parallelconsumer.state.PartitionState; +import io.confluent.parallelconsumer.state.PartitionStateManager; +import io.confluent.parallelconsumer.state.ShardManager; +import io.confluent.parallelconsumer.state.WorkManager; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -21,7 +25,6 @@ import org.junit.jupiter.api.parallel.ResourceLock; import pl.tlinkowski.unij.api.UniLists; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -37,7 +40,6 @@ import static io.confluent.parallelconsumer.ManagedTruth.assertTruth; import static io.confluent.parallelconsumer.ManagedTruth.assertWithMessage; import static io.confluent.parallelconsumer.state.PartitionStateManager.USED_PAYLOAD_THRESHOLD_MULTIPLIER_DEFAULT; -import static java.time.Duration.ofMillis; import static java.time.Duration.ofSeconds; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -255,16 +257,13 @@ void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws O log.debug("Test that failed messages can retry, causing partition to un-block"); { - Duration aggressiveDelay = ofMillis(100); - WorkContainer.setDefaultRetryDelay(aggressiveDelay); // more aggressive retry - // release message that was blocking partition progression // fail the message finalMsgLock.countDown(); // wait for the retry awaitForOneLoopCycle(); - sleepQuietly(aggressiveDelay.toMillis()); + sleepQuietly(ParallelConsumerOptions.DEFAULT_STATIC_RETRY_DELAY.toMillis()); await().until(() -> attempts.get() >= 2); // assert partition still blocked diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureUnitTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureUnitTest.java index 7d420d5a2..7db82589e 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureUnitTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/OffsetEncodingBackPressureUnitTest.java @@ -25,7 +25,6 @@ import pl.tlinkowski.unij.api.UniLists; import pl.tlinkowski.unij.api.UniMaps; -import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -33,7 +32,6 @@ import static io.confluent.parallelconsumer.ManagedTruth.assertTruth; import static io.confluent.parallelconsumer.ManagedTruth.assertWithMessage; import static io.confluent.parallelconsumer.state.PartitionStateManager.USED_PAYLOAD_THRESHOLD_MULTIPLIER_DEFAULT; -import static java.time.Duration.ofMillis; /** * UnitTest version of {@link OffsetEncodingBackPressureTest}. @@ -154,10 +152,6 @@ void backPressureShouldPreventTooManyMessagesBeingQueuedForProcessing() throws O assertTruth(workIfAvailable1).doesNotContain(0L); } - // more aggressive retry - Duration aggressiveDelay = ofMillis(100); - WorkContainer.setDefaultRetryDelay(aggressiveDelay); - // release message that was blocking partition progression wm.onFailureResult(findWC(workIfAvailable, 0L)); diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java index cc3f21f56..0b8b7f2e9 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java @@ -5,7 +5,6 @@ */ import com.google.common.truth.Truth; -import io.confluent.csid.utils.TimeUtils; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.internal.PCModule; import io.confluent.parallelconsumer.state.PartitionState; @@ -87,7 +86,7 @@ void setupMock() { } private void injectSucceededWorkAtOffset(long offset) { - WorkContainer workContainer = new WorkContainer<>(0, mockCr, null, TimeUtils.getClock()); + WorkContainer workContainer = new WorkContainer<>(0, mockCr, null); Mockito.doReturn(offset).when(mockCr).offset(); state.addWorkContainer(workContainer); state.onSuccess(workContainer); // in this case the highest seen is also the highest succeeded diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java index c32537dfb..6704c79c2 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java @@ -4,7 +4,6 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.parallelconsumer.internal.PCModule; import io.confluent.parallelconsumer.internal.EpochAndRecordsMap; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; import lombok.Getter; @@ -22,8 +21,6 @@ import java.util.ArrayList; import java.util.List; -import static org.mockito.Mockito.mock; - @RequiredArgsConstructor public class ModelUtils { @@ -31,7 +28,7 @@ public class ModelUtils { public WorkContainer createWorkFor(long offset) { ConsumerRecord mockCr = Mockito.mock(ConsumerRecord.class); - WorkContainer workContainer = new WorkContainer<>(0, mockCr, mock(PCModule.class)); + WorkContainer workContainer = new WorkContainer<>(0, mockCr); Mockito.doReturn(offset).when(mockCr).offset(); return workContainer; } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java index 976cdbe2c..43ff8354a 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java @@ -7,7 +7,6 @@ import io.confluent.parallelconsumer.FakeRuntimeError; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.RecordContext; -import io.confluent.parallelconsumer.internal.PCModule; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -38,9 +37,7 @@ void retryDelayProvider() { .retryDelayProvider(retryDelayProvider) .build(); - WorkContainer wc = new WorkContainer(0, - mock(ConsumerRecord.class), - mock(PCModule.class)); + WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class)); // int numberOfFailures = 3; diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index b05598b1e..6659a32c5 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -7,7 +7,6 @@ import com.google.common.truth.Truth; import io.confluent.csid.utils.KafkaTestUtils; import io.confluent.csid.utils.LongPollingMockConsumer; -import io.confluent.csid.utils.TimeUtils; import io.confluent.parallelconsumer.FakeRuntimeError; import io.confluent.parallelconsumer.ManagedTruth; import io.confluent.parallelconsumer.ParallelConsumerOptions; @@ -43,7 +42,6 @@ import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.*; import static java.time.Duration.ofSeconds; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; import static pl.tlinkowski.unij.api.UniLists.of; /** @@ -330,7 +328,7 @@ void testOrderedAndDelayed() { @Test void containerDelay() { - var wc = new WorkContainer(0, null, WorkContainer.DEFAULT_TYPE, mock(PartitionStateManager.class)); + var wc = new WorkContainer(0, null, WorkContainer.DEFAULT_TYPE); assertThat(wc.hasDelayPassed()).isTrue(); // when new, there's no delay wc.onUserFunctionFailure(new FakeRuntimeError("")); assertThat(wc.hasDelayPassed()).isFalse(); @@ -342,13 +340,13 @@ void containerDelay() { } private void advanceClockBySlightlyLessThanDelay() { - Duration retryDelay = WorkContainer.defaultRetryDelay; + Duration retryDelay = ParallelConsumerOptions.DEFAULT_STATIC_RETRY_DELAY; Duration duration = retryDelay.dividedBy(2); time.add(duration); } private void advanceClockByDelay() { - time.add(WorkContainer.defaultRetryDelay); + time.add(ParallelConsumerOptions.DEFAULT_STATIC_RETRY_DELAY); } private void advanceClock(Duration by) { @@ -590,7 +588,7 @@ void treeMapOrderingCorrect() { var treeMap = new TreeMap>(); for (ConsumerRecord record : records) { - treeMap.put(record.offset(), new WorkContainer<>(0, record, null, TimeUtils.getClock())); + treeMap.put(record.offset(), new WorkContainer<>(0, record, null)); } // read back, assert correct order From 555f03dd9e240674404b07a7fdcdf179e5a27e51 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 29 Sep 2022 14:49:51 +0100 Subject: [PATCH 13/20] fix test --- .../io/confluent/parallelconsumer/state/WorkContainer.java | 5 +---- .../offsets/WorkManagerOffsetMapCodecManagerTest.java | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 43d74fa8d..103ab9275 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -17,7 +17,6 @@ import java.time.Instant; import java.time.temporal.Temporal; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.Future; @@ -87,9 +86,7 @@ private PCModule getModule() { private Optional timeTakenAsWorkMs = Optional.empty(); - public WorkContainer(long epoch, ConsumerRecord cr, String workType) { - Objects.requireNonNull(workType); - + public WorkContainer(long epoch, ConsumerRecord cr, @NonNull String workType) { this.epoch = epoch; this.cr = cr; this.workType = workType; diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java index 0b8b7f2e9..2aaa14b78 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java @@ -86,7 +86,7 @@ void setupMock() { } private void injectSucceededWorkAtOffset(long offset) { - WorkContainer workContainer = new WorkContainer<>(0, mockCr, null); + WorkContainer workContainer = new WorkContainer<>(0, mockCr); Mockito.doReturn(offset).when(mockCr).offset(); state.addWorkContainer(workContainer); state.onSuccess(workContainer); // in this case the highest seen is also the highest succeeded From 6efaf8108405ca1f98fc3b2ee140e6336e9a10aa Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 29 Sep 2022 15:05:16 +0100 Subject: [PATCH 14/20] fix test --- .../state/WorkContainerTest.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java index 43ff8354a..cac842e18 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java @@ -7,6 +7,7 @@ import io.confluent.parallelconsumer.FakeRuntimeError; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.RecordContext; +import io.confluent.parallelconsumer.internal.PCModule; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -21,34 +22,37 @@ class WorkContainerTest { @Test void basics() { - var workContainer = new ModelUtils(new PCModuleTestEnv()).createWorkFor(0); + final var workContainer = new ModelUtils(new PCModuleTestEnv()).createWorkFor(0); assertThat(workContainer).getDelayUntilRetryDue().isNotNegative(); } @Test void retryDelayProvider() { - Function, Duration> retryDelayProvider = context -> { - int numberOfFailedAttempts = context.getNumberOfFailedAttempts(); - return Duration.ofSeconds(numberOfFailedAttempts); + final int uniqueMultiplier = 7; + + final Function, Duration> retryDelayProvider = context -> { + final int numberOfFailedAttempts = context.getNumberOfFailedAttempts(); + return Duration.ofSeconds(numberOfFailedAttempts * uniqueMultiplier); }; // - var opts = ParallelConsumerOptions.builder() + final var opts = ParallelConsumerOptions.builder() .retryDelayProvider(retryDelayProvider) .build(); + final PCModule module = new PCModuleTestEnv(opts); - WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class)); + final WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class)); // - int numberOfFailures = 3; + final int numberOfFailures = 3; wc.onUserFunctionFailure(new FakeRuntimeError("")); wc.onUserFunctionFailure(new FakeRuntimeError("")); wc.onUserFunctionFailure(new FakeRuntimeError("")); // - Duration retryDelayConfig = wc.getRetryDelayConfig(); + final Duration retryDelayConfig = wc.getRetryDelayConfig(); // - assertThat(retryDelayConfig).getSeconds().isEqualTo(numberOfFailures); + assertThat(retryDelayConfig).getSeconds().isEqualTo(numberOfFailures * uniqueMultiplier); } } From 50b1ac73d342ad0977c6d0160a09f764e78ca804 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 29 Sep 2022 15:06:13 +0100 Subject: [PATCH 15/20] fix test --- .../state/WorkContainerTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java index cac842e18..efb651cd1 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java @@ -22,35 +22,35 @@ class WorkContainerTest { @Test void basics() { - final var workContainer = new ModelUtils(new PCModuleTestEnv()).createWorkFor(0); + var workContainer = new ModelUtils(new PCModuleTestEnv()).createWorkFor(0); assertThat(workContainer).getDelayUntilRetryDue().isNotNegative(); } @Test void retryDelayProvider() { - final int uniqueMultiplier = 7; + int uniqueMultiplier = 7; - final Function, Duration> retryDelayProvider = context -> { + Function, Duration> retryDelayProvider = context -> { final int numberOfFailedAttempts = context.getNumberOfFailedAttempts(); return Duration.ofSeconds(numberOfFailedAttempts * uniqueMultiplier); }; // - final var opts = ParallelConsumerOptions.builder() + var opts = ParallelConsumerOptions.builder() .retryDelayProvider(retryDelayProvider) .build(); - final PCModule module = new PCModuleTestEnv(opts); + PCModule module = new PCModuleTestEnv(opts); - final WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class)); + WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class)); // - final int numberOfFailures = 3; + int numberOfFailures = 3; wc.onUserFunctionFailure(new FakeRuntimeError("")); wc.onUserFunctionFailure(new FakeRuntimeError("")); wc.onUserFunctionFailure(new FakeRuntimeError("")); // - final Duration retryDelayConfig = wc.getRetryDelayConfig(); + Duration retryDelayConfig = wc.getRetryDelayConfig(); // assertThat(retryDelayConfig).getSeconds().isEqualTo(numberOfFailures * uniqueMultiplier); From 0e1bef02bc7c307d5254c64409f4a6413878d220 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 29 Sep 2022 19:18:44 +0100 Subject: [PATCH 16/20] fix test --- .../parallelconsumer/internal/PCModule.java | 1 + .../state/ProcessingShard.java | 2 +- .../parallelconsumer/state/WorkContainer.java | 4 +- .../internal/PCModuleTestEnv.java | 13 ++++++ .../state/WorkManagerTest.java | 45 +++++++++++++------ 5 files changed, 48 insertions(+), 17 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java index 5ee2e4d58..f6a90cc9b 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java @@ -78,6 +78,7 @@ protected ConsumerManager consumerManager() { return consumerManager; } + @Setter private WorkManager workManager; public WorkManager workManager() { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java index 90a9d1e97..de3dbd11c 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java @@ -147,7 +147,7 @@ private void logSlowWork(Set> slowWork) { private void addToSlowWorkMaybe(Set> slowWork, WorkContainer workContainer) { var msgTemplate = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}."; Duration timeInFlight = workContainer.getTimeInFlight(); - var msg = msg(msgTemplate, workContainer, workContainer.hasDelayPassed(), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight); + var msg = msg(msgTemplate, workContainer, workContainer.isDelayPassed(), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight); Duration slowThreshold = options.getThresholdForTimeSpendInQueueWarning(); if (isGreaterThan(timeInFlight, slowThreshold)) { slowWork.add(workContainer); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 103ab9275..64060af15 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -101,7 +101,7 @@ public void endFlight() { inFlight = false; } - public boolean hasDelayPassed() { + public boolean isDelayPassed() { if (!hasPreviouslyFailed()) { // if never failed, there is no artificial delay, so "delay" has always passed return true; @@ -229,7 +229,7 @@ public boolean hasPreviouslyFailed() { * {@link PartitionStateManager#isAllowedMoreRecords(WorkContainer)}. */ public boolean isAvailableToTakeAsWork() { - return isNotInFlight() && !isUserFunctionSucceeded() && hasDelayPassed(); + return isNotInFlight() && !isUserFunctionSucceeded() && isDelayPassed(); } /** diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/PCModuleTestEnv.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/PCModuleTestEnv.java index b99f0adba..7125c6280 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/PCModuleTestEnv.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/PCModuleTestEnv.java @@ -6,10 +6,14 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.state.ModelUtils; +import lombok.Getter; import lombok.NonNull; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; import org.mockito.Mockito; +import org.threeten.extra.MutableClock; + +import java.time.Clock; /** * Version of the {@link PCModule} in test contexts. @@ -73,4 +77,13 @@ protected ConsumerManager consumerManager() { return consumerManager; } + + @Getter + private final MutableClock mutableClock = MutableClock.epochUTC(); + + @Override + public Clock clock() { + return mutableClock; + } + } \ No newline at end of file diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index 6659a32c5..206576f56 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -12,6 +12,7 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.internal.EpochAndRecordsMap; import io.confluent.parallelconsumer.internal.PCModule; +import io.confluent.parallelconsumer.internal.PCModuleTestEnv; import io.confluent.parallelconsumer.truth.CommitHistorySubject; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -26,6 +27,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.ValueSource; @@ -42,11 +45,16 @@ import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.*; import static java.time.Duration.ofSeconds; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import static pl.tlinkowski.unij.api.UniLists.of; /** + * Needs to run in {@link ExecutionMode#SAME_THREAD} because it manipulates the static state in + * {@link WorkContainer#setStaticModule(PCModule)}. + * * @see WorkManager */ +@Execution(ExecutionMode.SAME_THREAD) @Slf4j public class WorkManagerTest { @@ -57,7 +65,7 @@ public class WorkManagerTest { int offset; - MutableClock time = MutableClock.epochUTC(); + PCModuleTestEnv module; @BeforeEach public void setup() { @@ -65,6 +73,10 @@ public void setup() { setupWorkManager(options); } + private MutableClock getClock() { + return module.getMutableClock(); + } + protected List> successfulWork = new ArrayList<>(); private void setupWorkManager(ParallelConsumerOptions options) { @@ -73,12 +85,15 @@ private void setupWorkManager(ParallelConsumerOptions options) { var mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); var optsOverride = options.toBuilder().consumer(mockConsumer).build(); - wm = new WorkManager<>(new PCModule(optsOverride)); // inject time + module = new PCModuleTestEnv(optsOverride); + + wm = new WorkManager<>(module); wm.getSuccessfulWorkListeners().add((work) -> { log.debug("Heard some successful work: {}", work); successfulWork.add(work); }); + module.setWorkManager(wm); } private void assignPartition(final int partition) { @@ -328,29 +343,31 @@ void testOrderedAndDelayed() { @Test void containerDelay() { - var wc = new WorkContainer(0, null, WorkContainer.DEFAULT_TYPE); - assertThat(wc.hasDelayPassed()).isTrue(); // when new, there's no delay + var wc = new WorkContainer(0, mock(ConsumerRecord.class), WorkContainer.DEFAULT_TYPE); + assertThat(wc.isDelayPassed()).isTrue(); // when new, there's no delay wc.onUserFunctionFailure(new FakeRuntimeError("")); - assertThat(wc.hasDelayPassed()).isFalse(); + assertThat(wc.isDelayPassed()).isFalse(); advanceClockBySlightlyLessThanDelay(); - assertThat(wc.hasDelayPassed()).isFalse(); + assertThat(wc.isDelayPassed()).isFalse(); advanceClockByDelay(); - boolean actual = wc.hasDelayPassed(); - assertThat(actual).isTrue(); + final MutableClock mutableClock = module.getMutableClock(); + boolean delayPassed = wc.isDelayPassed(); + ManagedTruth.assertThat(wc).isDelayPassed(); } private void advanceClockBySlightlyLessThanDelay() { - Duration retryDelay = ParallelConsumerOptions.DEFAULT_STATIC_RETRY_DELAY; + Duration retryDelay = module.options().getDefaultMessageRetryDelay(); Duration duration = retryDelay.dividedBy(2); - time.add(duration); + getClock().add(duration); } private void advanceClockByDelay() { - time.add(ParallelConsumerOptions.DEFAULT_STATIC_RETRY_DELAY); + Duration retryDelay = module.options().getDefaultMessageRetryDelay(); + getClock().add(retryDelay); } private void advanceClock(Duration by) { - time.add(by); + getClock().add(by); } @Test @@ -588,7 +605,7 @@ void treeMapOrderingCorrect() { var treeMap = new TreeMap>(); for (ConsumerRecord record : records) { - treeMap.put(record.offset(), new WorkContainer<>(0, record, null)); + treeMap.put(record.offset(), new WorkContainer<>(0, record)); } // read back, assert correct order @@ -687,7 +704,7 @@ void resumesFromNextShard(ParallelConsumerOptions.ProcessingOrder order) { * initial request (without needing to iterate to other shards) * * @see #236 Under some conditions, a - * shard (by partition or key), can get starved for attention + * shard (by partition or key), can get starved for attention */ @Test void starvation() { From f7fa39aba3dd9a77cd0e2bd006a480cfc623009a Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 30 Sep 2022 12:24:55 +0100 Subject: [PATCH 17/20] remove double assert --- .../io/confluent/parallelconsumer/state/WorkManagerTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index 206576f56..401457645 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -350,8 +350,6 @@ void containerDelay() { advanceClockBySlightlyLessThanDelay(); assertThat(wc.isDelayPassed()).isFalse(); advanceClockByDelay(); - final MutableClock mutableClock = module.getMutableClock(); - boolean delayPassed = wc.isDelayPassed(); ManagedTruth.assertThat(wc).isDelayPassed(); } From 2bdf776329cbc0f4e8a84996fd18e1d36cfde0b0 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 30 Sep 2022 12:34:29 +0100 Subject: [PATCH 18/20] manual revert: another version: cast static reference of module to instance type params - currently guaranteed to be the same --- .../parallelconsumer/internal/PCModule.java | 7 ----- .../state/PartitionStateManager.java | 5 +++- .../parallelconsumer/state/WorkContainer.java | 26 ++++++++----------- .../WorkManagerOffsetMapCodecManagerTest.java | 4 ++- .../parallelconsumer/state/ModelUtils.java | 5 +++- .../state/WorkContainerTest.java | 4 ++- .../state/WorkManagerTest.java | 4 +-- 7 files changed, 27 insertions(+), 28 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java index f6a90cc9b..4e6968256 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java @@ -7,7 +7,6 @@ import io.confluent.csid.utils.TimeUtils; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; -import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import lombok.Setter; import org.apache.kafka.clients.consumer.Consumer; @@ -31,12 +30,6 @@ public class PCModule { public PCModule(ParallelConsumerOptions options) { this.optionsInstance = options; - - setStaticReferences(); - } - - private void setStaticReferences() { - WorkContainer.setStaticModule(this); } public ParallelConsumerOptions options() { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java index c7badcede..2aff1eeb8 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java @@ -66,9 +66,12 @@ public class PartitionStateManager implements ConsumerRebalanceListener { */ private final Map partitionsAssignmentEpochs = new ConcurrentHashMap<>(); + private final PCModule module; + public PartitionStateManager(PCModule module, ShardManager sm) { this.consumer = module.consumer(); this.sm = sm; + this.module = module; } public PartitionState getPartitionState(TopicPartition tp) { @@ -353,7 +356,7 @@ private void maybeRegisterNewRecordAsWork(@NonNull Long epochOfInboundRecords, @ if (isRecordPreviouslyCompleted(rec)) { log.trace("Record previously completed, skipping. offset: {}", rec.offset()); } else { - var work = new WorkContainer<>(epochOfInboundRecords, rec); + var work = new WorkContainer<>(epochOfInboundRecords, rec, module); sm.addWorkContainer(work); addWorkContainer(work); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 64060af15..874d32941 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -33,16 +33,11 @@ public class WorkContainer implements Comparable> { static final String DEFAULT_TYPE = "DEFAULT"; /** - * @see PCModule#setStaticReferences() + * Instance reference to otherwise static state, for access to the instance type parameters of WorkContainer as + * static fields cannot access them. */ - @Setter @NonNull - private static PCModule staticModule; - - private PCModule getModule() { - // Cast the type parameters of WorkContainer as static fields cannot access them, however as built they are guaranteed to match. - return (PCModule) staticModule; - } + private final PCModule module; /** * Assignment generation this record comes from. Used for fencing messages after partition loss, for work lingering @@ -86,14 +81,15 @@ private PCModule getModule() { private Optional timeTakenAsWorkMs = Optional.empty(); - public WorkContainer(long epoch, ConsumerRecord cr, @NonNull String workType) { + public WorkContainer(long epoch, ConsumerRecord cr, @NonNull PCModule module, @NonNull String workType) { this.epoch = epoch; this.cr = cr; this.workType = workType; + this.module = module; } - public WorkContainer(long epoch, ConsumerRecord cr) { - this(epoch, cr, DEFAULT_TYPE); + public WorkContainer(long epoch, ConsumerRecord cr, PCModule module) { + this(epoch, cr, module, DEFAULT_TYPE); } public void endFlight() { @@ -115,7 +111,7 @@ public boolean isDelayPassed() { * @return time until it should be retried */ public Duration getDelayUntilRetryDue() { - Instant now = getModule().clock().instant(); + Instant now = module.clock().instant(); Temporal nextAttemptAt = getRetryDueAt(); return Duration.between(now, nextAttemptAt); } @@ -138,7 +134,7 @@ public Instant getRetryDueAt() { * @return the delay between retries e.g. retry after 1 second */ public Duration getRetryDelayConfig() { - var options = getModule().options(); + var options = module.options(); var retryDelayProvider = options.getRetryDelayProvider(); if (retryDelayProvider != null) { return retryDelayProvider.apply(new RecordContext<>(this)); @@ -174,7 +170,7 @@ public TopicPartition getTopicPartition() { } public void onUserFunctionSuccess() { - this.succeededAt = of(getModule().clock().instant()); + this.succeededAt = of(module.clock().instant()); this.maybeUserFunctionSucceeded = of(true); } @@ -188,7 +184,7 @@ public void onUserFunctionFailure(Throwable cause) { private void updateFailureHistory(Throwable cause) { numberOfFailedAttempts++; - lastFailedAt = of(Instant.now(getModule().clock())); + lastFailedAt = of(Instant.now(module.clock())); lastFailureReason = Optional.ofNullable(cause); } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java index 2aaa14b78..5bdca1c90 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/offsets/WorkManagerOffsetMapCodecManagerTest.java @@ -7,6 +7,7 @@ import com.google.common.truth.Truth; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.internal.PCModule; +import io.confluent.parallelconsumer.internal.PCModuleTestEnv; import io.confluent.parallelconsumer.state.PartitionState; import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; @@ -46,6 +47,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Optional.of; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; // todo refactor - remove tests which use hard coded state vs dynamic state - #compressionCycle, #selialiseCycle, #runLengthEncoding, #loadCompressedRunLengthRncoding @Slf4j @@ -86,7 +88,7 @@ void setupMock() { } private void injectSucceededWorkAtOffset(long offset) { - WorkContainer workContainer = new WorkContainer<>(0, mockCr); + WorkContainer workContainer = new WorkContainer<>(0, mockCr, mock(PCModuleTestEnv.class)); Mockito.doReturn(offset).when(mockCr).offset(); state.addWorkContainer(workContainer); state.onSuccess(workContainer); // in this case the highest seen is also the highest succeeded diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java index 6704c79c2..47f242db2 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java @@ -5,6 +5,7 @@ */ import io.confluent.parallelconsumer.internal.EpochAndRecordsMap; +import io.confluent.parallelconsumer.internal.PCModule; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; import lombok.Getter; import lombok.NonNull; @@ -21,6 +22,8 @@ import java.util.ArrayList; import java.util.List; +import static org.mockito.Mockito.mock; + @RequiredArgsConstructor public class ModelUtils { @@ -28,7 +31,7 @@ public class ModelUtils { public WorkContainer createWorkFor(long offset) { ConsumerRecord mockCr = Mockito.mock(ConsumerRecord.class); - WorkContainer workContainer = new WorkContainer<>(0, mockCr); + WorkContainer workContainer = new WorkContainer<>(0, mockCr, mock(PCModule.class)); Mockito.doReturn(offset).when(mockCr).offset(); return workContainer; } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java index efb651cd1..5a9ccc310 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java @@ -41,7 +41,9 @@ void retryDelayProvider() { .build(); PCModule module = new PCModuleTestEnv(opts); - WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class)); + WorkContainer wc = new WorkContainer(0, + mock(ConsumerRecord.class), + mock(PCModule.class)); // int numberOfFailures = 3; diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index 401457645..297a7c2b6 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -343,7 +343,7 @@ void testOrderedAndDelayed() { @Test void containerDelay() { - var wc = new WorkContainer(0, mock(ConsumerRecord.class), WorkContainer.DEFAULT_TYPE); + var wc = new WorkContainer(0, mock(ConsumerRecord.class), mock(PCModuleTestEnv.class)); assertThat(wc.isDelayPassed()).isTrue(); // when new, there's no delay wc.onUserFunctionFailure(new FakeRuntimeError("")); assertThat(wc.isDelayPassed()).isFalse(); @@ -603,7 +603,7 @@ void treeMapOrderingCorrect() { var treeMap = new TreeMap>(); for (ConsumerRecord record : records) { - treeMap.put(record.offset(), new WorkContainer<>(0, record)); + treeMap.put(record.offset(), new WorkContainer<>(0, record, mock(PCModuleTestEnv.class))); } // read back, assert correct order From 0a590f96b503220032a997dbbc8c5cb3cc7590e1 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 30 Sep 2022 13:37:12 +0100 Subject: [PATCH 19/20] fixup! manual revert: another version: cast static reference of module to instance type params - currently guaranteed to be the same --- .../java/io/confluent/parallelconsumer/state/ModelUtils.java | 5 +---- .../confluent/parallelconsumer/state/WorkContainerTest.java | 2 +- .../io/confluent/parallelconsumer/state/WorkManagerTest.java | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java index 47f242db2..a3ecd025f 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/ModelUtils.java @@ -5,7 +5,6 @@ */ import io.confluent.parallelconsumer.internal.EpochAndRecordsMap; -import io.confluent.parallelconsumer.internal.PCModule; import io.confluent.parallelconsumer.internal.PCModuleTestEnv; import lombok.Getter; import lombok.NonNull; @@ -22,8 +21,6 @@ import java.util.ArrayList; import java.util.List; -import static org.mockito.Mockito.mock; - @RequiredArgsConstructor public class ModelUtils { @@ -31,7 +28,7 @@ public class ModelUtils { public WorkContainer createWorkFor(long offset) { ConsumerRecord mockCr = Mockito.mock(ConsumerRecord.class); - WorkContainer workContainer = new WorkContainer<>(0, mockCr, mock(PCModule.class)); + WorkContainer workContainer = new WorkContainer<>(0, mockCr, module); Mockito.doReturn(offset).when(mockCr).offset(); return workContainer; } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java index 5a9ccc310..abc9bfa71 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkContainerTest.java @@ -43,7 +43,7 @@ void retryDelayProvider() { WorkContainer wc = new WorkContainer(0, mock(ConsumerRecord.class), - mock(PCModule.class)); + module); // int numberOfFailures = 3; diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index 297a7c2b6..d2b750e00 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -343,7 +343,7 @@ void testOrderedAndDelayed() { @Test void containerDelay() { - var wc = new WorkContainer(0, mock(ConsumerRecord.class), mock(PCModuleTestEnv.class)); + var wc = new WorkContainer(0, mock(ConsumerRecord.class), module); assertThat(wc.isDelayPassed()).isTrue(); // when new, there's no delay wc.onUserFunctionFailure(new FakeRuntimeError("")); assertThat(wc.isDelayPassed()).isFalse(); From 51ade3edc1fd8c4a131ae1809eaf2dc3543fb2a2 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 30 Sep 2022 14:15:39 +0100 Subject: [PATCH 20/20] changelog --- CHANGELOG.adoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index e7fd2402b..fbe6ef1c1 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -24,6 +24,10 @@ endif::[] ** Introduces lock acquisition timeouts. ** Fixes a potential issue with removing records from the retry queue incorrectly, by having an inconsistency between compareTo and equals in the retry TreeMap. +=== Fixes + +* Fixes #412: ClassCastException with retryDelayProvider (#417) + == v0.5.2.3 === Improvements