diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java new file mode 100644 index 000000000..c7c5bfce1 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java @@ -0,0 +1,33 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +/** + * An interface to implement various types of stateful events that can be used for diagnostics throughout the KCL. + */ +interface DiagnosticEvent { + /** + * DiagnosticEvent is part of a visitor pattern and it accepts DiagnosticEventHandler visitors. + * + * @param visitor A handler that that controls the behavior of the DiagnosticEvent when invoked. + */ + void accept(DiagnosticEventHandler visitor); + + /** + * The string to output to logs when a DiagnosticEvent occurs. + */ + String message(); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java new file mode 100644 index 000000000..316313aa9 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +import lombok.NoArgsConstructor; +import software.amazon.kinesis.leases.LeaseCoordinator; + +import java.util.concurrent.ExecutorService; + +/** + * Creates {@link DiagnosticEvent}s for logging and visibility + */ +@NoArgsConstructor +class DiagnosticEventFactory { + ExecutorStateEvent executorStateEvent(ExecutorService executorService, LeaseCoordinator leaseCoordinator) { + return new ExecutorStateEvent(executorService, leaseCoordinator); + } + + RejectedTaskEvent rejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable t) { + return new RejectedTaskEvent(executorStateEvent, t); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java new file mode 100644 index 000000000..fad959447 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java @@ -0,0 +1,33 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +/** + * An interface to implement behaviors associated with a {@link DiagnosticEvent}. Uses the visitor pattern to visit + * the DiagnosticEvent when the behavior is desired. A default implementation that performs simple logging is found in + * {@link DiagnosticEventLogger}. + */ +interface DiagnosticEventHandler { + /** + * @param event Log or otherwise react to periodic pulses on the thread pool executor state. + */ + void visit(ExecutorStateEvent event); + + /** + * @param event Log or otherwise react to rejected tasks in the RxJavaPlugin layer. + */ + void visit(RejectedTaskEvent event); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java new file mode 100644 index 000000000..cafaac130 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +/** + * Internal implementation of {@link DiagnosticEventHandler} used by {@link Scheduler} to log executor state both + * 1) in normal conditions periodically, and 2) in reaction to rejected task exceptions. + */ +@NoArgsConstructor +@Slf4j +@KinesisClientInternalApi +class DiagnosticEventLogger implements DiagnosticEventHandler { + private static final long EXECUTOR_LOG_INTERVAL_MILLIS = 30000L; + private long nextExecutorLogTime = System.currentTimeMillis() + EXECUTOR_LOG_INTERVAL_MILLIS; + + /** + * {@inheritDoc} + * + * Only log at info level every 30s to avoid over-logging, else log at debug level + */ + @Override + public void visit(ExecutorStateEvent event) { + if (System.currentTimeMillis() >= nextExecutorLogTime) { + log.info(event.message()); + nextExecutorLogTime = System.currentTimeMillis() + EXECUTOR_LOG_INTERVAL_MILLIS; + } else { + log.debug(event.message()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void visit(RejectedTaskEvent event) { + log.error(event.message(), event.getThrowable()); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java new file mode 100644 index 000000000..3333cc42b --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -0,0 +1,71 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.leases.LeaseCoordinator; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +@Getter +@ToString(exclude = "isThreadPoolExecutor") +@Slf4j +@KinesisClientInternalApi +class ExecutorStateEvent implements DiagnosticEvent { + private static final String MESSAGE = "Current thread pool executor state: "; + + private boolean isThreadPoolExecutor; + private String executorName; + private int currentQueueSize; + private int activeThreads; + private int coreThreads; + private int leasesOwned; + private int largestPoolSize; + private int maximumPoolSize; + + ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) { + if (executor instanceof ThreadPoolExecutor) { + this.isThreadPoolExecutor = true; + + ThreadPoolExecutor ex = (ThreadPoolExecutor) executor; + this.executorName = ex.getClass().getSimpleName(); + this.currentQueueSize = ex.getQueue().size(); + this.activeThreads = ex.getActiveCount(); + this.coreThreads = ex.getCorePoolSize(); + this.largestPoolSize = ex.getLargestPoolSize(); + this.maximumPoolSize = ex.getMaximumPoolSize(); + } + + this.leasesOwned = leaseCoordinator.getAssignments().size(); + } + + @Override + public void accept(DiagnosticEventHandler visitor) { + // logging is only meaningful for a ThreadPoolExecutor executor service (default config) + if (isThreadPoolExecutor) { + visitor.visit(this); + } + } + + @Override + public String message() { + return MESSAGE + this.toString(); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java new file mode 100644 index 000000000..d78b64b6c --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +@Getter +@ToString +@Slf4j +@KinesisClientInternalApi +class RejectedTaskEvent implements DiagnosticEvent { + private static final String MESSAGE = "Review your thread configuration to prevent task rejections. " + + "Until next release, KCL will not be resilient to task rejections. "; + + private ExecutorStateEvent executorStateEvent; + private Throwable throwable; + + RejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable throwable) { + this.executorStateEvent = executorStateEvent; + this.throwable = throwable; + } + + @Override + public void accept(DiagnosticEventHandler visitor) { + visitor.visit(this); + } + + @Override + public String message() { + return MESSAGE + executorStateEvent.message(); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index b1293f1ce..0f26c3d66 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -29,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting; +import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @@ -95,6 +96,8 @@ public class Scheduler implements Runnable { // parent shards private final long parentShardPollIntervalMillis; private final ExecutorService executorService; + private final DiagnosticEventFactory diagnosticEventFactory; + private final DiagnosticEventHandler diagnosticEventHandler; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager shardSyncTaskManager; @@ -140,6 +143,23 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, @NonNull final MetricsConfig metricsConfig, @NonNull final ProcessorConfig processorConfig, @NonNull final RetrievalConfig retrievalConfig) { + this(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, + processorConfig, retrievalConfig, new DiagnosticEventFactory()); + } + + /** + * Customers do not currently have the ability to customize the DiagnosticEventFactory, but this visibility + * is desired for testing. This constructor is only used for testing to provide a mock DiagnosticEventFactory. + */ + @VisibleForTesting + protected Scheduler(@NonNull final CheckpointConfig checkpointConfig, + @NonNull final CoordinatorConfig coordinatorConfig, + @NonNull final LeaseManagementConfig leaseManagementConfig, + @NonNull final LifecycleConfig lifecycleConfig, + @NonNull final MetricsConfig metricsConfig, + @NonNull final ProcessorConfig processorConfig, + @NonNull final RetrievalConfig retrievalConfig, + @NonNull final DiagnosticEventFactory diagnosticEventFactory) { this.checkpointConfig = checkpointConfig; this.coordinatorConfig = coordinatorConfig; this.leaseManagementConfig = leaseManagementConfig; @@ -167,6 +187,8 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis(); this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis(); this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); + this.diagnosticEventFactory = diagnosticEventFactory; + this.diagnosticEventHandler = new DiagnosticEventLogger(); this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() .createShardSyncTaskManager(this.metricsFactory); @@ -212,7 +234,7 @@ public void run() { try { initialize(); log.info("Initialization complete. Starting worker loop."); - } catch (RuntimeException e) { + } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e); workerStateChangeListener.onAllInitializationAttemptsFailed(e); shutdown(); @@ -226,8 +248,10 @@ public void run() { log.info("Worker loop is complete. Exiting from worker."); } - private void initialize() { + @VisibleForTesting + void initialize() { synchronized (lock) { + registerErrorHandlerForUndeliverableAsyncTaskExceptions(); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; Exception lastException = null; @@ -305,6 +329,7 @@ void runProcessLoop() { // clean up shard consumers for unassigned shards cleanupShardConsumers(assignedShards); + logExecutorState(); slog.info("Sleeping ..."); Thread.sleep(shardConsumerDispatchPollIntervalMillis); } catch (Exception e) { @@ -612,6 +637,25 @@ void cleanupShardConsumers(Set assignedShards) { } } + /** + * Exceptions in the RxJava layer can fail silently unless an error handler is set to propagate these exceptions + * back to the KCL, as is done below. + */ + private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() { + RxJavaPlugins.setErrorHandler(t -> { + ExecutorStateEvent executorStateEvent = diagnosticEventFactory.executorStateEvent(executorService, + leaseCoordinator); + RejectedTaskEvent rejectedTaskEvent = diagnosticEventFactory.rejectedTaskEvent(executorStateEvent, t); + rejectedTaskEvent.accept(diagnosticEventHandler); + }); + } + + private void logExecutorState() { + ExecutorStateEvent executorStateEvent = diagnosticEventFactory.executorStateEvent(executorService, + leaseCoordinator); + executorStateEvent.accept(diagnosticEventHandler); + } + /** * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java new file mode 100644 index 000000000..d6098cca6 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java @@ -0,0 +1,167 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseBuilder; +import software.amazon.kinesis.leases.LeaseCoordinator; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Slf4j +@RunWith(MockitoJUnitRunner.class) +public class DiagnosticEventsTest { + @Mock + private ThreadPoolExecutor executor; + @Mock + private LeaseCoordinator leaseCoordinator; + @Mock + private DiagnosticEventHandler defaultHandler; + + private DiagnosticEventHandler customHandler = new CustomHandler(); + private boolean wasCustomHandlerInvoked; + + private final Throwable throwable = new TestRejectedTaskException(); + + private final int activeThreadCount = 2; + private final int corePoolSize = 4; + private final int largestPoolSize = 8; + private final int maximumPoolSize = 16; + + private SynchronousQueue executorQueue; + private Collection leaseAssignments; + + @Before + public void setup() { + wasCustomHandlerInvoked = false; + + executorQueue = new SynchronousQueue<>(); + + final Lease lease = new LeaseBuilder().build(); + leaseAssignments = Collections.singletonList(lease); + + when(executor.getQueue()).thenReturn(executorQueue); + when(executor.getActiveCount()).thenReturn(activeThreadCount); + when(executor.getCorePoolSize()).thenReturn(corePoolSize); + when(executor.getLargestPoolSize()).thenReturn(largestPoolSize); + when(executor.getMaximumPoolSize()).thenReturn(maximumPoolSize); + when(leaseCoordinator.getAssignments()).thenReturn(leaseAssignments); + } + + @Test + public void testExecutorStateEvent() { + ExecutorStateEvent event = new ExecutorStateEvent(executor, leaseCoordinator); + event.accept(defaultHandler); + + assertEquals(event.getActiveThreads(), activeThreadCount); + assertEquals(event.getCoreThreads(), corePoolSize); + assertEquals(event.getLargestPoolSize(), largestPoolSize); + assertEquals(event.getMaximumPoolSize(), maximumPoolSize); + assertEquals(event.getLeasesOwned(), leaseAssignments.size()); + assertEquals(event.getCurrentQueueSize(),0); + + verify(defaultHandler, times(1)).visit(event); + } + + @Test + public void testExecutorStateEventWithCustomHandler() { + ExecutorStateEvent event = new ExecutorStateEvent(executor, leaseCoordinator); + event.accept(customHandler); + + assertTrue(wasCustomHandlerInvoked); + } + + @Test + public void testRejectedTaskEvent() { + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executor, leaseCoordinator); + RejectedTaskEvent event = new RejectedTaskEvent(executorStateEvent, throwable); + event.accept(defaultHandler); + + assertEquals(event.getExecutorStateEvent().getActiveThreads(), activeThreadCount); + assertEquals(event.getExecutorStateEvent().getCoreThreads(), corePoolSize); + assertEquals(event.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); + assertEquals(event.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize); + assertEquals(event.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size()); + assertEquals(event.getExecutorStateEvent().getCurrentQueueSize(),0); + assertTrue(event.getThrowable() instanceof TestRejectedTaskException); + + verify(defaultHandler, times(1)).visit(event); + } + + @Test + public void testRejectedTaskEventWithCustomHandler() { + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executor, leaseCoordinator); + RejectedTaskEvent event = new RejectedTaskEvent(executorStateEvent, throwable); + customHandler = new CustomHandler(); + event.accept(customHandler); + + assertTrue(wasCustomHandlerInvoked); + } + + @Test + public void testDiagnosticEventFactory() { + DiagnosticEventFactory factory = new DiagnosticEventFactory(); + + ExecutorStateEvent executorStateEvent = factory.executorStateEvent(executor, leaseCoordinator); + assertEquals(executorStateEvent.getActiveThreads(), activeThreadCount); + assertEquals(executorStateEvent.getCoreThreads(), corePoolSize); + assertEquals(executorStateEvent.getLargestPoolSize(), largestPoolSize); + assertEquals(executorStateEvent.getMaximumPoolSize(), maximumPoolSize); + assertEquals(executorStateEvent.getLeasesOwned(), leaseAssignments.size()); + assertEquals(executorStateEvent.getCurrentQueueSize(),0); + + RejectedTaskEvent rejectedTaskEvent = factory.rejectedTaskEvent(executorStateEvent, + new TestRejectedTaskException()); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getActiveThreads(), activeThreadCount); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCoreThreads(), corePoolSize); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size()); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCurrentQueueSize(),0); + assertTrue(rejectedTaskEvent.getThrowable() instanceof TestRejectedTaskException); + } + + private class TestRejectedTaskException extends Exception { + private TestRejectedTaskException() { super(); } + } + + private class CustomHandler implements DiagnosticEventHandler { + @Override + public void visit(ExecutorStateEvent event) { + wasCustomHandlerInvoked = true; + } + + @Override + public void visit(RejectedTaskEvent event) { + wasCustomHandlerInvoked = true; + } + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index e051be245..8be1bb8fc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -24,7 +24,9 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -35,7 +37,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.RejectedExecutionException; +import io.reactivex.plugins.RxJavaPlugins; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -269,6 +273,38 @@ public final void testSchedulerShutdown() { verify(workerStateChangeListener, times(1)).onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } + @Test + public void testErrorHandlerForUndeliverableAsyncTaskExceptions() { + DiagnosticEventFactory eventFactory = mock(DiagnosticEventFactory.class); + ExecutorStateEvent executorStateEvent = mock(ExecutorStateEvent.class); + RejectedTaskEvent rejectedTaskEvent = mock(RejectedTaskEvent.class); + + when(eventFactory.rejectedTaskEvent(any(), any())).thenReturn(rejectedTaskEvent); + when(eventFactory.executorStateEvent(any(), any())).thenReturn(executorStateEvent); + + Scheduler testScheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, + lifecycleConfig, metricsConfig, processorConfig, retrievalConfig, eventFactory); + + Scheduler schedulerSpy = spy(testScheduler); + + // reject task on third loop + doCallRealMethod() + .doCallRealMethod() + .doAnswer(invocation -> { + // trigger rejected task in RxJava layer + RxJavaPlugins.onError(new RejectedExecutionException("Test exception.")); + return null; + }).when(schedulerSpy).runProcessLoop(); + + // Scheduler sets error handler in initialize method + schedulerSpy.initialize(); + schedulerSpy.runProcessLoop(); + schedulerSpy.runProcessLoop(); + schedulerSpy.runProcessLoop(); + + verify(eventFactory, times(1)).rejectedTaskEvent(eq(executorStateEvent), any()); + verify(rejectedTaskEvent, times(1)).accept(any()); + } /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { final int numberOfRecordsPerShard = 10;