From 2a1dd6b7d7b8d0f29fbb33f08ff063c90de27434 Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Thu, 3 Feb 2022 18:41:25 +0100 Subject: [PATCH] Fix wrong connection close (#3830) * Fix wrong connection close * Reduced logging overhead. Small improvements of tests. * Consume request entity on valid responses. Signed-off-by: Tomas Langer --- .../WebClientRequestBuilderImpl.java | 30 ++-- .../helidon/webserver/BareResponseImpl.java | 77 +++++----- .../helidon/webserver/ForwardingHandler.java | 49 +++--- .../io/helidon/webserver/RequestContext.java | 23 ++- .../helidon/webserver/HttpPipelineTest.java | 15 +- .../io/helidon/webserver/KeepAliveTest.java | 142 ++++++++++++++++++ .../java/io/helidon/webserver/PlainTest.java | 18 +-- .../webserver/ReqEntityAnalyzedTest.java | 28 ++-- .../webserver/utils/SocketHttpClient.java | 3 +- 9 files changed, 291 insertions(+), 94 deletions(-) create mode 100644 webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java diff --git a/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java b/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java index eef37498274..8c3ebd0cee5 100644 --- a/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java +++ b/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.logging.Level; import java.util.logging.Logger; import io.helidon.common.GenericType; @@ -204,15 +205,19 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat for (ChannelRecord channelRecord : channels) { Channel channel = channelRecord.channel; if (channel.isOpen() && channel.attr(IN_USE).get().compareAndSet(false, true)) { - LOGGER.finest(() -> "Reusing -> " + channel.hashCode()); - LOGGER.finest(() -> "Setting in use -> true"); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "Reusing -> " + channel.hashCode() + ", settting in use -> true"); + } return channelRecord.channelFuture; } - LOGGER.finest(() -> "Not accepted -> " + channel.hashCode()); - LOGGER.finest(() -> "Open -> " + channel.isOpen()); - LOGGER.finest(() -> "In use -> " + channel.attr(IN_USE).get()); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "Not accepted -> " + channel.hashCode() + ", open -> " + + channel.isOpen() + ", in use -> " + channel.attr(IN_USE).get()); + } + } + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "New connection to -> " + connectionIdent); } - LOGGER.finest(() -> "New connection to -> " + connectionIdent); URI uri = connectionIdent.base; ChannelFuture connect = bootstrap.connect(uri.getHost(), uri.getPort()); Channel channel = connect.channel(); @@ -225,9 +230,10 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat } static void removeChannelFromCache(ConnectionIdent key, Channel channel) { - LOGGER.finest(() -> "Removing from channel cache."); - LOGGER.finest(() -> "Connection ident -> " + key); - LOGGER.finest(() -> "Channel -> " + channel.hashCode()); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "Removing from channel cache. Connection ident -> " + key + + ", channel -> " + channel.hashCode()); + } CHANNEL_CACHE.get(key).remove(new ChannelRecord(channel)); } @@ -578,8 +584,10 @@ private Single invoke(Flow.Publisher requestEntity : bootstrap.connect(finalUri.getHost(), finalUri.getPort()); channelFuture.addListener((ChannelFutureListener) future -> { - LOGGER.finest(() -> "(client reqID: " + requestId + ") " - + "Channel hashcode -> " + channelFuture.channel().hashCode()); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "(client reqID: " + requestId + ") " + + "Channel hashcode -> " + channelFuture.channel().hashCode()); + } channelFuture.channel().attr(REQUEST).set(clientRequest); channelFuture.channel().attr(RESPONSE_RECEIVED).set(false); channelFuture.channel().attr(RECEIVED).set(responseReceived); diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java index 5afe9af548c..53218ee21a2 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2021 Oracle and/or its affiliates. + * Copyright (c) 2017, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,6 @@ * The BareResponseImpl. */ class BareResponseImpl implements BareResponse { - private static final Logger LOGGER = Logger.getLogger(BareResponseImpl.class.getName()); // See HttpConversionUtil.ExtensionHeaderNames @@ -76,6 +75,7 @@ class BareResponseImpl implements BareResponse { private final HttpHeaders requestHeaders; private final ChannelFuture channelClosedFuture; private final GenericFutureListener> channelClosedListener; + private final CompletableFuture originalEntityAnalyzed; // Accessed by Subscriber method threads private Flow.Subscription subscription; @@ -103,6 +103,7 @@ class BareResponseImpl implements BareResponse { CompletableFuture requestEntityAnalyzed, long requestId) { this.requestContext = requestContext; + this.originalEntityAnalyzed = requestEntityAnalyzed; this.requestEntityAnalyzed = requestEntityAnalyzed; this.responseFuture = new CompletableFuture<>(); this.headersFuture = new CompletableFuture<>(); @@ -192,26 +193,44 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map log("Request content not fully read with keep-alive: true", channel)); - if (!requestContext.hasRequests() || requestContext.requestCancelled()) { - requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> { - if (listener.equals(ChannelFutureListener.CLOSE)) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); - } else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - } - return listener; - }); - //We are not sure which Connection header value should be set. - //If unhandled entity is only one content large, we can keep the keep-alive - channel.read(); - } else { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); - requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE); - throw new IllegalStateException("Cannot request entity and send response without " - + "waiting for it to be handled"); + + if (!isWebSocketUpgrade) { + if (requestContext.isDataRequested()) { + // there are pending requests, we have emitted some data and request was not explicitly canceled + // this is a bug in code, where entity is requested and not fully processed + // throwing an exception here is a breaking change (also this may be an intermittent problem + // as it may depend on thread race) + HttpRequest request = requestContext.request(); + LOGGER.warning("Entity was requested and not fully consumed before a response is sent. " + + "This is not supported. Connection will be closed. Please fix your route for " + + request.method() + " " + request.uri()); + + // let's close this connection, as it is in an unexpected state + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE); + } else { + // we want to consume the entity and keep alive + // entity must be consumed here, so we do not close connection in forwarding handler + // because of unconsumed payload (the following code will only succeed if there is no subscriber) + requestContext.publisher() + .forEach(DataChunk::release) + .onComplete(() -> { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE_ON_FAILURE); + }) + .onError(t -> { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE); + }) + .ignoreElement(); + } } } else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); @@ -220,8 +239,8 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map log("Writing headers %s", status)); requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> { + LOGGER.fine(() -> log("Writing headers %s", status)); requestContext.runInScope(() -> orderedWrite(this::initWriteResponse)); return listener; }); @@ -377,21 +396,7 @@ public void onSubscribe(Flow.Subscription subscription) { return; } this.subscription = Objects.requireNonNull(subscription, "subscription is null"); - - // TyrusSupport controls order of writes manually - if (isWebSocketUpgrade) { - subscription.request(1); - } else { - // Callback deferring first request for data after: - // - Request stream has been completed - requestEntityAnalyzed.whenComplete((channelFutureListener, throwable) -> { - subscription.request(1); - }); - if (keepAlive) { - //Auxiliary read, does nothing in case of pending read - channel.read(); - } - } + subscription.request(1); } @Override diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java index 84a7eb37ba7..4aea3f65b43 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java @@ -98,7 +98,6 @@ public class ForwardingHandler extends SimpleChannelInboundHandler { private CompletableFuture requestEntityAnalyzed; private CompletableFuture prevRequestFuture; private boolean lastContent; - private boolean hadContentAlready; ForwardingHandler(Routing routing, NettyWebServer webServer, @@ -120,7 +119,6 @@ public class ForwardingHandler extends SimpleChannelInboundHandler { private void reset() { lastContent = false; - hadContentAlready = false; isWebSocketUpgrade = false; actualPayloadSize = 0L; ignorePayload = false; @@ -263,19 +261,6 @@ private void channelReadHttpContent(ChannelHandlerContext ctx, Object msg) { // this is here to handle the case when the content is not readable but we didn't // exceptionally complete the publisher and close the connection throw new IllegalStateException("It is not expected to not have readable content."); - } else if (!requestContext.hasRequests() - && HttpUtil.isKeepAlive(requestContext.request()) - && !requestEntityAnalyzed.isDone()) { - if (hadContentAlready) { - LOGGER.finest(() -> "More than one unhandled content present. Closing the connection."); - requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE); - } else { - //We are checking the unhandled entity, but we cannot be sure if connection should be closed or not. - //Next content has to be checked if it is last chunk. If not close connection. - hadContentAlready = true; - LOGGER.finest(() -> "Requesting the next chunk to determine if the connection should be closed."); - ctx.channel().read(); - } } } @@ -290,7 +275,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @SuppressWarnings("checkstyle:methodlength") private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context requestScope, Object msg) { - hadContentAlready = false; if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine(log("Received HttpRequest: %s. Remote address: %s. Scope id: %s", ctx, @@ -353,7 +337,7 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques } if (publisher.hasRequests()) { - LOGGER.finest(() -> log("Requesting next chunks from Netty", ctx)); + LOGGER.finest(() -> log("Requesting next (%d, %d) chunks from Netty", ctx, n, demand)); ctx.channel().read(); } else { LOGGER.finest(() -> log("No hook action required", ctx)); @@ -366,7 +350,12 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques // If a problem with the request URI, return 400 response BareRequestImpl bareRequest; try { - bareRequest = new BareRequestImpl((HttpRequest) msg, publisher, webServer, ctx, sslEngine, requestId); + bareRequest = new BareRequestImpl(request, + requestContextRef.publisher(), + webServer, + ctx, + sslEngine, + requestId); } catch (IllegalArgumentException e) { send400BadRequest(ctx, request, e); return true; @@ -376,9 +365,19 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques LOGGER.finest(log("Request id: %s", ctx, bareRequest.requestId())); } + String contentLength = request.headers().get(HttpHeaderNames.CONTENT_LENGTH); + + if ("0".equals(contentLength) + || (contentLength == null + && !"upgrade".equalsIgnoreCase(request.headers().get(HttpHeaderNames.CONNECTION)) + && !"chunked".equalsIgnoreCase(request.headers().get(HttpHeaderNames.TRANSFER_ENCODING)) + && !"multipart/byteranges".equalsIgnoreCase(request.headers().get(HttpHeaderNames.CONTENT_TYPE)))) { + // no entity + requestContextRef.complete(); + } + // If context length is greater than maximum allowed, return 413 response if (maxPayloadSize >= 0) { - String contentLength = request.headers().get(Http.Header.CONTENT_LENGTH); if (contentLength != null) { try { long value = Long.parseLong(contentLength); @@ -439,6 +438,10 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques LOGGER.fine(log("Response complete: %s", ctx, System.identityHashCode(msg))); } }); + /* + TODO we should only send continue in case the entity is request (e.g. we found a route and user started reading it) + This would solve connection close for 404 for requests with entity + */ if (HttpUtil.is100ContinueExpected(request)) { send100Continue(ctx, request); } @@ -536,7 +539,8 @@ private void send100Continue(ChannelHandlerContext ctx, ""); FullHttpResponse response = toNettyResponse(transportResponse); - ctx.write(response); + // we should flush this immediately, as we need the client to send entity + ctx.writeAndFlush(response); } /** @@ -555,6 +559,8 @@ private void send400BadRequest(ChannelHandlerContext ctx, HttpRequest request, T t); FullHttpResponse response = toNettyResponse(handlerResponse); + // 400 -> close connection + response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); ctx.writeAndFlush(response) .addListener(future -> ctx.close()); @@ -575,6 +581,8 @@ private void send413PayloadTooLarge(ChannelHandlerContext ctx, HttpRequest reque ""); FullHttpResponse response = toNettyResponse(transportResponse); + // too big entity -> close connection + response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); ctx.writeAndFlush(response) .addListener(future -> ctx.close()); @@ -596,7 +604,6 @@ private FullHttpResponse toNettyResponse(TransportResponse handlerResponse) { HttpHeaders nettyHeaders = response.headers(); headers.forEach(nettyHeaders::add); - nettyHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); return response; } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java b/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java index 77b90679895..246807d7aac 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2021 Oracle and/or its affiliates. + * Copyright (c) 2017, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ import io.helidon.common.context.Context; import io.helidon.common.context.Contexts; +import io.helidon.common.http.DataChunk; +import io.helidon.common.reactive.Multi; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpRequest; @@ -35,6 +37,7 @@ class RequestContext { private final HttpRequest request; private final Context scope; private volatile boolean responseCompleted; + private volatile boolean emitted; RequestContext(HttpRequestScopedPublisher publisher, HttpRequest request, Context scope) { this.publisher = publisher; @@ -42,6 +45,11 @@ class RequestContext { this.scope = scope; } + Multi publisher() { + return Multi.create(publisher) + .peek(something -> emitted = true); + } + HttpRequest request() { return request; } @@ -78,6 +86,19 @@ boolean hasRequests() { return publisher.hasRequests(); } + /** + * Has there been a request for content. + * + * @return {@code true} if data was requested and request was not cancelled + */ + boolean isDataRequested() { + return (hasRequests() || hasEmitted()) || requestCancelled(); + } + + boolean hasEmitted() { + return emitted; + } + /** * Is request content cancelled. * diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/HttpPipelineTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/HttpPipelineTest.java index 15ed63c484d..14eecbb2216 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/HttpPipelineTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/HttpPipelineTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,9 +29,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; /** * Test support for HTTP 1.1 pipelining. @@ -67,7 +67,12 @@ private static void startServer(int port) throws Exception { .put("/", (req, res) -> { counter.set(0); log("put server"); - res.send(); + req.content() + .as(String.class) + .forSingle(it -> { + log("put: " + it); + res.send(); + }); }) .get("/", (req, res) -> { log("get server"); @@ -99,8 +104,8 @@ private static void startServer(int port) throws Exception { public void testPipelining() throws Exception { try (SocketHttpClient s = new SocketHttpClient(webServer)) { s.request(Http.Method.PUT, "/"); // reset server - s.request(Http.Method.GET, "/"); // request_0 - s.request(Http.Method.GET, "/"); // request_1 + s.request(Http.Method.GET, null); // request_0 + s.request(Http.Method.GET, null); // request_1 log("put client"); String reset = s.receive(); assertThat(reset, notNullValue()); diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java new file mode 100644 index 00000000000..9e12a46981a --- /dev/null +++ b/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.webserver; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import io.helidon.common.LogConfig; +import io.helidon.common.http.DataChunk; +import io.helidon.common.reactive.Multi; +import io.helidon.webclient.WebClient; +import io.helidon.webclient.WebClientResponse; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.util.AsciiString; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.RepeatedTest; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; + +public class KeepAliveTest { + private static WebServer server; + private static WebClient webClient; + + @BeforeAll + static void setUp() { + LogConfig.configureRuntime(); + server = WebServer.builder() + .routing(Routing.builder() + .register("/close", rules -> rules.any((req, res) -> { + req.content().forEach(dataChunk -> { + // consume only first from two chunks + dataChunk.release(); + throw new RuntimeException("BOOM!"); + }).exceptionally(res::send); + })) + .register("/plain", rules -> rules.any((req, res) -> { + req.content() + .forEach(DataChunk::release) + .onComplete(res::send) + .ignoreElement(); + })) + .build()) + .build(); + server.start().await(); + String serverUrl = "http://localhost:" + server.port(); + webClient = WebClient.builder() + .baseUri(serverUrl) + .keepAlive(true) + .build(); + + } + + @AfterAll + static void afterAll() { + server.shutdown(); + } + + @RepeatedTest(100) + void closeWithKeepAliveUnconsumedRequest() { + testCall(webClient, true, "/close", 500, HttpHeaderValues.CLOSE, true); + } + + @RepeatedTest(100) + void sendWithoutKeepAlive() { + testCall(webClient, false, "/plain", 200, null, false); + } + + @RepeatedTest(100) + void sendWithKeepAlive() { + testCall(webClient, true, "/plain", 200, HttpHeaderValues.KEEP_ALIVE, false); + } + + private static void testCall(WebClient webClient, + boolean keepAlive, + String path, + int expectedStatus, + AsciiString expectedConnectionHeader, + boolean ignoreConnectionClose) { + WebClientResponse res = null; + try { + res = webClient + .put() + .keepAlive(keepAlive) + .path(path) + .submit(Multi.interval(10, TimeUnit.MILLISECONDS, Executors.newSingleThreadScheduledExecutor()) + .limit(2) + .map(l -> "msg_"+ l) + .map(String::getBytes) + .map(ByteBuffer::wrap) + .map(bb -> DataChunk.create(true, true, bb)) + ) + .await(Duration.ofMinutes(5)); + + assertThat(res.status().code(), is(expectedStatus)); + if (expectedConnectionHeader != null) { + assertThat(res.headers().toMap(), + hasEntry(HttpHeaderNames.CONNECTION.toString(), List.of(expectedConnectionHeader.toString()))); + } else { + assertThat(res.headers().toMap(), not(hasKey(HttpHeaderNames.CONNECTION.toString()))); + } + res.content().forEach(DataChunk::release); + } catch (CompletionException e) { + if (ignoreConnectionClose && e.getMessage().contains("Connection reset")) { + // this is an expected (intermittent) result - due to a natural race (between us writing the request + // data and server responding), we may either get a response + // or the socket may be closed for writing (the reset comes from an attempt to write entity to a closed + // socket) + return; + } + throw e; + } finally { + Optional.ofNullable(res).ifPresent(WebClientResponse::close); + } + } +} \ No newline at end of file diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java index c792db38c78..0507b67b005 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2021 Oracle and/or its affiliates. + * Copyright (c) 2017, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -256,7 +256,7 @@ public void postGetPostGetTheSameConnection() throws Exception { } @Test - public void getWithLargePayloadCausesConnectionClose() throws Exception { + public void getWithLargePayloadDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -264,7 +264,7 @@ public void getWithLargePayloadCausesConnectionClose() throws Exception { // assert assertThat(entityFromResponse(s.receive(), true), is("9\nIt works!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @@ -295,14 +295,14 @@ public void traceWithAnyPayloadCausesConnectionCloseAndBadRequestWhenHandled() t } @Test - public void deferredGetWithLargePayloadCausesConnectionClose() throws Exception { + public void deferredGetWithLargePayloadDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get s.request(Http.Method.GET, "/deferred", SocketHttpClient.longData(100_000).toString()); // assert assertThat(entityFromResponse(s.receive(), true), is("d\nI'm deferred!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @@ -334,7 +334,7 @@ public void unconsumedSmallPostDataDoesNotCauseConnectionClose() throws Exceptio } @Test - public void unconsumedLargePostDataCausesConnectionClose() throws Exception { + public void unconsumedLargePostDataDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -342,12 +342,12 @@ public void unconsumedLargePostDataCausesConnectionClose() throws Exception { // assert assertThat(entityFromResponse(s.receive(), true), is("15\nPayload not consumed!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @Test - public void unconsumedDeferredLargePostDataCausesConnectionClose() throws Exception { + public void unconsumedDeferredLargePostDataDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -355,7 +355,7 @@ public void unconsumedDeferredLargePostDataCausesConnectionClose() throws Except // assert assertThat(entityFromResponse(s.receive(), true), is("d\nI'm deferred!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java index f4c91194b50..886cac1d18d 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Oracle and/or its affiliates. + * Copyright (c) 2021, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,13 +19,12 @@ import java.nio.ByteBuffer; import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import io.helidon.common.http.DataChunk; +import io.helidon.common.http.Http; import io.helidon.common.reactive.Multi; import io.helidon.webclient.WebClient; import io.helidon.webclient.WebClientResponse; @@ -34,9 +33,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.RepeatedTest; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; public class ReqEntityAnalyzedTest { private static final Duration TIME_OUT = Duration.ofSeconds(5); @@ -88,14 +87,23 @@ private void testCall(WebClient webClient, Multi payload) { .onError(Throwable::printStackTrace) .await(TIME_OUT); - webClientResponse.content().as(String.class) - .forSingle(s -> assertEquals("Server:0Server:1Server:2", s, "Wrong response!")) + assertThat(webClientResponse.status(), is(Http.Status.OK_200)); + + String response = webClientResponse.content() + .as(String.class) .await(TIME_OUT); - } catch (CompletionException e) { - fail(e); + assertThat(response, is("Server:0Server:1Server:2")); + } catch (Exception e) { + // this is expected - we do not support parallel read of entity + if (e.getMessage().contains("reset by the host")) { + return; + } + throw e; } finally { - Optional.ofNullable(webClientResponse).ifPresent(WebClientResponse::close); + if (webClientResponse != null) { + webClientResponse.close(); + } } } diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/utils/SocketHttpClient.java b/webserver/webserver/src/test/java/io/helidon/webserver/utils/SocketHttpClient.java index 4e9ccb3f408..d95de54f05b 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/utils/SocketHttpClient.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/utils/SocketHttpClient.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2021 Oracle and/or its affiliates. + * Copyright (c) 2017, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,6 +64,7 @@ public class SocketHttpClient implements AutoCloseable { */ public SocketHttpClient(WebServer webServer) throws IOException { socket = new Socket("localhost", webServer.port()); + socket.setSoTimeout(10000); } /**