Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Exceptions will be swallowed when schedulers are involved #6954

Closed
jxdabc opened this issue Apr 11, 2020 · 4 comments · Fixed by #6956
Closed

3.x: Exceptions will be swallowed when schedulers are involved #6954

jxdabc opened this issue Apr 11, 2020 · 4 comments · Fixed by #6956

Comments

@jxdabc
Copy link

jxdabc commented Apr 11, 2020

In general, exceptions should never be swallowed and should be handled somewhere, eg., catch clause or uncaught exception handlers of threads.

As to RxJava, exceptions are expected to be handled at:

  • observer's onError(), or
  • RxJava exception handler (RxJavaPlugins.onError()), or
  • thread's uncaught exception handler (eg., exceptions thrown by Exceptions.throwIfFatal()),

and not to be magically swallowed somewhere.

This is true in most situations, but is broken when schedulers are involved. That is, the following tests will fail:

(Test 1) Exceptions from observables are swallowed:

@Test
public void exceptionFromObservableShouldNotBeSwallowed() throws Exception {
    // Exceptions, fatal or not, should be handled by
    // #1 observer's onError(), or
    // #2 RxJava exception handler, or
    // #3 thread's uncaught exception handler,
    // and should not be swallowed.
    try {
        CountDownLatch latch = new CountDownLatch(1);

        // #3 thread's uncaught exception handler
        Scheduler computationScheduler = new ComputationScheduler(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((thread, throwable) -> {
                    latch.countDown();
                });
                return t;
            }
        });

        // #2 RxJava exception handler
        RxJavaPlugins.setErrorHandler(h -> {
            latch.countDown();
        });

        // #1 observer's onError()
        Observable.create(s -> {

            s.onNext(1);

            if (true) {
                throw new OutOfMemoryError();
            }

            s.onComplete();

        }).subscribeOn(computationScheduler)
        .subscribe(v -> {
        }, e -> {
            latch.countDown();
        });

        assertTrue(latch.await(2, TimeUnit.SECONDS));
    } finally {
        RxJavaPlugins.reset();
    }
}

(Test 2) Exceptions from observers are swallowed:

@Test
public void exceptionFromObserverShouldNotBeSwallowed() throws Exception {
    // Exceptions, fatal or not, should be handled by
    // #1 observer's onError(), or
    // #2 RxJava exception handler, or
    // #3 thread's uncaught exception handler,
    // and should not be swallowed.
    try {
        CountDownLatch latch = new CountDownLatch(1);

        // #3 thread's uncaught exception handler
        Scheduler computationScheduler = new ComputationScheduler(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((thread, throwable) -> {
                    latch.countDown();
                });
                return t;
            }
        });

        // #2 RxJava exception handler
        RxJavaPlugins.setErrorHandler(h -> {
            latch.countDown();
        });

        // #1 observer's onError()
        Flowable.interval(500, TimeUnit.MILLISECONDS, computationScheduler)
                .subscribe(v -> {
                    throw new OutOfMemoryError();
                }, e -> {
                    latch.countDown();
                });

        assertTrue(latch.await(2, TimeUnit.SECONDS));
    } finally {
        RxJavaPlugins.reset();
    }
}

Sometimes these broken cases matter in certain situation or on certain platform, eg.,

  • As Test 1, when out-of-memory occurs before emitter.onComplete(), the application may block forever waiting for onComplete(), because no one in the whole application could possibly known about this exception.
  • On Android, on which I code a lot, the exception handing contract implies that any runtime exception or error in any thread should throw to the uncaught exception handler, which will terminate the application. Almost all async tools, including the built in AsyncTask, conform to this contract. Setting an error handler that always throws runtime exceptions and errors via RxJavaPlugins.setErrorHandler() on Android almost makes it. But it doesn't work when schedulers are involved.

The cause is clear. Internal to Scheduler implementation, RxJava only uses Future of a task submitted to Executor as a cancel-handle and never check exceptions inside the Future while any exception thrown by the submitted task will go into the Future.

But the fix is not as easy, there is no chance to check the Future for exception since RxJava pushes task result instead of pulls it.

Pulling results is the design intent of Future. When we won't, I think we should customize the FutureTask which runs the task in Executor and provides the Future function to give us a chance to handle the result(including the exception).

Actually, JDK has given us all we need to do this via ThreadPoolExecutor#newTaskFor, ScheduledThreadPoolExecutor#decorateTask, RunnableFuture, RunnableScheduledFuture, etc. And Android did something similar in its AsyncTask implementation.

I'll propose a PR that:

  • Implements a CompleteScheduledExecutorService which is a ScheduledExecutorService that supports setting a complete handler that will receive the Future(and handle the task result) of the submitted task once it completes in the executing thread.
  • Modifies executor based schedulers to use CompleteScheduledExecutorService and check the Future for exceptions(and route it to RxJavaPlugins.onError() or throw it using Exceptions.throwIfFatal()) of submitted tasks in their complete handlers.
  • With test cases cover CompleteScheduledExecutorService related code and the above Test 1 & Test 2.
jxdabc pushed a commit to jxdabc/RxJava that referenced this issue Apr 11, 2020
Fix that RxJava will miss exceptions in Future thrown by tasks executed by
executors

Signed-off-by: jiaoxiaodong <milestonejxd@gmail.com>
@akarnokd
Copy link
Member

Thanks for the report but there is no need for such complicated resolution. We only need to remove one throwIfFatal and add catch with RxJavaPlugins.onError call. See #6956.

@jxdabc
Copy link
Author

jxdabc commented Apr 11, 2020

Thanks for the report but there is no need for such complicated resolution. We only need to remove one throwIfFatal and add catch with RxJavaPlugins.onError call. See #6956

yeah, I considered that adding RxJavaPlugins.onError()s and removing throwIfFatal()s, but it makes #748 a problem again. It seems tricky to balance this issue and #748. After all, if we passed StackOverflowError to RxJavaPlugins.onError(), RxJavaPlugins.onError() should throw it and it will be swallowed again.

I think customizing the future task will completely solve #748 and this issue. JDK supports that and at least as I know Android are using that approach. And I think it is not complex as the core code is only implementing the JDK's interface. It even simplifies the schedulers implementation, as I see ; )

@akarnokd
Copy link
Member

If you crash the onError handler, it will call the uncaught exception handler.

@jxdabc
Copy link
Author

jxdabc commented Apr 11, 2020

If you crash the onError handler, it will call the uncaught exception handler.

as replied in #6956 : )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants