Single.observeOn wrongly interrupt thread #6373

Closed
ArtemShaban opened this issue Jan 17, 2019 · 3 comments · Fixed by #6375

Comments

@ArtemShaban

I tested this on RxJava 2.2.5.

I encountered an issue where Single.observeOn interrupts the thread that performs the downstream operation.

Below are the unit test and a copy of the "SingleObserveOn" class, slightly modified for testing purposes.

@Test
public void test() throws InterruptedException
{
    Single.ambArray
            (
                    Single
                            .fromCallable(() ->
                            {
                                System.out.println(System.currentTimeMillis() + " " + "Callable1! " + Thread.currentThread());
                                return "Qqq";
                            })
                            .subscribeOn(Schedulers.newThread())
                            .observeOn(Schedulers.newThread())
                    ,
                    Single.never()
            )
          .subscribe((s, throwable) ->
          {
              System.out.println(System.currentTimeMillis() + " " + "Subscribe1 and blocking await! " + Thread.currentThread());

              Completable.timer(1000, TimeUnit.MILLISECONDS).blockingAwait();

              System.out.println(System.currentTimeMillis() + " " + "Subscribe2! " + Thread.currentThread());
              System.out.println(s);
          });

    Thread.sleep(10000);
}

public final class SingleObserveOn<T> extends Single<T>
{

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler)
    {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer)
    {
        source.subscribe(new ObserveOnSingleObserver<T>(observer, scheduler));
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
            implements SingleObserver<T>, Disposable, Runnable
    {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler)
        {
            this.downstream = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d)
        {
            if (DisposableHelper.setOnce(this, d))
            {
                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value)
        {
            this.value = value;

            System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.onSuccess1! " + Thread.currentThread());

            Disposable d = scheduler.scheduleDirect(this);
            // I added this loop to simulate a "busy" thread.
            for (int i = 0; i < 1_000_000; i++)
            {
            }

            System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.onSuccess2! " + Thread.currentThread());

            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e)
        {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run()
        {
            System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.run! " + Thread.currentThread());

            Throwable ex = error;
            if (ex != null)
            {
                downstream.onError(ex);
            }
            else
            {
                downstream.onSuccess(value);
            }
        }

        @Override
        public void dispose()
        {
            System.out.println(System.currentTimeMillis() + " " + "ObserveOnSingleObserver.dispose! " + Thread.currentThread());
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed()
        {
            return DisposableHelper.isDisposed(get());
        }
    }
}

Running this test produces the following output:

1547699583389 Callable1! Thread[RxNewThreadScheduler-1,5,main]
1547699583389 ObserveOnSingleObserver.onSuccess1! Thread[RxNewThreadScheduler-1,5,main]
1547699583390 ObserveOnSingleObserver.run! Thread[RxNewThreadScheduler-2,5,main]
1547699583390 ObserveOnSingleObserver.dispose! Thread[RxNewThreadScheduler-2,5,main]
1547699583390 Subscribe1 and blocking await! Thread[RxNewThreadScheduler-2,5,main]
1547699583393 ObserveOnSingleObserver.onSuccess2! Thread[RxNewThreadScheduler-1,5,main]
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: java.lang.InterruptedException
	at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
	at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:60)
	at io.reactivex.internal.operators.single.SingleAmb$AmbSingleObserver.onSuccess(SingleAmb.java:110)
	at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:112)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
	at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
	at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
	at io.reactivex.Completable.blockingAwait(Completable.java:1219)
	at com.multibrains.InterruptedExceptionTest.lambda$test$1(InterruptedExceptionTest.java:44)
	at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:57)
	... 11 more
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
	at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:90)
	... 14 more
Exception in thread "RxNewThreadScheduler-2" io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: java.lang.InterruptedException
	at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
	at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:60)
	at io.reactivex.internal.operators.single.SingleAmb$AmbSingleObserver.onSuccess(SingleAmb.java:110)
	at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:112)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
	at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
	at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
	at io.reactivex.Completable.blockingAwait(Completable.java:1219)
	at com.multibrains.InterruptedExceptionTest.lambda$test$1(InterruptedExceptionTest.java:44)
	at io.reactivex.internal.observers.BiConsumerSingleObserver.onSuccess(BiConsumerSingleObserver.java:57)
	... 11 more
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
	at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:90)
	... 14 more

Explanation:

Callable1! Thread[RxNewThreadScheduler-1,5,main] - the callable just returns the value.

ObserveOnSingleObserver.onSuccess1! Thread[RxNewThreadScheduler-1,5,main] - after this line the value is scheduled on RxNewThreadScheduler-2 and will be executed at some point in the future. Meanwhile, the current thread RxNewThreadScheduler-1 stalls for some reason.

ObserveOnSingleObserver.run! Thread[RxNewThreadScheduler-2,5,main] - the scheduled task is executing.

ObserveOnSingleObserver.dispose! Thread[RxNewThreadScheduler-2,5,main] - ObserveOnSingleObserver is disposed because the value was delivered downstream.

Subscribe1 and blocking await! Thread[RxNewThreadScheduler-2,5,main] - we receive the value in the subscribe callback and perform a long operation (for example, a blocking await).

ObserveOnSingleObserver.onSuccess2! Thread[RxNewThreadScheduler-1,5,main] - RxNewThreadScheduler-1 resumes and continues its work: it replaces the disposable, sees that the current disposable has already been disposed, cancels the future (which is tied to the long operation running on RxNewThreadScheduler-2), and thereby interrupts the RxNewThreadScheduler-2 thread.
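
The sequence above can be reproduced without RxJava. The following is a minimal sketch using only java.util.concurrent, not the library's actual code: the class name LateReplaceInterrupt, the DISPOSED marker and the replace helper are hypothetical stand-ins for the replace-or-dispose pattern described in the last step. A task marks the holder as disposed while it is still running, and a late replace then cancels the task's own future with interruption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class LateReplaceInterrupt {

    // Hypothetical "already disposed" marker; not an RxJava type.
    static final Future<?> DISPOSED = CompletableFuture.completedFuture(null);

    // Stores the new handle, unless the holder is already disposed,
    // in which case the new handle is cancelled with interruption.
    static void replace(AtomicReference<Future<?>> holder, Future<?> next) {
        for (;;) {
            Future<?> current = holder.get();
            if (current == DISPOSED) {
                next.cancel(true); // interrupts the thread running the task
                return;
            }
            if (holder.compareAndSet(current, next)) {
                return;
            }
        }
    }

    // Returns true if the worker thread was interrupted during its blocking wait.
    static boolean runScenario() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        AtomicReference<Future<?>> holder = new AtomicReference<>();
        AtomicBoolean interrupted = new AtomicBoolean();
        CountDownLatch taskStarted = new CountDownLatch(1);
        CountDownLatch taskDone = new CountDownLatch(1);
        try {
            Future<?> task = worker.submit(() -> {
                holder.set(DISPOSED);      // downstream disposes the upstream
                taskStarted.countDown();
                try {
                    // Stand-in for the long blocking operation in the callback.
                    new CountDownLatch(1).await(1, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    interrupted.set(true);
                } finally {
                    taskDone.countDown();
                }
            });
            taskStarted.await();           // the scheduling thread was "busy" until now
            replace(holder, task);         // late replace sees DISPOSED and cancels the task
            taskDone.await();
            return interrupted.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker interrupted: " + runScenario());
    }
}
```

In this sketch the blocking wait ends with an InterruptedException even though the task itself did nothing wrong, which matches the symptom in the log above.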

@akarnokd
Member

Could you post the stacktrace of the dispose! call?

@ArtemShaban
Author

at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.dispose(SingleObserveOn.java:99)
	  at io.reactivex.disposables.CompositeDisposable.dispose(CompositeDisposable.java:235)
	  at io.reactivex.disposables.CompositeDisposable.dispose(CompositeDisposable.java:80)
	  at io.reactivex.internal.operators.single.SingleAmb$AmbSingleObserver.onSuccess(SingleAmb.java:109)
	  at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:92)
	  at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	  at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	  at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	  at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	  at java.util.concurrent.FutureTask.run(FutureTask.java:-1)
	  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	  at java.lang.Thread.run(Thread.java:745)

Below is a screenshot from IDEA with the async stack trace; maybe it will be more useful.
[screenshot 2019-01-17 17 47 09]

@akarnokd added this to the 2.2 backlog milestone Jan 17, 2019
@akarnokd
Member

Indeed, the winner should not be disposed. I'll post a fix shortly.
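
To illustrate the idea of not disposing the winner, here is a rough sketch in plain Java. It is not the change made in the linked fix, and the names AmbWinnerSketch, Source and Race are hypothetical. It only shows an amb-style race in which the first source to signal cancels every other source but leaves itself alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class AmbWinnerSketch {

    /** Hypothetical cancellable handle for one competing source. */
    static final class Source {
        final String name;
        volatile boolean cancelled;

        Source(String name) {
            this.name = name;
        }

        void cancel() {
            cancelled = true;
        }
    }

    /** Hypothetical coordinator: the first source to call tryWin wins the race. */
    static final class Race {
        final List<Source> sources = new ArrayList<>();
        final AtomicBoolean won = new AtomicBoolean();

        /** Returns true if the caller won; cancels every source except the winner. */
        boolean tryWin(Source winner) {
            if (!won.compareAndSet(false, true)) {
                return false; // another source has already won
            }
            for (Source s : sources) {
                if (s != winner) {
                    s.cancel(); // only the losers are cancelled
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        Race race = new Race();
        Source first = new Source("first");
        Source never = new Source("never");
        race.sources.add(first);
        race.sources.add(never);

        System.out.println("first wins: " + race.tryWin(first));
        System.out.println("first cancelled: " + first.cancelled);
        System.out.println("never cancelled: " + never.cancelled);
    }
}
```

Because the winner is never cancelled, a late replace on the winner's side would not find an already-disposed holder and so would have no reason to cancel the running task.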
