Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add minimum idleTimeBetweenReadsInMillis to multilang #4

Open
wants to merge 6 commits into
base: kcl-throttling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public class KinesisClientLibConfiguration {
*/
public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;

/**
* The minimum time {@link ShardConsumer} should sleep between calls to
* {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}.
*/
public static final long MIN_IDLE_MILLIS_BETWEEN_READS = 200L;

/**
* Don't call processRecords() on the record processor for empty record lists.
*/
Expand Down Expand Up @@ -1098,6 +1104,10 @@ public KinesisClientLibConfiguration withMaxRecords(int maxRecords) {
*/
public KinesisClientLibConfiguration withIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
if (idleTimeBetweenReadsInMillis < MIN_IDLE_MILLIS_BETWEEN_READS) {
throw new IllegalArgumentException("idleTimeBetweenReadsInMillis must be greater than or equal to "
+ MIN_IDLE_MILLIS_BETWEEN_READS + " but current value is " + idleTimeBetweenReadsInMillis);
}
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class PollingConfig implements RetrievalSpecificConfig {

public static final int DEFAULT_MAX_RECORDS = 10000;

public static final long MIN_IDLE_MILLIS_BETWEEN_READS = 200L;
Copy link
Owner Author

@ethkatnic ethkatnic Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't like having 2 declarations of the same static final var, but don't see any existing constants classes. Centralizing this and any other static constants may be worthwhile but would bloat the PR.


/**
* Configurable functional interface to override the existing DataFetcher.
*/
Expand Down Expand Up @@ -138,9 +140,14 @@ public void setIdleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
/**
* Set the value for how long the ShardConsumer should sleep in between calls to
* {@link KinesisAsyncClient#getRecords(GetRecordsRequest)}. If this is not specified here the value provided in
* {@link RecordsFetcherFactory} will be used.
* {@link RecordsFetcherFactory} will be used. Cannot set value below MIN_IDLE_MILLIS_BETWEEN_READS
*/
public PollingConfig idleTimeBetweenReadsInMillis(long idleTimeBetweenReadsInMillis) {
if (idleTimeBetweenReadsInMillis < MIN_IDLE_MILLIS_BETWEEN_READS) {
throw new IllegalArgumentException("Value for idleTimeBetweenReadsInMillis must be less than or equal to "
+ MIN_IDLE_MILLIS_BETWEEN_READS + " but current value is "
+ idleTimeBetweenReadsInMillis());
}
usePollingConfigIdleTimeValue = true;
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ public void testInvalidStateMultiWithStreamName() {
public void testInvalidRecordLimit() {
config.maxRecords(PollingConfig.DEFAULT_MAX_RECORDS + 1);
}

@Test(expected = IllegalArgumentException.class)
public void testInvalidIdleMillisLimit() {
config.idleTimeBetweenReadsInMillis(PollingConfig.MIN_IDLE_MILLIS_BETWEEN_READS - 1);
}
}