-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce initialization exception handler in KCL V2 #369
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some initial comments
|
||
package software.amazon.kinesis.coordinator; | ||
|
||
public class NoOpCoordinatorExceptionHandler implements CoordinatorExceptionHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A better name would be PassThroughCoordinatorExceptionHandler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, will change
|
||
package software.amazon.kinesis.coordinator; | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add documentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
* | ||
*/ | ||
public interface CoordinatorExceptionHandler { | ||
void propogate(Exception e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, some documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
/** | ||
* Empty constructor for NoOp Exception Handler | ||
*/ | ||
public NoOpCoordinatorExceptionHandler() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could use @NoArgumentConstructor from lombok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. I was trying to be consistent with, for example, NoOpWorkerStateChangeListener.java and NoOpShardPrioritization.java. They are having empty constructors
} | ||
|
||
while (!shouldShutdown()) { | ||
exceptionToThrow = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need not set this. If exception is caught from initialize, shutdown is invoked and shouldShutdown will return true if the Scheduler can successfully shutdown. It can enter to make sure that ShardConsumers have been shutdown successfully.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will delete
@@ -27,4 +27,9 @@ public NoOpWorkerStateChangeListener() { | |||
public void onWorkerStateChange(WorkerState newState) { | |||
|
|||
} | |||
|
|||
@Override | |||
public void onInitializationError(Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a default on the interface.
Would prefer the name to be onInitializationException
@@ -193,12 +193,14 @@ public void run() { | |||
return; | |||
} | |||
|
|||
Exception exceptionToThrow = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't feel like the right place for this. Should we instead put onInitializationException at the point that the exception was thrown?
This just reports the last exception, which means a cascaded failure may be misreported.
@@ -207,6 +209,10 @@ public void run() { | |||
|
|||
finalShutdown(); | |||
log.info("Worker loop is complete. Exiting from worker."); | |||
|
|||
if (exceptionToThrow != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't feel this is the correct location to report the exception. We should report as near as possible to where the exception originally occurred. This would be important if the user was also attempting to create a stack trace for the original call site.
@@ -27,4 +26,6 @@ | |||
} | |||
|
|||
void onWorkerStateChange(WorkerState newState); | |||
|
|||
void onInitializationError(Exception e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a breaking change, and must be defaulted.
I also want to think more about how state reporting and actions happen. Finally this should accept a Throwable
vs an Exception
.
…erStateChangeListener interface and call it when scheduler determines an initialization is failed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good, a few more comments to be addressed.
@@ -199,6 +199,7 @@ public void run() { | |||
} catch (RuntimeException e) { | |||
log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); | |||
shutdown(); | |||
workerStateChangeListener.onInitializationFailure(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to invoke the handler before calling shutdown. The handler is called as a reaction of the exception. It needs to be invoked as soon as the exception is thrown before performing any other action.
@@ -27,4 +27,7 @@ | |||
} | |||
|
|||
void onWorkerStateChange(WorkerState newState); | |||
|
|||
default void onInitializationFailure(Throwable e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you rename the method to something like allInitializationAttemptsFailed(Throwable t)
, indicating that the handler is invoked after all the initialization attempt have failed.
@@ -207,6 +208,7 @@ public void run() { | |||
|
|||
finalShutdown(); | |||
log.info("Worker loop is complete. Exiting from worker."); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please avoid whitespace changes
@@ -27,4 +27,5 @@ public NoOpWorkerStateChangeListener() { | |||
public void onWorkerStateChange(WorkerState newState) { | |||
|
|||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please avoid whitespace changes
Issue #, if available:
Introduce initialization exception handler in KCL V2.
KCL scheduler does not throw any exceptions that occurs during processing a stream. It just logs it as an error.
Description of changes:
add a CoordinatorExceptionHandler interface
add CoordinatorExceptionHandler as a parameter in CoordinatorConfig
add a NoOpCoordinatorExceptionHandler to pass in as default so it does not change the behavior of the Scheduler class
The client applications can implement the CoordinatorExceptionHandler and pass in CoordinatorConfig so that they can specify how they want to handle the exceptions during processing the streams.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.