Skip to content

Commit

Permalink
Shared connection cache switch #7301
Browse files Browse the repository at this point in the history
Fix multiplexParallelStreamsGet test #7307
Fix Http2ClientTest#testPriorKnowledge test #7271
  • Loading branch information
danielkec committed Aug 11, 2023
1 parent 8cefd76 commit 44a6533
Show file tree
Hide file tree
Showing 20 changed files with 373 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.helidon.nima.webclient.api.HttpClientConfig;
import io.helidon.nima.webclient.api.HttpClientResponse;
import io.helidon.nima.webclient.api.ReleasableResource;
import io.helidon.nima.webclient.api.WebClient;
import io.helidon.nima.webclient.api.WebClientServiceRequest;
import io.helidon.nima.webclient.api.WebClientServiceResponse;
import io.helidon.nima.webclient.http1.Http1ClientRequest;
Expand All @@ -49,9 +48,8 @@
abstract class Http2CallChainBase implements WebClientService.Chain {
private static final Tls NO_TLS = Tls.builder().enabled(false).build();

private final WebClient webClient;
private final Http2ClientImpl http2Client;
private final HttpClientConfig clientConfig;
private final Http2ClientProtocolConfig protocolConfig;
private final Http2ClientRequestImpl clientRequest;
private final Function<Http1ClientRequest, Http1ClientResponse> http1EntityHandler;
private final CompletableFuture<WebClientServiceResponse> whenComplete;
Expand All @@ -62,16 +60,13 @@ abstract class Http2CallChainBase implements WebClientService.Chain {
private Http.Status responseStatus;
private Http2ConnectionAttemptResult.Result result;

Http2CallChainBase(WebClient webClient,
HttpClientConfig clientConfig,
Http2ClientProtocolConfig protocolConfig,
Http2CallChainBase(Http2ClientImpl http2Client,
Http2ClientRequestImpl clientRequest,
CompletableFuture<WebClientServiceResponse> whenComplete,
Function<Http1ClientRequest, Http1ClientResponse> http1EntityHandler) {

this.webClient = webClient;
this.clientConfig = clientConfig;
this.protocolConfig = protocolConfig;
this.http2Client = http2Client;
this.clientConfig = http2Client.clientConfig();
this.clientRequest = clientRequest;
this.whenComplete = whenComplete;
this.http1EntityHandler = http1EntityHandler;
Expand All @@ -86,23 +81,26 @@ public WebClientServiceResponse proceed(WebClientServiceRequest serviceRequest)
requestHeaders.remove(Http.HeaderNames.CONNECTION, LogHeaderConsumer.INSTANCE);
requestHeaders.setIfAbsent(USER_AGENT_HEADER);

Http2ConnectionAttemptResult result = Http2ConnectionCache.newStream(webClient,
protocolConfig,
connectionKey(serviceRequest),
clientRequest,
uri,
http1EntityHandler);
ConnectionKey connectionKey = connectionKey(serviceRequest);

Http2ConnectionAttemptResult result = http2Client.connectionCache()
.newStream(connectionKey, clientRequest, uri, http1EntityHandler);

this.result = result.result();

if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) {
// ALPN, prior knowledge, or upgrade success
this.stream = result.stream();
return doProceed(serviceRequest, requestHeaders, result.stream());
} else {
// upgrade failed
this.response = result.response();
return doProceed(serviceRequest, result.response());
try {
if (result.result() == Http2ConnectionAttemptResult.Result.HTTP_2) {
// ALPN, prior knowledge, or upgrade success
this.stream = result.stream();
return doProceed(serviceRequest, requestHeaders, result.stream());
} else {
// upgrade failed
this.response = result.response();
return doProceed(serviceRequest, result.response());
}
} catch (StreamTimeoutException e){
http2Client.connectionCache().remove(connectionKey);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,19 @@
import io.helidon.nima.http.media.EntityWriter;
import io.helidon.nima.http2.Http2Headers;
import io.helidon.nima.webclient.api.ClientUri;
import io.helidon.nima.webclient.api.HttpClientConfig;
import io.helidon.nima.webclient.api.WebClient;
import io.helidon.nima.webclient.api.WebClientServiceRequest;
import io.helidon.nima.webclient.api.WebClientServiceResponse;

class Http2CallEntityChain extends Http2CallChainBase {
private final CompletableFuture<WebClientServiceRequest> whenSent;
private final Object entity;

Http2CallEntityChain(WebClient webClient,
HttpClientConfig clientConfig,
Http2ClientProtocolConfig protocolConfig,
Http2CallEntityChain(Http2ClientImpl http2Client,
Http2ClientRequestImpl request,
CompletableFuture<WebClientServiceRequest> whenSent,
CompletableFuture<WebClientServiceResponse> whenComplete,
Object entity) {
super(webClient, clientConfig, protocolConfig, request, whenComplete, it -> it.submit(entity));
super(http2Client, request, whenComplete, it -> it.submit(entity));
this.whenSent = whenSent;
this.entity = entity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,19 @@
import io.helidon.nima.http2.Http2Headers;
import io.helidon.nima.webclient.api.ClientRequest;
import io.helidon.nima.webclient.api.ClientUri;
import io.helidon.nima.webclient.api.HttpClientConfig;
import io.helidon.nima.webclient.api.WebClient;
import io.helidon.nima.webclient.api.WebClientServiceRequest;
import io.helidon.nima.webclient.api.WebClientServiceResponse;

class Http2CallOutputStreamChain extends Http2CallChainBase {
private final CompletableFuture<WebClientServiceRequest> whenSent;
private final ClientRequest.OutputStreamHandler streamHandler;

Http2CallOutputStreamChain(WebClient webClient,
Http2CallOutputStreamChain(Http2ClientImpl http2Client,
Http2ClientRequestImpl http2ClientRequest,
HttpClientConfig clientConfig,
Http2ClientProtocolConfig protocolConfig,
CompletableFuture<WebClientServiceRequest> whenSent,
CompletableFuture<WebClientServiceResponse> whenComplete,
ClientRequest.OutputStreamHandler streamHandler) {
super(webClient,
clientConfig,
protocolConfig,
super(http2Client,
http2ClientRequest,
whenComplete,
req -> req.outputStream(streamHandler));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
Expand All @@ -49,24 +48,19 @@
import io.helidon.nima.http2.Http2RstStream;
import io.helidon.nima.http2.Http2Setting;
import io.helidon.nima.http2.Http2Settings;
import io.helidon.nima.http2.Http2StreamState;
import io.helidon.nima.http2.Http2Util;
import io.helidon.nima.http2.Http2WindowUpdate;
import io.helidon.nima.http2.WindowSize;
import io.helidon.nima.webclient.api.ClientConnection;
import io.helidon.nima.webclient.api.WebClient;

import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.TRACE;
import static java.lang.System.Logger.Level.WARNING;

class Http2ClientConnection {
private static final System.Logger LOGGER = System.getLogger(Http2ClientConnection.class.getName());
private static final ExecutorService TMP_EXECUTOR = Executors.newFixedThreadPool(10,
Thread.ofPlatform()
.name("helidon-tmp-client-", 0)
.factory());
private static final int FRAME_HEADER_LENGTH = 9;

private final Http2FrameListener sendListener = new Http2LoggingFrameListener("cl-send");
private final Http2FrameListener recvListener = new Http2LoggingFrameListener("cl-recv");
private final LockingStreamIdSequence streamIdSeq = new LockingStreamIdSequence();
Expand All @@ -81,11 +75,18 @@ class Http2ClientConnection {
private final Http2ConnectionWriter writer;
private final DataReader reader;
private final DataWriter dataWriter;
private volatile int lastStreamId;

private Http2Settings serverSettings = Http2Settings.builder()
.build();
private Future<?> handleTask;

private volatile boolean closed = false;

public boolean closed(){
return closed;
}

Http2ClientConnection(Http2ClientProtocolConfig protocolConfig, ClientConnection connection) {
this.connectionFlowControl = ConnectionFlowControl.clientBuilder(this::writeWindowsUpdate)
.maxFrameSize(protocolConfig.maxFrameSize())
Expand All @@ -99,13 +100,12 @@ class Http2ClientConnection {
this.writer = new Http2ConnectionWriter(connection.helidonSocket(), connection.writer(), List.of());
}

static Http2ClientConnection create(WebClient webClient,
Http2ClientProtocolConfig protocolConfig,
static Http2ClientConnection create(Http2ClientImpl http2Client,
ClientConnection connection,
boolean sendSettings) {

Http2ClientConnection h2conn = new Http2ClientConnection(protocolConfig, connection);
h2conn.start(protocolConfig, webClient.executor(), sendSettings);
Http2ClientConnection h2conn = new Http2ClientConnection(http2Client.protocolConfig(), connection);
h2conn.start(http2Client.protocolConfig(), http2Client.webClient().executor(), sendSettings);

return h2conn;
}
Expand Down Expand Up @@ -135,11 +135,12 @@ Http2ClientStream stream(int streamId) {

Http2ClientStream createStream(Http2StreamConfig config) {
//FIXME: priority
return new Http2ClientStream(this,
serverSettings,
ctx,
config.timeout(),
streamIdSeq);
Http2ClientStream stream = new Http2ClientStream(this,
serverSettings,
ctx,
config.timeout(),
streamIdSeq);
return stream;
}

void addStream(int streamId, Http2ClientStream stream) {
Expand Down Expand Up @@ -171,7 +172,12 @@ Http2ClientStream tryStream(Http2StreamConfig config) {
}
}

void updateLastStreamId(int lastStreamId){
this.lastStreamId = lastStreamId;
}

void close() {
closed = true;
try {
handleTask.cancel(true);
ctx.log(LOGGER, TRACE, "Closing connection");
Expand All @@ -193,7 +199,9 @@ static Http2Settings settings(Http2ClientProtocolConfig config) {
}

private void sendPreface(Http2ClientProtocolConfig config, boolean sendSettings) {
dataWriter.writeNow(Http2Util.prefaceData());
BufferData prefaceData = Http2Util.prefaceData();
sendListener.frame(ctx, 0, prefaceData);
dataWriter.writeNow(prefaceData);
if (sendSettings) {
// §3.5 Preface bytes must be followed by setting frame
Http2Settings http2Settings = settings(config);
Expand All @@ -217,8 +225,7 @@ private void start(Http2ClientProtocolConfig protocolConfig,
boolean sendSettings) {
CountDownLatch cdl = new CountDownLatch(1);

// handleTask = executor.submit(() -> {
handleTask = TMP_EXECUTOR.submit(() -> {
handleTask = executor.submit(() -> {
ctx.log(LOGGER, TRACE, "Starting HTTP/2 connection, thread: %s", Thread.currentThread().getName());
try {
sendPreface(protocolConfig, sendSettings);
Expand All @@ -233,6 +240,7 @@ private void start(Http2ClientProtocolConfig protocolConfig,
try {
while (!Thread.interrupted()) {
if (!handle()) {
closed = true;
ctx.log(LOGGER, TRACE, "Connection closed");
return;
}
Expand All @@ -253,7 +261,23 @@ private void start(Http2ClientProtocolConfig protocolConfig,
}

private void writeWindowsUpdate(int streamId, Http2WindowUpdate windowUpdateFrame) {
writer.write(windowUpdateFrame.toFrameData(serverSettings, streamId, Http2Flag.NoFlags.create()));
if (streamId == 0){
writer.write(windowUpdateFrame.toFrameData(serverSettings, streamId, Http2Flag.NoFlags.create()));
return;
}
if (streamId < lastStreamId) {
for (var s : streams.values()) {
if (s.streamId() > streamId && s.streamState() != Http2StreamState.IDLE) {
// RC against parallel newer streams, data already buffered at client being read
// There is no need to do request for more data as stream is no more usable
return;
}
}
}
Http2ClientStream stream = stream(streamId);
if (stream != null && !stream.streamState().equals(Http2StreamState.CLOSED)) {
writer.write(windowUpdateFrame.toFrameData(serverSettings, streamId, Http2Flag.NoFlags.create()));
}
}

private boolean handle() {
Expand Down
Loading

0 comments on commit 44a6533

Please sign in to comment.