Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix usages of some async utilities #1610

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@ public void runIterations(IDataIterator dataIterator, IIterationRunner iteration
dataIterator.forEachRemaining(arguments::add);
for (int attempt = 0; attempt < maxAttempts; attempt++) {
for (Object[] args : arguments) {
try {
ExecutionResult executionResult = iterationRunner.runIteration(args, maxIterations).get();
if (executionResult == ExecutionResult.FAILED) {
return;
}
} catch (InterruptedException | ExecutionException e) {
ExecutionResult executionResult = iterationRunner.runIteration(args, maxIterations).join();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using join to make it un-interruptible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not get uninterruptible.
It just does not throw checked exceptions but unchecked ones.
If you use get() you might get InterruptedException and then have to handle it properly.
Just ignoring is not ok at that spot as this is not a self-managed thread, so in most cases you should either rethrow or at least re-interrupt so that code higher in the stack can properly handle the interrupt and so on.
And by using join() here you can simply get around all that, as you don't mess with the interrupted exception yourself.

And ExecutionException would never come here anyway, as the future is never completed exceptionally.

if (executionResult == ExecutionResult.FAILED) {
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,57 @@ public void run() {
long timeoutAt = 0;
int unsuccessfulInterruptAttempts = 0;

syncWithThread(startLatch, "feature", methodName);
boolean syncedWithFeature = false;
try {
startLatch.countDown();
syncedWithFeature = startLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
// this is our own thread, so we can ignore the interruption safely
}
if (!syncedWithFeature) {
System.out.printf("[spock.lang.Timeout] Could not sync with Feature for method '%s'", methodName);
}

while (waitMillis > 0) {
long waitStart = System.nanoTime();
try {
synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
// this is our own thread, so we can ignore the interruption safely and continue the remaining waiting
waitMillis -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - waitStart);
continue;
}
break;
}
Comment on lines +84 to +94
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it indeed happens that the watcher thread is interrupted during the initial wait, it should continue the initial wait, shouldn't it?
Or should it really forward the interrupting to the Feature thread, just because it got interrupted for whatever reason?

if (!synced) {
stackTrace = mainThread.getStackTrace();
waitMillis = 250;
}
while (!synced) {
mainThread.interrupt();
try {
synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
// The mission of this thread is to repeatedly interrupt the main thread until
// the latter returns. Once this mission has been accomplished, this thread will die quickly
}
if (!synced) {
long now = System.nanoTime();
if (stackTrace.length == 0) {
logMethodTimeout(methodName, timeoutSeconds);
stackTrace = mainThread.getStackTrace();
waitMillis = 250;
timeoutAt = now;
} else {
waitMillis *= 2;
logUnsuccessfulInterrupt(methodName, now, timeoutAt, waitMillis, ++unsuccessfulInterruptAttempts);
}
mainThread.interrupt();
System.out.printf("[spock.lang.Timeout] Method '%s' has not yet returned - interrupting. Next try in %1.2f seconds.\n",
methodName, waitMillis / 1000.);
}
}
}
}.start();

syncWithThread(startLatch, "watcher", methodName);
boolean syncedWithWatcher = false;
try {
startLatch.countDown();
syncedWithWatcher = startLatch.await(5, TimeUnit.SECONDS);
} finally {
if (!syncedWithWatcher) {
System.out.printf("[spock.lang.Timeout] Could not sync with Watcher for method '%s'", invocation.getMethod().getName());
}
}

Throwable saved = null;
try {
Expand Down Expand Up @@ -216,13 +240,4 @@ private static Pair<Integer, Integer> findThreadSection(List<String> lines, Stri

return null;
}

private static void syncWithThread(CountDownLatch startLatch, String threadName, String methodName) {
try {
startLatch.countDown();
startLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
System.out.printf("[spock.lang.Timeout] Could not sync with %s thread for method '%s'", threadName, methodName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,14 @@ public void await() throws Throwable {
* @throws Throwable the first exception thrown by an evaluate block
*/
public void await(double seconds) throws Throwable {
latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS);
boolean evalBlocksFinished = latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS);
if (!exceptions.isEmpty())
throw exceptions.poll();

long pendingEvalBlocks = latch.getCount();
if (pendingEvalBlocks > 0) {
if (!evalBlocksFinished) {
String msg = String.format("Async conditions timed out " +
"after %1.2f seconds; %d out of %d evaluate blocks did not complete in time",
seconds, pendingEvalBlocks, numEvalBlocks);
seconds, latch.getCount(), numEvalBlocks);
throw new SpockTimeoutError(seconds, msg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class ParallelSpec extends EmbeddedSpecification {
@ResourceLock(value = "a", mode = ResourceAccessMode.READ)
def writeA() {
when:
incrementAndBlock(atomicInteger, latch)
incrementAndBlock(atomicInteger, latch, 10000)
then:
atomicInteger.get() == 3
Expand Down Expand Up @@ -519,15 +519,15 @@ class ParallelSpec extends EmbeddedSpecification {
throws InterruptedException {
int value = sharedResource.incrementAndGet()
countDownLatch.countDown()
countDownLatch.await(timeout, MILLISECONDS)
assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired'
return value
}

static void storeAndBlockAndCheck(AtomicInteger sharedResource, CountDownLatch countDownLatch, long timeout = 100)
throws InterruptedException {
int value = sharedResource.get()
countDownLatch.countDown()
countDownLatch.await(timeout, MILLISECONDS)
assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired'
assert value == sharedResource.get()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,11 @@ class InvokingMocksFromMultipleThreads extends Specification {
def numThreads = 10
def list = Mock(List)
def latch = new CountDownLatch(numThreads)
@AutoCleanup("shutdownNow")
@Shared
def executorService = createExecutorService()

def "invoking a mock from multiple threads"() {
when:
numThreads.times { threadId ->
executorService.submit {
Thread.start {
try {
100.times { count ->
list.add(count)
Expand All @@ -51,7 +48,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
}
}
}
awaitLatch()
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'

then:
interaction {
Expand All @@ -63,7 +60,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
def "invoking a mock from multiple threads - too many invocations"() {
when:
numThreads.times { threadId ->
executorService.submit {
Thread.start {
try {
100.times { count ->
list.add(count)
Expand All @@ -77,7 +74,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
}
}
}
awaitLatch()
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'

then:
interaction {
Expand All @@ -89,7 +86,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
def "invoking a mock from multiple threads - too few invocations"() {
when:
numThreads.times { threadId ->
executorService.submit {
Thread.start {
try {
100.times { count ->
if (!(threadId == 0 && count == 99)) list.add(count)
Expand All @@ -102,22 +99,11 @@ class InvokingMocksFromMultipleThreads extends Specification {
}
}
}
awaitLatch()
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'

then:
interaction {
100.times { count -> numThreads * list.add(count) }
}
}


private static ExecutorService createExecutorService() {
return Jvm.current.java21Compatible ? Executors."newVirtualThreadPerTaskExecutor"() : Executors.newCachedThreadPool() { new Thread(it).tap { it.daemon = true } }
}

private void awaitLatch() {
if (!latch.await(WAIT_TIME_S, TimeUnit.SECONDS)) {
throw new IllegalStateException("The test threads did not terminate in ${WAIT_TIME_S}s.")
}
}
}
Loading