Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rxSingle/Completable/Maybe in kotlin-coroutines-rx2 use Default instead of Unconfined as default dispatcher #2925

Open
jingibus opened this issue Sep 10, 2021 · 7 comments
Assignees
Labels

Comments

@jingibus
Copy link

Our team has started using kotlin-coroutines-rx2 to incrementally migrate our codebase from RxJava2 to coroutines. In so doing, we encounter a lot of call sites like the following:

          disposables += resultObservable
            ...
            .observeOn(ioScheduler)
            .flatMapCompletable { secureStore.writeCompletable(it, null) }
            .errorHandlingSubscribe()

The most natural choice that kotlin-coroutines-rx2 presents us with is the following:

          disposables += resultObservable
            ...
            .observeOn(ioScheduler)
            .flatMapCompletable { rxCompletable { secureStore.write(it, null) } }
            .errorHandlingSubscribe()

This would be how the problem would be solved with rxObservable in an analogous situation.

...but this is not correct. rxCompletable uses Dispatchers.Default if no other dispatcher is specified. The resulting observable will be multithreaded, even if Schedulers.trampoline() were the injected value for ioScheduler. This is also inconsistent with rxObservable, which also uses Dispatchers.Unconfined.

Our RxJava2 unit testing codebase relies upon injecting Schedulers.trampoline() to validate reactive code on a single thread. The presence of a stray Dispatchers.Default will break any one of these tests.

It would be less surprising and more consistent to use Dispatchers.Unconfined as the default dispatcher instead of Dispatchers.Default.

@mhernand40
Copy link

I like this proposal, however I worry that this can potentially break a lot of existing usages. FWIW, on the project I work on, we always use our own injectable Dispatchers wrapper. This allows us to then use a TestDispatchers implementation which is single threaded by default (actually backed by TestCoroutineDispatcher).

Then I wrote an internal custom Detekt rule that prohibits omitting the CoroutineContext argument when using rxCompletable, rxMaybe, or rxSingle. This way it fails the build if a developer calls one of these APIs without passing the CoroutineDispatcher.

@dkhalanskyjb
Copy link
Collaborator

I think this is the same issue as #2485.

Dispatchers.Unconfined would probably not be enough for your use case, as a single context switch inside a rxSingle would make the execution happen on another thread. What would be useful here is a dispatcher that would create an event loop on the thread on which it is run, but we don't yet have such a dispatcher.

@jingibus
Copy link
Author

I think this is the same issue as #2485.

Yes, it is. I disagree with the outcome of that discussion, however.

Dispatchers.Unconfined would probably not be enough for your use case, as a single context switch inside a rxSingle would make the execution happen on another thread. What would be useful here is a dispatcher that would create an event loop on the thread on which it is run, but we don't yet have such a dispatcher.

This behavior is exactly what we expect in RxJava, though. RxJava does not defend against context switches in dependent code. If we want to be sure about what thread we are on, we must program in our own defences by adding in additional .observeOn(Scheduler) calls and by injecting fakes that avoid scheduler changes. So while such a dispatcher sounds helpful and valuable from the coroutines side, its additional encapsulation would go unnoticed and unappreciated by our Rx engineers, who must program defensively either way.

@dkhalanskyjb
Copy link
Collaborator

I probably didn't illustrate my point well enough, so I'll try again.

        val single: Single<Int> = rxSingle(Dispatchers.Unconfined) {
            println("In `rxSingle`, the current thread is ${Thread.currentThread().name}")
            withContext(Dispatchers.Default) {
                println("Inside `withContext`, the thread is ${Thread.currentThread().name}")
            }
            println("Outside `withContext`, the thread is ${Thread.currentThread().name}")
            0
        }
        single.observeOn(Schedulers.trampoline()).await()

prints:

In `rxSingle`, the current thread is Test worker @coroutine#2
Inside `withContext`, the thread is DefaultDispatcher-worker-1 @coroutine#2
Outside `withContext`, the thread is DefaultDispatcher-worker-1 @coroutine#2

As you can see, after Dispatchers.Default, the execution happens not on Test worker but still in the default dispatcher, and Schedulers.trampoline() has no effect. The reason for this is that Dispatchers.Unconfined doesn't confine the execution to any particular thread.

Because of this, I don't see what could be accomplished by using Dispatchers.Unconfined by default. True, before the first context switch everything will be happening on the thread specified in observeOn, but why would one use coroutines in this particular case if not for the context switches?

What I think would help is a dispatcher with which the result of the code above would be

In `rxSingle`, the current thread is Test worker @coroutine#2
Inside `withContext`, the thread is DefaultDispatcher-worker-1 @coroutine#2
Outside `withContext`, the thread is Test worker @coroutine#2

This is the dispatcher that I was proposing.

@jingibus
Copy link
Author

I see. I still disagree, however.

Let's call the proposed dispatcher CallingDispatcher. The tl;dr of my disagreement is this: CallingDispatcher is better encapsulation, but it does not behave in a way that is natural to RxJava. Unconfined does, and so it is the right choice in a compatibility library.

This can cause problems in production code. A real-world example can illustrate the problem. rxSingle is used for incremental migration, so let's look at an incremental migration scenario.

Here's an RxJava presenter, similar to many we have in our codebase:

class Presenter(
  val appService: AppService, 
  val params: ServiceParams, 
  val secureStore: SecureStore
  ) {
  fun present(events: Observable<ViewEvent>): Observable<ViewModel> {
    return events.flatMap { event ->
      when (event) {
        is SubmitClicked -> appService.submit(params)
          .toObservable()
          .map { result ->
            secureStore.write(result)
            ViewModel("Success!")
          }
          .startWith(ViewModel("loading"))
        else -> Observable.empty()
      }
    }
      .startWith(ViewModel("I await your service"))
  }
}

In this example, two operations need to be run on an IO thread: appService.submit, and secureStore.write. In our codebase, all generated appService Single methods implicitly run on Schedulers.IO. So appService.submit is running on the correct thread here. Thread changes propagate downstream in RxJava, so secureStore.write (which doesn't have a similar guard internally) is also running on the correct thread.

Now, let's incrementally migrate appService.submit over to be a suspend fun instead of a Single. And let's put ourselves in the shoes of an RxJava engineer who has not yet put in the work to understand coroutines. We want to use rxSingle to call into this new API, and we don't want to interrupt our workday by researching the differences between coroutines and RxJava:

class Presenter(
  val appService: AppService,
  val params: ServiceParams,
  val secureStore: SecureStore
) {
  fun present(events: Observable<ViewEvent>): Observable<ViewModel> {
    return events.flatMap { event ->
      when (event) {
        is SubmitClicked -> rxSingle { appService.submit(params) }
          .toObservable()
          .map { result ->
            secureStore.write(result)
            ViewModel("Success!")
          }
          .startWith(ViewModel("loading"))
        else -> Observable.empty()
      }
    }
      .startWith(ViewModel("I await your service"))
  }
}

In this case, the production appService.submit is doing the same thing the RxJava version was doing: running on Dispatchers.IO.

So here is the problem before us: What thread should secureStore.write run on in this example? I think we all agree that Dispatchers.Default is wrong; here, it achieves the right results, but purely by accident. And if we try to write a single-threaded test for this presenter, the presence of Dispatchers.Default will break it.

What about the proposed dispatcher? (Let's call it Dispatcher.CallingThread.) This is better, and addresses the test code, but it breaks the example code here: secureStore.write will be run on the subscription thread, not on an IO thread.

This is unexpected to the RxJava engineer performing this work: in RxJava, if appService.submit runs on an IO thread, all downstream operators will run on the IO thread, too. There's no getting around it: in RxJava, the behavior of Dispatcher.CallingThread is impossible to implement. (Maybe it is possible, but I've never seen it and I'd be surprised if I did see it. As a practical matter, this behavior does not exist in RxJava.)

What about Dispatchers.Unconfined? Dispatchers.Unconfined is perfect: it does the right thing in prod, and it does the right thing in test. More importantly, the Single produced by rxSingle behaves in the horrible way that RxJava engineers know and love: when it switches threads, it leaves itself running on that thread.

So there are the choices:

  1. Use Default, which we all agree is bad, but is...shipped
  2. Use Unconfined, which already exists and produces exactly the behavior one would expect from a real RxJava Single
  3. Use CallingThread, which would need to be built and implements a behavior that is impossible to recreate in RxJava

@bradroid
Copy link

@jingibus Hi, you're saying that rxObservable is different than rxSingle, rxMaybe, and rxCompletable such that it already uses Unconfined as a default scheduler. Are you sure, because the documentation says otherwise:
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/rx-observable.html

I'm trying to figure out whether I need to change my rxObservable usages to use Unconfined explicitly too.

@jingibus
Copy link
Author

Assuming my comment was correct at the time I wrote it, then rxObservable has been corrected to be consistent with the other tools. That means that I will need to lint against usages of rxObservable, too!

I continue to be extremely dissatisfied at the lack of understanding of RxJava patterns shown in this design decision. We have a huge body of unit tests against asynchronous RxJava-based code; the choice of default here breaks all of them, because RxJava isn't structured concurrent and never will be.

We have an obvious workaround, thankfully (deny list the default), but having put in a significant amount of work into brightening the path for folks attempting to make this paradigm shift, it's disappointing to see such stubbornness about this particular snake in the bush.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants