Skip to content

Commit

Permalink
brotli support in HttpClient
Browse files Browse the repository at this point in the history
  • Loading branch information
sullis committed Feb 7, 2024
1 parent 9836ec8 commit b78270a
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,16 @@ protected CoreSubscriber<PooledRef<Connection>> createDisposableAcquire(
MonoSink<Connection> sink,
Context currentContext) {
boolean acceptGzip = false;
boolean acceptBrotli = false;
ChannelMetricsRecorder metricsRecorder = config.metricsRecorder() != null ? config.metricsRecorder().get() : null;
Function<String, String> uriTagValue = null;
if (config instanceof HttpClientConfig) {
acceptGzip = ((HttpClientConfig) config).acceptGzip;
acceptBrotli = ((HttpClientConfig) config).acceptBrotli;
uriTagValue = ((HttpClientConfig) config).uriTagValue;
}
return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(),
acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, sink, currentContext, uriTagValue);
acceptGzip, acceptBrotli, metricsRecorder, pendingAcquireTimeout, pool, sink, currentContext, uriTagValue);
}

@Override
Expand Down Expand Up @@ -223,6 +225,7 @@ static final class DisposableAcquire
final ConnectionObserver obs;
final ChannelOperations.OnSetup opsFactory;
final boolean acceptGzip;
final boolean acceptBrotli;
final ChannelMetricsRecorder metricsRecorder;
final long pendingAcquireTimeout;
final InstrumentedPool<Connection> pool;
Expand All @@ -237,6 +240,7 @@ static final class DisposableAcquire
ConnectionObserver obs,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
long pendingAcquireTimeout,
InstrumentedPool<Connection> pool,
Expand All @@ -248,6 +252,7 @@ static final class DisposableAcquire
this.obs = obs;
this.opsFactory = opsFactory;
this.acceptGzip = acceptGzip;
this.acceptBrotli = acceptBrotli;
this.metricsRecorder = metricsRecorder;
this.pendingAcquireTimeout = pendingAcquireTimeout;
this.pool = pool;
Expand All @@ -262,6 +267,7 @@ static final class DisposableAcquire
this.obs = parent.obs;
this.opsFactory = parent.opsFactory;
this.acceptGzip = parent.acceptGzip;
this.acceptBrotli = parent.acceptBrotli;
this.metricsRecorder = parent.metricsRecorder;
this.pendingAcquireTimeout = parent.pendingAcquireTimeout;
this.pool = parent.pool;
Expand Down Expand Up @@ -383,7 +389,7 @@ public void operationComplete(Future<Http2StreamChannel> future) {
setChannelContext(ch, currentContext());
}
HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())),
opsFactory, acceptGzip, metricsRecorder, -1, uriTagValue);
opsFactory, acceptGzip, acceptBrotli, metricsRecorder, -1, uriTagValue);

ChannelOperations<?, ?> ops = ChannelOperations.get(ch);
if (ops != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.compression.Brotli;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
Expand Down Expand Up @@ -515,7 +516,13 @@ public final HttpClient baseUrl(String baseUrl) {
* @return a new {@link HttpClient}
*/
public final HttpClient compress(boolean compressionEnabled) {
configuration().headers.remove(HttpHeaderNames.ACCEPT_ENCODING);
if (compressionEnabled) {
configuration().acceptBrotli = Brotli.isAvailable();
if (configuration().acceptBrotli) {
configuration().headers.add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR);
}

if (!configuration().acceptGzip) {
HttpClient dup = duplicate();
HttpHeaders headers = configuration().headers.copy();
Expand All @@ -525,14 +532,15 @@ public final HttpClient compress(boolean compressionEnabled) {
return dup;
}
}
else if (configuration().acceptGzip) {
else if (configuration().acceptGzip || configuration().acceptBrotli) {
HttpClient dup = duplicate();
if (isCompressing(configuration().headers)) {
HttpHeaders headers = configuration().headers.copy();
headers.remove(HttpHeaderNames.ACCEPT_ENCODING);
dup.configuration().headers = headers;
}
dup.configuration().acceptGzip = false;
dup.configuration().acceptBrotli = false;
return dup;
}
return this;
Expand Down Expand Up @@ -1594,7 +1602,8 @@ public final HttpClient wiretap(boolean enable) {
}

static boolean isCompressing(HttpHeaders h) {
return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true);
return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true)
|| h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR, true);
}

static String reactorNettyVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public String baseUrl() {
public int channelHash() {
int result = super.channelHash();
result = 31 * result + Boolean.hashCode(acceptGzip);
result = 31 * result + Boolean.hashCode(acceptBrotli);
result = 31 * result + Objects.hashCode(decoder);
result = 31 * result + _protocols;
result = 31 * result + Objects.hashCode(sslProvider);
Expand Down Expand Up @@ -201,6 +202,15 @@ public boolean isAcceptGzip() {
return acceptGzip;
}

/**
* Return whether Brotli compression is enabled.
*
* @return whether Brotli compression is enabled
*/
public boolean isAcceptBrotli() {
return acceptBrotli;
}

/**
* Return true if {@code retry once} is disabled, false otherwise.
*
Expand Down Expand Up @@ -313,6 +323,7 @@ public WebsocketClientSpec websocketClientSpec() {
// Protected/Package private write API

boolean acceptGzip;
boolean acceptBrotli;
String baseUrl;
BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> body;
Function<? super Mono<? extends Connection>, ? extends Mono<? extends Connection>> connector;
Expand Down Expand Up @@ -348,6 +359,7 @@ public WebsocketClientSpec websocketClientSpec() {
Supplier<? extends SocketAddress> remoteAddress) {
super(connectionProvider, options, remoteAddress);
this.acceptGzip = false;
this.acceptBrotli = false;
this.cookieDecoder = ClientCookieDecoder.STRICT;
this.cookieEncoder = ClientCookieEncoder.STRICT;
this.decoder = new HttpResponseDecoderSpec();
Expand All @@ -362,6 +374,7 @@ public WebsocketClientSpec websocketClientSpec() {
HttpClientConfig(HttpClientConfig parent) {
super(parent);
this.acceptGzip = parent.acceptGzip;
this.acceptBrotli = parent.acceptBrotli;
this.baseUrl = parent.baseUrl;
this.body = parent.body;
this.connector = parent.connector;
Expand Down Expand Up @@ -536,6 +549,7 @@ static void addStreamHandlers(
ConnectionObserver obs,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
long responseTimeoutMillis,
@Nullable Function<String, String> uriTagValue) {
Expand All @@ -548,7 +562,7 @@ static void addStreamHandlers(
pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT)
.addLast(NettyPipeline.HttpTrafficHandler, HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER);

if (acceptGzip) {
if (acceptGzip || acceptBrotli) {
pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

Expand Down Expand Up @@ -602,7 +616,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
}
}

static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpResponseDecoderSpec decoder,
static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, boolean acceptBrotli, HttpResponseDecoderSpec decoder,
Http2Settings http2Settings, ConnectionObserver observer) {
Http2FrameCodecBuilder http2FrameCodecBuilder =
Http2FrameCodecBuilder.forClient()
Expand All @@ -624,6 +638,7 @@ static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpRe
static void configureHttp11OrH2CleartextPipeline(
ChannelPipeline p,
boolean acceptGzip,
boolean acceptBrotli,
HttpResponseDecoderSpec decoder,
Http2Settings http2Settings,
@Nullable ChannelMetricsRecorder metricsRecorder,
Expand Down Expand Up @@ -654,7 +669,7 @@ static void configureHttp11OrH2CleartextPipeline(
Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build();

Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec,
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue));
new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, uriTagValue));

HttpClientUpgradeHandler upgradeHandler =
new ReactorNettyHttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength());
Expand All @@ -663,7 +678,7 @@ static void configureHttp11OrH2CleartextPipeline(
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler)
.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(observer));

if (acceptGzip) {
if (acceptGzip || acceptBrotli) {
p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

Expand All @@ -688,6 +703,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
@SuppressWarnings("deprecation")
static void configureHttp11Pipeline(ChannelPipeline p,
boolean acceptGzip,
boolean acceptBrotli,
HttpResponseDecoderSpec decoder,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable Function<String, String> uriTagValue) {
Expand All @@ -703,7 +719,7 @@ static void configureHttp11Pipeline(ChannelPipeline p,
decoder.parseHttpAfterConnectRequest,
decoder.allowDuplicateContentLengths()));

if (acceptGzip) {
if (acceptGzip || acceptBrotli) {
p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
}

Expand Down Expand Up @@ -762,6 +778,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) {
static final class H2CleartextCodec extends ChannelHandlerAdapter {

final boolean acceptGzip;
final boolean acceptBrotli;
final Http2FrameCodec http2FrameCodec;
final ChannelMetricsRecorder metricsRecorder;
final ChannelOperations.OnSetup opsFactory;
Expand All @@ -771,9 +788,11 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter {
Http2FrameCodec http2FrameCodec,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable Function<String, String> uriTagValue) {
this.acceptGzip = acceptGzip;
this.acceptBrotli = acceptBrotli;
this.http2FrameCodec = http2FrameCodec;
this.metricsRecorder = metricsRecorder;
this.opsFactory = opsFactory;
Expand All @@ -796,12 +815,12 @@ public void handlerAdded(ChannelHandlerContext ctx) {
if (responseTimeoutHandler != null) {
pipeline.remove(NettyPipeline.ResponseTimeoutHandler);
http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE,
new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder,
new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder,
responseTimeoutHandler.getReaderIdleTimeInMillis(), uriTagValue));
}
else {
http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE,
new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, uriTagValue));
new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, uriTagValue));
}
pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec)
.addAfter(NettyPipeline.HttpCodec, NettyPipeline.H2MultiplexHandler, http2MultiplexHandler);
Expand All @@ -816,6 +835,7 @@ public void handlerAdded(ChannelHandlerContext ctx) {
static final class H2Codec extends ChannelInitializer<Channel> {

final boolean acceptGzip;
final boolean acceptBrotli;
final ChannelMetricsRecorder metricsRecorder;
final ConnectionObserver observer;
final ChannelOperations.OnSetup opsFactory;
Expand All @@ -828,22 +848,25 @@ static final class H2Codec extends ChannelInitializer<Channel> {
@Nullable ConnectionObserver observer,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
@Nullable Function<String, String> uriTagValue) {
// Handle outbound and upgrade streams
this(owner, observer, opsFactory, acceptGzip, metricsRecorder, -1, uriTagValue);
this(owner, observer, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, -1, uriTagValue);
}

H2Codec(
@Nullable Http2ConnectionProvider.DisposableAcquire owner,
@Nullable ConnectionObserver observer,
ChannelOperations.OnSetup opsFactory,
boolean acceptGzip,
boolean acceptBrotli,
@Nullable ChannelMetricsRecorder metricsRecorder,
long responseTimeoutMillis,
@Nullable Function<String, String> uriTagValue) {
// Handle outbound and upgrade streams
this.acceptGzip = acceptGzip;
this.acceptBrotli = acceptBrotli;
this.metricsRecorder = metricsRecorder;
this.observer = observer;
this.opsFactory = opsFactory;
Expand All @@ -860,7 +883,7 @@ protected void initChannel(Channel ch) {
setChannelContext(ch, owner.currentContext());
}
addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory,
acceptGzip, metricsRecorder, responseTimeoutMillis, uriTagValue);
acceptGzip, acceptBrotli, metricsRecorder, responseTimeoutMillis, uriTagValue);
}
else {
// Handle server pushes (inbound streams)
Expand All @@ -884,6 +907,7 @@ public boolean isSharable() {

static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {
final boolean acceptGzip;
final boolean acceptBrotli;
final HttpResponseDecoderSpec decoder;
final Http2Settings http2Settings;
final ChannelMetricsRecorder metricsRecorder;
Expand All @@ -892,6 +916,7 @@ static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter {

H2OrHttp11Codec(HttpClientChannelInitializer initializer, ConnectionObserver observer) {
this.acceptGzip = initializer.acceptGzip;
this.acceptBrotli = initializer.acceptBrotli;
this.decoder = initializer.decoder;
this.http2Settings = initializer.http2Settings;
this.metricsRecorder = initializer.metricsRecorder;
Expand All @@ -910,10 +935,10 @@ public void channelActive(ChannelHandlerContext ctx) {
log.debug(format(ctx.channel(), "Negotiated application-level protocol [" + protocol + "]"));
}
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
configureHttp2Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, http2Settings, observer);
configureHttp2Pipeline(ctx.channel().pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer);
}
else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, metricsRecorder, uriTagValue);
configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, uriTagValue);
}
else {
throw new IllegalStateException("unknown protocol: " + protocol);
Expand All @@ -932,6 +957,7 @@ else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
static final class HttpClientChannelInitializer implements ChannelPipelineConfigurer {

final boolean acceptGzip;
final boolean acceptBrotli;
final HttpResponseDecoderSpec decoder;
final Http2Settings http2Settings;
final ChannelMetricsRecorder metricsRecorder;
Expand All @@ -942,6 +968,7 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig

HttpClientChannelInitializer(HttpClientConfig config) {
this.acceptGzip = config.acceptGzip;
this.acceptBrotli = config.acceptBrotli;
this.decoder = config.decoder;
this.http2Settings = config.http2Settings();
this.metricsRecorder = config.metricsRecorderInternal();
Expand All @@ -962,21 +989,21 @@ public void onChannelInit(ConnectionObserver observer, Channel channel, @Nullabl
new H2OrHttp11Codec(this, observer));
}
else if ((protocols & h11) == h11) {
configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, uriTagValue);
configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, uriTagValue);
}
else if ((protocols & h2) == h2) {
configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer);
configureHttp2Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer);
}
}
else {
if ((protocols & h11orH2C) == h11orH2C) {
configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, metricsRecorder, observer, opsFactory, uriTagValue);
configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, metricsRecorder, observer, opsFactory, uriTagValue);
}
else if ((protocols & h11) == h11) {
configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, uriTagValue);
configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, uriTagValue);
}
else if ((protocols & h2c) == h2c) {
configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer);
configureHttp2Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer);
}
}
}
Expand Down
Loading

0 comments on commit b78270a

Please sign in to comment.