Skip to content

Commit 6c93ad4

Browse files
author
Alan Bateman
committed
8351927: Change VirtualThread implementation to use use FJP delayed task handling
Reviewed-by: vklang
1 parent 7aeaa3c commit 6c93ad4

File tree

8 files changed

+364
-73
lines changed

8 files changed

+364
-73
lines changed

src/java.base/share/classes/java/lang/System.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2290,10 +2290,6 @@ public Executor virtualThreadDefaultScheduler() {
22902290
return VirtualThread.defaultScheduler();
22912291
}
22922292

2293-
public Stream<ScheduledExecutorService> virtualThreadDelayedTaskSchedulers() {
2294-
return VirtualThread.delayedTaskSchedulers();
2295-
}
2296-
22972293
public StackWalker newStackWalkerInstance(Set<StackWalker.Option> options,
22982294
ContinuationScope contScope,
22992295
Continuation continuation) {

src/java.base/share/classes/java/lang/VirtualThread.java

Lines changed: 61 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -24,7 +24,6 @@
2424
*/
2525
package java.lang;
2626

27-
import java.util.Arrays;
2827
import java.util.Locale;
2928
import java.util.Objects;
3029
import java.util.concurrent.CountDownLatch;
@@ -38,7 +37,6 @@
3837
import java.util.concurrent.ScheduledExecutorService;
3938
import java.util.concurrent.ScheduledThreadPoolExecutor;
4039
import java.util.concurrent.TimeUnit;
41-
import java.util.stream.Stream;
4240
import jdk.internal.event.VirtualThreadEndEvent;
4341
import jdk.internal.event.VirtualThreadStartEvent;
4442
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
@@ -66,7 +64,6 @@ final class VirtualThread extends BaseVirtualThread {
6664
private static final Unsafe U = Unsafe.getUnsafe();
6765
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
6866
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
69-
private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
7067

7168
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
7269
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
@@ -193,13 +190,6 @@ static Executor defaultScheduler() {
193190
return DEFAULT_SCHEDULER;
194191
}
195192

196-
/**
197-
* Returns a stream of the delayed task schedulers used to support timed operations.
198-
*/
199-
static Stream<ScheduledExecutorService> delayedTaskSchedulers() {
200-
return Arrays.stream(DELAYED_TASK_SCHEDULERS);
201-
}
202-
203193
/**
204194
* Returns the continuation scope used for virtual threads.
205195
*/
@@ -567,8 +557,9 @@ private void afterYield() {
567557
setState(newState = PARKED);
568558
} else {
569559
// schedule unpark
560+
long timeout = this.timeout;
570561
assert timeout > 0;
571-
timeoutTask = schedule(this::unpark, timeout, NANOSECONDS);
562+
timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
572563
setState(newState = TIMED_PARKED);
573564
}
574565

@@ -618,6 +609,7 @@ private void afterYield() {
618609
// the timeout task to coordinate access to the sequence number and to
619610
// ensure the timeout task doesn't execute until the thread has got to
620611
// the TIMED_WAIT state.
612+
long timeout = this.timeout;
621613
assert timeout > 0;
622614
synchronized (timedWaitLock()) {
623615
byte seqNo = ++timedWaitSeqNo;
@@ -890,7 +882,19 @@ private void unblock() {
890882
}
891883

892884
/**
893-
* Invoked by timer thread when wait timeout for virtual thread has expired.
885+
* Invoked by FJP worker thread or STPE thread when park timeout expires.
886+
*/
887+
private void parkTimeoutExpired() {
888+
assert !VirtualThread.currentThread().isVirtual();
889+
if (!getAndSetParkPermit(true)
890+
&& (state() == TIMED_PARKED)
891+
&& compareAndSetState(TIMED_PARKED, UNPARKED)) {
892+
lazySubmitRunContinuation();
893+
}
894+
}
895+
896+
/**
897+
* Invoked by FJP worker thread or STPE thread when wait timeout expires.
894898
* If the virtual thread is in timed-wait then this method will unblock the thread
895899
* and submit its task so that it continues and attempts to reenter the monitor.
896900
* This method does nothing if the thread has been woken by notify or interrupt.
@@ -913,7 +917,7 @@ private void waitTimeoutExpired(byte seqNo) {
913917
}
914918
}
915919
if (unblocked) {
916-
submitRunContinuation();
920+
lazySubmitRunContinuation();
917921
return;
918922
}
919923
// need to retry when thread is suspended in time-wait
@@ -1444,40 +1448,54 @@ private static ForkJoinPool createDefaultScheduler() {
14441448
/**
14451449
* Schedule a runnable task to run after a delay.
14461450
*/
1447-
private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1448-
long tid = Thread.currentThread().threadId();
1449-
int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1);
1450-
return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit);
1451+
private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1452+
if (scheduler instanceof ForkJoinPool pool) {
1453+
return pool.schedule(command, delay, unit);
1454+
} else {
1455+
return DelayedTaskSchedulers.schedule(command, delay, unit);
1456+
}
14511457
}
14521458

14531459
/**
1454-
* Creates the ScheduledThreadPoolExecutors used to execute delayed tasks.
1460+
* Supports scheduling a runnable task to run after a delay. It uses a number
1461+
* of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
1462+
* work queue used. This class is used when using a custom scheduler.
14551463
*/
1456-
private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1457-
String propName = "jdk.virtualThreadScheduler.timerQueues";
1458-
String propValue = System.getProperty(propName);
1459-
int queueCount;
1460-
if (propValue != null) {
1461-
queueCount = Integer.parseInt(propValue);
1462-
if (queueCount != Integer.highestOneBit(queueCount)) {
1463-
throw new RuntimeException("Value of " + propName + " must be power of 2");
1464-
}
1465-
} else {
1466-
int ncpus = Runtime.getRuntime().availableProcessors();
1467-
queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
1464+
private static class DelayedTaskSchedulers {
1465+
private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();
1466+
1467+
static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
1468+
long tid = Thread.currentThread().threadId();
1469+
int index = (int) tid & (INSTANCE.length - 1);
1470+
return INSTANCE[index].schedule(command, delay, unit);
14681471
}
1469-
var schedulers = new ScheduledExecutorService[queueCount];
1470-
for (int i = 0; i < queueCount; i++) {
1471-
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1472-
Executors.newScheduledThreadPool(1, task -> {
1473-
Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
1474-
t.setDaemon(true);
1475-
return t;
1476-
});
1477-
stpe.setRemoveOnCancelPolicy(true);
1478-
schedulers[i] = stpe;
1472+
1473+
private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
1474+
String propName = "jdk.virtualThreadScheduler.timerQueues";
1475+
String propValue = System.getProperty(propName);
1476+
int queueCount;
1477+
if (propValue != null) {
1478+
queueCount = Integer.parseInt(propValue);
1479+
if (queueCount != Integer.highestOneBit(queueCount)) {
1480+
throw new RuntimeException("Value of " + propName + " must be power of 2");
1481+
}
1482+
} else {
1483+
int ncpus = Runtime.getRuntime().availableProcessors();
1484+
queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
1485+
}
1486+
var schedulers = new ScheduledExecutorService[queueCount];
1487+
for (int i = 0; i < queueCount; i++) {
1488+
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
1489+
Executors.newScheduledThreadPool(1, task -> {
1490+
Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
1491+
t.setDaemon(true);
1492+
return t;
1493+
});
1494+
stpe.setRemoveOnCancelPolicy(true);
1495+
schedulers[i] = stpe;
1496+
}
1497+
return schedulers;
14791498
}
1480-
return schedulers;
14811499
}
14821500

14831501
/**
@@ -1514,4 +1532,4 @@ private static void unblockVirtualThreads() {
15141532
unblocker.setDaemon(true);
15151533
unblocker.start();
15161534
}
1517-
}
1535+
}

src/java.base/share/classes/jdk/internal/access/JavaLangAccess.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2003, 2024, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2003, 2025, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -45,7 +45,6 @@
4545
import java.util.concurrent.ConcurrentHashMap;
4646
import java.util.concurrent.Executor;
4747
import java.util.concurrent.RejectedExecutionException;
48-
import java.util.concurrent.ScheduledExecutorService;
4948
import java.util.stream.Stream;
5049

5150
import jdk.internal.loader.NativeLibraries;
@@ -586,11 +585,6 @@ public interface JavaLangAccess {
586585
*/
587586
Executor virtualThreadDefaultScheduler();
588587

589-
/**
590-
* Returns a stream of the delayed task schedulers used for virtual threads.
591-
*/
592-
Stream<ScheduledExecutorService> virtualThreadDelayedTaskSchedulers();
593-
594588
/**
595589
* Creates a new StackWalker
596590
*/

src/java.base/share/classes/jdk/internal/vm/JcmdVThreadCommands.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2023, 2025, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -50,19 +50,6 @@ private static byte[] printScheduler() {
5050
sb.append(JLA.virtualThreadDefaultScheduler())
5151
.append(System.lineSeparator());
5252

53-
// break
54-
sb.append(System.lineSeparator());
55-
56-
// delayed task schedulers
57-
sb.append("Delayed task schedulers:").append(System.lineSeparator());
58-
var delayedTaskSchedulers = JLA.virtualThreadDelayedTaskSchedulers().toList();
59-
IntStream.range(0, delayedTaskSchedulers.size())
60-
.forEach(i -> sb.append('[')
61-
.append(i)
62-
.append("] ")
63-
.append(delayedTaskSchedulers.get(i))
64-
.append(System.lineSeparator()));
65-
6653
return sb.toString().getBytes(StandardCharsets.UTF_8);
6754
}
6855

test/hotspot/jtreg/serviceability/dcmd/thread/VThreadCommandsTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2023, 2025, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -59,13 +59,11 @@ class VThreadCommandsTest {
5959
*/
6060
@Test
6161
void testVThreadScheduler() {
62-
// ensure default scheduler and timeout schedulers are initialized
62+
// ensure default scheduler is initialized
6363
Thread.startVirtualThread(() -> { });
6464

6565
jcmd("Thread.vthread_scheduler")
66-
.shouldContain(Objects.toIdentityString(defaultScheduler()))
67-
.shouldContain("Delayed task schedulers:")
68-
.shouldContain("[0] " + ScheduledThreadPoolExecutor.class.getName());
66+
.shouldContain(Objects.toIdentityString(defaultScheduler()));
6967
}
7068

7169
/**
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation.
8+
*
9+
* This code is distributed in the hope that it will be useful, but WITHOUT
10+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12+
* version 2 for more details (a copy is included in the LICENSE file that
13+
* accompanied this code).
14+
*
15+
* You should have received a copy of the GNU General Public License version
16+
* 2 along with this work; if not, write to the Free Software Foundation,
17+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18+
*
19+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20+
* or visit www.oracle.com if you need additional information or have any
21+
* questions.
22+
*/
23+
package org.openjdk.bench.java.lang;
24+
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.locks.LockSupport;
27+
import org.openjdk.jmh.annotations.*;
28+
29+
@BenchmarkMode(Mode.Throughput)
30+
@State(Scope.Benchmark)
31+
@Warmup(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
32+
@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
33+
@Fork(value = 3)
34+
@OutputTimeUnit(TimeUnit.SECONDS)
35+
public class VirtualThreadParking {
36+
37+
@Param({"100", "1000", "10000"})
38+
int threadCount;
39+
40+
/**
41+
* Starts N threads that time-park, main thread unparks.
42+
*/
43+
@Benchmark
44+
public void timedParkAndUnpark1() throws Exception {
45+
var threads = new Thread[threadCount];
46+
var unparked = new boolean[threadCount];
47+
for (int i = 0; i < threadCount; i++) {
48+
threads[i] = Thread.ofVirtual().start(() -> {
49+
LockSupport.parkNanos(Long.MAX_VALUE);
50+
});
51+
}
52+
int remaining = threadCount;
53+
while (remaining > 0) {
54+
for (int i = 0; i < threadCount; i++) {
55+
if (!unparked[i]) {
56+
Thread t = threads[i];
57+
if (t.getState() == Thread.State.TIMED_WAITING) {
58+
LockSupport.unpark(t);
59+
unparked[i] = true;
60+
remaining--;
61+
}
62+
}
63+
}
64+
if (remaining > 0) {
65+
Thread.yield();
66+
}
67+
}
68+
for (Thread t : threads) {
69+
t.join();
70+
}
71+
}
72+
73+
/**
74+
* Starts N threads that time-park, start another N threads to unpark.
75+
*/
76+
@Benchmark
77+
public void timedParkAndUnpark2() throws Exception {
78+
var threads = new Thread[threadCount * 2];
79+
for (int i = 0; i < threadCount; i++) {
80+
threads[i] = Thread.ofVirtual().start(() -> {
81+
LockSupport.parkNanos(Long.MAX_VALUE);
82+
});
83+
}
84+
for (int i = 0; i < threadCount; i++) {
85+
Thread thread1 = threads[i];
86+
Thread thread2 = Thread.ofVirtual().start(() -> {
87+
while (thread1.getState() != Thread.State.TIMED_WAITING) {
88+
Thread.yield();
89+
}
90+
LockSupport.unpark(thread1);
91+
});
92+
threads[threadCount + i] = thread2;
93+
}
94+
for (Thread t : threads) {
95+
t.join();
96+
}
97+
}
98+
}

0 commit comments

Comments
 (0)