diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java index 5d83544e2c..b3c19d5332 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -106,14 +106,16 @@ protected CoreSubscriber> createDisposableAcquire( MonoSink sink, Context currentContext) { boolean acceptGzip = false; + boolean acceptBrotli = false; ChannelMetricsRecorder metricsRecorder = config.metricsRecorder() != null ? config.metricsRecorder().get() : null; Function uriTagValue = null; if (config instanceof HttpClientConfig) { acceptGzip = ((HttpClientConfig) config).acceptGzip; + acceptBrotli = ((HttpClientConfig) config).acceptBrotli; uriTagValue = ((HttpClientConfig) config).uriTagValue; } return new DisposableAcquire(connectionObserver, config.channelOperationsProvider(), - acceptGzip, metricsRecorder, pendingAcquireTimeout, pool, sink, currentContext, uriTagValue); + acceptGzip, acceptBrotli, metricsRecorder, pendingAcquireTimeout, pool, sink, currentContext, uriTagValue); } @Override @@ -214,6 +216,7 @@ static final class DisposableAcquire final ConnectionObserver obs; final ChannelOperations.OnSetup opsFactory; final boolean acceptGzip; + final boolean acceptBrotli; final ChannelMetricsRecorder metricsRecorder; final long pendingAcquireTimeout; final InstrumentedPool pool; @@ -228,6 +231,7 @@ static final class DisposableAcquire ConnectionObserver obs, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, long pendingAcquireTimeout, InstrumentedPool pool, @@ -239,6 +243,7 @@ static final class DisposableAcquire this.obs = obs; this.opsFactory = opsFactory; this.acceptGzip = acceptGzip; + this.acceptBrotli = acceptBrotli; this.metricsRecorder = metricsRecorder; this.pendingAcquireTimeout = pendingAcquireTimeout; this.pool = pool; @@ -253,6 +258,7 @@ static final class DisposableAcquire this.obs = parent.obs; this.opsFactory = parent.opsFactory; this.acceptGzip = parent.acceptGzip; + this.acceptBrotli = parent.acceptBrotli; this.metricsRecorder = parent.metricsRecorder; this.pendingAcquireTimeout = parent.pendingAcquireTimeout; this.pool = parent.pool; @@ -374,7 +380,7 @@ public void operationComplete(Future future) { setChannelContext(ch, currentContext()); } HttpClientConfig.addStreamHandlers(ch, obs.then(new HttpClientConfig.StreamConnectionObserver(currentContext())), - opsFactory, acceptGzip, metricsRecorder, -1, uriTagValue); + opsFactory, acceptGzip, acceptBrotli, metricsRecorder, -1, uriTagValue); ChannelOperations ops = ChannelOperations.get(ch); if (ops != null) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java index 52f0d67ca9..1e37546827 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClient.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import io.netty.buffer.ByteBuf; import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; @@ -515,7 +516,13 @@ public final HttpClient baseUrl(String baseUrl) { * @return a new {@link HttpClient} */ public final HttpClient compress(boolean compressionEnabled) { + configuration().headers.remove(HttpHeaderNames.ACCEPT_ENCODING); if (compressionEnabled) { + configuration().acceptBrotli = Brotli.isAvailable(); + if (configuration().acceptBrotli) { + configuration().headers.add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR); + } + if (!configuration().acceptGzip) { HttpClient dup = duplicate(); HttpHeaders headers = configuration().headers.copy(); @@ -525,7 +532,7 @@ public final HttpClient compress(boolean compressionEnabled) { return dup; } } - else if (configuration().acceptGzip) { + else if (configuration().acceptGzip || configuration().acceptBrotli) { HttpClient dup = duplicate(); if (isCompressing(configuration().headers)) { HttpHeaders headers = configuration().headers.copy(); @@ -533,6 +540,7 @@ else if (configuration().acceptGzip) { dup.configuration().headers = headers; } dup.configuration().acceptGzip = false; + dup.configuration().acceptBrotli = false; return dup; } return this; @@ -1590,7 +1598,8 @@ public final HttpClient wiretap(boolean enable) { } static boolean isCompressing(HttpHeaders h) { - return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true); + return h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP, true) + || h.contains(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR, true); } static String reactorNettyVersion() { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index 57969a1987..217bc88aad 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -112,6 +112,7 @@ public String baseUrl() { public int channelHash() { int result = super.channelHash(); result = 31 * result + Boolean.hashCode(acceptGzip); + result = 31 * result + Boolean.hashCode(acceptBrotli); result = 31 * result + Objects.hashCode(decoder); result = 31 * result + _protocols; result = 31 * result + Objects.hashCode(sslProvider); @@ -197,6 +198,15 @@ public boolean isAcceptGzip() { return acceptGzip; } + /** + * Return whether Brotli compression is enabled. + * + * @return whether Brotli compression is enabled + */ + public boolean isAcceptBrotli() { + return acceptBrotli; + } + /** * Return true if {@code retry once} is disabled, false otherwise. * @@ -309,6 +319,7 @@ public WebsocketClientSpec websocketClientSpec() { // Protected/Package private write API boolean acceptGzip; + boolean acceptBrotli; String baseUrl; BiFunction> body; Function, ? extends Mono> connector; @@ -344,6 +355,7 @@ public WebsocketClientSpec websocketClientSpec() { Supplier remoteAddress) { super(connectionProvider, options, remoteAddress); this.acceptGzip = false; + this.acceptBrotli = false; this.cookieDecoder = ClientCookieDecoder.STRICT; this.cookieEncoder = ClientCookieEncoder.STRICT; this.decoder = new HttpResponseDecoderSpec(); @@ -358,6 +370,7 @@ public WebsocketClientSpec websocketClientSpec() { HttpClientConfig(HttpClientConfig parent) { super(parent); this.acceptGzip = parent.acceptGzip; + this.acceptBrotli = parent.acceptBrotli; this.baseUrl = parent.baseUrl; this.body = parent.body; this.connector = parent.connector; @@ -532,6 +545,7 @@ static void addStreamHandlers( ConnectionObserver obs, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, long responseTimeoutMillis, @Nullable Function uriTagValue) { @@ -544,7 +558,7 @@ static void addStreamHandlers( pipeline.addLast(NettyPipeline.H2ToHttp11Codec, HTTP2_STREAM_FRAME_TO_HTTP_OBJECT) .addLast(NettyPipeline.HttpTrafficHandler, HTTP_2_STREAM_BRIDGE_CLIENT_HANDLER); - if (acceptGzip) { + if (acceptGzip || acceptBrotli) { pipeline.addLast(NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); } @@ -598,7 +612,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { } } - static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpResponseDecoderSpec decoder, + static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, boolean acceptBrotli, HttpResponseDecoderSpec decoder, Http2Settings http2Settings, ConnectionObserver observer) { Http2FrameCodecBuilder http2FrameCodecBuilder = Http2FrameCodecBuilder.forClient() @@ -620,6 +634,7 @@ static void configureHttp2Pipeline(ChannelPipeline p, boolean acceptGzip, HttpRe static void configureHttp11OrH2CleartextPipeline( ChannelPipeline p, boolean acceptGzip, + boolean acceptBrotli, HttpResponseDecoderSpec decoder, Http2Settings http2Settings, @Nullable ChannelMetricsRecorder metricsRecorder, @@ -650,7 +665,7 @@ static void configureHttp11OrH2CleartextPipeline( Http2FrameCodec http2FrameCodec = http2FrameCodecBuilder.build(); Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(http2FrameCodec, - new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, metricsRecorder, uriTagValue)); + new H2CleartextCodec(http2FrameCodec, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, uriTagValue)); HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, decoder.h2cMaxContentLength()); @@ -658,7 +673,7 @@ static void configureHttp11OrH2CleartextPipeline( .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.H2CUpgradeHandler, upgradeHandler) .addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpTrafficHandler, new HttpTrafficHandler(observer)); - if (acceptGzip) { + if (acceptGzip || acceptBrotli) { p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); } @@ -683,6 +698,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { @SuppressWarnings("deprecation") static void configureHttp11Pipeline(ChannelPipeline p, boolean acceptGzip, + boolean acceptBrotli, HttpResponseDecoderSpec decoder, @Nullable ChannelMetricsRecorder metricsRecorder, @Nullable Function uriTagValue) { @@ -698,7 +714,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, decoder.parseHttpAfterConnectRequest, decoder.allowDuplicateContentLengths())); - if (acceptGzip) { + if (acceptGzip || acceptBrotli) { p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.HttpDecompressor, new HttpContentDecompressor()); } @@ -757,6 +773,7 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder) { static final class H2CleartextCodec extends ChannelHandlerAdapter { final boolean acceptGzip; + final boolean acceptBrotli; final Http2FrameCodec http2FrameCodec; final ChannelMetricsRecorder metricsRecorder; final ChannelOperations.OnSetup opsFactory; @@ -766,9 +783,11 @@ static final class H2CleartextCodec extends ChannelHandlerAdapter { Http2FrameCodec http2FrameCodec, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, @Nullable Function uriTagValue) { this.acceptGzip = acceptGzip; + this.acceptBrotli = acceptBrotli; this.http2FrameCodec = http2FrameCodec; this.metricsRecorder = metricsRecorder; this.opsFactory = opsFactory; @@ -791,12 +810,12 @@ public void handlerAdded(ChannelHandlerContext ctx) { if (responseTimeoutHandler != null) { pipeline.remove(NettyPipeline.ResponseTimeoutHandler); http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE, - new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, + new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, responseTimeoutHandler.getReaderIdleTimeInMillis(), uriTagValue)); } else { http2MultiplexHandler = new Http2MultiplexHandler(H2InboundStreamHandler.INSTANCE, - new H2Codec(owner, obs, opsFactory, acceptGzip, metricsRecorder, uriTagValue)); + new H2Codec(owner, obs, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, uriTagValue)); } pipeline.addAfter(ctx.name(), NettyPipeline.HttpCodec, http2FrameCodec) .addAfter(NettyPipeline.HttpCodec, NettyPipeline.H2MultiplexHandler, http2MultiplexHandler); @@ -811,6 +830,7 @@ public void handlerAdded(ChannelHandlerContext ctx) { static final class H2Codec extends ChannelInitializer { final boolean acceptGzip; + final boolean acceptBrotli; final ChannelMetricsRecorder metricsRecorder; final ConnectionObserver observer; final ChannelOperations.OnSetup opsFactory; @@ -823,10 +843,11 @@ static final class H2Codec extends ChannelInitializer { @Nullable ConnectionObserver observer, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, @Nullable Function uriTagValue) { // Handle outbound and upgrade streams - this(owner, observer, opsFactory, acceptGzip, metricsRecorder, -1, uriTagValue); + this(owner, observer, opsFactory, acceptGzip, acceptBrotli, metricsRecorder, -1, uriTagValue); } H2Codec( @@ -834,11 +855,13 @@ static final class H2Codec extends ChannelInitializer { @Nullable ConnectionObserver observer, ChannelOperations.OnSetup opsFactory, boolean acceptGzip, + boolean acceptBrotli, @Nullable ChannelMetricsRecorder metricsRecorder, long responseTimeoutMillis, @Nullable Function uriTagValue) { // Handle outbound and upgrade streams this.acceptGzip = acceptGzip; + this.acceptBrotli = acceptBrotli; this.metricsRecorder = metricsRecorder; this.observer = observer; this.opsFactory = opsFactory; @@ -855,7 +878,7 @@ protected void initChannel(Channel ch) { setChannelContext(ch, owner.currentContext()); } addStreamHandlers(ch, observer.then(new StreamConnectionObserver(owner.currentContext())), opsFactory, - acceptGzip, metricsRecorder, responseTimeoutMillis, uriTagValue); + acceptGzip, acceptBrotli, metricsRecorder, responseTimeoutMillis, uriTagValue); } else { // Handle server pushes (inbound streams) @@ -879,6 +902,7 @@ public boolean isSharable() { static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter { final boolean acceptGzip; + final boolean acceptBrotli; final HttpResponseDecoderSpec decoder; final Http2Settings http2Settings; final ChannelMetricsRecorder metricsRecorder; @@ -887,6 +911,7 @@ static final class H2OrHttp11Codec extends ChannelInboundHandlerAdapter { H2OrHttp11Codec(HttpClientChannelInitializer initializer, ConnectionObserver observer) { this.acceptGzip = initializer.acceptGzip; + this.acceptBrotli = initializer.acceptBrotli; this.decoder = initializer.decoder; this.http2Settings = initializer.http2Settings; this.metricsRecorder = initializer.metricsRecorder; @@ -905,10 +930,10 @@ public void channelActive(ChannelHandlerContext ctx) { log.debug(format(ctx.channel(), "Negotiated application-level protocol [" + protocol + "]")); } if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { - configureHttp2Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, http2Settings, observer); + configureHttp2Pipeline(ctx.channel().pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer); } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { - configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, decoder, metricsRecorder, uriTagValue); + configureHttp11Pipeline(ctx.channel().pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, uriTagValue); } else { throw new IllegalStateException("unknown protocol: " + protocol); @@ -927,6 +952,7 @@ else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { static final class HttpClientChannelInitializer implements ChannelPipelineConfigurer { final boolean acceptGzip; + final boolean acceptBrotli; final HttpResponseDecoderSpec decoder; final Http2Settings http2Settings; final ChannelMetricsRecorder metricsRecorder; @@ -937,6 +963,7 @@ static final class HttpClientChannelInitializer implements ChannelPipelineConfig HttpClientChannelInitializer(HttpClientConfig config) { this.acceptGzip = config.acceptGzip; + this.acceptBrotli = config.acceptBrotli; this.decoder = config.decoder; this.http2Settings = config.http2Settings(); this.metricsRecorder = config.metricsRecorderInternal(); @@ -957,21 +984,21 @@ public void onChannelInit(ConnectionObserver observer, Channel channel, @Nullabl new H2OrHttp11Codec(this, observer)); } else if ((protocols & h11) == h11) { - configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, uriTagValue); + configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, uriTagValue); } else if ((protocols & h2) == h2) { - configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer); + configureHttp2Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer); } } else { if ((protocols & h11orH2C) == h11orH2C) { - configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, metricsRecorder, observer, opsFactory, uriTagValue); + configureHttp11OrH2CleartextPipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, metricsRecorder, observer, opsFactory, uriTagValue); } else if ((protocols & h11) == h11) { - configureHttp11Pipeline(channel.pipeline(), acceptGzip, decoder, metricsRecorder, uriTagValue); + configureHttp11Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, metricsRecorder, uriTagValue); } else if ((protocols & h2c) == h2c) { - configureHttp2Pipeline(channel.pipeline(), acceptGzip, decoder, http2Settings, observer); + configureHttp2Pipeline(channel.pipeline(), acceptGzip, acceptBrotli, decoder, http2Settings, observer); } } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java index de16691778..556ae8ec57 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpCompressionClientServerTests.java @@ -22,11 +22,13 @@ import java.lang.annotation.Target; import java.nio.charset.Charset; import java.time.Duration; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.zip.GZIPInputStream; import com.aayushatharva.brotli4j.decoder.DecoderJNI; import com.aayushatharva.brotli4j.decoder.DirectDecompress; +import com.google.common.collect.ImmutableList; import io.netty.buffer.Unpooled; import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.http.HttpHeaders; @@ -413,7 +415,7 @@ void compressionServerDefaultClientDefaultIsNone(HttpServer server, HttpClient c @ParameterizedCompressionTest void compressionActivatedOnClientAddsHeader(HttpServer server, HttpClient client) { - AtomicReference zip = new AtomicReference<>("fail"); + AtomicReference> acceptEncodingHeaderValues = new AtomicReference<>(ImmutableList.of("fail")); disposableServer = server.compress(true) @@ -421,13 +423,13 @@ void compressionActivatedOnClientAddsHeader(HttpServer server, HttpClient client .bindNow(Duration.ofSeconds(10)); client.port(disposableServer.port()) .compress(true) - .headers(h -> zip.set(h.get("accept-encoding"))) + .headers(h -> acceptEncodingHeaderValues.set(h.getAll("accept-encoding"))) .get() .uri("/test") .responseContent() .blockLast(Duration.ofSeconds(10)); - assertThat(zip.get()).isEqualTo("gzip"); + assertThat(acceptEncodingHeaderValues.get()).isEqualTo(ImmutableList.of("br", "gzip")); } @ParameterizedCompressionTest diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 3c1d3e7f77..580e6de02a 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -69,6 +69,7 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; +import io.netty.handler.codec.compression.Brotli; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpContentDecompressor; @@ -157,6 +158,11 @@ static void cleanup() throws ExecutionException, InterruptedException, TimeoutEx .get(30, TimeUnit.SECONDS); } + @BeforeAll + static void verifyNettyBrotliIsAvailable() { + assertThat(Brotli.isAvailable()).isTrue(); + } + @Test void abort() { disposableServer = @@ -470,6 +476,46 @@ void gzip() { .verify(Duration.ofSeconds(30)); } + @Test + void brotliEnabled() { + doTestBrotli(true); + } + + @Test + void brotliDisabled() { + doTestBrotli(false); + } + + private void doTestBrotli(boolean brotliEnabled) { + String expectedResponse = brotliEnabled ? "br" : "no brotli"; + disposableServer = + createServer() + .compress(true) + .handle((req, res) -> res.sendString(Mono.just(req.requestHeaders() + .get(HttpHeaderNames.ACCEPT_ENCODING, + "no brotli")))) + .bindNow(); + HttpClient client = createHttpClientForContextWithPort(); + + if (brotliEnabled) { + assertThat(Brotli.isAvailable()).isTrue(); + client = client.compress(true); + client.configuration().headers = client.configuration().headers() + .set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.BR); + } + + StepVerifier.create(client.get() + .uri("/") + .response((r, buf) -> buf.asString() + .elementAt(0) + .zipWith(Mono.just(r)))) + .expectNextMatches(tuple -> + expectedResponse.equals(tuple.getT1()) + && (tuple.getT2().status().code() == 200)) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + @Test void gzipEnabled() { doTestGzip(true); @@ -489,9 +535,13 @@ private void doTestGzip(boolean gzipEnabled) { "no gzip")))) .bindNow(); HttpClient client = createHttpClientForContextWithPort(); + client.configuration().acceptBrotli = false; + client.configuration().acceptGzip = gzipEnabled; if (gzipEnabled) { client = client.compress(true); + client.configuration().headers = client.configuration().headers() + .set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); } StepVerifier.create(client.get()