From 5205d25f2b4ebb460567b33cfc4117618e3a7a28 Mon Sep 17 00:00:00 2001 From: Jiaxuan Lu Date: Wed, 15 Aug 2018 15:32:31 -0700 Subject: [PATCH 1/8] Introduce initialization exception handler in KCL V2 --- .../coordinator/CoordinatorConfig.java | 2 ++ .../CoordinatorExceptionHandler.java | 23 ++++++++++++++ .../NoOpCoordinatorExceptionHandler.java | 30 +++++++++++++++++++ .../amazon/kinesis/coordinator/Scheduler.java | 8 +++++ 4 files changed, 63 insertions(+) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpCoordinatorExceptionHandler.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index 929a40d27..44a1db755 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -67,6 +67,8 @@ public class CoordinatorConfig { */ private ShardPrioritization shardPrioritization = new NoOpShardPrioritization(); + private CoordinatorExceptionHandler coordinatorExceptionHandler = new NoOpCoordinatorExceptionHandler(); + private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java new file mode 100644 index 000000000..93fa7ef89 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java @@ -0,0 +1,23 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +/** + * + */ +public interface CoordinatorExceptionHandler { + void propogate(Exception e); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpCoordinatorExceptionHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpCoordinatorExceptionHandler.java new file mode 100644 index 000000000..161dcb82c --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpCoordinatorExceptionHandler.java @@ -0,0 +1,30 @@ +/* + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +public class NoOpCoordinatorExceptionHandler implements CoordinatorExceptionHandler { + + /** + * Empty constructor for NoOp Exception Handler + */ + public NoOpCoordinatorExceptionHandler() { + + } + + @Override + public void propogate(Exception e) { + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 10e477a59..2c7664e59 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -97,6 +97,7 @@ public class Scheduler implements Runnable { private final LeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager shardSyncTaskManager; private final ShardPrioritization shardPrioritization; + private final CoordinatorExceptionHandler coordinatorExceptionHandler; private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final GracefulShutdownCoordinator gracefulShutdownCoordinator; @@ -165,6 +166,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() .createShardSyncTaskManager(this.metricsFactory); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); + this.coordinatorExceptionHandler = this.coordinatorConfig.coordinatorExceptionHandler(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(); @@ -193,20 +195,26 @@ public void run() { return; } + Exception exceptionToThrow = null; try { initialize(); log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); shutdown(); + exceptionToThrow = e; } while (!shouldShutdown()) { + exceptionToThrow = null; runProcessLoop(); } finalShutdown(); log.info("Worker loop is complete. Exiting from worker."); + if (exceptionToThrow != null) { + coordinatorExceptionHandler.propogate(exceptionToThrow); + } } private void initialize() { From 73d33201cd5945871b048c071577bba647db08dd Mon Sep 17 00:00:00 2001 From: jiaxul Date: Thu, 16 Aug 2018 14:24:22 -0700 Subject: [PATCH 2/8] added javadocs for CoordinatorExceptionHandler and modified method/class names --- .../kinesis/coordinator/CoordinatorConfig.java | 7 ++++++- .../coordinator/CoordinatorExceptionHandler.java | 9 +++++++-- ...> PassThroughCoordinatorExceptionHandler.java} | 15 +++++++-------- .../amazon/kinesis/coordinator/Scheduler.java | 3 +-- 4 files changed, 21 insertions(+), 13 deletions(-) rename amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/{NoOpCoordinatorExceptionHandler.java => PassThroughCoordinatorExceptionHandler.java} (66%) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index 44a1db755..b326b48dd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -67,7 +67,12 @@ public class CoordinatorConfig { */ private ShardPrioritization shardPrioritization = new NoOpShardPrioritization(); - private CoordinatorExceptionHandler coordinatorExceptionHandler = new NoOpCoordinatorExceptionHandler(); + /** + * Scheduler Initialization Exception Handler + * + *

Default value: {@link PassThroughCoordinatorExceptionHandler}

+ */ + private CoordinatorExceptionHandler coordinatorExceptionHandler = new PassThroughCoordinatorExceptionHandler(); private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java index 93fa7ef89..2d78eb392 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java @@ -16,8 +16,13 @@ package software.amazon.kinesis.coordinator; /** - * + * This class handles exceptions that occur during initialization of the Scheduler */ public interface CoordinatorExceptionHandler { - void propogate(Exception e); + + /** + * method to handle the exceptions thrown while initializing a scheduler + * @param e Exception + */ + void schedulerInitializationExceptionHandler(Exception e); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpCoordinatorExceptionHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PassThroughCoordinatorExceptionHandler.java similarity index 66% rename from amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpCoordinatorExceptionHandler.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PassThroughCoordinatorExceptionHandler.java index 161dcb82c..c7b06947c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpCoordinatorExceptionHandler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PassThroughCoordinatorExceptionHandler.java @@ -15,16 +15,15 @@ package software.amazon.kinesis.coordinator; -public class NoOpCoordinatorExceptionHandler implements CoordinatorExceptionHandler { +import lombok.NoArgsConstructor; - /** - * Empty constructor for NoOp Exception Handler - */ - public NoOpCoordinatorExceptionHandler() { - - } +/** + * A NoOp implementation of CoordinatorExceptionHandler to pass through CoordinatorConfig as default + */ +@NoArgsConstructor +public class PassThroughCoordinatorExceptionHandler implements CoordinatorExceptionHandler { @Override - public void propogate(Exception e) { + public void schedulerInitializationExceptionHandler(Exception e) { } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 2c7664e59..aae7d2a43 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -206,14 +206,13 @@ public void run() { } while (!shouldShutdown()) { - exceptionToThrow = null; runProcessLoop(); } finalShutdown(); log.info("Worker loop is complete. Exiting from worker."); if (exceptionToThrow != null) { - coordinatorExceptionHandler.propogate(exceptionToThrow); + coordinatorExceptionHandler.schedulerInitializationExceptionHandler(exceptionToThrow); } } From 7d1d091a895b5563ee85ccd0821d3f4c2430282e Mon Sep 17 00:00:00 2001 From: jiaxul Date: Thu, 16 Aug 2018 16:21:25 -0700 Subject: [PATCH 3/8] Move exception handling to WorkerStateChangeListener --- .../coordinator/CoordinatorConfig.java | 7 ----- .../CoordinatorExceptionHandler.java | 28 ------------------ .../NoOpWorkerStateChangeListener.java | 5 ++++ ...assThroughCoordinatorExceptionHandler.java | 29 ------------------- .../amazon/kinesis/coordinator/Scheduler.java | 9 ++---- .../WorkerStateChangeListener.java | 4 ++- 6 files changed, 10 insertions(+), 72 deletions(-) delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java delete mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PassThroughCoordinatorExceptionHandler.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index b326b48dd..929a40d27 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -67,13 +67,6 @@ public class CoordinatorConfig { */ private ShardPrioritization shardPrioritization = new NoOpShardPrioritization(); - /** - * Scheduler Initialization Exception Handler - * - *

Default value: {@link PassThroughCoordinatorExceptionHandler}

- */ - private CoordinatorExceptionHandler coordinatorExceptionHandler = new PassThroughCoordinatorExceptionHandler(); - private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java deleted file mode 100644 index 2d78eb392..000000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorExceptionHandler.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.kinesis.coordinator; - -/** - * This class handles exceptions that occur during initialization of the Scheduler - */ -public interface CoordinatorExceptionHandler { - - /** - * method to handle the exceptions thrown while initializing a scheduler - * @param e Exception - */ - void schedulerInitializationExceptionHandler(Exception e); -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java index f316b351e..ade88c801 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java @@ -27,4 +27,9 @@ public NoOpWorkerStateChangeListener() { public void onWorkerStateChange(WorkerState newState) { } + + @Override + public void onInitializationError(WorkerState newState, Exception e) { + + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PassThroughCoordinatorExceptionHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PassThroughCoordinatorExceptionHandler.java deleted file mode 100644 index c7b06947c..000000000 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PassThroughCoordinatorExceptionHandler.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.kinesis.coordinator; - -import lombok.NoArgsConstructor; - -/** - * A NoOp implementation of CoordinatorExceptionHandler to pass through CoordinatorConfig as default - */ -@NoArgsConstructor -public class PassThroughCoordinatorExceptionHandler implements CoordinatorExceptionHandler { - - @Override - public void schedulerInitializationExceptionHandler(Exception e) { - } -} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index aae7d2a43..ba4b748d7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -97,7 +97,6 @@ public class Scheduler implements Runnable { private final LeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager shardSyncTaskManager; private final ShardPrioritization shardPrioritization; - private final CoordinatorExceptionHandler coordinatorExceptionHandler; private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private final GracefulShutdownCoordinator gracefulShutdownCoordinator; @@ -166,7 +165,6 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() .createShardSyncTaskManager(this.metricsFactory); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); - this.coordinatorExceptionHandler = this.coordinatorConfig.coordinatorExceptionHandler(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = this.coordinatorConfig.skipShardSyncAtWorkerInitializationIfLeasesExist(); @@ -195,14 +193,12 @@ public void run() { return; } - Exception exceptionToThrow = null; try { initialize(); log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); shutdown(); - exceptionToThrow = e; } while (!shouldShutdown()) { @@ -211,9 +207,6 @@ public void run() { finalShutdown(); log.info("Worker loop is complete. Exiting from worker."); - if (exceptionToThrow != null) { - coordinatorExceptionHandler.schedulerInitializationExceptionHandler(exceptionToThrow); - } } private void initialize() { @@ -263,6 +256,8 @@ private void initialize() { } if (!isDone) { + workerStateChangeListener.onInitializationError(WorkerStateChangeListener.WorkerState.INITIALIZATIONERROR, + new RuntimeException(lastException)); throw new RuntimeException(lastException); } workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java index 0137de30c..1d7b44423 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java @@ -17,14 +17,16 @@ /** * A listener for callbacks on changes worker state */ -@FunctionalInterface public interface WorkerStateChangeListener { enum WorkerState { CREATED, INITIALIZING, + INITIALIZATIONERROR, STARTED, SHUT_DOWN } void onWorkerStateChange(WorkerState newState); + + void onInitializationError(WorkerState newState, Exception e); } From 31f2ce6b966e48e4ed89bd20d164c72ce9909a05 Mon Sep 17 00:00:00 2001 From: jiaxul Date: Thu, 16 Aug 2018 17:00:56 -0700 Subject: [PATCH 4/8] Remove WorkerState InitializationError --- .../coordinator/NoOpWorkerStateChangeListener.java | 2 +- .../software/amazon/kinesis/coordinator/Scheduler.java | 8 ++++++-- .../kinesis/coordinator/WorkerStateChangeListener.java | 3 +-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java index ade88c801..fed0b0221 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java @@ -29,7 +29,7 @@ public void onWorkerStateChange(WorkerState newState) { } @Override - public void onInitializationError(WorkerState newState, Exception e) { + public void onInitializationError(Exception e) { } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index ba4b748d7..55b149d9d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -193,12 +193,14 @@ public void run() { return; } + Exception exceptionToThrow = null; try { initialize(); log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); shutdown(); + exceptionToThrow = e; } while (!shouldShutdown()) { @@ -207,6 +209,10 @@ public void run() { finalShutdown(); log.info("Worker loop is complete. Exiting from worker."); + + if (exceptionToThrow != null) { + workerStateChangeListener.onInitializationError(exceptionToThrow); + } } private void initialize() { @@ -256,8 +262,6 @@ private void initialize() { } if (!isDone) { - workerStateChangeListener.onInitializationError(WorkerStateChangeListener.WorkerState.INITIALIZATIONERROR, - new RuntimeException(lastException)); throw new RuntimeException(lastException); } workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java index 1d7b44423..d06458afd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java @@ -21,12 +21,11 @@ public interface WorkerStateChangeListener { enum WorkerState { CREATED, INITIALIZING, - INITIALIZATIONERROR, STARTED, SHUT_DOWN } void onWorkerStateChange(WorkerState newState); - void onInitializationError(WorkerState newState, Exception e); + void onInitializationError(Exception e); } From c8404efcd85d14918f22f6e627e7ceff796155ab Mon Sep 17 00:00:00 2001 From: jiaxul Date: Fri, 17 Aug 2018 17:37:28 -0700 Subject: [PATCH 5/8] make initialization failure handler function a default method in WorkerStateChangeListener interface and call it when scheduler determines an initialization is failed --- .../kinesis/coordinator/NoOpWorkerStateChangeListener.java | 4 ---- .../java/software/amazon/kinesis/coordinator/Scheduler.java | 6 +----- .../kinesis/coordinator/WorkerStateChangeListener.java | 4 +++- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java index fed0b0221..791e398a0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java @@ -28,8 +28,4 @@ public void onWorkerStateChange(WorkerState newState) { } - @Override - public void onInitializationError(Exception e) { - - } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 55b149d9d..800cc7ba1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -193,14 +193,13 @@ public void run() { return; } - Exception exceptionToThrow = null; try { initialize(); log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); shutdown(); - exceptionToThrow = e; + workerStateChangeListener.onInitializationFailure(e); } while (!shouldShutdown()) { @@ -210,9 +209,6 @@ public void run() { finalShutdown(); log.info("Worker loop is complete. Exiting from worker."); - if (exceptionToThrow != null) { - workerStateChangeListener.onInitializationError(exceptionToThrow); - } } private void initialize() { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java index d06458afd..6b71540ab 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java @@ -17,6 +17,7 @@ /** * A listener for callbacks on changes worker state */ +@FunctionalInterface public interface WorkerStateChangeListener { enum WorkerState { CREATED, @@ -27,5 +28,6 @@ enum WorkerState { void onWorkerStateChange(WorkerState newState); - void onInitializationError(Exception e); + default void onInitializationFailure(Exception e) { + } } From fbecf73049efd90556b3505f434be4c80ada1962 Mon Sep 17 00:00:00 2001 From: jiaxul Date: Thu, 23 Aug 2018 14:50:11 -0700 Subject: [PATCH 6/8] change Exception to Throwable --- .../amazon/kinesis/coordinator/WorkerStateChangeListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java index 6b71540ab..75df298d4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java @@ -28,6 +28,6 @@ enum WorkerState { void onWorkerStateChange(WorkerState newState); - default void onInitializationFailure(Exception e) { + default void onInitializationFailure(Throwable e) { } } From 5f841a9fbe52d9102f5fdfb40cb6a5eec7d8addc Mon Sep 17 00:00:00 2001 From: jiaxul Date: Thu, 23 Aug 2018 14:50:11 -0700 Subject: [PATCH 7/8] change method name and invoke position --- .../kinesis/coordinator/NoOpWorkerStateChangeListener.java | 1 - .../java/software/amazon/kinesis/coordinator/Scheduler.java | 3 +-- .../amazon/kinesis/coordinator/WorkerStateChangeListener.java | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java index 791e398a0..f316b351e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/NoOpWorkerStateChangeListener.java @@ -27,5 +27,4 @@ public NoOpWorkerStateChangeListener() { public void onWorkerStateChange(WorkerState newState) { } - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 800cc7ba1..3bed53726 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -198,8 +198,8 @@ public void run() { log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); + workerStateChangeListener.onInitializationFailed(e); shutdown(); - workerStateChangeListener.onInitializationFailure(e); } while (!shouldShutdown()) { @@ -208,7 +208,6 @@ public void run() { finalShutdown(); log.info("Worker loop is complete. Exiting from worker."); - } private void initialize() { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java index 6b71540ab..09549aa97 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java @@ -28,6 +28,6 @@ enum WorkerState { void onWorkerStateChange(WorkerState newState); - default void onInitializationFailure(Exception e) { + default void onInitializationFailed(Throwable e) { } } From d52873b0d9e4bca89c49bbafea6deccecf68800e Mon Sep 17 00:00:00 2001 From: jiaxul Date: Thu, 30 Aug 2018 14:07:15 -0700 Subject: [PATCH 8/8] change method name --- .../java/software/amazon/kinesis/coordinator/Scheduler.java | 2 +- .../amazon/kinesis/coordinator/WorkerStateChangeListener.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 3bed53726..07532eb9c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -198,7 +198,7 @@ public void run() { log.info("Initialization complete. Starting worker loop."); } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", lifecycleConfig.maxInitializationAttempts(), e); - workerStateChangeListener.onInitializationFailed(e); + workerStateChangeListener.onAllInitializationAttemptsFailed(e); shutdown(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java index 09549aa97..2ca08aa4a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/WorkerStateChangeListener.java @@ -28,6 +28,6 @@ enum WorkerState { void onWorkerStateChange(WorkerState newState); - default void onInitializationFailed(Throwable e) { + default void onAllInitializationAttemptsFailed(Throwable e) { } }