-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix a race in some tests for JavaRX integration #1801
Conversation
f84bf69
to
4ba9802
Compare
An extremely rare race could happen in any of the tests in `LeakedExceptionTest` in the following case: * `withExceptionHandler` runs the block passed to it; * In one of the last iterations of `repeat`, `select` in `combine` happens on both flows at the same time, that is, the block that was passed to `rx[Something]` runs in two threads simultaneously; * One of these two threads (thread A) runs anomalously slow; * The other thread successfully throws an exception; * This exception is propagated to `catch`, so `collect` is finished; * `repeat` is exited, the block passed to `withExceptionHandler` is done executing; * `withExceptionHandler` sets back the usual exception handler, which fails when an exception in JavaRX happens (see https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling); * Thread A wakes up and throws an exception. This time, it is passed not to `handler`, which is made specifically to deal with this, but to the default handler. As a fix, now a special coroutine context passed to `rx[Something]` ensures that the spawned executions are run in a thread pool that blocks until all the tasks are done.
4ba9802
to
5a32d76
Compare
repeat(10000) { | ||
combine(flow, flow) { _, _ -> Unit } | ||
.catch {} | ||
.collect { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style nit: Continuation lines should be formatted with 4 spaces of indentation (not 8).
Also, it as we are touching this code anyway, it would be create to fix { }
-> {}
(remove space)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
val dispatcher = pool.asCoroutineDispatcher() | ||
block(dispatcher) | ||
pool.shutdown() | ||
while (!pool.awaitTermination(10, TimeUnit.SECONDS)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when you write while
with empty body write { /* deliberately empty */ }
(with comment!) as its body.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well done
See the commit message for the rationale.
I looked at the other uses of
withExceptionHandler
, and it seems that all the other executions are deterministic. I have slight doubts abouttestExceptionAfterCancellation
, but, to my understanding,blockingSubscribe
ensures that every execution is finished.