Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/clientcore/http-netty4/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
<suppress files="io.clientcore.http.netty4.NettyHttpClientBuilder.java" checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" />
<suppress files="io.clientcore.http.netty4.implementation.Netty4ChannelBinaryData.java" checks="com.azure.tools.checkstyle.checks.RawExceptionThrowCheck" />
<suppress files="io.clientcore.http.netty4.implementation.Netty4ChannelInputStream.java" checks="com.azure.tools.checkstyle.checks.RawExceptionThrowCheck" />
<suppress files="io.clientcore.http.netty4.implementation.WrappedHttpHeaders.java" checks="com.azure.tools.checkstyle.checks.RawExceptionThrowCheck" />
<suppress files="io.clientcore.http.netty4.implementation.WrappedHttp11Headers.java" checks="com.azure.tools.checkstyle.checks.RawExceptionThrowCheck" />
<suppress files="io.clientcore.http.netty4.NettyHttpClientBuilder.java" checks="com.azure.tools.checkstyle.checks.ServiceClientBuilderCheck" />
</suppressions>
8 changes: 4 additions & 4 deletions sdk/clientcore/http-netty4/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@
</Match>
<Match>
<Bug pattern="NP_NULL_PARAM_DEREF_ALL_TARGETS_DANGEROUS" />
<Class name="io.clientcore.http.netty4.implementation.WrappedHttpHeadersTests" />
<Class name="io.clientcore.http.netty4.implementation.WrappedHttp11HeadersTests" />
</Match>
<Match>
<Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" />
<Class name="io.clientcore.http.netty4.implementation.WrappedHttpHeadersTests" />
<Class name="io.clientcore.http.netty4.implementation.WrappedHttp11HeadersTests" />
</Match>
<Match>
<Bug pattern="OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE" />
<Class name="io.clientcore.http.netty4.NettyHttpClientTests" />
</Match>
<Match>
<Bug pattern="OS_OPEN_STREAM" />
<Class name="io.clientcore.http.netty4.implementation.Netty4ChannelInputStreamTests" />
<Class name="io.clientcore.http.netty4.implementation.Netty4Http11ChannelInputStreamTests" />
</Match>
<Match>
<Bug pattern="OS_OPEN_STREAM_EXCEPTION_PATH" />
Expand All @@ -42,7 +42,7 @@
<Bug pattern="RR_NOT_CHECKED" />
<Or>
<Class name="io.clientcore.http.netty4.implementation.HttpResponseDrainsBufferTests" />
<Class name="io.clientcore.http.netty4.implementation.Netty4ChannelInputStreamTests" />
<Class name="io.clientcore.http.netty4.implementation.Netty4Http11ChannelInputStreamTests" />
</Or>
</Match>
<Match>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.ssl.ApplicationProtocolConfig;
Expand All @@ -55,6 +54,8 @@

import static io.clientcore.core.utils.ServerSentEventUtils.attemptRetry;
import static io.clientcore.core.utils.ServerSentEventUtils.processTextEventStream;
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.HTTP_RESPONSE;
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PROGRESS_AND_TIMEOUT;
import static io.clientcore.http.netty4.implementation.Netty4Utility.awaitLatch;
import static io.clientcore.http.netty4.implementation.Netty4Utility.createCodec;
import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp11Request;
Expand Down Expand Up @@ -166,11 +167,10 @@ protected void initChannel(Channel channel) throws SSLException {
channel.pipeline().addLast(Netty4HandlerNames.SSL, ssl.newHandler(channel.alloc(), host, port));
channel.pipeline()
.addLast(Netty4HandlerNames.SSL_INITIALIZER, new Netty4SslInitializationHandler());
}

if (isHttps) {
channel.pipeline()
.addLast(new Netty4AlpnHandler(request, addProgressAndTimeoutHandler, errorReference, latch));
.addLast(Netty4HandlerNames.ALPN,
new Netty4AlpnHandler(request, responseReference, errorReference, latch));
}
}
});
Expand Down Expand Up @@ -198,14 +198,10 @@ protected void initChannel(Channel channel) throws SSLException {
// effectively be a no-op.
if (addProgressAndTimeoutHandler) {
channel.pipeline()
.addLast(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, new Netty4ProgressAndTimeoutHandler(
progressReporter, writeTimeoutMillis, responseTimeoutMillis, readTimeoutMillis));
.addLast(PROGRESS_AND_TIMEOUT, new Netty4ProgressAndTimeoutHandler(progressReporter,
writeTimeoutMillis, responseTimeoutMillis, readTimeoutMillis));
}

Netty4ResponseHandler responseHandler
= new Netty4ResponseHandler(request, responseReference, errorReference, latch);
channel.pipeline().addLast(Netty4HandlerNames.RESPONSE, responseHandler);

Throwable earlyError = errorReference.get();
if (earlyError != null) {
// If an error occurred between the connect and the request being sent, don't proceed with sending
Expand Down Expand Up @@ -237,15 +233,14 @@ protected void initChannel(Channel channel) throws SSLException {
} else {
// If there isn't an SslHandler, we can send the request immediately.
// Add the HTTP/1.1 codec, as we only support HTTP/2 when using SSL ALPN.
HttpClientCodec codec = createCodec();
if (addProgressAndTimeoutHandler) {
channel.pipeline()
.addBefore(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_1_1_CODEC, codec);
} else {
channel.pipeline().addBefore(Netty4HandlerNames.RESPONSE, Netty4HandlerNames.HTTP_1_1_CODEC, codec);
}
Netty4ResponseHandler responseHandler
= new Netty4ResponseHandler(request, responseReference, errorReference, latch);
channel.pipeline().addLast(HTTP_RESPONSE, responseHandler);

String addBefore = addProgressAndTimeoutHandler ? PROGRESS_AND_TIMEOUT : HTTP_RESPONSE;
channel.pipeline().addBefore(addBefore, Netty4HandlerNames.HTTP_CODEC, createCodec());

sendHttp11Request(request, channel, addProgressAndTimeoutHandler, errorReference)
sendHttp11Request(request, channel, errorReference)
.addListener((ChannelFutureListener) sendListener -> {
if (!sendListener.isSuccess()) {
setOrSuppressError(errorReference, sendListener.cause());
Expand Down Expand Up @@ -288,7 +283,7 @@ protected void initChannel(Channel channel) throws SSLException {
// We're ignoring the response content.
CountDownLatch drainLatch = new CountDownLatch(1);
channel.pipeline().addLast(new Netty4EagerConsumeChannelHandler(drainLatch, ignored -> {
}));
}, info.isHttp2()));
channel.config().setAutoRead(true);
awaitLatch(drainLatch);
} else if (bodyHandling == ResponseBodyHandling.STREAM) {
Expand All @@ -306,7 +301,7 @@ protected void initChannel(Channel channel) throws SSLException {
}
}

body = new Netty4ChannelBinaryData(info.getEagerContent(), channel, length);
body = new Netty4ChannelBinaryData(info.getEagerContent(), channel, length, info.isHttp2());
} else {
// All cases otherwise assume BUFFER.
CountDownLatch drainLatch = new CountDownLatch(1);
Expand All @@ -316,7 +311,7 @@ protected void initChannel(Channel channel) throws SSLException {
} catch (IOException ex) {
throw LOGGER.throwableAtError().log(ex, CoreException::from);
}
}));
}, info.isHttp2()));
channel.config().setAutoRead(true);
awaitLatch(drainLatch);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private static Class<? extends SocketChannel> getChannelClass(String className)
private Duration readTimeout;
private Duration responseTimeout;
private Duration writeTimeout;
// private HttpProtocolVersion maximumHttpVersion = HttpProtocolVersion.HTTP_2;
private HttpProtocolVersion maximumHttpVersion = HttpProtocolVersion.HTTP_2;

/**
* Creates a new instance of {@link NettyHttpClientBuilder}.
Expand Down Expand Up @@ -260,26 +260,26 @@ public NettyHttpClientBuilder proxy(ProxyOptions proxyOptions) {
return this;
}

// /**
// * Sets the maximum {@link HttpProtocolVersion HTTP protocol version} that the HTTP client will support.
// * <p>
// * By default, the maximum HTTP protocol version is set to {@link HttpProtocolVersion#HTTP_2 HTTP_2}.
// * <p>
// * If {@code httpVersion} is null, it will reset the maximum HTTP protocol version to
// * {@link HttpProtocolVersion#HTTP_2 HTTP_2}.
// *
// * @param httpVersion The maximum HTTP protocol version that the HTTP client will support.
// * @return The updated {@link JdkHttpClientBuilder} object.
// */
// public NettyHttpClientBuilder maximumHttpVersion(HttpProtocolVersion httpVersion) {
// if (httpVersion != null) {
// this.maximumHttpVersion = httpVersion;
// } else {
// this.maximumHttpVersion = HttpProtocolVersion.HTTP_2;
// }
//
// return this;
// }
/**
* Sets the maximum {@link HttpProtocolVersion HTTP protocol version} that the HTTP client will support.
* <p>
* By default, the maximum HTTP protocol version is set to {@link HttpProtocolVersion#HTTP_2 HTTP_2}.
* <p>
* If {@code httpVersion} is null, it will reset the maximum HTTP protocol version to
* {@link HttpProtocolVersion#HTTP_2 HTTP_2}.
*
* @param httpVersion The maximum HTTP protocol version that the HTTP client will support.
* @return The updated builder.
*/
public NettyHttpClientBuilder maximumHttpVersion(HttpProtocolVersion httpVersion) {
if (httpVersion != null) {
this.maximumHttpVersion = httpVersion;
} else {
this.maximumHttpVersion = HttpProtocolVersion.HTTP_2;
}

return this;
}

/**
* Builds the NettyHttpClient.
Expand Down Expand Up @@ -312,7 +312,7 @@ public HttpClient build() {
ProxyOptions buildProxyOptions
= (proxyOptions == null) ? ProxyOptions.fromConfiguration(buildConfiguration, true) : proxyOptions;

return new NettyHttpClient(bootstrap, sslContextModifier, HttpProtocolVersion.HTTP_1_1,
return new NettyHttpClient(bootstrap, sslContextModifier, maximumHttpVersion,
new ChannelInitializationProxyHandler(buildProxyOptions), getTimeoutMillis(readTimeout),
getTimeoutMillis(responseTimeout), getTimeoutMillis(writeTimeout));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

import io.clientcore.core.http.models.HttpRequest;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;

Expand All @@ -19,7 +21,6 @@

import static io.clientcore.http.netty4.implementation.Netty4Utility.createCodec;
import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp11Request;
import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp2Request;
import static io.clientcore.http.netty4.implementation.Netty4Utility.setOrSuppressError;

/**
Expand All @@ -28,23 +29,22 @@
*/
public final class Netty4AlpnHandler extends ApplicationProtocolNegotiationHandler {
private final HttpRequest request;
private final boolean addProgressAndTimeoutHandler;
private final AtomicReference<ResponseStateInfo> responseReference;
private final AtomicReference<Throwable> errorReference;
private final CountDownLatch latch;

/**
* Creates a new instance of {@link Netty4AlpnHandler} with a fallback to using HTTP/1.1.
*
* @param request The request to send once ALPN negotiation completes.
* @param addProgressAndTimeoutHandler Whether the progress and timeout handler was added to the ChannelPipeline.
* @param errorReference An AtomicReference keeping track of errors during the request lifecycle.
* @param latch A CountDownLatch that will be released once the request completes.
*/
public Netty4AlpnHandler(HttpRequest request, boolean addProgressAndTimeoutHandler,
public Netty4AlpnHandler(HttpRequest request, AtomicReference<ResponseStateInfo> responseReference,
AtomicReference<Throwable> errorReference, CountDownLatch latch) {
super(ApplicationProtocolNames.HTTP_1_1);
this.request = request;
this.addProgressAndTimeoutHandler = addProgressAndTimeoutHandler;
this.responseReference = responseReference;
this.errorReference = errorReference;
this.latch = latch;
}
Expand All @@ -57,16 +57,38 @@ public boolean isSharable() {
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
FlushConsolidationHandler flushConsolidationHandler = new FlushConsolidationHandler(1024, true);
Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient().validateHeaders(true).build();
Http2MultiplexHandler http2MultiplexHandler = new Http2MultiplexHandler(NoOpHandler.INSTANCE);
ChannelPipeline pipeline = ctx.pipeline();
pipeline.addAfter(Netty4HandlerNames.SSL, Netty4HandlerNames.HTTP_2_FLUSH, flushConsolidationHandler);
pipeline.addAfter(Netty4HandlerNames.HTTP_2_FLUSH, Netty4HandlerNames.HTTP_2_CODEC, http2FrameCodec);
pipeline.addAfter(Netty4HandlerNames.HTTP_2_CODEC, Netty4HandlerNames.HTTP_2_MULTIPLEX,
http2MultiplexHandler);
// TODO (alzimmer): InboundHttp2ToHttpAdapter buffers the entire response into a FullHttpResponse. Need to
// create a streaming version of this to support huge response payloads.
Http2Connection http2Connection = new DefaultHttp2Connection(false);
HttpToHttp2ConnectionHandler connectionHandler = new HttpToHttp2ConnectionHandlerBuilder()
.initialSettings(new Http2Settings().headerTableSize(4096)
.maxHeaderListSize(256 * 1024)
.pushEnabled(false)
.initialWindowSize(256 * 1024))
.frameListener(new DelegatingDecompressorFrameListener(http2Connection,
new InboundHttp2ToHttpAdapterBuilder(http2Connection).maxContentLength(Integer.MAX_VALUE)
.propagateSettings(true)
.validateHttpHeaders(true)
.build()))
.connection(http2Connection)
.validateHeaders(true)
.build();

sendHttp2Request(request, ctx.channel(), addProgressAndTimeoutHandler, errorReference)
if (ctx.pipeline().get(Netty4HandlerNames.PROGRESS_AND_TIMEOUT) != null) {
ctx.pipeline()
.addAfter(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_RESPONSE,
new Netty4ResponseHandler(request, responseReference, errorReference, latch));
ctx.pipeline()
.addBefore(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_CODEC,
connectionHandler);
} else {
ctx.pipeline().addAfter(Netty4HandlerNames.SSL, Netty4HandlerNames.HTTP_CODEC, connectionHandler);
ctx.pipeline()
.addAfter(Netty4HandlerNames.HTTP_CODEC, Netty4HandlerNames.HTTP_RESPONSE,
new Netty4ResponseHandler(request, responseReference, errorReference, latch));
}

sendHttp11Request(request, ctx.channel(), errorReference)
.addListener((ChannelFutureListener) sendListener -> {
if (!sendListener.isSuccess()) {
setOrSuppressError(errorReference, sendListener.cause());
Expand All @@ -77,9 +99,20 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
}
});
} else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
ctx.pipeline().addAfter(Netty4HandlerNames.SSL, Netty4HandlerNames.HTTP_1_1_CODEC, createCodec());
if (ctx.pipeline().get(Netty4HandlerNames.PROGRESS_AND_TIMEOUT) != null) {
ctx.pipeline()
.addAfter(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_RESPONSE,
new Netty4ResponseHandler(request, responseReference, errorReference, latch));
ctx.pipeline()
.addBefore(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_CODEC, createCodec());
} else {
ctx.pipeline().addAfter(Netty4HandlerNames.SSL, Netty4HandlerNames.HTTP_CODEC, createCodec());
ctx.pipeline()
.addAfter(Netty4HandlerNames.HTTP_CODEC, Netty4HandlerNames.HTTP_RESPONSE,
new Netty4ResponseHandler(request, responseReference, errorReference, latch));
}

sendHttp11Request(request, ctx.channel(), addProgressAndTimeoutHandler, errorReference)
sendHttp11Request(request, ctx.channel(), errorReference)
.addListener((ChannelFutureListener) sendListener -> {
if (!sendListener.isSuccess()) {
setOrSuppressError(errorReference, sendListener.cause());
Expand All @@ -93,13 +126,4 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
throw new IllegalStateException("unknown protocol: " + protocol);
}
}

private static final class NoOpHandler extends ChannelHandlerAdapter {
private static final NoOpHandler INSTANCE = new NoOpHandler();

@Override
public boolean isSharable() {
return true;
}
}
}
Loading
Loading