Skip to content

Commit

Permalink
Enable support for project loom for Jetty's thread pool (dropwizard#7457
Browse files Browse the repository at this point in the history
)

* Enable support for project loom for Jetty's thread pool

* Add support for virtual threads on the admin connectors

Co-authored-by: Peter Stackle <pstackle@users.noreply.github.com>

---------

Co-authored-by: Peter Stackle <pstackle@users.noreply.github.com>
  • Loading branch information
zUniQueX and pstackle authored Oct 18, 2023
1 parent e995fd2 commit 5fbcb6d
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 15 deletions.
29 changes: 16 additions & 13 deletions docs/source/manual/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ registerDefaultExceptionMappers true
enableThreadNameFilter true Whether or not to apply the ``ThreadNameFilter`` that adjusts thread names to include the request method and request URI.
dumpAfterStart false Whether or not to dump `Jetty Diagnostics`_ after start.
dumpBeforeStop false Whether or not to dump `Jetty Diagnostics`_ before stop.
enableVirtualThreads false Whether to enable virtual threads for Jetty's thread pool.
=================================== =============================================== =============================================================================

.. _Jetty Diagnostics: https://www.eclipse.org/jetty/documentation/9.4.x/jetty-dump-tool.html
Expand Down Expand Up @@ -242,6 +243,7 @@ Extends the attributes that are available to :ref:`all servers <man-configuratio
server:
adminMinThreads: 1
adminMaxThreads: 64
enableAdminVirtualThreads: false
adminContextPath: /
applicationContextPath: /
applicationConnectors:
Expand All @@ -262,19 +264,20 @@ Extends the attributes that are available to :ref:`all servers <man-configuratio
validateCerts: false
======================== ======================= =====================================================================
Name Default Description
======================== ======================= =====================================================================
applicationConnectors An `HTTP connector`_ A set of :ref:`connectors <man-configuration-connectors>` which will
listening on port 8080. handle application requests.
adminConnectors An `HTTP connector`_ An `HTTP connector`_ listening on port 8081.
listening on port 8081. A set of :ref:`connectors <man-configuration-connectors>` which will
handle admin requests.
adminMinThreads 1 The minimum number of threads to use for admin requests.
adminMaxThreads 64 The maximum number of threads to use for admin requests.
adminContextPath / The context path of the admin servlets, including metrics and tasks.
applicationContextPath / The context path of the application servlets, including Jersey.
======================== ======================= =====================================================================
========================= ======================= =====================================================================
Name Default Description
========================= ======================= =====================================================================
applicationConnectors An `HTTP connector`_ A set of :ref:`connectors <man-configuration-connectors>` which will
listening on port 8080. handle application requests.
adminConnectors An `HTTP connector`_ An `HTTP connector`_ listening on port 8081.
listening on port 8081. A set of :ref:`connectors <man-configuration-connectors>` which will
handle admin requests.
adminMinThreads 1 The minimum number of threads to use for admin requests.
adminMaxThreads 64 The maximum number of threads to use for admin requests.
enableAdminVirtualThreads false Whether to use virtual threads for the admin connectors.
adminContextPath / The context path of the admin servlets, including metrics and tasks.
applicationContextPath / The context path of the application servlets, including Jersey.
========================= ======================= =====================================================================

.. _`HTTP connector`: https://github.com/dropwizard/dropwizard/blob/master/dropwizard-jetty/src/main/java/io/dropwizard/jetty/HttpConnectorFactory.java

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.eclipse.jetty.setuid.RLimit;
import org.eclipse.jetty.setuid.SetUIDListener;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,10 +52,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.InvocationTargetException;
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

import static com.codahale.metrics.annotation.ResponseMeteredLevel.COARSE;
Expand Down Expand Up @@ -231,6 +235,13 @@
* Whether or not to dump jetty diagnostics before stop.
* </td>
* </tr>
* <tr>
* <td>{@code enableVirtualThreads}</td>
* <td>false</td>
* <td>
* Whether to use virtual threads for Jetty's thread pool.
* </td>
* </tr>
* </table>
*
* @see DefaultServerFactory
Expand Down Expand Up @@ -312,6 +323,8 @@ public abstract class AbstractServerFactory implements ServerFactory {

private boolean dumpBeforeStop = false;

private boolean enableVirtualThreads = false;

@JsonIgnore
@ValidationMethod(message = "must have a smaller minThreads than maxThreads")
public boolean isThreadPoolSizedCorrectly() {
Expand Down Expand Up @@ -581,6 +594,16 @@ public void setDumpBeforeStop(boolean dumpBeforeStop) {
this.dumpBeforeStop = dumpBeforeStop;
}

@JsonProperty
public boolean isEnableVirtualThreads() {
return enableVirtualThreads;
}

@JsonProperty
public void setEnableVirtualThreads(boolean enableVirtualThreads) {
this.enableVirtualThreads = enableVirtualThreads;
}

protected Handler createAdminServlet(Server server,
MutableServletContextHandler handler,
MetricRegistry metrics,
Expand Down Expand Up @@ -641,13 +664,34 @@ protected Handler createAppServlet(Server server,

protected ThreadPool createThreadPool(MetricRegistry metricRegistry) {
final BlockingQueue<Runnable> queue = new BlockingArrayQueue<>(minThreads, maxThreads, maxQueuedRequests);
final ThreadFactory threadFactory = getThreadFactory(enableVirtualThreads);
final InstrumentedQueuedThreadPool threadPool =
new InstrumentedQueuedThreadPool(metricRegistry, maxThreads, minThreads,
(int) idleThreadTimeout.toMilliseconds(), queue);
(int) idleThreadTimeout.toMilliseconds(), queue, threadFactory);
threadPool.setName("dw");
return threadPool;
}

protected ThreadFactory getThreadFactory(boolean virtualThreadsRequested) {
if (!virtualThreadsRequested) {
return Executors.defaultThreadFactory();
}

if (!VirtualThreads.areSupported()) {
throw new UnsupportedOperationException("Virtual threads are requested but not supported on the current runtime");
}

try {
Class<?> threadBuilderClass = Class.forName("java.lang.Thread$Builder");
Object virtualThreadBuilder = threadBuilderClass.cast(Thread.class.getDeclaredMethod("ofVirtual").invoke(null));
return (ThreadFactory) threadBuilderClass.getDeclaredMethod("factory").invoke(virtualThreadBuilder);
} catch (InvocationTargetException invocationTargetException) {
throw new IllegalStateException("Error while enabling virtual threads", invocationTargetException.getCause());
} catch (Exception exception) {
throw new IllegalStateException("Error while enabling virtual threads", exception);
}
}

protected Server buildServer(LifecycleEnvironment lifecycle,
ThreadPool threadPool) {
final Server server = new Server(threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;

/**
* The default implementation of {@link ServerFactory}, which allows for multiple sets of
Expand Down Expand Up @@ -59,6 +60,11 @@
* <td>1</td>
* <td>The minimum number of threads to use for admin requests.</td>
* </tr>
* <tr>
* <td>{@code enableAdminVirtualThreads}</td>
* <td>false</td>
* <td>Whether to use virtual threads for the admin connectors</td>
* </tr>
* </table>
* <p/>
* For more configuration parameters, see {@link AbstractServerFactory}.
Expand All @@ -84,6 +90,8 @@ public class DefaultServerFactory extends AbstractServerFactory {
@Min(1)
private int adminMinThreads = 1;

private boolean enableAdminVirtualThreads;

@NotEmpty
private String applicationContextPath = "/";

Expand Down Expand Up @@ -130,6 +138,16 @@ public void setAdminMinThreads(int adminMinThreads) {
this.adminMinThreads = adminMinThreads;
}

@JsonProperty
public boolean isEnableAdminVirtualThreads() {
return enableAdminVirtualThreads;
}

@JsonProperty
public void setEnableAdminVirtualThreads(boolean enableAdminVirtualThreads) {
this.enableAdminVirtualThreads = enableAdminVirtualThreads;
}

@JsonProperty
public String getApplicationContextPath() {
return applicationContextPath;
Expand Down Expand Up @@ -211,9 +229,17 @@ private RoutingHandler buildRoutingHandler(MetricRegistry metricRegistry,
}

private List<Connector> buildAdminConnectors(MetricRegistry metricRegistry, Server server) {
final ThreadFactory threadFactory = getThreadFactory(enableAdminVirtualThreads);
// threadpool is shared between all the connectors, so it should be managed by the server instead of the
// individual connectors
final QueuedThreadPool threadPool = new InstrumentedQueuedThreadPool(metricRegistry, adminMaxThreads, adminMinThreads);
@SuppressWarnings("NullAway")
final QueuedThreadPool threadPool = new InstrumentedQueuedThreadPool(
metricRegistry,
adminMaxThreads,
adminMinThreads,
60000, // overload default
null, // overload default
threadFactory);
threadPool.setName("dw-admin");
server.addBean(threadPool);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package io.dropwizard.core;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.health.HealthCheckRegistry;
import io.dropwizard.core.server.DefaultServerFactory;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.jackson.Jackson;
import io.dropwizard.jersey.validation.Validators;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.VirtualThreads;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;

@EnabledForJreRange(min = JRE.JAVA_21)
class VirtualThreadsTest {
private static class VirtualThreadsConfiguration extends Configuration {
}

@Test
void virtualThreadsEnabledWhenRequested() throws Exception {
boolean isVirtualThread = probeVirtualThread(
defaultServerFactory -> defaultServerFactory.setEnableVirtualThreads(true),
this::selectServerThreadPool
);

assertThat(isVirtualThread).isTrue();
}

@Test
void virtualThreadsDisabledWhenNotRequested() throws Exception {
boolean isVirtualThread = probeVirtualThread(
defaultServerFactory -> defaultServerFactory.setEnableVirtualThreads(false),
this::selectServerThreadPool
);

assertThat(isVirtualThread).isFalse();
}

@Test
void virtualAdminThreadsEnabledWhenRequested() throws Exception {
boolean isVirtualThread = probeVirtualThread(
defaultServerFactory -> defaultServerFactory.setEnableAdminVirtualThreads(true),
this::selectAdminThreadPool
);

assertThat(isVirtualThread).isTrue();
}

@Test
void virtualAdminThreadsDisabledWhenNotRequested() throws Exception {
boolean isVirtualThread = probeVirtualThread(
defaultServerFactory -> defaultServerFactory.setEnableAdminVirtualThreads(false),
this::selectAdminThreadPool
);

assertThat(isVirtualThread).isFalse();
}

private boolean probeVirtualThread(Consumer<DefaultServerFactory> defaultServerFactoryConsumer,
Function<Server, ThreadPool> threadPoolSelector) throws Exception {
final AtomicBoolean isVirtualThread = new AtomicBoolean(false);

Environment environment = new Environment("VirtualThreadsTest", Jackson.newMinimalObjectMapper(),
Validators.newValidatorFactory(), new MetricRegistry(), this.getClass().getClassLoader(),
new HealthCheckRegistry(), new VirtualThreadsConfiguration());
DefaultServerFactory defaultServerFactory = new DefaultServerFactory();
defaultServerFactoryConsumer.accept(defaultServerFactory);
Server server = defaultServerFactory.build(environment);
server.start();
try {
ThreadPool threadPool = threadPoolSelector.apply(server);
threadPool.execute(
() -> isVirtualThread.set(VirtualThreads.isVirtualThread())
);
} finally {
server.stop();
}

return isVirtualThread.get();
}

private ThreadPool selectServerThreadPool(Server server) {
return server.getThreadPool();
}

private ThreadPool selectAdminThreadPool(Server server) {
final int adminPort = 8081;
return Arrays.stream(server.getConnectors())
.filter(ServerConnector.class::isInstance)
.map(ServerConnector.class::cast)
.filter(serverConnector -> serverConnector.getLocalPort() == adminPort)
.map(AbstractConnector::getExecutor)
.filter(ThreadPool.class::isInstance)
.map(ThreadPool.class::cast)
.findFirst()
.orElseThrow(() -> new IllegalStateException("Couldn't find thread pool of admin connector"));
}
}

0 comments on commit 5fbcb6d

Please sign in to comment.