Skip to content

Commit

Permalink
Add awaitTerminationUninterruptibly.
Browse files Browse the repository at this point in the history
Fixes #3908
Fixes #1315
Somewhat relevant to #3655
Vaguely relevant to google/error-prone#1490, since it creates a `@CheckReturnValue` variant of a `java.util.concurrent` method that returns `false` to indicate timeout.

RELNOTES=`util.concurrent`: Added `awaitTerminationUninterruptibly`.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=322581454
  • Loading branch information
cpovirk committed Jul 22, 2020
1 parent 0281163 commit f07b954
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package com.google.common.util.concurrent;

import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread;
import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand All @@ -36,6 +38,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -465,6 +468,37 @@ public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() {
assertInterrupted();
}

// executor.awaitTermination Testcases
public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() {
ExecutorService executor = newFixedThreadPool(1);
requestInterruptIn(500);
executor.execute(new SleepTask(1000));
executor.shutdown();
assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS));
assertTrue(executor.isTerminated());
assertInterrupted();
}

public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() {
ExecutorService executor = newFixedThreadPool(1);
requestInterruptIn(500);
executor.execute(new SleepTask(10000));
executor.shutdown();
assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS));
assertFalse(executor.isTerminated());
assertInterrupted();
}

public void testTryAwaitTerminationInfiniteTimeout() {
ExecutorService executor = newFixedThreadPool(1);
requestInterruptIn(500);
executor.execute(new SleepTask(1000));
executor.shutdown();
awaitTerminationUninterruptibly(executor);
assertTrue(executor.isTerminated());
assertInterrupted();
}

/**
* Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a
* {@code Completion} starts the underlying stopwatch.
Expand Down Expand Up @@ -754,6 +788,15 @@ protected void doAction() {
}
}

private static final class SleepTask extends DelayedActionRunnable {
SleepTask(long tMinus) {
super(tMinus);
}

@Override
protected void doAction() {}
}

private static void sleepSuccessfully(long sleepMillis) {
Completion completed = new Completion(sleepMillis - SLEEP_SLACK);
Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

package com.google.common.util.concurrent;

import static com.google.common.base.Verify.verify;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtCompatible;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Preconditions;
Expand All @@ -24,6 +26,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -396,6 +399,50 @@ public static boolean tryLockUninterruptibly(Lock lock, long timeout, TimeUnit u
}
}

/**
* Invokes {@code executor.}{@link ExecutorService#awaitTermination(long, TimeUnit)
* awaitTermination(long, TimeUnit)} uninterruptibly with no timeout.
*
* @since NEXT
*/
@Beta
@GwtIncompatible // concurrency
public static void awaitTerminationUninterruptibly(ExecutorService executor) {
// TODO(cpovirk): We could optimize this to avoid calling nanoTime() at all.
verify(awaitTerminationUninterruptibly(executor, Long.MAX_VALUE, NANOSECONDS));
}

/**
* Invokes {@code executor.}{@link ExecutorService#awaitTermination(long, TimeUnit)
* awaitTermination(long, TimeUnit)} uninterruptibly.
*
* @since NEXT
*/
@Beta
@GwtIncompatible // concurrency
@SuppressWarnings("GoodTime")
public static boolean awaitTerminationUninterruptibly(
ExecutorService executor, long timeout, TimeUnit unit) {
boolean interrupted = false;
try {
long remainingNanos = unit.toNanos(timeout);
long end = System.nanoTime() + remainingNanos;

while (true) {
try {
return executor.awaitTermination(remainingNanos, NANOSECONDS);
} catch (InterruptedException e) {
interrupted = true;
remainingNanos = end - System.nanoTime();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

// TODO(user): Add support for waitUninterruptibly.

private Uninterruptibles() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package com.google.common.util.concurrent;

import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread;
import static com.google.common.util.concurrent.Uninterruptibles.awaitTerminationUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.tryLockUninterruptibly;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand All @@ -32,10 +34,12 @@
import com.google.common.testing.TearDown;
import com.google.common.testing.TearDownStack;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.time.Duration;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -465,6 +469,57 @@ public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() {
assertInterrupted();
}

// executor.awaitTermination Testcases
public void testTryAwaitTerminationUninterruptiblyDuration_success() {
ExecutorService executor = newFixedThreadPool(1);
requestInterruptIn(500);
executor.execute(new SleepTask(1000));
executor.shutdown();
assertTrue(awaitTerminationUninterruptibly(executor, Duration.ofMillis(LONG_DELAY_MS)));
assertTrue(executor.isTerminated());
assertInterrupted();
}

public void testTryAwaitTerminationUninterruptiblyDuration_failure() {
ExecutorService executor = newFixedThreadPool(1);
requestInterruptIn(500);
executor.execute(new SleepTask(10000));
executor.shutdown();
assertFalse(awaitTerminationUninterruptibly(executor, Duration.ofMillis(1000)));
assertFalse(executor.isTerminated());
assertInterrupted();
}

public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_success() {
ExecutorService executor = newFixedThreadPool(1);
requestInterruptIn(500);
executor.execute(new SleepTask(1000));
executor.shutdown();
assertTrue(awaitTerminationUninterruptibly(executor, LONG_DELAY_MS, MILLISECONDS));
assertTrue(executor.isTerminated());
assertInterrupted();
}

public void testTryAwaitTerminationUninterruptiblyLongTimeUnit_failure() {
ExecutorService executor = newFixedThreadPool(1);
requestInterruptIn(500);
executor.execute(new SleepTask(10000));
executor.shutdown();
assertFalse(awaitTerminationUninterruptibly(executor, 1000, MILLISECONDS));
assertFalse(executor.isTerminated());
assertInterrupted();
}

public void testTryAwaitTerminationInfiniteTimeout() {
ExecutorService executor = newFixedThreadPool(1);
requestInterruptIn(500);
executor.execute(new SleepTask(1000));
executor.shutdown();
awaitTerminationUninterruptibly(executor);
assertTrue(executor.isTerminated());
assertInterrupted();
}

/**
* Wrapper around {@link Stopwatch} which also contains an "expected completion time." Creating a
* {@code Completion} starts the underlying stopwatch.
Expand Down Expand Up @@ -754,6 +809,15 @@ protected void doAction() {
}
}

private static final class SleepTask extends DelayedActionRunnable {
SleepTask(long tMinus) {
super(tMinus);
}

@Override
protected void doAction() {}
}

private static void sleepSuccessfully(long sleepMillis) {
Completion completed = new Completion(sleepMillis - SLEEP_SLACK);
Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS);
Expand Down
59 changes: 59 additions & 0 deletions guava/src/com/google/common/util/concurrent/Uninterruptibles.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.common.util.concurrent;

import static com.google.common.base.Verify.verify;
import static com.google.common.util.concurrent.Internal.toNanosSaturated;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

Expand All @@ -27,6 +28,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -513,6 +515,63 @@ public static boolean tryLockUninterruptibly(Lock lock, long timeout, TimeUnit u
}
}

/**
* Invokes {@code executor.}{@link ExecutorService#awaitTermination(long, TimeUnit)
* awaitTermination(long, TimeUnit)} uninterruptibly with no timeout.
*
* @since NEXT
*/
@Beta
@GwtIncompatible // concurrency
public static void awaitTerminationUninterruptibly(ExecutorService executor) {
// TODO(cpovirk): We could optimize this to avoid calling nanoTime() at all.
verify(awaitTerminationUninterruptibly(executor, Long.MAX_VALUE, NANOSECONDS));
}

/**
* Invokes {@code executor.}{@link ExecutorService#awaitTermination(long, TimeUnit)
* awaitTermination(long, TimeUnit)} uninterruptibly.
*
* @since NEXT
*/
@Beta
@GwtIncompatible // concurrency
public static boolean awaitTerminationUninterruptibly(
ExecutorService executor, Duration timeout) {
return awaitTerminationUninterruptibly(executor, toNanosSaturated(timeout), NANOSECONDS);
}

/**
* Invokes {@code executor.}{@link ExecutorService#awaitTermination(long, TimeUnit)
* awaitTermination(long, TimeUnit)} uninterruptibly.
*
* @since NEXT
*/
@Beta
@GwtIncompatible // concurrency
@SuppressWarnings("GoodTime")
public static boolean awaitTerminationUninterruptibly(
ExecutorService executor, long timeout, TimeUnit unit) {
boolean interrupted = false;
try {
long remainingNanos = unit.toNanos(timeout);
long end = System.nanoTime() + remainingNanos;

while (true) {
try {
return executor.awaitTermination(remainingNanos, NANOSECONDS);
} catch (InterruptedException e) {
interrupted = true;
remainingNanos = end - System.nanoTime();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

// TODO(user): Add support for waitUninterruptibly.

private Uninterruptibles() {}
Expand Down

0 comments on commit f07b954

Please sign in to comment.