From 0b893c43c6bdcc2a744f2dd6b1e04b857e4414ef Mon Sep 17 00:00:00 2001 From: "Sean C. Sullivan" Date: Sun, 2 Jul 2023 19:52:51 -0700 Subject: [PATCH] brotli support in HttpClient --- .../http/client/Http2ConnectionProvider.java | 10 +++- .../reactor/netty/http/client/HttpClient.java | 13 +++- .../netty/http/client/HttpClientConfig.java | 59 ++++++++++++++----- .../HttpCompressionClientServerTests.java | 8 ++- .../netty/http/client/HttpClientTest.java | 52 +++++++++++++++- 5 files changed, 118 insertions(+), 24 deletions(-) diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 1b25507f94..62f009d5af 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -118,16 +118,18 @@ protected CoreSubscriber> createDisposableAcquire( MonoSink sink, Context currentContext) { boolean acceptGzip = false; + boolean acceptBrotli = false; ChannelMetricsRecorder metricsRecorder = config.metricsRecorder() != null ? config.metricsRecorder().get() : null; SocketAddress proxyAddress = ((ClientTransportConfig) config).proxyProvider() != null ? ((ClientTransportConfig) config).proxyProvider().getAddress().get() : null; Function uriTagValue = null; if (config instanceof HttpClientConfig) { acceptGzip = ((HttpClientConfig) config).acceptGzip; + acceptBrotli = ((HttpClientConfig) config).acceptBrotli; uriTagValue = ((HttpClientConfig) config).uriTagValue; } return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(), - acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, proxyAddress, remoteAddress, sink, currentContext, uriTagValue); + acceptGzip, acceptBrotli, metricsRecorder, pendingAcquireTimeout, pool, proxyAddress, remoteAddress, sink, currentContext, uriTagValue); } @Override @@ -238,6 +240,7 @@ static final class DisposableAcquire final ConnectionObserver obs; final ChannelOperations.OnSetup opsFactory; final boolean acceptGzip; + final boolean acceptBrotli; final ChannelMetricsRecorder metricsRecorder; final long pendingAcquireTimeout; final InstrumentedPool pool; @@ -254,6 +257,7 @@ static final class DisposableAcquire ConnectionObserver obs, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, long pendingAcquireTimeout, InstrumentedPool pool, @@ -267,6 +271,7 @@ static final class DisposableAcquire this.obs = obs; this.opsFactory = opsFactory; this.acceptGzip = acceptGzip; + this.acceptBrotli = acceptBrotli; this.metricsRecorder = metricsRecorder; this.pendingAcquireTimeout = pendingAcquireTimeout; this.pool = pool; @@ -283,6 +288,7 @@ static final class DisposableAcquire this.obs = parent.obs; this.opsFactory = parent.opsFactory; this.acceptGzip = parent.acceptGzip; + this.acceptBrotli = parent.acceptBrotli; this.metricsRecorder = parent.metricsRecorder; this.pendingAcquireTimeout = parent.pendingAcquireTimeout; this.pool = parent.pool; @@ -412,7 +418,7 @@ public void operationComplete(Future future) { setChannelContext(ch, currentContext()); } HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())), - opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue); + opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue); ChannelOperations ops = ChannelOperations.get(ch); if (ops != null) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java index c35dc57083..116d855560 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java @@ -29,6 +29,7 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; @@ -515,7 +516,13 @@ public final HttpClient baseUrl(String baseUrl) { * @return a new {@link HttpClient} */ public final HttpClient compress(boolean compressionEnabled) { + configuration().headers.remove(HttpHeaderNames.ACCEPT_ENCODING); if (compressionEnabled) { + configuration().acceptBrotli = Brotli.isAvailable(); + if (configuration().acceptBrotli) { + configuration().headers.add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR); + } + if (!configuration().acceptGzip) { HttpClient dup = duplicate(); HttpHeaders headers = configuration().headers.copy(); @@ -525,7 +532,7 @@ public final HttpClient compress(boolean compressionEnabled) { return dup; } } - else if (configuration().acceptGzip) { + else if (configuration().acceptGzip || configuration().acceptBrotli) { HttpClient dup = duplicate(); if (isCompressing(configuration().headers)) { HttpHeaders headers = configuration().headers.copy(); @@ -533,6 +540,7 @@ else if (configuration().acceptGzip) { dup.configuration().headers = headers; } dup.configuration().acceptGzip = false; + dup.configuration().acceptBrotli = false; return dup; } return this; @@ -1602,7 +1610,8 @@ public final HttpClient wiretap(boolean enable) { } static boolean isCompressing(HttpHeaders h) { - return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true); + return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true) + || h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR, true); } static String reactorNettyVersion() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index b05b1a02fc..28e2659212 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -117,6 +117,7 @@ public String baseUrl() { public int channelHash() { int result = super.channelHash(); result = 31 * result + Boolean.hashCode(acceptGzip); + result = 31 * result + Boolean.hashCode(acceptBrotli); result = 31 * result + Objects.hashCode(decoder); result = 31 * result + _protocols; result = 31 * result + Objects.hashCode(sslProvider); @@ -202,6 +203,15 @@ public boolean isAcceptGzip() { return acceptGzip; } + /** + * Return whether Brotli compression is enabled. + * + * @return whether Brotli compression is enabled + */ + public boolean isAcceptBrotli() { + return acceptBrotli; + } + /** * Return true if {@code retry once} is disabled, false otherwise. * @@ -314,6 +324,7 @@ public WebsocketClientSpec websocketClientSpec() { // Protected/Package private write API boolean acceptGzip; + boolean acceptBrotli; String baseUrl; BiFunction> body; Function, ? extends Mono> connector; @@ -349,6 +360,7 @@ public WebsocketClientSpec websocketClientSpec() { Supplier remoteAddress) { super(connectionProvider, options, remoteAddress); this.acceptGzip = false; + this.acceptBrotli = false; this.cookieDecoder = ClientCookieDecoder.STRICT; this.cookieEncoder = ClientCookieEncoder.STRICT; this.decoder = new HttpResponseDecoderSpec(); @@ -363,6 +375,7 @@ public WebsocketClientSpec websocketClientSpec() { HttpClientConfig(HttpClientConfig parent) { super(parent); this.acceptGzip = parent.acceptGzip; + this.acceptBrotli = parent.acceptBrotli; this.baseUrl = parent.baseUrl; this.body = parent.body; this.connector = parent.connector; @@ -537,6 +550,7 @@ static void addStreamHandlers( ConnectionObserver obs, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, @@ -551,7 +565,7 @@ static void addStreamHandlers( pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT) .addLast(NettyPipeline.HttpTrafficHandler, HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER); - if (acceptGzip) { + if (acceptGzip || acceptBrotli) { pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); } @@ -617,7 +631,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { } } - static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpResponseDecoderSpec decoder, + static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, boolean acceptBrotli, HttpResponseDecoderSpec decoder, Http2Settings http2Settings, ConnectionObserver observer) { Http2FrameCodecBuilder http2FrameCodecBuilder = Http2FrameCodecBuilder.forClient() @@ -639,6 +653,7 @@ static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpRe static void configureHttp11OrH2CleartextPipeline( ChannelPipeline p, boolean acceptGzip, + boolean acceptBrotli, HttpResponseDecoderSpec decoder, Http2Settings http2Settings, @Nullable ChannelMetricsRecorder metricsRecorder, @@ -670,7 +685,7 @@ static void configureHttp11OrH2CleartextPipeline( Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build(); Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec, - new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, uriTagValue)); + new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, uriTagValue)); HttpClientUpgradeHandler upgradeHandler = new ReactorNettyHttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength()); @@ -679,7 +694,7 @@ static void configureHttp11OrH2CleartextPipeline( .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(observer)); - if (acceptGzip) { + if (acceptGzip || acceptBrotli) { p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); } @@ -704,6 +719,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { @SuppressWarnings("deprecation") static void configureHttp11Pipeline(ChannelPipeline p, boolean acceptGzip, + boolean acceptBrotli, HttpResponseDecoderSpec decoder, @Nullable ChannelMetricsRecorder metricsRecorder, @Nullable SocketAddress proxyAddress, @@ -720,7 +736,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, NettyPipeline.HttpCodec, new HttpClientCodec(decoderConfig, decoder.failOnMissingResponse, decoder.parseHttpAfterConnectRequest)); - if (acceptGzip) { + if (acceptGzip || acceptBrotli) { p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); } @@ -779,6 +795,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { static final class H2CleartextCodec extends ChannelHandlerAdapter { final boolean acceptGzip; + final boolean acceptBrotli; final Http2FrameCodec http2FrameCodec; final ChannelMetricsRecorder metricsRecorder; final ChannelOperations.OnSetup opsFactory; @@ -790,11 +807,13 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { Http2FrameCodec http2FrameCodec, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, @Nullable Function uriTagValue) { this.acceptGzip = acceptGzip; + this.acceptBrotli = acceptBrotli; this.http2FrameCodec = http2FrameCodec; this.metricsRecorder = metricsRecorder; this.opsFactory = opsFactory; @@ -819,12 +838,12 @@ public void handlerAdded(ChannelHandlerContext ctx) { if (responseTimeoutHandler != null) { pipeline.remove(NettyPipeline.ResponseTimeoutHandler); http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE, - new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, proxyAddress, + new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutHandler.getReaderIdleTimeInMillis(), uriTagValue)); } else { http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE, - new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, uriTagValue)); + new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, uriTagValue)); } pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec) .addAfter(NettyPipeline.HttpCodec, NettyPipeline.H2MultiplexHandler, http2MultiplexHandler); @@ -839,6 +858,7 @@ public void handlerAdded(ChannelHandlerContext ctx) { static final class H2Codec extends ChannelInitializer { final boolean acceptGzip; + final boolean acceptBrotli; final ChannelMetricsRecorder metricsRecorder; final ConnectionObserver observer; final ChannelOperations.OnSetup opsFactory; @@ -853,12 +873,13 @@ static final class H2Codec extends ChannelInitializer { @Nullable ConnectionObserver observer, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, @Nullable Function uriTagValue) { // Handle outbound and upgrade streams - this(owner, observer, opsFactory, acceptGzip, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue); + this(owner, observer, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, -1, uriTagValue); } H2Codec( @@ -866,6 +887,7 @@ static final class H2Codec extends ChannelInitializer { @Nullable ConnectionObserver observer, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, @Nullable SocketAddress proxyAddress, SocketAddress remoteAddress, @@ -873,6 +895,7 @@ static final class H2Codec extends ChannelInitializer { @Nullable Function uriTagValue) { // Handle outbound and upgrade streams this.acceptGzip = acceptGzip; + this.acceptBrotli = acceptBrotli; this.metricsRecorder = metricsRecorder; this.observer = observer; this.opsFactory = opsFactory; @@ -891,7 +914,7 @@ protected void initChannel(Channel ch) { setChannelContext(ch, owner.currentContext()); } addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory, - acceptGzip, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutMillis, uriTagValue); + acceptGzip, acceptBrotli, metricsRecorder, proxyAddress, remoteAddress, responseTimeoutMillis, uriTagValue); } else { // Handle server pushes (inbound streams) @@ -915,6 +938,7 @@ public boolean isSharable() { static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter { final boolean acceptGzip; + final boolean acceptBrotli; final HttpResponseDecoderSpec decoder; final Http2Settings http2Settings; final ChannelMetricsRecorder metricsRecorder; @@ -925,6 +949,7 @@ static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter { H2OrHttp11Codec(HttpClientChannelInitializer initializer, ConnectionObserver observer, SocketAddress remoteAddress) { this.acceptGzip = initializer.acceptGzip; + this.acceptBrotli = initializer.acceptBrotli; this.decoder = initializer.decoder; this.http2Settings = initializer.http2Settings; this.metricsRecorder = initializer.metricsRecorder; @@ -945,10 +970,10 @@ public void channelActive(ChannelHandlerContext ctx) { log.debug(format(ctx.channel(), "Negotiated application-level protocol [" + protocol + "]")); } if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { - configureHttp2Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, http2Settings, observer); + configureHttp2Pipeline(ctx.channel().pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer); } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { - configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue); + configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue); } else { throw new IllegalStateException("unknown protocol: " + protocol); @@ -967,6 +992,7 @@ else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { static final class HttpClientChannelInitializer implements ChannelPipelineConfigurer { final boolean acceptGzip; + final boolean acceptBrotli; final HttpResponseDecoderSpec decoder; final Http2Settings http2Settings; final ChannelMetricsRecorder metricsRecorder; @@ -978,6 +1004,7 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig HttpClientChannelInitializer(HttpClientConfig config) { this.acceptGzip = config.acceptGzip; + this.acceptBrotli = config.acceptBrotli; this.decoder = config.decoder; this.http2Settings = config.http2Settings(); this.metricsRecorder = config.metricsRecorderInternal(); @@ -999,21 +1026,21 @@ public void onChannelInit(ConnectionObserver observer, Channel channel, @Nullabl new H2OrHttp11Codec(this, observer, remoteAddress)); } else if ((protocols & h11) == h11) { - configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue); + configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue); } else if ((protocols & h2) == h2) { - configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer); + configureHttp2Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer); } } else { if ((protocols & h11orH2C) == h11orH2C) { - configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, metricsRecorder, observer, opsFactory, proxyAddress, remoteAddress, uriTagValue); + configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, metricsRecorder, observer, opsFactory, proxyAddress, remoteAddress, uriTagValue); } else if ((protocols & h11) == h11) { - configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue); + configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, proxyAddress, remoteAddress, uriTagValue); } else if ((protocols & h2c) == h2c) { - configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer); + configureHttp2Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer); } } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java index bbd2092d3a..e6a11c652a 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java @@ -22,11 +22,13 @@ import java.lang.annotation.Target; import java.nio.charset.Charset; import java.time.Duration; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.zip.GZIPInputStream; import com.aayushatharva.brotli4j.decoder.DecoderJNI; import com.aayushatharva.brotli4j.decoder.DirectDecompress; +import com.google.common.collect.ImmutableList; import io.netty.buffer.Unpooled; import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.compression.Zstd; @@ -447,7 +449,7 @@ void compressionServerDefaultClientDefaultIsNone(HttpServer server, HttpClient c @ParameterizedCompressionTest void compressionActivatedOnClientAddsHeader(HttpServer server, HttpClient client) { - AtomicReference zip = new AtomicReference<>("fail"); + AtomicReference> acceptEncodingHeaderValues = new AtomicReference<>(ImmutableList.of("fail")); disposableServer = server.compress(true) @@ -455,13 +457,13 @@ void compressionActivatedOnClientAddsHeader(HttpServer server, HttpClient client .bindNow(Duration.ofSeconds(10)); client.port(disposableServer.port()) .compress(true) - .headers(h -> zip.set(h.get("accept-encoding"))) + .headers(h -> acceptEncodingHeaderValues.set(h.getAll("accept-encoding"))) .get() .uri("/test") .responseContent() .blockLast(Duration.ofSeconds(10)); - assertThat(zip.get()).isEqualTo("gzip"); + assertThat(acceptEncodingHeaderValues.get()).isEqualTo(ImmutableList.of("br", "gzip")); } @ParameterizedCompressionTest diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index ac6a946cdf..348198cd91 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,6 +69,7 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; +import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpContentDecompressor; @@ -159,6 +160,11 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx .get(30, TimeUnit.SECONDS); } + @BeforeAll + static void verifyNettyBrotliIsAvailable() { + assertThat(Brotli.isAvailable()).isTrue(); + } + @Test void abort() { disposableServer = @@ -472,6 +478,46 @@ void gzip() { .verify(Duration.ofSeconds(30)); } + @Test + void brotliEnabled() { + doTestBrotli(true); + } + + @Test + void brotliDisabled() { + doTestBrotli(false); + } + + private void doTestBrotli(boolean brotliEnabled) { + String expectedResponse = brotliEnabled ? "br" : "no brotli"; + disposableServer = + createServer() + .compress(true) + .handle((req, res) -> res.sendString(Mono.just(req.requestHeaders() + .get(HttpHeaderNames.ACCEPT_ENCODING, + "no brotli")))) + .bindNow(); + HttpClient client = createHttpClientForContextWithPort(); + + if (brotliEnabled) { + assertThat(Brotli.isAvailable()).isTrue(); + client = client.compress(true); + client.configuration().headers = client.configuration().headers() + .set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR); + } + + StepVerifier.create(client.get() + .uri("/") + .response((r, buf) -> buf.asString() + .elementAt(0) + .zipWith(Mono.just(r)))) + .expectNextMatches(tuple -> + expectedResponse.equals(tuple.getT1()) + && (tuple.getT2().status().code() == 200)) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + @Test void gzipEnabled() { doTestGzip(true); @@ -491,9 +537,13 @@ private void doTestGzip(boolean gzipEnabled) { "no gzip")))) .bindNow(); HttpClient client = createHttpClientForContextWithPort(); + client.configuration().acceptBrotli = false; + client.configuration().acceptGzip = gzipEnabled; if (gzipEnabled) { client = client.compress(true); + client.configuration().headers = client.configuration().headers() + .set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); } StepVerifier.create(client.get()