Skip to content

Commit

Permalink
Fix another NullPointerException in FixedStreamMessage (#4667)
Browse files Browse the repository at this point in the history
* Fix another `NullPointerException` on `FixedStreamMessage`

a `NullPointerException` caused due to a race condition between
`collect()` and `abort() was fixed in #4652. Howerver, we got another
reoprt from Slack community. https://line-armeria.slack.com/archives/C1NGPBUH2/p1675994120153789
```
2023-02-09T02:08:55,526 [armeria-common-worker-epoll-3-3] WARN  com.linecorp.armeria.internal.common.stream.FixedStreamMessage - Subscriber.onError() should not raise an exception. subscriber: null
com.linecorp.armeria.common.util.CompositeException: 2 exceptions occurred.
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.onError0(FixedStreamMessage.java:247) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.onError(FixedStreamMessage.java:237) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort1(FixedStreamMessage.java:342) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort0(FixedStreamMessage.java:328) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort(FixedStreamMessage.java:308) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.common.stream.OneElementFixedStreamMessage.abort(OneElementFixedStreamMessage.java:112) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.server.grpc.AbstractServerCall.closeListener(AbstractServerCall.java:287) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.server.grpc.AbstractServerCall.closeListener(AbstractServerCall.java:264) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.server.grpc.AbstractServerCall.doClose(AbstractServerCall.java:239) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.server.grpc.AbstractServerCall.close(AbstractServerCall.java:222) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.server.grpc.AbstractServerCall.close(AbstractServerCall.java:217) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.server.grpc.FramedGrpcService.lambda$startCall$4(FramedGrpcService.java:318) ~[armeria-grpc-1.21.1-SNAPSHOT.jar:?]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
    at com.linecorp.armeria.common.util.UnmodifiableFuture.doComplete(UnmodifiableFuture.java:164) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.common.CancellationScheduler$CancellationFuture.doComplete(CancellationScheduler.java:521) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.common.CancellationScheduler.invokeTask(CancellationScheduler.java:477) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.common.CancellationScheduler.finishNow0(CancellationScheduler.java:322) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.common.CancellationScheduler.finishNow(CancellationScheduler.java:306) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.internal.server.DefaultServiceRequestContext.cancel(DefaultServiceRequestContext.java:327) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.server.StreamingDecodedHttpRequest.abortResponse(StreamingDecodedHttpRequest.java:181) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at com.linecorp.armeria.server.Http2RequestDecoder.onRstStreamRead(Http2RequestDecoder.java:356) ~[armeria-1.21.1-SNAPSHOT.jar:?]
    at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onRstStreamRead(Http2FrameListenerDecorator.java:59) ~[netty-codec-http2-4.1.86.Final.jar:4.1.86.Final]
    at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onRstStreamRead(DefaultHttp2ConnectionDecoder.java:442) ~[netty-codec-http2-4.1.86.Final.jar:4.1.86.Final]
    at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onRstStreamRead(Http2InboundFrameLogger.java:80) ~[netty-codec-http2-4.1.86.Final.jar:4.1.86.Final]
    at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readRstStreamFrame(DefaultHttp2FrameReader.java:509) ~[netty-codec-http2-4.1.86.Final.jar:4.1.86.Final]
    ...
Caused by: com.linecorp.armeria.common.util.CompositeException$ExceptionOverview: Multiple exceptions (2)
|-- java.lang.NullPointerException: Cannot invoke "org.reactivestreams.Subscriber.onError(java.lang.Throwable)" because "this.subscriber" is null
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.onError0(FixedStreamMessage.java:242)
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.onError(FixedStreamMessage.java:237)
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort1(FixedStreamMessage.java:342)
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort0(FixedStreamMessage.java:328)
    at com.linecorp.armeria.internal.common.stream.FixedStreamMessage.abort(FixedStreamMessage.java:308)
    at com.linecorp.armeria.internal.common.stream.OneElementFixedStreamMessage.abort(OneElementFixedStreamMessage.java:112)
    at com.linecorp.armeria.server.grpc.AbstractServerCall.closeListener(AbstractServerCall.java:287)
    at com.linecorp.armeria.server.grpc.AbstractServerCall.closeListener(AbstractServerCall.java:264)
    at com.linecorp.armeria.server.grpc.AbstractServerCall.doClose(AbstractServerCall.java:239)
    at com.linecorp.armeria.server.grpc.AbstractServerCall.close(AbstractServerCall.java:222)
    at com.linecorp.armeria.server.grpc.AbstractServerCall.close(AbstractServerCall.java:217)
    at com.linecorp.armeria.server.grpc.FramedGrpcService.lambda$startCall$4(FramedGrpcService.java:318)
    ...
```

Modifications:

- Check if a stream is aborted while `subscribe0()` or `collect()` is
  the pending queue of an event executor.
  - If it is aborted, abort the subscriber or the collection future.
- Check if a stream is subscribed while `abort1()` is in the pending
  queue of an event executor.
  - If it is subscribed, delegate the subscribe0() to signal abortCause
    via onError().
- Test possible race conditions by switching the execution order of in
  an event executor.

Result:

You no longer see a `NullPointerException` when a stream is aborted.

* Fix cruft

* polish

* package private

* polish

* Update comment

* Handle a race for double abortions

* minor style

* Address comments by @minwoox

* comment

---------

Co-authored-by: jrhee17 <guins_j@guins.org>
  • Loading branch information
ikhoon and jrhee17 authored Feb 14, 2023
1 parent 1278e70 commit 33d1c5e
Show file tree
Hide file tree
Showing 3 changed files with 632 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,14 @@ public abstract class FixedStreamMessage<T> extends AggregationSupport
private static final AtomicReferenceFieldUpdater<FixedStreamMessage, EventExecutor> executorUpdater =
AtomicReferenceFieldUpdater.newUpdater(FixedStreamMessage.class, EventExecutor.class, "executor");

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<FixedStreamMessage, Throwable> abortCauseUpdater =
AtomicReferenceFieldUpdater.newUpdater(FixedStreamMessage.class, Throwable.class, "abortCause");

private final CompletableFuture<Void> completionFuture = new EventLoopCheckingFuture<>();

@Nullable
private Subscriber<T> subscriber;

private boolean withPooledObjects;
private boolean notifyCancellation;
private boolean completed;
Expand All @@ -68,6 +71,7 @@ public abstract class FixedStreamMessage<T> extends AggregationSupport
@Nullable
private volatile EventExecutor executor;

// Updated only by abortCauseUpdater
@Nullable
private volatile Throwable abortCause;

Expand Down Expand Up @@ -127,23 +131,33 @@ public void subscribe(Subscriber<? super T> subscriber, EventExecutor executor,
return;
}

if (executor.inEventLoop()) {
subscribe0(subscriber, executor, options);
} else {
executor.execute(() -> subscribe0(subscriber, executor, options));
}
}

private void subscribe0(Subscriber<? super T> subscriber, EventExecutor executor,
SubscriptionOption[] options) {
//noinspection unchecked
this.subscriber = (Subscriber<T>) subscriber;
for (SubscriptionOption option : options) {
if (option == SubscriptionOption.WITH_POOLED_OBJECTS) {
withPooledObjects = true;
} else if (option == SubscriptionOption.NOTIFY_CANCELLATION) {
notifyCancellation = true;
}
}
if (executor.inEventLoop()) {
subscribe0(subscriber);
} else {
executor.execute(() -> subscribe0(subscriber));

if (completed) {
// A stream is aborted while the method is pending in `executor`.
final Throwable abortCause = this.abortCause;
assert abortCause != null;
abortSubscriber(executor, subscriber, abortCause);
return;
}
}

private void subscribe0(Subscriber<? super T> subscriber) {
//noinspection unchecked
this.subscriber = (Subscriber<T>) subscriber;
try {
subscriber.onSubscribe(this);
if (isEmpty()) {
Expand Down Expand Up @@ -202,6 +216,20 @@ public CompletableFuture<List<T>> collect(EventExecutor executor, SubscriptionOp

private void collect(CompletableFuture<List<T>> collectingFuture, EventExecutor executor,
SubscriptionOption[] options, boolean directExecution) {
if (completed) {
// A stream is aborted while the method is pending in `executor`.
final Throwable abortCause = this.abortCause;
assert abortCause != null;
if (directExecution) {
collectingFuture.completeExceptionally(abortCause);
} else {
executor.execute(() -> {
collectingFuture.completeExceptionally(abortCause);
});
}
return;
}

completed = true;
final boolean withPooledObjects = containsWithPooledObjects(options);
collectingFuture.complete(drainAll(withPooledObjects));
Expand Down Expand Up @@ -316,7 +344,10 @@ private void abort0(@Nullable Throwable cause) {
final Throwable finalCause = cause != null ? cause : AbortedStreamException.get();
// Should set `abortCause` before `executor` is written and get after `executor` is written for
// atomicity.
abortCause = finalCause;
if (!abortCauseUpdater.compareAndSet(this, null, finalCause)) {
// Double abortion
return;
}

if (executorUpdater.compareAndSet(this, null, ImmediateEventExecutor.INSTANCE)) {
// No subscription was made. Safely clean the resources.
Expand All @@ -336,10 +367,19 @@ private void abort1(Throwable cause, boolean subscribed) {
if (completed) {
return;
}
completed = true;

cleanupObjects(cause);
if (subscribed) {
onError(cause);
final Subscriber<T> subscriber = this.subscriber;
if (subscriber != null) {
onError0(cause);
} else {
// A subscription is started but `subscribe0()` isn't called yet. Since `completed` is set to
// true at the beginning of this method, `abortSubscriber()` will propagate `abortCause` via
// `onError()` when `subscribe0()` is scheduled.
completionFuture.completeExceptionally(cause);
}
} else {
completionFuture.completeExceptionally(cause);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/

package com.linecorp.armeria.internal.common.stream;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ProgressivePromise;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;

class EventExecutorWrapper implements EventExecutor {
private final EventExecutor delegate;

EventExecutorWrapper(EventExecutor delegate) {
this.delegate = delegate;
}

@Override
public boolean isShuttingDown() {
return delegate.isShuttingDown();
}

@Override
public Future<?> shutdownGracefully() {
return delegate.shutdownGracefully();
}

@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return delegate.shutdownGracefully(quietPeriod, timeout, unit);
}

@Override
public Future<?> terminationFuture() {
return delegate.terminationFuture();
}

@Override
public void shutdown() {
delegate.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}

@Override
public EventExecutor next() {
return delegate.next();
}

@Override
public Iterator<EventExecutor> iterator() {
return delegate.iterator();
}

@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return delegate.invokeAll(tasks);
}

@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(task, result);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(task);
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate.schedule(command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate.schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
TimeUnit unit) {
return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

@Override
public EventExecutorGroup parent() {
return delegate.parent();
}

@Override
public boolean inEventLoop() {
return delegate.inEventLoop();
}

@Override
public boolean inEventLoop(Thread thread) {
return delegate.inEventLoop(thread);
}

@Override
public <V> Promise<V> newPromise() {
return delegate.newPromise();
}

@Override
public <V> ProgressivePromise<V> newProgressivePromise() {
return delegate.newProgressivePromise();
}

@Override
public <V> Future<V> newSucceededFuture(V result) {
return delegate.newSucceededFuture(result);
}

@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
return delegate.newFailedFuture(cause);
}

@Override
public void execute(Runnable command) {
delegate.execute(command);
}
}
Loading

0 comments on commit 33d1c5e

Please sign in to comment.