diff --git a/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/DevSpaceProxyTest.java b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/DevSpaceProxyTest.java index 827da5464f30a..4a5631fe8fe06 100644 --- a/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/DevSpaceProxyTest.java +++ b/extensions/vertx-http/deployment/src/test/java/io/quarkus/vertx/http/DevSpaceProxyTest.java @@ -3,6 +3,8 @@ import static io.restassured.RestAssured.given; import static org.hamcrest.CoreMatchers.equalTo; +import java.util.concurrent.TimeUnit; + import jakarta.enterprise.event.Observes; import jakarta.inject.Singleton; @@ -17,6 +19,9 @@ import io.quarkus.vertx.http.runtime.devmode.DevSpaceProxyRecorder; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; +import io.vertx.core.impl.VertxBuilder; +import io.vertx.core.impl.VertxThread; +import io.vertx.core.spi.VertxThreadFactory; import io.vertx.ext.web.Router; public class DevSpaceProxyTest { @@ -44,7 +49,10 @@ public class DevSpaceProxyTest { public static class RouteProducer { void observeRouter(@Observes Router router) { router.route().handler( - request -> request.response().setStatusCode(200).putHeader("Content-Type", "text/plain").end("local")); + request -> { + System.out.println("************ CALLED LOCAL SERVER **************"); + request.response().setStatusCode(200).putHeader("Content-Type", "text/plain").end("local"); + }); } } @@ -53,7 +61,13 @@ void observeRouter(@Observes Router router) { @BeforeAll public static void before() { - vertx = Vertx.vertx(); + vertx = new VertxBuilder() + .threadFactory(new VertxThreadFactory() { + public VertxThread newVertxThread(Runnable target, String name, boolean worker, long maxExecTime, + TimeUnit maxExecTimeUnit) { + return new VertxThread(target, "TEST-VERTX." + name, worker, maxExecTime, maxExecTimeUnit); + } + }).init().vertx(); myService = vertx.createHttpServer(); ProxyUtils.await(1000, myService.requestHandler(request -> { request.response().setStatusCode(200).putHeader("Content-Type", "text/plain").end("my-service"); @@ -65,16 +79,18 @@ public static void before() { ServiceConfig config = new ServiceConfig("my-service", "localhost", SERVICE_PORT); proxyServer.init(vertx, proxyRouter, config); ProxyUtils.await(1000, proxy.requestHandler(proxyRouter).listen(PROXY_PORT)); - DevSpaceProxyRecorder.startSession(); - } @AfterAll public static void after() { + System.out.println(" ------- CLEANUP TEST ------"); if (vertx != null) { ProxyUtils.await(1000, myService.close()); + System.out.println(" ------- Cleaned up my-service ------"); ProxyUtils.await(1000, proxy.close()); + System.out.println(" ------- Cleaned up proxy ------"); ProxyUtils.await(1000, vertx.close()); + System.out.println(" ------- Cleaned up test vertx ------"); } } @@ -103,20 +119,23 @@ public void testProxy() { .statusCode(200) .body(equalTo("my-service")); // invoke local directly - given() - .when() - .get("/yo") - .then() - .statusCode(200) - .body(equalTo("local")); - given() - .when() - .body("hello") - .contentType("text/plain") - .post("/yo") - .then() - .statusCode(200) - .body(equalTo("local")); + /* + * given() + * .when() + * .get("/yo") + * .then() + * .statusCode(200) + * .body(equalTo("local")); + * given() + * .when() + * .body("hello") + * .contentType("text/plain") + * .post("/yo") + * .then() + * .statusCode(200) + * .body(equalTo("local")); + * + */ // invoke proxy given() .when() @@ -136,13 +155,15 @@ public void testProxy() { .body(equalTo("my-service")); } - //@Test + @Test public void testGlobalSession() throws Exception { try { + DevSpaceProxyRecorder.startSession(); System.out.println("------------------ POST REQUEST BODY ---------------------"); given() .when() + .port(PROXY_PORT) .contentType("text/plain") .body("hello") .post("/hey") @@ -153,6 +174,7 @@ public void testGlobalSession() throws Exception { System.out.println("-------------------- GET REQUEST --------------------"); given() .when() + .port(PROXY_PORT) .get("/yo") .then() .statusCode(200) @@ -161,16 +183,19 @@ public void testGlobalSession() throws Exception { System.out.println("------------------ POST REQUEST NO BODY ---------------------"); given() .when() - .post("/hey") + .port(PROXY_PORT) + .post("/nobody") .then() .statusCode(200) .contentType(equalTo("text/plain")) .body(equalTo("local")); } finally { + DevSpaceProxyRecorder.closeSession(); } System.out.println("-------------------- After Shutdown GET REQUEST --------------------"); given() .when() + .port(PROXY_PORT) .get("/yo") .then() .statusCode(200) diff --git a/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java index 6a7f8e9a5b750..126d7eb515f18 100644 --- a/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java +++ b/extensions/vertx-http/devspace-proxy-core/src/main/java/io/quarkus/devspace/server/DevProxyServer.java @@ -1,5 +1,6 @@ package io.quarkus.devspace.server; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -13,6 +14,7 @@ import org.jboss.logging.Logger; import io.netty.handler.codec.http.HttpHeaderNames; +import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.Promise; @@ -25,8 +27,14 @@ import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; import io.vertx.core.streams.Pipe; +import io.vertx.ext.auth.User; +import io.vertx.ext.web.FileUpload; +import io.vertx.ext.web.ParsedHeaderValues; +import io.vertx.ext.web.RequestBody; +import io.vertx.ext.web.Route; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.Session; import io.vertx.ext.web.handler.BodyHandler; import io.vertx.httpproxy.Body; import io.vertx.httpproxy.HttpProxy; @@ -83,6 +91,11 @@ synchronized void shutdown() { queue.drainTo(requests); requests.stream().forEach((ctx) -> proxy.proxy.handle(ctx.request())); } + try { + queue.put(END_SENTINEL); + } catch (InterruptedException e) { + // ignore + } } volatile long lastPoll; @@ -131,7 +144,6 @@ void shutdown() { } public static final String CLIENT_API_PATH = "/_dev_proxy_client_"; - public static final String SERVICES_API_PATH = DevProxyServer.PROXY_API_PATH + "/services"; public static final String GLOBAL_PROXY_SESSION = "_depot_global"; public static final String SESSION_HEADER = "X-Depot-Proxy-Session"; public static final String HEADER_FORWARD_PREFIX = "X-Depot-Fwd-"; @@ -234,7 +246,7 @@ private static void sendBody(HttpServerRequest source, HttpServerResponse destin } public void proxy(RoutingContext ctx) { - log.debug("*** entered proxy ***"); + log.infov("*** entered proxy {0} {1}", ctx.request().method().toString(), ctx.request().uri()); // Get session id from header or cookie String sessionId = ctx.request().getHeader(SESSION_HEADER); if (sessionId == null) { @@ -245,12 +257,13 @@ public void proxy(RoutingContext ctx) { sessionId = GLOBAL_PROXY_SESSION; } } - log.debugv("Looking for session {0}", sessionId); + log.infov("Looking for session {0}", sessionId); ProxySession session = service.sessions.get(sessionId); if (session != null && session.running) { try { - log.debugv("Enqueued request for service {0} of proxy session {1}", service.config.getName(), sessionId); + log.infov("Enqueued request for service {0} of proxy session {1}", service.config.getName(), sessionId); + ctx.request().pause(); session.queue.put(ctx); } catch (InterruptedException e) { DevProxyServer.error(ctx, 500, "Could not enqueue proxied request"); @@ -308,9 +321,12 @@ public void deleteClientConnection(RoutingContext ctx) { } ProxySession session = service.sessions.get(sessionId); if (session != null) { + log.infov("Shutdown session {0}", sessionId); session.shutdown(); + ctx.response().setStatusCode(204).end(); + } else { + ctx.response().setStatusCode(404).end(); } - ctx.response().setStatusCode(204).end(); } public void pushResponse(RoutingContext ctx) { @@ -352,10 +368,10 @@ public void pushResponse(RoutingContext ctx) { }); sendBody(pushedResponse, proxiedResponse); if (keepAlive) { - log.debugv("Keep alive {0} {1}", service.config.getName(), sessionId); + log.infov("Keep alive {0} {1}", service.config.getName(), sessionId); executePoll(ctx, session, sessionId); } else { - log.debugv("End polling {0} {1}", service.config.getName(), sessionId); + log.infov("End polling {0} {1}", service.config.getName(), sessionId); session.pollEnded(); ctx.response().setStatusCode(204).end(); } @@ -384,7 +400,7 @@ public void deletePushResponse(RoutingContext ctx) { public void pollNext(RoutingContext ctx) { String sessionId = ctx.pathParam("session"); - log.debugv("pollNext {0} {1}", service.config.getName(), sessionId); + log.infov("pollNext {0} {1}", service.config.getName(), sessionId); ProxySession session = service.sessions.get(sessionId); if (session == null) { @@ -409,27 +425,37 @@ public void handle(Promise event) { ctx.request().connection().exceptionHandler((v) -> closed.set(true)); RoutingContext proxiedCtx = null; try { - log.debugv("Polling {0} {1}", service.config.getName(), sessionId); + log.infov("Polling {0} {1}", service.config.getName(), sessionId); proxiedCtx = session.queue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS); if (proxiedCtx != null) { - log.debugv("Got request {0} {1}", service.config.getName(), sessionId); + if (proxiedCtx == END_SENTINEL) { + log.info("Polling exiting as session no longer exists"); + ctx.response().setStatusCode(404).end(); + return; + } + log.infov("Got request {0} {1}", service.config.getName(), sessionId); if (closed.get()) { - log.debug("Polled message but connection was closed, returning to queue"); + log.info("Polled message but connection was closed, returning to queue"); session.queue.put(proxiedCtx); session.pollDisconnect(); return; } } else if (closed.get()) { - log.debug("Polled message timeout, client closed"); + log.info("Client closed"); return; } else { - log.debug("Polled message timeout, sending 408"); - ctx.fail(408); + log.info("Polled message timeout, sending 408"); + ctx.response().setStatusCode(408).end(); return; } } catch (InterruptedException e) { - log.error("poll interrupted"); - ctx.fail(500); + log.error("executePoll interrupted"); + ctx.response().setStatusCode(500).end(); + return; + } catch (Throwable t) { + log.error("executePoll failed", t); + ctx.response().setStatusCode(500).end(); + return; } session.pollProcessing(); pollResponse.setStatusCode(200); @@ -452,4 +478,246 @@ public void handle(Promise event) { } }, false, null); } + + static final RoutingContext END_SENTINEL = new RoutingContext() { + @Override + public HttpServerRequest request() { + return null; + } + + @Override + public HttpServerResponse response() { + return null; + } + + @Override + public void next() { + + } + + @Override + public void fail(int statusCode) { + + } + + @Override + public void fail(Throwable throwable) { + + } + + @Override + public void fail(int statusCode, Throwable throwable) { + + } + + @Override + public RoutingContext put(String key, Object obj) { + return null; + } + + @Override + public T get(String key) { + return null; + } + + @Override + public T get(String key, T defaultValue) { + return null; + } + + @Override + public T remove(String key) { + return null; + } + + @Override + public Map data() { + return Map.of(); + } + + @Override + public Vertx vertx() { + return null; + } + + @Override + public String mountPoint() { + return ""; + } + + @Override + public Route currentRoute() { + return null; + } + + @Override + public String normalizedPath() { + return ""; + } + + @Override + public Cookie getCookie(String name) { + return null; + } + + @Override + public RoutingContext addCookie(Cookie cookie) { + return null; + } + + @Override + public Cookie removeCookie(String name, boolean invalidate) { + return null; + } + + @Override + public int cookieCount() { + return 0; + } + + @Override + public Map cookieMap() { + return Map.of(); + } + + @Override + public RequestBody body() { + return null; + } + + @Override + public List fileUploads() { + return List.of(); + } + + @Override + public void cancelAndCleanupFileUploads() { + + } + + @Override + public Session session() { + return null; + } + + @Override + public boolean isSessionAccessed() { + return false; + } + + @Override + public User user() { + return null; + } + + @Override + public Throwable failure() { + return null; + } + + @Override + public int statusCode() { + return 0; + } + + @Override + public String getAcceptableContentType() { + return ""; + } + + @Override + public ParsedHeaderValues parsedHeaders() { + return null; + } + + @Override + public int addHeadersEndHandler(Handler handler) { + return 0; + } + + @Override + public boolean removeHeadersEndHandler(int handlerID) { + return false; + } + + @Override + public int addBodyEndHandler(Handler handler) { + return 0; + } + + @Override + public boolean removeBodyEndHandler(int handlerID) { + return false; + } + + @Override + public int addEndHandler(Handler> handler) { + return 0; + } + + @Override + public boolean removeEndHandler(int handlerID) { + return false; + } + + @Override + public boolean failed() { + return false; + } + + @Override + public void setBody(Buffer body) { + + } + + @Override + public void setSession(Session session) { + + } + + @Override + public void setUser(User user) { + + } + + @Override + public void clearUser() { + + } + + @Override + public void setAcceptableContentType(String contentType) { + + } + + @Override + public void reroute(HttpMethod method, String path) { + + } + + @Override + public Map pathParams() { + return Map.of(); + } + + @Override + public String pathParam(String name) { + return ""; + } + + @Override + public MultiMap queryParams() { + return null; + } + + @Override + public MultiMap queryParams(Charset encoding) { + return null; + } + + @Override + public List queryParam(String name) { + return List.of(); + } + }; } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevSpaceProxyRecorder.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevSpaceProxyRecorder.java index 97856716926ea..a8b42f7e35aca 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevSpaceProxyRecorder.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/DevSpaceProxyRecorder.java @@ -14,13 +14,20 @@ public class DevSpaceProxyRecorder { private static final Logger log = Logger.getLogger(DevSpaceProxyRecorder.class); - public static VirtualDevpaceProxyClient client; - public static DevspaceConfig config; + static VirtualDevpaceProxyClient client; + static DevspaceConfig config; + static Vertx vertx; public void init(Supplier vertx, DevspaceConfig c) { - log.info("Initializing devspace"); - URI uri = null; config = c; + DevSpaceProxyRecorder.vertx = vertx.get(); + if (!config.delayConnect) { + startSession(); + } + } + + public static void startSession() { + URI uri = null; try { uri = new URI(config.uri.get()); } catch (URISyntaxException e) { @@ -36,21 +43,18 @@ public void init(Supplier vertx, DevspaceConfig c) { } options.setDefaultHost(host); options.setDefaultPort(port); - client.proxyClient = vertx.get().createHttpClient(options); - client.vertx = vertx.get(); - client.whoami = c.whoami.get(); - if (!config.delayConnect) { - startSession(); - } else { - log.info(" --- delay connect ---"); - } - } - - public static void startSession() { + client.proxyClient = vertx.createHttpClient(options); + client.vertx = vertx; + client.whoami = config.whoami.get(); if (config.session.isPresent()) { client.startSession(config.session.get()); } else { client.startGlobalSession(); } } + + public static void closeSession() { + if (client != null) + client.shutdown(); + } } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VirtualDevpaceProxyClient.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VirtualDevpaceProxyClient.java index 23a1336a441ac..a676549e36078 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VirtualDevpaceProxyClient.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/VirtualDevpaceProxyClient.java @@ -8,7 +8,14 @@ import org.jboss.logging.Logger; import io.netty.channel.FileRegion; -import io.netty.handler.codec.http.*; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.quarkus.devspace.ProxyUtils; import io.quarkus.devspace.server.DevProxyServer; import io.quarkus.netty.runtime.virtual.VirtualClientConnection; import io.quarkus.netty.runtime.virtual.VirtualResponseHandler; @@ -21,7 +28,9 @@ import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpClosedException; import io.vertx.core.http.HttpMethod; +import io.vertx.core.impl.NoStackTraceTimeoutException; import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.WriteStream; import io.vertx.core.streams.impl.InboundBuffer; @@ -45,18 +54,22 @@ public boolean startGlobalSession() { } public boolean startSession(String sessionId) { + log.info("Start devspace session: " + sessionId); String uri = DevProxyServer.CLIENT_API_PATH + "/connect?who=" + whoami + "&session=" + sessionId; CountDownLatch latch = new CountDownLatch(1); AtomicBoolean success = new AtomicBoolean(); proxyClient.request(HttpMethod.POST, uri, event -> { + log.info("******* Connect request start"); if (event.failed()) { log.error("Could not connect to startSession", event.cause()); latch.countDown(); return; } HttpClientRequest request = event.result(); + log.info("******* Sending Connect request"); request.send().onComplete(event1 -> { + log.info("******* Connect request onComplete"); if (event1.failed()) { log.error("Could not connect to startSession", event1.cause()); latch.countDown(); @@ -70,6 +83,7 @@ public boolean startSession(String sessionId) { }); return; } + log.info("******* Connect request succeeded"); try { this.pollLink = response.getHeader(DevProxyServer.POLL_LINK); for (int i = 0; i < numPollers; i++) @@ -87,6 +101,7 @@ public boolean startSession(String sessionId) { throw new RuntimeException(e); } if (!success.get()) { + log.error("Failed to connect to proxy server"); forcedShutdown(); return false; } @@ -101,7 +116,11 @@ public void forcedShutdown() { } protected void pollFailure(Throwable failure) { - log.error("Poll failed", failure); + if (failure instanceof HttpClosedException) { + log.warn("Client poll stopped. Connection closed by server"); + } else { + log.error("Poll failed", failure); + } workerOffline(); } @@ -111,10 +130,14 @@ protected void pollFailure(String error) { } private void workerOffline() { - workerShutdown.countDown(); + try { + workerShutdown.countDown(); + } catch (Exception ignore) { + } } protected void poll() { + log.info("**** poll() start"); if (!running) { workerOffline(); return; @@ -135,13 +158,20 @@ private class NettyResponseHandler implements VirtualResponseHandler, ReadStream final String responsePath; InboundBuffer queue; - Buffer end = Buffer.buffer(); + static Buffer end = Buffer.buffer(); Handler endHandler; + VirtualClientConnection connection; + + public void setConnection(VirtualClientConnection connection) { + this.connection = connection; + } + + private void write(Buffer buf) { + vertx.runOnContext((v) -> queue.write(buf)); + } public NettyResponseHandler(String responsePath, Vertx vertx) { this.responsePath = responsePath; - queue = new InboundBuffer<>(vertx.getOrCreateContext()); - queue.pause(); } @Override @@ -152,12 +182,22 @@ public ReadStream exceptionHandler(@Nullable Handler handler) @Override public ReadStream handler(@Nullable Handler handler) { + if (handler == null) { + if (queue != null) + queue.handler(null); + return this; + } + log.info("NettyResponseHandler: set handler"); queue.handler((buf) -> { + log.info("NettyResponseHandler: handler"); if (buf == end) { + log.info("NettyResponseHandler: calling end"); + connection.close(); if (endHandler != null) { endHandler.handle(null); } } else { + log.info("NettyResponseHandler: handler.handle(buf)"); handler.handle(buf); } }); @@ -166,18 +206,22 @@ public ReadStream handler(@Nullable Handler handler) { @Override public ReadStream pause() { + log.info("NettyResponseHandler: pause"); queue.pause(); return this; } @Override public ReadStream resume() { - queue.resume(); + log.info("NettyResponseHandler: resume"); + boolean result = queue.resume(); + log.info("NettyResponseHandler: resume returned: " + result); return this; } @Override public ReadStream fetch(long amount) { + log.info("NettyResponseHandler: fetch"); queue.fetch(amount); return this; } @@ -190,7 +234,10 @@ public ReadStream endHandler(@Nullable Handler endHandler) { @Override public void handleMessage(Object msg) { + log.infov("NettyResponseHandler: handleMessage({0})", msg.getClass().getName()); if (msg instanceof HttpResponse) { + queue = new InboundBuffer<>(vertx.getOrCreateContext()); + queue.pause(); HttpResponse res = (HttpResponse) msg; proxyClient.request(HttpMethod.POST, responsePath + "?keepAlive=" + running) .onFailure(exc -> { @@ -198,6 +245,7 @@ public void handleMessage(Object msg) { workerOffline(); }) .onSuccess(pushRequest -> { + log.info("NettyResponseHandler connect accepted for pushResponse"); pushRequest.setTimeout(pollTimeoutMillis); pushRequest.putHeader(DevProxyServer.STATUS_CODE_HEADER, Integer.toString(res.status().code())); @@ -218,21 +266,27 @@ public void handleMessage(Object msg) { } pushRequest.send(this) .onFailure(exc -> { - log.error("Failed to push service response", exc); - workerOffline(); + if (exc instanceof NoStackTraceTimeoutException) { + poll(); + } else { + log.error("Failed to push service response", exc); + workerOffline(); + } }) .onSuccess(VirtualDevpaceProxyClient.this::handlePoll); // a successful push restarts poll }); } if (msg instanceof HttpContent) { - queue.write(BufferImpl.buffer(((HttpContent) msg).content())); + log.info("NettyResponseHandler: write HttpContent"); + write(BufferImpl.buffer(((HttpContent) msg).content())); } if (msg instanceof FileRegion) { log.error("FileRegion not supported yet"); throw new RuntimeException("FileRegion not supported yet"); } if (msg instanceof LastHttpContent) { - queue.write(end); + log.info("NettyResponseHandler: write LastHttpContent"); + write(end); } } @@ -250,6 +304,7 @@ public NettyWriteStream(VirtualClientConnection connection) { } private void writeHttpContent(Buffer data) { + log.info("NettyWriteStream: writeHttpContent"); // todo getByteBuf copies the underlying byteBuf DefaultHttpContent content = new DefaultHttpContent(data.getByteBuf()); connection.sendMessage(content); @@ -277,6 +332,7 @@ public void write(Buffer data, Handler> handler) { @Override public void end(Handler> handler) { + log.info("NettyWriteStream: end"); connection.sendMessage(LastHttpContent.EMPTY_LAST_CONTENT); handler.handle(Future.succeededFuture()); } @@ -299,13 +355,19 @@ public WriteStream drainHandler(@Nullable Handler handler) { protected void handlePoll(HttpClientResponse pollResponse) { pollResponse.pause(); - log.debug("------ handlePoll"); + log.info("------ handlePoll"); int proxyStatus = pollResponse.statusCode(); if (proxyStatus == 408) { + log.info("Poll timeout, redo poll"); poll(); return; } else if (proxyStatus == 204) { // keepAlive = false sent back + log.info("Keepalive = false. Stop Polling"); + workerOffline(); + return; + } else if (proxyStatus == 404) { + log.info("session was closed, exiting poll"); workerOffline(); return; } else if (proxyStatus != 200) { @@ -314,13 +376,14 @@ protected void handlePoll(HttpClientResponse pollResponse) { }); return; } - + log.info("Unpack poll request"); String method = pollResponse.getHeader(DevProxyServer.METHOD_HEADER); String uri = pollResponse.getHeader(DevProxyServer.URI_HEADER); String responsePath = pollResponse.getHeader(DevProxyServer.RESPONSE_LINK); NettyResponseHandler handler = new NettyResponseHandler(responsePath, vertx); VirtualClientConnection connection = VirtualClientConnection.connect(handler, VertxHttpRecorder.VIRTUAL_HTTP, null); + handler.setConnection(connection); QuarkusHttpHeaders quarkusHeaders = new QuarkusHttpHeaders(); // add context specific things @@ -343,6 +406,7 @@ protected void handlePoll(HttpClientResponse pollResponse) { nettyRequest.headers().add(HttpHeaderNames.HOST, "localhost"); } + log.info("send initial nettyRequest"); connection.sendMessage(nettyRequest); pollResponse.pipeTo(new NettyWriteStream(connection)); } @@ -363,35 +427,33 @@ public void shutdown() { } try { running = false; - try { - // give time for workers to finish - workerShutdown.await(5, TimeUnit.SECONDS); - } catch (Throwable ignored) { - - } // delete session CountDownLatch latch = new CountDownLatch(1); if (sessionId != null) { String uri = DevProxyServer.CLIENT_API_PATH + "/connect?session=" + sessionId; proxyClient.request(HttpMethod.DELETE, uri) .onFailure(event -> { - log.error("Failed to delete sesssion on shutdown", event); + log.error("Failed to delete session on shutdown", event); latch.countDown(); }) .onSuccess(request -> request.send() .onComplete(event -> { if (event.failed()) { - log.error("Failed to delete sesssion on shutdown", event.cause()); + log.error("Failed to delete session on shutdown", event.cause()); } latch.countDown(); })); } + try { - latch.await(5, TimeUnit.SECONDS); + latch.await(1000, TimeUnit.MILLISECONDS); + workerShutdown.await(pollTimeoutMillis * 2, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { } - proxyClient.close(); + log.infov("Workers still running after shutdown {0}", workerShutdown.getCount()); + ProxyUtils.await(1000, proxyClient.close()); + } finally { shutdown = true; }