Skip to content

Commit

Permalink
Fix response leak that can be caused by an exception during redirect (#…
Browse files Browse the repository at this point in the history
…3095)

Motivation:

`RedirectConfig` accepts user-defined functions and it's possible that
some of them can throw. `RedirectSingle` expects it and applies a
`try-catch` block, but we forgot to drain the response payload body
before propagating the error.

Modifications:

- Drain response payload before propagating an exception during redirect
processing;
- Enhance tests to validate expected behavior;

Result:

We do not leave undrained response payload if redirect fails with an
exception.
  • Loading branch information
idelpivnitskiy authored Nov 11, 2024
1 parent e8d18ff commit eebe458
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpHeaderNames.HOST;
import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.REDIRECTION_3XX;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;

/**
* An operator, which implements redirect logic for {@link StreamingHttpClient}.
Expand Down Expand Up @@ -184,14 +185,19 @@ public void onSuccess(@Nullable final StreamingHttpResponse response) {
LOGGER.trace("Executing redirect to '{}' for request '{}'", location, request);
}

// Consume any payload of the redirect response
final Single<StreamingHttpResponse> nextResponse = response.messageBody().ignoreElements()
.concat(redirectSingle.requester.request(newRequest));
final RedirectSubscriber redirectSubscriber = new RedirectSubscriber(target, redirectSingle, newRequest,
redirectCount + 1, sequentialCancellable);
terminalDelivered = true; // Mark as "delivered" because we do not own `target` from this point
toSource(response.messageBody().ignoreElements() // Consume any payload of the redirect response
.concat(redirectSingle.requester.request(newRequest)))
.subscribe(new RedirectSubscriber(target, redirectSingle, newRequest, redirectCount + 1,
sequentialCancellable));
toSource(nextResponse).subscribe(redirectSubscriber);
} catch (Throwable cause) {
if (!terminalDelivered) {
safeOnError(target, cause);
// Drain response payload body before propagating the cause
sequentialCancellable.nextCancellable(response.messageBody().ignoreElements()
.whenOnError(suppressed -> safeOnError(target, addSuppressed(cause, suppressed)))
.subscribe(() -> safeOnError(target, cause)));
} else {
LOGGER.info("Ignoring exception from onSuccess of Subscriber {}.", target, cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package io.servicetalk.http.utils;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.ExecutorExtension;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TestPublisher;
import io.servicetalk.concurrent.api.TestSubscription;
import io.servicetalk.http.api.DefaultHttpHeadersFactory;
import io.servicetalk.http.api.DefaultStreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.HttpExecutionContext;
Expand Down Expand Up @@ -52,6 +54,7 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

import static io.servicetalk.buffer.api.Matchers.contentEqualTo;
Expand Down Expand Up @@ -217,7 +220,7 @@ void connectRequestsDoesNotRedirectByDefault() throws Exception {
StreamingHttpRequest request = client.newRequest(CONNECT, "servicetalk.io")
.setHeader(HOST, "servicetalk.io");
verifyDoesNotRedirect(client, request, SEE_OTHER);
verifyRedirectResponsePayloadsDrained(false);
verifyRedirectResponsePayloadsDrained(false, false);
}

@Test
Expand Down Expand Up @@ -383,7 +386,7 @@ private void crossSchemeRedirects(String fromScheme, String toScheme,
verifyRedirected(client, request, false, true);
} else {
verifyDoesNotRedirect(client, request, MOVED_PERMANENTLY);
verifyRedirectResponsePayloadsDrained(false);
verifyRedirectResponsePayloadsDrained(false, false);
}
}

Expand All @@ -401,7 +404,7 @@ void redirectFromRelativeFormToAbsoluteFormNonRelativeLocation(boolean allowNonR
verifyRedirected(client, request, false, true);
} else {
verifyDoesNotRedirect(client, request, MOVED_PERMANENTLY);
verifyRedirectResponsePayloadsDrained(false);
verifyRedirectResponsePayloadsDrained(false, false);
}
}

Expand All @@ -419,7 +422,7 @@ void redirectFromAbsoluteFormToAbsoluteFormNonRelativeLocation(boolean allowNonR
verifyRedirected(client, request, false, true);
} else {
verifyDoesNotRedirect(client, request, MOVED_PERMANENTLY);
verifyRedirectResponsePayloadsDrained(false);
verifyRedirectResponsePayloadsDrained(false, false);
}
}

Expand Down Expand Up @@ -468,7 +471,7 @@ void customLocationMapper() throws Exception {
StreamingHttpRequest redirectedRequest = verifyResponse(client, request, OK, -1, 2, GET);
assertThat("Request didn't change", request, not(sameInstance(redirectedRequest)));
verifyHeadersAndMessageBodyRedirected(redirectedRequest);
verifyRedirectResponsePayloadsDrained(true);
verifyRedirectResponsePayloadsDrained(true, false);
assertThat("LocationMapper was not invoked", locationMapperInvoked.get(), is(true));
}

Expand Down Expand Up @@ -497,7 +500,7 @@ void changePostToGet(int statusCode) throws Exception {
StreamingHttpRequest redirectedRequest = verifyResponse(client, request, OK, -1, 2, GET);
assertThat("Request didn't change", request, not(sameInstance(redirectedRequest)));
verifyHeadersAndMessageBodyRedirected(redirectedRequest);
verifyRedirectResponsePayloadsDrained(true);
verifyRedirectResponsePayloadsDrained(true, false);
}

@ParameterizedTest(name = "{displayName} [{index}] manyHeaders={0}")
Expand Down Expand Up @@ -544,7 +547,7 @@ void configureRedirectOfPayloadBodyForNonRelativeRedirects() throws Exception {
assertThat("Unexpected payload body", redirectedRequest.payloadBody().collect(StringBuilder::new,
(sb, chunk) -> sb.append(chunk.toString(US_ASCII)))
.toFuture().get().toString(), contentEqualTo(REQUEST_PAYLOAD));
verifyRedirectResponsePayloadsDrained(true);
verifyRedirectResponsePayloadsDrained(true, false);
}

@Test
Expand Down Expand Up @@ -580,43 +583,61 @@ void manuallyRedirectHeadersAndMessageBodyForNonRelativeRedirects() throws Excep
verifyRedirected(client, newRequest(client, GET), true, true);
}

@Test
void redirectRequestTransformerThrows() {
@ParameterizedTest(name = "{displayName} [{index}] cancel={0}")
@ValueSource(booleans = {false, true})
void redirectRequestTransformerThrows(boolean cancel) {
AtomicReference<Cancellable> cancellable = new AtomicReference<>();
when(httpClient.request(any())).thenReturn(redirectResponse(MOVED_PERMANENTLY), okResponse());
StreamingHttpClient client = newClient(new RedirectConfigBuilder()
.redirectRequestTransformer((relative, original, response, redirect) -> {
if (cancel) {
cancellable.get().cancel();
}
throw DELIBERATE_EXCEPTION;
}).build());

ExecutionException e = assertThrows(ExecutionException.class,
() -> client.request(newRequest(client, GET)).toFuture().get());
() -> client.request(newRequest(client, GET)).whenOnSubscribe(cancellable::set).toFuture().get());
assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION));
verifyRedirectResponsePayloadsDrained(true, cancel);
}

@Test
void redirectPredicateThrows() {
@ParameterizedTest(name = "{displayName} [{index}] cancel={0}")
@ValueSource(booleans = {false, true})
void redirectPredicateThrows(boolean cancel) {
AtomicReference<Cancellable> cancellable = new AtomicReference<>();
when(httpClient.request(any())).thenReturn(redirectResponse(MOVED_PERMANENTLY), okResponse());
StreamingHttpClient client = newClient(new RedirectConfigBuilder()
.redirectPredicate((relative, count, request, response) -> {
if (cancel) {
cancellable.get().cancel();
}
throw DELIBERATE_EXCEPTION;
}).build());

ExecutionException e = assertThrows(ExecutionException.class,
() -> client.request(newRequest(client, GET)).toFuture().get());
() -> client.request(newRequest(client, GET)).whenOnSubscribe(cancellable::set).toFuture().get());
assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION));
verifyRedirectResponsePayloadsDrained(true, cancel);
}

@Test
void locationMapperThrows() {
@ParameterizedTest(name = "{displayName} [{index}] cancel={0}")
@ValueSource(booleans = {false, true})
void locationMapperThrows(boolean cancel) {
AtomicReference<Cancellable> cancellable = new AtomicReference<>();
when(httpClient.request(any())).thenReturn(redirectResponse(MOVED_PERMANENTLY), okResponse());
StreamingHttpClient client = newClient(new RedirectConfigBuilder()
.locationMapper((request, response) -> {
if (cancel) {
cancellable.get().cancel();
}
throw DELIBERATE_EXCEPTION;
}).build());

ExecutionException e = assertThrows(ExecutionException.class,
() -> client.request(newRequest(client, GET)).toFuture().get());
() -> client.request(newRequest(client, GET)).whenOnSubscribe(cancellable::set).toFuture().get());
assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION));
verifyRedirectResponsePayloadsDrained(true, cancel);
}

@Test
Expand Down Expand Up @@ -688,16 +709,24 @@ private StreamingHttpRequest verifyRedirected(StreamingHttpClient client,
assertThat("Unexpected request-target of redirected request",
redirectedRequest.requestTarget(), startsWith("/"));
}
verifyRedirectResponsePayloadsDrained(true);
verifyRedirectResponsePayloadsDrained(true, false);
return redirectedRequest;
}

private void verifyRedirectResponsePayloadsDrained(boolean drained) {
private void verifyRedirectResponsePayloadsDrained(boolean drained, boolean cancelled) {
int n = 0;
for (TestPublisher<Buffer> payload : redirectResponsePayloads) {
assertThat("Redirect response payload (/location-" + ++n +
(drained ? ") was not drained" : ") was unexpectedly drained"),
assertThat("Redirect (/location-" + ++n + ") response payload was " +
(drained ? "not" : "unexpectedly") + " drained",
payload.isSubscribed(), is(drained));

if (drained) {
TestSubscription subscription = new TestSubscription();
payload.onSubscribe(subscription);
assertThat("Redirect (/location-" + ++n + ") response payload subscription was " +
(cancelled ? "not" : "unexpectedly") + " cancelled",
subscription.isCancelled(), is(cancelled));
}
}
}

Expand Down

0 comments on commit eebe458

Please sign in to comment.