Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP StreamingService to BlockingStreaming backpressure and error propagation fixes #1328

Merged
merged 5 commits into from
Jan 21, 2021
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,36 @@
import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.concurrent.internal.DelayedCancellable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.http.api.BlockingUtils.futureGetCancelOnInterrupt;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;

final class StreamingHttpServiceToBlockingStreamingHttpService implements BlockingStreamingHttpService {
private final StreamingHttpService original;
private final int demandBatchSize;

StreamingHttpServiceToBlockingStreamingHttpService(final StreamingHttpService original) {
this(original, 64);
}

StreamingHttpServiceToBlockingStreamingHttpService(final StreamingHttpService original,
final int demandBatchSize) {
if (demandBatchSize <= 0) {
throw new IllegalArgumentException("demandBatchSize: " + demandBatchSize + " (expected >0)");
}
this.original = requireNonNull(original);
this.demandBatchSize = demandBatchSize;
}

@Override
Expand All @@ -58,8 +64,8 @@ private Completable handleBlockingRequest(final HttpServiceContext ctx,
return original.handle(ctx, request.toStreamingRequest(), ctx.streamingResponseFactory())
.flatMapCompletable(streamingHttpResponse -> {
copyMeta(streamingHttpResponse, svcResponse);
return new PayloadBodyAndTrailersToPayloadWriter(
streamingHttpResponse.payloadBodyAndTrailers(), svcResponse.sendMetaData());
return new MessageBodyToPayloadWriter(streamingHttpResponse.payloadBodyAndTrailers(),
svcResponse.sendMetaData(), demandBatchSize);
});
}

Expand All @@ -80,102 +86,96 @@ public void closeGracefully() throws Exception {
original.closeAsyncGracefully().toFuture().get();
}

private static class PayloadBodyAndTrailersToPayloadWriter extends SubscribableCompletable {

private final Publisher<Object> payloadBodyAndTrailers;
private static final class MessageBodyToPayloadWriter extends SubscribableCompletable {
private final Publisher<Object> messageBody;
private final HttpPayloadWriter<Buffer> payloadWriter;
private final int demandBatchSize;

PayloadBodyAndTrailersToPayloadWriter(final Publisher<Object> payloadBodyAndTrailers,
final HttpPayloadWriter<Buffer> payloadWriter) {
this.payloadBodyAndTrailers = payloadBodyAndTrailers;
MessageBodyToPayloadWriter(final Publisher<Object> messageBody,
final HttpPayloadWriter<Buffer> payloadWriter,
final int demandBatchSize) {
this.messageBody = messageBody;
this.payloadWriter = payloadWriter;
this.demandBatchSize = demandBatchSize;
}

@Override
protected void handleSubscribe(final CompletableSource.Subscriber subscriber) {
toSource(payloadBodyAndTrailers).subscribe(new PayloadPump(subscriber, payloadWriter));
toSource(messageBody).subscribe(new PayloadPump(subscriber, payloadWriter, demandBatchSize));
}

private static final class PayloadPump extends DelayedCancellable
implements PublisherSource.Subscriber<Object> {

private static final Logger LOGGER = LoggerFactory.getLogger(PayloadPump.class);

private static final AtomicIntegerFieldUpdater<PayloadPump> terminatedUpdater =
newUpdater(PayloadPump.class, "terminated");

private static final class PayloadPump implements PublisherSource.Subscriber<Object> {
private final Subscriber subscriber;
private final HttpPayloadWriter<Buffer> payloadWriter;
private volatile int terminated;
@Nullable
private Subscription subscription;
private final int demandBatchSize;
private int itemsToNextRequest;

PayloadPump(final Subscriber subscriber, final HttpPayloadWriter<Buffer> payloadWriter) {
PayloadPump(final Subscriber subscriber, final HttpPayloadWriter<Buffer> payloadWriter,
final int demandBatchSize) {
this.subscriber = subscriber;
this.payloadWriter = payloadWriter;
this.demandBatchSize = demandBatchSize;
}

@Override
public void onSubscribe(final PublisherSource.Subscription inSubscription) {
// We need to protect sub.cancel() from concurrent invocation with sub.request(MAX)
subscriber.onSubscribe(this);
inSubscription.request(Long.MAX_VALUE);
delayedCancellable(inSubscription);
public void onSubscribe(final Subscription inSubscription) {
// We need to protect sub.cancel() from concurrent invocation.
subscription = ConcurrentSubscription.wrap(inSubscription);
subscriber.onSubscribe(subscription);
itemsToNextRequest = demandBatchSize;
subscription.request(demandBatchSize);
}

@Override
public void onNext(@Nullable final Object bufferOrTrailers) {
assert bufferOrTrailers != null;
try {
if (bufferOrTrailers instanceof Buffer) {
payloadWriter.write((Buffer) bufferOrTrailers);
return;
}
if (bufferOrTrailers instanceof HttpHeaders) {
payloadWriter.setTrailers((HttpHeaders) bufferOrTrailers);
return;
}
assert false : "Expected only buffer or trailer in payloadBodyAndTrailers()";
} catch (IOException e) {
if (bufferOrTrailers instanceof Buffer) {
try {
if (tryTerminate()) {
subscriber.onError(e);
} else {
throwException(e);
}
} finally {
this.cancel();
payloadWriter.write((Buffer) bufferOrTrailers);
} catch (IOException e) {
throwException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this need onError

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If onNext throws the async source is responsible for catching the exception and propagating an error back downstream. Throwing the exception also effectively cancels the subscription and lets the upstream source cleanup.

We generally don't throw from terminal methods (e.g. onComplete(), onError(Throwable)) because the upstream source isn't allowed to invoke terminal methods more than once, so we must handle the error locally.

}
} else if (bufferOrTrailers instanceof HttpHeaders) {
payloadWriter.setTrailers((HttpHeaders) bufferOrTrailers);
} else {
throw new IllegalArgumentException("unsupported type: " + bufferOrTrailers);
}
requestMoreIfRequired();
}

@Override
public void onError(final Throwable t) {
// Don't close the payloadWriter on error, we need to bubble up the exception through the subscriber to
// communicate the failure
if (tryTerminate()) {
subscriber.onError(t);
} else {
LOGGER.error("Failed to deliver onError() after termination", t);
try {
payloadWriter.close(t);
} catch (Throwable cause) {
subscriber.onError(cause);
return;
}
subscriber.onError(t);
}

@Override
public void onComplete() {
try {
payloadWriter.close();
} catch (IOException e) {
if (tryTerminate()) {
subscriber.onError(e);
} else {
LOGGER.warn("Failed to deliver IOException from payloadWriter.close() after termination", e);
}
}
if (tryTerminate()) {
subscriber.onComplete();
} catch (Throwable cause) {
subscriber.onError(cause);
return;
}
subscriber.onComplete();
}

boolean tryTerminate() {
return terminatedUpdater.compareAndSet(this, 0, 1);
private void requestMoreIfRequired() {
// Request more when we half of the outstanding demand has been delivered. This attempts to keep some
// outstanding demand in the event there is impedance mismatch between producer and consumer (as opposed
// to waiting until outstanding demand reaches 0) while still having an upper bound.
if (--itemsToNextRequest == demandBatchSize >>> 1) {
final int toRequest = demandBatchSize - itemsToNextRequest;
itemsToNextRequest = demandBatchSize;
assert subscription != null;
subscription.request(toRequest);
}
}
}
}
Expand Down