Skip to content

Commit

Permalink
Instrument rejected tasks in ThreadPoolExecutor (#4272)
Browse files Browse the repository at this point in the history
  • Loading branch information
gal-leib authored Oct 3, 2024
1 parent 90fa922 commit 80a0e2d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -30,6 +31,7 @@ public class InstrumentedExecutorService implements ExecutorService {
private final Meter submitted;
private final Counter running;
private final Meter completed;
private final Counter rejected;
private final Timer idle;
private final Timer duration;

Expand Down Expand Up @@ -57,6 +59,7 @@ public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry regi
this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));
this.running = registry.counter(MetricRegistry.name(name, "running"));
this.completed = registry.meter(MetricRegistry.name(name, "completed"));
this.rejected = registry.counter(MetricRegistry.name(name, "rejected"));
this.idle = registry.timer(MetricRegistry.name(name, "idle"));
this.duration = registry.timer(MetricRegistry.name(name, "duration"));

Expand All @@ -81,6 +84,8 @@ private void registerInternalMetrics() {
queue::size);
registry.registerGauge(MetricRegistry.name(name, "tasks.capacity"),
queue::remainingCapacity);
RejectedExecutionHandler delegateHandler = executor.getRejectedExecutionHandler();
executor.setRejectedExecutionHandler(new InstrumentedRejectedExecutionHandler(delegateHandler));
} else if (delegate instanceof ForkJoinPool) {
ForkJoinPool forkJoinPool = (ForkJoinPool) delegate;
registry.registerGauge(MetricRegistry.name(name, "tasks.stolen"),
Expand Down Expand Up @@ -223,6 +228,20 @@ public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedExc
return delegate.awaitTermination(l, timeUnit);
}

private class InstrumentedRejectedExecutionHandler implements RejectedExecutionHandler {
private final RejectedExecutionHandler delegateHandler;

public InstrumentedRejectedExecutionHandler(RejectedExecutionHandler delegateHandler) {
this.delegateHandler = delegateHandler;
}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
rejected.inc();
this.delegateHandler.rejectedExecution(r, executor);
}
}

private class InstrumentedRunnable implements Runnable {
private final Runnable task;
private final Timer.Context idleContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class InstrumentedExecutorServiceTest {

Expand Down Expand Up @@ -166,6 +169,32 @@ public void reportsTasksInformationForThreadPoolExecutor() throws Exception {
assertThat(poolSize.getValue()).isEqualTo(1);
}

@Test
public void reportsRejectedTasksForThreadPoolExecutor() throws Exception {
executor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
instrumentedExecutorService = new InstrumentedExecutorService(executor, registry, "tp");
final Counter rejected = registry.counter("tp.rejected");
assertThat(rejected.getCount()).isEqualTo(0);

final CountDownLatch latch = new CountDownLatch(1);

Runnable runnable = () -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};

Future<?> executingFuture = instrumentedExecutorService.submit(runnable);
Future<?> queuedFuture = instrumentedExecutorService.submit(runnable);
assertThatThrownBy(() -> instrumentedExecutorService.submit(runnable))
.isInstanceOf(RejectedExecutionException.class);
latch.countDown();
assertThat(rejected.getCount()).isEqualTo(1);
}

@Test
public void removesMetricsAfterShutdownForThreadPoolExecutor() {
executor = new ThreadPoolExecutor(4, 16,
Expand Down

0 comments on commit 80a0e2d

Please sign in to comment.