Run multiple instance of scheduler on one JVM #395
A couple of small changes
  * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
  * It deletes leases for shards that have been trimmed from Kinesis, or if we've completed processing it
  * and begun processing it's child shards.
  */
- @NoArgsConstructor(access = AccessLevel.PRIVATE)
+ @NoArgsConstructor(access = AccessLevel.PUBLIC)
You shouldn't need to add this.
Removed
public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis,
ExecutorService executorService, MetricsFactory metricsFactory) {
this(shardDetector, leaseRefresher, initialPositionInStream, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, executorService, metricsFactory, new ShardSyncer());
You should not depend on the ShardSyncTaskManager creating the ShardSyncer; instead, handle the creation in DynamoDBLeaseManagementFactory.
Modified.
…d LeaseManagementConfig
@@ -220,6 +220,15 @@ public LeaseManagementConfig metricsFactory(final MetricsFactory metricsFactory)
}
};

private ShardSyncer shardSyncer;
No need to create the getter
private ShardSyncer shardSyncer = new ShardSyncer();
This should work.
Yes, you are right.
@NonNull
private final ShardSyncer shardSyncer;

public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStream,
Could you swap the order of MetricsFactory and ShardSyncer? We keep MetricsFactory as the last parameter of the constructors.
Modified
@@ -71,7 +68,7 @@
  * @throws KinesisClientLibIOException
  */
  // CHECKSTYLE:OFF CyclomaticComplexity
- public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
+ public synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
This seems like a breaking change. A better way would be to deprecate the existing method and create a new non-static method.
You are right, modified.
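For readers following the thread, here is a minimal sketch of the deprecate-and-add-instance-method approach suggested above. The class names `InstanceShardSyncer` and `LegacyShardSyncer` are hypothetical stand-ins, not KCL types, and having the static method delegate to a shared instance is an assumption made for illustration, not necessarily what this PR does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the new, instance-based syncer (not a KCL class).
class InstanceShardSyncer {
    private final List<String> leases = new ArrayList<>();

    // Non-static and synchronized: the lock is this instance, not the class.
    public synchronized void checkAndCreateLeases(List<String> shardIds) {
        for (String shardId : shardIds) {
            if (!leases.contains(shardId)) {
                leases.add(shardId);
            }
        }
    }

    // Returns a copy so callers cannot mutate internal state.
    public synchronized List<String> leases() {
        return new ArrayList<>(leases);
    }
}

// Hypothetical stand-in for the legacy static API, kept for existing callers.
class LegacyShardSyncer {
    // Assumption: the static entry point delegates to one shared instance.
    private static final InstanceShardSyncer SHARED = new InstanceShardSyncer();

    /** @deprecated use {@link InstanceShardSyncer#checkAndCreateLeases(List)} instead. */
    @Deprecated
    public static synchronized void checkAndCreateLeases(List<String> shardIds) {
        SHARED.checkAndCreateLeases(shardIds);
    }

    static List<String> leases() {
        return SHARED.leases();
    }
}
```

Existing callers of the static method keep compiling (with a deprecation warning), while new callers create one syncer per scheduler and do not share state or a lock.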
* worker and checks if the second worker can start syncing shards.
*/
@Test(timeout = 30000L)
public final void testWorkersCanSyncShardsInParallel() {
Avoid multithreaded testing in unit tests. Testing a single instance should be fine for unit testing.
OK, will remove this unit test.
…e the order for metricsFactory and HierarchichalShardSyncer in ShardConsumerArgument
Looking good, just a few more minor changes
@@ -236,7 +239,8 @@ private void initialize() {
  if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
  log.info("Syncing Kinesis shard info");
  ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
- cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, metricsFactory);
+ cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L,
+ hierarchichalShardSyncer, metricsFactory);
Misspelled Hierarchical.
Yes, fixed.
private final MetricsFactory metricsFactory;

public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStream,
Could you also keep the old constructor as a deprecated overload, defaulting to a HierarchicalShardSyncer?
Done
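A minimal sketch of the constructor pattern requested above: the old signature stays as a deprecated overload that supplies a default syncer, and the new signature takes the syncer explicitly with the metrics dependency last. All types here (`Syncer`, `DefaultSyncer`, `Metrics`, `NoopMetrics`, `TaskManager`) are hypothetical placeholders, not the KCL classes, and the parameter lists are shortened for illustration.

```java
import java.util.Objects;

// Hypothetical placeholder types; none of these are KCL classes.
interface Syncer {
    String name();
}

class DefaultSyncer implements Syncer {
    @Override
    public String name() {
        return "default";
    }
}

interface Metrics {
}

class NoopMetrics implements Metrics {
}

class TaskManager {
    private final long idleTimeMillis;
    private final Syncer syncer;
    private final Metrics metrics;

    /** @deprecated use {@link #TaskManager(long, Syncer, Metrics)} and pass the syncer explicitly. */
    @Deprecated
    TaskManager(long idleTimeMillis, Metrics metrics) {
        // Old signature keeps working by falling back to a default syncer.
        this(idleTimeMillis, new DefaultSyncer(), metrics);
    }

    // New signature: the syncer is injected, and the metrics parameter stays last.
    TaskManager(long idleTimeMillis, Syncer syncer, Metrics metrics) {
        this.idleTimeMillis = idleTimeMillis;
        this.syncer = Objects.requireNonNull(syncer, "syncer");
        this.metrics = metrics;
    }

    Syncer syncer() {
        return syncer;
    }
}
```

Injecting the syncer through the constructor is also what makes it possible to pass a mock in tests, as discussed later in this review.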
@@ -81,7 +97,7 @@ private synchronized boolean checkAndSubmitNextTask() {
  initialPositionInStream,
  cleanupLeasesUponShardCompletion,
  ignoreUnexpectedChildShards,
- shardSyncIdleTimeMillis,
+ shardSyncIdleTimeMillis, hierarchichalShardSyncer,
Could you follow the file's formatting style and put the new parameter on its own line?
Fixed
@@ -54,6 +55,7 @@
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final ShutdownReason TERMINATE_SHUTDOWN_REASON = ShutdownReason.SHARD_END;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
private static final HierarchichalShardSyncer SHARD_SYNCER = new HierarchichalShardSyncer();
Does it need to be an instance of the ShardSyncer? Or can it be mocked?
Done
@@ -109,7 +110,8 @@ public void setup() {
  taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
  listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
  shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM,
- cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, metricsFactory, new AggregatorUtil());
+ cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, new AggregatorUtil(),
+ new HierarchichalShardSyncer(), metricsFactory);
Does it need to be a new instance of the ShardSyncer? Or can it be a mock object?
Before this change, the code called static methods on ShardSyncer and never mocked it. That's why I think we need to create a new instance.
Changed to use mock
* @param initialLeaseTableWriteCapacity
* @param hierarchichalShardSyncer
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
Could you run the formatter on this change?
Done
Looking good. Thanks for providing this.
Thanks for the change, and sorry about the delay. One more thing: could you please pull, resolve the conflicts, and update the PR?
Description of changes:
Enable KCL V2 to run multiple instances of the Scheduler on a single JVM.
Multiple KCL schedulers should be able to run with different event source mappings of different customers in the same JVM.
Why?
The Lambda event bridge application handles a large number of customer event source mappings. Running multiple KCL schedulers on the same instance and in the same JVM reduces the number of hosts required to process the streams.
Solution:
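KCL has static synchronized methods, which are not suitable for this scenario: a static synchronized method locks on the class, so every scheduler in the JVM contends for the same lock. These methods have to be changed so that synchronization is per event source mapping (per scheduler instance) and not per class.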
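The difference in lock scope can be checked with plain Java. The sketch below is illustrative only; `StaticSyncer`, `InstanceSyncer`, and `LockScopeDemo` are hypothetical names, not KCL classes. It uses `Thread.holdsLock` to show which monitor each kind of synchronized method acquires.

```java
// Hypothetical classes for illustration only; these are not KCL types.
class StaticSyncer {
    // A static synchronized method locks StaticSyncer.class,
    // so all callers in the JVM share a single monitor.
    static synchronized boolean holdsClassLock() {
        return Thread.holdsLock(StaticSyncer.class);
    }
}

class InstanceSyncer {
    // An instance synchronized method locks `this`,
    // so each instance has its own monitor.
    synchronized boolean holdsOwnLock() {
        return Thread.holdsLock(this);
    }

    // While inside a synchronized method of this instance,
    // report whether the current thread also holds another instance's monitor.
    synchronized boolean holdsLockOf(InstanceSyncer other) {
        return Thread.holdsLock(other);
    }
}

class LockScopeDemo {
    public static void main(String[] args) {
        InstanceSyncer a = new InstanceSyncer();
        InstanceSyncer b = new InstanceSyncer();
        System.out.println(StaticSyncer.holdsClassLock()); // true: class-wide lock
        System.out.println(a.holdsOwnLock());              // true: lock on a only
        System.out.println(a.holdsLockOf(b));              // false: b is not locked
    }
}
```

With one syncer instance per scheduler, a long-running sync for one event source mapping no longer blocks the others.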
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.