Skip to content

Commit

Permalink
Remove duplicated batchSize factor from target load; add a test case (#…
Browse files Browse the repository at this point in the history
…662)

* Remove duplicated batchSize factor from target load; add a test case
  • Loading branch information
cloudhunter89 authored Nov 16, 2023
1 parent 075b4f9 commit 0c9b7da
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ toc::[]
endif::[]
== 0.5.2.8

* fix: Fix target loading computation for inflight records

=== Fixes

* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,8 +997,7 @@ protected int getTargetOutForProcessing() {

protected int getQueueTargetLoaded() {
//noinspection unchecked
int batch = options.getBatchSize();
return getPoolLoadTarget() * dynamicExtraLoadFactor.getCurrentFactor() * batch;
return getPoolLoadTarget() * dynamicExtraLoadFactor.getCurrentFactor();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.TestParallelEoSStreamProcessor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
* Tests to verify the protected and internal methods of
* {@link io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor} work as expected.
* <p>
*
* @author Jonathon Koyle
*/
@Slf4j
class AbstractParallelEoSStreamProcessorConfigurationTest {

/**
* Test that the {@link io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor#getQueueTargetLoaded}
*/
@Test
void queueTargetLoad() {
final int batchSize = 10;
final int concurrency = 2;
final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
final ParallelConsumerOptions<String, String> testOptions = ParallelConsumerOptions.<String, String>builder()
.batchSize(batchSize)
.maxConcurrency(concurrency)
.consumer(consumer)
.build();
try (final TestParallelEoSStreamProcessor<String, String> testInstance = new TestParallelEoSStreamProcessor<>(testOptions)) {
final int defaultLoad = 2;
final int expectedTargetLoad = batchSize * concurrency * defaultLoad;

final int actualTargetLoad = testInstance.getTargetLoad();

Assertions.assertEquals(expectedTargetLoad, actualTargetLoad);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;

/**
* Provides a set of methods for testing internal and configuration based interfaces of
* {@link AbstractParallelEoSStreamProcessor}.
*/
public class TestParallelEoSStreamProcessor<K, V> extends AbstractParallelEoSStreamProcessor<K, V> {
public TestParallelEoSStreamProcessor(final ParallelConsumerOptions<K, V> newOptions) {
super(newOptions);
}

public int getTargetLoad() { return getQueueTargetLoaded(); }
}

0 comments on commit 0c9b7da

Please sign in to comment.