Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bubble-up exceptions from scheduler #38441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,29 @@

package org.elasticsearch.threadpool;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.junit.After;
import org.junit.Before;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;

Expand Down Expand Up @@ -260,6 +257,50 @@ public void testExecutionExceptionOnScheduler() throws InterruptedException {
}
}

@SuppressForbidden(reason = "this tests that the deprecated method still works")
public void testDeprecatedSchedule() throws ExecutionException, InterruptedException {
verifyDeprecatedSchedule(((threadPool, runnable)
-> threadPool.schedule(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable)));
}

public void testThreadPoolScheduleDeprecated() throws ExecutionException, InterruptedException {
verifyDeprecatedSchedule(((threadPool, runnable)
-> threadPool.scheduleDeprecated(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable)));
}

private void verifyDeprecatedSchedule(BiFunction<ThreadPool,
Runnable, ScheduledFuture<?>> scheduleFunction) throws InterruptedException, ExecutionException {
Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler();
CountDownLatch missingExceptions = new CountDownLatch(1);
Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
missingExceptions.countDown();
});
try {
ThreadPool threadPool = new TestThreadPool("test");
CountDownLatch missingExecutions = new CountDownLatch(1);
try {
scheduleFunction.apply(threadPool, missingExecutions::countDown)
.get();
assertEquals(0, missingExecutions.getCount());

ExecutionException exception = expectThrows(ExecutionException.class,
"schedule(...).get() must throw exception from runnable",
() -> scheduleFunction.apply(threadPool,
() -> {
throw new IllegalArgumentException("FAIL");
}
).get());

assertEquals(IllegalArgumentException.class, exception.getCause().getClass());
assertTrue(missingExceptions.await(30, TimeUnit.SECONDS));
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
} finally {
Thread.setDefaultUncaughtExceptionHandler(originalHandler);
}
}

private Runnable delayMillis(Runnable r, int ms) {
return () -> {
try {
Expand Down Expand Up @@ -369,60 +410,26 @@ private void runExecutionTest(

final CountDownLatch supplierLatch = new CountDownLatch(1);

Runnable job = () -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
};

// snoop on logging to also handle the cases where exceptions are simply logged in Scheduler.
final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class);
final MockLogAppender appender = new MockLogAppender();
appender.addExpectation(
new MockLogAppender.LoggingExpectation() {
@Override
public void match(LogEvent event) {
if (event.getLevel() == Level.WARN) {
assertThat("no other warnings than those expected",
event.getMessage().getFormattedMessage(),
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
assertTrue(expectThrowable);
assertNotNull(event.getThrown());
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
uncaughtExceptionHandlerLatch.countDown();
}
}

@Override
public void assertMatched() {
try {
runner.accept(() -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
});
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

appender.start();
Loggers.addAppender(schedulerLogger, appender);
try {
try {
runner.accept(job);
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

supplierLatch.await();
supplierLatch.await();

if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}
} finally {
Loggers.removeAppender(schedulerLogger, appender);
appender.stop();
if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}

consumer.accept(Optional.ofNullable(throwableReference.get()));
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} finally {
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpo
* before we could be cancelled by the notification. In this case, our listener here would
* not be in the map and we should not fire the timeout logic.
*/
removed = listeners.remove(listener).v2() != null;
removed = listeners.remove(listener) != null;
}
if (removed) {
final TimeoutException e = new TimeoutException(timeout.getStringRep());
Expand Down
19 changes: 9 additions & 10 deletions server/src/main/java/org/elasticsearch/threadpool/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -47,8 +47,8 @@ public interface Scheduler {
/**
* Create a scheduler that can be used client side. Server side, please use <code>ThreadPool.schedule</code> instead.
*
* Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started
* using execute, submit and schedule.
* Notice that if any scheduled jobs fail with an exception, these will bubble up to the uncaught exception handler where they will
* be logged as a warning. This includes jobs started using execute, submit and schedule.
* @param settings the settings to use
* @return executor
*/
Expand Down Expand Up @@ -272,7 +272,8 @@ public void onAfter() {
}

/**
* This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks
* This subclass ensures to properly bubble up Throwable instances of both type Error and Exception thrown in submitted/scheduled
* tasks to the uncaught exception handler
*/
class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class);
Expand All @@ -294,12 +295,10 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) {

@Override
protected void afterExecute(Runnable r, Throwable t) {
Throwable exception = EsExecutors.rethrowErrors(r);
if (exception != null) {
logger.warn(() ->
new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()),
exception);
}
if (t != null) return;
// Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we
// accept the wrapped exception in the output.
ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.Counter;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -497,7 +498,16 @@ class ThreadedRunnable implements Runnable {

@Override
public void run() {
executor.execute(runnable);
try {
executor.execute(runnable);
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down",
runnable, executor), e);
} else {
throw e;
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.threadpool;

import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -29,12 +28,9 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -157,37 +153,4 @@ public void testScheduledOnScheduler() throws InterruptedException {
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
}
}

@SuppressForbidden(reason = "this tests that the deprecated method still works")
public void testDeprecatedSchedule() throws ExecutionException, InterruptedException {
verifyDeprecatedSchedule(((threadPool, runnable)
-> threadPool.schedule(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable)));
}

public void testThreadPoolScheduleDeprecated() throws ExecutionException, InterruptedException {
verifyDeprecatedSchedule(((threadPool, runnable)
-> threadPool.scheduleDeprecated(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable)));
}

private void verifyDeprecatedSchedule(BiFunction<ThreadPool,
Runnable, ScheduledFuture<?>> scheduleFunction) throws InterruptedException, ExecutionException {
ThreadPool threadPool = new TestThreadPool("test");
CountDownLatch missingExecutions = new CountDownLatch(1);
try {
scheduleFunction.apply(threadPool, missingExecutions::countDown)
.get();
assertEquals(0, missingExecutions.getCount());

ExecutionException exception = expectThrows(ExecutionException.class,
"schedule(...).get() must throw exception from runnable",
() -> scheduleFunction.apply(threadPool,
() -> { throw new IllegalArgumentException("FAIL"); }
).get());

assertEquals(IllegalArgumentException.class, exception.getCause().getClass());
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}

}