diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java index b9b56fbcab1..081bd4b15aa 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java @@ -17,6 +17,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http2.*; import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.TimeoutException; import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.*; @@ -661,7 +662,14 @@ public void doSetWriteQueueMaxSize(int size) { @Override public void reset(Throwable cause) { - long code = cause instanceof StreamResetException ? ((StreamResetException)cause).getCode() : 0; + long code; + if (cause instanceof StreamResetException) { + code = ((StreamResetException)cause).getCode(); + } else if (cause instanceof java.util.concurrent.TimeoutException) { + code = 0x08L; // CANCEL + } else { + code = 0L; + } conn.context.emit(code, this::writeReset); } diff --git a/src/test/java/io/vertx/core/http/Http1xClientTimeoutTest.java b/src/test/java/io/vertx/core/http/Http1xClientTimeoutTest.java new file mode 100644 index 00000000000..37e6f337d29 --- /dev/null +++ b/src/test/java/io/vertx/core/http/Http1xClientTimeoutTest.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http; + +public class Http1xClientTimeoutTest extends HttpClientTimeoutTest { +} diff --git a/src/test/java/io/vertx/core/http/Http2ClientTimeoutTest.java b/src/test/java/io/vertx/core/http/Http2ClientTimeoutTest.java new file mode 100644 index 00000000000..fbf1d0e7b82 --- /dev/null +++ b/src/test/java/io/vertx/core/http/Http2ClientTimeoutTest.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2011-2023 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http; + +public class Http2ClientTimeoutTest extends HttpClientTimeoutTest { + + @Override + protected HttpServerOptions createBaseServerOptions() { + return Http2ServerTest.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTPS_PORT, HttpTestBase.DEFAULT_HTTPS_HOST); + } + + @Override + protected HttpClientOptions createBaseClientOptions() { + return Http2ServerTest.createHttp2ClientOptions(); + } +} diff --git a/src/test/java/io/vertx/core/http/HttpClientTimeoutTest.java b/src/test/java/io/vertx/core/http/HttpClientTimeoutTest.java index 2777eb784fa..d9434089339 100644 --- a/src/test/java/io/vertx/core/http/HttpClientTimeoutTest.java +++ b/src/test/java/io/vertx/core/http/HttpClientTimeoutTest.java @@ -12,6 +12,7 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.DeploymentOptions; +import io.vertx.core.ThreadingModel; import io.vertx.core.buffer.Buffer; import io.vertx.core.impl.Utils; import io.vertx.core.net.NetServer; @@ -22,52 +23,13 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -public class HttpClientTimeoutTest extends HttpTestBase { - - @Test - public void testEndToEndRequestTimeout() throws Exception { - int timeout = 3000; - int ratio = 60; - int delay = timeout * ratio / 100; - server.requestHandler(req -> { - switch (req.uri()) { - case "/slow": - vertx.setTimer(delay, id -> { - req.response().end(); - }); - break; - default: - req.response().end(); - break; - } - }); - startServer(testAddress); - List requests = new ArrayList<>(); - for (int i = 0;i < 5;i++) { - HttpClientRequest request = client.request(new RequestOptions(requestOptions)).toCompletionStage().toCompletableFuture().get(); - requests.add(request); - } - vertx.setTimer(delay, id -> { - requests.forEach(req -> { - req.send().compose(HttpClientResponse::body); - }); - }); - long now = System.currentTimeMillis(); - client.request(new RequestOptions(requestOptions).setTimeout(timeout).setURI("/slow")) - .onComplete(onSuccess(req -> { - req.send().compose(HttpClientResponse::body).onComplete(onSuccess(body -> { - long elapsed = System.currentTimeMillis() - now; - assertTrue(elapsed >= delay * 2); - testComplete(); - })); - })); - await(); - } +public abstract class HttpClientTimeoutTest extends HttpTestBase { @Test public void testConnectTimeoutDoesFire() throws Exception { @@ -75,6 +37,11 @@ public void testConnectTimeoutDoesFire() throws Exception { server.requestHandler(req -> { req.response().end(); }); + server.connectionHandler(conn -> { + if (conn.getWindowSize() != -1) { + conn.updateSettings(new Http2Settings().setMaxConcurrentStreams(5)); + } + }); startServer(testAddress); List requests = new ArrayList<>(); for (int i = 0;i < 5;i++) { @@ -97,6 +64,11 @@ public void testConnectTimeoutDoesNotFire() throws Exception { server.requestHandler(req -> { req.response().end(); }); + server.connectionHandler(conn -> { + if (conn.getWindowSize() != -1) { + conn.updateSettings(new Http2Settings().setMaxConcurrentStreams(5)); + } + }); startServer(testAddress); List requests = new ArrayList<>(); for (int i = 0;i < 5;i++) { @@ -122,15 +94,16 @@ public void testConnectTimeoutDoesNotFire() throws Exception { @Test public void testTimedOutWaiterDoesNotConnect() throws Exception { Assume.assumeTrue("Domain socket don't pass this test", testAddress.isInetSocket()); + Assume.assumeTrue("HTTP/2 don't pass this test", createBaseClientOptions().getProtocolVersion() == HttpVersion.HTTP_1_1); long responseDelay = 300; int requests = 6; - client.close(); CountDownLatch firstCloseLatch = new CountDownLatch(1); - server.close(onSuccess(v -> firstCloseLatch.countDown())); + server.close().onComplete(onSuccess(v -> firstCloseLatch.countDown())); // Make sure server is closed before continuing awaitLatch(firstCloseLatch); - client = vertx.createHttpClient(createBaseClientOptions().setKeepAlive(false).setMaxPoolSize(1)); + client.close(); + client = vertx.createHttpClient(createBaseClientOptions().setKeepAlive(false), new PoolOptions().setHttp1MaxSize(1)); AtomicInteger requestCount = new AtomicInteger(0); // We need a net server because we need to intercept the socket connection, not just full http requests NetServer server = vertx.createNetServer(); @@ -154,29 +127,34 @@ public void testTimedOutWaiterDoesNotConnect() throws Exception { CountDownLatch latch = new CountDownLatch(requests); - server.listen(testAddress, onSuccess(s -> { - for(int count = 0; count < requests; count++) { + server.listen(testAddress) + .toCompletionStage() + .toCompletableFuture() + .get(20, TimeUnit.SECONDS); - if (count % 2 == 0) { - client.request(requestOptions) - .compose(HttpClientRequest::send) - .compose(HttpClientResponse::body) - .onComplete(onSuccess(buff -> { - assertEquals("OK", buff.toString()); - latch.countDown(); - })); - } else { - // Odd requests get a timeout less than the responseDelay, since we have a pool size of one and a delay all but - // the first request should end up in the wait queue, the odd numbered requests should time out so we should get - // (requests + 1 / 2) connect attempts - client - .request(new RequestOptions(requestOptions).setConnectTimeout(responseDelay / 2)) - .onComplete(onFailure(err -> { - latch.countDown(); - })); - } + for(int count = 0; count < requests; count++) { + + if (count % 2 == 0) { + client.request(requestOptions) + .compose(req -> req + .send() + .andThen(onSuccess(resp -> assertEquals(200, resp.statusCode()))) + .compose(HttpClientResponse::body)) + .onComplete(onSuccess(buff -> { + assertEquals("OK", buff.toString()); + latch.countDown(); + })); + } else { + // Odd requests get a timeout less than the responseDelay, since we have a pool size of one and a delay all but + // the first request should end up in the wait queue, the odd numbered requests should time out so we should get + // (requests + 1 / 2) connect attempts + client + .request(new RequestOptions(requestOptions).setConnectTimeout(responseDelay / 2)) + .onComplete(onFailure(err -> { + latch.countDown(); + })); } - })); + } awaitLatch(latch); @@ -185,7 +163,7 @@ public void testTimedOutWaiterDoesNotConnect() throws Exception { } @Test - public void testRequestIdleTimeoutIsNotDelayedAfterResponseIsReceived() throws Exception { + public void testRequestTimeoutIsNotDelayedAfterResponseIsReceived() throws Exception { int n = 6; waitFor(n); server.requestHandler(req -> { @@ -195,12 +173,13 @@ public void testRequestIdleTimeoutIsNotDelayedAfterResponseIsReceived() throws E vertx.deployVerticle(new AbstractVerticle() { @Override public void start() throws Exception { - HttpClient client = vertx.createHttpClient(new HttpClientOptions().setMaxPoolSize(n)); + client.close(); + client = vertx.createHttpClient(createBaseClientOptions(), new PoolOptions().setHttp1MaxSize(1)); for (int i = 0;i < n;i++) { AtomicBoolean responseReceived = new AtomicBoolean(); client.request(requestOptions).onComplete(onSuccess(req -> { req.idleTimeout(500); - req.send(onSuccess(resp -> { + req.send().onComplete(onSuccess(resp -> { try { Thread.sleep(150); } catch (InterruptedException e) { @@ -213,31 +192,30 @@ public void start() throws Exception { })); } } - }, new DeploymentOptions().setWorker(true)); + }, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER)); await(); } @Test - public void testRequestIdleTimeoutCanceledWhenRequestEndsNormally() { + public void testRequestTimeoutCanceledWhenRequestEndsNormally() throws Exception { server.requestHandler(req -> req.response().end()); - server.listen(testAddress, onSuccess(s -> { - AtomicReference exception = new AtomicReference<>(); - client.request(requestOptions).onComplete(onSuccess(req -> { - req - .exceptionHandler(exception::set) - .idleTimeout(500) - .end(); - vertx.setTimer(1000, id -> { - assertNull("Did not expect any exception", exception.get()); - testComplete(); - }); - })); + startServer(testAddress); + AtomicReference exception = new AtomicReference<>(); + client.request(requestOptions).onComplete(onSuccess(req -> { + req + .exceptionHandler(exception::set) + .idleTimeout(500) + .end(); + vertx.setTimer(1000, id -> { + assertNull("Did not expect any exception", exception.get()); + testComplete(); + }); })); await(); } @Test - public void testRequestIdleTimeoutCanceledWhenRequestHasAnotherError() { + public void testRequestTimeoutCanceledWhenRequestHasAnOtherError() { Assume.assumeFalse(Utils.isWindows()); AtomicReference exception = new AtomicReference<>(); // There is no server running, should fail to connect @@ -248,26 +226,31 @@ public void testRequestIdleTimeoutCanceledWhenRequestHasAnotherError() { assertFalse("Expected to not end with timeout exception, but did: " + exception.get(), exception.get() instanceof TimeoutException); testComplete(); }); + await(); } @Test - public void testHttpClientRequestIdleTimeoutResetsTheConnection() throws Exception { + public void testHttpClientRequestTimeoutResetsTheConnection() throws Exception { waitFor(3); server.requestHandler(req -> { AtomicBoolean errored = new AtomicBoolean(); req.exceptionHandler(err -> { if (errored.compareAndSet(false, true)) { + if (req.version() == HttpVersion.HTTP_2) { + StreamResetException reset = (StreamResetException) err; + assertEquals(8, reset.getCode()); + } complete(); } }); }); startServer(testAddress); client.request(requestOptions).onComplete(onSuccess(req -> { - req.response(onFailure(err -> { + req.response().onComplete(onFailure(err -> { complete(); })); - req.setChunked(true).sendHead(onSuccess(version -> req.idleTimeout(500))); + req.setChunked(true).sendHead().onComplete(onSuccess(version -> req.idleTimeout(500))); AtomicBoolean errored = new AtomicBoolean(); req.exceptionHandler(err -> { if (errored.compareAndSet(false, true)) { @@ -279,70 +262,68 @@ public void testHttpClientRequestIdleTimeoutResetsTheConnection() throws Excepti } @Test - public void testResponseDataIdleTimeout() { + public void testResponseDataTimeout() throws Exception { waitFor(2); Buffer expected = TestUtils.randomBuffer(1000); server.requestHandler(req -> { req.response().setChunked(true).write(expected); }); - server.listen(testAddress, onSuccess(s -> { - Buffer received = Buffer.buffer(); - client.request(requestOptions).onComplete(onSuccess(req -> { - req.response(onSuccess(resp -> { - AtomicInteger count = new AtomicInteger(); - resp.exceptionHandler(t -> { - if (count.getAndIncrement() == 0) { - assertTrue(t instanceof TimeoutException); - assertEquals(expected, received); - complete(); - } - }); - resp.request().idleTimeout(500); - resp.handler(buff -> { - received.appendBuffer(buff); - // Force the internal timer to be rescheduled with the remaining amount of time - // e.g around 100 ms - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); - })); + startServer(testAddress); + Buffer received = Buffer.buffer(); + client.request(requestOptions).onComplete(onSuccess(req -> { + req.response().onComplete(onSuccess(resp -> { AtomicInteger count = new AtomicInteger(); - req.exceptionHandler(t -> { + resp.exceptionHandler(t -> { if (count.getAndIncrement() == 0) { assertTrue(t instanceof TimeoutException); assertEquals(expected, received); complete(); } }); - req.sendHead(); + resp.request().idleTimeout(500); + resp.handler(buff -> { + received.appendBuffer(buff); + // Force the internal timer to be rescheduled with the remaining amount of time + // e.g around 100 ms + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); })); + AtomicInteger count = new AtomicInteger(); + req.exceptionHandler(t -> { + if (count.getAndIncrement() == 0) { + assertTrue(t instanceof TimeoutException); + assertEquals(expected, received); + complete(); + } + }); + req.sendHead(); })); await(); } @Test - public void testRequestTimesOutWhenIndicatedPeriodExpiresWithoutAResponseFromRemoteServer() { + public void testRequestTimesOutWhenIndicatedPeriodExpiresWithoutAResponseFromRemoteServer() throws Exception { server.requestHandler(noOpHandler()); // No response handler so timeout triggers AtomicBoolean failed = new AtomicBoolean(); - server.listen(testAddress, onSuccess(s -> { - client.request(new RequestOptions(requestOptions).setIdleTimeout(1000)) - .compose(HttpClientRequest::send).onComplete(onFailure(t -> { - // Catch the first, the second is going to be a connection closed exception when the - // server is shutdown on testComplete - if (failed.compareAndSet(false, true)) { - testComplete(); - } - })); - })); + startServer(testAddress); + client.request(new RequestOptions(requestOptions).setIdleTimeout(1000)) + .compose(HttpClientRequest::send).onComplete(onFailure(t -> { + // Catch the first, the second is going to be a connection closed exception when the + // server is shutdown on testComplete + if (failed.compareAndSet(false, true)) { + testComplete(); + } + })); + await(); } - // Note : cannot pass for http/2 because flushing is not the same : investigate @Test - public void testRequestTimeoutExtendedWhenResponseChunksReceived() { + public void testRequestTimeoutExtendedWhenResponseChunksReceived() throws Exception { long timeout = 2000; int numChunks = 100; AtomicInteger count = new AtomicInteger(0); @@ -359,21 +340,20 @@ public void testRequestTimeoutExtendedWhenResponseChunksReceived() { }); }); - server.listen(testAddress, onSuccess(s -> { - client.request(new RequestOptions(requestOptions) - .setIdleTimeout(timeout)).onComplete(onSuccess(req -> { - req.send(onSuccess(resp -> { - assertEquals(200, resp.statusCode()); - resp.endHandler(v -> testComplete()); - })); - })); - })); + startServer(testAddress); + + client.request(new RequestOptions(requestOptions).setIdleTimeout(timeout)) + .compose(req -> req + .send() + .andThen(onSuccess(resp -> assertEquals(200, resp.statusCode()))) + .compose(HttpClientResponse::end)) + .onComplete(onSuccess(v -> testComplete())); await(); } @Test - public void testRequestsTimeoutInQueue() { + public void testRequestsTimeoutInQueue() throws Exception { server.requestHandler(req -> { vertx.setTimer(1000, id -> { @@ -385,23 +365,23 @@ public void testRequestsTimeoutInQueue() { }); client.close(); - client = vertx.createHttpClient(new HttpClientOptions().setKeepAlive(false).setMaxPoolSize(1)); - - server.listen(testAddress, onSuccess(s -> { - // Add a few requests that should all timeout - for (int i = 0; i < 5; i++) { - client.request(new RequestOptions(requestOptions).setIdleTimeout(500)) - .compose(HttpClientRequest::send) - .onComplete(onFailure(t -> assertTrue(t instanceof TimeoutException))); - } - // Now another request that should not timeout - client.request(new RequestOptions(requestOptions).setIdleTimeout(3000)) + client = vertx.createHttpClient(createBaseClientOptions().setKeepAlive(false), new PoolOptions().setHttp1MaxSize(1)); + + startServer(testAddress); + + // Add a few requests that should all timeout + for (int i = 0; i < 5; i++) { + client.request(new RequestOptions(requestOptions).setIdleTimeout(500)) .compose(HttpClientRequest::send) - .onComplete(onSuccess(resp -> { - assertEquals(200, resp.statusCode()); - testComplete(); - })); - })); + .onComplete(onFailure(t -> assertTrue(t instanceof TimeoutException))); + } + // Now another request that should not timeout + client.request(new RequestOptions(requestOptions).setIdleTimeout(3000)) + .compose(HttpClientRequest::send) + .onComplete(onSuccess(resp -> { + assertEquals(200, resp.statusCode()); + testComplete(); + })); await(); }