Skip to content

Commit

Permalink
Created listener for worker state change (#291)
Browse files Browse the repository at this point in the history
* Created listener for worker state change
#275
  • Loading branch information
lc-nyovchev authored and pfifer committed Feb 27, 2018
1 parent 9e33994 commit 24916ba
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener {

/**
* Empty constructor for NoOp Worker State Change Listener
*/
public NoOpWorkerStateChangeListener() {

}

@Override
public void onWorkerStateChange(WorkerState newState) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class Worker implements Runnable {
private static final Log LOG = LogFactory.getLog(Worker.class);

private static final int MAX_INITIALIZATION_ATTEMPTS = 20;
private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener();

private WorkerLog wlog = new WorkerLog();

Expand All @@ -93,7 +94,6 @@ public class Worker implements Runnable {
private final Optional<Integer> retryGetRecordsInSeconds;
private final Optional<Integer> maxGetRecordsThreadPool;

// private final KinesisClientLeaseManager leaseManager;
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager controlServer;

Expand All @@ -119,6 +119,8 @@ public class Worker implements Runnable {
@VisibleForTesting
protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();

private WorkerStateChangeListener workerStateChangeListener;

/**
* Constructor.
*
Expand Down Expand Up @@ -276,7 +278,8 @@ public Worker(
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
config.getShardPrioritizationStrategy(),
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool());
config.getMaxGetRecordsThreadPool(),
DEFAULT_WORKER_STATE_CHANGE_LISTENER);

// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
Expand Down Expand Up @@ -348,7 +351,7 @@ public Worker(
this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis,
shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService,
metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist,
shardPrioritization, Optional.empty(), Optional.empty());
shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER);
}

/**
Expand Down Expand Up @@ -395,7 +398,7 @@ public Worker(
KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService,
IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) {
this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory;
this.config = config;
Expand All @@ -417,6 +420,8 @@ public Worker(
this.shardPrioritization = shardPrioritization;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
}

/**
Expand Down Expand Up @@ -494,6 +499,7 @@ void runProcessLoop() {
}

private void initialize() {
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false;
Exception lastException = null;

Expand Down Expand Up @@ -543,6 +549,7 @@ private void initialize() {
if (!isDone) {
throw new RuntimeException(lastException);
}
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
}

/**
Expand Down Expand Up @@ -593,10 +600,10 @@ private List<ShardInfo> getShardInfoForAssignments() {

/**
* Starts the requestedShutdown process, and returns a future that can be used to track the process.
*
*
* This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and
* indicates the process behavior
*
*
* @return a future that will be set once shutdown is completed.
*/
@Deprecated
Expand Down Expand Up @@ -640,7 +647,7 @@ public Void get(long timeout, TimeUnit unit)
* Requests a graceful shutdown of the worker, notifying record processors, that implement
* {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
* checkpoint.
*
*
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
* previous future.
*
Expand Down Expand Up @@ -755,6 +762,10 @@ ConcurrentMap<ShardInfo, ShardConsumer> getShardInfoShardConsumerMap() {
return shardInfoShardConsumerMap;
}

WorkerStateChangeListener getWorkerStateChangeListener() {
return workerStateChangeListener;
}

/**
* Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor
* services were passed to the worker by the user, worker will not attempt to shutdown those resources.
Expand Down Expand Up @@ -785,6 +796,7 @@ public void shutdown() {
// Lost leases will force Worker to begin shutdown process for all shard consumers in
// Worker.run().
leaseCoordinator.stop();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
}

/**
Expand All @@ -807,7 +819,7 @@ private void finalShutdown() {
/**
* Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()}
* method before every loop run, so method must do minimum amount of work to not impact shard processing timings.
*
*
* @return Whether worker should shutdown immediately.
*/
@VisibleForTesting
Expand Down Expand Up @@ -1012,7 +1024,7 @@ StreamConfig getStreamConfig() {

/**
* Given configuration, returns appropriate metrics factory.
*
*
* @param cloudWatchClient
* Amazon CloudWatch client
* @param config
Expand All @@ -1039,7 +1051,7 @@ private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClie

/**
* Returns default executor service that should be used by the worker.
*
*
* @return Default executor service that should be used by the worker.
*/
private static ExecutorService getExecutorService() {
Expand Down Expand Up @@ -1089,6 +1101,7 @@ public static class Builder {
private ExecutorService execService;
private ShardPrioritization shardPrioritization;
private IKinesisProxy kinesisProxy;
private WorkerStateChangeListener workerStateChangeListener;

/**
* Default constructor.
Expand Down Expand Up @@ -1209,10 +1222,10 @@ public Builder execService(ExecutorService execService) {

/**
* Provides logic how to prioritize shard processing.
*
*
* @param shardPrioritization
* shardPrioritization is responsible to order shards before processing
*
*
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder shardPrioritization(ShardPrioritization shardPrioritization) {
Expand All @@ -1233,6 +1246,17 @@ public Builder kinesisProxy(IKinesisProxy kinesisProxy) {
return this;
}

/**
* Set WorkerStateChangeListener for the worker
* @param workerStateChangeListener
* Sets the WorkerStateChangeListener
* @return A reference to this updated object so that method calls can be chained together.
*/
public Builder workerStateChangeListener(WorkerStateChangeListener workerStateChangeListener) {
this.workerStateChangeListener = workerStateChangeListener;
return this;
}

/**
* Build the Worker instance.
*
Expand Down Expand Up @@ -1305,6 +1329,10 @@ public Worker build() {
kinesisProxy = new KinesisProxy(config, kinesisClient);
}

if (workerStateChangeListener == null) {
workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER;
}

return new Worker(config.getApplicationName(),
recordProcessorFactory,
config,
Expand Down Expand Up @@ -1336,9 +1364,8 @@ public Worker build() {
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
shardPrioritization,
config.getRetryGetRecordsInSeconds(),
config.getMaxGetRecordsThreadPool());

config.getMaxGetRecordsThreadPool(),
workerStateChangeListener);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

/**
* A listener for callbacks on changes worker state
*/
@FunctionalInterface
public interface WorkerStateChangeListener {
enum WorkerState {
CREATED,
INITIALIZING,
STARTED,
SHUT_DOWN
}

void onWorkerStateChange(WorkerState newState);
}
Loading

0 comments on commit 24916ba

Please sign in to comment.