Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add minimum idleTimeBetweenReadsInMillis to multilang #4

Open
wants to merge 6 commits into
base: kcl-throttling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public class KinesisClientLibConfiguration {
*/
public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;

/**
* The minimum time {@link ShardConsumer} should sleep between calls to
* {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}.
*/
public static final long MIN_IDLE_MILLIS_BETWEEN_READS = 200L;

/**
* Don't call processRecords() on the record processor for empty record lists.
*/
Expand Down Expand Up @@ -1098,6 +1104,7 @@ public KinesisClientLibConfiguration withMaxRecords(int maxRecords) {
*/
public KinesisClientLibConfiguration withIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
idleTimeBetweenReadsInMillis = Math.max(idleTimeBetweenReadsInMillis, MIN_IDLE_MILLIS_BETWEEN_READS);
Copy link
Owner Author

@ethkatnic ethkatnic Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if changes in this file are necessary, as I believe this is only used in the KCL1, but I would like to verify this. Similar change was done here.

this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ shardSyncIntervalMillis = 60000
maxRecords = 10000

# Idle time between record reads in milliseconds.
# Setting this property lower than 200ms will be overwritten to 200ms.
idleTimeBetweenReadsInMillis = 1000

# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.kinesis.retrieval.DataFetcherProviderConfig;
Expand All @@ -38,12 +39,15 @@
@Setter
@ToString
@EqualsAndHashCode
@Slf4j
public class PollingConfig implements RetrievalSpecificConfig {

public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(30);

public static final int DEFAULT_MAX_RECORDS = 10000;

public static final long MIN_IDLE_MILLIS_BETWEEN_READS = 200L;
Copy link
Owner Author

@ethkatnic ethkatnic Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't like having 2 declarations of the same static final var, but don't see any existing constants classes. Centralizing this and any other static constants may be worthwhile but would bloat the PR.


/**
* Configurable functional interface to override the existing DataFetcher.
*/
Expand Down Expand Up @@ -138,9 +142,14 @@ public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
/**
* Set the value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in
* {@link RecordsFetcherFactory} will be used.
* {@link RecordsFetcherFactory} will be used. Cannot set value below MIN_IDLE_MILLIS_BETWEEN_READS
*/
public PollingConfig idleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
if (idleTimeBetweenReadsInMillis < MIN_IDLE_MILLIS_BETWEEN_READS) {
log.warn("idleTimeBetweenReadsInMillis config property is set below the minimum allowed value of 200ms."
+ " This value will be automatically adjusted to 200ms.");
idleTimeBetweenReadsInMillis = MIN_IDLE_MILLIS_BETWEEN_READS;
}
usePollingConfigIdleTimeValue = true;
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ public void testInvalidStateMultiWithStreamName() {
public void testInvalidRecordLimit() {
config.maxRecords(PollingConfig.DEFAULT_MAX_RECORDS + 1);
}

@Test
public void testMinIdleMillisLimit() {
config.idleTimeBetweenReadsInMillis(0);
assertEquals(config.idleTimeBetweenReadsInMillis(), PollingConfig.MIN_IDLE_MILLIS_BETWEEN_READS);
}
}