-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce initialization exception handler in KCL V2 #369
Changes from 1 commit
5205d25
73d3320
7d1d091
31f2ce6
c8404ef
fbecf73
5f841a9
8f9f8b7
d52873b
20209c2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Amazon Software License (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/asl/ | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.kinesis.coordinator; | ||
|
||
/** | ||
* | ||
*/ | ||
public interface CoordinatorExceptionHandler { | ||
void propogate(Exception e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above, some documentation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Amazon Software License (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/asl/ | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.kinesis.coordinator; | ||
|
||
public class NoOpCoordinatorExceptionHandler implements CoordinatorExceptionHandler { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A better name would be PassThroughCoordinatorExceptionHandler. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure, will change |
||
|
||
/** | ||
* Empty constructor for NoOp Exception Handler | ||
*/ | ||
public NoOpCoordinatorExceptionHandler() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could use @NoArgumentConstructor from lombok There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. I was trying to be consistent with, for example, NoOpWorkerStateChangeListener.java and NoOpShardPrioritization.java. They are having empty constructors |
||
|
||
} | ||
|
||
@Override | ||
public void propogate(Exception e) { | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -97,6 +97,7 @@ public class Scheduler implements Runnable { | |
private final LeaseCoordinator leaseCoordinator; | ||
private final ShardSyncTaskManager shardSyncTaskManager; | ||
private final ShardPrioritization shardPrioritization; | ||
private final CoordinatorExceptionHandler coordinatorExceptionHandler; | ||
private final boolean cleanupLeasesUponShardCompletion; | ||
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; | ||
private final GracefulShutdownCoordinator gracefulShutdownCoordinator; | ||
|
@@ -165,6 +166,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, | |
this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() | ||
.createShardSyncTaskManager(this.metricsFactory); | ||
this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); | ||
this.coordinatorExceptionHandler = this.coordinatorConfig.coordinatorExceptionHandler(); | ||
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); | ||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = | ||
this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(); | ||
|
@@ -193,20 +195,26 @@ public void run() { | |
return; | ||
} | ||
|
||
Exception exceptionToThrow = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't feel like the right place for this. Should we instead put onInitializationException at the point that the exception was thrown? |
||
try { | ||
initialize(); | ||
log.info("Initialization complete. Starting worker loop."); | ||
} catch (RuntimeException e) { | ||
log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); | ||
shutdown(); | ||
exceptionToThrow = e; | ||
} | ||
|
||
while (!shouldShutdown()) { | ||
exceptionToThrow = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need not set this. If exception is caught from initialize, shutdown is invoked and shouldShutdown will return true if the Scheduler can successfully shutdown. It can enter to make sure that ShardConsumers have been shutdown successfully. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will delete |
||
runProcessLoop(); | ||
} | ||
|
||
finalShutdown(); | ||
log.info("Worker loop is complete. Exiting from worker."); | ||
if (exceptionToThrow != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't feel this is the correct location to report the exception. We should report as near as possible to where the exception originally occurred. This would be important if the user was also attempting to create a stack trace for the original call site. |
||
coordinatorExceptionHandler.propogate(exceptionToThrow); | ||
} | ||
} | ||
|
||
private void initialize() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add documentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure