From 740e6992b930965936dbc49d209759f0e038dfb4 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Fri, 20 Oct 2023 13:41:22 +0200 Subject: [PATCH] Do not consume orphaned message bodies, just warn. Motivation ---------- In a previous changeset, the HttpMessageDiscardWatchdogServiceFilter has been introduced which tries to proactively clean up orphaned/ discarded message body buffers: This works in some, but not all cases in a reliable fashion. Modifications ------------- Instead of trying to clean up the buffers (which might not work all the time), just WARN into the logs so users can take proactive action to fix their code. The log level has been dropped from ERROR to WARN since it is not a fatal error but still needs to be taken seriously. --- servicetalk-http-netty/build.gradle | 1 + ...tpMessageDiscardWatchdogServiceFilter.java | 43 ++++++------------- ...ssageDiscardWatchdogServiceFilterTest.java | 39 +++++++++++------ 3 files changed, 40 insertions(+), 43 deletions(-) diff --git a/servicetalk-http-netty/build.gradle b/servicetalk-http-netty/build.gradle index a234e21cc7..fa0ff23065 100644 --- a/servicetalk-http-netty/build.gradle +++ b/servicetalk-http-netty/build.gradle @@ -48,6 +48,7 @@ dependencies { testImplementation testFixtures(project(":servicetalk-concurrent-internal")) testImplementation testFixtures(project(":servicetalk-concurrent-reactivestreams")) testImplementation testFixtures(project(":servicetalk-http-api")) + testImplementation testFixtures(project(":servicetalk-log4j2-mdc-utils")) testImplementation testFixtures(project(":servicetalk-transport-netty-internal")) testImplementation project(":servicetalk-concurrent-api-test") testImplementation project(":servicetalk-concurrent-test-internal") diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java index 644e218434..bf46ad0ff7 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilter.java @@ -82,14 +82,14 @@ public Single handle(final HttpServiceContext ctx, final AtomicReference> reference = request.context() .computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>()); assert reference != null; - final Publisher previous = reference.getAndSet(response.messageBody()); - if (previous != null) { + if (reference.getAndSet(response.messageBody()) != null) { // If a previous message exists, the Single got resubscribed to - // (i.e. during a retry) and so previous message body needs to be cleaned up. - LOGGER.warn("Automatically draining previous HTTP response message body that was " + - "not consumed. Users-defined retry logic must drain response payload before " + - "retrying."); - previous.ignoreElements().subscribe(); + // (i.e. during a retry) and so previous message body needs to be cleaned up by the + // user. + LOGGER.warn("Discovered un-drained HTTP response message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Responses (or their message body) must " + + "be fully consumed before retrying."); } return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> { @@ -146,14 +146,6 @@ public void onComplete() { */ private static final class CleanerHttpLifecycleObserver implements HttpLifecycleObserver { - /** - * Helps to remember if we logged an error for user-defined filters already to not spam the logs. - *

- * NOTE: this variable is intentionally not volatile since thread visibility is not a concern, but repeated - * volatile accesses are. - */ - private static boolean loggedError; - private CleanerHttpLifecycleObserver() { // Singleton } @@ -181,20 +173,13 @@ public HttpResponseObserver onResponse(final HttpResponseMetaData responseMetaDa public void onExchangeFinally() { if (requestContext != null) { final AtomicReference maybePublisher = requestContext.get(MESSAGE_PUBLISHER_KEY); - if (maybePublisher != null) { - Publisher message = (Publisher) maybePublisher.get(); - if (message != null) { - // No-one subscribed to the message (or there is none), so if there is a message - // proactively clean it up. - if (!loggedError) { - LOGGER.error("Automatically draining HTTP response message body which has " + - "been dropped by user code - this is a strong indication of a bug " + - "in a user-defined filter. Responses (or their message body) must " + - "be fully consumed before discarding."); - loggedError = true; - } - message.ignoreElements().subscribe(); - } + if (maybePublisher != null && maybePublisher.get() != null) { + // No-one subscribed to the message (or there is none), so if there is a message + // tell the user to clean it up. + LOGGER.warn("Discovered un-drained HTTP response message body which has " + + "been dropped by user code - this is a strong indication of a bug " + + "in a user-defined filter. Responses (or their message body) must " + + "be fully consumed before discarding."); } } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilterTest.java index e65dddd33b..1ed74db563 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpMessageDiscardWatchdogServiceFilterTest.java @@ -27,14 +27,16 @@ import io.servicetalk.http.api.StreamingHttpResponse; import io.servicetalk.http.api.StreamingHttpResponseFactory; import io.servicetalk.http.api.StreamingHttpServiceFilter; +import io.servicetalk.log4j2.mdc.utils.LoggerStringWriter; import io.servicetalk.transport.netty.internal.ExecutionContextExtension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.stream.Stream; @@ -42,6 +44,7 @@ import static io.servicetalk.http.netty.BuilderUtils.newClientBuilder; import static io.servicetalk.http.netty.BuilderUtils.newServerBuilder; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; final class HttpMessageDiscardWatchdogServiceFilterTest { @@ -54,12 +57,19 @@ final class HttpMessageDiscardWatchdogServiceFilterTest { ExecutionContextExtension.cached("client-io", "client-executor") .setClassLevel(true); - @ParameterizedTest(name = "{displayName} [{index}] transformer={0} expectedSubscriptions={1}") - @MethodSource("responseTransformers") - void cleansPayloadBodyIfDiscardedInFilter(final ResponseTransformer transformer, - final int expectedSubscriptions) throws Exception { - final CountDownLatch payloadSubscriptionCounter = new CountDownLatch(expectedSubscriptions); + @BeforeEach + public void setup() { + LoggerStringWriter.reset(); + } + + @AfterEach + public void tearDown() { + LoggerStringWriter.remove(); + } + @ParameterizedTest(name = "{displayName} [{index}] transformer={0}") + @MethodSource("responseTransformers") + void cleansPayloadBodyIfDiscardedInFilter(final ResponseTransformer transformer) throws Exception { try (HttpServerContext serverContext = newServerBuilder(SERVER_CTX) .appendServiceFilter(service -> new StreamingHttpServiceFilter(service) { @Override @@ -71,8 +81,7 @@ public Single handle(final HttpServiceContext ctx, }) .listenStreamingAndAwait((ctx, request, responseFactory) -> Single.fromSupplier(() -> { final Publisher buffer = Publisher - .from(ctx.executionContext().bufferAllocator().fromUtf8("Hello, World!")) - .beforeOnSubscribe(subscription -> payloadSubscriptionCounter.countDown()); + .from(ctx.executionContext().bufferAllocator().fromUtf8("Hello, World!")); return responseFactory.ok().payloadBody(buffer); }))) { @@ -82,7 +91,9 @@ public Single handle(final HttpServiceContext ctx, assertEquals(0, response.payloadBody().readableBytes()); } - payloadSubscriptionCounter.await(); + String output = LoggerStringWriter.stableAccumulated(1000); + assertTrue(output.contains("Discovered un-drained HTTP response message body which " + + "has been dropped by user code")); } } @@ -101,7 +112,7 @@ public Single apply(final Single r public String toString() { return "Throws Exception"; } - }, 1), + }), Arguments.of(new ResponseTransformer() { @Override public Single apply(final Single response, @@ -113,7 +124,7 @@ public Single apply(final Single r public String toString() { return "Drops payload body while transforming"; } - }, 1), + }), Arguments.of(new ResponseTransformer() { @Override public Single apply(final Single response, @@ -125,7 +136,7 @@ public Single apply(final Single r public String toString() { return "Drops message body while transforming"; } - }, 1), + }), Arguments.of(new ResponseTransformer() { @Override public Single apply(final Single response, @@ -137,7 +148,7 @@ public Single apply(final Single r public String toString() { return "Drops response and creates new one"; } - }, 1), + }), Arguments.of(new ResponseTransformer() { private final AtomicInteger retryCounter = new AtomicInteger(); @@ -158,7 +169,7 @@ public Single apply(final Single r public String toString() { return "Retries and drops again"; } - }, 2) + }) ); }