From 8e2e6e1b9bedb148ecd603b66acb8f4666942379 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Thu, 8 Aug 2024 15:14:46 +0200 Subject: [PATCH] Vertx HTTP: execute custom logic when HTTP server is started - resolves #42366 --- docs/src/main/asciidoc/http-reference.adoc | 21 ++++++ .../http/start/HttpServerStartEventsTest.java | 69 +++++++++++++++++++ .../vertx/http/DomainSocketServerStart.java | 21 ++++++ .../quarkus/vertx/http/HttpServerStart.java | 21 ++++++ .../quarkus/vertx/http/HttpsServerStart.java | 21 ++++++ .../vertx/http/runtime/VertxHttpRecorder.java | 55 ++++++++++++--- 6 files changed, 197 insertions(+), 11 deletions(-) create mode 100644 extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/start/HttpServerStartEventsTest.java create mode 100644 extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/DomainSocketServerStart.java create mode 100644 extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/HttpServerStart.java create mode 100644 extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/HttpsServerStart.java diff --git a/docs/src/main/asciidoc/http-reference.adoc b/docs/src/main/asciidoc/http-reference.adoc index b7f9786b4db77..8adc6a05806a3 100644 --- a/docs/src/main/asciidoc/http-reference.adoc +++ b/docs/src/main/asciidoc/http-reference.adoc @@ -498,6 +498,27 @@ public class MyCustomizer implements HttpServerOptionsCustomizer { <1> By making the class a managed bean, Quarkus will take the customizer into account when it starts the Vert.x servers <2> In this case, we only care about customizing the HTTP server, so we just override the `customizeHttpServer` method, but users should be aware that `HttpServerOptionsCustomizer` allows configuring the HTTPS and Domain Socket servers as well + +== How to execute logic when HTTP server started + +In order to execute some custom action when the HTTP server is started you'll need to declare an _asynchronous_ CDI observer method. +Quarkus _asynchronously_ fires CDI events of types `io.quarkus.vertx.http.HttpServerStart`, `io.quarkus.vertx.http.HttpsServerStart` and `io.quarkus.vertx.http.DomainSocketServerStart` when the corresponding HTTP server starts listening on the configured host and port. + +.`HttpServerStart` example +[source,java] +---- +@ApplicationScoped +public class MyListener { + + void httpStarted(@ObservesAsync HttpServerStart start) { <1> + // ...notified when the HTTP server starts listening + } +} +---- +<1> An asynchronous `HttpServerStart` observer method may be declared by annotating an `HttpServerStart` parameter with `@jakarta.enterprise.event.ObservesAsync`. + +NOTE: It's not possible to use the `StartupEvent` for this particular use case because this CDI event is fired before the HTTP server is started. + [[reverse-proxy]] == Running behind a reverse proxy diff --git a/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/start/HttpServerStartEventsTest.java b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/start/HttpServerStartEventsTest.java new file mode 100644 index 0000000000000..41185b7e27224 --- /dev/null +++ b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/start/HttpServerStartEventsTest.java @@ -0,0 +1,69 @@ +package io.quarkus.vertx.http.start; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.Dependent; +import jakarta.enterprise.event.ObservesAsync; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.vertx.http.HttpServerStart; +import io.quarkus.vertx.http.HttpsServerStart; +import io.smallrye.certs.Format; +import io.smallrye.certs.junit5.Certificate; +import io.smallrye.certs.junit5.Certificates; + +@Certificates(baseDir = "target/certs", certificates = @Certificate(name = "ssl-test", password = "secret", formats = { + Format.JKS, Format.PKCS12, Format.PEM })) +public class HttpServerStartEventsTest { + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .withApplicationRoot(root -> root.addClasses(MyListener.class) + .addAsResource(new File("target/certs/ssl-test-keystore.jks"), "server-keystore.jks")) + .overrideConfigKey("quarkus.http.ssl.certificate.key-store-file", "server-keystore.jks") + .overrideConfigKey("quarkus.http.ssl.certificate.key-store-password", "secret"); + + @Test + public void test() throws InterruptedException { + assertTrue(MyListener.HTTP.await(5, TimeUnit.SECONDS)); + assertTrue(MyListener.HTTPS.await(5, TimeUnit.SECONDS)); + // httpsStarted() is static + assertEquals(1, MyListener.COUNTER.get()); + } + + @Dependent + public static class MyListener { + + static final AtomicInteger COUNTER = new AtomicInteger(); + static final CountDownLatch HTTP = new CountDownLatch(1); + static final CountDownLatch HTTPS = new CountDownLatch(1); + + void httpStarted(@ObservesAsync HttpServerStart start) { + assertNotNull(start.options()); + HTTP.countDown(); + } + + static void httpsStarted(@ObservesAsync HttpsServerStart start) { + assertNotNull(start.options()); + HTTPS.countDown(); + } + + @PreDestroy + void destroy() { + COUNTER.incrementAndGet(); + } + + } + +} diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/DomainSocketServerStart.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/DomainSocketServerStart.java new file mode 100644 index 0000000000000..a74279c2215dd --- /dev/null +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/DomainSocketServerStart.java @@ -0,0 +1,21 @@ +package io.quarkus.vertx.http; + +import io.vertx.core.http.HttpServerOptions; + +/** + * Quarkus fires a CDI event of this type asynchronously when the domain socket server starts listening + * on the configured host and port. + * + *
+ * @ApplicationScoped
+ * public class MyListener {
+ *
+ *     void domainSocketStarted(@ObservesAsync DomainSocketServerStart start) {
+ *         // ...notified when the domain socket server starts listening
+ *     }
+ * }
+ * 
+ */ +public record DomainSocketServerStart(HttpServerOptions options) { + +} diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/HttpServerStart.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/HttpServerStart.java new file mode 100644 index 0000000000000..da052f863897f --- /dev/null +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/HttpServerStart.java @@ -0,0 +1,21 @@ +package io.quarkus.vertx.http; + +import io.vertx.core.http.HttpServerOptions; + +/** + * Quarkus fires a CDI event of this type asynchronously when the HTTP server starts listening + * on the configured host and port. + * + *
+ * @ApplicationScoped
+ * public class MyListener {
+ *
+ *     void httpStarted(@ObservesAsync HttpServerStart start) {
+ *         // ...notified when the HTTP server starts listening
+ *     }
+ * }
+ * 
+ */ +public record HttpServerStart(HttpServerOptions options) { + +} diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/HttpsServerStart.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/HttpsServerStart.java new file mode 100644 index 0000000000000..ae7f8acdb5da4 --- /dev/null +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/HttpsServerStart.java @@ -0,0 +1,21 @@ +package io.quarkus.vertx.http; + +import io.vertx.core.http.HttpServerOptions; + +/** + * Quarkus fires a CDI event of this type asynchronously when the HTTPS server starts listening + * on the configured host and port. + * + *
+ * @ApplicationScoped
+ * public class MyListener {
+ *
+ *     void httpsStarted(@ObservesAsync HttpsServerStart start) {
+ *         // ...notified when the HTTPS server starts listening
+ *     }
+ * }
+ * 
+ */ +public record HttpsServerStart(HttpServerOptions options) { + +} diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java index a3d8ebd792d96..2e874d8f21161 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -46,6 +47,7 @@ import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpResponseStatus; import io.quarkus.arc.Arc; +import io.quarkus.arc.ArcContainer; import io.quarkus.arc.InstanceHandle; import io.quarkus.arc.runtime.BeanContainer; import io.quarkus.bootstrap.runner.Timing; @@ -71,7 +73,10 @@ import io.quarkus.tls.runtime.config.TlsConfig; import io.quarkus.vertx.core.runtime.VertxCoreRecorder; import io.quarkus.vertx.core.runtime.config.VertxConfiguration; +import io.quarkus.vertx.http.DomainSocketServerStart; import io.quarkus.vertx.http.HttpServerOptionsCustomizer; +import io.quarkus.vertx.http.HttpServerStart; +import io.quarkus.vertx.http.HttpsServerStart; import io.quarkus.vertx.http.ManagementInterface; import io.quarkus.vertx.http.runtime.HttpConfiguration.InsecureRequests; import io.quarkus.vertx.http.runtime.devmode.RemoteSyncHandler; @@ -744,8 +749,9 @@ private static CompletableFuture initializeMainHttpServer(Vertx vertx, H launchMode, websocketSubProtocols, registry); // Customize - if (Arc.container() != null) { - List> instances = Arc.container() + ArcContainer container = Arc.container(); + if (container != null) { + List> instances = container .listAll(HttpServerOptionsCustomizer.class); for (InstanceHandle instance : instances) { HttpServerOptionsCustomizer customizer = instance.get(); @@ -784,12 +790,17 @@ private static CompletableFuture initializeMainHttpServer(Vertx vertx, H CompletableFuture futureResult = new CompletableFuture<>(); AtomicInteger connectionCount = new AtomicInteger(); + + // Note that a new HttpServer is created for each IO thread but we only want to fire the events (HttpServerStart etc.) once, + // for the first server that started listening + // See https://vertx.io/docs/vertx-core/java/#_server_sharing for more information + AtomicBoolean startEventsFired = new AtomicBoolean(); + vertx.deployVerticle(new Supplier() { @Override public Verticle get() { return new WebDeploymentVerticle(httpMainServerOptions, httpMainSslServerOptions, httpMainDomainSocketOptions, - launchMode, - insecureRequestStrategy, httpConfiguration, connectionCount, registry); + launchMode, insecureRequestStrategy, httpConfiguration, connectionCount, registry, startEventsFired); } }, new DeploymentOptions().setInstances(ioThreads), new Handler>() { @Override @@ -1129,11 +1140,12 @@ private static class WebDeploymentVerticle extends AbstractVerticle implements R private final HttpConfiguration quarkusConfig; private final AtomicInteger connectionCount; private final List reloadingTasks = new CopyOnWriteArrayList<>(); + private final AtomicBoolean startEventsFired; public WebDeploymentVerticle(HttpServerOptions httpOptions, HttpServerOptions httpsOptions, HttpServerOptions domainSocketOptions, LaunchMode launchMode, InsecureRequests insecureRequests, HttpConfiguration quarkusConfig, AtomicInteger connectionCount, - TlsConfigurationRegistry registry) { + TlsConfigurationRegistry registry, AtomicBoolean startEventsFired) { this.httpOptions = httpOptions; this.httpsOptions = httpsOptions; this.launchMode = launchMode; @@ -1142,6 +1154,7 @@ public WebDeploymentVerticle(HttpServerOptions httpOptions, HttpServerOptions ht this.quarkusConfig = quarkusConfig; this.connectionCount = connectionCount; this.registry = registry; + this.startEventsFired = startEventsFired; org.crac.Core.getGlobalContext().register(this); } @@ -1166,6 +1179,9 @@ public void start(Promise startFuture) { .fail(new IllegalArgumentException("Must configure at least one of http, https or unix domain socket")); } + ArcContainer container = Arc.container(); + boolean notifyStartObservers = container != null ? startEventsFired.compareAndSet(false, true) : false; + if (httpServerEnabled) { httpServer = vertx.createHttpServer(httpOptions); if (insecureRequests == HttpConfiguration.InsecureRequests.ENABLED) { @@ -1196,27 +1212,34 @@ public void handle(HttpServerRequest req) { } }); } - setupTcpHttpServer(httpServer, httpOptions, false, startFuture, remainingCount, connectionCount); + setupTcpHttpServer(httpServer, httpOptions, false, startFuture, remainingCount, connectionCount, + container, notifyStartObservers); } if (domainSocketOptions != null) { domainSocketServer = vertx.createHttpServer(domainSocketOptions); domainSocketServer.requestHandler(ACTUAL_ROOT); - setupUnixDomainSocketHttpServer(domainSocketServer, domainSocketOptions, startFuture, remainingCount); + setupUnixDomainSocketHttpServer(domainSocketServer, domainSocketOptions, startFuture, remainingCount, + container, notifyStartObservers); } if (httpsOptions != null) { httpsServer = vertx.createHttpServer(httpsOptions); httpsServer.requestHandler(ACTUAL_ROOT); - setupTcpHttpServer(httpsServer, httpsOptions, true, startFuture, remainingCount, connectionCount); + setupTcpHttpServer(httpsServer, httpsOptions, true, startFuture, remainingCount, connectionCount, + container, notifyStartObservers); } } private void setupUnixDomainSocketHttpServer(HttpServer httpServer, HttpServerOptions options, Promise startFuture, - AtomicInteger remainingCount) { + AtomicInteger remainingCount, ArcContainer container, boolean notifyStartObservers) { httpServer.listen(SocketAddress.domainSocketAddress(options.getHost()), event -> { if (event.succeeded()) { + if (notifyStartObservers) { + container.beanManager().getEvent().select(DomainSocketServerStart.class) + .fireAsync(new DomainSocketServerStart(options)); + } if (remainingCount.decrementAndGet() == 0) { startFuture.complete(null); } @@ -1240,7 +1263,8 @@ private void setupUnixDomainSocketHttpServer(HttpServer httpServer, HttpServerOp } private void setupTcpHttpServer(HttpServer httpServer, HttpServerOptions options, boolean https, - Promise startFuture, AtomicInteger remainingCount, AtomicInteger currentConnectionCount) { + Promise startFuture, AtomicInteger remainingCount, AtomicInteger currentConnectionCount, + ArcContainer container, boolean notifyStartObservers) { if (quarkusConfig.limits.maxConnections.isPresent() && quarkusConfig.limits.maxConnections.getAsInt() > 0) { var tracker = vertx.isMetricsEnabled() @@ -1315,11 +1339,20 @@ public void handle(AsyncResult event) { } if (https) { - CDI.current().select(HttpCertificateUpdateEventListener.class).get() + container.instance(HttpCertificateUpdateEventListener.class).get() .register(event.result(), quarkusConfig.tlsConfigurationName.orElse(TlsConfig.DEFAULT_NAME), "http server"); } + if (notifyStartObservers) { + Event startEvent = container.beanManager().getEvent(); + if (https) { + startEvent.select(HttpsServerStart.class).fireAsync(new HttpsServerStart(options)); + } else { + startEvent.select(HttpServerStart.class).fireAsync(new HttpServerStart(options)); + } + } + if (remainingCount.decrementAndGet() == 0) { //make sure we only complete once startFuture.complete(null);