From 33d1c5ec8f8d99b6bc6908aec01b5b0032ed4d3b Mon Sep 17 00:00:00 2001 From: Ikhun Um Date: Tue, 14 Feb 2023 14:20:18 +0900 Subject: [PATCH] Fix another `NullPointerException` in `FixedStreamMessage` (#4667) * Fix another `NullPointerException` on `FixedStreamMessage` a `NullPointerException` caused due to a race condition between `collect()` and `abort() was fixed in #4652. Howerver, we got another reoprt from Slack community. https://line-armeria.slack.com/archives/C1NGPBUH2/p1675994120153789 ``` 2023-02-09T02:08:55,526 [armeria-common-worker-epoll-3-3] WARN com.linecorp.armeria.internal.common.stream.FixedStreamMessage - Subscriber.onError() should not raise an exception. subscriber: null com.linecorp.armeria.common.util.CompositeException: 2 exceptions occurred. at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.onError0(FixedStreamMessage.java:247) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.onError(FixedStreamMessage.java:237) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort1(FixedStreamMessage.java:342) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort0(FixedStreamMessage.java:328) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort(FixedStreamMessage.java:308) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.common.stream.OneElementFixedStreamMessage.abort(OneElementFixedStreamMessage.java:112) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.server.grpc.AbstractServerCall.closeListener(AbstractServerCall.java:287) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.server.grpc.AbstractServerCall.closeListener(AbstractServerCall.java:264) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.server.grpc.AbstractServerCall.doClose(AbstractServerCall.java:239) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.server.grpc.AbstractServerCall.close(AbstractServerCall.java:222) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.server.grpc.AbstractServerCall.close(AbstractServerCall.java:217) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.server.grpc.FramedGrpcService.lambda$startCall$4(FramedGrpcService.java:318) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?] at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?] at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?] at com.linecorp.armeria.common.util.UnmodifiableFuture.doComplete(UnmodifiableFuture.java:164) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.common.CancellationScheduler$CancellationFuture.doComplete(CancellationScheduler.java:521) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.common.CancellationScheduler.invokeTask(CancellationScheduler.java:477) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.common.CancellationScheduler.finishNow0(CancellationScheduler.java:322) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.common.CancellationScheduler.finishNow(CancellationScheduler.java:306) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.internal.server.DefaultServiceRequestContext.cancel(DefaultServiceRequestContext.java:327) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.server.StreamingDecodedHttpRequest.abortResponse(StreamingDecodedHttpRequest.java:181) ~[armeria-1.21.1-SNAPSHOT.jar:?] at com.linecorp.armeria.server.Http2RequestDecoder.onRstStreamRead(Http2RequestDecoder.java:356) ~[armeria-1.21.1-SNAPSHOT.jar:?] at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onRstStreamRead(Http2FrameListenerDecorator.java:59) ~[netty-codec-http2-4.1.86.Final.jar:4.1.86.Final] at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onRstStreamRead(DefaultHttp2ConnectionDecoder.java:442) ~[netty-codec-http2-4.1.86.Final.jar:4.1.86.Final] at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onRstStreamRead(Http2InboundFrameLogger.java:80) ~[netty-codec-http2-4.1.86.Final.jar:4.1.86.Final] at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readRstStreamFrame(DefaultHttp2FrameReader.java:509) ~[netty-codec-http2-4.1.86.Final.jar:4.1.86.Final] ... Caused by: com.linecorp.armeria.common.util.CompositeException$ExceptionOverview: Multiple exceptions (2) |-- java.lang.NullPointerException: Cannot invoke "org.reactivestreams.Subscriber.onError(java.lang.Throwable)" because "this.subscriber" is null at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.onError0(FixedStreamMessage.java:242) at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.onError(FixedStreamMessage.java:237) at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort1(FixedStreamMessage.java:342) at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort0(FixedStreamMessage.java:328) at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort(FixedStreamMessage.java:308) at com.linecorp.armeria.internal.common.stream.OneElementFixedStreamMessage.abort(OneElementFixedStreamMessage.java:112) at com.linecorp.armeria.server.grpc.AbstractServerCall.closeListener(AbstractServerCall.java:287) at com.linecorp.armeria.server.grpc.AbstractServerCall.closeListener(AbstractServerCall.java:264) at com.linecorp.armeria.server.grpc.AbstractServerCall.doClose(AbstractServerCall.java:239) at com.linecorp.armeria.server.grpc.AbstractServerCall.close(AbstractServerCall.java:222) at com.linecorp.armeria.server.grpc.AbstractServerCall.close(AbstractServerCall.java:217) at com.linecorp.armeria.server.grpc.FramedGrpcService.lambda$startCall$4(FramedGrpcService.java:318) ... ``` Modifications: - Check if a stream is aborted while `subscribe0()` or `collect()` is the pending queue of an event executor. - If it is aborted, abort the subscriber or the collection future. - Check if a stream is subscribed while `abort1()` is in the pending queue of an event executor. - If it is subscribed, delegate the subscribe0() to signal abortCause via onError(). - Test possible race conditions by switching the execution order of in an event executor. Result: You no longer see a `NullPointerException` when a stream is aborted. * Fix cruft * polish * package private * polish * Update comment * Handle a race for double abortions * minor style * Address comments by @minwoox * comment --------- Co-authored-by: jrhee17 --- .../common/stream/FixedStreamMessage.java | 62 ++- .../common/stream/EventExecutorWrapper.java | 198 +++++++++ .../common/stream/FixedStreamMessageTest.java | 385 +++++++++++++++++- 3 files changed, 632 insertions(+), 13 deletions(-) create mode 100644 core/src/test/java/com/linecorp/armeria/internal/common/stream/EventExecutorWrapper.java diff --git a/core/src/main/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessage.java b/core/src/main/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessage.java index ec7ebb316c2..c5dfadbe06e 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessage.java +++ b/core/src/main/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessage.java @@ -55,11 +55,14 @@ public abstract class FixedStreamMessage extends AggregationSupport private static final AtomicReferenceFieldUpdater executorUpdater = AtomicReferenceFieldUpdater.newUpdater(FixedStreamMessage.class, EventExecutor.class, "executor"); + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater abortCauseUpdater = + AtomicReferenceFieldUpdater.newUpdater(FixedStreamMessage.class, Throwable.class, "abortCause"); + private final CompletableFuture completionFuture = new EventLoopCheckingFuture<>(); @Nullable private Subscriber subscriber; - private boolean withPooledObjects; private boolean notifyCancellation; private boolean completed; @@ -68,6 +71,7 @@ public abstract class FixedStreamMessage extends AggregationSupport @Nullable private volatile EventExecutor executor; + // Updated only by abortCauseUpdater @Nullable private volatile Throwable abortCause; @@ -127,6 +131,17 @@ public void subscribe(Subscriber subscriber, EventExecutor executor, return; } + if (executor.inEventLoop()) { + subscribe0(subscriber, executor, options); + } else { + executor.execute(() -> subscribe0(subscriber, executor, options)); + } + } + + private void subscribe0(Subscriber subscriber, EventExecutor executor, + SubscriptionOption[] options) { + //noinspection unchecked + this.subscriber = (Subscriber) subscriber; for (SubscriptionOption option : options) { if (option == SubscriptionOption.WITH_POOLED_OBJECTS) { withPooledObjects = true; @@ -134,16 +149,15 @@ public void subscribe(Subscriber subscriber, EventExecutor executor, notifyCancellation = true; } } - if (executor.inEventLoop()) { - subscribe0(subscriber); - } else { - executor.execute(() -> subscribe0(subscriber)); + + if (completed) { + // A stream is aborted while the method is pending in `executor`. + final Throwable abortCause = this.abortCause; + assert abortCause != null; + abortSubscriber(executor, subscriber, abortCause); + return; } - } - private void subscribe0(Subscriber subscriber) { - //noinspection unchecked - this.subscriber = (Subscriber) subscriber; try { subscriber.onSubscribe(this); if (isEmpty()) { @@ -202,6 +216,20 @@ public CompletableFuture> collect(EventExecutor executor, SubscriptionOp private void collect(CompletableFuture> collectingFuture, EventExecutor executor, SubscriptionOption[] options, boolean directExecution) { + if (completed) { + // A stream is aborted while the method is pending in `executor`. + final Throwable abortCause = this.abortCause; + assert abortCause != null; + if (directExecution) { + collectingFuture.completeExceptionally(abortCause); + } else { + executor.execute(() -> { + collectingFuture.completeExceptionally(abortCause); + }); + } + return; + } + completed = true; final boolean withPooledObjects = containsWithPooledObjects(options); collectingFuture.complete(drainAll(withPooledObjects)); @@ -316,7 +344,10 @@ private void abort0(@Nullable Throwable cause) { final Throwable finalCause = cause != null ? cause : AbortedStreamException.get(); // Should set `abortCause` before `executor` is written and get after `executor` is written for // atomicity. - abortCause = finalCause; + if (!abortCauseUpdater.compareAndSet(this, null, finalCause)) { + // Double abortion + return; + } if (executorUpdater.compareAndSet(this, null, ImmediateEventExecutor.INSTANCE)) { // No subscription was made. Safely clean the resources. @@ -336,10 +367,19 @@ private void abort1(Throwable cause, boolean subscribed) { if (completed) { return; } + completed = true; cleanupObjects(cause); if (subscribed) { - onError(cause); + final Subscriber subscriber = this.subscriber; + if (subscriber != null) { + onError0(cause); + } else { + // A subscription is started but `subscribe0()` isn't called yet. Since `completed` is set to + // true at the beginning of this method, `abortSubscriber()` will propagate `abortCause` via + // `onError()` when `subscribe0()` is scheduled. + completionFuture.completeExceptionally(cause); + } } else { completionFuture.completeExceptionally(cause); } diff --git a/core/src/test/java/com/linecorp/armeria/internal/common/stream/EventExecutorWrapper.java b/core/src/test/java/com/linecorp/armeria/internal/common/stream/EventExecutorWrapper.java new file mode 100644 index 00000000000..6bf8261ddcc --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/internal/common/stream/EventExecutorWrapper.java @@ -0,0 +1,198 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + */ + +package com.linecorp.armeria.internal.common.stream; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; + +class EventExecutorWrapper implements EventExecutor { + private final EventExecutor delegate; + + EventExecutorWrapper(EventExecutor delegate) { + this.delegate = delegate; + } + + @Override + public boolean isShuttingDown() { + return delegate.isShuttingDown(); + } + + @Override + public Future shutdownGracefully() { + return delegate.shutdownGracefully(); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return delegate.shutdownGracefully(quietPeriod, timeout, unit); + } + + @Override + public Future terminationFuture() { + return delegate.terminationFuture(); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + @Override + public EventExecutor next() { + return delegate.next(); + } + + @Override + public Iterator iterator() { + return delegate.iterator(); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, + long timeout, TimeUnit unit) + throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } + + @Override + public Future submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public Future submit(Callable task) { + return delegate.submit(task); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return delegate.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return delegate.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, + TimeUnit unit) { + return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, + TimeUnit unit) { + return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public EventExecutorGroup parent() { + return delegate.parent(); + } + + @Override + public boolean inEventLoop() { + return delegate.inEventLoop(); + } + + @Override + public boolean inEventLoop(Thread thread) { + return delegate.inEventLoop(thread); + } + + @Override + public Promise newPromise() { + return delegate.newPromise(); + } + + @Override + public ProgressivePromise newProgressivePromise() { + return delegate.newProgressivePromise(); + } + + @Override + public Future newSucceededFuture(V result) { + return delegate.newSucceededFuture(result); + } + + @Override + public Future newFailedFuture(Throwable cause) { + return delegate.newFailedFuture(cause); + } + + @Override + public void execute(Runnable command) { + delegate.execute(command); + } +} diff --git a/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java b/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java index 3496cbfe066..a39ad345ac6 100644 --- a/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java +++ b/core/src/test/java/com/linecorp/armeria/internal/common/stream/FixedStreamMessageTest.java @@ -17,12 +17,18 @@ package com.linecorp.armeria.internal.common.stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import static org.awaitility.Awaitility.await; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; @@ -40,6 +46,7 @@ import com.linecorp.armeria.internal.testing.AnticipatedException; import com.linecorp.armeria.testing.junit5.common.EventLoopExtension; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor; class FixedStreamMessageTest { @@ -60,7 +67,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Integer integer) { - received.getAndIncrement(); + received.getAndIncrement(); } @Override @@ -120,7 +127,381 @@ public void onComplete() {} } } - private static final class FixedStreamMessageProvider implements ArgumentsProvider { + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void raceBetweenCollectAndAbort_startCollectFirst_eventLoopCollectFirst( + FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + // Execute collect() first on the event loop. + final TestEventExecutor eventExecutor = new TestEventExecutor(eventLoop.get(), 2, false); + + final CompletableFuture> collectionFuture = stream.collect(eventExecutor); + assertThat(eventExecutor.numPendingTasks()).isOne(); + assertThat(stream.isComplete()).isFalse(); + + stream.abort(); + + // The race result: + // - collect() should win the race and return the list successfully. + // - abort() tries to clean up resources but nothing remains to clean. + assertThat(collectionFuture.join()).isInstanceOf(List.class); + assertThatCode(() -> { + stream.whenComplete().join(); + }).doesNotThrowAnyException(); + } + + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void raceBetweenCollectAndAbort_startCollectFirst_eventLoopAbortFirst(FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + // Execute abort() first on the event loop. + final TestEventExecutor eventExecutor = new TestEventExecutor(eventLoop.get(), 2, true); + + final CompletableFuture> collectionFuture = stream.collect(eventExecutor); + assertThat(eventExecutor.numPendingTasks()).isOne(); + assertThat(stream.isComplete()).isFalse(); + + final AnticipatedException abortCause = new AnticipatedException(); + stream.abort(abortCause); + + // The race result: + // - collect() returns a future which is exceptionally completed with abortCause. + // - abort() cleans up the resources and completes whenComplete() exceptionally. + assertThatThrownBy(collectionFuture::join) + .isInstanceOf(CompletionException.class) + .hasCause(abortCause); + + assertThatThrownBy(stream.whenComplete()::join) + .isInstanceOf(CompletionException.class) + .hasCause(abortCause); + } + + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void raceBetweenSubscribeAndAbort_startSubscribeFirst_eventLoopSubscribeFirst( + FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + // Execute subscribe() first on the event loop. + final TestEventExecutor eventExecutor = new TestEventExecutor(eventLoop.get(), 2, false); + + final AtomicReference causeRef = new AtomicReference<>(); + final AtomicReference subscriptionRef = new AtomicReference<>(); + stream.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriptionRef.set(s); + } + + @Override + public void onNext(Integer integer) {} + + @Override + public void onError(Throwable t) { + causeRef.set(t); + } + + @Override + public void onComplete() {} + }, eventExecutor); + + assertThat(eventExecutor.numPendingTasks()).isOne(); + assertThat(stream.isComplete()).isFalse(); + + final AnticipatedException abortCause = new AnticipatedException(); + stream.abort(abortCause); + + // The race result: + // - subscribe() finishes successfully. + // - abort() cleans up the resources and propagates abortCause via onError(). + await().untilAsserted(() -> { + assertThat(subscriptionRef).hasValue(stream); + }); + + assertThatThrownBy(stream.whenComplete()::join) + .isInstanceOf(CompletionException.class) + .hasCause(abortCause); + assertThat(causeRef).hasValue(abortCause); + } + + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void raceBetweenSubscribeAndAbort_startSubscribeFirst_eventLoopAbortFirst( + FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + // Execute abort() first on the event loop. + final TestEventExecutor eventExecutor = new TestEventExecutor(eventLoop.get(), 2, true); + + final AtomicReference causeRef = new AtomicReference<>(); + final AtomicReference subscriptionRef = new AtomicReference<>(); + stream.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + subscriptionRef.set(s); + } + + @Override + public void onNext(Integer integer) {} + + @Override + public void onError(Throwable t) { + causeRef.set(t); + } + + @Override + public void onComplete() {} + }, eventExecutor); + + assertThat(eventExecutor.numPendingTasks()).isOne(); + assertThat(stream.isComplete()).isFalse(); + + final AnticipatedException abortCause = new AnticipatedException(); + stream.abort(abortCause); + + // The race result: + // - subscribe() is aborted with NoopSubscription. + // - abort() cleans the resources and completes whenCompletes() exceptionally with abortCause. + assertThatThrownBy(stream.whenComplete()::join) + .isInstanceOf(CompletionException.class) + .hasCause(abortCause); + assertThat(subscriptionRef).hasValue(NoopSubscription.get()); + assertThat(causeRef).hasValue(abortCause); + } + + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void raceBetweenCollectAndAbort_startAbortFirst(FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + final AnticipatedException abortCause = new AnticipatedException(); + stream.abort(abortCause); + final CompletableFuture> collectionFuture = stream.collect(); + + // The race result: + // - collect() fails with abortCause synchronously. + // - abort() cleans up the resources synchronously. + assertThat(collectionFuture).isCompletedExceptionally(); + assertThatThrownBy(collectionFuture::join) + .isInstanceOf(CompletionException.class) + .hasCause(abortCause); + + assertThatThrownBy(stream.whenComplete()::join) + .isInstanceOf(CompletionException.class) + .hasCause(abortCause); + } + + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void abortOnSubscribe(FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + final AnticipatedException abortCause = new AnticipatedException(); + final AtomicReference causeRef = new AtomicReference<>(); + final AtomicBoolean completed = new AtomicBoolean(); + stream.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + stream.abort(abortCause); + } + + @Override + public void onNext(Integer integer) {} + + @Override + public void onError(Throwable t) { + causeRef.set(t); + } + + @Override + public void onComplete() { + completed.set(true); + } + }); + + assertThatThrownBy(stream.whenComplete()::join) + .isInstanceOf(CompletionException.class) + .hasCause(abortCause); + assertThat(causeRef).hasValue(abortCause); + assertThat(completed).isFalse(); + } + + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void abortOnNext(FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + final AnticipatedException abortCause = new AnticipatedException(); + final AtomicReference causeRef = new AtomicReference<>(); + final AtomicBoolean completed = new AtomicBoolean(); + stream.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(Integer integer) { + stream.abort(abortCause); + } + + @Override + public void onError(Throwable t) { + causeRef.set(t); + } + + @Override + public void onComplete() { + completed.set(true); + } + }); + + if (stream instanceof OneElementFixedStreamMessage) { + // One element was published before the abortion. + assertThatCode(stream.whenComplete()::join) + .doesNotThrowAnyException(); + assertThat(causeRef).hasValue(null); + assertThat(completed).isTrue(); + } else { + assertThatThrownBy(stream.whenComplete()::join) + .isInstanceOf(CompletionException.class) + .hasCause(abortCause); + assertThat(causeRef).hasValue(abortCause); + assertThat(completed).isFalse(); + } + } + + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void abortOnComplete(FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + final AnticipatedException abortCause = new AnticipatedException(); + final AtomicReference causeRef = new AtomicReference<>(); + final AtomicBoolean completed = new AtomicBoolean(); + stream.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Integer integer) {} + + @Override + public void onError(Throwable t) { + causeRef.set(t); + } + + @Override + public void onComplete() { + completed.set(true); + stream.abort(abortCause); + } + }); + + // abort() performs nothing when all elements are published. + assertThatCode(stream.whenComplete()::join) + .doesNotThrowAnyException(); + assertThat(causeRef).hasValue(null); + assertThat(completed).isTrue(); + } + + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void abortOnError(FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + final AnticipatedException onNextCause = new AnticipatedException(); + final AnticipatedException abortCause = new AnticipatedException(); + final AtomicReference causeRef = new AtomicReference<>(); + final AtomicBoolean completed = new AtomicBoolean(); + final AtomicInteger errorCount = new AtomicInteger(); + stream.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Integer integer) { + // Make onError() get invoked. + throw onNextCause; + } + + @Override + public void onError(Throwable t) { + errorCount.getAndIncrement(); + causeRef.set(t); + stream.abort(abortCause); + } + + @Override + public void onComplete() { + completed.set(true); + } + }); + + assertThatThrownBy(stream.whenComplete()::join) + .isInstanceOf(CompletionException.class) + .hasCause(onNextCause); + assertThat(causeRef).hasValue(onNextCause); + assertThat(completed).isFalse(); + // Should invoke onError() exactly once. + assertThat(errorCount).hasValue(1); + } + + @ArgumentsSource(FixedStreamMessageProvider.class) + @ParameterizedTest + void doubleAbort(FixedStreamMessage stream) { + assumeThat(stream.isEmpty()).isFalse(); + + final AnticipatedException abortCause = new AnticipatedException(); + stream.abort(abortCause); + assertThatThrownBy(stream.whenComplete()::join) + .isInstanceOf(CompletionException.class) + .hasCause(abortCause); + // Should perform nothing + stream.abort(); + } + + private static class TestEventExecutor extends EventExecutorWrapper { + private final Deque pendingTasks = new ArrayDeque<>(); + private int latchCount; + private final boolean reverseExecution; + + TestEventExecutor(EventExecutor delegate, int latchCount, boolean reverseExecution) { + super(delegate); + this.latchCount = latchCount; + this.reverseExecution = reverseExecution; + } + + @Override + public synchronized void execute(Runnable command) { + if (--latchCount >= 0) { + if (reverseExecution) { + pendingTasks.addFirst(command); + } else { + pendingTasks.addLast(command); + } + if (latchCount == 0) { + for (Runnable task : pendingTasks) { + super.execute(task); + } + } + } else { + super.execute(command); + } + } + + int numPendingTasks() { + return pendingTasks.size(); + } + } + + private static class FixedStreamMessageProvider implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext context) throws Exception {