From 9f1768b93bef1f6d4ba0ee514dea343f73a5f982 Mon Sep 17 00:00:00 2001 From: micah-jaffe Date: Mon, 24 Jun 2019 20:28:20 -0700 Subject: [PATCH 1/9] Add diagnostic events for logging visibility --- .../coordinator/CoordinatorConfig.java | 6 + .../DefaultDiagnosticEventHandler.java | 34 ++++ .../kinesis/coordinator/DiagnosticEvent.java | 30 ++++ .../coordinator/DiagnosticEventHandler.java | 35 ++++ .../coordinator/ExecutorStateEvent.java | 55 +++++++ .../coordinator/RejectedTaskEvent.java | 39 +++++ .../amazon/kinesis/coordinator/Scheduler.java | 34 ++++ .../coordinator/DiagnosticEventsTest.java | 152 ++++++++++++++++++ 8 files changed, 385 insertions(+) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DefaultDiagnosticEventHandler.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index 6240eda9b..54f90844f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -90,4 +90,10 @@ public class CoordinatorConfig { private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory(); + /** + * The number of milliseconds to sleep in between logging the executor state. + * + *

Default value: 30,000 milliseconds

+ */ + private long executorDiagnosticsDaemonSleepTimeMillis = 30000L; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DefaultDiagnosticEventHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DefaultDiagnosticEventHandler.java new file mode 100644 index 000000000..dd4d86d48 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DefaultDiagnosticEventHandler.java @@ -0,0 +1,34 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +@NoArgsConstructor +@Slf4j +@KinesisClientInternalApi +public class DefaultDiagnosticEventHandler implements DiagnosticEventHandler { + @Override + public void visit(ExecutorStateEvent event) { + log.info("Current state of thread pool executor: {}", event); + } + + @Override + public void visit(RejectedTaskEvent event) { + log.info("Undeliverable task exception: {}", event); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java new file mode 100644 index 000000000..1e7f3815d --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +/** + * An interface to implement various types of stateful events that can be used for diagnostics throughout the KCL. + */ +@KinesisClientInternalApi +public interface DiagnosticEvent { + /** + * DiagnosticEvent is part of a visitor pattern along with DiagnosticEventHandler visitors. + * + * @param visitor A handler that that controls the behavior of the DiagnosticEvent when invoked. + */ + void accept(DiagnosticEventHandler visitor); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java new file mode 100644 index 000000000..5da3cc371 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +/** + * An interface to implement behaviors associated with a {@link DiagnosticEvent}. Uses the visitor pattern to visit + * the DiagnosticEvent when the behavior is desired. A default implementation that performs simple logging is found in + * {@link DefaultDiagnosticEventHandler}. + */ +@KinesisClientInternalApi +public interface DiagnosticEventHandler { + /** + * @param event Log or otherwise react to periodic pulses on the thread pool executor state. + */ + void visit(ExecutorStateEvent event); + + /** + * @param event Log or otherwise react to rejected tasks in the RxJavaPlugin layer. + */ + void visit(RejectedTaskEvent event); +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java new file mode 100644 index 000000000..6663cc8aa --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.leases.LeaseCoordinator; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +@Getter +@ToString +@Slf4j +@KinesisClientInternalApi +public class ExecutorStateEvent implements DiagnosticEvent { + private int currentQueueSize; + private int activeThreads; + private int coreThreads; + private int leasesOwned; + private int largestPoolSize; + private int maximumPoolSize; + + public ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) { + if (executor instanceof ThreadPoolExecutor) { + ThreadPoolExecutor ex = (ThreadPoolExecutor) executor; + this.currentQueueSize = ex.getQueue().size(); + this.activeThreads = ex.getActiveCount(); + this.coreThreads = ex.getCorePoolSize(); + this.largestPoolSize = ex.getLargestPoolSize(); + this.maximumPoolSize = ex.getMaximumPoolSize(); + } + + this.leasesOwned = leaseCoordinator.getAssignments().size(); + } + + @Override + public void accept(DiagnosticEventHandler visitor) { + visitor.visit(this); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java new file mode 100644 index 000000000..af4ab0e64 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +import lombok.Getter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; +import software.amazon.kinesis.leases.LeaseCoordinator; + +import java.util.concurrent.ExecutorService; + +@Getter +@ToString +@Slf4j +@KinesisClientInternalApi +public class RejectedTaskEvent extends ExecutorStateEvent { + private Throwable throwable; + + public RejectedTaskEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator, Throwable throwable) { + super(executor, leaseCoordinator); + this.throwable = throwable; + } + + @Override + public void accept(DiagnosticEventHandler visitor) { visitor.visit(this); } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index b1293f1ce..e975655bb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -29,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting; +import io.reactivex.plugins.RxJavaPlugins; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @@ -95,6 +96,8 @@ public class Scheduler implements Runnable { // parent shards private final long parentShardPollIntervalMillis; private final ExecutorService executorService; + private final DiagnosticEventHandler diagnosticEventHandler; + private final long executorDiagnosticsDaemonSleepTimeMillis; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager shardSyncTaskManager; @@ -167,6 +170,9 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis(); this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis(); this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); + this.diagnosticEventHandler = new DefaultDiagnosticEventHandler(); + this.executorDiagnosticsDaemonSleepTimeMillis = + this.coordinatorConfig.executorDiagnosticsDaemonSleepTimeMillis(); this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() .createShardSyncTaskManager(this.metricsFactory); @@ -228,6 +234,8 @@ public void run() { private void initialize() { synchronized (lock) { + startExecutorDiagnosticsDaemon(); + registerErrorHandlerForUndeliverableAsyncTaskExceptions(); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; Exception lastException = null; @@ -612,6 +620,32 @@ void cleanupShardConsumers(Set assignedShards) { } } + private void startExecutorDiagnosticsDaemon() { + log.info("Starting executor diagnostics daemon."); + + Thread diagnosticsThread = new Thread(new ThreadGroup("Diagnostics"), () -> { + while (true) { + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); + executorStateEvent.accept(diagnosticEventHandler); + try { + Thread.sleep(executorDiagnosticsDaemonSleepTimeMillis); + } catch (InterruptedException e) { + log.error("Executor diagnostics thread interrupted", e); + } + } + }); + + diagnosticsThread.setDaemon(true); + diagnosticsThread.start(); + } + + private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() { + RxJavaPlugins.setErrorHandler(t -> { + RejectedTaskEvent rejectedTaskEvent = new RejectedTaskEvent(executorService, leaseCoordinator, t); + rejectedTaskEvent.accept(diagnosticEventHandler); + }); + } + /** * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java new file mode 100644 index 000000000..855d97dbb --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java @@ -0,0 +1,152 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.coordinator; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseBuilder; +import software.amazon.kinesis.leases.LeaseCoordinator; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Slf4j +@RunWith(MockitoJUnitRunner.class) +public class DiagnosticEventsTest { + @Mock + private ThreadPoolExecutor executor; + @Mock + private LeaseCoordinator leaseCoordinator; + @Mock + private DiagnosticEventHandler defaultHandler; + + private DiagnosticEventHandler customHandler; + private boolean wasCustomHandlerInvoked; + + private Throwable throwable; + + private int activeThreadCount; + private int corePoolSize; + private int largestPoolSize; + private int maximumPoolSize; + + private SynchronousQueue executorQueue; + private Collection leaseAssignments; + + @Before + public final void setup() { + customHandler = new CustomHandler(); + wasCustomHandlerInvoked = false; + + throwable = new TestRejectedTaskException(); + + activeThreadCount = 2; + corePoolSize = 4; + largestPoolSize = 8; + maximumPoolSize = 16; + + executorQueue = new SynchronousQueue<>(); + + final Lease lease = new LeaseBuilder().build(); + leaseAssignments = Collections.singletonList(lease); + + when(executor.getQueue()).thenReturn(executorQueue); + when(executor.getActiveCount()).thenReturn(activeThreadCount); + when(executor.getCorePoolSize()).thenReturn(corePoolSize); + when(executor.getLargestPoolSize()).thenReturn(largestPoolSize); + when(executor.getMaximumPoolSize()).thenReturn(maximumPoolSize); + when(leaseCoordinator.getAssignments()).thenReturn(leaseAssignments); + } + + @Test + public final void testExecutorStateEventWithDefaultHandler() { + ExecutorStateEvent event = new ExecutorStateEvent(executor, leaseCoordinator); + event.accept(defaultHandler); + + assertEquals(event.getActiveThreads(), activeThreadCount); + assertEquals(event.getCoreThreads(), corePoolSize); + assertEquals(event.getLargestPoolSize(), largestPoolSize); + assertEquals(event.getMaximumPoolSize(), maximumPoolSize); + assertEquals(event.getLeasesOwned(), leaseAssignments.size()); + assertEquals(event.getCurrentQueueSize(),0); + + verify(defaultHandler, times(1)).visit(event); + } + + @Test + public final void testExecutorStateEventWithCustomHandler() { + ExecutorStateEvent event = new ExecutorStateEvent(executor, leaseCoordinator); + event.accept(customHandler); + + assertTrue(wasCustomHandlerInvoked); + wasCustomHandlerInvoked = false; + } + + @Test + public final void testRejectedTaskEventWithDefaultHandler() { + RejectedTaskEvent event = new RejectedTaskEvent(executor, leaseCoordinator, throwable); + event.accept(defaultHandler); + + assertEquals(event.getActiveThreads(), activeThreadCount); + assertEquals(event.getCoreThreads(), corePoolSize); + assertEquals(event.getLargestPoolSize(), largestPoolSize); + assertEquals(event.getMaximumPoolSize(), maximumPoolSize); + assertEquals(event.getLeasesOwned(), leaseAssignments.size()); + assertEquals(event.getCurrentQueueSize(),0); + System.out.println(event.getThrowable()); + assertTrue(event.getThrowable() instanceof TestRejectedTaskException); + + verify(defaultHandler, times(1)).visit(event); + } + + @Test + public final void testRejectedTaskEventWithCustomHandler() { + RejectedTaskEvent event = new RejectedTaskEvent(executor, leaseCoordinator, throwable); + customHandler = new CustomHandler(); + event.accept(customHandler); + + assertTrue(wasCustomHandlerInvoked); + wasCustomHandlerInvoked = false; + } + + private class TestRejectedTaskException extends Exception { + private TestRejectedTaskException() { super(); } + } + + private class CustomHandler implements DiagnosticEventHandler { + @Override + public void visit(ExecutorStateEvent event) { + wasCustomHandlerInvoked = true; + } + + @Override + public void visit(RejectedTaskEvent event) { + wasCustomHandlerInvoked = true; + } + } +} From 3ccf6ecaeb392dbb583be99d0a08f47efe2f3081 Mon Sep 17 00:00:00 2001 From: micah-jaffe Date: Tue, 25 Jun 2019 21:33:25 -0700 Subject: [PATCH 2/9] Refactor logging diagnostics into main Scheduler loop --- .../coordinator/CoordinatorConfig.java | 7 -- .../kinesis/coordinator/DiagnosticEvent.java | 10 ++- .../coordinator/DiagnosticEventHandler.java | 5 +- ...andler.java => DiagnosticEventLogger.java} | 16 +++- .../coordinator/ExecutorStateEvent.java | 9 ++- .../coordinator/RejectedTaskEvent.java | 22 ++++-- .../amazon/kinesis/coordinator/Scheduler.java | 76 ++++++++++++------- .../coordinator/DiagnosticEventsTest.java | 51 +++++-------- .../kinesis/coordinator/SchedulerTest.java | 15 ++++ 9 files changed, 128 insertions(+), 83 deletions(-) rename amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/{DefaultDiagnosticEventHandler.java => DiagnosticEventLogger.java} (68%) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index 54f90844f..ebd891efe 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -89,11 +89,4 @@ public class CoordinatorConfig { private GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory(); - - /** - * The number of milliseconds to sleep in between logging the executor state. - * - *

Default value: 30,000 milliseconds

- */ - private long executorDiagnosticsDaemonSleepTimeMillis = 30000L; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java index 1e7f3815d..a71535865 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java @@ -14,17 +14,19 @@ */ package software.amazon.kinesis.coordinator; -import software.amazon.kinesis.annotations.KinesisClientInternalApi; - /** * An interface to implement various types of stateful events that can be used for diagnostics throughout the KCL. */ -@KinesisClientInternalApi public interface DiagnosticEvent { /** - * DiagnosticEvent is part of a visitor pattern along with DiagnosticEventHandler visitors. + * DiagnosticEvent is part of a visitor pattern and it accepts DiagnosticEventHandler visitors. * * @param visitor A handler that that controls the behavior of the DiagnosticEvent when invoked. */ void accept(DiagnosticEventHandler visitor); + + /** + * The string to output to logs when a DiagnosticEvent occurs. + */ + String message(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java index 5da3cc371..10e2322fd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java @@ -14,14 +14,11 @@ */ package software.amazon.kinesis.coordinator; -import software.amazon.kinesis.annotations.KinesisClientInternalApi; - /** * An interface to implement behaviors associated with a {@link DiagnosticEvent}. Uses the visitor pattern to visit * the DiagnosticEvent when the behavior is desired. A default implementation that performs simple logging is found in - * {@link DefaultDiagnosticEventHandler}. + * {@link DiagnosticEventLogger}. */ -@KinesisClientInternalApi public interface DiagnosticEventHandler { /** * @param event Log or otherwise react to periodic pulses on the thread pool executor state. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DefaultDiagnosticEventHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java similarity index 68% rename from amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DefaultDiagnosticEventHandler.java rename to amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java index dd4d86d48..004d13485 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DefaultDiagnosticEventHandler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java @@ -18,17 +18,27 @@ import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.annotations.KinesisClientInternalApi; +/** + * Internal implementation of {@link DiagnosticEventHandler} used by {@link Scheduler} to log executor state both + * 1) in normal conditions periodically, and 2) in reaction to rejected task exceptions. + */ @NoArgsConstructor @Slf4j @KinesisClientInternalApi -public class DefaultDiagnosticEventHandler implements DiagnosticEventHandler { +public class DiagnosticEventLogger implements DiagnosticEventHandler { + /** + * {@inheritDoc} + */ @Override public void visit(ExecutorStateEvent event) { - log.info("Current state of thread pool executor: {}", event); + log.info(event.message()); } + /** + * {@inheritDoc} + */ @Override public void visit(RejectedTaskEvent event) { - log.info("Undeliverable task exception: {}", event); + log.error(event.message(), event.getThrowable()); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java index 6663cc8aa..c395280ee 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -24,10 +24,12 @@ import java.util.concurrent.ThreadPoolExecutor; @Getter -@ToString +@ToString(exclude = "MESSAGE") @Slf4j @KinesisClientInternalApi public class ExecutorStateEvent implements DiagnosticEvent { + private final String MESSAGE = "Current thread pool executor state: "; + private int currentQueueSize; private int activeThreads; private int coreThreads; @@ -52,4 +54,9 @@ public ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordi public void accept(DiagnosticEventHandler visitor) { visitor.visit(this); } + + @Override + public String message() { + return MESSAGE + this.toString(); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java index af4ab0e64..981fd26a8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java @@ -18,22 +18,30 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.annotations.KinesisClientInternalApi; -import software.amazon.kinesis.leases.LeaseCoordinator; - -import java.util.concurrent.ExecutorService; @Getter -@ToString +@ToString(exclude = "MESSAGE") @Slf4j @KinesisClientInternalApi -public class RejectedTaskEvent extends ExecutorStateEvent { +public class RejectedTaskEvent implements DiagnosticEvent { + private final String MESSAGE = "Unexpected task rejection occurred. This could possibly " + + "be an issue or a bug. Please search for the exception/error online to check what is " + + "going on. If the issue persists or is a recurring problem, feel free to open an issue " + + "at https://github.com/awslabs/amazon-kinesis-client. "; + + private ExecutorStateEvent executorStateEvent; private Throwable throwable; - public RejectedTaskEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator, Throwable throwable) { - super(executor, leaseCoordinator); + public RejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable throwable) { + this.executorStateEvent = executorStateEvent; this.throwable = throwable; } @Override public void accept(DiagnosticEventHandler visitor) { visitor.visit(this); } + + @Override + public String message() { + return MESSAGE + executorStateEvent.message(); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e975655bb..b93167f08 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; @@ -70,6 +71,8 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; +import javax.annotation.Nullable; + /** * */ @@ -97,7 +100,6 @@ public class Scheduler implements Runnable { private final long parentShardPollIntervalMillis; private final ExecutorService executorService; private final DiagnosticEventHandler diagnosticEventHandler; - private final long executorDiagnosticsDaemonSleepTimeMillis; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; private final ShardSyncTaskManager shardSyncTaskManager; @@ -129,6 +131,9 @@ public class Scheduler implements Runnable { private final Object lock = new Object(); + private final long executorLogIntervalMillis = TimeUnit.SECONDS.toMillis(30); + private long nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis; + /** * Used to ensure that only one requestedShutdown is in progress at a time. */ @@ -170,9 +175,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis(); this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis(); this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); - this.diagnosticEventHandler = new DefaultDiagnosticEventHandler(); - this.executorDiagnosticsDaemonSleepTimeMillis = - this.coordinatorConfig.executorDiagnosticsDaemonSleepTimeMillis(); + this.diagnosticEventHandler = new DiagnosticEventLogger(); this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() .createShardSyncTaskManager(this.metricsFactory); @@ -218,7 +221,7 @@ public void run() { try { initialize(); log.info("Initialization complete. Starting worker loop."); - } catch (RuntimeException e) { + } catch (RuntimeException e) { log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e); workerStateChangeListener.onAllInitializationAttemptsFailed(e); shutdown(); @@ -234,8 +237,7 @@ public void run() { private void initialize() { synchronized (lock) { - startExecutorDiagnosticsDaemon(); - registerErrorHandlerForUndeliverableAsyncTaskExceptions(); + registerErrorHandlerForUndeliverableAsyncTaskExceptions(null); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; Exception lastException = null; @@ -313,6 +315,7 @@ void runProcessLoop() { // clean up shard consumers for unassigned shards cleanupShardConsumers(assignedShards); + logExecutorState(); slog.info("Sleeping ..."); Thread.sleep(shardConsumerDispatchPollIntervalMillis); } catch (Exception e) { @@ -620,30 +623,49 @@ void cleanupShardConsumers(Set assignedShards) { } } - private void startExecutorDiagnosticsDaemon() { - log.info("Starting executor diagnostics daemon."); + /** + * Exceptions in the RxJava layer can fail silently unless an error handler is set to propagate these exceptions + * back to the KCL, as is done below. This method is internal to the class and has package access solely + * for testing. + * + * @param handler This method accepts a handler param solely for testing. All non-test calls to this method will + * have a null input. + */ + @VisibleForTesting + void registerErrorHandlerForUndeliverableAsyncTaskExceptions(@Nullable Consumer handler) { + if (handler == null) { + RxJavaPlugins.setErrorHandler(t -> { + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); + RejectedTaskEvent rejectedTaskEvent = new RejectedTaskEvent(executorStateEvent, t); + rejectedTaskEvent.accept(diagnosticEventHandler); + }); + } else { + RxJavaPlugins.setErrorHandler(t -> handler.accept(t)); + } + } - Thread diagnosticsThread = new Thread(new ThreadGroup("Diagnostics"), () -> { - while (true) { - ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); - executorStateEvent.accept(diagnosticEventHandler); - try { - Thread.sleep(executorDiagnosticsDaemonSleepTimeMillis); - } catch (InterruptedException e) { - log.error("Executor diagnostics thread interrupted", e); - } + private void logExecutorState() { + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); + DiagnosticEventHandler debugHandler = new DiagnosticEventHandler() { + @Override + public void visit(ExecutorStateEvent event) { + log.debug(event.message()); } - }); - diagnosticsThread.setDaemon(true); - diagnosticsThread.start(); - } + @Override + public void visit(RejectedTaskEvent event) { + // no op as RejectedTaskEvents get handled at Error level by diagnosticEventHandler + } + }; - private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() { - RxJavaPlugins.setErrorHandler(t -> { - RejectedTaskEvent rejectedTaskEvent = new RejectedTaskEvent(executorService, leaseCoordinator, t); - rejectedTaskEvent.accept(diagnosticEventHandler); - }); + // only log at info level (behavior of diagnosticEventHandler) every 30s to avoid over-logging, else log at + // debug level + if (System.currentTimeMillis() >= nextExecutorLogTime) { + executorStateEvent.accept(diagnosticEventHandler); + nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis; + } else { + executorStateEvent.accept(debugHandler); + } } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java index 855d97dbb..2f879421e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java @@ -45,31 +45,23 @@ public class DiagnosticEventsTest { @Mock private DiagnosticEventHandler defaultHandler; - private DiagnosticEventHandler customHandler; + private DiagnosticEventHandler customHandler = new CustomHandler();; private boolean wasCustomHandlerInvoked; - private Throwable throwable; + private final Throwable throwable = new TestRejectedTaskException(); - private int activeThreadCount; - private int corePoolSize; - private int largestPoolSize; - private int maximumPoolSize; + private final int activeThreadCount = 2; + private final int corePoolSize = 4; + private final int largestPoolSize = 8; + private final int maximumPoolSize = 16; private SynchronousQueue executorQueue; private Collection leaseAssignments; @Before - public final void setup() { - customHandler = new CustomHandler(); + public void setup() { wasCustomHandlerInvoked = false; - throwable = new TestRejectedTaskException(); - - activeThreadCount = 2; - corePoolSize = 4; - largestPoolSize = 8; - maximumPoolSize = 16; - executorQueue = new SynchronousQueue<>(); final Lease lease = new LeaseBuilder().build(); @@ -84,7 +76,7 @@ public final void setup() { } @Test - public final void testExecutorStateEventWithDefaultHandler() { + public void testExecutorStateEventWithDefaultHandler() { ExecutorStateEvent event = new ExecutorStateEvent(executor, leaseCoordinator); event.accept(defaultHandler); @@ -99,39 +91,38 @@ public final void testExecutorStateEventWithDefaultHandler() { } @Test - public final void testExecutorStateEventWithCustomHandler() { + public void testExecutorStateEventWithCustomHandler() { ExecutorStateEvent event = new ExecutorStateEvent(executor, leaseCoordinator); event.accept(customHandler); assertTrue(wasCustomHandlerInvoked); - wasCustomHandlerInvoked = false; } @Test - public final void testRejectedTaskEventWithDefaultHandler() { - RejectedTaskEvent event = new RejectedTaskEvent(executor, leaseCoordinator, throwable); + public void testRejectedTaskEventWithDefaultHandler() { + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executor, leaseCoordinator); + RejectedTaskEvent event = new RejectedTaskEvent(executorStateEvent, throwable); event.accept(defaultHandler); - assertEquals(event.getActiveThreads(), activeThreadCount); - assertEquals(event.getCoreThreads(), corePoolSize); - assertEquals(event.getLargestPoolSize(), largestPoolSize); - assertEquals(event.getMaximumPoolSize(), maximumPoolSize); - assertEquals(event.getLeasesOwned(), leaseAssignments.size()); - assertEquals(event.getCurrentQueueSize(),0); - System.out.println(event.getThrowable()); + assertEquals(event.getExecutorStateEvent().getActiveThreads(), activeThreadCount); + assertEquals(event.getExecutorStateEvent().getCoreThreads(), corePoolSize); + assertEquals(event.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); + assertEquals(event.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize); + assertEquals(event.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size()); + assertEquals(event.getExecutorStateEvent().getCurrentQueueSize(),0); assertTrue(event.getThrowable() instanceof TestRejectedTaskException); verify(defaultHandler, times(1)).visit(event); } @Test - public final void testRejectedTaskEventWithCustomHandler() { - RejectedTaskEvent event = new RejectedTaskEvent(executor, leaseCoordinator, throwable); + public void testRejectedTaskEventWithCustomHandler() { + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executor, leaseCoordinator); + RejectedTaskEvent event = new RejectedTaskEvent(executorStateEvent, throwable); customHandler = new CustomHandler(); event.accept(customHandler); assertTrue(wasCustomHandlerInvoked); - wasCustomHandlerInvoked = false; } private class TestRejectedTaskException extends Exception { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index e051be245..fed0847e0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -35,7 +35,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import io.reactivex.plugins.RxJavaPlugins; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -269,6 +273,17 @@ public final void testSchedulerShutdown() { verify(workerStateChangeListener, times(1)).onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN); } + @Test + public final void testErrorHandlerForUndeliverableAsyncTaskExceptions() { + AtomicBoolean wasHandlerInvoked = new AtomicBoolean(false); + Consumer testHandler = t -> wasHandlerInvoked.compareAndSet(false, true); + + scheduler.registerErrorHandlerForUndeliverableAsyncTaskExceptions(testHandler); + + // trigger rejected task in RxJava layer + RxJavaPlugins.onError(new RejectedExecutionException("Test exception.")); + assertTrue(wasHandlerInvoked.get()); + } /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { final int numberOfRecordsPerShard = 10; From de78716d4549ad432df51ebd72f0fd6a359f8b5f Mon Sep 17 00:00:00 2001 From: micah-jaffe Date: Wed, 26 Jun 2019 12:31:56 -0700 Subject: [PATCH 3/9] Refactor log timing and level and change privacies --- .../kinesis/coordinator/DiagnosticEvent.java | 2 +- .../coordinator/DiagnosticEventHandler.java | 2 +- .../coordinator/DiagnosticEventLogger.java | 14 +++++++-- .../coordinator/ExecutorStateEvent.java | 19 +++++------- .../coordinator/RejectedTaskEvent.java | 14 ++++----- .../amazon/kinesis/coordinator/Scheduler.java | 31 ++++--------------- .../coordinator/DiagnosticEventsTest.java | 2 +- .../kinesis/coordinator/SchedulerTest.java | 17 ++++++++-- 8 files changed, 49 insertions(+), 52 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java index a71535865..0416f68c7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java @@ -17,7 +17,7 @@ /** * An interface to implement various types of stateful events that can be used for diagnostics throughout the KCL. */ -public interface DiagnosticEvent { +interface DiagnosticEvent { /** * DiagnosticEvent is part of a visitor pattern and it accepts DiagnosticEventHandler visitors. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java index 10e2322fd..a7a0a2ef5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java @@ -19,7 +19,7 @@ * the DiagnosticEvent when the behavior is desired. A default implementation that performs simple logging is found in * {@link DiagnosticEventLogger}. */ -public interface DiagnosticEventHandler { +interface DiagnosticEventHandler { /** * @param event Log or otherwise react to periodic pulses on the thread pool executor state. */ diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java index 004d13485..813523c52 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java @@ -25,13 +25,23 @@ @NoArgsConstructor @Slf4j @KinesisClientInternalApi -public class DiagnosticEventLogger implements DiagnosticEventHandler { +class DiagnosticEventLogger implements DiagnosticEventHandler { + private static final long executorLogIntervalMillis = 30000L; + private long nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis; + /** * {@inheritDoc} + * + * Only log at info level every 30s to avoid over-logging, else log at debug level */ @Override public void visit(ExecutorStateEvent event) { - log.info(event.message()); + if (System.currentTimeMillis() >= nextExecutorLogTime) { + log.info(event.message()); + nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis; + } else { + log.debug(event.message()); + } } /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java index c395280ee..b491e5fa1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -20,14 +20,13 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.LeaseCoordinator; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @Getter @ToString(exclude = "MESSAGE") @Slf4j @KinesisClientInternalApi -public class ExecutorStateEvent implements DiagnosticEvent { +class ExecutorStateEvent implements DiagnosticEvent { private final String MESSAGE = "Current thread pool executor state: "; private int currentQueueSize; @@ -37,16 +36,12 @@ public class ExecutorStateEvent implements DiagnosticEvent { private int largestPoolSize; private int maximumPoolSize; - public ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) { - if (executor instanceof ThreadPoolExecutor) { - ThreadPoolExecutor ex = (ThreadPoolExecutor) executor; - this.currentQueueSize = ex.getQueue().size(); - this.activeThreads = ex.getActiveCount(); - this.coreThreads = ex.getCorePoolSize(); - this.largestPoolSize = ex.getLargestPoolSize(); - this.maximumPoolSize = ex.getMaximumPoolSize(); - } - + ExecutorStateEvent(ThreadPoolExecutor executor, LeaseCoordinator leaseCoordinator) { + this.currentQueueSize = executor.getQueue().size(); + this.activeThreads = executor.getActiveCount(); + this.coreThreads = executor.getCorePoolSize(); + this.largestPoolSize = executor.getLargestPoolSize(); + this.maximumPoolSize = executor.getMaximumPoolSize(); this.leasesOwned = leaseCoordinator.getAssignments().size(); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java index 981fd26a8..844ea8a50 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java @@ -23,22 +23,22 @@ @ToString(exclude = "MESSAGE") @Slf4j @KinesisClientInternalApi -public class RejectedTaskEvent implements DiagnosticEvent { - private final String MESSAGE = "Unexpected task rejection occurred. This could possibly " + - "be an issue or a bug. Please search for the exception/error online to check what is " + - "going on. If the issue persists or is a recurring problem, feel free to open an issue " + - "at https://github.com/awslabs/amazon-kinesis-client. "; +class RejectedTaskEvent implements DiagnosticEvent { + private final String MESSAGE = "This is a misconfiguration of the thread pool. We currently " + + "only support the default thread pool configuration, until the next KCL release. "; private ExecutorStateEvent executorStateEvent; private Throwable throwable; - public RejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable throwable) { + RejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable throwable) { this.executorStateEvent = executorStateEvent; this.throwable = throwable; } @Override - public void accept(DiagnosticEventHandler visitor) { visitor.visit(this); } + public void accept(DiagnosticEventHandler visitor) { + visitor.visit(this); + } @Override public String message() { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index b93167f08..54d3ac449 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -131,9 +132,6 @@ public class Scheduler implements Runnable { private final Object lock = new Object(); - private final long executorLogIntervalMillis = TimeUnit.SECONDS.toMillis(30); - private long nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis; - /** * Used to ensure that only one requestedShutdown is in progress at a time. */ @@ -635,7 +633,8 @@ void cleanupShardConsumers(Set assignedShards) { void registerErrorHandlerForUndeliverableAsyncTaskExceptions(@Nullable Consumer handler) { if (handler == null) { RxJavaPlugins.setErrorHandler(t -> { - ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent((ThreadPoolExecutor) executorService, + leaseCoordinator); RejectedTaskEvent rejectedTaskEvent = new RejectedTaskEvent(executorStateEvent, t); rejectedTaskEvent.accept(diagnosticEventHandler); }); @@ -645,27 +644,9 @@ void registerErrorHandlerForUndeliverableAsyncTaskExceptions(@Nullable Consumer< } private void logExecutorState() { - ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); - DiagnosticEventHandler debugHandler = new DiagnosticEventHandler() { - @Override - public void visit(ExecutorStateEvent event) { - log.debug(event.message()); - } - - @Override - public void visit(RejectedTaskEvent event) { - // no op as RejectedTaskEvents get handled at Error level by diagnosticEventHandler - } - }; - - // only log at info level (behavior of diagnosticEventHandler) every 30s to avoid over-logging, else log at - // debug level - if (System.currentTimeMillis() >= nextExecutorLogTime) { - executorStateEvent.accept(diagnosticEventHandler); - nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis; - } else { - executorStateEvent.accept(debugHandler); - } + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent((ThreadPoolExecutor) executorService, + leaseCoordinator); + executorStateEvent.accept(diagnosticEventHandler); } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java index 2f879421e..c1fcc5918 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java @@ -45,7 +45,7 @@ public class DiagnosticEventsTest { @Mock private DiagnosticEventHandler defaultHandler; - private DiagnosticEventHandler customHandler = new CustomHandler();; + private DiagnosticEventHandler customHandler = new CustomHandler(); private boolean wasCustomHandlerInvoked; private final Throwable throwable = new TestRejectedTaskException(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index fed0847e0..459a3d713 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -24,7 +24,9 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -44,8 +46,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; @@ -278,10 +282,17 @@ public final void testErrorHandlerForUndeliverableAsyncTaskExceptions() { AtomicBoolean wasHandlerInvoked = new AtomicBoolean(false); Consumer testHandler = t -> wasHandlerInvoked.compareAndSet(false, true); - scheduler.registerErrorHandlerForUndeliverableAsyncTaskExceptions(testHandler); + Scheduler schedulerSpy = spy(scheduler); + + doAnswer(invocation -> { + // trigger rejected task in RxJava layer + RxJavaPlugins.onError(new RejectedExecutionException("Test exception.")); + return null; + }).when(schedulerSpy).runProcessLoop(); + + schedulerSpy.registerErrorHandlerForUndeliverableAsyncTaskExceptions(testHandler); + schedulerSpy.runProcessLoop(); - // trigger rejected task in RxJava layer - RxJavaPlugins.onError(new RejectedExecutionException("Test exception.")); assertTrue(wasHandlerInvoked.get()); } From 2cab2e0fd8c3cc176ec96136f12476abab888782 Mon Sep 17 00:00:00 2001 From: micah-jaffe Date: Wed, 26 Jun 2019 13:35:55 -0700 Subject: [PATCH 4/9] Revert ExecutorStateEvent to accept ExecutorService input type --- .../coordinator/ExecutorStateEvent.java | 28 +++++++++++++------ .../amazon/kinesis/coordinator/Scheduler.java | 6 ++-- .../kinesis/coordinator/SchedulerTest.java | 3 -- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java index b491e5fa1..eb6aa6d1b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -20,15 +20,17 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.LeaseCoordinator; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @Getter -@ToString(exclude = "MESSAGE") +@ToString(exclude = {"MESSAGE", "isThreadPoolExecutor"}) @Slf4j @KinesisClientInternalApi class ExecutorStateEvent implements DiagnosticEvent { private final String MESSAGE = "Current thread pool executor state: "; + private boolean isThreadPoolExecutor; private int currentQueueSize; private int activeThreads; private int coreThreads; @@ -36,18 +38,28 @@ class ExecutorStateEvent implements DiagnosticEvent { private int largestPoolSize; private int maximumPoolSize; - ExecutorStateEvent(ThreadPoolExecutor executor, LeaseCoordinator leaseCoordinator) { - this.currentQueueSize = executor.getQueue().size(); - this.activeThreads = executor.getActiveCount(); - this.coreThreads = executor.getCorePoolSize(); - this.largestPoolSize = executor.getLargestPoolSize(); - this.maximumPoolSize = executor.getMaximumPoolSize(); + ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) { + if (executor instanceof ThreadPoolExecutor) { + this.isThreadPoolExecutor = true; + + ThreadPoolExecutor ex = (ThreadPoolExecutor) executor; + this.currentQueueSize = ex.getQueue().size(); + this.activeThreads = ex.getActiveCount(); + this.coreThreads = ex.getCorePoolSize(); + this.largestPoolSize = ex.getLargestPoolSize(); + this.maximumPoolSize = ex.getMaximumPoolSize(); + } + this.leasesOwned = leaseCoordinator.getAssignments().size(); } + @Override public void accept(DiagnosticEventHandler visitor) { - visitor.visit(this); + // logging is only meaningful for a ThreadPoolExecutor executor service (default config) + if (isThreadPoolExecutor) { + visitor.visit(this); + } } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 54d3ac449..54998f0ec 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -633,8 +633,7 @@ void cleanupShardConsumers(Set assignedShards) { void registerErrorHandlerForUndeliverableAsyncTaskExceptions(@Nullable Consumer handler) { if (handler == null) { RxJavaPlugins.setErrorHandler(t -> { - ExecutorStateEvent executorStateEvent = new ExecutorStateEvent((ThreadPoolExecutor) executorService, - leaseCoordinator); + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); RejectedTaskEvent rejectedTaskEvent = new RejectedTaskEvent(executorStateEvent, t); rejectedTaskEvent.accept(diagnosticEventHandler); }); @@ -644,8 +643,7 @@ void registerErrorHandlerForUndeliverableAsyncTaskExceptions(@Nullable Consumer< } private void logExecutorState() { - ExecutorStateEvent executorStateEvent = new ExecutorStateEvent((ThreadPoolExecutor) executorService, - leaseCoordinator); + ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); executorStateEvent.accept(diagnosticEventHandler); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 459a3d713..f993f7b5c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -26,7 +26,6 @@ import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -46,10 +45,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.stubbing.Answer; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; From 3efca58f394806168022b3fe3ede5e3b3c59aea2 Mon Sep 17 00:00:00 2001 From: micah-jaffe Date: Wed, 26 Jun 2019 16:21:36 -0700 Subject: [PATCH 5/9] Minor style and messaging fixes --- .../amazon/kinesis/coordinator/CoordinatorConfig.java | 1 + .../amazon/kinesis/coordinator/DiagnosticEvent.java | 1 + .../amazon/kinesis/coordinator/DiagnosticEventHandler.java | 1 + .../amazon/kinesis/coordinator/DiagnosticEventLogger.java | 7 ++++--- .../amazon/kinesis/coordinator/ExecutorStateEvent.java | 3 +++ .../amazon/kinesis/coordinator/RejectedTaskEvent.java | 5 +++-- .../amazon/kinesis/coordinator/DiagnosticEventsTest.java | 1 + .../software/amazon/kinesis/coordinator/SchedulerTest.java | 4 ++-- 8 files changed, 16 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java index ebd891efe..6240eda9b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java @@ -89,4 +89,5 @@ public class CoordinatorConfig { private GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator(); private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory(); + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java index 0416f68c7..c7c5bfce1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEvent.java @@ -12,6 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package software.amazon.kinesis.coordinator; /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java index a7a0a2ef5..fad959447 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventHandler.java @@ -12,6 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package software.amazon.kinesis.coordinator; /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java index 813523c52..cafaac130 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventLogger.java @@ -12,6 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package software.amazon.kinesis.coordinator; import lombok.NoArgsConstructor; @@ -26,8 +27,8 @@ @Slf4j @KinesisClientInternalApi class DiagnosticEventLogger implements DiagnosticEventHandler { - private static final long executorLogIntervalMillis = 30000L; - private long nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis; + private static final long EXECUTOR_LOG_INTERVAL_MILLIS = 30000L; + private long nextExecutorLogTime = System.currentTimeMillis() + EXECUTOR_LOG_INTERVAL_MILLIS; /** * {@inheritDoc} @@ -38,7 +39,7 @@ class DiagnosticEventLogger implements DiagnosticEventHandler { public void visit(ExecutorStateEvent event) { if (System.currentTimeMillis() >= nextExecutorLogTime) { log.info(event.message()); - nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis; + nextExecutorLogTime = System.currentTimeMillis() + EXECUTOR_LOG_INTERVAL_MILLIS; } else { log.debug(event.message()); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java index eb6aa6d1b..4732934ae 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -12,6 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package software.amazon.kinesis.coordinator; import lombok.Getter; @@ -31,6 +32,7 @@ class ExecutorStateEvent implements DiagnosticEvent { private final String MESSAGE = "Current thread pool executor state: "; private boolean isThreadPoolExecutor; + private String executorName; private int currentQueueSize; private int activeThreads; private int coreThreads; @@ -43,6 +45,7 @@ class ExecutorStateEvent implements DiagnosticEvent { this.isThreadPoolExecutor = true; ThreadPoolExecutor ex = (ThreadPoolExecutor) executor; + this.executorName = ex.getClass().getSimpleName(); this.currentQueueSize = ex.getQueue().size(); this.activeThreads = ex.getActiveCount(); this.coreThreads = ex.getCorePoolSize(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java index 844ea8a50..6bba9e2ed 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java @@ -12,6 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package software.amazon.kinesis.coordinator; import lombok.Getter; @@ -24,8 +25,8 @@ @Slf4j @KinesisClientInternalApi class RejectedTaskEvent implements DiagnosticEvent { - private final String MESSAGE = "This is a misconfiguration of the thread pool. We currently " + - "only support the default thread pool configuration, until the next KCL release. "; + private final String MESSAGE = "Review your thread configuration to prevent task rejections. " + + "Until next release, KCL will not be resilient to task rejections. "; private ExecutorStateEvent executorStateEvent; private Throwable throwable; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java index c1fcc5918..e194ee57e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java @@ -12,6 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package software.amazon.kinesis.coordinator; import lombok.extern.slf4j.Slf4j; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index f993f7b5c..aecdf9b75 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -275,7 +275,7 @@ public final void testSchedulerShutdown() { } @Test - public final void testErrorHandlerForUndeliverableAsyncTaskExceptions() { + public void testErrorHandlerForUndeliverableAsyncTaskExceptions() { AtomicBoolean wasHandlerInvoked = new AtomicBoolean(false); Consumer testHandler = t -> wasHandlerInvoked.compareAndSet(false, true); @@ -287,7 +287,7 @@ public final void testErrorHandlerForUndeliverableAsyncTaskExceptions() { return null; }).when(schedulerSpy).runProcessLoop(); - schedulerSpy.registerErrorHandlerForUndeliverableAsyncTaskExceptions(testHandler); + schedulerSpy.registerErrorHandlerForUndeliverableAsyncTaskExceptions(null); schedulerSpy.runProcessLoop(); assertTrue(wasHandlerInvoked.get()); From 296d5f36ebbb2a3acde0e8c88667540fb4391ecd Mon Sep 17 00:00:00 2001 From: micah-jaffe Date: Wed, 26 Jun 2019 19:05:11 -0700 Subject: [PATCH 6/9] Fix failing unit test --- .../java/software/amazon/kinesis/coordinator/Scheduler.java | 1 - .../java/software/amazon/kinesis/coordinator/SchedulerTest.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 54998f0ec..15059131c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index aecdf9b75..3659aba80 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -287,7 +287,7 @@ public void testErrorHandlerForUndeliverableAsyncTaskExceptions() { return null; }).when(schedulerSpy).runProcessLoop(); - schedulerSpy.registerErrorHandlerForUndeliverableAsyncTaskExceptions(null); + schedulerSpy.registerErrorHandlerForUndeliverableAsyncTaskExceptions(testHandler); schedulerSpy.runProcessLoop(); assertTrue(wasHandlerInvoked.get()); From 0f6f38fb7684f0827c2605b274c1e85708e2f481 Mon Sep 17 00:00:00 2001 From: micah-jaffe Date: Thu, 27 Jun 2019 16:01:03 -0700 Subject: [PATCH 7/9] Refactor diagnostic events to use factory for testing --- .../coordinator/DiagnosticEventFactory.java | 41 +++++++++++++++ .../coordinator/ExecutorStateEvent.java | 4 +- .../coordinator/RejectedTaskEvent.java | 4 +- .../amazon/kinesis/coordinator/Scheduler.java | 52 +++++++++++-------- .../coordinator/DiagnosticEventsTest.java | 26 +++++++++- .../kinesis/coordinator/SchedulerTest.java | 41 ++++++++++----- 6 files changed, 127 insertions(+), 41 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java new file mode 100644 index 000000000..bb6af23de --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.coordinator; + +import software.amazon.kinesis.leases.LeaseCoordinator; + +import java.util.concurrent.ExecutorService; + +/** + * Holds state and emits {@link DiagnosticEvent}s for logging and visibility + */ +class DiagnosticEventFactory { + private ExecutorService executorService; + private LeaseCoordinator leaseCoordinator; + + DiagnosticEventFactory(ExecutorService executorService, LeaseCoordinator leaseCoordinator) { + this.executorService = executorService; + this.leaseCoordinator = leaseCoordinator; + } + + ExecutorStateEvent emitExecutorStateEvent() { + return new ExecutorStateEvent(executorService, leaseCoordinator); + } + + RejectedTaskEvent emitRejectedTaskEvent(Throwable t) { + return new RejectedTaskEvent(emitExecutorStateEvent(), t); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java index 4732934ae..1c86cfec4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -25,11 +25,11 @@ import java.util.concurrent.ThreadPoolExecutor; @Getter -@ToString(exclude = {"MESSAGE", "isThreadPoolExecutor"}) +@ToString(exclude = "isThreadPoolExecutor") @Slf4j @KinesisClientInternalApi class ExecutorStateEvent implements DiagnosticEvent { - private final String MESSAGE = "Current thread pool executor state: "; + private static final String MESSAGE = "Current thread pool executor state: "; private boolean isThreadPoolExecutor; private String executorName; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java index 6bba9e2ed..d78b64b6c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/RejectedTaskEvent.java @@ -21,11 +21,11 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; @Getter -@ToString(exclude = "MESSAGE") +@ToString @Slf4j @KinesisClientInternalApi class RejectedTaskEvent implements DiagnosticEvent { - private final String MESSAGE = "Review your thread configuration to prevent task rejections. " + + private static final String MESSAGE = "Review your thread configuration to prevent task rejections. " + "Until next release, KCL will not be resilient to task rejections. "; private ExecutorStateEvent executorStateEvent; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 15059131c..8c4e385d1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; @@ -71,8 +70,6 @@ import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RetrievalConfig; -import javax.annotation.Nullable; - /** * */ @@ -99,6 +96,7 @@ public class Scheduler implements Runnable { // parent shards private final long parentShardPollIntervalMillis; private final ExecutorService executorService; + private DiagnosticEventFactory diagnosticEventFactory; private final DiagnosticEventHandler diagnosticEventHandler; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; @@ -172,6 +170,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis(); this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis(); this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); + this.diagnosticEventFactory = new DiagnosticEventFactory(this.executorService, this.leaseCoordinator); this.diagnosticEventHandler = new DiagnosticEventLogger(); this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() @@ -206,6 +205,24 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); } + /** + * Customers do not currently have the ability to customize the DiagnosticEventFactory, but this visibility + * is desired for testing. This constructor is only used for testing to provide a mock DiagnosticEventFactory. + */ + @VisibleForTesting + protected Scheduler(@NonNull final CheckpointConfig checkpointConfig, + @NonNull final CoordinatorConfig coordinatorConfig, + @NonNull final LeaseManagementConfig leaseManagementConfig, + @NonNull final LifecycleConfig lifecycleConfig, + @NonNull final MetricsConfig metricsConfig, + @NonNull final ProcessorConfig processorConfig, + @NonNull final RetrievalConfig retrievalConfig, + @NonNull final DiagnosticEventFactory diagnosticEventFactory) { + this(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, + processorConfig, retrievalConfig); + this.diagnosticEventFactory = diagnosticEventFactory; + } + /** * Start consuming data from the stream, and pass it to the application record processors. */ @@ -232,9 +249,10 @@ public void run() { log.info("Worker loop is complete. Exiting from worker."); } - private void initialize() { + @VisibleForTesting + void initialize() { synchronized (lock) { - registerErrorHandlerForUndeliverableAsyncTaskExceptions(null); + registerErrorHandlerForUndeliverableAsyncTaskExceptions(); workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING); boolean isDone = false; Exception lastException = null; @@ -622,27 +640,17 @@ void cleanupShardConsumers(Set assignedShards) { /** * Exceptions in the RxJava layer can fail silently unless an error handler is set to propagate these exceptions - * back to the KCL, as is done below. This method is internal to the class and has package access solely - * for testing. - * - * @param handler This method accepts a handler param solely for testing. All non-test calls to this method will - * have a null input. + * back to the KCL, as is done below. */ - @VisibleForTesting - void registerErrorHandlerForUndeliverableAsyncTaskExceptions(@Nullable Consumer handler) { - if (handler == null) { - RxJavaPlugins.setErrorHandler(t -> { - ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); - RejectedTaskEvent rejectedTaskEvent = new RejectedTaskEvent(executorStateEvent, t); - rejectedTaskEvent.accept(diagnosticEventHandler); - }); - } else { - RxJavaPlugins.setErrorHandler(t -> handler.accept(t)); - } + private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() { + RxJavaPlugins.setErrorHandler(t -> { + RejectedTaskEvent rejectedTaskEvent = diagnosticEventFactory.emitRejectedTaskEvent(t); + rejectedTaskEvent.accept(diagnosticEventHandler); + }); } private void logExecutorState() { - ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator); + ExecutorStateEvent executorStateEvent = diagnosticEventFactory.emitExecutorStateEvent(); executorStateEvent.accept(diagnosticEventHandler); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java index e194ee57e..4e0e021c4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java @@ -77,7 +77,7 @@ public void setup() { } @Test - public void testExecutorStateEventWithDefaultHandler() { + public void testExecutorStateEvent() { ExecutorStateEvent event = new ExecutorStateEvent(executor, leaseCoordinator); event.accept(defaultHandler); @@ -100,7 +100,7 @@ public void testExecutorStateEventWithCustomHandler() { } @Test - public void testRejectedTaskEventWithDefaultHandler() { + public void testRejectedTaskEvent() { ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executor, leaseCoordinator); RejectedTaskEvent event = new RejectedTaskEvent(executorStateEvent, throwable); event.accept(defaultHandler); @@ -126,6 +126,28 @@ public void testRejectedTaskEventWithCustomHandler() { assertTrue(wasCustomHandlerInvoked); } + @Test + public void testDiagnosticEventFactory() { + DiagnosticEventFactory factory = new DiagnosticEventFactory(executor, leaseCoordinator); + + ExecutorStateEvent executorStateEvent = factory.emitExecutorStateEvent(); + assertEquals(executorStateEvent.getActiveThreads(), activeThreadCount); + assertEquals(executorStateEvent.getCoreThreads(), corePoolSize); + assertEquals(executorStateEvent.getLargestPoolSize(), largestPoolSize); + assertEquals(executorStateEvent.getMaximumPoolSize(), maximumPoolSize); + assertEquals(executorStateEvent.getLeasesOwned(), leaseAssignments.size()); + assertEquals(executorStateEvent.getCurrentQueueSize(),0); + + RejectedTaskEvent rejectedTaskEvent = factory.emitRejectedTaskEvent(new TestRejectedTaskException()); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getActiveThreads(), activeThreadCount); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCoreThreads(), corePoolSize); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getMaximumPoolSize(), maximumPoolSize); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLeasesOwned(), leaseAssignments.size()); + assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCurrentQueueSize(),0); + assertTrue(rejectedTaskEvent.getThrowable() instanceof TestRejectedTaskException); + } + private class TestRejectedTaskException extends Exception { private TestRejectedTaskException() { super(); } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 3659aba80..9304cfd5b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -24,8 +24,9 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -37,8 +38,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import io.reactivex.plugins.RxJavaPlugins; import org.junit.Before; @@ -276,21 +275,37 @@ public final void testSchedulerShutdown() { @Test public void testErrorHandlerForUndeliverableAsyncTaskExceptions() { - AtomicBoolean wasHandlerInvoked = new AtomicBoolean(false); - Consumer testHandler = t -> wasHandlerInvoked.compareAndSet(false, true); + DiagnosticEventFactory eventFactory = mock(DiagnosticEventFactory.class); + ExecutorStateEvent executorStateEvent = mock(ExecutorStateEvent.class); + RejectedTaskEvent rejectedTaskEvent = mock(RejectedTaskEvent.class); - Scheduler schedulerSpy = spy(scheduler); + when(eventFactory.emitRejectedTaskEvent(any())).thenReturn(rejectedTaskEvent); + when(eventFactory.emitExecutorStateEvent()).thenReturn(executorStateEvent); + + Scheduler testScheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, + lifecycleConfig, metricsConfig, processorConfig, retrievalConfig, eventFactory); + + Scheduler schedulerSpy = spy(testScheduler); - doAnswer(invocation -> { - // trigger rejected task in RxJava layer - RxJavaPlugins.onError(new RejectedExecutionException("Test exception.")); - return null; - }).when(schedulerSpy).runProcessLoop(); + // reject task on third loop + doCallRealMethod() + .doCallRealMethod() + .doAnswer(invocation -> { + // trigger rejected task in RxJava layer + RxJavaPlugins.onError(new RejectedExecutionException("Test exception.")); + return null; + }).when(schedulerSpy).runProcessLoop(); - schedulerSpy.registerErrorHandlerForUndeliverableAsyncTaskExceptions(testHandler); + // Scheduler sets error handler in initialize method + schedulerSpy.initialize(); + schedulerSpy.runProcessLoop(); + schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop(); - assertTrue(wasHandlerInvoked.get()); + verify(eventFactory, times(2)).emitExecutorStateEvent(); + verify(executorStateEvent, times(2)).accept(any()); + verify(eventFactory, times(1)).emitRejectedTaskEvent(any()); + verify(rejectedTaskEvent, times(1)).accept(any()); } /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { From a0b6ef8ea76dfdf18518eb2cad6fdf58f59ebe34 Mon Sep 17 00:00:00 2001 From: micah-jaffe Date: Thu, 27 Jun 2019 17:02:16 -0700 Subject: [PATCH 8/9] Fix constructor overloading for testing --- .../coordinator/DiagnosticEventFactory.java | 25 +++++++++-- .../amazon/kinesis/coordinator/Scheduler.java | 44 +++++++++---------- .../coordinator/DiagnosticEventsTest.java | 4 +- .../kinesis/coordinator/SchedulerTest.java | 8 ++-- 4 files changed, 49 insertions(+), 32 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java index bb6af23de..56c345499 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java @@ -16,26 +16,43 @@ package software.amazon.kinesis.coordinator; import software.amazon.kinesis.leases.LeaseCoordinator; +import software.amazon.kinesis.leases.LeaseManagementConfig; +import software.amazon.kinesis.metrics.MetricsConfig; +import software.amazon.kinesis.metrics.MetricsFactory; import java.util.concurrent.ExecutorService; /** - * Holds state and emits {@link DiagnosticEvent}s for logging and visibility + * Creates {@link DiagnosticEvent}s for logging and visibility */ class DiagnosticEventFactory { private ExecutorService executorService; private LeaseCoordinator leaseCoordinator; + /** + * Used for passing in specific instances of ExecutorService and LeaseCoordinator + */ DiagnosticEventFactory(ExecutorService executorService, LeaseCoordinator leaseCoordinator) { this.executorService = executorService; this.leaseCoordinator = leaseCoordinator; } - ExecutorStateEvent emitExecutorStateEvent() { + /** + * Used to create DiagnosticEventFactory from high-level configs + */ + DiagnosticEventFactory(CoordinatorConfig coordinatorConfig, LeaseManagementConfig leaseManagementConfig, + MetricsConfig metricsConfig) { + this.executorService = coordinatorConfig.coordinatorFactory().createExecutorService(); + + MetricsFactory metricsFactory = metricsConfig.metricsFactory(); + this.leaseCoordinator = leaseManagementConfig.leaseManagementFactory().createLeaseCoordinator(metricsFactory); + } + + ExecutorStateEvent executorStateEvent() { return new ExecutorStateEvent(executorService, leaseCoordinator); } - RejectedTaskEvent emitRejectedTaskEvent(Throwable t) { - return new RejectedTaskEvent(emitExecutorStateEvent(), t); + RejectedTaskEvent rejectedTaskEvent(Throwable t) { + return new RejectedTaskEvent(executorStateEvent(), t); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 8c4e385d1..dbe79ea6c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -96,7 +96,7 @@ public class Scheduler implements Runnable { // parent shards private final long parentShardPollIntervalMillis; private final ExecutorService executorService; - private DiagnosticEventFactory diagnosticEventFactory; + private final DiagnosticEventFactory diagnosticEventFactory; private final DiagnosticEventHandler diagnosticEventHandler; // private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final LeaseCoordinator leaseCoordinator; @@ -143,6 +143,24 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, @NonNull final MetricsConfig metricsConfig, @NonNull final ProcessorConfig processorConfig, @NonNull final RetrievalConfig retrievalConfig) { + this(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, + processorConfig, retrievalConfig, new DiagnosticEventFactory(coordinatorConfig, leaseManagementConfig, + metricsConfig)); + } + + /** + * Customers do not currently have the ability to customize the DiagnosticEventFactory, but this visibility + * is desired for testing. This constructor is only used for testing to provide a mock DiagnosticEventFactory. + */ + @VisibleForTesting + protected Scheduler(@NonNull final CheckpointConfig checkpointConfig, + @NonNull final CoordinatorConfig coordinatorConfig, + @NonNull final LeaseManagementConfig leaseManagementConfig, + @NonNull final LifecycleConfig lifecycleConfig, + @NonNull final MetricsConfig metricsConfig, + @NonNull final ProcessorConfig processorConfig, + @NonNull final RetrievalConfig retrievalConfig, + @NonNull final DiagnosticEventFactory diagnosticEventFactory) { this.checkpointConfig = checkpointConfig; this.coordinatorConfig = coordinatorConfig; this.leaseManagementConfig = leaseManagementConfig; @@ -170,7 +188,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis(); this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis(); this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); - this.diagnosticEventFactory = new DiagnosticEventFactory(this.executorService, this.leaseCoordinator); + this.diagnosticEventFactory = diagnosticEventFactory; this.diagnosticEventHandler = new DiagnosticEventLogger(); this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory() @@ -205,24 +223,6 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); } - /** - * Customers do not currently have the ability to customize the DiagnosticEventFactory, but this visibility - * is desired for testing. This constructor is only used for testing to provide a mock DiagnosticEventFactory. - */ - @VisibleForTesting - protected Scheduler(@NonNull final CheckpointConfig checkpointConfig, - @NonNull final CoordinatorConfig coordinatorConfig, - @NonNull final LeaseManagementConfig leaseManagementConfig, - @NonNull final LifecycleConfig lifecycleConfig, - @NonNull final MetricsConfig metricsConfig, - @NonNull final ProcessorConfig processorConfig, - @NonNull final RetrievalConfig retrievalConfig, - @NonNull final DiagnosticEventFactory diagnosticEventFactory) { - this(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, - processorConfig, retrievalConfig); - this.diagnosticEventFactory = diagnosticEventFactory; - } - /** * Start consuming data from the stream, and pass it to the application record processors. */ @@ -644,13 +644,13 @@ void cleanupShardConsumers(Set assignedShards) { */ private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() { RxJavaPlugins.setErrorHandler(t -> { - RejectedTaskEvent rejectedTaskEvent = diagnosticEventFactory.emitRejectedTaskEvent(t); + RejectedTaskEvent rejectedTaskEvent = diagnosticEventFactory.rejectedTaskEvent(t); rejectedTaskEvent.accept(diagnosticEventHandler); }); } private void logExecutorState() { - ExecutorStateEvent executorStateEvent = diagnosticEventFactory.emitExecutorStateEvent(); + ExecutorStateEvent executorStateEvent = diagnosticEventFactory.executorStateEvent(); executorStateEvent.accept(diagnosticEventHandler); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java index 4e0e021c4..dea1542a3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java @@ -130,7 +130,7 @@ public void testRejectedTaskEventWithCustomHandler() { public void testDiagnosticEventFactory() { DiagnosticEventFactory factory = new DiagnosticEventFactory(executor, leaseCoordinator); - ExecutorStateEvent executorStateEvent = factory.emitExecutorStateEvent(); + ExecutorStateEvent executorStateEvent = factory.executorStateEvent(); assertEquals(executorStateEvent.getActiveThreads(), activeThreadCount); assertEquals(executorStateEvent.getCoreThreads(), corePoolSize); assertEquals(executorStateEvent.getLargestPoolSize(), largestPoolSize); @@ -138,7 +138,7 @@ public void testDiagnosticEventFactory() { assertEquals(executorStateEvent.getLeasesOwned(), leaseAssignments.size()); assertEquals(executorStateEvent.getCurrentQueueSize(),0); - RejectedTaskEvent rejectedTaskEvent = factory.emitRejectedTaskEvent(new TestRejectedTaskException()); + RejectedTaskEvent rejectedTaskEvent = factory.rejectedTaskEvent(new TestRejectedTaskException()); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getActiveThreads(), activeThreadCount); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCoreThreads(), corePoolSize); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 9304cfd5b..69e6588f4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -279,8 +279,8 @@ public void testErrorHandlerForUndeliverableAsyncTaskExceptions() { ExecutorStateEvent executorStateEvent = mock(ExecutorStateEvent.class); RejectedTaskEvent rejectedTaskEvent = mock(RejectedTaskEvent.class); - when(eventFactory.emitRejectedTaskEvent(any())).thenReturn(rejectedTaskEvent); - when(eventFactory.emitExecutorStateEvent()).thenReturn(executorStateEvent); + when(eventFactory.rejectedTaskEvent(any())).thenReturn(rejectedTaskEvent); + when(eventFactory.executorStateEvent()).thenReturn(executorStateEvent); Scheduler testScheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig, eventFactory); @@ -302,9 +302,9 @@ public void testErrorHandlerForUndeliverableAsyncTaskExceptions() { schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop(); - verify(eventFactory, times(2)).emitExecutorStateEvent(); + verify(eventFactory, times(2)).executorStateEvent(); verify(executorStateEvent, times(2)).accept(any()); - verify(eventFactory, times(1)).emitRejectedTaskEvent(any()); + verify(eventFactory, times(1)).rejectedTaskEvent(any()); verify(rejectedTaskEvent, times(1)).accept(any()); } From 0596b8b5d20e1e7308750043232241c401b69c18 Mon Sep 17 00:00:00 2001 From: micah-jaffe Date: Thu, 27 Jun 2019 17:29:50 -0700 Subject: [PATCH 9/9] Refactor DiagnosticEventHandler to no args constructor --- .../coordinator/DiagnosticEventFactory.java | 33 +++---------------- .../coordinator/ExecutorStateEvent.java | 1 - .../amazon/kinesis/coordinator/Scheduler.java | 10 +++--- .../coordinator/DiagnosticEventsTest.java | 7 ++-- .../kinesis/coordinator/SchedulerTest.java | 8 ++--- 5 files changed, 18 insertions(+), 41 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java index 56c345499..316313aa9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DiagnosticEventFactory.java @@ -15,44 +15,21 @@ package software.amazon.kinesis.coordinator; +import lombok.NoArgsConstructor; import software.amazon.kinesis.leases.LeaseCoordinator; -import software.amazon.kinesis.leases.LeaseManagementConfig; -import software.amazon.kinesis.metrics.MetricsConfig; -import software.amazon.kinesis.metrics.MetricsFactory; import java.util.concurrent.ExecutorService; /** * Creates {@link DiagnosticEvent}s for logging and visibility */ +@NoArgsConstructor class DiagnosticEventFactory { - private ExecutorService executorService; - private LeaseCoordinator leaseCoordinator; - - /** - * Used for passing in specific instances of ExecutorService and LeaseCoordinator - */ - DiagnosticEventFactory(ExecutorService executorService, LeaseCoordinator leaseCoordinator) { - this.executorService = executorService; - this.leaseCoordinator = leaseCoordinator; - } - - /** - * Used to create DiagnosticEventFactory from high-level configs - */ - DiagnosticEventFactory(CoordinatorConfig coordinatorConfig, LeaseManagementConfig leaseManagementConfig, - MetricsConfig metricsConfig) { - this.executorService = coordinatorConfig.coordinatorFactory().createExecutorService(); - - MetricsFactory metricsFactory = metricsConfig.metricsFactory(); - this.leaseCoordinator = leaseManagementConfig.leaseManagementFactory().createLeaseCoordinator(metricsFactory); - } - - ExecutorStateEvent executorStateEvent() { + ExecutorStateEvent executorStateEvent(ExecutorService executorService, LeaseCoordinator leaseCoordinator) { return new ExecutorStateEvent(executorService, leaseCoordinator); } - RejectedTaskEvent rejectedTaskEvent(Throwable t) { - return new RejectedTaskEvent(executorStateEvent(), t); + RejectedTaskEvent rejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable t) { + return new RejectedTaskEvent(executorStateEvent, t); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java index 1c86cfec4..3333cc42b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -56,7 +56,6 @@ class ExecutorStateEvent implements DiagnosticEvent { this.leasesOwned = leaseCoordinator.getAssignments().size(); } - @Override public void accept(DiagnosticEventHandler visitor) { // logging is only meaningful for a ThreadPoolExecutor executor service (default config) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index dbe79ea6c..0f26c3d66 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -144,8 +144,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig, @NonNull final ProcessorConfig processorConfig, @NonNull final RetrievalConfig retrievalConfig) { this(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, - processorConfig, retrievalConfig, new DiagnosticEventFactory(coordinatorConfig, leaseManagementConfig, - metricsConfig)); + processorConfig, retrievalConfig, new DiagnosticEventFactory()); } /** @@ -644,13 +643,16 @@ void cleanupShardConsumers(Set assignedShards) { */ private void registerErrorHandlerForUndeliverableAsyncTaskExceptions() { RxJavaPlugins.setErrorHandler(t -> { - RejectedTaskEvent rejectedTaskEvent = diagnosticEventFactory.rejectedTaskEvent(t); + ExecutorStateEvent executorStateEvent = diagnosticEventFactory.executorStateEvent(executorService, + leaseCoordinator); + RejectedTaskEvent rejectedTaskEvent = diagnosticEventFactory.rejectedTaskEvent(executorStateEvent, t); rejectedTaskEvent.accept(diagnosticEventHandler); }); } private void logExecutorState() { - ExecutorStateEvent executorStateEvent = diagnosticEventFactory.executorStateEvent(); + ExecutorStateEvent executorStateEvent = diagnosticEventFactory.executorStateEvent(executorService, + leaseCoordinator); executorStateEvent.accept(diagnosticEventHandler); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java index dea1542a3..d6098cca6 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/DiagnosticEventsTest.java @@ -128,9 +128,9 @@ public void testRejectedTaskEventWithCustomHandler() { @Test public void testDiagnosticEventFactory() { - DiagnosticEventFactory factory = new DiagnosticEventFactory(executor, leaseCoordinator); + DiagnosticEventFactory factory = new DiagnosticEventFactory(); - ExecutorStateEvent executorStateEvent = factory.executorStateEvent(); + ExecutorStateEvent executorStateEvent = factory.executorStateEvent(executor, leaseCoordinator); assertEquals(executorStateEvent.getActiveThreads(), activeThreadCount); assertEquals(executorStateEvent.getCoreThreads(), corePoolSize); assertEquals(executorStateEvent.getLargestPoolSize(), largestPoolSize); @@ -138,7 +138,8 @@ public void testDiagnosticEventFactory() { assertEquals(executorStateEvent.getLeasesOwned(), leaseAssignments.size()); assertEquals(executorStateEvent.getCurrentQueueSize(),0); - RejectedTaskEvent rejectedTaskEvent = factory.rejectedTaskEvent(new TestRejectedTaskException()); + RejectedTaskEvent rejectedTaskEvent = factory.rejectedTaskEvent(executorStateEvent, + new TestRejectedTaskException()); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getActiveThreads(), activeThreadCount); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getCoreThreads(), corePoolSize); assertEquals(rejectedTaskEvent.getExecutorStateEvent().getLargestPoolSize(), largestPoolSize); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 69e6588f4..8be1bb8fc 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -279,8 +279,8 @@ public void testErrorHandlerForUndeliverableAsyncTaskExceptions() { ExecutorStateEvent executorStateEvent = mock(ExecutorStateEvent.class); RejectedTaskEvent rejectedTaskEvent = mock(RejectedTaskEvent.class); - when(eventFactory.rejectedTaskEvent(any())).thenReturn(rejectedTaskEvent); - when(eventFactory.executorStateEvent()).thenReturn(executorStateEvent); + when(eventFactory.rejectedTaskEvent(any(), any())).thenReturn(rejectedTaskEvent); + when(eventFactory.executorStateEvent(any(), any())).thenReturn(executorStateEvent); Scheduler testScheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig, eventFactory); @@ -302,9 +302,7 @@ public void testErrorHandlerForUndeliverableAsyncTaskExceptions() { schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop(); - verify(eventFactory, times(2)).executorStateEvent(); - verify(executorStateEvent, times(2)).accept(any()); - verify(eventFactory, times(1)).rejectedTaskEvent(any()); + verify(eventFactory, times(1)).rejectedTaskEvent(eq(executorStateEvent), any()); verify(rejectedTaskEvent, times(1)).accept(any()); }