Add diagnostic events for logging visibility #559

Merged: 9 commits, Jun 28, 2019
Original file line number Diff line number Diff line change
@@ -89,5 +89,4 @@ public class CoordinatorConfig {
private GracefulShutdownCoordinator gracefulShutdownCoordinator = new GracefulShutdownCoordinator();

private CoordinatorFactory coordinatorFactory = new SchedulerCoordinatorFactory();

}
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;

/**
* An interface to implement various types of stateful events that can be used for diagnostics throughout the KCL.
*/
interface DiagnosticEvent {
/**
* DiagnosticEvent is part of a visitor pattern and it accepts DiagnosticEventHandler visitors.
*
* @param visitor A handler that controls the behavior of the DiagnosticEvent when invoked.
*/
void accept(DiagnosticEventHandler visitor);

/**
* The string to output to logs when a DiagnosticEvent occurs.
*/
String message();
}
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;

/**
* An interface to implement behaviors associated with a {@link DiagnosticEvent}. Uses the visitor pattern to visit
* the DiagnosticEvent when the behavior is desired. A default implementation that performs simple logging is found in
* {@link DiagnosticEventLogger}.
*/
interface DiagnosticEventHandler {
/**
* @param event Log or otherwise react to periodic pulses on the thread pool executor state.
*/
void visit(ExecutorStateEvent event);

/**
* @param event Log or otherwise react to rejected tasks in the RxJavaPlugin layer.
*/
void visit(RejectedTaskEvent event);
}
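
Note on the two interfaces above: the double dispatch they set up can be sketched in a few self-contained lines. The names `Event`, `Handler`, `PulseEvent`, `FailureEvent`, and `RecordingHandler` are hypothetical stand-ins chosen for illustration, not KCL classes, and the handler records strings rather than logging.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for DiagnosticEvent.
interface Event {
    void accept(Handler visitor);
    String message();
}

// Hypothetical stand-in for DiagnosticEventHandler: one overload per concrete event type.
interface Handler {
    void visit(PulseEvent event);
    void visit(FailureEvent event);
}

final class PulseEvent implements Event {
    @Override
    public void accept(Handler visitor) {
        // 'this' is statically a PulseEvent here, so the PulseEvent overload is selected.
        visitor.visit(this);
    }

    @Override
    public String message() {
        return "pulse";
    }
}

final class FailureEvent implements Event {
    private final Throwable cause;

    FailureEvent(Throwable cause) {
        this.cause = cause;
    }

    @Override
    public void accept(Handler visitor) {
        visitor.visit(this);
    }

    @Override
    public String message() {
        return "failure: " + cause.getMessage();
    }
}

// Records what it saw instead of logging, so the dispatch is easy to inspect.
final class RecordingHandler implements Handler {
    final List<String> seen = new ArrayList<>();

    @Override
    public void visit(PulseEvent event) {
        seen.add("PULSE " + event.message());
    }

    @Override
    public void visit(FailureEvent event) {
        seen.add("FAILURE " + event.message());
    }
}

final class VisitorSketch {
    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        List<Event> events = Arrays.asList(new PulseEvent(), new FailureEvent(new IllegalStateException("boom")));
        for (Event e : events) {
            e.accept(handler); // the caller never needs to know the concrete event type
        }
        System.out.println(handler.seen);
    }
}
```

The point of the indirection is that a new reaction (metrics, alerts) would only need a new handler implementation, while a new event type would force every handler to add an overload.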
@@ -0,0 +1,54 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;

/**
* Internal implementation of {@link DiagnosticEventHandler} used by {@link Scheduler} to log executor state both
* 1) in normal conditions periodically, and 2) in reaction to rejected task exceptions.
*/
@NoArgsConstructor
@Slf4j
@KinesisClientInternalApi
class DiagnosticEventLogger implements DiagnosticEventHandler {
private static final long executorLogIntervalMillis = 30000L;
private long nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis;

/**
* {@inheritDoc}
*
* Logs at INFO level at most once every 30 seconds to avoid over-logging; otherwise logs at DEBUG level.
*/
@Override
public void visit(ExecutorStateEvent event) {
if (System.currentTimeMillis() >= nextExecutorLogTime) {
log.info(event.message());
nextExecutorLogTime = System.currentTimeMillis() + executorLogIntervalMillis;
} else {
log.debug(event.message());
}
}

/**
* {@inheritDoc}
*/
@Override
public void visit(RejectedTaskEvent event) {
log.error(event.message(), event.getThrowable());
}
}
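
Note on the interval check in the logger above: a rough sketch of the same "promote to INFO at most once per interval" decision, with the clock injected so it can be exercised without sleeping. `IntervalGate` and its `LongSupplier` clock are assumptions made for this illustration and are not part of this PR; the `synchronized` modifier is likewise an assumption that more than one thread might call it.

```java
import java.util.function.LongSupplier;

// Hypothetical helper: decides whether a message is promoted to INFO or stays at DEBUG.
final class IntervalGate {
    private final long intervalMillis;
    private final LongSupplier clock;
    private long nextPromotionTime;

    IntervalGate(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        // As in the logger above, the first promotion happens one full interval after construction.
        this.nextPromotionTime = clock.getAsLong() + intervalMillis;
    }

    // Synchronized on the assumption that several threads could report events.
    synchronized boolean shouldPromote() {
        long now = clock.getAsLong();
        if (now >= nextPromotionTime) {
            nextPromotionTime = now + intervalMillis;
            return true;
        }
        return false;
    }
}

final class IntervalGateDemo {
    public static void main(String[] args) {
        long[] fakeNow = {0L}; // fake clock so the demo does not need to sleep
        IntervalGate gate = new IntervalGate(30_000L, () -> fakeNow[0]);
        for (long t : new long[] {1_000L, 30_000L, 31_000L, 60_000L}) {
            fakeNow[0] = t;
            String level = gate.shouldPromote() ? "INFO" : "DEBUG";
            System.out.println(t + " ms -> " + level);
        }
    }
}
```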
@@ -0,0 +1,69 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;

import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.LeaseCoordinator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

@Getter
@ToString(exclude = {"MESSAGE", "isThreadPoolExecutor"})
@Slf4j
@KinesisClientInternalApi
class ExecutorStateEvent implements DiagnosticEvent {
private final String MESSAGE = "Current thread pool executor state: ";
Contributor

Not a blocker: Seems like this is only used in one place, so doesn't need to be a class variable.

public String message() {
    return String.format("Current threadpool executor state: %s", toString());
}


private boolean isThreadPoolExecutor;
private int currentQueueSize;
private int activeThreads;
private int coreThreads;
private int leasesOwned;
private int largestPoolSize;
private int maximumPoolSize;

ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) {
if (executor instanceof ThreadPoolExecutor) {
this.isThreadPoolExecutor = true;

ThreadPoolExecutor ex = (ThreadPoolExecutor) executor;
this.currentQueueSize = ex.getQueue().size();
this.activeThreads = ex.getActiveCount();
this.coreThreads = ex.getCorePoolSize();
this.largestPoolSize = ex.getLargestPoolSize();
this.maximumPoolSize = ex.getMaximumPoolSize();
}

this.leasesOwned = leaseCoordinator.getAssignments().size();
}


@Override
public void accept(DiagnosticEventHandler visitor) {
// logging is only meaningful for a ThreadPoolExecutor executor service (default config)
if (isThreadPoolExecutor) {
visitor.visit(this);
}
}

@Override
public String message() {
return MESSAGE + this.toString();
}
}
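
Note on the constructor above: a minimal, JDK-only sketch of capturing `ThreadPoolExecutor` statistics behind an `instanceof` check. `PoolSnapshot` is a hypothetical class whose field names loosely mirror `ExecutorStateEvent`; it returns an empty `Optional` for other executor types instead of using a boolean flag, which is a design choice of this sketch and not what the PR does.

```java
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical immutable snapshot of a ThreadPoolExecutor's state at one point in time.
final class PoolSnapshot {
    final int currentQueueSize;
    final int activeThreads;
    final int coreThreads;
    final int largestPoolSize;
    final int maximumPoolSize;

    private PoolSnapshot(ThreadPoolExecutor ex) {
        this.currentQueueSize = ex.getQueue().size();
        this.activeThreads = ex.getActiveCount();
        this.coreThreads = ex.getCorePoolSize();
        this.largestPoolSize = ex.getLargestPoolSize();
        this.maximumPoolSize = ex.getMaximumPoolSize();
    }

    // Returns empty for executors that do not expose ThreadPoolExecutor statistics.
    static Optional<PoolSnapshot> of(ExecutorService executor) {
        if (executor instanceof ThreadPoolExecutor) {
            return Optional.of(new PoolSnapshot((ThreadPoolExecutor) executor));
        }
        return Optional.empty();
    }

    @Override
    public String toString() {
        return String.format("queue=%d active=%d core=%d largest=%d max=%d",
                currentQueueSize, activeThreads, coreThreads, largestPoolSize, maximumPoolSize);
    }
}

final class PoolSnapshotDemo {
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        ExecutorService other = new ForkJoinPool(1); // an ExecutorService that is not a ThreadPoolExecutor
        try {
            System.out.println(PoolSnapshot.of(pool).map(PoolSnapshot::toString).orElse("n/a"));
            System.out.println(PoolSnapshot.of(other).map(PoolSnapshot::toString).orElse("n/a"));
        } finally {
            pool.shutdown();
            other.shutdown();
        }
    }
}
```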
@@ -0,0 +1,47 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.coordinator;

import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;

@Getter
@ToString(exclude = "MESSAGE")
@Slf4j
@KinesisClientInternalApi
class RejectedTaskEvent implements DiagnosticEvent {
private final String MESSAGE = "This is a misconfiguration of the thread pool. We currently " +
"only support the default thread pool configuration, until the next KCL release. ";

private ExecutorStateEvent executorStateEvent;
private Throwable throwable;

RejectedTaskEvent(ExecutorStateEvent executorStateEvent, Throwable throwable) {
this.executorStateEvent = executorStateEvent;
this.throwable = throwable;
}

@Override
public void accept(DiagnosticEventHandler visitor) {
visitor.visit(this);
}

@Override
public String message() {
return MESSAGE + executorStateEvent.message();
}
}
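
Note on the message above, which attributes rejections to thread pool configuration: for readers unfamiliar with how a pool rejects work, here is a JDK-only illustration. It is not the KCL or RxJava code path. The pool shape (one thread, a `SynchronousQueue`, the default `AbortPolicy`) is an assumption chosen purely to force a rejection, and `SaturationSketch` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SaturationSketch {
    // Returns true if the second submission is rejected by the saturated pool.
    static boolean secondTaskRejected() {
        // One thread and no queue capacity: deliberately too small for two concurrent tasks.
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupies the only worker thread until the latch is released.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            try {
                pool.execute(() -> { });
                return false;
            } catch (RejectedExecutionException e) {
                // The default AbortPolicy throws when no thread or queue slot is available.
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected());
    }
}
```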
@@ -25,10 +25,13 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;

import io.reactivex.plugins.RxJavaPlugins;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -69,6 +72,8 @@
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalConfig;

import javax.annotation.Nullable;

/**
*
*/
@@ -95,6 +100,7 @@ public class Scheduler implements Runnable {
// parent shards
private final long parentShardPollIntervalMillis;
private final ExecutorService executorService;
private final DiagnosticEventHandler diagnosticEventHandler;
// private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final LeaseCoordinator leaseCoordinator;
private final ShardSyncTaskManager shardSyncTaskManager;
@@ -167,6 +173,7 @@ public Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.shardConsumerDispatchPollIntervalMillis = this.coordinatorConfig.shardConsumerDispatchPollIntervalMillis();
this.parentShardPollIntervalMillis = this.coordinatorConfig.parentShardPollIntervalMillis();
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventHandler = new DiagnosticEventLogger();

this.shardSyncTaskManager = this.leaseManagementConfig.leaseManagementFactory()
.createShardSyncTaskManager(this.metricsFactory);
@@ -212,7 +219,7 @@ public void run() {
try {
initialize();
log.info("Initialization complete. Starting worker loop.");
} catch (RuntimeException e) {
log.error("Unable to initialize after {} attempts. Shutting down.", maxInitializationAttempts, e);
workerStateChangeListener.onAllInitializationAttemptsFailed(e);
shutdown();
@@ -228,6 +235,7 @@ public void run() {

private void initialize() {
synchronized (lock) {
registerErrorHandlerForUndeliverableAsyncTaskExceptions(null);
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false;
Exception lastException = null;
@@ -305,6 +313,7 @@ void runProcessLoop() {
// clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShards);

logExecutorState();
slog.info("Sleeping ...");
Thread.sleep(shardConsumerDispatchPollIntervalMillis);
} catch (Exception e) {
@@ -612,6 +621,32 @@ void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
}
}

/**
* Exceptions in the RxJava layer can fail silently unless an error handler is set to propagate these exceptions
* back to the KCL, as is done below. This method is internal to the class and has package access solely
* for testing.
*
* @param handler This method accepts a handler param solely for testing. All non-test calls to this method will
* have a null input.
*/
@VisibleForTesting

Can this be tested as part of runProcessLoop() method?
We want to make sure this is invoked from runProcessLoop.

Contributor Author

Changed the unit test for this method to call RxJavaPlugins.onError() in runProcessLoop() on a scheduler spy, to verify that the error handler works as expected.

If we get the scheduler to take an EventFactory, then we can mock the event vended to the scheduler and assert that .accept is invoked on the event upon failure.

That way, this method can be marked private, and there is no need to support a custom handler as an argument to this method.

Contributor Author micah-jaffe, Jun 27, 2019

To be able to mock the EventFactory for testability, we would need a public interface for setting/modifying the EventFactory (either in configs or a setter). Currently we don't want users to be able to interact with this factory so this would only solve the problem if we introduced a 'backdoor' non-public setter for testing, which still isn't really designing for testability. Unless there's another way to inject a mock that doesn't rely on a public gateway?

Contributor Author

Refactored to use a factory by overloading with a protected constructor for testing
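
For readers following this thread, the overloaded-constructor idea could look roughly like the sketch below. `ErrorReportingWorker` and its `Function<Throwable, String>` factory are hypothetical stand-ins, not the actual Scheduler or its factory type; the refactored code itself is not shown in this part of the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical class illustrating factory injection through an overloaded constructor.
class ErrorReportingWorker {
    private final Function<Throwable, String> eventFactory;
    private final List<String> reported = new ArrayList<>();

    // Public constructor: callers always get the default factory and cannot replace it.
    public ErrorReportingWorker() {
        this(t -> "rejected task: " + t.getMessage());
    }

    // Protected overload: a test in the same package (or a subclass) can inject a stub factory.
    protected ErrorReportingWorker(Function<Throwable, String> eventFactory) {
        this.eventFactory = eventFactory;
    }

    // Stands in for the error-handler callback; the factory decides what event is produced.
    void onUndeliverableError(Throwable t) {
        reported.add(eventFactory.apply(t));
    }

    List<String> reported() {
        return reported;
    }
}

final class ErrorReportingWorkerDemo {
    public static void main(String[] args) {
        ErrorReportingWorker stubbed = new ErrorReportingWorker(t -> "stub event");
        stubbed.onUndeliverableError(new RuntimeException("queue full"));
        System.out.println(stubbed.reported());

        ErrorReportingWorker regular = new ErrorReportingWorker();
        regular.onUndeliverableError(new RuntimeException("queue full"));
        System.out.println(regular.reported());
    }
}
```

With this shape no public setter is exposed, and the error-handling method no longer needs a handler parameter that exists only for tests.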

void registerErrorHandlerForUndeliverableAsyncTaskExceptions(@Nullable Consumer<Throwable> handler) {
if (handler == null) {
RxJavaPlugins.setErrorHandler(t -> {
ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator);
RejectedTaskEvent rejectedTaskEvent = new RejectedTaskEvent(executorStateEvent, t);
rejectedTaskEvent.accept(diagnosticEventHandler);
});
} else {
RxJavaPlugins.setErrorHandler(t -> handler.accept(t));
}
}

private void logExecutorState() {
ExecutorStateEvent executorStateEvent = new ExecutorStateEvent(executorService, leaseCoordinator);
executorStateEvent.accept(diagnosticEventHandler);
}

/**
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on