Skip to content

Commit

Permalink
Avoid "Maximum active streams violated for this endpoint"
Browse files Browse the repository at this point in the history
Motivation:

In apple#1307 we changed `LoadBalancedStreamingHttpClient` to not close the
connection on cancel for HTTP/2 protocol, because our existing API does
not expose methods for closing h2 stream instead of h2 connection.
Instead, we had to _prematurely_ mark the request as finished, because
in case of cancellation there is no guarantee we will receive a terminal
signal on the response. Also, the stream is not aware of
`LoadBalancedStreamingHttpConnection` API and therefore it can not mark
the request as finished when it closes the stream.
That change significantly increased a probability users see
"Maximum active streams violated for this endpoint." Even though it's a
`RetryableException` it creates pain and still may pop up after
auto-retry strategy used all attempts.

Modification:

- Introduce `OwnedRunnable` that can be passed to the transport layer
with the request object;
- Add pkg-private `StreamingHttpRequestWithContext` wrapper which
delivers `OwnedRunnable` to the transport;
- Generate that `OwnedRunnable` in `LoadBalancedStreamingHttpClient`;
- `LoadBalancedStreamingHttpClient` marks the request as finished only
if it owns `OwnedRunnable`;
- If h2-transport takes ownership of the `OwnedRunnable`,
`LoadBalancedStreamingHttpClient` is not responsible for marking the
request as finished anymore;
- Add tests to verify the fix;

Result:

HTTP/2 requests marked as finished only before they reach transport or
only after the stream closes.
  • Loading branch information
idelpivnitskiy committed Feb 12, 2021
1 parent a74c2b4 commit 09cf8c7
Show file tree
Hide file tree
Showing 6 changed files with 757 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.servicetalk.concurrent.api.internal.SubscribableSingle;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.concurrent.internal.ThrowableUtils;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpConnectionContext;
import io.servicetalk.http.api.HttpEventKey;
Expand All @@ -41,6 +42,7 @@
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.netty.LoadBalancedStreamingHttpClient.OwnedRunnable;
import io.servicetalk.transport.api.ConnectionObserver;
import io.servicetalk.transport.api.ConnectionObserver.MultiplexedObserver;
import io.servicetalk.transport.api.ConnectionObserver.StreamObserver;
Expand Down Expand Up @@ -68,6 +70,7 @@

import java.net.SocketAddress;
import java.net.SocketOption;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;
Expand All @@ -78,6 +81,7 @@
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.http.api.HttpEventKey.MAX_CONCURRENCY;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.netty.HeaderUtils.LAST_CHUNK_PREDICATE;
Expand Down Expand Up @@ -239,24 +243,58 @@ protected void handleSubscribe(final Subscriber<? super StreamingHttpResponse> s
final StreamObserver observer = multiplexedObserver.onNewStream();
final Promise<Http2StreamChannel> promise;
final SequentialCancellable sequentialCancellable;
Runnable ownedRunnable = null;
try {
final EventExecutor e = parentContext.nettyChannel().eventLoop();
promise = e.newPromise();
// Take ownership of the Runnable associated with the request (if any) before we start opening
// a new stream. If we move this action to childChannelActive, the
// LoadBalancedStreamingHttpClient may prematurely mark the request as finished before netty
// marks the stream as inactive. This code is responsible for running this Runnable in case of
// any errors or stream closure.
if (StreamingHttpRequestWithContext.class.equals(request.getClass())) {
OwnedRunnable runnable = ((StreamingHttpRequestWithContext) request).runnable();
if (runnable.own()) {
ownedRunnable = runnable;
} else {
// The request is already cancelled and the cancel signal will eventually propagate
// here. No need to try to open a stream, we can just fail fast:
final Throwable cause = StacklessCancellationException.newInstance(this.getClass(),
"handleSubscribe");
observer.streamClosed(cause);
deliverErrorFromSource(subscriber, cause);
return;
}
} // Else user wrapped the request object => Runnable will always be owned by originator
bs.open(promise);
sequentialCancellable = new SequentialCancellable(() -> promise.cancel(true));
} catch (Throwable cause) {
if (ownedRunnable != null) {
ownedRunnable.run();
}
observer.streamClosed(cause);
deliverErrorFromSource(subscriber, cause);
return;
}
subscriber.onSubscribe(sequentialCancellable);

try {
subscriber.onSubscribe(sequentialCancellable);
} catch (Throwable cause) {
if (ownedRunnable != null) {
ownedRunnable.run();
}
handleExceptionFromOnSubscribe(subscriber, cause);
return;
}

final Runnable onCloseRunnable = ownedRunnable;
if (promise.isDone()) {
childChannelActive(promise, subscriber, sequentialCancellable, strategy, request, observer,
allowDropTrailersReadFromTransport);
allowDropTrailersReadFromTransport, onCloseRunnable);
} else {
promise.addListener((FutureListener<Http2StreamChannel>) future -> childChannelActive(
future, subscriber, sequentialCancellable, strategy, request, observer,
allowDropTrailersReadFromTransport));
allowDropTrailersReadFromTransport, onCloseRunnable));
}
}
};
Expand All @@ -268,13 +306,17 @@ private void childChannelActive(Future<Http2StreamChannel> future,
HttpExecutionStrategy strategy,
StreamingHttpRequest request,
StreamObserver streamObserver,
boolean allowDropTrailersReadFromTransport) {
boolean allowDropTrailersReadFromTransport,
@Nullable Runnable onCloseRunnable) {
final SingleSource<StreamingHttpResponse> responseSingle;
Throwable futureCause = future.cause(); // assume this doesn't throw
if (futureCause == null) {
Http2StreamChannel streamChannel = null;
try {
streamChannel = future.getNow();
if (onCloseRunnable != null) {
streamChannel.closeFuture().addListener(f -> onCloseRunnable.run());
}
parentContext.trackActiveStream(streamChannel);

final CloseHandler closeHandler = forNonPipelined(true, streamChannel.config());
Expand Down Expand Up @@ -310,6 +352,11 @@ private void childChannelActive(Future<Http2StreamChannel> future,
unexpected.addSuppressed(cause);
LOGGER.warn("Unexpected exception while handling the original cause", unexpected);
}
} else {
streamObserver.streamClosed(cause);
if (onCloseRunnable != null) {
onCloseRunnable.run();
}
}
subscriber.onError(cause);
return;
Expand All @@ -331,6 +378,10 @@ public void onError(final Throwable t) {
}
});
} else {
streamObserver.streamClosed(futureCause);
if (onCloseRunnable != null) {
onCloseRunnable.run();
}
subscriber.onError(futureCause);
}
}
Expand Down Expand Up @@ -447,4 +498,21 @@ public void eventConsumed() {
}
}
}

private static final class StacklessCancellationException extends CancellationException {
private static final long serialVersionUID = 3235852873427231209L;

private StacklessCancellationException() { }

// Override fillInStackTrace() so we not populate the backtrace via a native call and so leak the
// Classloader.
@Override
public Throwable fillInStackTrace() {
return this;
}

static StacklessCancellationException newInstance(Class<?> clazz, String method) {
return ThrowableUtils.unknownStackTrace(new StacklessCancellationException(), clazz, method);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;

import static io.servicetalk.client.api.internal.RequestConcurrencyController.Result.Accepted;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
import static java.util.function.Function.identity;

final class LoadBalancedStreamingHttpClient implements FilterableStreamingHttpClient {
Expand Down Expand Up @@ -65,17 +67,24 @@ public Single<StreamingHttpResponse> request(final HttpExecutionStrategy strateg
// LoadBalancer takes ownership of it (e.g. connection initialization) and in that case they will not be
// following the LoadBalancer API which this Client depends upon to ensure the concurrent request count state is
// correct.
return loadBalancer.selectConnection(SELECTOR_FOR_REQUEST)
.flatMap(c -> c.request(strategy, request)
return loadBalancer.selectConnection(SELECTOR_FOR_REQUEST).flatMap(c -> {
final OwnedRunnable ownedRunnable = c.connectionContext().protocol().major() <= 1 ? null :
new OwnedRunnable(c::requestFinished);
return c.request(strategy, ownedRunnable == null ? request :
new StreamingHttpRequestWithContext(request, ownedRunnable))
.liftSync(new BeforeFinallyHttpOperator(new TerminalSignalConsumer() {
@Override
public void onComplete() {
c.requestFinished();
if (ownedRunnable == null || ownedRunnable.own()) {
c.requestFinished();
}
}

@Override
public void onError(final Throwable throwable) {
c.requestFinished();
if (ownedRunnable == null || ownedRunnable.own()) {
c.requestFinished();
}
}

@Override
Expand All @@ -88,22 +97,25 @@ public void cancel() {
// Transport MAY not close the connection if cancel raced with completion and completion
// was seen by the transport before cancel. We have no way of knowing at this layer
// if this indeed happen.
//
// For H2 and above, connection are multiplexed and use virtual streams for each
// request-response exchange. Because we don't have any way to know when the stream gets
// closed we prematurely mark the request as finished. If the maximum concurrent streams
// violated for this connection that exception is safe to retry on a new or same
// connection.
if (c.connectionContext().protocol().major() <= 1) {
// request-response exchange. Because we don't have access to the stream at this level
// we cannot close it. Instead, we use a Runnable which will be registered for the
// stream and executed when it closes. However, cancellation can happen before transport
// created a stream. We check the ownership of the Runnable and if it was not owned by
// the transport, we mark request as finished immediately.
if (ownedRunnable == null) {
c.closeAsync().subscribe();
} else {
} else if (ownedRunnable.own()) {
c.requestFinished();
}
}
}))
// subscribeShareContext is used because otherwise the AsyncContext modified during response
// meta data processing will not be visible during processing of the response payload for
// ConnectionFilters (it already is visible on ClientFilters).
.subscribeShareContext());
.subscribeShareContext();
});
}

@Override
Expand Down Expand Up @@ -142,4 +154,26 @@ public Completable closeAsyncGracefully() {
public StreamingHttpRequest newRequest(final HttpRequestMethod method, final String requestTarget) {
return reqRespFactory.newRequest(method, requestTarget);
}

static final class OwnedRunnable implements Runnable {
private static final AtomicIntegerFieldUpdater<OwnedRunnable> ownedUpdater =
newUpdater(OwnedRunnable.class, "owned");
@SuppressWarnings("unused")
private volatile int owned;

private final Runnable runnable;

OwnedRunnable(final Runnable runnable) {
this.runnable = runnable;
}

@Override
public void run() {
runnable.run();
}

boolean own() {
return ownedUpdater.compareAndSet(this, 0, 1);
}
}
}
Loading

0 comments on commit 09cf8c7

Please sign in to comment.