Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Graceful client connection close #8051

Merged
merged 6 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @param <REQ> type of the client request
*/
public interface HttpClient<REQ extends ClientRequest<REQ>> {
public interface HttpClient<REQ extends ClientRequest<REQ>> extends ReleasableResource {
/**
* Create a request for a method.
*
Expand All @@ -32,6 +32,13 @@ public interface HttpClient<REQ extends ClientRequest<REQ>> {
*/
REQ method(Method method);

/**
* Gracefully close all opened client specific connections.
*/
default void closeResource() {
// Do nothing by default
}

/**
* Shortcut for get method with a path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ public HttpClientRequest method(Method method) {
tcpProtocolIds);
}

@Override
public void closeResource() {
for (ProtocolSpi o : List.copyOf(clientSpiByProtocol.values())) {
o.spi().releaseResource();
}
}

@Override
public <T, C extends ProtocolConfig> T client(Protocol<T, C> protocol, C protocolConfig) {
return protocol.provider().protocol(this, protocolConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webclient.spi;

import io.helidon.webclient.api.ReleasableResource;

/**
* Client connection cache with release shutdown hook to provide graceful shutdown.
*/
public abstract class ClientConnectionCache implements ReleasableResource {
protected ClientConnectionCache() {
danielkec marked this conversation as resolved.
Show resolved Hide resolved
Runtime.getRuntime().addShutdownHook(new Thread(this::releaseResource));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import io.helidon.webclient.api.ClientRequest;
import io.helidon.webclient.api.ClientUri;
import io.helidon.webclient.api.FullClientRequest;
import io.helidon.webclient.api.ReleasableResource;

/**
* Integration for HTTP versions to provide a single API.
*/
public interface HttpClientSpi {
public interface HttpClientSpi extends ReleasableResource {
/**
* Return whether this HTTP version can handle the provided request.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ class Http1ClientImpl implements Http1Client, HttpClientSpi {
private final Http1ClientConfig clientConfig;
private final Http1ClientProtocolConfig protocolConfig;
private final Http1ConnectionCache connectionCache;
private final Http1ConnectionCache clientCache;

Http1ClientImpl(WebClient webClient, Http1ClientConfig clientConfig) {
this.webClient = webClient;
this.clientConfig = clientConfig;
this.protocolConfig = clientConfig.protocolConfig();
if (clientConfig.shareConnectionCache()) {
this.connectionCache = Http1ConnectionCache.shared();
this.clientCache = null;
} else {
this.connectionCache = Http1ConnectionCache.create();
this.clientCache = connectionCache;
}
}

Expand Down Expand Up @@ -86,6 +89,13 @@ public ClientRequest<?> clientRequest(FullClientRequest<?> clientRequest, Client
.fragment(clientUri.fragment());
}

@Override
public void closeResource() {
if (clientCache != null) {
this.clientCache.closeResource();
}
}

WebClient webClient() {
return webClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package io.helidon.webclient.http1;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.helidon.common.tls.Tls;
import io.helidon.http.ClientRequestHeaders;
Expand All @@ -33,20 +35,22 @@
import io.helidon.webclient.api.Proxy;
import io.helidon.webclient.api.TcpClientConnection;
import io.helidon.webclient.api.WebClient;
import io.helidon.webclient.spi.ClientConnectionCache;

import static java.lang.System.Logger.Level.DEBUG;

/**
* Cache of HTTP/1.1 connections for keep alive.
*/
class Http1ConnectionCache {
class Http1ConnectionCache extends ClientConnectionCache {
private static final System.Logger LOGGER = System.getLogger(Http1ConnectionCache.class.getName());
private static final Tls NO_TLS = Tls.builder().enabled(false).build();
private static final String HTTPS = "https";
private static final Http1ConnectionCache SHARED = create();
private static final List<String> ALPN_ID = List.of(Http1Client.PROTOCOL_ID);
private static final Duration QUEUE_TIMEOUT = Duration.ofMillis(10);
private final Map<ConnectionKey, LinkedBlockingDeque<TcpClientConnection>> cache = new ConcurrentHashMap<>();
private final AtomicBoolean closed = new AtomicBoolean();

static Http1ConnectionCache shared() {
return SHARED;
Expand All @@ -71,6 +75,16 @@ ClientConnection connection(Http1ClientImpl http1Client,
}
}

@Override
public void closeResource() {
if (closed.getAndSet(true)) {
return;
}
cache.values().stream()
.flatMap(Collection::stream)
.forEach(TcpClientConnection::closeResource);
}

private boolean handleKeepAlive(boolean defaultKeepAlive, WritableHeaders<?> headers) {
if (headers.contains(HeaderValues.CONNECTION_CLOSE)) {
return false;
Expand All @@ -90,6 +104,11 @@ private ClientConnection keepAliveConnection(Http1ClientImpl http1Client,
Tls tls,
ClientUri uri,
Proxy proxy) {

if (closed.get()) {
throw new IllegalStateException("Connection cache is closed");
}

Http1ClientConfig clientConfig = http1Client.clientConfig();

ConnectionKey connectionKey = new ConnectionKey(uri.scheme(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -85,8 +86,7 @@ class Http2ClientConnection {
private Http2Settings serverSettings = Http2Settings.builder()
.build();
private Future<?> handleTask;

private volatile boolean closed = false;
private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);

Http2ClientConnection(Http2ClientImpl http2Client, ClientConnection connection) {
this.protocolConfig = http2Client.protocolConfig();
Expand Down Expand Up @@ -177,7 +177,7 @@ Http2ClientStream tryStream(Http2StreamConfig config) {
}

boolean closed() {
return closed || (protocolConfig.ping() && !ping());
return state.get().closed() || (protocolConfig.ping() && !ping());
}

boolean ping() {
Expand All @@ -203,13 +203,15 @@ void updateLastStreamId(int lastStreamId) {
}

void close() {
closed = true;
try {
handleTask.cancel(true);
ctx.log(LOGGER, TRACE, "Closing connection");
connection.closeResource();
} catch (Throwable e) {
ctx.log(LOGGER, TRACE, "Failed to close HTTP/2 connection.", e);
this.goAway(0, Http2ErrorCode.NO_ERROR, "Closing connection");
if (state.getAndSet(State.CLOSED) != State.CLOSED) {
try {
handleTask.cancel(true);
ctx.log(LOGGER, TRACE, "Closing connection");
connection.closeResource();
} catch (Throwable e) {
ctx.log(LOGGER, TRACE, "Failed to close HTTP/2 connection.", e);
}
}
}

Expand Down Expand Up @@ -268,14 +270,14 @@ private void start(Http2ClientProtocolConfig protocolConfig,
try {
while (!Thread.interrupted()) {
if (!handle()) {
closed = true;
this.close();
ctx.log(LOGGER, TRACE, "Connection closed");
return;
}
}
ctx.log(LOGGER, TRACE, "Client listener interrupted");
} catch (Throwable t) {
closed = true;
this.close();
ctx.log(LOGGER, DEBUG, "Failed to handle HTTP/2 client connection", t);
}
});
Expand Down Expand Up @@ -457,8 +459,26 @@ private void ackSettings() {
}

private void goAway(int streamId, Http2ErrorCode errorCode, String msg) {
Http2Settings http2Settings = Http2Settings.create();
Http2GoAway frame = new Http2GoAway(streamId, errorCode, msg);
writer.write(frame.toFrameData(http2Settings, 0, Http2Flag.NoFlags.create()));
if (State.OPEN == state.getAndSet(State.GO_AWAY)) {
Http2Settings http2Settings = Http2Settings.create();
Http2GoAway frame = new Http2GoAway(streamId, errorCode, msg);
writer.write(frame.toFrameData(http2Settings, 0, Http2Flag.NoFlags.create()));
}
}

private enum State {
CLOSED(true),
GO_AWAY(true),
OPEN(false);

private final boolean closed;

State(boolean closed){
this.closed = closed;
}

boolean closed() {
return closed;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void close() {
// this is to prevent concurrent modification (connections remove themselves from the map)
Set<Http2ClientConnection> toClose = new HashSet<>(allConnections.keySet());
toClose.forEach(Http2ClientConnection::close);
this.activeConnection.set(null);
this.activeConnection.getAndSet(null).close();
this.allConnections.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@ class Http2ClientImpl implements Http2Client, HttpClientSpi {
private final Http2ClientConfig clientConfig;
private final Http2ClientProtocolConfig protocolConfig;
private final Http2ConnectionCache connectionCache;
private final Http2ConnectionCache clientCache;

Http2ClientImpl(WebClient webClient, Http2ClientConfig clientConfig) {
this.webClient = webClient;
this.clientConfig = clientConfig;
this.protocolConfig = clientConfig.protocolConfig();
if (clientConfig.shareConnectionCache()) {
this.connectionCache = Http2ConnectionCache.shared();
this.clientCache = null;
} else {
this.connectionCache = Http2ConnectionCache.create();
this.clientCache = connectionCache;
}
}

Expand Down Expand Up @@ -94,6 +97,13 @@ public ClientRequest<?> clientRequest(FullClientRequest<?> clientRequest, Client
.fragment(clientUri.fragment());
}

@Override
public void closeResource() {
if (clientCache != null) {
this.clientCache.closeResource();
}
}

WebClient webClient() {
return webClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,26 @@

package io.helidon.webclient.http2;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import io.helidon.common.configurable.LruCache;
import io.helidon.webclient.api.ClientUri;
import io.helidon.webclient.api.ConnectionKey;
import io.helidon.webclient.http1.Http1ClientRequest;
import io.helidon.webclient.http1.Http1ClientResponse;
import io.helidon.webclient.spi.ClientConnectionCache;

final class Http2ConnectionCache {
//todo Gracefully close connections in channel cache
final class Http2ConnectionCache extends ClientConnectionCache {
private static final Http2ConnectionCache SHARED = create();
private final LruCache<ConnectionKey, Boolean> http2Supported = LruCache.<ConnectionKey, Boolean>builder()
.capacity(1000)
.build();
private final Map<ConnectionKey, Http2ClientConnectionHandler> cache = new ConcurrentHashMap<>();

private final AtomicBoolean closed = new AtomicBoolean();
static Http2ConnectionCache shared() {
return SHARED;
}
Expand All @@ -42,16 +44,22 @@ static Http2ConnectionCache create() {
return new Http2ConnectionCache();
}

@Override
public void closeResource() {
if (!closed.getAndSet(true)) {
List.copyOf(cache.keySet())
.forEach(this::closeAndRemove);
}
}

boolean supports(ConnectionKey ck) {
return http2Supported.get(ck).isPresent();
}

void remove(ConnectionKey connectionKey) {
Http2ClientConnectionHandler handler = cache.remove(connectionKey);
if (handler != null) {
handler.close();
if (!closed.get()) {
closeAndRemove(connectionKey);
}
http2Supported.remove(connectionKey);
}

Http2ConnectionAttemptResult newStream(Http2ClientImpl http2Client,
Expand All @@ -60,6 +68,10 @@ Http2ConnectionAttemptResult newStream(Http2ClientImpl http2Client,
ClientUri initialUri,
Function<Http1ClientRequest, Http1ClientResponse> http1EntityHandler) {

if (closed.get()) {
throw new IllegalStateException("Connection cache is closed");
}

// this statement locks all threads - must not do anything complicated (just create a new instance)
Http2ConnectionAttemptResult result =
cache.computeIfAbsent(connectionKey, Http2ClientConnectionHandler::new)
Expand All @@ -73,4 +85,12 @@ Http2ConnectionAttemptResult newStream(Http2ClientImpl http2Client,
}
return result;
}

private void closeAndRemove(ConnectionKey connectionKey){
Http2ClientConnectionHandler handler = cache.remove(connectionKey);
if (handler != null) {
handler.close();
}
http2Supported.remove(connectionKey);
}
}
Loading