From 24916ba552ae6834ed8bc71af56ce268f0287309 Mon Sep 17 00:00:00 2001 From: nyo Date: Tue, 27 Feb 2018 15:54:16 +0100 Subject: [PATCH] Created listener for worker state change (#291) * Created listener for worker state change #275 --- .../worker/NoOpWorkerStateChangeListener.java | 16 +++ .../clientlibrary/lib/worker/Worker.java | 57 +++++--- .../lib/worker/WorkerStateChangeListener.java | 16 +++ .../clientlibrary/lib/worker/WorkerTest.java | 127 ++++++++++++++++-- 4 files changed, 189 insertions(+), 27 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerStateChangeListener.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java new file mode 100644 index 000000000..152a43af8 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/NoOpWorkerStateChangeListener.java @@ -0,0 +1,16 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +public class NoOpWorkerStateChangeListener implements WorkerStateChangeListener { + + /** + * Empty constructor for NoOp Worker State Change Listener + */ + public NoOpWorkerStateChangeListener() { + + } + + @Override + public void onWorkerStateChange(WorkerState newState) { + + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index daf89c28c..644f42258 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -71,6 +71,7 @@ public class Worker implements Runnable { private static final Log LOG = LogFactory.getLog(Worker.class); private static final int MAX_INITIALIZATION_ATTEMPTS = 20; + private static final WorkerStateChangeListener DEFAULT_WORKER_STATE_CHANGE_LISTENER = new NoOpWorkerStateChangeListener(); private WorkerLog wlog = new WorkerLog(); @@ -93,7 +94,6 @@ public class Worker implements Runnable { private final Optional retryGetRecordsInSeconds; private final Optional maxGetRecordsThreadPool; - // private final KinesisClientLeaseManager leaseManager; private final KinesisClientLibLeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager controlServer; @@ -119,6 +119,8 @@ public class Worker implements Runnable { @VisibleForTesting protected GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); + private WorkerStateChangeListener workerStateChangeListener; + /** * Constructor. * @@ -276,7 +278,8 @@ public Worker( config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy(), config.getRetryGetRecordsInSeconds(), - config.getMaxGetRecordsThreadPool()); + config.getMaxGetRecordsThreadPool(), + DEFAULT_WORKER_STATE_CHANGE_LISTENER); // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { @@ -348,7 +351,7 @@ public Worker( this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - shardPrioritization, Optional.empty(), Optional.empty()); + shardPrioritization, Optional.empty(), Optional.empty(), DEFAULT_WORKER_STATE_CHANGE_LISTENER); } /** @@ -395,7 +398,7 @@ public Worker( KinesisClientLibLeaseCoordinator leaseCoordinator, ExecutorService execService, IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, - Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool) { + Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -417,6 +420,8 @@ public Worker( this.shardPrioritization = shardPrioritization; this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.maxGetRecordsThreadPool = maxGetRecordsThreadPool; + this.workerStateChangeListener = workerStateChangeListener; + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); } /** @@ -494,6 +499,7 @@ void runProcessLoop() { } private void initialize() { + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; Exception lastException = null; @@ -543,6 +549,7 @@ private void initialize() { if (!isDone) { throw new RuntimeException(lastException); } + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED); } /** @@ -593,10 +600,10 @@ private List getShardInfoForAssignments() { /** * Starts the requestedShutdown process, and returns a future that can be used to track the process. - * + * * This is deprecated in favor of {@link #startGracefulShutdown()}, which returns a more complete future, and * indicates the process behavior - * + * * @return a future that will be set once shutdown is completed. */ @Deprecated @@ -640,7 +647,7 @@ public Void get(long timeout, TimeUnit unit) * Requests a graceful shutdown of the worker, notifying record processors, that implement * {@link IShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to * checkpoint. - * + * * This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the * previous future. * @@ -755,6 +762,10 @@ ConcurrentMap getShardInfoShardConsumerMap() { return shardInfoShardConsumerMap; } + WorkerStateChangeListener getWorkerStateChangeListener() { + return workerStateChangeListener; + } + /** * Signals worker to shutdown. Worker will try initiating shutdown of all record processors. Note that if executor * services were passed to the worker by the user, worker will not attempt to shutdown those resources. @@ -785,6 +796,7 @@ public void shutdown() { // Lost leases will force Worker to begin shutdown process for all shard consumers in // Worker.run(). leaseCoordinator.stop(); + workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } /** @@ -807,7 +819,7 @@ private void finalShutdown() { /** * Returns whether worker can shutdown immediately. Note that this method is called from Worker's {{@link #run()} * method before every loop run, so method must do minimum amount of work to not impact shard processing timings. - * + * * @return Whether worker should shutdown immediately. */ @VisibleForTesting @@ -1012,7 +1024,7 @@ StreamConfig getStreamConfig() { /** * Given configuration, returns appropriate metrics factory. - * + * * @param cloudWatchClient * Amazon CloudWatch client * @param config @@ -1039,7 +1051,7 @@ private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClie /** * Returns default executor service that should be used by the worker. - * + * * @return Default executor service that should be used by the worker. */ private static ExecutorService getExecutorService() { @@ -1089,6 +1101,7 @@ public static class Builder { private ExecutorService execService; private ShardPrioritization shardPrioritization; private IKinesisProxy kinesisProxy; + private WorkerStateChangeListener workerStateChangeListener; /** * Default constructor. @@ -1209,10 +1222,10 @@ public Builder execService(ExecutorService execService) { /** * Provides logic how to prioritize shard processing. - * + * * @param shardPrioritization * shardPrioritization is responsible to order shards before processing - * + * * @return A reference to this updated object so that method calls can be chained together. */ public Builder shardPrioritization(ShardPrioritization shardPrioritization) { @@ -1233,6 +1246,17 @@ public Builder kinesisProxy(IKinesisProxy kinesisProxy) { return this; } + /** + * Set WorkerStateChangeListener for the worker + * @param workerStateChangeListener + * Sets the WorkerStateChangeListener + * @return A reference to this updated object so that method calls can be chained together. + */ + public Builder workerStateChangeListener(WorkerStateChangeListener workerStateChangeListener) { + this.workerStateChangeListener = workerStateChangeListener; + return this; + } + /** * Build the Worker instance. * @@ -1305,6 +1329,10 @@ public Worker build() { kinesisProxy = new KinesisProxy(config, kinesisClient); } + if (workerStateChangeListener == null) { + workerStateChangeListener = DEFAULT_WORKER_STATE_CHANGE_LISTENER; + } + return new Worker(config.getApplicationName(), recordProcessorFactory, config, @@ -1336,9 +1364,8 @@ public Worker build() { config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization, config.getRetryGetRecordsInSeconds(), - config.getMaxGetRecordsThreadPool()); - + config.getMaxGetRecordsThreadPool(), + workerStateChangeListener); } - } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerStateChangeListener.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerStateChangeListener.java new file mode 100644 index 000000000..36ee39f0e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerStateChangeListener.java @@ -0,0 +1,16 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +/** + * A listener for callbacks on changes worker state + */ +@FunctionalInterface +public interface WorkerStateChangeListener { + enum WorkerState { + CREATED, + INITIALIZING, + STARTED, + SHUT_DOWN + } + + void onWorkerStateChange(WorkerState newState); +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 037a54b2d..21aaa8ac2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; @@ -89,6 +90,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; @@ -142,7 +144,7 @@ public class WorkerTest { private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; - + private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; @@ -170,7 +172,9 @@ public class WorkerTest { private Future taskFuture; @Mock private TaskResult taskResult; - + @Mock + private WorkerStateChangeListener workerStateChangeListener; + @Before public void setup() { config = spy(new KinesisClientLibConfiguration("app", null, null, null)); @@ -179,7 +183,7 @@ public void setup() { } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES - private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = + private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() { @Override @@ -212,8 +216,8 @@ public void initialize(String shardId) { }; } }; - - private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = + + private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY); @@ -619,7 +623,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class)); - + RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); @@ -659,7 +663,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { * This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of * {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads. * This behavior makes the test a bit racy, since we need to ensure a specific order of events. - * + * * @throws Exception */ @Test @@ -1356,7 +1360,7 @@ public List answer(InvocationOnMock invocation) throws Throwable { executorService, metricsFactory, taskBackoffTimeMillis, - failoverTimeMillis, + failoverTimeMillis, false, shardPrioritization); @@ -1432,7 +1436,7 @@ public List answer(InvocationOnMock invocation) throws Throwable { config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, - parentShardPollIntervalMillis, + parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, @@ -1500,6 +1504,105 @@ public void testBuilderWhenKinesisProxyIsSet() { Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy); } + @Test + public void testBuilderForWorkerStateListener() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .build(); + Assert.assertTrue(worker.getWorkerStateChangeListener() instanceof NoOpWorkerStateChangeListener); + } + + @Test + public void testBuilderWhenWorkerStateListenerIsSet() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .workerStateChangeListener(workerStateChangeListener) + .config(config) + .build(); + Assert.assertSame(workerStateChangeListener, worker.getWorkerStateChangeListener()); + } + + @Test + public void testWorkerStateListenerStatePassesThroughCreatedState() { + IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .workerStateChangeListener(workerStateChangeListener) + .config(config) + .build(); + + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED)); + } + + @Test + public void testWorkerStateChangeListenerGoesThroughStates() throws Exception { + + final CountDownLatch workerInitialized = new CountDownLatch(1); + final CountDownLatch workerStarted = new CountDownLatch(1); + final IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); + final IRecordProcessor processor = mock(IRecordProcessor.class); + + ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L); + KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint) + .withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L) + .withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self"); + final List leases = new ArrayList<>(); + KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build(); + leases.add(lease); + + doAnswer(new Answer() { + @Override + public Boolean answer(InvocationOnMock invocation) throws Throwable { + workerInitialized.countDown(); + return true; + } + }).when(leaseManager).waitUntilLeaseTableExists(anyLong(), anyLong()); + doAnswer(new Answer() { + @Override + public IRecordProcessor answer(InvocationOnMock invocation) throws Throwable { + workerStarted.countDown(); + return processor; + } + }).when(recordProcessorFactory).createProcessor(); + + when(config.getWorkerIdentifier()).thenReturn("Self"); + when(leaseManager.listLeases()).thenReturn(leases); + when(leaseManager.renewLease(leases.get(0))).thenReturn(true); + when(executorService.submit(Matchers.> any())) + .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); + when(taskFuture.isDone()).thenReturn(true); + when(taskFuture.get()).thenReturn(taskResult); + when(taskResult.isShardEndReached()).thenReturn(true); + + Worker worker = new Worker.Builder() + .recordProcessorFactory(recordProcessorFactory) + .config(config) + .leaseManager(leaseManager) + .kinesisProxy(kinesisProxy) + .execService(executorService) + .workerStateChangeListener(workerStateChangeListener) + .build(); + + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED)); + + WorkerThread workerThread = new WorkerThread(worker); + workerThread.start(); + + workerInitialized.await(); + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.INITIALIZING)); + + workerStarted.await(); + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.STARTED)); + + boolean workerShutdown = worker.createGracefulShutdownCallable() + .call(); + + verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.SHUT_DOWN)); + } + @Test public void testBuilderWithDefaultLeaseManager() { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); @@ -1801,7 +1904,7 @@ private void runAndTestWorker(List shardList, TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier); ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); - + WorkerThread workerThread = runWorker( shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis, numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig); @@ -1857,7 +1960,7 @@ private WorkerThread runWorker(List shardList, idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - + Worker worker = new Worker(stageName, recordProcessorFactory, @@ -1874,7 +1977,7 @@ private WorkerThread runWorker(List shardList, failoverTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); - + WorkerThread workerThread = new WorkerThread(worker); workerThread.start(); return workerThread;