-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a listener to capture task execution in shardConsumer #417
Conversation
/** | ||
* Empty constructor for NoOp Shard Prioritization. | ||
*/ | ||
public NoOpTaskExecutionListener() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to have the empty constructor. Could you remove the empty lines between the brackets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
this(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, shardConsumerArgument, | ||
ConsumerStates.INITIAL_STATE, | ||
taskExecutionListener, ConsumerStates.INITIAL_STATE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason for changing the order for the chained constructor? If not could you add the new parameter to the end of the list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mainly, I just wanted to avoid adding a reference type (TaskExecutionListener) after a primitive type (int bufferSize) in the constructor as it didn't look aesthetically pleasing.
But I'm fine with adding this new argument to the end of the list.
Why would you not want to use builder pattern for the constructor here?
@@ -392,6 +397,7 @@ private synchronized void executeTask(ProcessRecordsInput input) { | |||
} | |||
taskOutcome = resultToOutcome(result); | |||
} | |||
taskExecutionListener.onTaskEnd(currentState, shardInfo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- What happens when task.call() throws exception? Not call onTaskEnd? Is it safe to assume that task is never going to throw exception?
- Is there a scenario where only onTaskEnd/Begin() is called or called out of order?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
Documentation on ConsumerTask.java says that any exception during task.call() will be captured in TaskResult, returned by task.call(). I also looked at all *.Task classes to confirm that, so it would be safe to assume that it won't throw exception and onTaskBegin will always be accompanied by onTaskEnd in all known cases.
-
onTaskBegin and onTaskEnd should always be called in the same order.
|
||
void onTaskBegin(ConsumerState state, ShardInfo shardInfo); | ||
|
||
void onTaskEnd(ConsumerState state, ShardInfo shardInfo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding a third argument, TaskOutcome ? Task Execution related metrics, like time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TaskOutcome - Sure, as suggested by @sahilpalvia I'm going to create a data class and I can include things like ShardInfo, TaskType, TaskOutcome in it. Task execution time can be implicitly calculated on the client side by looking at the delta between onTaskBegin and onTaskEnd.
*/ | ||
public interface TaskExecutionListener { | ||
|
||
void onTaskBegin(ConsumerState state, ShardInfo shardInfo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a function header ? explicitly stating that concrete implementation should not make remote service calls and therefore should not block the task execution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added that to the class header because it applies to both methods of the class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General feedback on interface and integration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few more changes needed. Thanks for providing this.
amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
Show resolved
Hide resolved
amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java
Show resolved
Hide resolved
...inesis-client/src/main/java/software/amazon/kinesis/lifecycle/NoOpTaskExecutionListener.java
Outdated
Show resolved
Hide resolved
...on-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/TaskExecutionListener.java
Outdated
Show resolved
Hide resolved
amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java
Show resolved
Hide resolved
...lient/src/main/java/software/amazon/kinesis/lifecycle/events/TaskExecutionListenerInput.java
Outdated
Show resolved
Hide resolved
...lient/src/main/java/software/amazon/kinesis/lifecycle/events/TaskExecutionListenerInput.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for providing this.
Could you updated the branch to avoid merge conflicts. |
Thanks for providing this. Can please resolve the conflicts and push and update to the PR? |
I fixed the merge conflict here itself in the github UI and looks like the checks have passed. Is that sufficient? |
Issue #, if available:
Description of changes:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.