diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallChainBase.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallChainBase.java index 58de9afbad3..40d30132f8d 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallChainBase.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallChainBase.java @@ -36,7 +36,6 @@ import io.helidon.nima.webclient.api.HttpClientConfig; import io.helidon.nima.webclient.api.HttpClientResponse; import io.helidon.nima.webclient.api.ReleasableResource; -import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webclient.api.WebClientServiceRequest; import io.helidon.nima.webclient.api.WebClientServiceResponse; import io.helidon.nima.webclient.http1.Http1ClientRequest; @@ -49,9 +48,8 @@ abstract class Http2CallChainBase implements WebClientService.Chain { private static final Tls NO_TLS = Tls.builder().enabled(false).build(); - private final WebClient webClient; + private final Http2ClientImpl http2Client; private final HttpClientConfig clientConfig; - private final Http2ClientProtocolConfig protocolConfig; private final Http2ClientRequestImpl clientRequest; private final Function http1EntityHandler; private final CompletableFuture whenComplete; @@ -62,16 +60,13 @@ abstract class Http2CallChainBase implements WebClientService.Chain { private Http.Status responseStatus; private Http2ConnectionAttemptResult.Result result; - Http2CallChainBase(WebClient webClient, - HttpClientConfig clientConfig, - Http2ClientProtocolConfig protocolConfig, + Http2CallChainBase(Http2ClientImpl http2Client, Http2ClientRequestImpl clientRequest, CompletableFuture whenComplete, Function http1EntityHandler) { - this.webClient = webClient; - this.clientConfig = clientConfig; - this.protocolConfig = protocolConfig; + this.http2Client = http2Client; + this.clientConfig = http2Client.clientConfig(); this.clientRequest = clientRequest; this.whenComplete = whenComplete; this.http1EntityHandler = http1EntityHandler; @@ -86,23 +81,26 @@ public WebClientServiceResponse proceed(WebClientServiceRequest serviceRequest) requestHeaders.remove(Http.HeaderNames.CONNECTION, LogHeaderConsumer.INSTANCE); requestHeaders.setIfAbsent(USER_AGENT_HEADER); - Http2ConnectionAttemptResult result = Http2ConnectionCache.newStream(webClient, - protocolConfig, - connectionKey(serviceRequest), - clientRequest, - uri, - http1EntityHandler); + ConnectionKey connectionKey = connectionKey(serviceRequest); + + Http2ConnectionAttemptResult result = http2Client.connectionCache() + .newStream(connectionKey, clientRequest, uri, http1EntityHandler); this.result = result.result(); - if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) { - // ALPN, prior knowledge, or upgrade success - this.stream = result.stream(); - return doProceed(serviceRequest, requestHeaders, result.stream()); - } else { - // upgrade failed - this.response = result.response(); - return doProceed(serviceRequest, result.response()); + try { + if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) { + // ALPN, prior knowledge, or upgrade success + this.stream = result.stream(); + return doProceed(serviceRequest, requestHeaders, result.stream()); + } else { + // upgrade failed + this.response = result.response(); + return doProceed(serviceRequest, result.response()); + } + } catch (StreamTimeoutException e){ + http2Client.connectionCache().remove(connectionKey); + throw e; } } diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallEntityChain.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallEntityChain.java index b7b3baefe35..cc9a8555597 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallEntityChain.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallEntityChain.java @@ -26,8 +26,6 @@ import io.helidon.nima.http.media.EntityWriter; import io.helidon.nima.http2.Http2Headers; import io.helidon.nima.webclient.api.ClientUri; -import io.helidon.nima.webclient.api.HttpClientConfig; -import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webclient.api.WebClientServiceRequest; import io.helidon.nima.webclient.api.WebClientServiceResponse; @@ -35,14 +33,12 @@ class Http2CallEntityChain extends Http2CallChainBase { private final CompletableFuture whenSent; private final Object entity; - Http2CallEntityChain(WebClient webClient, - HttpClientConfig clientConfig, - Http2ClientProtocolConfig protocolConfig, + Http2CallEntityChain(Http2ClientImpl http2Client, Http2ClientRequestImpl request, CompletableFuture whenSent, CompletableFuture whenComplete, Object entity) { - super(webClient, clientConfig, protocolConfig, request, whenComplete, it -> it.submit(entity)); + super(http2Client, request, whenComplete, it -> it.submit(entity)); this.whenSent = whenSent; this.entity = entity; } diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallOutputStreamChain.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallOutputStreamChain.java index 96200c2c6f1..74f56820f05 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallOutputStreamChain.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2CallOutputStreamChain.java @@ -24,8 +24,6 @@ import io.helidon.nima.http2.Http2Headers; import io.helidon.nima.webclient.api.ClientRequest; import io.helidon.nima.webclient.api.ClientUri; -import io.helidon.nima.webclient.api.HttpClientConfig; -import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webclient.api.WebClientServiceRequest; import io.helidon.nima.webclient.api.WebClientServiceResponse; @@ -33,16 +31,12 @@ class Http2CallOutputStreamChain extends Http2CallChainBase { private final CompletableFuture whenSent; private final ClientRequest.OutputStreamHandler streamHandler; - Http2CallOutputStreamChain(WebClient webClient, + Http2CallOutputStreamChain(Http2ClientImpl http2Client, Http2ClientRequestImpl http2ClientRequest, - HttpClientConfig clientConfig, - Http2ClientProtocolConfig protocolConfig, CompletableFuture whenSent, CompletableFuture whenComplete, ClientRequest.OutputStreamHandler streamHandler) { - super(webClient, - clientConfig, - protocolConfig, + super(http2Client, http2ClientRequest, whenComplete, req -> req.outputStream(streamHandler)); diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientConnection.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientConnection.java index 12391bea37f..abac2907bc3 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientConnection.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientConnection.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -49,11 +48,11 @@ import io.helidon.nima.http2.Http2RstStream; import io.helidon.nima.http2.Http2Setting; import io.helidon.nima.http2.Http2Settings; +import io.helidon.nima.http2.Http2StreamState; import io.helidon.nima.http2.Http2Util; import io.helidon.nima.http2.Http2WindowUpdate; import io.helidon.nima.http2.WindowSize; import io.helidon.nima.webclient.api.ClientConnection; -import io.helidon.nima.webclient.api.WebClient; import static java.lang.System.Logger.Level.DEBUG; import static java.lang.System.Logger.Level.TRACE; @@ -61,12 +60,7 @@ class Http2ClientConnection { private static final System.Logger LOGGER = System.getLogger(Http2ClientConnection.class.getName()); - private static final ExecutorService TMP_EXECUTOR = Executors.newFixedThreadPool(10, - Thread.ofPlatform() - .name("helidon-tmp-client-", 0) - .factory()); private static final int FRAME_HEADER_LENGTH = 9; - private final Http2FrameListener sendListener = new Http2LoggingFrameListener("cl-send"); private final Http2FrameListener recvListener = new Http2LoggingFrameListener("cl-recv"); private final LockingStreamIdSequence streamIdSeq = new LockingStreamIdSequence(); @@ -81,11 +75,18 @@ class Http2ClientConnection { private final Http2ConnectionWriter writer; private final DataReader reader; private final DataWriter dataWriter; + private volatile int lastStreamId; private Http2Settings serverSettings = Http2Settings.builder() .build(); private Future handleTask; + private volatile boolean closed = false; + + public boolean closed(){ + return closed; + } + Http2ClientConnection(Http2ClientProtocolConfig protocolConfig, ClientConnection connection) { this.connectionFlowControl = ConnectionFlowControl.clientBuilder(this::writeWindowsUpdate) .maxFrameSize(protocolConfig.maxFrameSize()) @@ -99,13 +100,12 @@ class Http2ClientConnection { this.writer = new Http2ConnectionWriter(connection.helidonSocket(), connection.writer(), List.of()); } - static Http2ClientConnection create(WebClient webClient, - Http2ClientProtocolConfig protocolConfig, + static Http2ClientConnection create(Http2ClientImpl http2Client, ClientConnection connection, boolean sendSettings) { - Http2ClientConnection h2conn = new Http2ClientConnection(protocolConfig, connection); - h2conn.start(protocolConfig, webClient.executor(), sendSettings); + Http2ClientConnection h2conn = new Http2ClientConnection(http2Client.protocolConfig(), connection); + h2conn.start(http2Client.protocolConfig(), http2Client.webClient().executor(), sendSettings); return h2conn; } @@ -135,11 +135,12 @@ Http2ClientStream stream(int streamId) { Http2ClientStream createStream(Http2StreamConfig config) { //FIXME: priority - return new Http2ClientStream(this, - serverSettings, - ctx, - config.timeout(), - streamIdSeq); + Http2ClientStream stream = new Http2ClientStream(this, + serverSettings, + ctx, + config.timeout(), + streamIdSeq); + return stream; } void addStream(int streamId, Http2ClientStream stream) { @@ -171,7 +172,12 @@ Http2ClientStream tryStream(Http2StreamConfig config) { } } + void updateLastStreamId(int lastStreamId){ + this.lastStreamId = lastStreamId; + } + void close() { + closed = true; try { handleTask.cancel(true); ctx.log(LOGGER, TRACE, "Closing connection"); @@ -193,7 +199,9 @@ static Http2Settings settings(Http2ClientProtocolConfig config) { } private void sendPreface(Http2ClientProtocolConfig config, boolean sendSettings) { - dataWriter.writeNow(Http2Util.prefaceData()); + BufferData prefaceData = Http2Util.prefaceData(); + sendListener.frame(ctx, 0, prefaceData); + dataWriter.writeNow(prefaceData); if (sendSettings) { // §3.5 Preface bytes must be followed by setting frame Http2Settings http2Settings = settings(config); @@ -217,8 +225,7 @@ private void start(Http2ClientProtocolConfig protocolConfig, boolean sendSettings) { CountDownLatch cdl = new CountDownLatch(1); - // handleTask = executor.submit(() -> { - handleTask = TMP_EXECUTOR.submit(() -> { + handleTask = executor.submit(() -> { ctx.log(LOGGER, TRACE, "Starting HTTP/2 connection, thread: %s", Thread.currentThread().getName()); try { sendPreface(protocolConfig, sendSettings); @@ -233,6 +240,7 @@ private void start(Http2ClientProtocolConfig protocolConfig, try { while (!Thread.interrupted()) { if (!handle()) { + closed = true; ctx.log(LOGGER, TRACE, "Connection closed"); return; } @@ -253,7 +261,23 @@ private void start(Http2ClientProtocolConfig protocolConfig, } private void writeWindowsUpdate(int streamId, Http2WindowUpdate windowUpdateFrame) { - writer.write(windowUpdateFrame.toFrameData(serverSettings, streamId, Http2Flag.NoFlags.create())); + if (streamId == 0){ + writer.write(windowUpdateFrame.toFrameData(serverSettings, streamId, Http2Flag.NoFlags.create())); + return; + } + if (streamId < lastStreamId) { + for (var s : streams.values()) { + if (s.streamId() > streamId && s.streamState() != Http2StreamState.IDLE) { + // RC against parallel newer streams, data already buffered at client being read + // There is no need to do request for more data as stream is no more usable + return; + } + } + } + Http2ClientStream stream = stream(streamId); + if (stream != null && !stream.streamState().equals(Http2StreamState.CLOSED)) { + writer.write(windowUpdateFrame.toFrameData(serverSettings, streamId, Http2Flag.NoFlags.create())); + } } private boolean handle() { diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientConnectionHandler.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientConnectionHandler.java index b941e30fdaf..84e720ef167 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientConnectionHandler.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientConnectionHandler.java @@ -71,28 +71,26 @@ class Http2ClientConnectionHandler { } void close() { - // this is to prevent concurrent modification (connections remove themself from the map) + // this is to prevent concurrent modification (connections remove themselves from the map) Set toClose = new HashSet<>(allConnections.keySet()); toClose.forEach(Http2ClientConnection::close); this.activeConnection.set(null); this.allConnections.clear(); } - Http2ConnectionAttemptResult newStream(WebClient webClient, - Http2ClientProtocolConfig protocolConfig, + Http2ConnectionAttemptResult newStream(Http2ClientImpl http2Client, Http2ClientRequestImpl request, ClientUri initialUri, Function http1EntityHandler) { return switch (result.get()) { - case HTTP_1 -> http1(webClient, protocolConfig, request, initialUri, http1EntityHandler); - case HTTP_2 -> http2(webClient, protocolConfig, request, initialUri); - case UNKNOWN -> httpX(webClient, protocolConfig, request, initialUri, http1EntityHandler); + case HTTP_1 -> http1(http2Client, request, initialUri, http1EntityHandler); + case HTTP_2 -> http2(http2Client, request, initialUri); + case UNKNOWN -> httpX(http2Client, request, initialUri, http1EntityHandler); }; } - Http2ConnectionAttemptResult http2(WebClient webClient, - Http2ClientProtocolConfig protocolConfig, + Http2ConnectionAttemptResult http2(Http2ClientImpl http2Client, Http2ClientRequestImpl request, ClientUri initialUri) { try { @@ -102,17 +100,17 @@ Http2ConnectionAttemptResult http2(WebClient webClient, } try { // read/write lock to obtain a stream or create a new connection - Http2ClientConnection conn = activeConnection.get(); + Http2ClientConnection conn = activeConnection.updateAndGet(c -> c != null && c.closed() ? null : c); Http2ClientStream stream; if (conn == null) { - conn = createConnection(webClient, protocolConfig, request, initialUri); + conn = createConnection(http2Client, request, initialUri); // we must assume that a new connection can handle a new stream stream = conn.createStream(request); } else { stream = conn.tryStream(request); if (stream == null) { // either the connection is closed, or it ran out of streams - conn = createConnection(webClient, protocolConfig, request, initialUri); + conn = createConnection(http2Client, request, initialUri); stream = conn.createStream(request); } } @@ -123,8 +121,7 @@ Http2ConnectionAttemptResult http2(WebClient webClient, } } - private Http2ConnectionAttemptResult httpX(WebClient webClient, - Http2ClientProtocolConfig protocolConfig, + private Http2ConnectionAttemptResult httpX(Http2ClientImpl http2Client, Http2ClientRequestImpl request, ClientUri initialUri, Function http1EntityHandler) { @@ -134,6 +131,7 @@ private Http2ConnectionAttemptResult httpX(WebClient webClient, throw new IllegalStateException("Interrupted", e); } try { + WebClient webClient = http2Client.webClient(); if (request.tls().enabled() && "https".equals(initialUri.scheme())) { // use ALPN, not upgrade, if prior, only h2, otherwise both List alpn; @@ -147,18 +145,17 @@ private Http2ConnectionAttemptResult httpX(WebClient webClient, if (Http2Client.PROTOCOL_ID.equals(tcpClientConnection.helidonSocket().protocol())) { result.set(Result.HTTP_2); // this should always be true - Http2ClientConnection connection = Http2ClientConnection.create(webClient, - protocolConfig, + Http2ClientConnection connection = Http2ClientConnection.create(http2Client, tcpClientConnection, true); allConnections.put(connection, true); h2ConnByConn.put(tcpClientConnection, connection); this.activeConnection.set(connection); - return http2(webClient, protocolConfig, request, initialUri); + return http2(http2Client, request, initialUri); } else { result.set(Result.HTTP_1); request.connection(tcpClientConnection); - return http1(webClient, protocolConfig, request, initialUri, http1EntityHandler); + return http1(http2Client, request, initialUri, http1EntityHandler); } } else { // this should not really happen, as H2 is depending on ALPN, but let's support it anyway, and hope we can @@ -168,27 +165,26 @@ private Http2ConnectionAttemptResult httpX(WebClient webClient, } if (result.get() != Result.UNKNOWN) { - return http2(webClient, protocolConfig, request, initialUri); + return http2(http2Client, request, initialUri); } // we need to connect if (request.priorKnowledge()) { // there is no fallback to HTTP/1 with prior knowledge - it must work or fail - return http2(webClient, protocolConfig, request, initialUri); + return http2(http2Client, request, initialUri); } // attempt an upgrade to HTTP/2 UpgradeResponse upgradeResponse = http1Request(webClient, request, initialUri) .header(UPGRADE_HEADER) .header(CONNECTION_UPGRADE_HEADER) - .header(HTTP2_SETTINGS_HEADER, settingsForUpgrade(protocolConfig)) + .header(HTTP2_SETTINGS_HEADER, settingsForUpgrade(http2Client.protocolConfig())) .upgrade("h2c"); if (upgradeResponse.isUpgraded()) { result.set(Result.HTTP_2); - Http2ClientConnection conn = Http2ClientConnection.create(webClient, - protocolConfig, + Http2ClientConnection conn = Http2ClientConnection.create(http2Client, upgradeResponse.connection(), false); activeConnection.set(conn); - return http2(webClient, protocolConfig, request, initialUri); + return http2(http2Client, request, initialUri); } else { result.set(Result.HTTP_1); return new Http2ConnectionAttemptResult(Result.HTTP_1, @@ -209,13 +205,15 @@ private String settingsForUpgrade(Http2ClientProtocolConfig protocolConfig) { return Base64.getEncoder().encodeToString(b); } - private Http2ConnectionAttemptResult http1(WebClient webClient, - Http2ClientProtocolConfig protocolConfig, + private Http2ConnectionAttemptResult http1(Http2ClientImpl http2Client, Http2ClientRequestImpl request, ClientUri initialUri, Function http1EntityHandler) { return new Http2ConnectionAttemptResult(Result.HTTP_1, null, - http1EntityHandler.apply(http1Request(webClient, request, initialUri))); + http1EntityHandler.apply(http1Request( + http2Client.webClient(), + request, + initialUri))); } private Http1ClientRequest http1Request(WebClient webClient, Http2ClientRequestImpl request, ClientUri initialUri) { @@ -232,28 +230,29 @@ private Http1ClientRequest http1Request(WebClient webClient, Http2ClientRequestI .followRedirects(request.followRedirects()); } - private Http2ClientConnection createConnection(WebClient webClient, - Http2ClientProtocolConfig protocolConfig, + private Http2ClientConnection createConnection(Http2ClientImpl http2Client, Http2ClientRequestImpl request, ClientUri requestUri) { + WebClient webClient = http2Client.webClient(); + Http2ClientProtocolConfig protocolConfig = http2Client.protocolConfig(); Optional maybeConnection = request.connection(); Http2ClientConnection usedConnection; if (maybeConnection.isPresent()) { // TLS is ignored (we cannot do a TLS negotiation on a connected connection) // we cannot cache this connection, it will be a one-off - usedConnection = Http2ClientConnection.create(webClient, protocolConfig, maybeConnection.get(), true); + usedConnection = Http2ClientConnection.create(http2Client, maybeConnection.get(), true); } else { ClientConnection connection; // we know that this is HTTP/2 capable server - still need to support all three (prior, upgrade, alpn) if (request.tls().enabled() && "https".equals(requestUri.scheme())) { connection = connectClient(webClient, List.of(Http2Client.PROTOCOL_ID)); - usedConnection = Http2ClientConnection.create(webClient, protocolConfig, connection, true); + usedConnection = Http2ClientConnection.create(http2Client, connection, true); } else { if (request.priorKnowledge()) { connection = connectClient(webClient, List.of(Http2Client.PROTOCOL_ID)); - usedConnection = Http2ClientConnection.create(webClient, protocolConfig, connection, true); + usedConnection = Http2ClientConnection.create(http2Client, connection, true); } else { // attempt an upgrade to HTTP/2 UpgradeResponse upgradeResponse = http1Request(webClient, request, requestUri) @@ -264,7 +263,7 @@ private Http2ClientConnection createConnection(WebClient webClient, if (upgradeResponse.isUpgraded()) { result.set(Result.HTTP_2); connection = upgradeResponse.connection(); - usedConnection = Http2ClientConnection.create(webClient, protocolConfig, connection, false); + usedConnection = Http2ClientConnection.create(http2Client, connection, false); } else { try (HttpClientResponse response = upgradeResponse.response()) { if (LOGGER.isLoggable(TRACE)) { diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientImpl.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientImpl.java index a94025756ff..24c0733fdf6 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientImpl.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientImpl.java @@ -26,12 +26,13 @@ import io.helidon.nima.webclient.spi.HttpClientSpi; class Http2ClientImpl implements Http2Client, HttpClientSpi { - private final WebClient client; + private final WebClient webClient; private final Http2ClientConfig clientConfig; private final Http2ClientProtocolConfig protocolConfig; + private final Http2ConnectionCache connectionCache = new Http2ConnectionCache(this); - Http2ClientImpl(WebClient client, Http2ClientConfig clientConfig) { - this.client = client; + Http2ClientImpl(WebClient webClient, Http2ClientConfig clientConfig) { + this.webClient = webClient; this.clientConfig = clientConfig; this.protocolConfig = clientConfig.protocolConfig(); } @@ -45,7 +46,7 @@ public Http2ClientRequest method(Http.Method method) { UriQueryWriteable query = UriQueryWriteable.create(); clientConfig.baseQuery().ifPresent(query::from); - return new Http2ClientRequestImpl(client, clientConfig, protocolConfig, method, clientUri, clientConfig.properties()); + return new Http2ClientRequestImpl(this, method, clientUri, clientConfig.properties()); } @Override @@ -62,7 +63,7 @@ public SupportLevel supports(FullClientRequest clientRequest, ClientUri clien clientConfig.dnsResolver(), clientConfig.dnsAddressLookup(), clientRequest.proxy()); - if (Http2ConnectionCache.supports(ck)) { + if (connectionCache.supports(ck)) { return SupportLevel.SUPPORTED; } @@ -71,9 +72,7 @@ public SupportLevel supports(FullClientRequest clientRequest, ClientUri clien @Override public ClientRequest clientRequest(FullClientRequest clientRequest, ClientUri clientUri) { - Http2ClientRequest request = new Http2ClientRequestImpl(client, - clientConfig, - protocolConfig, + Http2ClientRequest request = new Http2ClientRequestImpl(this, clientRequest.method(), clientUri, clientRequest.properties()); @@ -89,4 +88,20 @@ public ClientRequest clientRequest(FullClientRequest clientRequest, Client .headers(clientRequest.headers()) .fragment(clientUri.fragment()); } + + WebClient webClient() { + return webClient; + } + + Http2ClientConfig clientConfig() { + return clientConfig; + } + + Http2ClientProtocolConfig protocolConfig() { + return protocolConfig; + } + + Http2ConnectionCache connectionCache(){ + return connectionCache; + } } diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientRequestImpl.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientRequestImpl.java index add99954244..f0808622291 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientRequestImpl.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientRequestImpl.java @@ -26,41 +26,40 @@ import io.helidon.nima.webclient.api.ClientRequestBase; import io.helidon.nima.webclient.api.ClientUri; import io.helidon.nima.webclient.api.FullClientRequest; -import io.helidon.nima.webclient.api.HttpClientConfig; -import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webclient.api.WebClientServiceRequest; import io.helidon.nima.webclient.api.WebClientServiceResponse; class Http2ClientRequestImpl extends ClientRequestBase implements Http2ClientRequest, Http2StreamConfig, FullClientRequest { - private final WebClient webClient; - private final Http2ClientProtocolConfig protocolConfig; - + private final Http2ClientImpl http2Client; private int priority; private boolean priorKnowledge; private int requestPrefetch = 0; private Duration flowControlTimeout = Duration.ofMillis(100); private Duration timeout = Duration.ofSeconds(10); - Http2ClientRequestImpl(WebClient webClient, - HttpClientConfig clientConfig, - Http2ClientProtocolConfig protocolConfig, + Http2ClientRequestImpl(Http2ClientImpl http2Client, Http.Method method, ClientUri clientUri, Map properties) { - super(clientConfig, webClient.cookieManager(), Http2Client.PROTOCOL_ID, method, clientUri, properties); - + super(http2Client.clientConfig(), + http2Client.webClient().cookieManager(), + Http2Client.PROTOCOL_ID, + method, + clientUri, + properties); + + this.http2Client = http2Client; + Http2ClientProtocolConfig protocolConfig = http2Client.protocolConfig(); this.priorKnowledge = protocolConfig.priorKnowledge(); - this.webClient = webClient; - this.protocolConfig = protocolConfig; } Http2ClientRequestImpl(Http2ClientRequestImpl request, Http.Method method, ClientUri clientUri, Map properties) { - this(request.webClient, request.clientConfig(), request.protocolConfig, method, clientUri, properties); + this(request.http2Client, method, clientUri, properties); followRedirects(request.followRedirects()); maxRedirects(request.maxRedirects()); @@ -118,10 +117,8 @@ public Http2ClientResponse doSubmit(Object entity) { public Http2ClientResponse doOutputStream(OutputStreamHandler streamHandler) { CompletableFuture whenSent = new CompletableFuture<>(); CompletableFuture whenComplete = new CompletableFuture<>(); - Http2CallChainBase callChain = new Http2CallOutputStreamChain(webClient, + Http2CallChainBase callChain = new Http2CallOutputStreamChain(http2Client, this, - clientConfig(), - protocolConfig, whenSent, whenComplete, streamHandler); @@ -200,9 +197,7 @@ private Http2ClientResponse invokeEntityFollowRedirects(Object entity) { private Http2ClientResponseImpl invokeEntity(Object entity) { CompletableFuture whenSent = new CompletableFuture<>(); CompletableFuture whenComplete = new CompletableFuture<>(); - Http2CallChainBase httpCall = new Http2CallEntityChain(webClient, - clientConfig(), - protocolConfig, + Http2CallChainBase httpCall = new Http2CallEntityChain(http2Client, this, whenSent, whenComplete, diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientResponseImpl.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientResponseImpl.java index a502cc3bbac..1cddbf2ec8f 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientResponseImpl.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientResponseImpl.java @@ -20,6 +20,7 @@ import java.io.InputStream; import java.io.UncheckedIOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import io.helidon.common.buffers.BufferData; import io.helidon.common.http.ClientRequestHeaders; @@ -40,6 +41,8 @@ class Http2ClientResponseImpl implements Http2ClientResponse { private final MediaContext mediaContext; private final ClientUri lastEndpointUri; + private final AtomicBoolean closed = new AtomicBoolean(false); + Http2ClientResponseImpl(Http.Status status, ClientRequestHeaders requestHeaders, ClientResponseHeaders responseHeaders, @@ -105,7 +108,9 @@ public ClientUri lastEndpointUri() { @Override public void close() { - complete.complete(null); - closeResponseRunnable.run(); + if (!closed.getAndSet(true)) { + complete.complete(null); + closeResponseRunnable.run(); + } } } diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientStream.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientStream.java index c35f8399bb1..ad54622890e 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientStream.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ClientStream.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Set; import io.helidon.common.buffers.BufferData; import io.helidon.common.socket.SocketContext; @@ -52,6 +53,7 @@ class Http2ClientStream implements Http2Stream, ReleasableResource { private static final System.Logger LOGGER = System.getLogger(Http2ClientStream.class.getName()); + private static final Set NON_CANCELABLE = Set.of(Http2StreamState.CLOSED, Http2StreamState.IDLE); private final Http2ClientConnection connection; private final Http2Settings serverSettings; private final SocketContext ctx; @@ -167,6 +169,9 @@ boolean hasEntity() { } void cancel() { + if (NON_CANCELABLE.contains(state)) { + return; + } Http2RstStream rstStream = new Http2RstStream(Http2ErrorCode.CANCEL); Http2FrameData frameData = rstStream.toFrameData(settings, streamId, Http2Flag.NoFlags.create()); sendListener.frameHeader(ctx, streamId, frameData.header()); @@ -224,6 +229,7 @@ void write(Http2Headers http2Headers, boolean endOfStream) { // §5.1.1 - The identifier of a newly established stream MUST be numerically // greater than all streams that the initiating endpoint has opened or reserved. this.streamId = streamIdSeq.lockAndNext(); + this.connection.updateLastStreamId(streamId); this.buffer = new StreamBuffer(streamId, timeout); // fixme Configurable initial win size, max frame size diff --git a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ConnectionCache.java b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ConnectionCache.java index 429dfc4eb49..fcb3647fae2 100644 --- a/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ConnectionCache.java +++ b/nima/http2/webclient/src/main/java/io/helidon/nima/http2/webclient/Http2ConnectionCache.java @@ -23,7 +23,6 @@ import io.helidon.common.configurable.LruCache; import io.helidon.nima.webclient.api.ClientUri; import io.helidon.nima.webclient.api.ConnectionKey; -import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webclient.http1.Http1ClientRequest; import io.helidon.nima.webclient.http1.Http1ClientResponse; @@ -32,39 +31,55 @@ final class Http2ConnectionCache { private static final LruCache HTTP2_SUPPORTED = LruCache.builder() .capacity(1000) .build(); - private static final Map CHANNEL_CACHE = new ConcurrentHashMap<>(); + private final LruCache http2Supported = LruCache.builder() + .capacity(1000) + .build(); + private static final Map GLOBAL_CACHE = new ConcurrentHashMap<>(); + private final Map localCache = new ConcurrentHashMap<>(); + private final Http2ClientImpl http2Client; - private Http2ConnectionCache() { + Http2ConnectionCache(Http2ClientImpl http2Client) { + this.http2Client = http2Client; } - static boolean supports(ConnectionKey ck) { - return HTTP2_SUPPORTED.get(ck).isPresent(); + boolean supports(ConnectionKey ck) { + return selectH2SupportCache().get(ck).isPresent(); } static void clear() { HTTP2_SUPPORTED.clear(); - CHANNEL_CACHE.forEach((c, c2) -> c2.close()); + GLOBAL_CACHE.forEach((c, c2) -> c2.close()); + } + + void remove(ConnectionKey connectionKey) { + selectCache().remove(connectionKey); + selectH2SupportCache().remove(connectionKey); } - static Http2ConnectionAttemptResult newStream(WebClient webClient, - Http2ClientProtocolConfig protocolConfig, - ConnectionKey connectionKey, + Http2ConnectionAttemptResult newStream(ConnectionKey connectionKey, Http2ClientRequestImpl request, ClientUri initialUri, Function http1EntityHandler) { // this statement locks all threads - must not do anything complicated (just create a new instance) - Http2ConnectionAttemptResult result = CHANNEL_CACHE.computeIfAbsent(connectionKey, - Http2ClientConnectionHandler::new) + Http2ConnectionAttemptResult result = + selectCache().computeIfAbsent(connectionKey, Http2ClientConnectionHandler::new) // this statement may block a single connection key - .newStream(webClient, - protocolConfig, + .newStream(http2Client, request, initialUri, http1EntityHandler); if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) { - HTTP2_SUPPORTED.put(connectionKey, true); + selectH2SupportCache().put(connectionKey, true); } return result; } + + private Map selectCache(){ + return http2Client.clientConfig().shareConnectionCache() ? GLOBAL_CACHE : localCache; + } + + private LruCache selectH2SupportCache(){ + return http2Client.clientConfig().shareConnectionCache() ? HTTP2_SUPPORTED : http2Supported; + } } diff --git a/nima/http2/webclient/src/test/java/io/helidon/nima/http2/webclient/Http2WebClientTest.java b/nima/http2/webclient/src/test/java/io/helidon/nima/http2/webclient/Http2WebClientTest.java index ca4ffcdafe1..7461e353786 100644 --- a/nima/http2/webclient/src/test/java/io/helidon/nima/http2/webclient/Http2WebClientTest.java +++ b/nima/http2/webclient/src/test/java/io/helidon/nima/http2/webclient/Http2WebClientTest.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Stream; import io.helidon.common.LazyValue; @@ -37,12 +39,15 @@ import io.helidon.nima.http2.webserver.Http2Route; import io.helidon.nima.testing.junit5.webserver.ServerTest; import io.helidon.nima.testing.junit5.webserver.SetUpServer; +import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webserver.WebServer; import io.helidon.nima.webserver.WebServerConfig; import io.helidon.nima.webserver.http1.Http1Route; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -63,22 +68,36 @@ class Http2WebClientTest { private static final Http.HeaderName CLIENT_USER_AGENT_HEADER_NAME = Http.HeaderNames.create("client-user-agent"); private static ExecutorService executorService; private static int plainPort; - private static final LazyValue priorKnowledgeClient = LazyValue.create(() -> Http2Client.builder() + private static int tlsPort; + private static Supplier localCacheClient = () -> Http2Client.builder() + .shareConnectionCache(false) + .connectTimeout(Duration.ofMinutes(10)) + .baseUri("http://localhost:" + plainPort + "/versionspecific") + .build(); + private static final Supplier globalCacheClient = () -> Http2Client.builder() + .shareConnectionCache(true) + .connectTimeout(Duration.ofMinutes(10)) + .baseUri("http://localhost:" + plainPort + "/versionspecific") + .build(); + private static final Supplier priorKnowledgeClient = () -> Http2Client.builder() + .shareConnectionCache(false) + .connectTimeout(Duration.ofMinutes(10)) .protocolConfig(pc -> pc.priorKnowledge(true)) .baseUri("http://localhost:" + plainPort + "/versionspecific") - .build()); - private static final LazyValue upgradeClient = LazyValue.create(() -> Http2Client.builder() + .build(); + private static final Supplier upgradeClient = () -> Http2Client.builder() + .shareConnectionCache(false) .baseUri("http://localhost:" + plainPort + "/versionspecific") - .build()); - private static int tlsPort; - private static final LazyValue tlsClient = LazyValue.create(() -> Http2Client.builder() + .build(); + private static final Supplier tlsClient = () -> Http2Client.builder() + .shareConnectionCache(false) .baseUri("https://localhost:" + tlsPort + "/versionspecific") .tls(Tls.builder() .enabled(true) .trustAll(true) .endpointIdentificationAlgorithm(Tls.ENDPOINT_IDENTIFICATION_NONE) .build()) - .build()); + .build(); Http2WebClientTest(WebServer server) { plainPort = server.port(); @@ -87,7 +106,7 @@ class Http2WebClientTest { @SetUpServer static void setUpServer(WebServerConfig.Builder serverBuilder) { - executorService = Executors.newFixedThreadPool(5); + executorService = Executors.newFixedThreadPool(10); Keys privateKeyConfig = Keys.builder() @@ -151,6 +170,8 @@ static void setUpServer(WebServerConfig.Builder serverBuilder) { static Stream clientTypes() { return Stream.of( + Arguments.of("localConnectionCache", LazyValue.create(() -> localCacheClient.get())), + Arguments.of("globalConnectionCache", globalCacheClient), Arguments.of("priorKnowledge", priorKnowledgeClient), Arguments.of("upgrade", upgradeClient), Arguments.of("tls", tlsClient) @@ -167,9 +188,7 @@ static void afterAll() throws InterruptedException { @ParameterizedTest(name = "{0}") @MethodSource("clientTypes") - void clientGet(String name, LazyValue client) { - // reset everything, so we need to create a new connection - Http2ConnectionCache.clear(); + void clientGet(String name, Supplier client) { try (Http2ClientResponse response = client.get() .get() .queryParam("custQueryParam", "test-get") @@ -186,9 +205,7 @@ void clientGet(String name, LazyValue client) { @ParameterizedTest(name = "{0}") @MethodSource("clientTypes") - void clientPut(String clientType, LazyValue client) { - Http2ConnectionCache.clear(); - + void clientPut(String clientType, Supplier client) { String payload = clientType + " payload"; String custHeaderValue = clientType + " header value"; @@ -211,9 +228,7 @@ void clientPut(String clientType, LazyValue client) { @ParameterizedTest(name = "{0}") @MethodSource("clientTypes") - void clientPost(String clientType, LazyValue client) { - Http2ConnectionCache.clear(); - + void clientPost(String clientType, Supplier client) { String payload = clientType + " payload"; String custHeaderValue = clientType + " header value"; @@ -234,15 +249,15 @@ void clientPost(String clientType, LazyValue client) { } } - @Disabled("Failing intermittently, to be investigated") +// @Disabled("Failing intermittently, to be investigated") @ParameterizedTest(name = "{0}") @MethodSource("clientTypes") - void multiplexParallelStreamsGet(String clientType, LazyValue client) + void multiplexParallelStreamsGet(String clientType, Supplier client) throws ExecutionException, InterruptedException, TimeoutException { - Http2ConnectionCache.clear(); + Http2Client http2Client = client.get(); Consumer callable = id -> { - try (Http2ClientResponse response = client.get() + try (Http2ClientResponse response = http2Client .get("/h2streaming") .queryParam("execId", id.toString()) .request() diff --git a/nima/tests/integration/http2/client/src/test/java/io/helidon/nima/tests/integration/http2/client/Http2ClientTest.java b/nima/tests/integration/http2/client/src/test/java/io/helidon/nima/tests/integration/http2/client/Http2ClientTest.java index 8ced7156f88..caaac611a32 100644 --- a/nima/tests/integration/http2/client/src/test/java/io/helidon/nima/tests/integration/http2/client/Http2ClientTest.java +++ b/nima/tests/integration/http2/client/src/test/java/io/helidon/nima/tests/integration/http2/client/Http2ClientTest.java @@ -36,6 +36,8 @@ import org.junit.jupiter.api.Test; +import java.util.function.Supplier; + import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -46,8 +48,8 @@ class Http2ClientTest { private static final String TEST_HEADER_VALUE = "as!fd"; private static final Header TEST_HEADER = Http.Headers.create(HeaderNames.create(TEST_HEADER_NAME), TEST_HEADER_VALUE); private final Http1Client http1Client; - private final Http2Client tlsClient; - private final Http2Client plainClient; + private final Supplier tlsClient; + private final Supplier plainClient; Http2ClientTest(WebServer server, Http1Client http1Client) { int plainPort = server.port(); @@ -58,12 +60,14 @@ class Http2ClientTest { .endpointIdentificationAlgorithm(Tls.ENDPOINT_IDENTIFICATION_NONE) .trustAll(true) .build(); - this.tlsClient = Http2Client.builder() + this.tlsClient = () -> Http2Client.builder() .baseUri("https://localhost:" + tlsPort + "/") .tls(insecureTls) + .shareConnectionCache(false) .build(); - this.plainClient = Http2Client.builder() + this.plainClient = () -> Http2Client.builder() .baseUri("http://localhost:" + plainPort + "/") + .shareConnectionCache(false) .build(); } @@ -104,7 +108,7 @@ void testHttp1() { @Test void testUpgrade() { - try (Http2ClientResponse response = plainClient + try (Http2ClientResponse response = plainClient.get() .get("/") .request()) { @@ -117,7 +121,7 @@ void testUpgrade() { @Test void testAppProtocol() { - try (Http2ClientResponse response = tlsClient + try (Http2ClientResponse response = tlsClient.get() .get("/") .request()) { @@ -130,7 +134,7 @@ void testAppProtocol() { @Test void testPriorKnowledge() { - try (Http2ClientResponse response = tlsClient + try (Http2ClientResponse response = tlsClient.get() .get("/") .priorKnowledge(true) .request()) { diff --git a/nima/tests/integration/webclient/webclient/src/test/java/io/helidon/nima/webclient/http1/ClientRequestImplTest.java b/nima/tests/integration/webclient/webclient/src/test/java/io/helidon/nima/webclient/http1/ClientRequestImplTest.java index fb832a5fc9f..471dffe4d85 100644 --- a/nima/tests/integration/webclient/webclient/src/test/java/io/helidon/nima/webclient/http1/ClientRequestImplTest.java +++ b/nima/tests/integration/webclient/webclient/src/test/java/io/helidon/nima/webclient/http1/ClientRequestImplTest.java @@ -223,13 +223,16 @@ void testConnectionQueueDequeue() { for (int i = 0; i < 5; ++i) { HttpClientRequest request = injectedHttp1client.put("/test"); // connection will be dequeued if queue is not empty - connectionNow = Http1ConnectionCache.connection(WebClient.create(), - Http1ClientConfig.create(), - injectedHttp1client.prototype().tls(), - Proxy.noProxy(), - request.resolvedUri(), - request.headers(), - true); + WebClient webClient = WebClient.create(); + Http1ClientConfig clientConfig = Http1ClientConfig.create(); + Http1ClientImpl http1Client = new Http1ClientImpl(webClient, clientConfig); + connectionNow = http1Client + .connectionCache() + .connection(injectedHttp1client.prototype().tls(), + Proxy.noProxy(), + request.resolvedUri(), + request.headers(), + true); request.connection(connectionNow); HttpClientResponse response = request.request(); // connection will be queued up @@ -251,13 +254,17 @@ void testConnectionQueueSizeLimit() { for (int i = 0; i < connectionQueueSize + 1; ++i) { HttpClientRequest request = injectedHttp1client.put("/test"); - connectionList.add(Http1ConnectionCache.connection(WebClient.create(), - injectedHttp1client.prototype(), - injectedHttp1client.prototype().tls(), - Proxy.noProxy(), - request.resolvedUri(), - request.headers(), - true)); + WebClient webClient = WebClient.create(); + Http1ClientConfig clientConfig = Http1ClientConfig.create(); + Http1ClientImpl http1Client = new Http1ClientImpl(webClient, clientConfig); + + connectionList.add(http1Client + .connectionCache() + .connection(clientConfig.tls(), + Proxy.noProxy(), + request.resolvedUri(), + request.headers(), + true)); request.connection(connectionList.get(i)); responseList.add(request.request()); } @@ -272,13 +279,19 @@ void testConnectionQueueSizeLimit() { HttpClientResponse response = null; for (int i = 0; i < connectionQueueSize + 1; ++i) { HttpClientRequest request = injectedHttp1client.put("/test"); - connection = Http1ConnectionCache.connection(WebClient.create(), - injectedHttp1client.prototype(), - injectedHttp1client.prototype().tls(), - Proxy.noProxy(), - request.resolvedUri(), - request.headers(), - true); + + WebClient webClient = WebClient.create(); + Http1ClientConfig clientConfig = Http1ClientConfig.create(); + Http1ClientImpl http1Client = new Http1ClientImpl(webClient, clientConfig); + + connection = http1Client + .connectionCache() + .connection(clientConfig.tls(), + Proxy.noProxy(), + request.resolvedUri(), + request.headers(), + true); + request.connection(connection); response = request.request(); if (i < connectionQueueSize) { @@ -293,13 +306,19 @@ void testConnectionQueueSizeLimit() { // The queue is currently empty so check if we can add the last created connection into it. response.close(); HttpClientRequest request = injectedHttp1client.put("/test"); - ClientConnection connectionNow = Http1ConnectionCache.connection(WebClient.create(), - injectedHttp1client.prototype(), - injectedHttp1client.prototype().tls(), - Proxy.noProxy(), - request.resolvedUri(), - request.headers(), - true); + + WebClient webClient = WebClient.create(); + Http1ClientConfig clientConfig = Http1ClientConfig.create(); + Http1ClientImpl http1Client = new Http1ClientImpl(webClient, clientConfig); + + ClientConnection connectionNow = http1Client + .connectionCache() + .connection(clientConfig.tls(), + Proxy.noProxy(), + request.resolvedUri(), + request.headers(), + true); + request.connection(connectionNow); HttpClientResponse responseNow = request.request(); // Verify that the connection was dequeued diff --git a/nima/webclient/api/src/main/java/io/helidon/nima/webclient/api/HttpClientConfigBlueprint.java b/nima/webclient/api/src/main/java/io/helidon/nima/webclient/api/HttpClientConfigBlueprint.java index 73cb39b79be..cf4f65b0402 100644 --- a/nima/webclient/api/src/main/java/io/helidon/nima/webclient/api/HttpClientConfigBlueprint.java +++ b/nima/webclient/api/src/main/java/io/helidon/nima/webclient/api/HttpClientConfigBlueprint.java @@ -216,4 +216,12 @@ default ClientRequestHeaders defaultRequestHeaders() { */ @ConfiguredOption Optional cookieManager(); + + /** + * Whether to share connection cache between all the WebClient instances in JVM. + * + * @return true if connection cache is shared + */ + @ConfiguredOption("true") + boolean shareConnectionCache(); } diff --git a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallChainBase.java b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallChainBase.java index f6117f10dfa..0cf0bc6d995 100644 --- a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallChainBase.java +++ b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallChainBase.java @@ -43,7 +43,6 @@ import io.helidon.nima.webclient.api.ClientUri; import io.helidon.nima.webclient.api.HttpClientConfig; import io.helidon.nima.webclient.api.Proxy; -import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webclient.api.WebClientServiceRequest; import io.helidon.nima.webclient.api.WebClientServiceResponse; import io.helidon.nima.webclient.spi.WebClientService; @@ -55,7 +54,6 @@ abstract class Http1CallChainBase implements WebClientService.Chain { private static final System.Logger LOGGER = System.getLogger(Http1CallChainBase.class.getName()); private final BufferData writeBuffer = BufferData.growing(128); - private final WebClient webClient; private final HttpClientConfig clientConfig; private final Http1ClientProtocolConfig protocolConfig; private final ClientConnection connection; @@ -64,21 +62,20 @@ abstract class Http1CallChainBase implements WebClientService.Chain { private final boolean keepAlive; private final CompletableFuture whenComplete; private final Duration timeout; + private final Http1ClientImpl http1Client; private ClientConnection effectiveConnection; - Http1CallChainBase(WebClient webClient, - HttpClientConfig clientConfig, - Http1ClientProtocolConfig protocolConfig, + Http1CallChainBase(Http1ClientImpl http1Client, Http1ClientRequestImpl clientRequest, CompletableFuture whenComplete) { - this.webClient = webClient; - this.clientConfig = clientConfig; - this.protocolConfig = protocolConfig; + this.clientConfig = http1Client.clientConfig(); + this.protocolConfig = http1Client.protocolConfig(); this.timeout = clientRequest.readTimeout(); this.connection = clientRequest.connection().orElse(null); this.tls = clientRequest.tls(); this.proxy = clientRequest.proxy(); this.keepAlive = clientRequest.keepAlive(); + this.http1Client = clientRequest.http1Client(); this.whenComplete = whenComplete; } @@ -244,13 +241,12 @@ private boolean mayHaveEntity(Http.Status responseStatus, ClientResponseHeaders } private ClientConnection obtainConnection(WebClientServiceRequest request) { - return Http1ConnectionCache.connection(webClient, - clientConfig, - tls, - proxy, - request.uri(), - request.headers(), - keepAlive); + return http1Client.connectionCache() + .connection(tls, + proxy, + request.uri(), + request.headers(), + keepAlive); } static class ContentLengthInputStream extends InputStream { diff --git a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallEntityChain.java b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallEntityChain.java index efd1b7eea12..6be359d27e4 100644 --- a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallEntityChain.java +++ b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallEntityChain.java @@ -27,8 +27,6 @@ import io.helidon.common.http.Http; import io.helidon.nima.http.media.EntityWriter; import io.helidon.nima.webclient.api.ClientConnection; -import io.helidon.nima.webclient.api.HttpClientConfig; -import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webclient.api.WebClientServiceRequest; import io.helidon.nima.webclient.api.WebClientServiceResponse; @@ -38,18 +36,12 @@ class Http1CallEntityChain extends Http1CallChainBase { private final CompletableFuture whenSent; private final Object entity; - Http1CallEntityChain(WebClient webClient, + Http1CallEntityChain(Http1ClientImpl http1Client, Http1ClientRequestImpl request, - HttpClientConfig clientConfig, - Http1ClientProtocolConfig protocolConfig, CompletableFuture whenSent, CompletableFuture whenComplete, Object entity) { - super(webClient, - clientConfig, - protocolConfig, - request, whenComplete); - + super(http1Client, request, whenComplete); this.request = request; this.whenSent = whenSent; this.entity = entity; diff --git a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallOutputStreamChain.java b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallOutputStreamChain.java index 714fa6f289b..f589e60406f 100644 --- a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallOutputStreamChain.java +++ b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1CallOutputStreamChain.java @@ -32,30 +32,21 @@ import io.helidon.nima.webclient.api.ClientConnection; import io.helidon.nima.webclient.api.ClientRequest; import io.helidon.nima.webclient.api.HttpClientConfig; -import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webclient.api.WebClientServiceRequest; import io.helidon.nima.webclient.api.WebClientServiceResponse; class Http1CallOutputStreamChain extends Http1CallChainBase { - private final HttpClientConfig clientConfig; - private final Http1ClientProtocolConfig protocolConfig; + private final Http1ClientImpl http1Client; private final CompletableFuture whenSent; private final ClientRequest.OutputStreamHandler osHandler; - Http1CallOutputStreamChain(WebClient webClient, + Http1CallOutputStreamChain(Http1ClientImpl http1Client, Http1ClientRequestImpl clientRequest, - HttpClientConfig clientConfig, - Http1ClientProtocolConfig protocolConfig, CompletableFuture whenSent, CompletableFuture whenComplete, ClientRequest.OutputStreamHandler osHandler) { - super(webClient, - clientConfig, - protocolConfig, - clientRequest, - whenComplete); - this.clientConfig = clientConfig; - this.protocolConfig = protocolConfig; + super(http1Client, clientRequest, whenComplete); + this.http1Client = http1Client; this.whenSent = whenSent; this.osHandler = osHandler; } @@ -72,8 +63,8 @@ WebClientServiceResponse doProceed(ClientConnection connection, reader, writeBuffer, headers, - clientConfig, - protocolConfig, + http1Client.clientConfig(), + http1Client.protocolConfig(), serviceRequest, whenSent); diff --git a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ClientImpl.java b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ClientImpl.java index e452e11f172..6e26d729eac 100644 --- a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ClientImpl.java +++ b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ClientImpl.java @@ -24,12 +24,13 @@ import io.helidon.nima.webclient.spi.HttpClientSpi; class Http1ClientImpl implements Http1Client, HttpClientSpi { - private final WebClient client; + private final WebClient webClient; private final Http1ClientConfig clientConfig; private final Http1ClientProtocolConfig protocolConfig; + private final Http1ConnectionCache connectionCache = new Http1ConnectionCache(this); - Http1ClientImpl(WebClient client, Http1ClientConfig clientConfig) { - this.client = client; + Http1ClientImpl(WebClient webClient, Http1ClientConfig clientConfig) { + this.webClient = webClient; this.clientConfig = clientConfig; this.protocolConfig = clientConfig.protocolConfig(); } @@ -43,7 +44,7 @@ public Http1ClientRequest method(Http.Method method) { clientConfig.baseFragment().ifPresent(clientUri::fragment); clientConfig.baseQuery().ifPresent(clientUri.writeableQuery()::from); - return new Http1ClientRequestImpl(client, clientConfig, protocolConfig, method, clientUri, clientConfig.properties()); + return new Http1ClientRequestImpl(this, method, clientUri, clientConfig.properties()); } @Override @@ -62,9 +63,7 @@ public ClientRequest clientRequest(FullClientRequest clientRequest, Client // this is HTTP/1.1 - it should support any and all HTTP requests // this method is called from the "generic" HTTP client, that can support any version (that is on classpath). // usually HTTP/1.1 is either the only available, or a fallback if other versions cannot be used - Http1ClientRequest request = new Http1ClientRequestImpl(client, - clientConfig, - protocolConfig, + Http1ClientRequest request = new Http1ClientRequestImpl(this, clientRequest.method(), clientUri, clientRequest.properties()); @@ -81,4 +80,20 @@ public ClientRequest clientRequest(FullClientRequest clientRequest, Client .headers(clientRequest.headers()) .fragment(clientUri.fragment()); } + + WebClient webClient() { + return webClient; + } + + Http1ClientConfig clientConfig() { + return clientConfig; + } + + Http1ClientProtocolConfig protocolConfig() { + return protocolConfig; + } + + Http1ConnectionCache connectionCache() { + return connectionCache; + } } diff --git a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ClientRequestImpl.java b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ClientRequestImpl.java index 2af8bbc5028..e7b874e0275 100644 --- a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ClientRequestImpl.java +++ b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ClientRequestImpl.java @@ -24,27 +24,23 @@ import io.helidon.common.http.Http; import io.helidon.nima.webclient.api.ClientRequestBase; import io.helidon.nima.webclient.api.ClientUri; -import io.helidon.nima.webclient.api.HttpClientConfig; -import io.helidon.nima.webclient.api.WebClient; import io.helidon.nima.webclient.api.WebClientServiceRequest; import io.helidon.nima.webclient.api.WebClientServiceResponse; class Http1ClientRequestImpl extends ClientRequestBase implements Http1ClientRequest { private static final System.Logger LOGGER = System.getLogger(Http1ClientRequestImpl.class.getName()); + private final Http1ClientImpl http1Client; - private final WebClient webClient; - private final Http1ClientProtocolConfig protocolConfig; - - Http1ClientRequestImpl(WebClient webClient, - HttpClientConfig clientConfig, - Http1ClientProtocolConfig protocolConfig, + Http1ClientRequestImpl(Http1ClientImpl http1Client, Http.Method method, ClientUri clientUri, Map properties) { - super(clientConfig, webClient.cookieManager(), Http1Client.PROTOCOL_ID, method, clientUri, properties); - - this.webClient = webClient; - this.protocolConfig = protocolConfig; + super(http1Client.clientConfig(), + http1Client.webClient().cookieManager(), + Http1Client.PROTOCOL_ID, + method, clientUri, + properties); + this.http1Client = http1Client; } //Copy constructor for redirection purposes @@ -52,7 +48,10 @@ private Http1ClientRequestImpl(Http1ClientRequestImpl request, Http.Method method, ClientUri clientUri, Map properties) { - this(request.webClient, request.clientConfig(), request.protocolConfig, method, clientUri, properties); + this(request.http1Client, + method, + clientUri, + properties); followRedirects(request.followRedirects()); maxRedirects(request.maxRedirects()); @@ -71,10 +70,8 @@ public Http1ClientResponse doSubmit(Object entity) { public Http1ClientResponse doOutputStream(OutputStreamHandler streamHandler) { CompletableFuture whenSent = new CompletableFuture<>(); CompletableFuture whenComplete = new CompletableFuture<>(); - Http1CallChainBase callChain = new Http1CallOutputStreamChain(webClient, + Http1CallChainBase callChain = new Http1CallOutputStreamChain(http1Client, this, - clientConfig(), - protocolConfig, whenSent, whenComplete, streamHandler); @@ -130,6 +127,10 @@ public UpgradeResponse upgrade(String protocol) { return UpgradeResponse.failure(response); } + Http1ClientImpl http1Client() { + return http1Client; + } + private Http1ClientResponseImpl invokeWithFollowRedirectsEntity(Object entity) { //Request object which should be used for invoking the next request. This will change in case of any redirection. Http1ClientRequestImpl clientRequest = this; @@ -176,10 +177,8 @@ private Http1ClientResponseImpl invokeWithFollowRedirectsEntity(Object entity) { private Http1ClientResponseImpl invokeRequestWithEntity(Object entity) { CompletableFuture whenSent = new CompletableFuture<>(); CompletableFuture whenComplete = new CompletableFuture<>(); - Http1CallChainBase callChain = new Http1CallEntityChain(webClient, + Http1CallChainBase callChain = new Http1CallEntityChain(http1Client, this, - clientConfig(), - protocolConfig, whenSent, whenComplete, entity); diff --git a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ConnectionCache.java b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ConnectionCache.java index 15c24c51f6a..c62ed6c4a0b 100644 --- a/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ConnectionCache.java +++ b/nima/webclient/http1/src/main/java/io/helidon/nima/webclient/http1/Http1ConnectionCache.java @@ -17,6 +17,7 @@ package io.helidon.nima.webclient.http1; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,7 +31,6 @@ import io.helidon.nima.webclient.api.ClientConnection; import io.helidon.nima.webclient.api.ClientUri; import io.helidon.nima.webclient.api.ConnectionKey; -import io.helidon.nima.webclient.api.HttpClientConfig; import io.helidon.nima.webclient.api.Proxy; import io.helidon.nima.webclient.api.TcpClientConnection; import io.helidon.nima.webclient.api.WebClient; @@ -44,30 +44,32 @@ class Http1ConnectionCache { private static final System.Logger LOGGER = System.getLogger(Http1ConnectionCache.class.getName()); private static final Tls NO_TLS = Tls.builder().enabled(false).build(); private static final String HTTPS = "https"; - private static final Map> CHANNEL_CACHE = new ConcurrentHashMap<>(); + private static final Map> GLOBAL_CHANNEL_CACHE = + new ConcurrentHashMap<>(); private static final List ALPN_ID = List.of(Http1Client.PROTOCOL_ID); private static final Duration QUEUE_TIMEOUT = Duration.ofMillis(10); + private final Map> localConnectionCache = new HashMap<>(); + private final Http1ClientImpl http1Client; - private Http1ConnectionCache() { + Http1ConnectionCache(Http1ClientImpl http1Client) { + this.http1Client = http1Client; } - static ClientConnection connection(WebClient webClient, - HttpClientConfig clientConfig, - Tls tls, - Proxy proxy, - ClientUri uri, - ClientRequestHeaders headers, - boolean defaultKeepAlive) { + ClientConnection connection(Tls tls, + Proxy proxy, + ClientUri uri, + ClientRequestHeaders headers, + boolean defaultKeepAlive) { boolean keepAlive = handleKeepAlive(defaultKeepAlive, headers); Tls effectiveTls = HTTPS.equals(uri.scheme()) ? tls : NO_TLS; if (keepAlive) { - return keepAliveConnection(webClient, clientConfig, effectiveTls, uri, proxy); + return keepAliveConnection(effectiveTls, uri, proxy); } else { - return oneOffConnection(webClient, clientConfig, effectiveTls, uri, proxy); + return oneOffConnection(effectiveTls, uri, proxy); } } - private static boolean handleKeepAlive(boolean defaultKeepAlive, WritableHeaders headers) { + private boolean handleKeepAlive(boolean defaultKeepAlive, WritableHeaders headers) { if (headers.contains(Http.Headers.CONNECTION_CLOSE)) { return false; } @@ -82,11 +84,11 @@ private static boolean handleKeepAlive(boolean defaultKeepAlive, WritableHeaders return false; } - private static ClientConnection keepAliveConnection(WebClient webClient, - HttpClientConfig clientConfig, - Tls tls, - ClientUri uri, - Proxy proxy) { + private ClientConnection keepAliveConnection(Tls tls, + ClientUri uri, + Proxy proxy) { + Http1ClientConfig clientConfig = http1Client.clientConfig(); + ConnectionKey connectionKey = new ConnectionKey(uri.scheme(), uri.host(), uri.port(), @@ -95,15 +97,17 @@ private static ClientConnection keepAliveConnection(WebClient webClient, clientConfig.dnsAddressLookup(), proxy); - var connectionQueue = CHANNEL_CACHE.computeIfAbsent(connectionKey, - it -> new LinkedBlockingDeque<>(clientConfig.connectionCacheSize())); + LinkedBlockingDeque connectionQueue = + (clientConfig.shareConnectionCache() ? GLOBAL_CHANNEL_CACHE : localConnectionCache) + .computeIfAbsent(connectionKey, + it -> new LinkedBlockingDeque<>(clientConfig.connectionCacheSize())); TcpClientConnection connection; while ((connection = connectionQueue.poll()) != null && !connection.isConnected()) { } if (connection == null) { - connection = TcpClientConnection.create(webClient, + connection = TcpClientConnection.create(http1Client.webClient(), connectionKey, ALPN_ID, conn -> finishRequest(connectionQueue, conn), @@ -120,10 +124,13 @@ private static ClientConnection keepAliveConnection(WebClient webClient, return connection; } - private static ClientConnection oneOffConnection(WebClient webClient, HttpClientConfig clientConfig, - Tls tls, - ClientUri uri, - Proxy proxy) { + private ClientConnection oneOffConnection(Tls tls, + ClientUri uri, + Proxy proxy) { + + WebClient webClient = http1Client.webClient(); + Http1ClientConfig clientConfig = http1Client.clientConfig(); + return TcpClientConnection.create(webClient, new ConnectionKey(uri.scheme(), uri.host(), @@ -140,7 +147,7 @@ private static ClientConnection oneOffConnection(WebClient webClient, HttpClient .connect(); } - private static boolean finishRequest(LinkedBlockingDeque connectionQueue, TcpClientConnection conn) { + private boolean finishRequest(LinkedBlockingDeque connectionQueue, TcpClientConnection conn) { if (conn.isConnected()) { try { if (connectionQueue.offer(conn, QUEUE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {