diff --git a/devtools/dev-space/dev-proxy/dev-proxy-server/pom.xml b/devtools/dev-space/dev-proxy/dev-proxy-server/pom.xml deleted file mode 100644 index 0b05127ead1ed..0000000000000 --- a/devtools/dev-space/dev-proxy/dev-proxy-server/pom.xml +++ /dev/null @@ -1,117 +0,0 @@ - - - 4.0.0 - - quarkus-build-parent - io.quarkus - 999-SNAPSHOT - ../../../../build-parent/pom.xml - - io.quarkus.devspace - devspace-proxy-server - - - - - io.vertx - vertx-http-proxy - - - io.quarkus - quarkus-reactive-routes - - - io.quarkus - quarkus-jackson - - - io.quarkus - quarkus-arc - - - io.quarkus.devspace - devspace-proxy-core - ${project.version} - - - io.quarkus - quarkus-junit5 - test - - - io.rest-assured - rest-assured - test - - - - - - io.quarkus - quarkus-maven-plugin - true - - - - build - generate-code - generate-code-tests - - - - - - maven-compiler-plugin - - - -parameters - - - - - maven-surefire-plugin - - - org.jboss.logmanager.LogManager - ${maven.home} - - - - - maven-failsafe-plugin - - - - integration-test - verify - - - - ${project.build.directory}/${project.build.finalName}-runner - - org.jboss.logmanager.LogManager - ${maven.home} - - - - - - - - - - native - - - native - - - - false - native - - - - diff --git a/devtools/dev-space/dev-proxy/dev-proxy-server/src/main/java/io/quarkus/devspace/server/ProxyConfig.java b/devtools/dev-space/dev-proxy/dev-proxy-server/src/main/java/io/quarkus/devspace/server/ProxyConfig.java deleted file mode 100644 index 6ccf4bad97533..0000000000000 --- a/devtools/dev-space/dev-proxy/dev-proxy-server/src/main/java/io/quarkus/devspace/server/ProxyConfig.java +++ /dev/null @@ -1,12 +0,0 @@ -package io.quarkus.devspace.server; - -import io.smallrye.config.ConfigMapping; - -@ConfigMapping(prefix = "devspace") -public interface ProxyConfig { - String name(); - - String host(); - - int port(); -} diff --git a/devtools/dev-space/dev-proxy/dev-proxy-server/src/main/java/io/quarkus/devspace/server/QuarkusDevProxyServer.java b/devtools/dev-space/dev-proxy/dev-proxy-server/src/main/java/io/quarkus/devspace/server/QuarkusDevProxyServer.java deleted file mode 100644 index 2274cc2f90dbe..0000000000000 --- a/devtools/dev-space/dev-proxy/dev-proxy-server/src/main/java/io/quarkus/devspace/server/QuarkusDevProxyServer.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.quarkus.devspace.server; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; -import jakarta.inject.Inject; - -import io.quarkus.runtime.StartupEvent; -import io.vertx.core.Vertx; -import io.vertx.ext.web.Router; - -@ApplicationScoped -public class QuarkusDevProxyServer extends DevProxyServer { - - @Inject - protected ProxyConfig config; - - public void start(@Observes StartupEvent start, Vertx vertx, Router router) { - this.vertx = vertx; - this.router = router; - init(new ServiceConfig(config.name(), config.host(), config.port())); - } -} diff --git a/devtools/dev-space/dev-proxy/dev-proxy-server/src/main/resources/application.properties b/devtools/dev-space/dev-proxy/dev-proxy-server/src/main/resources/application.properties deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/devtools/dev-space/dev-proxy/pom.xml b/devtools/dev-space/dev-proxy/pom.xml deleted file mode 100644 index 76e77674a2fb2..0000000000000 --- a/devtools/dev-space/dev-proxy/pom.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - 4.0.0 - - - quarkus-build-parent - io.quarkus - 999-SNAPSHOT - ../../../build-parent/pom.xml - - - quarkus-devspace-proxy-all - pom - Quarkus - Dev Space Proxy - - - INFO - - - - core - dev-proxy-server - - diff --git a/devtools/dev-space/pom.xml b/devtools/dev-space/pom.xml deleted file mode 100644 index 64a185d7164e9..0000000000000 --- a/devtools/dev-space/pom.xml +++ /dev/null @@ -1,25 +0,0 @@ - - - 4.0.0 - - - quarkus-build-parent - io.quarkus - 999-SNAPSHOT - ../../build-parent/pom.xml - - - quarkus-devspace-all - pom - Quarkus - Dev Space - - - INFO - - - - dev-proxy - - diff --git a/devtools/pom.xml b/devtools/pom.xml index a446bd7494fc5..a01063c3f5c10 100644 --- a/devtools/pom.xml +++ b/devtools/pom.xml @@ -26,6 +26,5 @@ maven gradle cli - dev-space diff --git a/devtools/dev-space/dev-proxy/dev-proxy-server/src/test/java/io/quarkus/devspace/test/DevProxyTestCase.java b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/DevSpaceProxyTest.java similarity index 72% rename from devtools/dev-space/dev-proxy/dev-proxy-server/src/test/java/io/quarkus/devspace/test/DevProxyTestCase.java rename to extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/DevSpaceProxyTest.java index 2e7ca3d9b34fd..853279423932e 100644 --- a/devtools/dev-space/dev-proxy/dev-proxy-server/src/test/java/io/quarkus/devspace/test/DevProxyTestCase.java +++ b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/DevSpaceProxyTest.java @@ -1,50 +1,50 @@ -package io.quarkus.devspace.test; +package io.quarkus.vertx.http; import static io.restassured.RestAssured.given; import static org.hamcrest.CoreMatchers.equalTo; -import java.util.Map; - +import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkus.devspace.ProxyUtils; import io.quarkus.devspace.client.DevProxyClient; import io.quarkus.devspace.server.DevProxyServer; -import io.quarkus.test.junit.QuarkusTest; -import io.quarkus.test.junit.QuarkusTestProfile; -import io.quarkus.test.junit.TestProfile; +import io.quarkus.devspace.server.ServiceConfig; +import io.quarkus.test.QuarkusUnitTest; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; +import io.vertx.ext.web.Router; -@QuarkusTest -@TestProfile(DevProxyTestCase.ConfigOverrides.class) -public class DevProxyTestCase { +public class DevSpaceProxyTest { + public static final int SERVICE_PORT = 9091; + public static final int PROXY_PORT = 9092; @Inject public Vertx vertx; - @Inject - public DevProxyServer server; + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar.addClasses(DevSpaceProxyTest.RouteProducer.class)); + + public static DevProxyServer proxyServer; + public static HttpServer proxy; static HttpServer myService; - static HttpServer localService; - - public static class ConfigOverrides implements QuarkusTestProfile { - @Override - public Map getConfigOverrides() { - return Map.of( - "devspace.host", "localhost", - "devspace.name", "my-service", - "devspace.port", "9091" - //,"quarkus.log.level", "DEBUG" - ); + @Singleton + public static class RouteProducer { + void observeRouter(@Observes Router router) { + router.route().handler( + request -> request.response().setStatusCode(200).putHeader("Content-Type", "text/plain").end("local")); } + } @BeforeEach @@ -55,24 +55,23 @@ public void before() { myService = vertx.createHttpServer(); myService.requestHandler(request -> { request.response().setStatusCode(200).putHeader("Content-Type", "text/plain").end("my-service"); - }).listen(9091); + }).listen(SERVICE_PORT); - localService = vertx.createHttpServer(); - localService.requestHandler(request -> { - request.response() - .setStatusCode(200) - .putHeader("Content-Type", "text/plain") - .end("local"); - }).listen(9092); + proxy = vertx.createHttpServer(); + proxyServer = new DevProxyServer(); + Router proxyRouter = Router.router(vertx); + ServiceConfig config = new ServiceConfig("my-service", "localhost", SERVICE_PORT); + proxyServer.init(vertx, proxyRouter, config); + proxy.requestHandler(proxyRouter).listen(PROXY_PORT); } @AfterAll public static void after() { - if (myService != null) + if (myService != null) { ProxyUtils.await(1000, myService.close()); - if (localService != null) - ProxyUtils.await(1000, localService.close()); + ProxyUtils.await(1000, proxy.close()); + } } @Test @@ -80,14 +79,14 @@ public void testProxy() { // invoke service directly given() .when() - .port(9091) + .port(SERVICE_PORT) .get("/yo") .then() .statusCode(200) .body(equalTo("my-service")); given() .when() - .port(9091) + .port(SERVICE_PORT) .body("hello") .contentType("text/plain") .post("/yo") @@ -97,14 +96,12 @@ public void testProxy() { // invoke local directly given() .when() - .port(9092) .get("/yo") .then() .statusCode(200) .body(equalTo("local")); given() .when() - .port(9092) .body("hello") .contentType("text/plain") .post("/yo") @@ -114,12 +111,14 @@ public void testProxy() { // invoke proxy given() .when() + .port(PROXY_PORT) .get("/yo") .then() .statusCode(200) .body(equalTo("my-service")); given() .when() + .port(PROXY_PORT) .body("hello") .contentType("text/plain") .post("/yo") @@ -128,7 +127,7 @@ public void testProxy() { .body(equalTo("my-service")); } - @Test + //@Test public void testGlobalSession() throws Exception { DevProxyClient client = DevProxyClient.create(vertx) .proxy("localhost", 8081, false) diff --git a/devtools/dev-space/dev-proxy/core/pom.xml b/extensions/vertx-http/devspace-proxy-core/pom.xml similarity index 67% rename from devtools/dev-space/dev-proxy/core/pom.xml rename to extensions/vertx-http/devspace-proxy-core/pom.xml index e675d7ca47cd4..e9008a47fd3eb 100644 --- a/devtools/dev-space/dev-proxy/core/pom.xml +++ b/extensions/vertx-http/devspace-proxy-core/pom.xml @@ -4,10 +4,9 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> 4.0.0 - quarkus-build-parent + quarkus-vertx-http-parent io.quarkus 999-SNAPSHOT - ../../../../build-parent/pom.xml io.quarkus.devspace devspace-proxy-core @@ -36,22 +35,6 @@ org.jboss.logging jboss-logging - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - - - com.fasterxml.jackson.datatype - jackson-datatype-jdk8 - - - com.fasterxml.jackson.module - jackson-module-parameter-names - io.rest-assured rest-assured diff --git a/devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/ProxyUtils.java b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/ProxyUtils.java similarity index 100% rename from devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/ProxyUtils.java rename to extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/ProxyUtils.java diff --git a/devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/client/DevProxyClient.java b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/client/DevProxyClient.java similarity index 96% rename from devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/client/DevProxyClient.java rename to extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/client/DevProxyClient.java index c1b995e9a5755..78f02caee7d44 100644 --- a/devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/client/DevProxyClient.java +++ b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/client/DevProxyClient.java @@ -30,6 +30,7 @@ public class DevProxyClient { protected String service; protected String pollLink; protected CountDownLatch workerShutdown; + protected long pollTimeoutMillis = 1000; protected DevProxyClient() { @@ -82,16 +83,20 @@ public boolean startSession(String sessionId) throws Exception { }); latch.await(); if (!success.get()) { - shutdown = true; - running = false; - proxyClient.close(); - serviceClient.close(); + forcedShutdown(); return false; } this.sessionId = sessionId; return true; } + public void forcedShutdown() { + shutdown = true; + running = false; + proxyClient.close(); + serviceClient.close(); + } + protected void pollFailure(Throwable failure) { log.error("Poll failed", failure); workerOffline(); @@ -113,7 +118,7 @@ protected void poll() { } proxyClient.request(HttpMethod.POST, pollLink) .onSuccess(request -> { - request.setTimeout(DevProxyServer.POLL_TIMEOUT) + request.setTimeout(pollTimeoutMillis) .send() .onSuccess(this::handlePoll) //.onSuccess(this::handlePollWaitBody) @@ -209,7 +214,7 @@ private void handleServiceResponse(String responsePath, HttpClientResponse servi workerOffline(); }) .onSuccess(pushRequest -> { - pushRequest.setTimeout(DevProxyServer.POLL_TIMEOUT * 2); + pushRequest.setTimeout(pollTimeoutMillis); pushRequest.putHeader(DevProxyServer.STATUS_CODE_HEADER, Integer.toString(serviceResponse.statusCode())); serviceResponse.headers() .forEach((key, val) -> pushRequest.headers().add(DevProxyServer.HEADER_FORWARD_PREFIX + key, val)); diff --git a/devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/client/DevProxyClientBuilder.java b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/client/DevProxyClientBuilder.java similarity index 93% rename from devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/client/DevProxyClientBuilder.java rename to extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/client/DevProxyClientBuilder.java index b38c0c0c817f6..775dc4012e297 100644 --- a/devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/client/DevProxyClientBuilder.java +++ b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/client/DevProxyClientBuilder.java @@ -57,6 +57,11 @@ public DevProxyClientBuilder service(String name, String host, int port, HttpCli return this; } + public DevProxyClientBuilder pollTimeoutMillis(long timeout) { + devProxyClient.pollTimeoutMillis = timeout; + return this; + } + public DevProxyClient build() { return devProxyClient; } diff --git a/devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java similarity index 83% rename from devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java rename to extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java index 3b5a5cf1adf72..6a7f8e9a5b750 100644 --- a/devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java +++ b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java @@ -32,9 +32,32 @@ import io.vertx.httpproxy.HttpProxy; public class DevProxyServer { - class ProxySession { + public class ProxySession { final BlockingQueue queue = new LinkedBlockingQueue<>(); final ConcurrentHashMap responsePending = new ConcurrentHashMap<>(); + final ServiceProxy proxy; + final long timerId; + final String sessionId; + final String who; + + ProxySession(ServiceProxy proxy, String sessionId, String who) { + timerId = vertx.setPeriodic(POLL_TIMEOUT, this::timerCallback); + this.proxy = proxy; + this.sessionId = sessionId; + this.who = who; + } + + void timerCallback(Long t) { + checkIdle(); + } + + private void checkIdle() { + if (!running) + return; + if (System.currentTimeMillis() - lastPoll > POLL_TIMEOUT && numPollers == 0) { + shutdown(); + } + } volatile boolean running = true; AtomicLong requestId = new AtomicLong(System.currentTimeMillis()); @@ -49,18 +72,47 @@ RoutingContext dequeueResponse(String requestId) { return responsePending.remove(requestId); } - void shutdown() { + synchronized void shutdown() { + if (!running) + return; running = false; + proxy.sessions.remove(this.sessionId); + vertx.cancelTimer(timerId); while (!queue.isEmpty()) { List requests = new ArrayList<>(); queue.drainTo(requests); - requests.stream().forEach((ctx) -> ctx.fail(500)); + requests.stream().forEach((ctx) -> proxy.proxy.handle(ctx.request())); } } + volatile long lastPoll; + volatile int numPollers; + + void pollStarted() { + lastPoll = System.currentTimeMillis(); + numPollers++; + } + + void pollProcessing() { + lastPoll = System.currentTimeMillis(); + } + + void pollEnded() { + numPollers--; + lastPoll = System.currentTimeMillis(); + } + + synchronized void pollDisconnect() { + numPollers--; + if (!running) { + return; + } + checkIdle(); + } + } - class ServiceProxy { + public class ServiceProxy { public ServiceProxy(ServiceConfig service) { this.config = service; this.proxy = HttpProxy.reverseProxy(client); @@ -76,19 +128,6 @@ void shutdown() { session.shutdown(); } } - - void removeSession(String name) { - // forward RoutingContext to proxy - ProxySession session = sessions.remove(name); - if (session != null) { - session.running = false; - while (!session.queue.isEmpty()) { - List requests = new ArrayList<>(); - session.queue.drainTo(requests); - requests.stream().forEach((ctx) -> proxy.handle(ctx.request())); - } - } - } } public static final String CLIENT_API_PATH = "/_dev_proxy_client_"; @@ -104,14 +143,16 @@ void removeSession(String name) { public static final String POLL_LINK = "X-Depot-Poll-Path"; public static final String PROXY_API_PATH = "/_dev_proxy_api_"; - public static final long POLL_TIMEOUT = 1000; + protected long POLL_TIMEOUT = 1000; protected static final Logger log = Logger.getLogger(DevProxyServer.class); protected ServiceProxy service; protected Vertx vertx; protected Router router; protected HttpClient client; - protected void init(ServiceConfig config) { + public void init(Vertx vertx, Router router, ServiceConfig config) { + this.vertx = vertx; + this.router = router; client = vertx.createHttpClient(); // API routes router.route().handler((context) -> { @@ -192,22 +233,6 @@ private static void sendBody(HttpServerRequest source, HttpServerResponse destin }); } - public Vertx getVertx() { - return vertx; - } - - public void setVertx(Vertx vertx) { - this.vertx = vertx; - } - - public Router getRouter() { - return router; - } - - public void setRouter(Router router) { - this.router = router; - } - public void proxy(RoutingContext ctx) { log.debug("*** entered proxy ***"); // Get session id from header or cookie @@ -245,15 +270,32 @@ public void clientConnect(RoutingContext ctx) { if (!sessionQueryParam.isEmpty()) { sessionId = sessionQueryParam.get(0); } + List whoQueryParam = ctx.queryParam("who"); + String who = null; + if (!whoQueryParam.isEmpty()) { + who = whoQueryParam.get(0); + } + if (who == null) { + ctx.response().setStatusCode(400).end(); + log.errorv("Failed Client Connect to service {0} and session {1}: who identity not sent", service.config.getName(), + sessionId); + return; + } + synchronized (this) { + ProxySession session = service.sessions.get(sessionId); + if (session != null) { + if (!who.equals(session.who)) { + log.errorv("Failed Client Connect for {0} to service {1} and session {2}: Existing connection {3}", who, + service.config.getName(), sessionId, session.who); + ctx.response().setStatusCode(409).putHeader("Content-Type", "text/plain").end(session.who); - ProxySession session = service.sessions.get(sessionId); - if (session == null) { - session = new ProxySession(); - log.debugv("Client Connect to service {0} and session {1}", service.config.getName(), sessionId); - service.sessions.put(sessionId, session); + } + } else { + service.sessions.put(sessionId, new ProxySession(service, sessionId, who)); + ctx.response().setStatusCode(204).putHeader(POLL_LINK, CLIENT_API_PATH + "/poll/session/" + sessionId) + .end(); + } } - ctx.response().setStatusCode(204).putHeader(POLL_LINK, CLIENT_API_PATH + "/poll/session/" + sessionId) - .end(); } public void deleteClientConnection(RoutingContext ctx) { @@ -264,8 +306,10 @@ public void deleteClientConnection(RoutingContext ctx) { if (!sessionQueryParam.isEmpty()) { sessionId = sessionQueryParam.get(0); } - - service.removeSession(sessionId); + ProxySession session = service.sessions.get(sessionId); + if (session != null) { + session.shutdown(); + } ctx.response().setStatusCode(204).end(); } @@ -273,7 +317,7 @@ public void pushResponse(RoutingContext ctx) { String sessionId = ctx.pathParam("session"); String requestId = ctx.pathParam("request"); String kp = ctx.queryParams().get("keepAlive"); - boolean keepAlive = kp == null ? false : Boolean.parseBoolean(kp); + boolean keepAlive = kp == null ? true : Boolean.parseBoolean(kp); ProxySession session = service.sessions.get(sessionId); if (session == null) { @@ -312,6 +356,7 @@ public void pushResponse(RoutingContext ctx) { executePoll(ctx, session, sessionId); } else { log.debugv("End polling {0} {1}", service.config.getName(), sessionId); + session.pollEnded(); ctx.response().setStatusCode(204).end(); } } @@ -347,11 +392,13 @@ public void pollNext(RoutingContext ctx) { DevProxyServer.error(ctx, 404, "Session not found for service " + service.config.getName()); return; } + session.pollStarted(); executePoll(ctx, session, sessionId); } private void executePoll(RoutingContext ctx, ProxySession session, String sessionId) { vertx.executeBlocking(new Handler<>() { + @Override public void handle(Promise event) { final AtomicBoolean closed = new AtomicBoolean(false); @@ -369,6 +416,7 @@ public void handle(Promise event) { if (closed.get()) { log.debug("Polled message but connection was closed, returning to queue"); session.queue.put(proxiedCtx); + session.pollDisconnect(); return; } } else if (closed.get()) { @@ -383,6 +431,7 @@ public void handle(Promise event) { log.error("poll interrupted"); ctx.fail(500); } + session.pollProcessing(); pollResponse.setStatusCode(200); HttpServerRequest proxiedRequest = proxiedCtx.request(); proxiedRequest.pause(); diff --git a/devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/server/ServiceConfig.java b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/server/ServiceConfig.java similarity index 100% rename from devtools/dev-space/dev-proxy/core/src/main/java/io/quarkus/devspace/server/ServiceConfig.java rename to extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/server/ServiceConfig.java diff --git a/extensions/vertx-http/pom.xml b/extensions/vertx-http/pom.xml index f370261e6feee..34a4bbb7f920e 100644 --- a/extensions/vertx-http/pom.xml +++ b/extensions/vertx-http/pom.xml @@ -14,6 +14,7 @@ Quarkus - Vert.x - HTTP pom + devspace-proxy-core deployment runtime dev-console-spi diff --git a/extensions/vertx-http/runtime/pom.xml b/extensions/vertx-http/runtime/pom.xml index 80b22078da13f..305e6b9d403f2 100644 --- a/extensions/vertx-http/runtime/pom.xml +++ b/extensions/vertx-http/runtime/pom.xml @@ -47,6 +47,10 @@ io.quarkus quarkus-vertx + + io.quarkus.devspace + devspace-proxy-core + io.smallrye.reactive smallrye-mutiny-vertx-web diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VirtualDevProxyClient.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VirtualDevProxyClient.java new file mode 100644 index 0000000000000..3781b01b5b51b --- /dev/null +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VirtualDevProxyClient.java @@ -0,0 +1,423 @@ +package io.quarkus.vertx.http.runtime.devmode; + +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.jboss.logging.Logger; + +import io.netty.channel.FileRegion; +import io.netty.handler.codec.http.*; +import io.quarkus.devspace.client.DevProxyClient; +import io.quarkus.devspace.server.DevProxyServer; +import io.quarkus.netty.runtime.virtual.VirtualClientConnection; +import io.quarkus.netty.runtime.virtual.VirtualResponseHandler; +import io.quarkus.vertx.http.runtime.QuarkusHttpHeaders; +import io.quarkus.vertx.http.runtime.VertxHttpRecorder; +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.*; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.buffer.impl.BufferImpl; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; +import io.vertx.core.streams.impl.InboundBuffer; + +public class VirtualDevProxyClient { + protected static final Logger log = Logger.getLogger(DevProxyClient.class); + + protected Vertx vertx; + protected HttpClient proxyClient; + protected String proxyHost; + protected int proxyPort; + protected String whoami; + protected String sessionId; + protected int numPollers = 1; + protected volatile boolean running = true; + protected volatile boolean shutdown = false; + protected String pollLink; + protected CountDownLatch workerShutdown; + protected long pollTimeoutMillis = 1000; + + public void setVertx(Vertx vertx) { + this.vertx = vertx; + } + + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public void setProxyPort(int proxyPort) { + this.proxyPort = proxyPort; + } + + public void setWhoami(String whoami) { + this.whoami = whoami; + } + + public void setPollTimeoutMillis(long pollTimeoutMillis) { + this.pollTimeoutMillis = pollTimeoutMillis; + } + + public void setNumPollers(int numPollers) { + this.numPollers = numPollers; + } + + public boolean startGlobalSession() throws Exception { + return startSession(DevProxyServer.GLOBAL_PROXY_SESSION); + } + + public boolean startSession(String sessionId) throws Exception { + String uri = DevProxyServer.CLIENT_API_PATH + "/connect?who=" + whoami + "&session=" + sessionId; + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean success = new AtomicBoolean(); + proxyClient.request(HttpMethod.POST, uri, event -> { + if (event.failed()) { + log.error("Could not connect to startSession", event.cause()); + latch.countDown(); + return; + } + HttpClientRequest request = event.result(); + request.send().onComplete(event1 -> { + if (event1.failed()) { + log.error("Could not connect to startSession", event1.cause()); + latch.countDown(); + return; + } + HttpClientResponse response = event1.result(); + if (response.statusCode() != 204) { + response.bodyHandler(body -> { + log.error("Could not connect to startSession " + response.statusCode() + body.toString()); + latch.countDown(); + }); + return; + } + try { + this.pollLink = response.getHeader(DevProxyServer.POLL_LINK); + for (int i = 0; i < numPollers; i++) + poll(); + workerShutdown = new CountDownLatch(numPollers); + success.set(true); + } finally { + latch.countDown(); + } + }); + }); + latch.await(); + if (!success.get()) { + forcedShutdown(); + return false; + } + this.sessionId = sessionId; + return true; + } + + public void forcedShutdown() { + shutdown = true; + running = false; + proxyClient.close(); + } + + protected void pollFailure(Throwable failure) { + log.error("Poll failed", failure); + workerOffline(); + } + + protected void pollFailure(String error) { + log.error("Poll failed: " + error); + workerOffline(); + } + + private void workerOffline() { + workerShutdown.countDown(); + } + + protected void poll() { + if (!running) { + workerOffline(); + return; + } + proxyClient.request(HttpMethod.POST, pollLink) + .onSuccess(request -> { + request.setTimeout(pollTimeoutMillis) + .send() + .onSuccess(this::handlePoll) + //.onSuccess(this::handlePollWaitBody) + .onFailure(this::pollFailure); + + }) + .onFailure(this::pollFailure); + } + + private class NettyResponseHandler implements VirtualResponseHandler, ReadStream { + + final String responsePath; + InboundBuffer queue; + Buffer end = Buffer.buffer(); + Handler endHandler; + + public NettyResponseHandler(String responsePath, Vertx vertx) { + this.responsePath = responsePath; + queue = new InboundBuffer<>(vertx.getOrCreateContext()); + queue.pause(); + } + + @Override + public ReadStream exceptionHandler(@Nullable Handler handler) { + queue.exceptionHandler(handler); + return this; + } + + @Override + public ReadStream handler(@Nullable Handler handler) { + queue.handler((buf) -> { + if (buf == end) { + if (endHandler != null) { + endHandler.handle(null); + } + } else { + handler.handle(buf); + } + }); + return this; + } + + @Override + public ReadStream pause() { + queue.pause(); + return this; + } + + @Override + public ReadStream resume() { + queue.resume(); + return this; + } + + @Override + public ReadStream fetch(long amount) { + queue.fetch(amount); + return this; + } + + @Override + public ReadStream endHandler(@Nullable Handler endHandler) { + this.endHandler = endHandler; + return this; + } + + @Override + public void handleMessage(Object msg) { + if (msg instanceof HttpResponse) { + HttpResponse res = (HttpResponse) msg; + proxyClient.request(HttpMethod.POST, responsePath + "?keepAlive=" + running) + .onFailure(exc -> { + log.error("Proxy handle response failure", exc); + workerOffline(); + }) + .onSuccess(pushRequest -> { + pushRequest.setTimeout(pollTimeoutMillis); + pushRequest.putHeader(DevProxyServer.STATUS_CODE_HEADER, Integer.toString(res.status().code())); + + for (String name : res.headers().names()) { + final List allForName = res.headers().getAll(name); + if (allForName == null || allForName.isEmpty()) { + continue; + } + + for (Iterator valueIterator = allForName.iterator(); valueIterator.hasNext();) { + String val = valueIterator.next(); + if (name.equalsIgnoreCase("Transfer-Encoding") + && val.equals("chunked")) { + continue; // ignore transfer encoding, chunked screws up message and response + } + pushRequest.headers().add(DevProxyServer.HEADER_FORWARD_PREFIX + name, val); + } + } + pushRequest.send(this) + .onFailure(exc -> { + log.error("Failed to push service response", exc); + workerOffline(); + }) + .onSuccess(VirtualDevProxyClient.this::handlePoll); // a successful push restarts poll + }); + } + if (msg instanceof HttpContent) { + queue.write(BufferImpl.buffer(((HttpContent) msg).content())); + } + if (msg instanceof FileRegion) { + log.error("FileRegion not supported yet"); + throw new RuntimeException("FileRegion not supported yet"); + } + if (msg instanceof LastHttpContent) { + queue.write(end); + } + } + + @Override + public void close() { + + } + } + + private class NettyWriteStream implements WriteStream { + VirtualClientConnection connection; + + public NettyWriteStream(VirtualClientConnection connection) { + this.connection = connection; + } + + private void writeHttpContent(Buffer data) { + // todo getByteBuf copies the underlying byteBuf + DefaultHttpContent content = new DefaultHttpContent(data.getByteBuf()); + connection.sendMessage(content); + + } + + @Override + public WriteStream exceptionHandler(@Nullable Handler handler) { + return this; + } + + @Override + public Future write(Buffer data) { + writeHttpContent(data); + Promise promise = Promise.promise(); + write(data, promise); + return promise.future(); + } + + @Override + public void write(Buffer data, Handler> handler) { + writeHttpContent(data); + handler.handle(Future.succeededFuture()); + } + + @Override + public void end(Handler> handler) { + connection.sendMessage(LastHttpContent.EMPTY_LAST_CONTENT); + handler.handle(Future.succeededFuture()); + } + + @Override + public WriteStream setWriteQueueMaxSize(int maxSize) { + return this; + } + + @Override + public boolean writeQueueFull() { + return false; + } + + @Override + public WriteStream drainHandler(@Nullable Handler handler) { + return this; + } + } + + protected void handlePoll(HttpClientResponse pollResponse) { + pollResponse.pause(); + log.debug("------ handlePoll"); + int proxyStatus = pollResponse.statusCode(); + if (proxyStatus == 408) { + poll(); + return; + } else if (proxyStatus == 204) { + // keepAlive = false sent back + workerOffline(); + return; + } else if (proxyStatus != 200) { + pollResponse.bodyHandler(body -> { + pollFailure(body.toString()); + }); + return; + } + + String method = pollResponse.getHeader(DevProxyServer.METHOD_HEADER); + String uri = pollResponse.getHeader(DevProxyServer.URI_HEADER); + String responsePath = pollResponse.getHeader(DevProxyServer.RESPONSE_LINK); + NettyResponseHandler handler = new NettyResponseHandler(responsePath, vertx); + VirtualClientConnection connection = VirtualClientConnection.connect(handler, VertxHttpRecorder.VIRTUAL_HTTP, + null); + + QuarkusHttpHeaders quarkusHeaders = new QuarkusHttpHeaders(); + // add context specific things + io.netty.handler.codec.http.HttpMethod httpMethod = io.netty.handler.codec.http.HttpMethod.valueOf(method); + + DefaultHttpRequest nettyRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, + httpMethod, uri, + quarkusHeaders); + pollResponse.headers().forEach((key, val) -> { + log.debugv("Poll response header: {0} : {1}", key, val); + int idx = key.indexOf(DevProxyServer.HEADER_FORWARD_PREFIX); + if (idx == 0) { + String headerName = key.substring(DevProxyServer.HEADER_FORWARD_PREFIX.length()); + nettyRequest.headers().add(headerName, val); + } else if (key.equalsIgnoreCase("Content-Length")) { + nettyRequest.headers().add("Content-Length", val); + } + }); + if (!nettyRequest.headers().contains(HttpHeaderNames.HOST)) { + nettyRequest.headers().add(HttpHeaderNames.HOST, "localhost"); + } + + connection.sendMessage(nettyRequest); + pollResponse.pipeTo(new NettyWriteStream(connection)); + } + + private void deletePushResponse(String link) { + if (link == null) { + workerOffline(); + return; + } + proxyClient.request(HttpMethod.DELETE, link) + .onFailure(event -> workerOffline()) + .onSuccess(request -> request.send().onComplete(event -> workerOffline())); + } + + public void shutdown() { + if (shutdown) { + return; + } + try { + running = false; + try { + // give time for workers to finish + workerShutdown.await(5, TimeUnit.SECONDS); + } catch (Throwable ignored) { + + } + // delete session + CountDownLatch latch = new CountDownLatch(1); + if (sessionId != null) { + String uri = DevProxyServer.CLIENT_API_PATH + "/connect?session=" + sessionId; + proxyClient.request(HttpMethod.DELETE, uri) + .onFailure(event -> { + log.error("Failed to delete sesssion on shutdown", event); + latch.countDown(); + }) + .onSuccess(request -> request.send() + .onComplete(event -> { + if (event.failed()) { + log.error("Failed to delete sesssion on shutdown", event.cause()); + } + latch.countDown(); + })); + + } + try { + latch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + } + proxyClient.close(); + } finally { + shutdown = true; + } + } + +}