Skip to content

Commit

Permalink
Fix wrong connection close (helidon-io#3830)
Browse files Browse the repository at this point in the history
* Fix wrong connection close
* Reduced logging overhead. Small improvements of tests.
* Consume request entity on valid responses.

Signed-off-by: Tomas Langer <tomas.langer@oracle.com>
  • Loading branch information
tomas-langer authored Feb 3, 2022
1 parent 68b8d1c commit 2a1dd6b
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.helidon.common.GenericType;
Expand Down Expand Up @@ -204,15 +205,19 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat
for (ChannelRecord channelRecord : channels) {
Channel channel = channelRecord.channel;
if (channel.isOpen() && channel.attr(IN_USE).get().compareAndSet(false, true)) {
LOGGER.finest(() -> "Reusing -> " + channel.hashCode());
LOGGER.finest(() -> "Setting in use -> true");
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Reusing -> " + channel.hashCode() + ", settting in use -> true");
}
return channelRecord.channelFuture;
}
LOGGER.finest(() -> "Not accepted -> " + channel.hashCode());
LOGGER.finest(() -> "Open -> " + channel.isOpen());
LOGGER.finest(() -> "In use -> " + channel.attr(IN_USE).get());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Not accepted -> " + channel.hashCode() + ", open -> "
+ channel.isOpen() + ", in use -> " + channel.attr(IN_USE).get());
}
}
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "New connection to -> " + connectionIdent);
}
LOGGER.finest(() -> "New connection to -> " + connectionIdent);
URI uri = connectionIdent.base;
ChannelFuture connect = bootstrap.connect(uri.getHost(), uri.getPort());
Channel channel = connect.channel();
Expand All @@ -225,9 +230,10 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat
}

static void removeChannelFromCache(ConnectionIdent key, Channel channel) {
LOGGER.finest(() -> "Removing from channel cache.");
LOGGER.finest(() -> "Connection ident -> " + key);
LOGGER.finest(() -> "Channel -> " + channel.hashCode());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Removing from channel cache. Connection ident -> " + key
+ ", channel -> " + channel.hashCode());
}
CHANNEL_CACHE.get(key).remove(new ChannelRecord(channel));
}

Expand Down Expand Up @@ -578,8 +584,10 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
: bootstrap.connect(finalUri.getHost(), finalUri.getPort());

channelFuture.addListener((ChannelFutureListener) future -> {
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Channel hashcode -> " + channelFuture.channel().hashCode());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Channel hashcode -> " + channelFuture.channel().hashCode());
}
channelFuture.channel().attr(REQUEST).set(clientRequest);
channelFuture.channel().attr(RESPONSE_RECEIVED).set(false);
channelFuture.channel().attr(RECEIVED).set(responseReceived);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021 Oracle and/or its affiliates.
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,6 @@
* The BareResponseImpl.
*/
class BareResponseImpl implements BareResponse {

private static final Logger LOGGER = Logger.getLogger(BareResponseImpl.class.getName());

// See HttpConversionUtil.ExtensionHeaderNames
Expand All @@ -76,6 +75,7 @@ class BareResponseImpl implements BareResponse {
private final HttpHeaders requestHeaders;
private final ChannelFuture channelClosedFuture;
private final GenericFutureListener<? extends Future<? super Void>> channelClosedListener;
private final CompletableFuture<ChannelFutureListener> originalEntityAnalyzed;

// Accessed by Subscriber method threads
private Flow.Subscription subscription;
Expand Down Expand Up @@ -103,6 +103,7 @@ class BareResponseImpl implements BareResponse {
CompletableFuture<ChannelFutureListener> requestEntityAnalyzed,
long requestId) {
this.requestContext = requestContext;
this.originalEntityAnalyzed = requestEntityAnalyzed;
this.requestEntityAnalyzed = requestEntityAnalyzed;
this.responseFuture = new CompletableFuture<>();
this.headersFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -192,26 +193,44 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S
// Add keep alive header as per:
// http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
// If already set (e.g. WebSocket upgrade), do not override
if (keepAlive) {
// if response Connection header is set explicitly to close, we can ignore the following
if (!keepAlive || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(response.headers().get(HttpHeaderNames.CONNECTION))) {
response.headers().remove(HttpHeaderNames.CONNECTION);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
} else {
if (!requestContext.requestCompleted()) {
LOGGER.finer(() -> log("Request content not fully read with keep-alive: true", channel));
if (!requestContext.hasRequests() || requestContext.requestCancelled()) {
requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> {
if (listener.equals(ChannelFutureListener.CLOSE)) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
} else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
return listener;
});
//We are not sure which Connection header value should be set.
//If unhandled entity is only one content large, we can keep the keep-alive
channel.read();
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
throw new IllegalStateException("Cannot request entity and send response without "
+ "waiting for it to be handled");

if (!isWebSocketUpgrade) {
if (requestContext.isDataRequested()) {
// there are pending requests, we have emitted some data and request was not explicitly canceled
// this is a bug in code, where entity is requested and not fully processed
// throwing an exception here is a breaking change (also this may be an intermittent problem
// as it may depend on thread race)
HttpRequest request = requestContext.request();
LOGGER.warning("Entity was requested and not fully consumed before a response is sent. "
+ "This is not supported. Connection will be closed. Please fix your route for "
+ request.method() + " " + request.uri());

// let's close this connection, as it is in an unexpected state
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
} else {
// we want to consume the entity and keep alive
// entity must be consumed here, so we do not close connection in forwarding handler
// because of unconsumed payload (the following code will only succeed if there is no subscriber)
requestContext.publisher()
.forEach(DataChunk::release)
.onComplete(() -> {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE_ON_FAILURE);
})
.onError(t -> {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
})
.ignoreElement();
}
}
} else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
Expand All @@ -220,8 +239,8 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S

// Content length optimization attempt
if (!lengthOptimization) {
LOGGER.fine(() -> log("Writing headers %s", status));
requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> {
LOGGER.fine(() -> log("Writing headers %s", status));
requestContext.runInScope(() -> orderedWrite(this::initWriteResponse));
return listener;
});
Expand Down Expand Up @@ -377,21 +396,7 @@ public void onSubscribe(Flow.Subscription subscription) {
return;
}
this.subscription = Objects.requireNonNull(subscription, "subscription is null");

// TyrusSupport controls order of writes manually
if (isWebSocketUpgrade) {
subscription.request(1);
} else {
// Callback deferring first request for data after:
// - Request stream has been completed
requestEntityAnalyzed.whenComplete((channelFutureListener, throwable) -> {
subscription.request(1);
});
if (keepAlive) {
//Auxiliary read, does nothing in case of pending read
channel.read();
}
}
subscription.request(1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
private CompletableFuture<ChannelFutureListener> requestEntityAnalyzed;
private CompletableFuture<?> prevRequestFuture;
private boolean lastContent;
private boolean hadContentAlready;

ForwardingHandler(Routing routing,
NettyWebServer webServer,
Expand All @@ -120,7 +119,6 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {

private void reset() {
lastContent = false;
hadContentAlready = false;
isWebSocketUpgrade = false;
actualPayloadSize = 0L;
ignorePayload = false;
Expand Down Expand Up @@ -263,19 +261,6 @@ private void channelReadHttpContent(ChannelHandlerContext ctx, Object msg) {
// this is here to handle the case when the content is not readable but we didn't
// exceptionally complete the publisher and close the connection
throw new IllegalStateException("It is not expected to not have readable content.");
} else if (!requestContext.hasRequests()
&& HttpUtil.isKeepAlive(requestContext.request())
&& !requestEntityAnalyzed.isDone()) {
if (hadContentAlready) {
LOGGER.finest(() -> "More than one unhandled content present. Closing the connection.");
requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
} else {
//We are checking the unhandled entity, but we cannot be sure if connection should be closed or not.
//Next content has to be checked if it is last chunk. If not close connection.
hadContentAlready = true;
LOGGER.finest(() -> "Requesting the next chunk to determine if the connection should be closed.");
ctx.channel().read();
}
}
}

Expand All @@ -290,7 +275,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@SuppressWarnings("checkstyle:methodlength")
private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context requestScope, Object msg) {
hadContentAlready = false;
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine(log("Received HttpRequest: %s. Remote address: %s. Scope id: %s",
ctx,
Expand Down Expand Up @@ -353,7 +337,7 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
}

if (publisher.hasRequests()) {
LOGGER.finest(() -> log("Requesting next chunks from Netty", ctx));
LOGGER.finest(() -> log("Requesting next (%d, %d) chunks from Netty", ctx, n, demand));
ctx.channel().read();
} else {
LOGGER.finest(() -> log("No hook action required", ctx));
Expand All @@ -366,7 +350,12 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
// If a problem with the request URI, return 400 response
BareRequestImpl bareRequest;
try {
bareRequest = new BareRequestImpl((HttpRequest) msg, publisher, webServer, ctx, sslEngine, requestId);
bareRequest = new BareRequestImpl(request,
requestContextRef.publisher(),
webServer,
ctx,
sslEngine,
requestId);
} catch (IllegalArgumentException e) {
send400BadRequest(ctx, request, e);
return true;
Expand All @@ -376,9 +365,19 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
LOGGER.finest(log("Request id: %s", ctx, bareRequest.requestId()));
}

String contentLength = request.headers().get(HttpHeaderNames.CONTENT_LENGTH);

if ("0".equals(contentLength)
|| (contentLength == null
&& !"upgrade".equalsIgnoreCase(request.headers().get(HttpHeaderNames.CONNECTION))
&& !"chunked".equalsIgnoreCase(request.headers().get(HttpHeaderNames.TRANSFER_ENCODING))
&& !"multipart/byteranges".equalsIgnoreCase(request.headers().get(HttpHeaderNames.CONTENT_TYPE)))) {
// no entity
requestContextRef.complete();
}

// If context length is greater than maximum allowed, return 413 response
if (maxPayloadSize >= 0) {
String contentLength = request.headers().get(Http.Header.CONTENT_LENGTH);
if (contentLength != null) {
try {
long value = Long.parseLong(contentLength);
Expand Down Expand Up @@ -439,6 +438,10 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
LOGGER.fine(log("Response complete: %s", ctx, System.identityHashCode(msg)));
}
});
/*
TODO we should only send continue in case the entity is request (e.g. we found a route and user started reading it)
This would solve connection close for 404 for requests with entity
*/
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx, request);
}
Expand Down Expand Up @@ -536,7 +539,8 @@ private void send100Continue(ChannelHandlerContext ctx,
"");

FullHttpResponse response = toNettyResponse(transportResponse);
ctx.write(response);
// we should flush this immediately, as we need the client to send entity
ctx.writeAndFlush(response);
}

/**
Expand All @@ -555,6 +559,8 @@ private void send400BadRequest(ChannelHandlerContext ctx, HttpRequest request, T
t);

FullHttpResponse response = toNettyResponse(handlerResponse);
// 400 -> close connection
response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

ctx.writeAndFlush(response)
.addListener(future -> ctx.close());
Expand All @@ -575,6 +581,8 @@ private void send413PayloadTooLarge(ChannelHandlerContext ctx, HttpRequest reque
"");

FullHttpResponse response = toNettyResponse(transportResponse);
// too big entity -> close connection
response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

ctx.writeAndFlush(response)
.addListener(future -> ctx.close());
Expand All @@ -596,7 +604,6 @@ private FullHttpResponse toNettyResponse(TransportResponse handlerResponse) {

HttpHeaders nettyHeaders = response.headers();
headers.forEach(nettyHeaders::add);
nettyHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
return response;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021 Oracle and/or its affiliates.
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,8 @@

import io.helidon.common.context.Context;
import io.helidon.common.context.Contexts;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Multi;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpRequest;
Expand All @@ -35,13 +37,19 @@ class RequestContext {
private final HttpRequest request;
private final Context scope;
private volatile boolean responseCompleted;
private volatile boolean emitted;

RequestContext(HttpRequestScopedPublisher publisher, HttpRequest request, Context scope) {
this.publisher = publisher;
this.request = request;
this.scope = scope;
}

Multi<DataChunk> publisher() {
return Multi.create(publisher)
.peek(something -> emitted = true);
}

HttpRequest request() {
return request;
}
Expand Down Expand Up @@ -78,6 +86,19 @@ boolean hasRequests() {
return publisher.hasRequests();
}

/**
* Has there been a request for content.
*
* @return {@code true} if data was requested and request was not cancelled
*/
boolean isDataRequested() {
return (hasRequests() || hasEmitted()) || requestCancelled();
}

boolean hasEmitted() {
return emitted;
}

/**
* Is request content cancelled.
*
Expand Down
Loading

0 comments on commit 2a1dd6b

Please sign in to comment.