Skip to content

Commit

Permalink
Avoid "Maximum active streams violated for this endpoint" at the clie…
Browse files Browse the repository at this point in the history
…nt level (#1373)

Motivation:

In #1307 we changed `LoadBalancedStreamingHttpClient` to not close the
connection on cancel for HTTP/2 protocol, because our existing API does
not expose methods for closing h2 stream instead of h2 connection.
Instead, we had to _prematurely_ mark the request as finished, because
in case of cancellation, there is no guarantee we will receive a terminal
signal on the response. Also, the stream is not aware of
`LoadBalancedStreamingHttpConnection` API and therefore it can not mark
the request as finished when it closes the stream.
That change significantly increased the probability users see
"Maximum active streams violated for this endpoint." Even though it's a
`RetryableException`, it creates pain and still may pop up after
auto-retry strategy used all attempts.

Modification:

- Introduce `OwnedRunnable` that can be passed to the transport layer
with the request object;
- Add pkg-private `StreamingHttpRequestWithContext` wrapper which
delivers `OwnedRunnable` to the transport;
- Generate that `OwnedRunnable` in `LoadBalancedStreamingHttpClient`;
- `LoadBalancedStreamingHttpClient` marks the request as finished only
if it owns `OwnedRunnable`;
- If h2-transport takes ownership of the `OwnedRunnable`,
`LoadBalancedStreamingHttpClient` is not responsible for marking the
request as finished anymore;
- Add tests to verify the fix;

Result:

HTTP/2 requests marked as finished only before they reach transport or
only after the stream closes.
  • Loading branch information
idelpivnitskiy authored Feb 15, 2021
1 parent 1a542c7 commit f5991c4
Show file tree
Hide file tree
Showing 7 changed files with 757 additions and 27 deletions.
9 changes: 9 additions & 0 deletions servicetalk-http-netty/gradle/spotbugs/main-exclusions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,13 @@
<Method name="equals"/>
<Bug pattern="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS"/>
</Match>

<!-- False-positive alerts, the invoked methods have side-effect -->
<Match>
<Or>
<Class name="io.servicetalk.http.netty.StreamingHttpRequestWithContext"/>
<Class name="io.servicetalk.http.netty.AbsoluteAddressHttpRequesterFilter"/>
</Or>
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.servicetalk.concurrent.api.internal.SubscribableSingle;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpConnectionContext;
import io.servicetalk.http.api.HttpEventKey;
Expand All @@ -41,6 +42,7 @@
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.netty.LoadBalancedStreamingHttpClient.OwnedRunnable;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ConnectionObserver.MultiplexedObserver;
import io.servicetalk.transport.api.ConnectionObserver.StreamObserver;
Expand Down Expand Up @@ -68,6 +70,7 @@

import java.net.SocketAddress;
import java.net.SocketOption;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;
Expand All @@ -78,6 +81,7 @@
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.http.api.HttpEventKey.MAX_CONCURRENCY;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.netty.HeaderUtils.LAST_CHUNK_PREDICATE;
Expand Down Expand Up @@ -239,42 +243,96 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
final StreamObserver observer = multiplexedObserver.onNewStream();
final Promise<Http2StreamChannel> promise;
final SequentialCancellable sequentialCancellable;
Runnable ownedRunnable = null;
try {
final EventExecutor e = parentContext.nettyChannel().eventLoop();
promise = e.newPromise();
// Take ownership of the Runnable associated with the request (if any) before we start opening
// a new stream. If we move this action to childChannelActive, the
// LoadBalancedStreamingHttpClient may prematurely mark the request as finished before netty
// marks the stream as inactive. This code is responsible for running this Runnable in case of
// any errors or stream closure.
if (StreamingHttpRequestWithContext.class.equals(request.getClass())) {
OwnedRunnable runnable = ((StreamingHttpRequestWithContext) request).runnable();
if (runnable.own()) {
ownedRunnable = runnable;
} else {
// The request is already cancelled and the cancel signal will eventually propagate
// here. No need to try to open a stream, we can just fail fast:
final Throwable cause = StacklessCancellationException.newInstance(this.getClass(),
"handleSubscribe");
observer.streamClosed(cause);
deliverErrorFromSource(subscriber, cause);
return;
}
} // Else user wrapped the request object => Runnable will always be owned by originator
bs.open(promise);
sequentialCancellable = new SequentialCancellable(() -> promise.cancel(true));
} catch (Throwable cause) {
observer.streamClosed(cause);
cleanupWhenError(cause, observer, ownedRunnable);
deliverErrorFromSource(subscriber, cause);
return;
}
subscriber.onSubscribe(sequentialCancellable);

try {
subscriber.onSubscribe(sequentialCancellable);
} catch (Throwable cause) {
cleanupErrorBeforeOpen(cause, promise, observer, ownedRunnable);
handleExceptionFromOnSubscribe(subscriber, cause);
return;
}

final Runnable onCloseRunnable = ownedRunnable;
if (promise.isDone()) {
childChannelActive(promise, subscriber, sequentialCancellable, strategy, request, observer,
allowDropTrailersReadFromTransport);
allowDropTrailersReadFromTransport, onCloseRunnable);
} else {
promise.addListener((FutureListener<Http2StreamChannel>) future -> childChannelActive(
future, subscriber, sequentialCancellable, strategy, request, observer,
allowDropTrailersReadFromTransport));
allowDropTrailersReadFromTransport, onCloseRunnable));
}
}
};
}

private static void cleanupErrorBeforeOpen(final Throwable cause,
final Promise<Http2StreamChannel> promise,
final StreamObserver observer,
@Nullable final Runnable ownedRunnable) {
promise.addListener((FutureListener<Http2StreamChannel>) future -> {
if (future.cause() == null) { // if succeeded, close the stream then clean up
future.getNow().close().addListener(__ -> cleanupWhenError(cause, observer, ownedRunnable));
} else {
cleanupWhenError(cause, observer, ownedRunnable);
}
});
}

private static void cleanupWhenError(final Throwable cause, final StreamObserver observer,
@Nullable final Runnable ownedRunnable) {
observer.streamClosed(cause);
if (ownedRunnable != null) {
ownedRunnable.run();
}
}

private void childChannelActive(Future<Http2StreamChannel> future,
Subscriber<? super StreamingHttpResponse> subscriber,
SequentialCancellable sequentialCancellable,
HttpExecutionStrategy strategy,
StreamingHttpRequest request,
StreamObserver streamObserver,
boolean allowDropTrailersReadFromTransport) {
boolean allowDropTrailersReadFromTransport,
@Nullable Runnable onCloseRunnable) {
final SingleSource<StreamingHttpResponse> responseSingle;
Throwable futureCause = future.cause(); // assume this doesn't throw
if (futureCause == null) {
Http2StreamChannel streamChannel = null;
try {
streamChannel = future.getNow();
if (onCloseRunnable != null) {
streamChannel.closeFuture().addListener(f -> onCloseRunnable.run());
}
parentContext.trackActiveStream(streamChannel);

final CloseHandler closeHandler = forNonPipelined(true, streamChannel.config());
Expand Down Expand Up @@ -310,6 +368,8 @@ private void childChannelActive(Future<Http2StreamChannel> future,
unexpected.addSuppressed(cause);
LOGGER.warn("Unexpected exception while handling the original cause", unexpected);
}
} else {
cleanupWhenError(cause, streamObserver, onCloseRunnable);
}
subscriber.onError(cause);
return;
Expand All @@ -331,6 +391,7 @@ public void onError(final Throwable t) {
}
});
} else {
cleanupWhenError(futureCause, streamObserver, onCloseRunnable);
subscriber.onError(futureCause);
}
}
Expand Down Expand Up @@ -447,4 +508,21 @@ public void eventConsumed() {
}
}
}

private static final class StacklessCancellationException extends CancellationException {
private static final long serialVersionUID = 3235852873427231209L;

private StacklessCancellationException() { }

// Override fillInStackTrace() so we not populate the backtrace via a native call and so leak the
// Classloader.
@Override
public Throwable fillInStackTrace() {
return this;
}

static StacklessCancellationException newInstance(Class<?> clazz, String method) {
return ThrowableUtils.unknownStackTrace(new StacklessCancellationException(), clazz, method);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;

import static io.servicetalk.client.api.internal.RequestConcurrencyController.Result.Accepted;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
import static java.util.function.Function.identity;

final class LoadBalancedStreamingHttpClient implements FilterableStreamingHttpClient {
Expand Down Expand Up @@ -65,17 +67,24 @@ public Single<StreamingHttpResponse> request(final HttpExecutionStrategy strateg
// LoadBalancer takes ownership of it (e.g. connection initialization) and in that case they will not be
// following the LoadBalancer API which this Client depends upon to ensure the concurrent request count state is
// correct.
return loadBalancer.selectConnection(SELECTOR_FOR_REQUEST)
.flatMap(c -> c.request(strategy, request)
return loadBalancer.selectConnection(SELECTOR_FOR_REQUEST).flatMap(c -> {
final OwnedRunnable ownedRunnable = c.connectionContext().protocol().major() <= 1 ? null :
new OwnedRunnable(c::requestFinished);
return c.request(strategy, ownedRunnable == null ? request :
new StreamingHttpRequestWithContext(request, ownedRunnable))
.liftSync(new BeforeFinallyHttpOperator(new TerminalSignalConsumer() {
@Override
public void onComplete() {
c.requestFinished();
if (ownedRunnable == null || ownedRunnable.own()) {
c.requestFinished();
}
}

@Override
public void onError(final Throwable throwable) {
c.requestFinished();
if (ownedRunnable == null || ownedRunnable.own()) {
c.requestFinished();
}
}

@Override
Expand All @@ -88,22 +97,25 @@ public void cancel() {
// Transport MAY not close the connection if cancel raced with completion and completion
// was seen by the transport before cancel. We have no way of knowing at this layer
// if this indeed happen.
//
// For H2 and above, connection are multiplexed and use virtual streams for each
// request-response exchange. Because we don't have any way to know when the stream gets
// closed we prematurely mark the request as finished. If the maximum concurrent streams
// violated for this connection that exception is safe to retry on a new or same
// connection.
if (c.connectionContext().protocol().major() <= 1) {
// request-response exchange. Because we don't have access to the stream at this level
// we cannot close it. Instead, we use a Runnable which will be registered for the
// stream and executed when it closes. However, cancellation can happen before transport
// created a stream. We check the ownership of the Runnable and if it was not owned by
// the transport, we mark request as finished immediately.
if (ownedRunnable == null) {
c.closeAsync().subscribe();
} else {
} else if (ownedRunnable.own()) {
c.requestFinished();
}
}
}))
// subscribeShareContext is used because otherwise the AsyncContext modified during response
// meta data processing will not be visible during processing of the response payload for
// ConnectionFilters (it already is visible on ClientFilters).
.subscribeShareContext());
.subscribeShareContext();
});
}

@Override
Expand Down Expand Up @@ -142,4 +154,26 @@ public Completable closeAsyncGracefully() {
public StreamingHttpRequest newRequest(final HttpRequestMethod method, final String requestTarget) {
return reqRespFactory.newRequest(method, requestTarget);
}

static final class OwnedRunnable implements Runnable {
private static final AtomicIntegerFieldUpdater<OwnedRunnable> ownedUpdater =
newUpdater(OwnedRunnable.class, "owned");
@SuppressWarnings("unused")
private volatile int owned;

private final Runnable runnable;

OwnedRunnable(final Runnable runnable) {
this.runnable = runnable;
}

@Override
public void run() {
runnable.run();
}

boolean own() {
return ownedUpdater.compareAndSet(this, 0, 1);
}
}
}
Loading

0 comments on commit f5991c4

Please sign in to comment.