Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove duplicated batchSize factor from target load; add a test case #662

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ toc::[]
endif::[]
== 0.5.2.8

* fix: Fix target loading computation for inflight records

=== Fixes

* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,8 +997,7 @@ protected int getTargetOutForProcessing() {

protected int getQueueTargetLoaded() {
//noinspection unchecked
int batch = options.getBatchSize();
return getPoolLoadTarget() * dynamicExtraLoadFactor.getCurrentFactor() * batch;
return getPoolLoadTarget() * dynamicExtraLoadFactor.getCurrentFactor();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.TestParallelEoSStreamProcessor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
* Tests that the protected and internal methods of
* {@link io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor} work as expected.
* <p>
*
* @author Jonathon Koyle
*/
@Slf4j
class ParallelEoSStreamProcessorNonRunningTest {
cloudhunter89 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Test that the mock consumer works as expected
*/
@Test
void getTargetLoad() {
final int batchSize = 10;
final int concurrency = 2;
final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
final ParallelConsumerOptions<String, String> testOptions = ParallelConsumerOptions.<String, String>builder()
.batchSize(batchSize)
.maxConcurrency(concurrency)
.consumer(consumer)
.build();
try (final TestParallelEoSStreamProcessor<String, String> testInstance = new TestParallelEoSStreamProcessor<>(testOptions)) {
final int defaultLoad = 2;
final int expectedTargetLoad = batchSize * concurrency * defaultLoad;

final int actualTargetLoad = testInstance.getTargetLoad();

Assertions.assertEquals(expectedTargetLoad, actualTargetLoad);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;

/**
* Provides a set of methods for testing internal and configuration based interfaces of
* {@link AbstractParallelEoSStreamProcessor}.
*/
public class TestParallelEoSStreamProcessor<K, V> extends AbstractParallelEoSStreamProcessor<K, V> {
public TestParallelEoSStreamProcessor(final ParallelConsumerOptions<K, V> newOptions) {
super(newOptions);
}

public int getTargetLoad() { return getQueueTargetLoaded(); }
}