Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Webclient keep-alive #2139

Merged
merged 4 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ protected static WebClient createNewClient(WebClientService... clientServices) {
WebClient.Builder builder = WebClient.builder()
.baseUri("http://localhost:" + webServer.port() + "/greet")
.config(CONFIG.get("client"))
.keepAlive(true)
.context(context)
.addMediaSupport(JsonpSupport.create());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ final class NettyClient implements WebClient {
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofMinutes(1);
private static final Duration DEFAULT_READ_TIMEOUT = Duration.ofMinutes(10);
private static final boolean DEFAULT_FOLLOW_REDIRECTS = false;
private static final boolean DEFAULT_KEEP_ALIVE = false;
private static final int DEFAULT_NUMBER_OF_REDIRECTS = 5;
private static final LazyValue<String> DEFAULT_USER_AGENT = LazyValue
.create(() -> "Helidon/" + Version.VERSION + " (java " + System.getProperty("java.runtime.version") + ")");
Expand All @@ -61,6 +62,7 @@ final class NettyClient implements WebClient {
.writerContextParent(DEFAULT_MEDIA_SUPPORT.writerContext())
.proxy(DEFAULT_PROXY)
.tls(DEFAULT_TLS)
.keepAlive(DEFAULT_KEEP_ALIVE)
.build();

// configurable per client instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,24 @@
import io.helidon.webclient.spi.WebClientService;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.AttributeKey;

import static io.helidon.webclient.WebClientRequestBuilderImpl.COMPLETED;
import static io.helidon.webclient.WebClientRequestBuilderImpl.IN_USE;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RECEIVED;
import static io.helidon.webclient.WebClientRequestBuilderImpl.REQUEST;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RESPONSE;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RESULT;

/**
* Created for each request/response interaction.
Expand All @@ -52,35 +59,21 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {

private static final Logger LOGGER = Logger.getLogger(NettyClientHandler.class.getName());

private static final AttributeKey<WebClientServiceResponse> SERVICE_RESPONSE = AttributeKey.valueOf("response");
private static final AttributeKey<WebClientServiceResponse> SERVICE_RESPONSE = AttributeKey.valueOf("serviceResponse");

private static final List<HttpInterceptor> HTTP_INTERCEPTORS = new ArrayList<>();

static {
HTTP_INTERCEPTORS.add(new RedirectInterceptor());
}

private final WebClientResponseImpl.Builder clientResponse;
private final CompletableFuture<WebClientResponse> responseFuture;
private final CompletableFuture<WebClientServiceResponse> responseReceived;
private final CompletableFuture<WebClientServiceResponse> requestComplete;
private HttpResponsePublisher publisher;
private ResponseCloser responseCloser;

/**
* Creates new instance.
*
* @param responseFuture response future
* @param responseReceived response received future
* @param requestComplete request complete future
*/
NettyClientHandler(CompletableFuture<WebClientResponse> responseFuture,
CompletableFuture<WebClientServiceResponse> responseReceived,
CompletableFuture<WebClientServiceResponse> requestComplete) {
this.responseFuture = responseFuture;
this.responseReceived = responseReceived;
this.requestComplete = requestComplete;
this.clientResponse = WebClientResponseImpl.builder();
NettyClientHandler() {
}

@Override
Expand All @@ -101,32 +94,35 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws IO

this.publisher = new HttpResponsePublisher(ctx);
this.responseCloser = new ResponseCloser(ctx);
this.clientResponse.contentPublisher(publisher)
WebClientResponseImpl.Builder responseBuilder = WebClientResponseImpl.builder();
responseBuilder.contentPublisher(publisher)
.readerContext(requestConfiguration.readerContext())
.status(helidonStatus(response.status()))
.httpVersion(Http.Version.create(response.protocolVersion().toString()))
.responseCloser(responseCloser)
.lastEndpointURI(requestConfiguration.requestURI());

HttpHeaders nettyHeaders = response.headers();
for (String name : nettyHeaders.names()) {
List<String> values = nettyHeaders.getAll(name);
responseBuilder.addHeader(name, values);
}

// we got a response, we can safely complete the future
// all errors are now fed only to the publisher
WebClientResponse clientResponse = responseBuilder.build();
ctx.channel().attr(RESPONSE).set(clientResponse);

for (HttpInterceptor interceptor : HTTP_INTERCEPTORS) {
if (interceptor.shouldIntercept(response.status(), requestConfiguration)) {
interceptor.handleInterception(response, clientRequest, responseFuture);
interceptor.handleInterception(response, clientRequest, ctx.channel().attr(RESULT).get());
if (!interceptor.continueAfterInterception()) {
responseCloser.close().thenAccept(future -> LOGGER.finest(() -> "Response closed due to redirection"));
return;
}
}
}

HttpHeaders nettyHeaders = response.headers();
for (String name : nettyHeaders.names()) {
List<String> values = nettyHeaders.getAll(name);
clientResponse.addHeader(name, values);
}

// we got a response, we can safely complete the future
// all errors are now fed only to the publisher
WebClientResponse clientResponse = this.clientResponse.build();
requestConfiguration.cookieManager().put(requestConfiguration.requestURI(),
clientResponse.headers().toMap());

Expand All @@ -144,6 +140,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws IO
csr = csr.thenCompose(clientSerResponse -> service.response(clientRequest, clientSerResponse));
}

CompletableFuture<WebClientServiceResponse> responseReceived = ctx.channel().attr(RECEIVED).get();
CompletableFuture<WebClientResponse> responseFuture = ctx.channel().attr(RESULT).get();
csr.whenComplete((clientSerResponse, throwable) -> {
if (throwable != null) {
responseReceived.completeExceptionally(throwable);
Expand All @@ -152,13 +150,13 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws IO
} else {
responseReceived.complete(clientServiceResponse);
responseReceived.thenRun(() -> {
responseFuture.complete(clientResponse);
if (shouldResponseAutomaticallyClose(clientResponse)) {
responseCloser.close()
.thenAccept(aVoid -> {
LOGGER.finest(() -> "Response automatically closed. No entity expected");
});
}
responseFuture.complete(clientResponse);
});
}
});
Expand Down Expand Up @@ -191,7 +189,7 @@ && noContentLength(headers)

private boolean noContentLength(WebClientResponseHeaders headers) {
return headers.first(Http.Header.CONTENT_LENGTH).isEmpty()
|| headers.first(Http.Header.CONTENT_LENGTH).get().equals("0");
|| headers.first(Http.Header.CONTENT_LENGTH).get().equals("0");
}

private boolean notChunked(WebClientResponseHeaders headers) {
Expand All @@ -200,6 +198,7 @@ private boolean notChunked(WebClientResponseHeaders headers) {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
CompletableFuture<WebClientResponse> responseFuture = ctx.channel().attr(RESULT).get();
if (responseFuture.isDone()) {
// we failed during entity processing
publisher.fail(cause);
Expand Down Expand Up @@ -241,11 +240,7 @@ private static final class HttpResponsePublisher extends BufferedEmittingPublish

HttpResponsePublisher(ChannelHandlerContext ctx) {
super.onRequest((n, cnt) -> {
if (super.isUnbounded()) {
ctx.channel().config().setAutoRead(true);
} else {
ctx.channel().config().setAutoRead(false);
}
ctx.channel().config().setAutoRead(super.isUnbounded());

try {
lock.lock();
Expand All @@ -258,12 +253,10 @@ private static final class HttpResponsePublisher extends BufferedEmittingPublish
});
}



public int emit(final ByteBuf buf) {
buf.retain();
return super.emit(DataChunk.create(false, true, buf::release,
buf.nioBuffer().asReadOnlyBuffer()));
buf.nioBuffer().asReadOnlyBuffer()));
}
}

Expand All @@ -290,22 +283,32 @@ boolean isClosed() {
*/
Single<Void> close() {
if (closed.compareAndSet(false, true)) {
WebClientServiceResponse clientServiceResponse = ctx.channel().attr(SERVICE_RESPONSE).get();
Channel channel = ctx.channel();
WebClientServiceResponse clientServiceResponse = channel.attr(SERVICE_RESPONSE).get();
CompletableFuture<WebClientServiceResponse> requestComplete = channel.attr(COMPLETED).get();
requestComplete.complete(clientServiceResponse);

channel.config().setAutoRead(true);
WebClientResponse response = channel.attr(RESPONSE).get();
String connection = response.headers().first(Http.Header.CONNECTION)
.orElseGet(HttpHeaderValues.CLOSE::toString);
if (connection.equals(HttpHeaderValues.CLOSE.toString())) {
ctx.close()
.addListener(future -> {
if (future.isSuccess()) {
LOGGER.finest(() -> "Response from has been closed.");
cf.complete(null);
} else {
LOGGER.log(Level.SEVERE,
future.cause(),
() -> "An exception occurred while closing the response");
cf.completeExceptionally(future.cause());
}
});
} else {
channel.attr(IN_USE).get().set(false);
cf.complete(null);
}
publisher.complete();
ctx.close()
.addListener(future -> {
if (future.isSuccess()) {
LOGGER.finest(() -> "Response from has been closed.");
cf.complete(null);
} else {
LOGGER.log(Level.SEVERE,
future.cause(),
() -> "An exception occurred while closing the response");
cf.completeExceptionally(future.cause());
}
});
}
return Single.create(cf, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
Expand All @@ -33,35 +35,29 @@
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.FutureListener;

import static io.helidon.webclient.WebClientRequestBuilderImpl.CONNECTION_IDENT;
import static io.helidon.webclient.WebClientRequestBuilderImpl.IN_USE;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RESULT;

/**
* Helidon Web Client initializer which is used for netty channel initialization.
*/
class NettyClientInitializer extends ChannelInitializer<SocketChannel> {

private final RequestConfiguration configuration;
private final CompletableFuture<WebClientResponse> future;
private final CompletableFuture<WebClientServiceResponse> responseReceived;
private final CompletableFuture<WebClientServiceResponse> requestComplete;

/**
* Creates new instance.
*
* @param configuration request configuration
* @param future response completable future
* @param responseReceived future indicating recerved response headers
* @param requestComplete future indicating completed request
* @param configuration request configuration
*/
NettyClientInitializer(RequestConfiguration configuration,
CompletableFuture<WebClientResponse> future,
CompletableFuture<WebClientServiceResponse> responseReceived,
CompletableFuture<WebClientServiceResponse> requestComplete) {
NettyClientInitializer(RequestConfiguration configuration) {
this.configuration = configuration;
this.future = future;
this.responseReceived = responseReceived;
this.requestComplete = requestComplete;
}

@Override
Expand Down Expand Up @@ -101,7 +97,7 @@ protected void initChannel(SocketChannel channel) {
//Check if ssl handshake has been successful. Without this check will this exception be replaced by
//netty and therefore it will be lost.
if (channelFuture.cause() != null) {
future.completeExceptionally(channelFuture.cause());
channel.attr(RESULT).get().completeExceptionally(channelFuture.cause());
channel.close();
}
});
Expand All @@ -111,6 +107,34 @@ protected void initChannel(SocketChannel channel) {
pipeline.addLast("logger", new LoggingHandler(LogLevel.TRACE));
pipeline.addLast("httpCodec", new HttpClientCodec());
pipeline.addLast("httpDecompressor", new HttpContentDecompressor());
pipeline.addLast("helidonHandler", new NettyClientHandler(future, responseReceived, requestComplete));
pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 50));
pipeline.addLast("idleConnectionHandler", new IdleConnectionHandler());
pipeline.addLast("helidonHandler", new NettyClientHandler());
}

private static class IdleConnectionHandler extends ChannelDuplexHandler {

private static final Logger LOGGER = Logger.getLogger(IdleConnectionHandler.class.getName());

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
if (ctx.channel().attr(IN_USE).get().compareAndSet(false, true)) {
ctx.close();
}
}
super.userEventTriggered(ctx, evt);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
WebClientRequestBuilderImpl.ConnectionIdent key = channel.attr(CONNECTION_IDENT).get();
LOGGER.finest(() -> "Channel closed -> " + channel.hashCode());
if (key != null) {
WebClientRequestBuilderImpl.removeChannelFromCache(key, channel);
}
super.channelInactive(ctx);
}
}
}
25 changes: 25 additions & 0 deletions webclient/webclient/src/main/java/io/helidon/webclient/Proxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -352,6 +353,30 @@ private InetSocketAddress address() {
return new InetSocketAddress(host, port);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Proxy proxy = (Proxy) o;
return port == proxy.port
&& useSystemSelector == proxy.useSystemSelector
&& type == proxy.type
&& Objects.equals(host, proxy.host)
&& Objects.equals(noProxy, proxy.noProxy)
&& Objects.equals(username, proxy.username)
&& Objects.equals(password, proxy.password)
&& Objects.equals(systemSelector, proxy.systemSelector);
}

@Override
public int hashCode() {
return Objects.hash(type, host, port, noProxy, username, password, systemSelector, useSystemSelector);
}

/**
* Fluent API builder for {@link Proxy}.
*/
Expand Down
Loading