Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 210] - allow unexpected child shards to be ignored #240

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ public ITask createTask(ShardConsumer consumer) {
consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(),
consumer.isIgnoreUnexpectedChildShards(),
consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public class KinesisClientLibConfiguration {
*/
public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true;

/**
* Ignore child shards with open parents.
*/
public static final boolean DEFAULT_IGNORE_UNEXPECTED_CHILD_SHARDS = false;

/**
* Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
*/
Expand Down Expand Up @@ -200,6 +205,7 @@ public class KinesisClientLibConfiguration {
private boolean callProcessRecordsEvenForEmptyRecordList;
private long parentShardPollIntervalMillis;
private boolean cleanupLeasesUponShardCompletion;
private boolean ignoreUnexpectedChildShards;
private ClientConfiguration kinesisClientConfig;
private ClientConfiguration dynamoDBClientConfig;
private ClientConfiguration cloudWatchClientConfig;
Expand Down Expand Up @@ -289,6 +295,7 @@ public KinesisClientLibConfiguration(String applicationName,
DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
DEFAULT_IGNORE_UNEXPECTED_CHILD_SHARDS,
new ClientConfiguration(),
new ClientConfiguration(),
new ClientConfiguration(),
Expand Down Expand Up @@ -322,6 +329,7 @@ public KinesisClientLibConfiguration(String applicationName,
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
Expand Down Expand Up @@ -351,6 +359,7 @@ public KinesisClientLibConfiguration(String applicationName,
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
boolean ignoreUnexpectedChildShards,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're changing a public constructor and adding a parameter in the middle of it. This will break anyone who uses the constructor. I would prefer to add a new constructor with the parameter. The other option is not to have the parameter, and use the with/set operations to configure the feature.

At some point we will look into switching to a builder pattern for the configuration.

ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
Expand All @@ -364,7 +373,7 @@ public KinesisClientLibConfiguration(String applicationName,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis,
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis,
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, ignoreUnexpectedChildShards,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis);
Expand Down Expand Up @@ -393,6 +402,7 @@ public KinesisClientLibConfiguration(String applicationName,
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
Expand Down Expand Up @@ -422,6 +432,7 @@ public KinesisClientLibConfiguration(String applicationName,
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
boolean ignoreUnexpectedChildShards,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
Expand Down Expand Up @@ -458,6 +469,7 @@ public KinesisClientLibConfiguration(String applicationName,
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.workerIdentifier = workerId;
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
Expand Down Expand Up @@ -503,6 +515,7 @@ public KinesisClientLibConfiguration(String applicationName,
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
Expand Down Expand Up @@ -532,6 +545,7 @@ public KinesisClientLibConfiguration(String applicationName,
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
boolean ignoreUnexpectedChildShards,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
Expand Down Expand Up @@ -567,6 +581,7 @@ public KinesisClientLibConfiguration(String applicationName,
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.workerIdentifier = workerId;
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
Expand Down Expand Up @@ -802,6 +817,13 @@ public boolean shouldCleanupLeasesUponShardCompletion() {
return cleanupLeasesUponShardCompletion;
}

/**
* @return true if we should ignore child shards which have open parents
*/
public boolean shouldIgnoreUnexpectedChildShards() {
return ignoreUnexpectedChildShards;
}

/**
* @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before
* checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)}
Expand Down Expand Up @@ -1022,6 +1044,16 @@ public KinesisClientLibConfiguration withCleanupLeasesUponShardCompletion(
return this;
}

/**
* @param ignoreUnexpectedChildShards Ignore child shards with open parents.
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withIgnoreUnexpectedChildShards(
boolean ignoreUnexpectedChildShards) {
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
return this;
}

/**
* @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client
* @return KinesisClientLibConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,10 @@ boolean isCleanupLeasesOfCompletedShards() {
return cleanupLeasesOfCompletedShards;
}

boolean isIgnoreUnexpectedChildShards() {
return config.shouldIgnoreUnexpectedChildShards();
}

long getTaskBackoffTimeMillis() {
return taskBackoffTimeMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ShardSyncTask implements ITask {
private final ILeaseManager<KinesisClientLease> leaseManager;
private InitialPositionInStreamExtended initialPosition;
private final boolean cleanupLeasesUponShardCompletion;
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
private final TaskType taskType = TaskType.SHARDSYNC;

Expand All @@ -49,11 +50,13 @@ class ShardSyncTask implements ITask {
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards,
long shardSyncTaskIdleTimeMillis) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.initialPosition = initialPositionInStream;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncTaskIdleTimeMillis = shardSyncTaskIdleTimeMillis;
}

Expand All @@ -68,7 +71,8 @@ public TaskResult call() {
ShardSyncer.checkAndCreateLeasesForNewShards(kinesisProxy,
leaseManager,
initialPosition,
cleanupLeasesUponShardCompletion);
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ShardSyncTaskManager {
private final ExecutorService executorService;
private final InitialPositionInStreamExtended initialPositionInStream;
private boolean cleanupLeasesUponShardCompletion;
private boolean ignoreUnexpectedChildShards;
private final long shardSyncIdleTimeMillis;


Expand All @@ -55,6 +56,7 @@ class ShardSyncTaskManager {
* @param initialPositionInStream Initial position in stream
* @param cleanupLeasesUponShardCompletion Clean up leases for shards that we've finished processing (don't wait
* until they expire)
* @param ignoreUnexpectedChildShards Ignore child shards with open parents
* @param shardSyncIdleTimeMillis Time between tasks to sync leases and Kinesis shards
* @param metricsFactory Metrics factory
* @param executorService ExecutorService to execute the shard sync tasks
Expand All @@ -63,13 +65,15 @@ class ShardSyncTaskManager {
final ILeaseManager<KinesisClientLease> leaseManager,
final InitialPositionInStreamExtended initialPositionInStream,
final boolean cleanupLeasesUponShardCompletion,
final boolean ignoreUnexpectedChildShards,
final long shardSyncIdleTimeMillis,
final IMetricsFactory metricsFactory,
ExecutorService executorService) {
this.kinesisProxy = kinesisProxy;
this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
this.ignoreUnexpectedChildShards = ignoreUnexpectedChildShards;
this.shardSyncIdleTimeMillis = shardSyncIdleTimeMillis;
this.executorService = executorService;
this.initialPositionInStream = initialPositionInStream;
Expand Down Expand Up @@ -99,6 +103,7 @@ private synchronized boolean checkAndSubmitNextTask(Set<String> closedShardIds)
leaseManager,
initialPositionInStream,
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis), metricsFactory);
future = executorService.submit(currentTask);
submittedNewTask = true;
Expand Down
Loading