Skip to content

Commit

Permalink
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Browse files Browse the repository at this point in the history
Reviewed-by: martin, dholmes
  • Loading branch information
Doug Lea committed Feb 22, 2021
1 parent 419717d commit 5b7b18c
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 172 deletions.
149 changes: 92 additions & 57 deletions src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* terminate others by cancelling their unprocessed tasks, and
* waking them up. Calls to non-abrupt shutdown() preface this by
* checking isQuiescent before triggering the "STOP" phase of
* termination.
* termination. To conform to ExecutorService invoke, invokeAll,
* and invokeAny specs, we must track pool status while waiting,
* and interrupt interruptible callers on termination (see
* ForkJoinTask.joinForPoolInvoke etc).
*
* Joining Tasks
* =============
Expand Down Expand Up @@ -631,6 +634,13 @@ public class ForkJoinPool extends AbstractExecutorService {
* amounts to an odd form of limited spin-wait before blocking in
* ForkJoinTask.join.
*
* Guarantees for common pool parallelism zero are limited to
* tasks that are joined by their callers in a tree-structured
* fashion or use CountedCompleters (as is true for jdk
* parallelStreams). Support infiltrates several methods,
* including those that retry helping steps until we are sure that
* none apply if there are no workers.
*
* As a more appropriate default in managed environments, unless
* overridden by system properties, we use workers of subclass
* InnocuousForkJoinWorkerThread when there is a SecurityManager
Expand Down Expand Up @@ -893,7 +903,7 @@ static final void setSlotVolatile(ForkJoinTask<?>[] a, int i,
}
static final boolean casSlotToNull(ForkJoinTask<?>[] a, int i,
ForkJoinTask<?> c) {
return QA.weakCompareAndSet(a, i, c, null);
return QA.compareAndSet(a, i, c, null);
}
final boolean tryLock() {
return SOURCE.compareAndSet(this, 0, 1);
Expand Down Expand Up @@ -1046,13 +1056,22 @@ final boolean tryUnpush(ForkJoinTask<?> task) {
*/
final boolean externalTryUnpush(ForkJoinTask<?> task) {
boolean taken = false;
int s = top, cap, k; ForkJoinTask<?>[] a;
if ((a = array) != null && (cap = a.length) > 0 &&
a[k = (cap - 1) & (s - 1)] == task && tryLock()) {
if (top == s && array == a &&
(taken = casSlotToNull(a, k, task)))
top = s - 1;
source = 0; // release lock
for (;;) {
int s = top, cap, k; ForkJoinTask<?>[] a;
if ((a = array) == null || (cap = a.length) <= 0 ||
a[k = (cap - 1) & (s - 1)] != task)
break;
if (tryLock()) {
if (top == s && array == a) {
if (taken = casSlotToNull(a, k, task)) {
top = s - 1;
source = 0;
break;
}
}
source = 0; // release lock for retry
}
Thread.yield(); // trylock failure
}
return taken;
}
Expand Down Expand Up @@ -1194,15 +1213,16 @@ else if (tryLock()) {
top = s;
source = 0;
}
if (taken)
t.doExec();
else if (!owned)
Thread.yield(); // tryLock failure
break;
}
else if ((f = f.completer) == null)
break;
}
if (!taken)
break;
t.doExec();
if (limit != 0 && --limit == 0)
if (taken && limit != 0 && --limit == 0)
break;
}
return status;
Expand Down Expand Up @@ -1586,7 +1606,7 @@ else if ((v = qs[i]) == null)
* @param w caller's WorkQueue (may be null on failed initialization)
*/
final void runWorker(WorkQueue w) {
if (w != null) { // skip on failed init
if (mode >= 0 && w != null) { // skip on failed init
w.config |= SRC; // mark as valid source
int r = w.stackPred, src = 0; // use seed from registerWorker
do {
Expand Down Expand Up @@ -1710,22 +1730,6 @@ else if ((deadline += keepAlive) == 0L)

// Utilities used by ForkJoinTask

/**
* Returns true if all workers are busy, possibly creating one if allowed
*/
final boolean isSaturated() {
int maxTotal = bounds >>> SWIDTH;
for (long c;;) {
if (((int)(c = ctl) & ~UNSIGNALLED) != 0)
return false;
if ((short)(c >>> TC_SHIFT) >= maxTotal)
return true;
long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
if (compareAndSetCtl(c, nc))
return !createWorker();
}
}

/**
* Returns true if can start terminating if enabled, or already terminated
*/
Expand Down Expand Up @@ -1765,13 +1769,16 @@ final boolean canStop() {
*/
private int tryCompensate(long c) {
Predicate<? super ForkJoinPool> sat;
int b = bounds; // counts are signed; centered at parallelism level == 0
int md = mode, b = bounds;
// counts are signed; centered at parallelism level == 0
int minActive = (short)(b & SMASK),
maxTotal = b >>> SWIDTH,
active = (int)(c >> RC_SHIFT),
total = (short)(c >>> TC_SHIFT),
sp = (int)c & ~UNSIGNALLED;
if (total >= 0) {
if ((md & SMASK) == 0)
return 0; // cannot compensate if parallelism zero
else if (total >= 0) {
if (sp != 0) { // activate idle worker
WorkQueue[] qs; int n; WorkQueue v;
if ((qs = queues) != null && (n = qs.length) > 0 &&
Expand Down Expand Up @@ -1819,9 +1826,10 @@ final void uncompensate() {
*
* @param task the task
* @param w caller's WorkQueue
* @param canHelp if false, compensate only
* @return task status on exit, or UNCOMPENSATE for compensated blocking
*/
final int helpJoin(ForkJoinTask<?> task, WorkQueue w) {
final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean canHelp) {
int s = 0;
if (task != null && w != null) {
int wsrc = w.source, wid = w.config & SMASK, r = wid + 2;
Expand All @@ -1836,7 +1844,7 @@ else if (scan = !scan) { // previous scan was empty
else if (c == (c = ctl) && (s = tryCompensate(c)) >= 0)
break; // block
}
else { // scan for subtasks
else if (canHelp) { // scan for subtasks
WorkQueue[] qs = queues;
int n = (qs == null) ? 0 : qs.length, m = n - 1;
for (int i = n; i > 0; i -= 2, r += 2) {
Expand Down Expand Up @@ -2194,6 +2202,16 @@ static WorkQueue commonQueue() {
qs[(n - 1) & (r << 1)] : null;
}

/**
* Returns queue for an external thread, if one exists
*/
final WorkQueue externalQueue() {
WorkQueue[] qs;
int r = ThreadLocalRandom.getProbe(), n;
return ((qs = queues) != null && (n = qs.length) > 0 && r != 0) ?
qs[(n - 1) & (r << 1)] : null;
}

/**
* If the given executor is a ForkJoinPool, poll and execute
* AsynchronousCompletionTasks from worker's queue until none are
Expand All @@ -2205,8 +2223,8 @@ static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
if ((wt = (ForkJoinWorkerThread)t).pool == e)
w = wt.workQueue;
}
else if (e == common)
w = commonQueue();
else if (e instanceof ForkJoinPool)
w = ((ForkJoinPool)e).externalQueue();
if (w != null)
w.helpAsyncBlocker(blocker);
}
Expand Down Expand Up @@ -2292,14 +2310,18 @@ private boolean tryTerminate(boolean now, boolean enable) {
return false;
md = getAndBitwiseOrMode(STOP);
}
for (int k = 0; k < 2; ++k) { // twice in case of lagging qs updates
for (ForkJoinTask<?> t; (t = pollScan(false)) != null; )
for (boolean rescan = true;;) { // repeat until no changes
boolean changed = false;
for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) {
changed = true;
ForkJoinTask.cancelIgnoringExceptions(t); // help cancel
}
WorkQueue[] qs; int n; WorkQueue q; Thread thread;
if ((qs = queues) != null && (n = qs.length) > 0) {
for (int j = 1; j < n; j += 2) { // unblock other workers
if ((q = qs[j]) != null && (thread = q.owner) != null &&
!thread.isInterrupted()) {
changed = true;
try {
thread.interrupt();
} catch (Throwable ignore) {
Expand All @@ -2317,6 +2339,12 @@ private boolean tryTerminate(boolean now, boolean enable) {
cond.signalAll();
lock.unlock();
}
if (changed)
rescan = true;
else if (rescan)
rescan = false;
else
break;
}
return true;
}
Expand Down Expand Up @@ -2539,17 +2567,23 @@ private ForkJoinPool(byte forCommonPoolOnly) {
parallelism = Integer.parseInt(pp);
} catch (Exception ignore) {
}
int p = this.mode = Math.min(Math.max(parallelism, 0), MAX_CAP);
int size = 1 << (33 - Integer.numberOfLeadingZeros(p > 0 ? p - 1 : 1));
this.factory = (fac != null) ? fac :
new DefaultCommonPoolForkJoinWorkerThreadFactory();
this.ueh = handler;
this.keepAlive = DEFAULT_KEEPALIVE;
this.saturate = null;
this.workerNamePrefix = null;
this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
(((long)(-p) << RC_SHIFT) & RC_MASK));
int p = Math.min(Math.max(parallelism, 0), MAX_CAP), size;
if (p > 0) {
size = 1 << (33 - Integer.numberOfLeadingZeros(p - 1));
this.bounds = ((1 - p) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
this.ctl = ((((long)(-p) << TC_SHIFT) & TC_MASK) |
(((long)(-p) << RC_SHIFT) & RC_MASK));
} else { // zero min, max, spare counts, 1 slot
size = 1;
this.bounds = 0;
this.ctl = 0L;
}
this.factory = (fac != null) ? fac :
new DefaultCommonPoolForkJoinWorkerThreadFactory();
this.queues = new WorkQueue[size];
this.registrationLock = new ReentrantLock();
}
Expand Down Expand Up @@ -2593,7 +2627,7 @@ public static ForkJoinPool commonPool() {
*/
public <T> T invoke(ForkJoinTask<T> task) {
externalSubmit(task);
return task.join();
return task.joinForPoolInvoke(this);
}

/**
Expand Down Expand Up @@ -2685,7 +2719,7 @@ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
externalSubmit(f);
}
for (int i = futures.size() - 1; i >= 0; --i)
((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
((ForkJoinTask<?>)futures.get(i)).awaitPoolInvoke(this);
return futures;
} catch (Throwable t) {
for (Future<T> e : futures)
Expand Down Expand Up @@ -2715,11 +2749,7 @@ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
if (timedOut)
ForkJoinTask.cancelIgnoringExceptions(f);
else {
try {
f.get(ns, TimeUnit.NANOSECONDS);
} catch (CancellationException | TimeoutException |
ExecutionException ok) {
}
((ForkJoinTask<T>)f).awaitPoolInvoke(this, ns);
if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
timedOut = true;
}
Expand All @@ -2746,11 +2776,16 @@ static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
}
final void tryComplete(Callable<E> c) { // called by InvokeAnyTasks
Throwable ex = null;
boolean failed = (c == null || isCancelled() ||
(pool != null && pool.mode < 0));
if (!failed && !isDone()) {
boolean failed;
if (c == null || Thread.interrupted() ||
(pool != null && pool.mode < 0))
failed = true;
else if (isDone())
failed = false;
else {
try {
complete(c.call());
failed = false;
} catch (Throwable tx) {
ex = tx;
failed = true;
Expand Down Expand Up @@ -2817,7 +2852,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
if (root.isDone())
break;
}
return root.get();
return root.getForPoolInvoke(this);
} finally {
for (InvokeAnyTask<T> f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
Expand All @@ -2844,7 +2879,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
if (root.isDone())
break;
}
return root.get(nanos, TimeUnit.NANOSECONDS);
return root.getForPoolInvoke(this, nanos);
} finally {
for (InvokeAnyTask<T> f : fs)
ForkJoinTask.cancelIgnoringExceptions(f);
Expand Down
Loading

1 comment on commit 5b7b18c

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.