Description
Consider the following setup, in which two rxSingle calls are chained together and the second one uses blockingGet():
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx2.rxSingle
import java.util.concurrent.TimeUnit

fun main() {
    val dispatcher1 = Dispatchers.Unconfined
    val dispatcher2 = Dispatchers.Unconfined
    val disposable = rxSingle(dispatcher1) { }
        .map {
            rxSingle(dispatcher2) { }
                .blockingGet() // <----- This code deadlocks here
        }
        .timeout(100, TimeUnit.MILLISECONDS)
        .subscribe(
            { println("Success") },
            { println("Error : $it") }
        )
    while (!disposable.isDisposed) {
        // Wait for request to end
    }
}
(Set aside that blockingGet() is not a good practical choice here; this is just to highlight the problem.)
The above triggers the timeout because the blockingGet() call is deadlocked:
Error : java.util.concurrent.TimeoutException: The source did not signal an event for 100 milliseconds and has been terminated.
This happens even if the suspending code inside rxSingle operates with a different dispatcher:
fun main() {
    val dispatcher1 = Dispatchers.Unconfined
    val dispatcher2 = Dispatchers.Unconfined
    val disposable = rxSingle(dispatcher1) {
        withContext(Dispatchers.IO) {}
    }
        .map {
            rxSingle(dispatcher2) {
                withContext(Dispatchers.IO) {}
            }
                .blockingGet()
        }
        .timeout(100, TimeUnit.MILLISECONDS)
        .subscribe(
            { println("Success") },
            { println("Error : $it") }
        )
    while (!disposable.isDisposed) {
        // Wait for request to end
    }
}
The code completes successfully and prints "Success" if either dispatcher1 or dispatcher2 is set to Dispatchers.Default, Dispatchers.IO, etc., so it appears that the use of Unconfined on both is key to the issue.
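The working combinations can be sketched with a small helper that takes both dispatchers as parameters. This is only an illustration: the `chained` helper is hypothetical (not part of the report), and the timeout is raised to 1 second to leave room for dispatcher start-up. Per the observation above, both calls in `main` are expected to complete; passing Dispatchers.Unconfined for both arguments is the deadlocking case and is deliberately not called here.

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx2.rxSingle
import java.util.concurrent.TimeUnit

// Hypothetical helper: the same chain as in the report, with both dispatchers as parameters.
fun chained(outer: CoroutineDispatcher, inner: CoroutineDispatcher): Single<String> =
    rxSingle(outer) { }
        .map {
            // Blocks the thread that delivers the outer result until the inner Single finishes.
            rxSingle(inner) { }.blockingGet()
            "Success"
        }
        .timeout(1, TimeUnit.SECONDS) // assumption: longer than the report's 100 ms

fun main() {
    // Only one of the two dispatchers is Unconfined in each call.
    println(chained(Dispatchers.Unconfined, Dispatchers.Default).blockingGet())
    println(chained(Dispatchers.Default, Dispatchers.Unconfined).blockingGet())
}
```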
My main question is: is this expected? If so, why does it require both requests to use Unconfined to see the issue? Is this because of the "event-loop" that gets involved here for nested coroutines:
Nested coroutines launched in this dispatcher form an event-loop to avoid stack overflows.
My understanding is that this was actually introduced in #860 to avoid deadlocks when using nested runBlocking calls. Is it possible that the conversion from suspending functions to Single / Completable / etc. unintentionally allows this problem to occur again when using Single.blockingGet()?
For a little more context, the practical consideration here is that we have a networking layer that exposes suspend functions and then an RxJava wrapper around that network layer for callers who prefer to use RxJava. The calls are set up like the following:
fun rxJavaNetworkCall(...) = rxSingle(Dispatchers.Unconfined) {
    suspendingNetworkingCall(...)
}
The reasoning behind Dispatchers.Unconfined here is twofold:
- Avoid unnecessary thread creation and thread hopping (since each suspendingNetworkingCall already manages its own threading).
- Simulate existing RxJava behavior (for better or worse) in which a change in thread is propagated downstream.
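To make the wrapper pattern concrete, here is a minimal sketch. The names `fakeNetworkCall` and `rxFakeNetworkCall` are hypothetical stand-ins for the elided functions above, and the assumption is that the suspending call switches to Dispatchers.IO internally. The printed thread names show where the downstream `map` ends up running.

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx2.rxSingle
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a suspending network call that manages its own threading.
suspend fun fakeNetworkCall(id: Int): String =
    withContext(Dispatchers.IO) { "response-$id" }

// RxJava wrapper in the style described above: no dispatcher of its own.
fun rxFakeNetworkCall(id: Int): Single<String> =
    rxSingle(Dispatchers.Unconfined) { fakeNetworkCall(id) }

fun main() {
    println("subscribing on ${Thread.currentThread().name}")
    val result = rxFakeNetworkCall(1)
        .map { "$it (mapped on ${Thread.currentThread().name})" }
        .blockingGet() // top-level blocking call from main, not nested inside another rxSingle
    println(result)
}
```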
These ideas were similarly discussed in #2925 (comment). Setting up our RxJava wrapper layer in this manner, however, leaves us susceptible to the deadlocks discussed above if any callers use blockingGet() in a network request chain for any reason. Is this just expected behavior, and should we avoid using Dispatchers.Unconfined in this way? I see there has been some discussion about a hypothetical dispatcher that works like Unconfined and could be used in these kinds of cases: #2485 (comment)
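For callers who are able to restructure the chain, composing the second request with flatMap avoids blocking a thread inside the first request's callback. The following is a minimal sketch under that assumption; `first`, `second`, and `chainedWithFlatMap` are hypothetical names, and it does not help callers who genuinely need blockingGet() mid-chain.

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx2.rxSingle

// Two hypothetical wrapped calls, both using Dispatchers.Unconfined as in the wrapper layer.
fun first(): Single<String> =
    rxSingle(Dispatchers.Unconfined) { "first" }

fun second(input: String): Single<String> =
    rxSingle(Dispatchers.Unconfined) { "$input+second" }

// flatMap subscribes to the second Single and returns without waiting for its result.
fun chainedWithFlatMap(): Single<String> =
    first().flatMap { second(it) }

fun main() {
    // Blocking only at the very end of the chain, from the main thread.
    println(chainedWithFlatMap().blockingGet())
}
```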
Kotlin version: 1.5.21
coroutines-core / coroutines-rx2 version: 1.5.1
RxJava version: 2.2.19