-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
2.x: fix Completable.concat to use replace (don't dispose old) #5695
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #5695 +/- ##
===========================================
+ Coverage 96.23% 96.3% +0.07%
Complexity 5818 5818
===========================================
Files 634 634
Lines 41604 41604
Branches 5761 5761
===========================================
+ Hits 40037 40067 +30
+ Misses 628 610 -18
+ Partials 939 927 -12
Continue to review full report at Codecov.
|
for (int i = 0; i < count; i++) { | ||
Completable.complete() | ||
.subscribeOn(Schedulers.io()) | ||
.observeOn(Schedulers.io()) // The problem does not occur if you comment out this line |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we delete these comments?
try { | ||
Thread.sleep(30); | ||
} catch (InterruptedException e) { | ||
System.out.println("Interrupted! " + Thread.currentThread()); // This is output periodically |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need it, or at least message should be changed to something like System.err.println("Interrupted, but was not expected to")
for (int k = 0; k < 100; k++) { | ||
final int count = 10; | ||
final CountDownLatch latch = new CountDownLatch(count); | ||
final boolean[] interrupted = { false }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: AtomicBoolean
would be a nicer replacement for mutable Boolean :)
Completable.andThen
,concat(Iterable)
andconcatArray()
disposed the previousDisposable
when receiving the newDisposable
from the next source which could lead to interruption.concat(Publisher)
already usesreplace
instead ofupdate
.There is a peculiar interplay between
subscribeOn
,observeOn
and the trampoline inconcat
which can trigger such an interruption: the task of theobserveOn
is cancelled withmayInterruptIfRunning == true
because thedispose
call chain shuts down the worker of theobserveOn
from thesubscribeOn
's thread.Reported in #5694