Skip to content

Commit

Permalink
Review issues 1
Browse files Browse the repository at this point in the history
  • Loading branch information
danielkec committed Aug 15, 2023
1 parent d8f3fe3 commit add0ccc
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public WebClientServiceResponse proceed(WebClientServiceRequest serviceRequest)
ConnectionKey connectionKey = connectionKey(serviceRequest);

Http2ConnectionAttemptResult result = http2Client.connectionCache()
.newStream(connectionKey, clientRequest, uri, http1EntityHandler);
.newStream(http2Client, connectionKey, clientRequest, uri, http1EntityHandler);

this.result = result.result();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@ class Http2ClientImpl implements Http2Client, HttpClientSpi {
private final WebClient webClient;
private final Http2ClientConfig clientConfig;
private final Http2ClientProtocolConfig protocolConfig;
private final Http2ConnectionCache connectionCache = new Http2ConnectionCache(this);
private final Http2ConnectionCache connectionCache;

Http2ClientImpl(WebClient webClient, Http2ClientConfig clientConfig) {
this.webClient = webClient;
this.clientConfig = clientConfig;
this.protocolConfig = clientConfig.protocolConfig();
if (clientConfig.shareConnectionCache()) {
this.connectionCache = Http2ConnectionCache.shared();
} else {
this.connectionCache = Http2ConnectionCache.create();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,58 +28,46 @@

final class Http2ConnectionCache {
//todo Gracefully close connections in channel cache
private static final LruCache<ConnectionKey, Boolean> HTTP2_SUPPORTED = LruCache.<ConnectionKey, Boolean>builder()
.capacity(1000)
.build();
private static final Http2ConnectionCache SHARED = create();
private final LruCache<ConnectionKey, Boolean> http2Supported = LruCache.<ConnectionKey, Boolean>builder()
.capacity(1000)
.build();
private static final Map<ConnectionKey, Http2ClientConnectionHandler> GLOBAL_CACHE = new ConcurrentHashMap<>();
private final Map<ConnectionKey, Http2ClientConnectionHandler> localCache = new ConcurrentHashMap<>();
private final Http2ClientImpl http2Client;
private final Map<ConnectionKey, Http2ClientConnectionHandler> cache = new ConcurrentHashMap<>();

Http2ConnectionCache(Http2ClientImpl http2Client) {
this.http2Client = http2Client;
static Http2ConnectionCache shared() {
return SHARED;
}

boolean supports(ConnectionKey ck) {
return selectH2SupportCache().get(ck).isPresent();
static Http2ConnectionCache create() {
return new Http2ConnectionCache();
}

static void clear() {
HTTP2_SUPPORTED.clear();
GLOBAL_CACHE.forEach((c, c2) -> c2.close());
boolean supports(ConnectionKey ck) {
return http2Supported.get(ck).isPresent();
}

void remove(ConnectionKey connectionKey) {
selectCache().remove(connectionKey);
selectH2SupportCache().remove(connectionKey);
cache.remove(connectionKey);
http2Supported.remove(connectionKey);
}

Http2ConnectionAttemptResult newStream(ConnectionKey connectionKey,
Http2ClientRequestImpl request,
ClientUri initialUri,
Function<Http1ClientRequest, Http1ClientResponse> http1EntityHandler) {
Http2ConnectionAttemptResult newStream(Http2ClientImpl http2Client,
ConnectionKey connectionKey,
Http2ClientRequestImpl request,
ClientUri initialUri,
Function<Http1ClientRequest, Http1ClientResponse> http1EntityHandler) {

// this statement locks all threads - must not do anything complicated (just create a new instance)
Http2ConnectionAttemptResult result =
selectCache().computeIfAbsent(connectionKey, Http2ClientConnectionHandler::new)
cache.computeIfAbsent(connectionKey, Http2ClientConnectionHandler::new)
// this statement may block a single connection key
.newStream(http2Client,
request,
initialUri,
http1EntityHandler);
if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) {
selectH2SupportCache().put(connectionKey, true);
http2Supported.put(connectionKey, true);
}
return result;
}

private Map<ConnectionKey, Http2ClientConnectionHandler> selectCache(){
return http2Client.clientConfig().shareConnectionCache() ? GLOBAL_CACHE : localCache;
}

private LruCache<ConnectionKey, Boolean> selectH2SupportCache(){
return http2Client.clientConfig().shareConnectionCache() ? HTTP2_SUPPORTED : http2Supported;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ void testConnectionQueueDequeue() {
Http1ClientImpl http1Client = new Http1ClientImpl(webClient, clientConfig);
connectionNow = http1Client
.connectionCache()
.connection(injectedHttp1client.prototype().tls(),
.connection(http1Client,
injectedHttp1client.prototype().tls(),
Proxy.noProxy(),
request.resolvedUri(),
request.headers(),
Expand Down Expand Up @@ -260,7 +261,8 @@ void testConnectionQueueSizeLimit() {

connectionList.add(http1Client
.connectionCache()
.connection(clientConfig.tls(),
.connection(http1Client,
clientConfig.tls(),
Proxy.noProxy(),
request.resolvedUri(),
request.headers(),
Expand All @@ -286,7 +288,8 @@ void testConnectionQueueSizeLimit() {

connection = http1Client
.connectionCache()
.connection(clientConfig.tls(),
.connection(http1Client,
clientConfig.tls(),
Proxy.noProxy(),
request.resolvedUri(),
request.headers(),
Expand All @@ -313,7 +316,8 @@ void testConnectionQueueSizeLimit() {

ClientConnection connectionNow = http1Client
.connectionCache()
.connection(clientConfig.tls(),
.connection(http1Client,
clientConfig.tls(),
Proxy.noProxy(),
request.resolvedUri(),
request.headers(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ private boolean mayHaveEntity(Http.Status responseStatus, ClientResponseHeaders

private ClientConnection obtainConnection(WebClientServiceRequest request) {
return http1Client.connectionCache()
.connection(tls,
.connection(http1Client,
tls,
proxy,
request.uri(),
request.headers(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ class Http1ClientImpl implements Http1Client, HttpClientSpi {
private final WebClient webClient;
private final Http1ClientConfig clientConfig;
private final Http1ClientProtocolConfig protocolConfig;
private final Http1ConnectionCache connectionCache = new Http1ConnectionCache(this);
private final Http1ConnectionCache connectionCache;

Http1ClientImpl(WebClient webClient, Http1ClientConfig clientConfig) {
this.webClient = webClient;
this.clientConfig = clientConfig;
this.protocolConfig = clientConfig.protocolConfig();
if (clientConfig.shareConnectionCache()) {
this.connectionCache = Http1ConnectionCache.shared();
} else {
this.connectionCache = Http1ConnectionCache.create();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

Expand All @@ -44,28 +43,31 @@ class Http1ConnectionCache {
private static final System.Logger LOGGER = System.getLogger(Http1ConnectionCache.class.getName());
private static final Tls NO_TLS = Tls.builder().enabled(false).build();
private static final String HTTPS = "https";
private static final Map<ConnectionKey, LinkedBlockingDeque<TcpClientConnection>> GLOBAL_CHANNEL_CACHE =
new ConcurrentHashMap<>();
private static final Http1ConnectionCache SHARED = create();
private static final List<String> ALPN_ID = List.of(Http1Client.PROTOCOL_ID);
private static final Duration QUEUE_TIMEOUT = Duration.ofMillis(10);
private final Map<ConnectionKey, LinkedBlockingDeque<TcpClientConnection>> localConnectionCache = new HashMap<>();
private final Http1ClientImpl http1Client;
private final Map<ConnectionKey, LinkedBlockingDeque<TcpClientConnection>> cache = new HashMap<>();

Http1ConnectionCache(Http1ClientImpl http1Client) {
this.http1Client = http1Client;
static Http1ConnectionCache shared() {
return SHARED;
}

ClientConnection connection(Tls tls,
static Http1ConnectionCache create() {
return new Http1ConnectionCache();
}

ClientConnection connection(Http1ClientImpl http1Client,
Tls tls,
Proxy proxy,
ClientUri uri,
ClientRequestHeaders headers,
boolean defaultKeepAlive) {
boolean keepAlive = handleKeepAlive(defaultKeepAlive, headers);
Tls effectiveTls = HTTPS.equals(uri.scheme()) ? tls : NO_TLS;
if (keepAlive) {
return keepAliveConnection(effectiveTls, uri, proxy);
return keepAliveConnection(http1Client, effectiveTls, uri, proxy);
} else {
return oneOffConnection(effectiveTls, uri, proxy);
return oneOffConnection(http1Client, effectiveTls, uri, proxy);
}
}

Expand All @@ -84,7 +86,8 @@ private boolean handleKeepAlive(boolean defaultKeepAlive, WritableHeaders<?> hea
return false;
}

private ClientConnection keepAliveConnection(Tls tls,
private ClientConnection keepAliveConnection(Http1ClientImpl http1Client,
Tls tls,
ClientUri uri,
Proxy proxy) {
Http1ClientConfig clientConfig = http1Client.clientConfig();
Expand All @@ -98,9 +101,8 @@ private ClientConnection keepAliveConnection(Tls tls,
proxy);

LinkedBlockingDeque<TcpClientConnection> connectionQueue =
(clientConfig.shareConnectionCache() ? GLOBAL_CHANNEL_CACHE : localConnectionCache)
.computeIfAbsent(connectionKey,
it -> new LinkedBlockingDeque<>(clientConfig.connectionCacheSize()));
cache.computeIfAbsent(connectionKey,
it -> new LinkedBlockingDeque<>(clientConfig.connectionCacheSize()));

TcpClientConnection connection;
while ((connection = connectionQueue.poll()) != null && !connection.isConnected()) {
Expand All @@ -124,7 +126,8 @@ private ClientConnection keepAliveConnection(Tls tls,
return connection;
}

private ClientConnection oneOffConnection(Tls tls,
private ClientConnection oneOffConnection(Http1ClientImpl http1Client,
Tls tls,
ClientUri uri,
Proxy proxy) {

Expand Down

0 comments on commit add0ccc

Please sign in to comment.