From d335c0f852b6c940adbbbd51a6bb98b6ec033ae8 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Thu, 11 Aug 2022 22:42:48 +0200 Subject: [PATCH] Watermarked response backpressure #3136 Signed-off-by: Daniel Kec --- ..._helidon_reactive_webserver_WebServer.adoc | 16 ++ .../server/StreamingOutputLeakTest.java | 104 ++++++++++ .../webserver/testsupport/TestClient.java | 6 + reactive/webserver/webserver/pom.xml | 23 ++- .../webserver/BackpressureStrategy.java | 67 ++++++ .../reactive/webserver/BareResponse.java | 7 + .../reactive/webserver/BareResponseImpl.java | 71 +++++-- .../reactive/webserver/ForwardingHandler.java | 8 +- .../reactive/webserver/HttpInitializer.java | 2 +- .../helidon/reactive/webserver/Response.java | 1 + .../reactive/webserver/ServerBasicConfig.java | 14 ++ .../webserver/ServerConfiguration.java | 31 +++ .../webserver/ServerResponseSubscription.java | 163 +++++++++++++++ .../webserver/SocketConfiguration.java | 70 +++++++ .../helidon/reactive/webserver/WebServer.java | 12 ++ .../reactive/webserver/BackpressureTest.java | 192 ------------------ .../BareResponseSubscriberTckTest.java | 2 + .../reactive/webserver/ResponseTest.java | 5 + .../webserver/WaterMarkedBackpressureIT.java | 173 ++++++++++++++++ 19 files changed, 745 insertions(+), 222 deletions(-) create mode 100644 microprofile/server/src/test/java/io/helidon/microprofile/server/StreamingOutputLeakTest.java create mode 100644 reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BackpressureStrategy.java create mode 100644 reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerResponseSubscription.java create mode 100644 reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/WaterMarkedBackpressureIT.java diff --git a/docs/config/io_helidon_reactive_webserver_WebServer.adoc b/docs/config/io_helidon_reactive_webserver_WebServer.adoc index 105504c08d7..82455c6ccbb 100644 --- a/docs/config/io_helidon_reactive_webserver_WebServer.adoc +++ b/docs/config/io_helidon_reactive_webserver_WebServer.adoc @@ -71,6 +71,22 @@ This is a standalone configuration type, prefix from configuration root: `server |`max-upgrade-content-length` |int |`65536` |Set a maximum length of the content of an upgrade request. Default is `64*1024` +|`backpressure-buffer-size` |long |`5242880` |Set a maximum length of the unflushed response data sending buffer can keep without applying backpressure. +Depends on `backpressure-policy` what happens if max buffer size is reached. + + Default is `5*1024*1024` - 5Mb +|`backpressure-policy` | String | `LINEAR` |Sets the strategy for applying backpressure to the reactive stream +of response data. + +* LINEAR - Data chunks are requested one-by-one after previous data chunk has been written to Netty's buffer, when +`backpressure-buffer-size` watermark is reached, new chunks are not requested until buffer size decrease under +the watermark value. +* PREFETCH - After first data chunk arrives, expected number of chunks needed to fill the buffer up +to watermark is calculated and requested. +* AUTO_FLUSH - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested and extra flush is initiated. +* UNBOUNDED - No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream. + + Default is `LINEAR` |`port` |int |`0` |Configures a server port to listen on with the server socket. If port is `0` then any available ephemeral port will be used. |`receive-buffer-size` |int |{nbsp} |Configures proposed value of the TCP receive window that is advertised to the remote peer on the diff --git a/microprofile/server/src/test/java/io/helidon/microprofile/server/StreamingOutputLeakTest.java b/microprofile/server/src/test/java/io/helidon/microprofile/server/StreamingOutputLeakTest.java new file mode 100644 index 00000000000..1d4aac0e11a --- /dev/null +++ b/microprofile/server/src/test/java/io/helidon/microprofile/server/StreamingOutputLeakTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.microprofile.server; + +import jakarta.validation.constraints.NotNull; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.StreamingOutput; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Random; + +import io.helidon.microprofile.tests.junit5.AddBean; +import io.helidon.microprofile.tests.junit5.AddConfig; +import io.helidon.microprofile.tests.junit5.AddExtension; +import io.helidon.microprofile.tests.junit5.DisableDiscovery; +import io.helidon.microprofile.tests.junit5.HelidonTest; + +import org.glassfish.jersey.ext.cdi1x.internal.CdiComponentProvider; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.is; + +@HelidonTest +@DisableDiscovery +@AddBean(StreamingOutputLeakTest.DownloadResource.class) +@AddExtension(ServerCdiExtension.class) +@AddExtension(JaxRsCdiExtension.class) +@AddExtension(CdiComponentProvider.class) +@AddConfig(key = "server.backpressure-buffer-size", value = "20971520")//20Mb +class StreamingOutputLeakTest { + + private static final int SIZE10MB = 10 * 1024 * 1024; + private static final int SIZE = SIZE10MB; + private static final long NUMBER_OF_BUFS = 20; + private static final byte[] DATA_10MB = new byte[SIZE]; + + static { + Random r = new Random(); + r.nextBytes(DATA_10MB); + } + + /** + * Reproducer for issue #4643 + */ + @Test + void streamingOutput(WebTarget target) throws IOException { + + InputStream is = target.path("/download") + .request() + .get(InputStream.class); + long size = 0; + while (is.read() != -1) { + size++; + } + is.close(); + + // Make sure all data has been read + assertThat(size, is(NUMBER_OF_BUFS * SIZE)); + } + + @Path("/download") + public static class DownloadResource { + + @GET + @Produces(MediaType.MULTIPART_FORM_DATA) + public Response getPayload( + @NotNull @QueryParam("fileName") String fileName) { + StreamingOutput fileStream = output -> { + + // 2gb + for (int i = 0; i < NUMBER_OF_BUFS; i++) { + output.write(DATA_10MB); + output.flush(); + } + + }; + return Response + .ok(fileStream, MediaType.MULTIPART_FORM_DATA) + .build(); + } + } +} \ No newline at end of file diff --git a/reactive/webserver/test-support/src/main/java/io/helidon/reactive/webserver/testsupport/TestClient.java b/reactive/webserver/test-support/src/main/java/io/helidon/reactive/webserver/testsupport/TestClient.java index b9feac86ca9..925c79c427f 100644 --- a/reactive/webserver/test-support/src/main/java/io/helidon/reactive/webserver/testsupport/TestClient.java +++ b/reactive/webserver/test-support/src/main/java/io/helidon/reactive/webserver/testsupport/TestClient.java @@ -38,6 +38,7 @@ import io.helidon.common.reactive.Single; import io.helidon.reactive.media.common.MediaContext; import io.helidon.reactive.media.common.MediaSupport; +import io.helidon.reactive.webserver.BackpressureStrategy; import io.helidon.reactive.webserver.BareRequest; import io.helidon.reactive.webserver.BareResponse; import io.helidon.reactive.webserver.RequestHeaders; @@ -294,6 +295,11 @@ public Single whenCompleted() { return Single.create(completionStage); } + @Override + public void backpressureStrategy(BackpressureStrategy backpressureStrategy) { + //noop + } + @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; diff --git a/reactive/webserver/webserver/pom.xml b/reactive/webserver/webserver/pom.xml index cc88a394170..3d355fb97c1 100644 --- a/reactive/webserver/webserver/pom.xml +++ b/reactive/webserver/webserver/pom.xml @@ -220,17 +220,30 @@ maven-failsafe-plugin + tck-test integration-test verify + + + **/*TckTest.java + + + + + integration-test + + integration-test + verify + + + + **/*IT.java + + - - - **/*TckTest.java - - org.apache.maven.surefire diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BackpressureStrategy.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BackpressureStrategy.java new file mode 100644 index 00000000000..bb677098398 --- /dev/null +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BackpressureStrategy.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.reactive.webserver; + +import java.util.concurrent.Flow; + +import io.helidon.reactive.webserver.ServerResponseSubscription.Unbounded; +import io.helidon.reactive.webserver.ServerResponseSubscription.WatermarkAutoFlush; +import io.helidon.reactive.webserver.ServerResponseSubscription.WatermarkLinear; +import io.helidon.reactive.webserver.ServerResponseSubscription.WatermarkPrefetch; + +/** + * Strategy for applying backpressure to the reactive stream of response data. + */ +public enum BackpressureStrategy { + /** + * Data chunks are requested one-by-one after previous data chunk has been given to Netty for writing. + * When backpressure-buffer-size watermark is reached new chunks are not requested until buffer size + * decrease under the watermark value. + */ + LINEAR(1), + /** + * Data are requested one-by-one, in case buffer reaches watermark, + * no other data is requested and extra flush is initiated. + */ + AUTO_FLUSH(2), + /** + * After first data chunk arrives, expected number of chunks needed + * to fill the buffer up to watermark is calculated and requested. + */ + PREFETCH(3), + /** + * No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream. + */ + UNBOUNDED(4); + + private final int type; + + BackpressureStrategy(int type) { + this.type = type; + } + + ServerResponseSubscription createSubscription(Flow.Subscription subscription, + long backpressureBufferSize) { + switch (type) { + case 1: return new WatermarkLinear(subscription, backpressureBufferSize); + case 2: return new WatermarkAutoFlush(subscription, backpressureBufferSize); + case 3: return new WatermarkPrefetch(subscription, backpressureBufferSize); + case 4: return new Unbounded(subscription); + default: throw new IllegalStateException("Unknown backpressure strategy."); + } + } +} diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BareResponse.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BareResponse.java index 453d3a28c0b..df3b839b8be 100644 --- a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BareResponse.java +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BareResponse.java @@ -58,6 +58,13 @@ void writeStatusAndHeaders(Http.Status status, Map> headers */ Single whenCompleted(); + /** + * Set the backpressure strategy used for requesting response data. + * + * @param backpressureStrategy strategy used for requesting response data + */ + void backpressureStrategy(BackpressureStrategy backpressureStrategy); + /** * Each response is subscribed up to a single publisher and AFTER {@link #writeStatusAndHeaders(Http.Status, Map)} * method is called and returned. diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BareResponseImpl.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BareResponseImpl.java index 98e5349c3ed..b4aaa286b1c 100644 --- a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BareResponseImpl.java +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/BareResponseImpl.java @@ -78,10 +78,12 @@ class BareResponseImpl implements BareResponse { private final CompletableFuture originalEntityAnalyzed; // Accessed by Subscriber method threads - private Flow.Subscription subscription; + private ServerResponseSubscription subscription; private volatile DataChunk firstChunk; private CompletableFuture prevRequestChunk; private CompletableFuture requestEntityAnalyzed; + private BackpressureStrategy backpressureStrategy; + private final long backpressureBufferSize; // Accessed by writeStatusHeaders(status, headers) method private volatile boolean lengthOptimization; @@ -100,10 +102,14 @@ class BareResponseImpl implements BareResponse { RequestContext requestContext, CompletableFuture prevRequestChunk, CompletableFuture requestEntityAnalyzed, + long backpressureBufferSize, + BackpressureStrategy backpressureStrategy, long requestId) { this.requestContext = requestContext; this.originalEntityAnalyzed = requestEntityAnalyzed; this.requestEntityAnalyzed = requestEntityAnalyzed; + this.backpressureStrategy = backpressureStrategy; + this.backpressureBufferSize = backpressureBufferSize; this.responseFuture = new CompletableFuture<>(); this.headersFuture = new CompletableFuture<>(); this.channel = new NettyChannel(ctx.channel()); @@ -126,6 +132,11 @@ class BareResponseImpl implements BareResponse { responseFuture.whenComplete(this::responseComplete); } + @Override + public void backpressureStrategy(BackpressureStrategy backpressureStrategy) { + this.backpressureStrategy = backpressureStrategy; + } + /** * Steps required for the completion of this response. * @@ -192,7 +203,9 @@ public void writeStatusAndHeaders(Http.Status status, Map> originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE); } else { if (!requestContext.requestCompleted()) { - LOGGER.finer(() -> log("Request content not fully read with keep-alive: true", channel)); + if (LOGGER.isLoggable(Level.FINER)) { + LOGGER.finer(() -> log("Request content not fully read with keep-alive: true", channel)); + } if (requestContext.isDataRequested()) { // there are pending requests, we have emitted some data and request was not explicitly canceled @@ -232,7 +245,9 @@ public void writeStatusAndHeaders(Http.Status status, Map> // Content length optimization attempt if (!lengthOptimization) { requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> { - LOGGER.fine(() -> log("Writing headers %s", status)); + if (LOGGER.isLoggable(Level.FINE)) { + LOGGER.fine(() -> log("Writing headers %s", status)); + } requestContext.runInScope(() -> orderedWrite(this::initWriteResponse)); return listener; }); @@ -253,7 +268,9 @@ private void completeResponseFuture(Throwable throwable) { if (throwable == null) { responseFuture.complete(this); } else { - LOGGER.finer(() -> log("Response completion failed %s", throwable)); + if (LOGGER.isLoggable(Level.FINER)) { + LOGGER.finer(() -> log("Response completion failed %s", throwable)); + } if (subscription != null) { subscription.cancel(); } @@ -296,7 +313,9 @@ private void completeInternalPipe(boolean wasClosed, Throwable throwable) { LOGGER.finest(log("Closing with an empty buffer; keep-alive: false", channel)); } } else { - LOGGER.finest(() -> log("Writing an empty last http content; keep-alive: true")); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> log("Writing an empty last http content; keep-alive: true")); + } channel.read(); } writeLastContent(throwable, listener); @@ -352,7 +371,9 @@ private GenericFutureListener> completeOnFailureListener(St return future -> { if (!future.isSuccess()) { completeResponseFuture(new IllegalStateException(message, future.cause())); - LOGGER.finest(() -> log("Failure listener: " + future.cause())); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> log("Failure listener: " + future.cause())); + } } }; } @@ -361,7 +382,9 @@ private GenericFutureListener> completeOnSuccessListener(Th return future -> { if (future.isSuccess()) { completeResponseFuture(throwable); - LOGGER.finest(() -> log("Last http message flushed", channel)); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> log("Last http message flushed", channel)); + } } }; } @@ -372,8 +395,9 @@ public void onSubscribe(Flow.Subscription subscription) { subscription.cancel(); return; } - this.subscription = Objects.requireNonNull(subscription, "subscription is null"); - subscription.request(1); + this.subscription = backpressureStrategy + .createSubscription(Objects.requireNonNull(subscription, "subscription is null"), backpressureBufferSize); + this.subscription.onSubscribe(); } @Override @@ -387,13 +411,13 @@ public void onNext(DataChunk data) { } else { prevRequestChunk = prevRequestChunk.thenRun(channel::flush); } - subscription.request(1); + subscription.tryRequest(); return; } if (lengthOptimization && firstChunk == null) { firstChunk = data.isReadOnly() ? data : data.duplicate(); // cache first chunk - subscription.request(1); + subscription.tryRequest(); return; } @@ -442,7 +466,9 @@ private void initWriteResponse() { * @param data the chunk. */ private void sendData(DataChunk data, boolean requestOneMore) { - LOGGER.finest(() -> log("Sending data chunk")); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> log("Sending data chunk")); + } DefaultHttpContent httpContent; if (data.isBackedBy(ByteBuf.class)) { @@ -460,26 +486,27 @@ private void sendData(DataChunk data, boolean requestOneMore) { httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(data.data())); } - LOGGER.finest(() -> log("Sending data chunk on event loop thread", channel)); + int size = httpContent.content().capacity(); + + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> log("Sending data chunk on event loop thread", channel)); + } channel.write(data.flush(), httpContent, f -> { // After request for write is made on event loop thread - if (!data.flush() && requestOneMore) { - // No flush, request another chunk immediately, this one needs to wait in the cache - subscription.request(1); - } + subscription.inc(channel, size); + subscription.tryRequest(); // Add listeners to execute when actual write is done return f.addListener(future -> { + subscription.dec(size); // Complete write future based con channel future data.writeFuture() .ifPresent(writeFuture -> NettyChannel.completeFuture(future, writeFuture, data)); - boolean flush = data.flush(); data.release(); - if (flush && requestOneMore) { - // Previous chunk sent, request another one - subscription.request(1); + subscription.tryRequest(); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> log("Data chunk sent with result: %s", future.isSuccess())); } - LOGGER.finest(() -> log("Data chunk sent with result: %s", future.isSuccess())); }) .addListener(completeOnFailureListener("Failure when sending a content!")) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ForwardingHandler.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ForwardingHandler.java index ad9fd1c6d0a..55c02b34a8e 100644 --- a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ForwardingHandler.java +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ForwardingHandler.java @@ -83,6 +83,7 @@ public class ForwardingHandler extends SimpleChannelInboundHandler { private final ReferenceQueue queues; private final long maxPayloadSize; private final Runnable clearQueues; + private final SocketConfiguration soConfig; private final DirectHandlers directHandlers; // this field is always accessed by the very same thread; as such, it doesn't need to be @@ -101,14 +102,15 @@ public class ForwardingHandler extends SimpleChannelInboundHandler { SSLEngine sslEngine, ReferenceQueue queues, Runnable clearQueues, - long maxPayloadSize, + SocketConfiguration soConfig, DirectHandlers directHandlers) { this.routing = routing; this.webServer = webServer; this.sslEngine = sslEngine; this.queues = queues; - this.maxPayloadSize = maxPayloadSize; + this.maxPayloadSize = soConfig.maxPayloadSize(); this.clearQueues = clearQueues; + this.soConfig = soConfig; this.directHandlers = directHandlers; } @@ -406,6 +408,8 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques requestContext, prevRequestFuture, requestEntityAnalyzed, + soConfig.backpressureBufferSize(), + soConfig.backpressureStrategy(), requestId); prevRequestFuture = new CompletableFuture<>(); CompletableFuture thisResp = prevRequestFuture; diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/HttpInitializer.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/HttpInitializer.java index c954b5e71ba..350d35c8877 100644 --- a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/HttpInitializer.java +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/HttpInitializer.java @@ -189,7 +189,7 @@ public void initChannel(SocketChannel ch) { sslEngine, queues, this::clearQueues, - soConfig.maxPayloadSize(), + soConfig, directHandlers)); } diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/Response.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/Response.java index b78bf63f3e3..abecbb7e9ac 100644 --- a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/Response.java +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/Response.java @@ -137,6 +137,7 @@ public Void send(Throwable content) { @Override public Single send(T content) { try { + bareResponse.backpressureStrategy(BackpressureStrategy.UNBOUNDED); sendLockSupport.execute(() -> { Publisher sendPublisher = writerContext.marshall( Single.just(content), GenericType.create(content)); diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerBasicConfig.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerBasicConfig.java index 3962771a674..82c914f0d2c 100644 --- a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerBasicConfig.java +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerBasicConfig.java @@ -179,6 +179,8 @@ static class SocketConfig implements SocketConfiguration { private final int initialBufferSize; private final boolean enableCompression; private final long maxPayloadSize; + private final long backpressureBufferSize; + private final BackpressureStrategy backpressureStrategy; private final int maxUpgradeContentLength; /** @@ -199,6 +201,8 @@ static class SocketConfig implements SocketConfiguration { this.initialBufferSize = builder.initialBufferSize(); this.enableCompression = builder.enableCompression(); this.maxPayloadSize = builder.maxPayloadSize(); + this.backpressureBufferSize = builder.backpressureBufferSize(); + this.backpressureStrategy = builder.backpressureStrategy(); this.maxUpgradeContentLength = builder.maxUpgradeContentLength(); WebServerTls webServerTls = builder.tlsConfig(); this.webServerTls = webServerTls.enabled() ? webServerTls : null; @@ -283,5 +287,15 @@ public boolean enableCompression() { public long maxPayloadSize() { return maxPayloadSize; } + + @Override + public long backpressureBufferSize() { + return backpressureBufferSize; + } + + @Override + public BackpressureStrategy backpressureStrategy() { + return backpressureStrategy; + } } } diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerConfiguration.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerConfiguration.java index 97687cc524b..44e8e8b6c3c 100644 --- a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerConfiguration.java +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerConfiguration.java @@ -521,6 +521,37 @@ public Builder maxPayloadSize(long size) { return this; } + /** + * Maximum length of the response data sending buffer can keep without flushing. + * Depends on `backpressure-policy` what happens if max buffer size is reached. + * + * @param size maximum non-flushed data Netty can buffer until backpressure is applied + * @return an updated builder + */ + @Override + public Builder backpressureBufferSize(long size) { + defaultSocketBuilder().backpressureBufferSize(size); + return this; + } + + /** + * Sets a backpressure strategy for the server to apply against user provided response upstream. + * + *
    + *
  • LINEAR - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
  • + *
  • AUTO_FLUSH - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
  • + *
  • PREFETCH - After first data chunk arrives, probable number of chunks needed to fill the buffer up to watermark is calculated and requested.
  • + *
  • NONE - No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
  • + *
+ * @param backpressureStrategy One of NONE, PREFETCH or LINEAR, default is LINEAR + * @return an updated builder + */ + @Override + public Builder backpressureStrategy(BackpressureStrategy backpressureStrategy) { + defaultSocketBuilder().backpressureStrategy(backpressureStrategy); + return this; + } + /** * Set a maximum length of the content of an upgrade request. *

diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerResponseSubscription.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerResponseSubscription.java new file mode 100644 index 00000000000..cbd63deec06 --- /dev/null +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/ServerResponseSubscription.java @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.reactive.webserver; + +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.LongAdder; + +interface ServerResponseSubscription { + + void tryRequest(); + + void onSubscribe(); + + void cancel(); + + void inc(NettyChannel channel, int byteSize); + + void dec(int byteSize); + + class WatermarkLinear implements ServerResponseSubscription { + + private final long waterMark; + private final LongAdder actualBuffer = new LongAdder(); + private final Flow.Subscription subscription; + + WatermarkLinear(Flow.Subscription subscription, long waterMark) { + this.subscription = subscription; + this.waterMark = waterMark; + } + + @Override + public void onSubscribe() { + tryRequest(); + } + + @Override + public void tryRequest() { + if (watermarkNotReached()) { + subscription().request(1); + } + } + + @Override + public void cancel() { + subscription().cancel(); + } + + @Override + public void inc(NettyChannel channel, int byteSize) { + actualBuffer.add(byteSize); + } + + @Override + public void dec(int byteSize) { + actualBuffer.add(-byteSize); + } + + protected boolean watermarkNotReached(){ + return actualBuffer.sum() <= waterMark(); + } + + protected Flow.Subscription subscription() { + return this.subscription; + } + + protected long waterMark() { + return this.waterMark; + } + } + + class WatermarkPrefetch extends WatermarkLinear { + private int firstChunkSize = 0; + private long nextRequest = 1; + + WatermarkPrefetch(Flow.Subscription subscription, long watermark) { + super(subscription, watermark); + } + + @Override + public void onSubscribe() { + tryRequest(); + } + + @Override + public void tryRequest() { + if (watermarkNotReached()) { + subscription().request(nextRequest); + nextRequest = 1; + } + } + + @Override + public void inc(NettyChannel channel, int byteSize) { + if (firstChunkSize == 0) { + firstChunkSize = byteSize; + nextRequest = waterMark() / firstChunkSize; + } + super.inc(channel, byteSize); + } + } + + class WatermarkAutoFlush extends WatermarkLinear { + WatermarkAutoFlush(Flow.Subscription subscription, long watermark) { + super(subscription, watermark); + } + + @Override + public void inc(NettyChannel channel, int byteSize) { + if (!watermarkNotReached()) { + channel.flush(); + } + super.inc(channel, byteSize); + } + } + + class Unbounded implements ServerResponseSubscription { + + private final Flow.Subscription subscription; + + Unbounded(Flow.Subscription subscription) { + this.subscription = subscription; + } + + @Override + public void onSubscribe() { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void tryRequest() { + //noop + } + + @Override + public void cancel() { + subscription.cancel(); + } + + @Override + public void inc(NettyChannel channel, int byteSize) { + //noop + } + + @Override + public void dec(int byteSize) { + //noop + } + } +} diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/SocketConfiguration.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/SocketConfiguration.java index ba312a4d6fe..ff552e5699f 100644 --- a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/SocketConfiguration.java +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/SocketConfiguration.java @@ -165,6 +165,25 @@ default long maxPayloadSize() { return -1L; } + /** + * Maximum length of the response data sending buffer can keep without flushing. + * Depends on `backpressure-policy` what happens if max buffer size is reached. + * + * @return maximum non-flushed data Netty can buffer until backpressure is applied + */ + default long backpressureBufferSize() { + return 5 * 1024 * 1024; + } + + /** + * Strategy for applying backpressure to the reactive stream of response data. + * + * @return strategy identifier for applying backpressure + */ + default BackpressureStrategy backpressureStrategy() { + return BackpressureStrategy.LINEAR; + } + /** * Initial size of the buffer used to parse HTTP line and headers. * @@ -366,6 +385,31 @@ default B tls(Supplier tlsConfig) { @ConfiguredOption B maxPayloadSize(long size); + /** + * Maximum length of the response data sending buffer can keep without flushing. + * Depends on `backpressure-policy` what happens if max buffer size is reached. + * + * @param size maximum non-flushed data Netty can buffer until backpressure is applied + * @return this builder + */ + @ConfiguredOption + B backpressureBufferSize(long size); + + /** + * Sets a backpressure strategy for the server to apply against user provided response upstream. + * + *

    + *
  • LINEAR - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
  • + *
  • AUTO_FLUSH - Data are requested one-by-one, in case buffer reaches watermark, no other data is requested.
  • + *
  • PREFETCH - After first data chunk arrives, probable number of chunks needed to fill the buffer up to watermark is calculated and requested.
  • + *
  • NONE - No backpressure is applied, Long.MAX_VALUE(unbounded) is requested from upstream.
  • + *
+ * @param backpressureStrategy One of NONE, PREFETCH or LINEAR, default is LINEAR + * @return this builder + */ + @ConfiguredOption("LINEAR") + B backpressureStrategy(BackpressureStrategy backpressureStrategy); + /** * Set a maximum length of the content of an upgrade request. *

@@ -419,6 +463,8 @@ default B config(Config config) { // compression config.get("enable-compression").asBoolean().ifPresent(this::enableCompression); + config.get("backpressure-buffer-size").asLong().ifPresent(this::backpressureBufferSize); + config.get("backpressure-strategy").as(BackpressureStrategy.class).ifPresent(this::backpressureStrategy); return (B) this; } } @@ -454,7 +500,9 @@ final class Builder implements SocketConfigurationBuilder, io.helidon.c private int initialBufferSize = 128; private boolean enableCompression = false; private long maxPayloadSize = -1; + private BackpressureStrategy backpressureStrategy = BackpressureStrategy.LINEAR; private int maxUpgradeContentLength = 64 * 1024; + private long maxBufferSize = 5 * 1024 * 1024; private Builder() { } @@ -628,6 +676,18 @@ public Builder maxPayloadSize(long size) { return this; } + @Override + public Builder backpressureBufferSize(long size) { + this.maxBufferSize = size; + return this; + } + + @Override + public Builder backpressureStrategy(BackpressureStrategy backpressureStrategy) { + this.backpressureStrategy = backpressureStrategy; + return this; + } + @Override public Builder maxUpgradeContentLength(int size) { this.maxUpgradeContentLength = size; @@ -714,6 +774,8 @@ public Builder config(Config config) { config.get("validate-headers").asBoolean().ifPresent(this::validateHeaders); config.get("initial-buffer-size").asInt().ifPresent(this::initialBufferSize); config.get("enable-compression").asBoolean().ifPresent(this::enableCompression); + config.get("backpressure-buffer-size").asLong().ifPresent(this::backpressureBufferSize); + config.get("backpressure-strategy").as(BackpressureStrategy.class).ifPresent(this::backpressureStrategy); return this; } @@ -778,6 +840,14 @@ long maxPayloadSize() { return maxPayloadSize; } + long backpressureBufferSize() { + return maxBufferSize; + } + + BackpressureStrategy backpressureStrategy() { + return backpressureStrategy; + } + int maxUpgradeContentLength() { return maxUpgradeContentLength; } diff --git a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/WebServer.java b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/WebServer.java index c0638522084..b937d532939 100644 --- a/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/WebServer.java +++ b/reactive/webserver/webserver/src/main/java/io/helidon/reactive/webserver/WebServer.java @@ -588,6 +588,18 @@ public Builder maxPayloadSize(long size) { return this; } + @Override + public Builder backpressureBufferSize(long backpressureBufferSize) { + configurationBuilder.backpressureBufferSize(backpressureBufferSize); + return this; + } + + @Override + public Builder backpressureStrategy(BackpressureStrategy backpressureStrategy) { + configurationBuilder.backpressureStrategy(backpressureStrategy); + return this; + } + @Override public Builder maxUpgradeContentLength(int size) { configurationBuilder.maxUpgradeContentLength(size); diff --git a/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/BackpressureTest.java b/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/BackpressureTest.java index ecc687ab8b7..e69de29bb2d 100644 --- a/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/BackpressureTest.java +++ b/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/BackpressureTest.java @@ -1,192 +0,0 @@ -/* - * Copyright (c) 2021, 2022 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.reactive.webserver; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Logger; -import java.util.stream.IntStream; - -import io.helidon.common.LazyValue; -import io.helidon.common.http.DataChunk; -import io.helidon.common.reactive.IoMulti; -import io.helidon.common.reactive.Multi; -import io.helidon.reactive.webclient.WebClient; -import io.helidon.reactive.webclient.WebClientResponse; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; - -public class BackpressureTest { - - private static final Logger LOGGER = Logger.getLogger(BackpressureTest.class.getName()); - static final long TIMEOUT_SEC = 40; - // 5 MB buffer size should be enough to cause incomplete write in Netty - static final int BUFFER_SIZE = 5 * 1024 * 1024; - - @Test - void overloadEventLoopWithIoMulti() { - Multi pub = IoMulti.multiFromStreamBuilder(randomEndlessIs()) - .byteBufferSize(BUFFER_SIZE) - .build() - .map(byteBuffer -> DataChunk.create(true, byteBuffer)); - overloadEventLoop(pub); - } - - @Test - void overloadEventLoopWithMulti() { - InputStream inputStream = randomEndlessIs(); - Multi pub = Multi.create(() -> new Iterator() { - @Override - public boolean hasNext() { - return true; - } - - @Override - public DataChunk next() { - try { - return DataChunk.create(true, false, ByteBuffer.wrap(inputStream.readNBytes(BUFFER_SIZE))); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - overloadEventLoop(pub); - } - - /** - * Attempts to overload webserver subscriber with higher data flow than Netty's NioEventLoop can - * send at first iteration. By causing incomplete write leaves the rest of the bytebuffer to be written by the next - * event loop iteration. - *

- * This can overflow Netty buffer or, in case of single threaded unbounded request, prevent event loop from ever reaching next - * iteration. - *

- * Incomplete write is not flushed and its ChannelFuture's listener isn't executed, leaving DataChunk NOT released. - * That should lead to OutOfMemory error or assertion error in sample DataChunk batch, - * depends on the JVM memory settings. - * - * @param multi publisher providing endless stream of high volume(preferably more than 2 MB but not less than 1264 kB) data chunks - */ - void overloadEventLoop(Multi multi) { - AtomicBoolean firstChunk = new AtomicBoolean(true); - AtomicBoolean shuttingDown = new AtomicBoolean(false); - AtomicReference> serverUpstreamError = new AtomicReference<>(Optional.empty()); - List firstBatch = new ArrayList<>(5); - - Multi dataChunkMulti = - // Kill server publisher when client is done - multi.takeWhile(ch -> !shuttingDown.get()) - .peek(chunk -> { - if (firstChunk.getAndSet(false)) { - // skip first chunk, it gets released on complete - return; - } - // Keep 2 - 6 chunk references - if (firstBatch.size() < 5) { - firstBatch.add(chunk); - } - }) - .onError(Throwable::printStackTrace) - .onError(t -> serverUpstreamError.set(Optional.of(t))); - - AtomicLong byteCnt = new AtomicLong(); - - LazyValue validateOnce = LazyValue.create(() -> { - Collection snapshot = Collections.unmodifiableCollection(firstBatch); - LOGGER.info("======== DataChunk sample batch ========"); - IntStream.range(0, snapshot.size()) - .forEach(i -> LOGGER.info("Chunk #" + (i + 2) + " released: " + firstBatch.get(i).isReleased())); - - boolean result = firstBatch.stream() - .allMatch(DataChunk::isReleased); - - // clean up - firstBatch.forEach(DataChunk::release); - - return result; - }); - - WebServer webServer = null; - try { - webServer = WebServer.builder() - .host("localhost") - .routing(r -> r.get("/", (req, res) -> res.send(dataChunkMulti))) - .build() - .start() - .await(TIMEOUT_SEC, TimeUnit.SECONDS); - - - WebClient.builder() - .baseUri("http://localhost:" + webServer.port()) - .build() - .get() - .path("/") - .request() - .peek(res -> assertThat(res.status().reasonPhrase(), res.status().code(), is(200))) - .flatMap(WebClientResponse::content) - // limit reception to 300 MB - .takeWhile(ws -> byteCnt.get() < (300 * 1024 * 1024)) - .forEach(chunk -> { - long actCnt = byteCnt.addAndGet(chunk.bytes().length); - if (actCnt % (100 * 1024 * 1024) == 0) { - LOGGER.info("Client received " + (actCnt / (1024 * 1024)) + "MB"); - } - if (actCnt > (200 * 1024 * 1024)) { - // After 200 MB check fist 5 chunks if those are released - // but keep the pressure and don't kill the stream - assertThat("Not all chunks from the first batch are released!", validateOnce.get()); - } - chunk.release(); - }) - // Kill server publisher we are done here - .onTerminate(() -> shuttingDown.set(true)) - .await(TIMEOUT_SEC, TimeUnit.SECONDS); - - } finally { - if (webServer != null) { - webServer.shutdown().await(TIMEOUT_SEC, TimeUnit.SECONDS); - } - } - serverUpstreamError.get().ifPresent(Assertions::fail); - } - - static InputStream randomEndlessIs() { - Random random = new Random(); - return new InputStream() { - @Override - public synchronized int read() { - return random.nextInt(Byte.MAX_VALUE); - } - }; - } -} diff --git a/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/BareResponseSubscriberTckTest.java b/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/BareResponseSubscriberTckTest.java index 287bc60217c..83f2a4e96d5 100644 --- a/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/BareResponseSubscriberTckTest.java +++ b/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/BareResponseSubscriberTckTest.java @@ -56,6 +56,8 @@ public Flow.Subscriber createFlowSubscriber(WhiteboxSubscriberProbe whenHeadersCompleted() { return Single.create(closeFuture); } + @Override + public void backpressureStrategy(BackpressureStrategy backpressureStrategy) { + //noop + } + @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); diff --git a/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/WaterMarkedBackpressureIT.java b/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/WaterMarkedBackpressureIT.java new file mode 100644 index 00000000000..8c7c0869748 --- /dev/null +++ b/reactive/webserver/webserver/src/test/java/io/helidon/reactive/webserver/WaterMarkedBackpressureIT.java @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.reactive.webserver; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import io.helidon.common.http.DataChunk; +import io.helidon.common.reactive.Multi; +import io.helidon.common.reactive.Single; +import io.helidon.reactive.webclient.WebClient; +import io.helidon.reactive.webclient.WebClientResponse; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static io.helidon.reactive.webserver.BackpressureStrategy.AUTO_FLUSH; +import static io.helidon.reactive.webserver.BackpressureStrategy.LINEAR; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class WaterMarkedBackpressureIT { + + private static final Logger LOGGER = Logger.getLogger(WaterMarkedBackpressureIT.class.getName()); + + static final Duration TIMEOUT = Duration.ofSeconds(15); + private static ExecutorService executorService; + + @BeforeMethod + void setUp() { + executorService = Executors.newSingleThreadExecutor(); + } + + @AfterMethod + void tearDown() { + executorService.shutdown(); + } + + @Test + void linear() { + AtomicLong receivedSize = new AtomicLong(0); + AtomicLong lastRequestedNumber = new AtomicLong(0); + CompletableFuture firstBatchSent = new CompletableFuture<>(); + + WebServer webServer = null; + try { + webServer = WebServer.builder() + .host("localhost") + .backpressureBufferSize(500) + .backpressureStrategy(LINEAR) + .routing(r -> r.get("/", (req, res) -> { + res.send(Multi.range(0, 1000) + .observeOn(executorService) + .peek(lastRequestedNumber::set) + // Never flush! + // 5 bytes per chunk + .map(l -> DataChunk.create(false, ByteBuffer.wrap((String.format("%05d", l)).getBytes()))) + .onComplete(() -> firstBatchSent.complete(null)) + .onCompleteResumeWith(Single.never()) + ); + })) + .build() + .start() + .await(TIMEOUT); + + WebClient.builder() + .baseUri("http://localhost:" + webServer.port()) + .build() + .get() + .path("/") + .request() + .flatMap(WebClientResponse::content) + .forEach(chunk -> { + byte[] bytes = chunk.bytes(); + receivedSize.addAndGet(bytes.length); + chunk.release(); + }) + .onError(t -> LOGGER.log(Level.SEVERE, t, () -> "Error calling the test server")); + + try { + Single.create(firstBatchSent, true).await(Duration.ofSeconds(5)); + } catch (CompletionException e) { + //expected + } + + // When buffer fills up no other chunks are requested + assertThat(lastRequestedNumber.get(), is(101L)); + // No data are flushed + assertThat(receivedSize.get(), is(0L)); + } finally { + if (webServer != null) { + webServer.shutdown().await(TIMEOUT); + } + executorService.shutdown(); + } + } + + @Test + void autoFlush() { + AtomicLong receivedSize = new AtomicLong(0); + + WebServer webServer = null; + try { + webServer = WebServer.builder() + .host("localhost") + .backpressureBufferSize(500) + .backpressureStrategy(AUTO_FLUSH) + .routing(r -> r.get("/", (req, res) -> { + res.send(Multi.range(0, 150) + .observeOn(executorService) + // Never flush! + // 5 bytes per chunk + .map(l -> DataChunk.create(false, ByteBuffer.wrap((String.format("%05d", l)).getBytes()))) + .onCompleteResumeWith(Single.never()) + ); + })) + .build() + .start() + .await(TIMEOUT); + + WebClient.builder() + .baseUri("http://localhost:" + webServer.port()) + .build() + .get() + .path("/") + .request() + .flatMap(WebClientResponse::content) + .takeWhile(chunk -> { + byte[] bytes = chunk.bytes(); + receivedSize.addAndGet(bytes.length); + String data = new String(bytes); + chunk.release(); + return !data.equals("00101"); + }) + .ignoreElements() + .onErrorResumeWithSingle(t -> { + LOGGER.log(Level.WARNING, "Give a chance to assertions", t); + return Single.empty(); + }) + .await(TIMEOUT); + + + assertThat(receivedSize.get(), is(510L)); + } finally { + if (webServer != null) { + webServer.shutdown().await(TIMEOUT); + } + executorService.shutdown(); + } + } +}