From eebe45886ba9d13b458d3c4a9775ad67b056cb8d Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Mon, 11 Nov 2024 11:07:28 -0800 Subject: [PATCH] Fix response leak that can be caused by an exception during redirect (#3095) Motivation: `RedirectConfig` accepts user-defined functions and it's possible that some of them can throw. `RedirectSingle` expects it and applies a `try-catch` block, but we forgot to drain the response payload body before propagating the error. Modifications: - Drain response payload before propagating an exception during redirect processing; - Enhance tests to validate expected behavior; Result: We do not leave undrained response payload if redirect fails with an exception. --- .../http/utils/RedirectSingle.java | 16 +++-- .../RedirectingHttpRequesterFilterTest.java | 69 +++++++++++++------ 2 files changed, 60 insertions(+), 25 deletions(-) diff --git a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/RedirectSingle.java b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/RedirectSingle.java index 512ec51098..e6179738e9 100644 --- a/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/RedirectSingle.java +++ b/servicetalk-http-utils/src/main/java/io/servicetalk/http/utils/RedirectSingle.java @@ -44,6 +44,7 @@ import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY; import static io.servicetalk.http.api.HttpHeaderNames.HOST; import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.REDIRECTION_3XX; +import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed; /** * An operator, which implements redirect logic for {@link StreamingHttpClient}. @@ -184,14 +185,19 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) { LOGGER.trace("Executing redirect to '{}' for request '{}'", location, request); } + // Consume any payload of the redirect response + final Single nextResponse = response.messageBody().ignoreElements() + .concat(redirectSingle.requester.request(newRequest)); + final RedirectSubscriber redirectSubscriber = new RedirectSubscriber(target, redirectSingle, newRequest, + redirectCount + 1, sequentialCancellable); terminalDelivered = true; // Mark as "delivered" because we do not own `target` from this point - toSource(response.messageBody().ignoreElements() // Consume any payload of the redirect response - .concat(redirectSingle.requester.request(newRequest))) - .subscribe(new RedirectSubscriber(target, redirectSingle, newRequest, redirectCount + 1, - sequentialCancellable)); + toSource(nextResponse).subscribe(redirectSubscriber); } catch (Throwable cause) { if (!terminalDelivered) { - safeOnError(target, cause); + // Drain response payload body before propagating the cause + sequentialCancellable.nextCancellable(response.messageBody().ignoreElements() + .whenOnError(suppressed -> safeOnError(target, addSuppressed(cause, suppressed))) + .subscribe(() -> safeOnError(target, cause))); } else { LOGGER.info("Ignoring exception from onSuccess of Subscriber {}.", target, cause); } diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/RedirectingHttpRequesterFilterTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/RedirectingHttpRequesterFilterTest.java index 13ef47c511..be9fcfe31b 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/RedirectingHttpRequesterFilterTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/RedirectingHttpRequesterFilterTest.java @@ -16,10 +16,12 @@ package io.servicetalk.http.utils; import io.servicetalk.buffer.api.Buffer; +import io.servicetalk.concurrent.Cancellable; import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.ExecutorExtension; import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.api.TestPublisher; +import io.servicetalk.concurrent.api.TestSubscription; import io.servicetalk.http.api.DefaultHttpHeadersFactory; import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory; import io.servicetalk.http.api.HttpExecutionContext; @@ -52,6 +54,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import static io.servicetalk.buffer.api.Matchers.contentEqualTo; @@ -217,7 +220,7 @@ void connectRequestsDoesNotRedirectByDefault() throws Exception { StreamingHttpRequest request = client.newRequest(CONNECT, "servicetalk.io") .setHeader(HOST, "servicetalk.io"); verifyDoesNotRedirect(client, request, SEE_OTHER); - verifyRedirectResponsePayloadsDrained(false); + verifyRedirectResponsePayloadsDrained(false, false); } @Test @@ -383,7 +386,7 @@ private void crossSchemeRedirects(String fromScheme, String toScheme, verifyRedirected(client, request, false, true); } else { verifyDoesNotRedirect(client, request, MOVED_PERMANENTLY); - verifyRedirectResponsePayloadsDrained(false); + verifyRedirectResponsePayloadsDrained(false, false); } } @@ -401,7 +404,7 @@ void redirectFromRelativeFormToAbsoluteFormNonRelativeLocation(boolean allowNonR verifyRedirected(client, request, false, true); } else { verifyDoesNotRedirect(client, request, MOVED_PERMANENTLY); - verifyRedirectResponsePayloadsDrained(false); + verifyRedirectResponsePayloadsDrained(false, false); } } @@ -419,7 +422,7 @@ void redirectFromAbsoluteFormToAbsoluteFormNonRelativeLocation(boolean allowNonR verifyRedirected(client, request, false, true); } else { verifyDoesNotRedirect(client, request, MOVED_PERMANENTLY); - verifyRedirectResponsePayloadsDrained(false); + verifyRedirectResponsePayloadsDrained(false, false); } } @@ -468,7 +471,7 @@ void customLocationMapper() throws Exception { StreamingHttpRequest redirectedRequest = verifyResponse(client, request, OK, -1, 2, GET); assertThat("Request didn't change", request, not(sameInstance(redirectedRequest))); verifyHeadersAndMessageBodyRedirected(redirectedRequest); - verifyRedirectResponsePayloadsDrained(true); + verifyRedirectResponsePayloadsDrained(true, false); assertThat("LocationMapper was not invoked", locationMapperInvoked.get(), is(true)); } @@ -497,7 +500,7 @@ void changePostToGet(int statusCode) throws Exception { StreamingHttpRequest redirectedRequest = verifyResponse(client, request, OK, -1, 2, GET); assertThat("Request didn't change", request, not(sameInstance(redirectedRequest))); verifyHeadersAndMessageBodyRedirected(redirectedRequest); - verifyRedirectResponsePayloadsDrained(true); + verifyRedirectResponsePayloadsDrained(true, false); } @ParameterizedTest(name = "{displayName} [{index}] manyHeaders={0}") @@ -544,7 +547,7 @@ void configureRedirectOfPayloadBodyForNonRelativeRedirects() throws Exception { assertThat("Unexpected payload body", redirectedRequest.payloadBody().collect(StringBuilder::new, (sb, chunk) -> sb.append(chunk.toString(US_ASCII))) .toFuture().get().toString(), contentEqualTo(REQUEST_PAYLOAD)); - verifyRedirectResponsePayloadsDrained(true); + verifyRedirectResponsePayloadsDrained(true, false); } @Test @@ -580,43 +583,61 @@ void manuallyRedirectHeadersAndMessageBodyForNonRelativeRedirects() throws Excep verifyRedirected(client, newRequest(client, GET), true, true); } - @Test - void redirectRequestTransformerThrows() { + @ParameterizedTest(name = "{displayName} [{index}] cancel={0}") + @ValueSource(booleans = {false, true}) + void redirectRequestTransformerThrows(boolean cancel) { + AtomicReference cancellable = new AtomicReference<>(); when(httpClient.request(any())).thenReturn(redirectResponse(MOVED_PERMANENTLY), okResponse()); StreamingHttpClient client = newClient(new RedirectConfigBuilder() .redirectRequestTransformer((relative, original, response, redirect) -> { + if (cancel) { + cancellable.get().cancel(); + } throw DELIBERATE_EXCEPTION; }).build()); ExecutionException e = assertThrows(ExecutionException.class, - () -> client.request(newRequest(client, GET)).toFuture().get()); + () -> client.request(newRequest(client, GET)).whenOnSubscribe(cancellable::set).toFuture().get()); assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION)); + verifyRedirectResponsePayloadsDrained(true, cancel); } - @Test - void redirectPredicateThrows() { + @ParameterizedTest(name = "{displayName} [{index}] cancel={0}") + @ValueSource(booleans = {false, true}) + void redirectPredicateThrows(boolean cancel) { + AtomicReference cancellable = new AtomicReference<>(); when(httpClient.request(any())).thenReturn(redirectResponse(MOVED_PERMANENTLY), okResponse()); StreamingHttpClient client = newClient(new RedirectConfigBuilder() .redirectPredicate((relative, count, request, response) -> { + if (cancel) { + cancellable.get().cancel(); + } throw DELIBERATE_EXCEPTION; }).build()); ExecutionException e = assertThrows(ExecutionException.class, - () -> client.request(newRequest(client, GET)).toFuture().get()); + () -> client.request(newRequest(client, GET)).whenOnSubscribe(cancellable::set).toFuture().get()); assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION)); + verifyRedirectResponsePayloadsDrained(true, cancel); } - @Test - void locationMapperThrows() { + @ParameterizedTest(name = "{displayName} [{index}] cancel={0}") + @ValueSource(booleans = {false, true}) + void locationMapperThrows(boolean cancel) { + AtomicReference cancellable = new AtomicReference<>(); when(httpClient.request(any())).thenReturn(redirectResponse(MOVED_PERMANENTLY), okResponse()); StreamingHttpClient client = newClient(new RedirectConfigBuilder() .locationMapper((request, response) -> { + if (cancel) { + cancellable.get().cancel(); + } throw DELIBERATE_EXCEPTION; }).build()); ExecutionException e = assertThrows(ExecutionException.class, - () -> client.request(newRequest(client, GET)).toFuture().get()); + () -> client.request(newRequest(client, GET)).whenOnSubscribe(cancellable::set).toFuture().get()); assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION)); + verifyRedirectResponsePayloadsDrained(true, cancel); } @Test @@ -688,16 +709,24 @@ private StreamingHttpRequest verifyRedirected(StreamingHttpClient client, assertThat("Unexpected request-target of redirected request", redirectedRequest.requestTarget(), startsWith("/")); } - verifyRedirectResponsePayloadsDrained(true); + verifyRedirectResponsePayloadsDrained(true, false); return redirectedRequest; } - private void verifyRedirectResponsePayloadsDrained(boolean drained) { + private void verifyRedirectResponsePayloadsDrained(boolean drained, boolean cancelled) { int n = 0; for (TestPublisher payload : redirectResponsePayloads) { - assertThat("Redirect response payload (/location-" + ++n + - (drained ? ") was not drained" : ") was unexpectedly drained"), + assertThat("Redirect (/location-" + ++n + ") response payload was " + + (drained ? "not" : "unexpectedly") + " drained", payload.isSubscribed(), is(drained)); + + if (drained) { + TestSubscription subscription = new TestSubscription(); + payload.onSubscribe(subscription); + assertThat("Redirect (/location-" + ++n + ") response payload subscription was " + + (cancelled ? "not" : "unexpectedly") + " cancelled", + subscription.isCancelled(), is(cancelled)); + } } }