From 01eed5a07127bb794f7a086da04ace0358e08138 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Mon, 25 Jan 2021 21:18:07 -0800 Subject: [PATCH] Completable#mergeDelayError(Publisher) and HTTP client control flow Motivation: The HTTP client control flow merges the resule of the write Publisher (a Completable) and the read Publisher via Completable#merge(Publisher). However when offloading is enabled it is possible events will be delivered out of order which may result in dropped data/trailers. Modifications: - Introduce the Completable#mergeDelayError(Publisher) operator - Use new mergeDelayError(..) operator instead of merge(..) so that both streams must complete which ensures all the read data is delivered to the application before any errors. It is assumed that if the write Publisher fails that the read publisher will also fail due to transport failure. Result: No more dropped data/trailres on HTTP client due to write failure (e.g. connection closure) being delivered before read data due to offloading (e.g. not enough demand, subscription is offloaded, demand arrives after write error is already delivered). --- .../concurrent/api/Completable.java | 41 +- .../api/CompletableMergeWithPublisher.java | 142 +++++- .../CompletableMergeWithPublisherTest.java | 437 ++++++++++++++++-- ...leMergeWithPublisherDelayErrorTckTest.java | 29 ++ .../http/netty/NettyPipelinedConnection.java | 2 +- .../NonPipelinedStreamingHttpConnection.java | 4 +- .../ConnectionCloseHeaderHandlingTest.java | 21 +- .../http/netty/HttpTransportObserverTest.java | 4 +- .../netty/NettyPipelinedConnectionTest.java | 12 +- 9 files changed, 639 insertions(+), 53 deletions(-) create mode 100644 servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/CompletableMergeWithPublisherDelayErrorTckTest.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java index 4eb2edebc6..3d41e45779 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Completable.java @@ -518,7 +518,46 @@ public final Completable merge(Iterable other) { * @see ReactiveX merge operator. */ public final Publisher merge(Publisher mergeWith) { - return new CompletableMergeWithPublisher<>(this, mergeWith, executor); + return new CompletableMergeWithPublisher<>(this, mergeWith, false, executor); + } + + /** + * Merges the passed {@link Publisher} with this {@link Completable}. + *

+ * The resulting {@link Publisher} emits all items emitted by the passed {@link Publisher} and terminates when both + * this {@link Completable} and the passed {@link Publisher} terminate. If either terminates with an error then the + * error will be propagated to the return value. + *

{@code
+     *     ExecutorService e = ...;
+     *     List> futures = ...;
+     *     futures.add(e.submit(() -> resultOfThisCompletable()));
+     *     futures.add(e.submit(() -> resultOfMergeWithStream());
+     *     Throwable overallCause = null;
+     *     // This is an approximation, this operator does not provide any ordering guarantees for the results.
+     *     for (Future future : futures) {
+     *         try {
+     *             f.get();
+     *         } catch (Throwable cause) {
+     *             if (overallCause != null) {
+     *                 overallCause = cause;
+     *             }
+     *         }
+     *     }
+     *     if (overallCause != null) {
+     *         throw overallCause;
+     *     }
+     * }
+ * + * @param mergeWith the {@link Publisher} to merge in + * @param The value type of the resulting {@link Publisher}. + * @return {@link Publisher} emits all items emitted by the passed {@link Publisher} and terminates when both this + * {@link Completable} and the passed {@link Publisher} terminate. If either terminates with an error then the + * error will be propagated to the return value. + * + * @see ReactiveX merge operator. + */ + public final Publisher mergeDelayError(Publisher mergeWith) { + return new CompletableMergeWithPublisher<>(this, mergeWith, true, executor); } /** diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableMergeWithPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableMergeWithPublisher.java index 50a8c144ec..ac161caeb9 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableMergeWithPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/CompletableMergeWithPublisher.java @@ -24,8 +24,11 @@ import io.servicetalk.concurrent.internal.SignalOffloader; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.Nullable; +import static java.util.Objects.requireNonNull; + /** * {@link Publisher} as returned by {@link Completable#merge(Publisher)}. * @@ -34,22 +37,152 @@ final class CompletableMergeWithPublisher extends AbstractNoHandleSubscribePublisher { private final Completable original; private final Publisher mergeWith; + private final boolean delayError; - CompletableMergeWithPublisher(Completable original, Publisher mergeWith, Executor executor) { + CompletableMergeWithPublisher(Completable original, Publisher mergeWith, boolean delayError, + Executor executor) { super(executor); this.mergeWith = mergeWith; this.original = original; + this.delayError = delayError; } @Override void handleSubscribe(final Subscriber subscriber, final SignalOffloader signalOffloader, final AsyncContextMap contextMap, final AsyncContextProvider contextProvider) { - new Merger<>(subscriber, signalOffloader, contextMap, contextProvider) - .merge(original, mergeWith, signalOffloader, contextMap, contextProvider); + if (delayError) { + new MergerDelayError<>(subscriber, signalOffloader, contextMap, contextProvider) + .merge(original, mergeWith, signalOffloader, contextMap, contextProvider); + } else { + new Merger<>(subscriber, signalOffloader, contextMap, contextProvider) + .merge(original, mergeWith, signalOffloader, contextMap, contextProvider); + } } - private static final class Merger implements Subscriber { + private static final class MergerDelayError implements Subscriber { + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater terminalUpdater = + AtomicReferenceFieldUpdater.newUpdater(MergerDelayError.class, TerminalSignal.class, "terminal"); + private final CompletableSubscriber completableSubscriber; + private final Subscriber offloadedSubscriber; + private final DelayedSubscription subscription = new DelayedSubscription(); + @Nullable + private volatile TerminalSignal terminal; + + MergerDelayError(Subscriber subscriber, SignalOffloader signalOffloader, AsyncContextMap contextMap, + AsyncContextProvider contextProvider) { + // This is used only to deliver signals that originate from the mergeWith Publisher. Since, we need to + // preserve the threading semantics of the original Completable, we offload the subscriber so that we do not + // invoke it from the mergeWith Publisher Executor thread. + this.offloadedSubscriber = signalOffloader.offloadSubscriber(contextProvider.wrapPublisherSubscriber( + subscriber, contextMap)); + completableSubscriber = new CompletableSubscriber(); + } + + void merge(Completable original, Publisher mergeWith, SignalOffloader signalOffloader, + AsyncContextMap contextMap, AsyncContextProvider contextProvider) { + offloadedSubscriber.onSubscribe( + new MergedCancellableWithSubscription(subscription, completableSubscriber)); + original.delegateSubscribe(completableSubscriber, signalOffloader, contextMap, + contextProvider); + // SignalOffloader is associated with the original Completable. Since mergeWith Publisher is provided by + // the user, it will have its own Executor, hence we should not pass this signalOffloader to subscribe to + // mergeWith. + // Any signal originating from mergeWith Publisher should be offloaded before they are sent to the + // Subscriber of the resulting Publisher of CompletableMergeWithPublisher as the Executor associated with + // the original Completable defines the threading semantics for that Subscriber. + mergeWith.subscribeInternal(this); + } + + @Override + public void onSubscribe(final Subscription subscription) { + this.subscription.delayedSubscription(subscription); + } + + @Override + public void onNext(@Nullable final T t) { + offloadedSubscriber.onNext(t); + } + + @Override + public void onError(final Throwable t) { + terminateSubscriber(new TerminalSignal(t, true)); + } + + @Override + public void onComplete() { + terminateSubscriber(TerminalSignal.PUB_COMPLETED); + } + + private void terminateSubscriber(final TerminalSignal terminalSignal) { + for (;;) { + final TerminalSignal currState = terminal; + if (currState != null) { + if (currState.fromPublisher == terminalSignal.fromPublisher) { + // The goal of this check is to prevent concurrency on the Subscriber and require both sources + // terminate before terminating downstream. We don't need to enforce each source terminates + // exactly once for the correctness of this operator (so we don't). + throw duplicateTerminalException(currState); + } + if (currState.cause == null) { + if (terminalSignal.cause == null) { + offloadedSubscriber.onComplete(); + } else { + offloadedSubscriber.onError(terminalSignal.cause); + } + } else { + offloadedSubscriber.onError(currState.cause); + } + break; + } else if (terminalUpdater.compareAndSet(this, null, terminalSignal)) { + break; + } + } + } + + private final class CompletableSubscriber extends DelayedCancellable implements CompletableSource.Subscriber { + @Override + public void onSubscribe(Cancellable cancellable) { + delayedCancellable(cancellable); + } + + @Override + public void onComplete() { + terminateSubscriber(TerminalSignal.COM_COMPLETED); + } + + @Override + public void onError(Throwable t) { + terminateSubscriber(new TerminalSignal(t, false)); + } + } + + private static final class TerminalSignal { + private static final TerminalSignal PUB_COMPLETED = new TerminalSignal(true); + private static final TerminalSignal COM_COMPLETED = new TerminalSignal(false); + @Nullable + final Throwable cause; + final boolean fromPublisher; + + TerminalSignal(boolean fromPublisher) { + cause = null; + this.fromPublisher = fromPublisher; + } + TerminalSignal(Throwable cause, boolean fromPublisher) { + this.cause = requireNonNull(cause); + this.fromPublisher = fromPublisher; + } + } + + private static IllegalStateException duplicateTerminalException(TerminalSignal currState) { + throw new IllegalStateException("duplicate terminal event from " + (currState.fromPublisher ? + Publisher.class.getSimpleName() : Completable.class.getSimpleName()), currState.cause); + } + } + + private static final class Merger implements Subscriber { + @SuppressWarnings("rawtypes") private static final AtomicIntegerFieldUpdater completionCountUpdater = AtomicIntegerFieldUpdater.newUpdater(Merger.class, "completionCount"); @@ -67,7 +200,6 @@ private static final class Merger implements Subscriber { // invoke it from the mergeWith Publisher Executor thread. this.offloadedSubscriber = new ConcurrentTerminalSubscriber<>(signalOffloader.offloadSubscriber( contextProvider.wrapPublisherSubscriber(subscriber, contextMap)), false); - completableSubscriber = new CompletableSubscriber(); } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/CompletableMergeWithPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/CompletableMergeWithPublisherTest.java index 937e8c8dee..cb267e028f 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/CompletableMergeWithPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/CompletableMergeWithPublisherTest.java @@ -15,17 +15,26 @@ */ package io.servicetalk.concurrent.api; +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.PublisherSource.Subscription; +import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; +import org.mockito.stubbing.Answer; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; import static io.servicetalk.concurrent.api.ExecutorRule.newRule; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static io.servicetalk.utils.internal.PlatformDependent.throwException; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -34,8 +43,18 @@ import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class CompletableMergeWithPublisherTest { + @Rule + public final Timeout timeout = new ServiceTalkTestTimeout(); @Rule public final ExecutorRule executorRule = newRule(); private final TestSubscription subscription = new TestSubscription(); @@ -45,11 +64,30 @@ public class CompletableMergeWithPublisherTest { @Test public void testDelayedPublisherSubscriptionForReqNBuffering() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - toSource(completable.merge(publisher)).subscribe(subscriber); + testDelayedPublisherSubscriptionForReqNBuffering(false); + } + + @Test + public void delayErrorDelayedPublisherSubscriptionForReqNBuffering() { + testDelayedPublisherSubscriptionForReqNBuffering(true); + } + + private Publisher applyMerge(Completable completable, boolean delayError) { + return applyMerge(completable, delayError, publisher); + } + + private static Publisher applyMerge(Completable completable, boolean delayError, + Publisher publisher) { + return delayError ? completable.mergeDelayError(publisher) : completable.merge(publisher); + } + + private void testDelayedPublisherSubscriptionForReqNBuffering(boolean delayError) { + TestCompletable completable = new TestCompletable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); subscriber.awaitSubscription().request(5); completable.onComplete(); subscriber.awaitSubscription().request(7); + publisher.onSubscribe(subscription); publisher.onNext("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12"); publisher.onComplete(); assertThat(subscriber.takeOnNext(12), contains("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12")); @@ -58,8 +96,17 @@ public void testDelayedPublisherSubscriptionForReqNBuffering() { @Test public void testDelayedPublisherSubscriptionForCancelBuffering() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - toSource(completable.merge(publisher)).subscribe(subscriber); + testDelayedPublisherSubscriptionForCancelBuffering(false); + } + + @Test + public void delayErrorDelayedPublisherSubscriptionForCancelBuffering() { + testDelayedPublisherSubscriptionForCancelBuffering(true); + } + + private void testDelayedPublisherSubscriptionForCancelBuffering(boolean delayError) { + TestCompletable completable = new TestCompletable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); subscriber.awaitSubscription().request(5); publisher.onSubscribe(subscription); completable.onComplete(); @@ -69,20 +116,30 @@ public void testDelayedPublisherSubscriptionForCancelBuffering() { @Test public void testDelayedCompletableSubscriptionForCancelBuffering() { - LegacyTestCompletable completable = new LegacyTestCompletable(false, true); - toSource(completable.merge(publisher)).subscribe(subscriber); + testDelayedCompletableSubscriptionForCancelBuffering(false); + } + + @Test + public void delayErrorDelayedCompletableSubscriptionForCancelBuffering() { + testDelayedCompletableSubscriptionForCancelBuffering(true); + } + + private void testDelayedCompletableSubscriptionForCancelBuffering(boolean delayError) { + TestCompletable completable = new TestCompletable.Builder().disableAutoOnSubscribe().build(); + TestCancellable cancellable = new TestCancellable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); subscriber.awaitSubscription().request(5); - completable.sendOnSubscribe(); + completable.onSubscribe(cancellable); publisher.onSubscribe(subscription); completable.onComplete(); subscriber.awaitSubscription().cancel(); assertTrue(subscription.isCancelled()); - completable.verifyCancelled(); + assertTrue(cancellable.isCancelled()); } @Test public void testCompletableFailCancelsPublisher() { - LegacyTestCompletable completable = new LegacyTestCompletable(); + TestCompletable completable = new TestCompletable(); toSource(completable.merge(publisher)).subscribe(subscriber); publisher.onSubscribe(subscription); completable.onError(DELIBERATE_EXCEPTION); @@ -90,34 +147,110 @@ public void testCompletableFailCancelsPublisher() { assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } + @Test + public void delayErrorCompletableFailDoesNotCancelPublisherFail() { + delayErrorCompletableFailDoesNotCancelPublisher(true); + } + + @Test + public void delayErrorCompletableFailDoesNotCancelPublisher() { + delayErrorCompletableFailDoesNotCancelPublisher(false); + } + + private void delayErrorCompletableFailDoesNotCancelPublisher(boolean secondIsError) { + TestCompletable completable = new TestCompletable(); + toSource(applyMerge(completable, true)).subscribe(subscriber); + publisher.onSubscribe(subscription); + completable.onError(DELIBERATE_EXCEPTION); + assertFalse(subscription.isCancelled()); + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); + if (secondIsError) { + publisher.onError(newSecondException()); + } else { + publisher.onComplete(); + } + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + @Test public void testPublisherFailCancelsCompletable() { - LegacyTestCompletable completable = new LegacyTestCompletable(); + TestCompletable completable = new TestCompletable.Builder().disableAutoOnSubscribe().build(); + TestCancellable cancellable = new TestCancellable(); toSource(completable.merge(publisher)).subscribe(subscriber); publisher.onSubscribe(subscription); + completable.onSubscribe(cancellable); assertFalse(subscription.isCancelled()); publisher.onError(DELIBERATE_EXCEPTION); - completable.verifyCancelled(); + assertTrue(cancellable.isCancelled()); assertFalse(subscription.isCancelled()); assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } + @Test + public void delayErrorPublisherFailDoesNotCancelCompletableFail() { + delayErrorPublisherFailDoesNotCancelCompletable(true); + } + + @Test + public void delayErrorPublisherFailDoesNotCancelCompletable() { + delayErrorPublisherFailDoesNotCancelCompletable(false); + } + + private void delayErrorPublisherFailDoesNotCancelCompletable(boolean secondIsError) { + TestCompletable completable = new TestCompletable.Builder().disableAutoOnSubscribe().build(); + TestCancellable cancellable = new TestCancellable(); + toSource(applyMerge(completable, true)).subscribe(subscriber); + publisher.onSubscribe(subscription); + completable.onSubscribe(cancellable); + assertFalse(subscription.isCancelled()); + publisher.onError(DELIBERATE_EXCEPTION); + assertFalse(cancellable.isCancelled()); + assertFalse(subscription.isCancelled()); + assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); + if (secondIsError) { + completable.onError(newSecondException()); + } else { + completable.onComplete(); + } + assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); + } + @Test public void testCancelCancelsPendingSourceSubscription() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - toSource(completable.merge(publisher)).subscribe(subscriber); + testCancelCancelsPendingSourceSubscription(false); + } + + @Test + public void delayErrorCancelCancelsPendingSourceSubscription() { + testCancelCancelsPendingSourceSubscription(true); + } + + private void testCancelCancelsPendingSourceSubscription(boolean delayError) { + TestCompletable completable = new TestCompletable.Builder().disableAutoOnSubscribe().build(); + TestCancellable cancellable = new TestCancellable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); publisher.onSubscribe(subscription); + completable.onSubscribe(cancellable); subscriber.awaitSubscription().cancel(); assertTrue(subscription.isCancelled()); - completable.verifyCancelled(); + assertTrue(cancellable.isCancelled()); assertThat(subscriber.pollOnNext(10, MILLISECONDS), is(nullValue())); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); } @Test public void testCancelCompletableCompletePublisherPendingCancelsNoMoreInteraction() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - toSource(completable.merge(publisher)).subscribe(subscriber); + testCancelCompletableCompletePublisherPendingCancelsNoMoreInteraction(false); + } + + @Test + public void delayErrorCancelCompletableCompletePublisherPendingCancelsNoMoreInteraction() { + testCancelCompletableCompletePublisherPendingCancelsNoMoreInteraction(true); + } + + private void testCancelCompletableCompletePublisherPendingCancelsNoMoreInteraction(boolean delayError) { + TestCompletable completable = new TestCompletable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); publisher.onSubscribe(subscription); completable.onComplete(); subscriber.awaitSubscription().request(2); @@ -131,8 +264,20 @@ public void testCancelCompletableCompletePublisherPendingCancelsNoMoreInteractio @Test public void testCancelPublisherCompleteCompletablePendingCancelsNoMoreInteraction() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - toSource(completable.merge(publisher)).subscribe(subscriber); + testCancelPublisherCompleteCompletablePendingCancelsNoMoreInteraction(false); + } + + @Test + public void delayErrorCancelPublisherCompleteCompletablePendingCancelsNoMoreInteraction() { + testCancelPublisherCompleteCompletablePendingCancelsNoMoreInteraction(true); + } + + private void testCancelPublisherCompleteCompletablePendingCancelsNoMoreInteraction(boolean delayError) { + TestCompletable completable = new TestCompletable.Builder().disableAutoOnSubscribe().build(); + TestCancellable cancellable = new TestCancellable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); + completable.onSubscribe(cancellable); + publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(2); publisher.onNext("one", "two"); publisher.onComplete(); @@ -140,13 +285,23 @@ public void testCancelPublisherCompleteCompletablePendingCancelsNoMoreInteractio assertThat(subscriber.takeOnNext(2), contains("one", "two")); assertThat(subscriber.pollOnNext(10, MILLISECONDS), is(nullValue())); assertThat(subscriber.pollTerminal(10, MILLISECONDS), is(nullValue())); - completable.verifyCancelled(); + assertTrue(cancellable.isCancelled()); } @Test public void testCompletableAndPublisherCompleteSingleCompleteSignal() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - toSource(completable.merge(publisher)).subscribe(subscriber); + testCompletableAndPublisherCompleteSingleCompleteSignal(false); + } + + @Test + public void delayErrorCompletableAndPublisherCompleteSingleCompleteSignal() { + testCompletableAndPublisherCompleteSingleCompleteSignal(true); + } + + private void testCompletableAndPublisherCompleteSingleCompleteSignal(boolean delayError) { + TestCompletable completable = new TestCompletable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); + publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(2); completable.onComplete(); publisher.onNext("one", "two"); @@ -157,22 +312,43 @@ public void testCompletableAndPublisherCompleteSingleCompleteSignal() { @Test public void testCompletableAndPublisherFailOnlySingleErrorSignal() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - toSource(completable.merge(publisher)).subscribe(subscriber); + testCompletableAndPublisherFailOnlySingleErrorSignal(false); + } + + @Test + public void delayErrorCompletableAndPublisherFailOnlySingleErrorSignal() { + testCompletableAndPublisherFailOnlySingleErrorSignal(true); + } + + private void testCompletableAndPublisherFailOnlySingleErrorSignal(boolean delayError) { + TestCompletable completable = new TestCompletable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(3); publisher.onNext("one", "two"); completable.onError(DELIBERATE_EXCEPTION); - assertTrue(subscription.isCancelled()); - publisher.onError(DELIBERATE_EXCEPTION); + if (!delayError) { + assertTrue(subscription.isCancelled()); + } + publisher.onError(newSecondException()); assertThat(subscriber.takeOnNext(2), contains("one", "two")); assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION)); } @Test public void testCompletableFailsAndPublisherCompletesSingleErrorSignal() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - toSource(completable.merge(publisher)).subscribe(subscriber); + testCompletableFailsAndPublisherCompletesSingleErrorSignal(false); + } + + @Test + public void delayErrorCompletableFailsAndPublisherCompletesSingleErrorSignal() { + testCompletableFailsAndPublisherCompletesSingleErrorSignal(true); + } + + private void testCompletableFailsAndPublisherCompletesSingleErrorSignal(boolean delayError) { + TestCompletable completable = new TestCompletable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); + publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(2); publisher.onNext("one", "two"); publisher.onComplete(); @@ -183,8 +359,17 @@ public void testCompletableFailsAndPublisherCompletesSingleErrorSignal() { @Test public void testPublisherFailsAndCompletableCompletesSingleErrorSignal() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - toSource(completable.merge(publisher)).subscribe(subscriber); + testPublisherFailsAndCompletableCompletesSingleErrorSignal(false); + } + + @Test + public void delayErrorPublisherFailsAndCompletableCompletesSingleErrorSignal() { + testPublisherFailsAndCompletableCompletesSingleErrorSignal(true); + } + + private void testPublisherFailsAndCompletableCompletesSingleErrorSignal(boolean delayError) { + TestCompletable completable = new TestCompletable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(2); publisher.onNext("one", "two"); @@ -197,11 +382,20 @@ public void testPublisherFailsAndCompletableCompletesSingleErrorSignal() { @Test public void offloadingWaitsForPublisherSignalsEvenIfCompletableTerminates() throws Exception { + offloadingWaitsForPublisherSignalsEvenIfCompletableTerminates(false); + } + + @Test + public void delayErrorOffloadingWaitsForPublisherSignalsEvenIfCompletableTerminates() throws Exception { + offloadingWaitsForPublisherSignalsEvenIfCompletableTerminates(true); + } + + private void offloadingWaitsForPublisherSignalsEvenIfCompletableTerminates(boolean delayError) throws Exception { TestCompletable completable = new TestCompletable.Builder().disableAutoOnSubscribe().build(); TestCancellable testCancellable = new TestCancellable(); CountDownLatch latch = new CountDownLatch(1); - toSource(completable.publishOn(executorRule.executor()) - .merge(publisher.publishOn(executorRule.executor())).afterOnNext(item -> { + toSource(applyMerge(completable.publishOn(executorRule.executor()), delayError, + publisher.publishOn(executorRule.executor())).afterOnNext(item -> { // The goal of this test is to have the Completable terminate, but have onNext signals from the Publisher be // delayed on the Executor. Even in this case the merge operator should correctly sequence the onComplete to // the downstream subscriber until after all the onNext events have completed. @@ -229,4 +423,185 @@ public void offloadingWaitsForPublisherSignalsEvenIfCompletableTerminates() thro assertThat(subscriber.takeOnNext(values.length), contains(values)); subscriber.awaitOnComplete(); } + + @Test + public void publisherAndCompletableErrorDoesNotDuplicateErrorDownstream() throws Exception { + publisherAndCompletableErrorDoesNotDuplicateErrorDownstream(false); + } + + @Test + public void delayErrorPublisherAndCompletableErrorDoesNotDuplicateErrorDownstream() throws Exception { + publisherAndCompletableErrorDoesNotDuplicateErrorDownstream(true); + } + + private void publisherAndCompletableErrorDoesNotDuplicateErrorDownstream(boolean delayError) throws Exception { + CyclicBarrier barrier = new CyclicBarrier(2); + TestCompletable completable = new TestCompletable(); + toSource(applyMerge(completable, delayError)).subscribe(subscriber); + publisher.onSubscribe(subscription); + Future f = executorRule.executor().submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throwException(e); + } + completable.onError(DELIBERATE_EXCEPTION); + }).toFuture(); + + barrier.await(); + publisher.onError(DELIBERATE_EXCEPTION); + + f.get(); + assertThat(subscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); + } + + @Test + public void onNextConcurrentWithCompletableError() throws Exception { + onNextConcurrentWithCompletableError(false); + } + + @Test + public void delayErrorOnNextConcurrentWithCompletableError() throws Exception { + onNextConcurrentWithCompletableError(true); + } + + private void onNextConcurrentWithCompletableError(boolean delayError) throws Exception { + CountDownLatch nextLatch1 = new CountDownLatch(1); + CountDownLatch nextLatch2 = new CountDownLatch(1); + TestCompletable completable = new TestCompletable(); + @SuppressWarnings("unchecked") + PublisherSource.Subscriber mockSubscriber = mock(PublisherSource.Subscriber.class); + doAnswer((Answer) invocation -> { + Subscription s = invocation.getArgument(0); + s.request(Long.MAX_VALUE); + return null; + }).when(mockSubscriber).onSubscribe(any()); + doAnswer((Answer) invocation -> { + nextLatch2.countDown(); + nextLatch1.await(); + return null; + }).when(mockSubscriber).onNext(any()); + toSource(applyMerge(completable, delayError)).subscribe(mockSubscriber); + publisher.onSubscribe(subscription); + Future f = executorRule.executor().submit(() -> { + try { + nextLatch2.await(); + } catch (Exception e) { + throwException(e); + } + completable.onError(DELIBERATE_EXCEPTION); + nextLatch1.countDown(); + }).toFuture(); + + subscription.awaitRequestN(1); + publisher.onNext("one"); + f.get(); + publisher.onError(newSecondException()); + verify(mockSubscriber).onNext(eq("one")); + verify(mockSubscriber).onError(eq(DELIBERATE_EXCEPTION)); + verify(mockSubscriber, never()).onComplete(); + } + + private static IllegalStateException newSecondException() { + return new IllegalStateException("second exception should be ignored"); + } + + @Test + public void onNextReentryCompleteConcurrentWithCompletable() throws Exception { + onNextReentryCompleteConcurrentWithCompletable(false, true, true); + } + + @Test + public void onNextReentryErrorConcurrentWithCompletable() throws Exception { + onNextReentryCompleteConcurrentWithCompletable(false, false, true); + } + + @Test + public void onNextReentryCompleteConcurrentWithCompletableError() throws Exception { + onNextReentryCompleteConcurrentWithCompletable(false, true, false); + } + + @Test + public void onNextReentryErrorConcurrentWithCompletableError() throws Exception { + onNextReentryCompleteConcurrentWithCompletable(false, false, false); + } + + @Test + public void delayErrorOnNextReentryCompleteConcurrentWithCompletable() throws Exception { + onNextReentryCompleteConcurrentWithCompletable(true, true, true); + } + + @Test + public void delayErrorOnNextReentryErrorConcurrentWithCompletable() throws Exception { + onNextReentryCompleteConcurrentWithCompletable(true, false, true); + } + + @Test + public void delayErrorOnNextReentryCompleteConcurrentWithCompletableError() throws Exception { + onNextReentryCompleteConcurrentWithCompletable(true, true, false); + } + + @Test + public void delayErrorOnNextReentryErrorConcurrentWithCompletableError() throws Exception { + onNextReentryCompleteConcurrentWithCompletable(true, false, false); + } + + private void onNextReentryCompleteConcurrentWithCompletable(boolean delayError, boolean pubOnComplete, + boolean compOnComplete) throws Exception { + CountDownLatch nextLatch1 = new CountDownLatch(1); + CountDownLatch nextLatch2 = new CountDownLatch(1); + TestCompletable completable = new TestCompletable(); + AtomicInteger onNextCount = new AtomicInteger(); + @SuppressWarnings("unchecked") + PublisherSource.Subscriber mockSubscriber = mock(PublisherSource.Subscriber.class); + doAnswer((Answer) invocation -> { + Subscription s = invocation.getArgument(0); + s.request(Long.MAX_VALUE); + return null; + }).when(mockSubscriber).onSubscribe(any()); + doAnswer((Answer) invocation -> { + nextLatch2.countDown(); + final int count = onNextCount.incrementAndGet(); + if (count == 1) { + publisher.onNext("two"); + } else if (count == 2) { + if (pubOnComplete) { + publisher.onComplete(); + } else { + publisher.onError(DELIBERATE_EXCEPTION); + } + } + nextLatch1.await(); + return null; + }).when(mockSubscriber).onNext(any()); + toSource(applyMerge(completable, delayError)).subscribe(mockSubscriber); + publisher.onSubscribe(subscription); + Future f = executorRule.executor().submit(() -> { + try { + nextLatch2.await(); + } catch (Exception e) { + throwException(e); + } + if (compOnComplete) { + completable.onComplete(); + } else { + completable.onError(DELIBERATE_EXCEPTION); + } + nextLatch1.countDown(); + }).toFuture(); + + subscription.awaitRequestN(2); + publisher.onNext("one"); + + f.get(); + verify(mockSubscriber).onNext(eq("one")); + verify(mockSubscriber, compOnComplete ? times(1) : atMost(1)).onNext(eq("two")); + if (pubOnComplete && compOnComplete) { + verify(mockSubscriber).onComplete(); + verify(mockSubscriber, never()).onError(any()); + } else { + verify(mockSubscriber, never()).onComplete(); + verify(mockSubscriber).onError(eq(DELIBERATE_EXCEPTION)); + } + } } diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/CompletableMergeWithPublisherDelayErrorTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/CompletableMergeWithPublisherDelayErrorTckTest.java new file mode 100644 index 0000000000..18d15fcfa7 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/CompletableMergeWithPublisherDelayErrorTckTest.java @@ -0,0 +1,29 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +@Test +public class CompletableMergeWithPublisherDelayErrorTckTest extends AbstractPublisherOperatorTckTest { + @Override + protected Publisher composePublisher(Publisher publisher, int elements) { + return Completable.completed().mergeDelayError(publisher); + } +} diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyPipelinedConnection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyPipelinedConnection.java index cbe88ceee1..9187f18864 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyPipelinedConnection.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyPipelinedConnection.java @@ -236,7 +236,7 @@ void run() { // the most straightforward way to propagate an error through the APIs is through the read async // source. This has a side effect that the read async source isn't strictly full-duplex (data // will be full-duplex, but completion will be delayed until the write completes). - .merge(new Publisher() { + .mergeDelayError(new Publisher() { @Override protected void handleSubscribe(final Subscriber rSubscriber) { final Subscriber nextReadSubscriber; diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java index 851b72f397..000c1ba52c 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java @@ -39,12 +39,12 @@ final class NonPipelinedStreamingHttpConnection protected Publisher writeAndRead(final Publisher requestStream, @Nullable final FlushStrategy flushStrategy) { if (flushStrategy == null) { - return connection.write(requestStream).merge(connection.read()); + return connection.write(requestStream).mergeDelayError(connection.read()); } else { return Publisher.defer(() -> { final Cancellable resetFlushStrategy = connection.updateFlushStrategy( (prev, isOriginal) -> isOriginal ? flushStrategy : prev); - return connection.write(requestStream).merge(connection.read()) + return connection.write(requestStream).mergeDelayError(connection.read()) .afterFinally(resetFlushStrategy::cancel); }); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionCloseHeaderHandlingTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionCloseHeaderHandlingTest.java index 5a4b9e9f99..8802d212bb 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionCloseHeaderHandlingTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionCloseHeaderHandlingTest.java @@ -47,6 +47,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -85,17 +86,14 @@ @RunWith(Enclosed.class) public class ConnectionCloseHeaderHandlingTest { - private static final Collection TRUE_FALSE = asList(true, false); private static final String SERVER_SHOULD_CLOSE = "serverShouldClose"; public abstract static class ConnectionSetup { - @ClassRule public static final ExecutionContextRule SERVER_CTX = cached("server-io", "server-executor"); @ClassRule public static final ExecutionContextRule CLIENT_CTX = cached("client-io", "client-executor"); - @Rule public final ServiceTalkTestTimeout timeout = new ServiceTalkTestTimeout(); @@ -209,11 +207,18 @@ public void tearDown() throws Exception { } protected void assertClosedChannelException(String path) { - Exception e = assertThrows(ExecutionException.class, - () -> connection.request(connection.get(path).addHeader(CONTENT_LENGTH, ZERO)).toFuture().get()); + assertClosedChannelException(sendZeroLengthRequest(path)); + } + + protected void assertClosedChannelException(Future responseFuture) { + Exception e = assertThrows(ExecutionException.class, responseFuture::get); assertThat(e.getCause(), instanceOf(ClosedChannelException.class)); } + protected Future sendZeroLengthRequest(String path) { + return connection.request(connection.get(path).addHeader(CONTENT_LENGTH, ZERO)).toFuture(); + } + protected static void assertResponse(StreamingHttpResponse response) { assertThat(response.status(), is(OK)); assertThat(response.headers().get(CONNECTION), contentEqualTo(CLOSE)); @@ -400,10 +405,11 @@ public void serverCloseTwoPipelinedRequestsInSequence() throws Exception { assertResponse(response); // Send another request before connection reads payload body of the first request: - assertClosedChannelException("/second"); + Future secondFuture = sendZeroLengthRequest("/second"); responseReceived.countDown(); assertResponsePayloadBody(response); + assertClosedChannelException(secondFuture); awaitConnectionClosed(); } @@ -417,12 +423,13 @@ public void clientCloseTwoPipelinedRequestsSentFirstInitiatesClosure() throws Ex responseReceived.countDown(); }); // Send another request before connection receives a response for the first request: - assertClosedChannelException("/second"); + Future secondFuture = sendZeroLengthRequest("/second"); sendResponse.countDown(); StreamingHttpResponse response = responses.take(); assertResponse(response); assertResponsePayloadBody(response); + assertClosedChannelException(secondFuture); awaitConnectionClosed(); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverTest.java index 8ecd291211..75e595ae36 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpTransportObserverTest.java @@ -280,8 +280,6 @@ public void testServerFailsResponsePayloadBody(String path, boolean serverReadCo verify(clientWriteObserver, atLeastOnce()).onFlushRequest(); verify(clientWriteObserver, atLeastOnce()).itemWritten(); verify(clientWriteObserver).writeComplete(); - // Failure of the read triggers cancellation of the write. - verify(clientWriteObserver, await()).writeCancelled(); verify(serverReadObserver, atLeastOnce()).requestedToRead(anyLong()); verify(serverReadObserver, atLeastOnce()).itemRead(); @@ -362,7 +360,7 @@ public void clientFailsRequestPayloadBody() throws Exception { verify(serverWriteObserver, atMostOnce()).writeCancelled(); verify(clientReadObserver, atLeastOnce()).requestedToRead(anyLong()); - verify(clientReadObserver).readCancelled(); + verify(clientReadObserver).readFailed(any(IOException.class)); if (protocol == HTTP_1) { verify(clientConnectionObserver).connectionClosed(DELIBERATE_EXCEPTION); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyPipelinedConnectionTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyPipelinedConnectionTest.java index 26fa80dfdb..fd2fc07468 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyPipelinedConnectionTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyPipelinedConnectionTest.java @@ -334,7 +334,7 @@ public void writeErrorFailsPendingReadsDoesNotSubscribeToPendingWrites() { writePublisher1.onError(DELIBERATE_EXCEPTION); assertThat(readSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); assertThat(readSubscriber2.awaitOnError(), is(instanceOf(ClosedChannelException.class))); - assertFalse(writePublisher2.isSubscribed()); + assertTrue(writePublisher2.isSubscribed()); assertFalse(channel.isOpen()); } @@ -365,6 +365,7 @@ protected void handleSubscribe(final CompletableSource.Subscriber subscriber) { readSubscription.request(1); assertTrue(mockReadPublisher1.isSubscribed()); + mockReadPublisher1.onError(newSecondException()); assertThat(readSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); assertFalse(writePublisher1.isSubscribed()); @@ -397,9 +398,9 @@ protected void handleSubscribe(final PublisherSource.Subscriber Subscription readSubscription = readSubscriber.awaitSubscription(); readSubscription.request(1); - assertThat(readSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); assertTrue(writePublisher1.isSubscribed()); - writePublisher1.onComplete(); + writePublisher1.onError(newSecondException()); + assertThat(readSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); readSubscriber2.awaitSubscription(); assertTrue(writePublisher2.isSubscribed()); @@ -408,6 +409,10 @@ protected void handleSubscribe(final PublisherSource.Subscriber verify(mockConnection, never()).closeAsync(); } + private static IllegalStateException newSecondException() { + return new IllegalStateException("second exception shouldn't propagate"); + } + @Test public void writeThrowsClosesConnection() { TestPublisher mockReadPublisher2 = new TestPublisher<>(); @@ -450,6 +455,7 @@ public void readThrowsClosesConnection() { Subscription readSubscription = readSubscriber.awaitSubscription(); readSubscription.request(1); + writePublisher1.onError(newSecondException()); assertThat(readSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION)); assertTrue(writePublisher1.isSubscribed()); verify(mockConnection).closeAsync();