Skip to content

Commit

Permalink
Reuse connections for 5xx errors (#5607)
Browse files Browse the repository at this point in the history
* Reuse connections for 5xx errors for HTTP/1 requests

* Reuse HTTP/2 connections that receive a 5xx error

* Fix flaky tests
  • Loading branch information
zoewangg authored Jan 22, 2025
1 parent 21c9655 commit 0bef9fd
Show file tree
Hide file tree
Showing 16 changed files with 33 additions and 106 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSCRTHTTPClient-158f65a.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT HTTP Client",
"contributor": "",
"description": "Reuse connections that receive a 5xx service response."
}
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-NettyNIOHTTPClient-edf8570.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Netty NIO HTTP Client",
"contributor": "",
"description": "Reuse connections that receive a 5xx service response."
}
6 changes: 6 additions & 0 deletions .changes/next-release/feature-ApacheHTTPClient-f0116e2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Apache HTTP Client",
"contributor": "",
"description": "Reuse connections that receive a 5xx service response."
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import software.amazon.awssdk.http.TlsTrustManagersProvider;
import software.amazon.awssdk.http.apache.internal.ApacheHttpRequestConfig;
import software.amazon.awssdk.http.apache.internal.DefaultConfiguration;
import software.amazon.awssdk.http.apache.internal.SdkConnectionReuseStrategy;
import software.amazon.awssdk.http.apache.internal.SdkProxyRoutePlanner;
import software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory;
import software.amazon.awssdk.http.apache.internal.conn.IdleConnectionReaper;
Expand Down Expand Up @@ -157,7 +156,6 @@ private ConnectionManagerAwareHttpClient createClient(ApacheHttpClient.DefaultBu
.disableRedirectHandling()
.disableAutomaticRetries()
.setUserAgent("") // SDK will set the user agent header in the pipeline. Don't let Apache waste time
.setConnectionReuseStrategy(new SdkConnectionReuseStrategy())
.setConnectionManager(ClientConnectionManagerFactory.wrap(cm));

addProxyConfig(builder, configuration);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private void onSuccessfulResponseComplete(HttpStream stream) {
completionFuture.complete(null);
});

responseHandlerHelper.cleanUpConnectionBasedOnStatusCode(stream);
responseHandlerHelper.releaseConnection(stream);
}

private void handlePublisherError(HttpStream stream, Throwable failure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@ private void onSuccessfulResponseComplete(HttpStream stream) {

// requestCompletionFuture has been completed at this point, no need to notify the future
simplePublisher.complete();
responseHandlerHelper.cleanUpConnectionBasedOnStatusCode(stream);
responseHandlerHelper.releaseConnection(stream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpHeaderBlock;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.SdkHttpResponse;

/**
Expand Down Expand Up @@ -88,13 +87,4 @@ public void closeConnection(HttpStream stream) {
}
}
}

public void cleanUpConnectionBasedOnStatusCode(HttpStream stream) {
// always close the connection on a 5XX response code.
if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
closeConnection(stream);
} else {
releaseConnection(stream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ void serverError_shouldShutdownConnection() {
responseHandler.onResponseHeadersDone(httpStream, 0);
responseHandler.onResponseComplete(httpStream, 0);
requestFuture.join();
verify(crtConn).shutdown();
verify(crtConn).close();
verify(httpStream).close();
}
Expand Down Expand Up @@ -121,7 +120,6 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException {

responseHandler.onResponseComplete(httpStream, 0);
requestFuture.join();
verify(crtConn).shutdown();
verify(crtConn).close();
verify(httpStream).close();
verify(httpStream, never()).incrementWindow(anyInt());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpFullResponse;
Expand Down Expand Up @@ -96,7 +95,7 @@ protected void channelRead0(ChannelHandlerContext channelContext, HttpObject msg
.build();
channelContext.channel().attr(RESPONSE_STATUS_CODE).set(response.status().code());
channelContext.channel().attr(RESPONSE_CONTENT_LENGTH).set(responseContentLength(response));
channelContext.channel().attr(KEEP_ALIVE).set(shouldKeepAlive(response));
channelContext.channel().attr(KEEP_ALIVE).set(HttpUtil.isKeepAlive(response));
ChannelUtils.getAttribute(channelContext.channel(), CHANNEL_DIAGNOSTICS)
.ifPresent(ChannelDiagnostics::incrementResponseCount);
requestContext.handler().onHeaders(sdkResponse);
Expand Down Expand Up @@ -203,13 +202,6 @@ private static void finalizeResponse(RequestContext requestContext, ChannelHandl
}
}

private boolean shouldKeepAlive(HttpResponse response) {
if (HttpStatusFamily.of(response.status().code()) == HttpStatusFamily.SERVER_ERROR) {
return false;
}
return HttpUtil.isKeepAlive(response);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
RequestContext requestContext = ctx.channel().attr(REQUEST_CONTEXT_KEY).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package software.amazon.awssdk.http.nio.netty.internal.http2;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpContent;
Expand All @@ -32,7 +31,6 @@
import io.netty.handler.codec.http2.HttpConversionUtil;
import java.io.IOException;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyClientLogger;

/**
Expand Down Expand Up @@ -62,20 +60,6 @@ private void onHeadersRead(Http2HeadersFrame headersFrame, ChannelHandlerContext

HttpResponse httpResponse = HttpConversionUtil.toHttpResponse(headersFrame.stream().id(), headersFrame.headers(), true);
ctx.fireChannelRead(httpResponse);

if (HttpStatusFamily.of(httpResponse.status().code()) == HttpStatusFamily.SERVER_ERROR) {
fireConnectionExceptionForServerError(ctx);
}
}

private void fireConnectionExceptionForServerError(ChannelHandlerContext ctx) {
if (ctx.channel().parent() != null) {
Channel parent = ctx.channel().parent();
log.debug(ctx.channel(),
() -> "A 5xx server error occurred on an Http2 stream, notifying the connection channel " + ctx.channel());
parent.pipeline().fireExceptionCaught(new Http2ConnectionTerminatingException("A 5xx server error occurred on an "
+ "Http2 stream " + ctx.channel()));
}
}

private void onDataRead(Http2DataFrame dataFrame, ChannelHandlerContext ctx) throws Http2Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,16 @@ public void teardown() throws InterruptedException {
}

@Test
public void serviceReturn500_newRequestShouldUseNewConnection() {
public void serviceReturn500_newRequestShouldReuseConnection() throws InterruptedException {
server.return500OnFirstRequest = true;
CompletableFuture<?> firstRequest = sendGetRequest(server.port(), netty);
firstRequest.join();

// The request-complete-future does not await the channel-release-future
// Wait a small amount to allow the channel release to complete
Thread.sleep(100);
sendGetRequest(server.port(), netty).join();
assertThat(server.h2ConnectionCount.get()).isEqualTo(2);
assertThat(server.h2ConnectionCount.get()).isEqualTo(1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ protected SdkHttpClient createSdkHttpClient(SdkHttpClientOptions options) {
return builder.buildWithDefaults(attributeMap.build());
}

@Override
public void connectionsAreNotReusedOn5xxErrors() {
// We cannot support this because the URL connection client doesn't allow us to disable connection reuse
}

// https://bugs.openjdk.org/browse/JDK-8163921
@Test
public void noAcceptHeader_shouldSet() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ public void testTrustAllWorks() {
public void testCustomTlsTrustManagerAndTrustAllFails() {
}

// Empty test; behavior not supported because the URL connection client does not allow disabling connection reuse
@Override
public void connectionsAreNotReusedOn5xxErrors() throws Exception {
}

@Test
public void testGetResponseCodeNpeIsWrappedAsIo() throws Exception {
connectionInterceptor = safeFunction(connection -> new DelegateHttpURLConnection(connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,16 @@ public void teardown() throws InterruptedException {
}

@Test
public void connectionReceiveServerErrorStatusShouldNotReuseConnection() {
public void connectionReceiveServerErrorStatusShouldReuseConnection() throws InterruptedException {
server.return500OnFirstRequest = true;
server.closeConnection = false;

HttpTestUtils.sendGetRequest(server.port(), client).join();
// The request-complete-future does not await the channel-release-future
// Wait a small amount to allow the channel release to complete
Thread.sleep(100);
HttpTestUtils.sendGetRequest(server.port(), client).join();
assertThat(server.channels.size()).isEqualTo(2);
assertThat(server.channels.size()).isEqualTo(1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void connectionPoolingWorks() throws Exception {
}

@Test
public void connectionsAreNotReusedOn5xxErrors() throws Exception {
public void connectionsAreReusedOn5xxErrors() throws Exception {
int initialOpenedConnections = CONNECTION_COUNTER.openedConnections();

SdkHttpClientOptions httpClientOptions = new SdkHttpClientOptions();
Expand All @@ -169,7 +169,7 @@ public void connectionsAreNotReusedOn5xxErrors() throws Exception {
// connection count increased by at least as many connections as we got 5xx errors back on. But the connection
// manager also predictively creates connections and we need to take those into account in a way that lets it
// remain a dynamic behavior.
assertThat(CONNECTION_COUNTER.openedConnections()).isGreaterThanOrEqualTo(initialOpenedConnections + 5);
assertThat(CONNECTION_COUNTER.openedConnections()).isGreaterThanOrEqualTo(initialOpenedConnections);
}
}

Expand Down

0 comments on commit 0bef9fd

Please sign in to comment.