
publish().refCount() race may leave observers hanging #6501

Closed
akarnokd opened this issue Jun 13, 2019 · 5 comments · Fixed by #6505

Comments

@akarnokd (Member)

This test eventually times out because one of the sources will not complete or trigger a reconnection:

for (int i = 0; i < 10_000; i++) {
    Observable<Integer> observable = Observable.just(1).publish().refCount();

    TestObserver<Integer> observer1 = observable
        .subscribeOn(Schedulers.computation())
        .test();

    TestObserver<Integer> observer2 = observable
        .subscribeOn(Schedulers.computation())
        .test();

    observer1
        .withTag("observer1 " + i)
        .awaitDone(5, TimeUnit.SECONDS)
        .assertNoErrors()
        .assertComplete();

    observer2
        .withTag("observer2 " + i)
        .awaitDone(5, TimeUnit.SECONDS)
        .assertNoErrors()
        .assertComplete();
}

(Originally reported as a comment.)

@jhansche

In our case, an observable gets disposed, which cancels the underlying HTTP request, while the next subscriber receives the immediately-canceled stream from the first subscription.

Something like:

val apiRequest: Observable<Foo> = retrofit.apiRequest()...share()

// subscriber 1, main thread:
val disposable = apiRequest....subscribeOn(Schedulers.io()).subscribe(...)
...
disposable.dispose()

// subscriber 2, also main thread:
val disposable2 = apiRequest....subscribeOn(Schedulers.io()).subscribe(...)

Subscriber 1, for one reason or another, disposes before the API request responds. At nearly the same time, Subscriber 2 subscribes and only receives the InterruptedIOException from the first request being canceled.

As I understand it, when the first subscriber disposes, it should cancel the API request and also remove itself as a subscriber, so refCount() should dispose the upstream observable. When the second subscriber attaches, it should start a brand-new request to the upstream API source rather than receive the first subscriber's error.

So this is slightly different from the original report: it is not a hang (as if the stream had zero events); the second subscriber actually receives the error from the first subscriber's canceled attempt.
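
For reference, here is a minimal, self-contained Java sketch of the subscription pattern described above. The simulated source, the delays, and the handler bodies are assumptions standing in for the actual Retrofit call; whether the race actually fires depends on timing:

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class ShareDisposeRace {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the HTTP call: a slow single-value source, shared via refCount.
        Observable<String> apiRequest = Observable.just("response")
            .delay(100, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .share();

        // Subscriber 1 attaches, then disposes before the "request" completes,
        // which cancels the shared upstream connection.
        Disposable first = apiRequest.subscribe(
            v -> System.out.println("first: " + v),
            e -> System.out.println("first error: " + e));
        first.dispose();

        // Subscriber 2 attaches at nearly the same moment; with the race present,
        // it can end up attached to the already-canceled connection instead of
        // triggering a fresh one.
        apiRequest.subscribe(
            v -> System.out.println("second: " + v),
            e -> System.out.println("second error: " + e));

        Thread.sleep(500); // let the async work finish before the JVM exits
    }
}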

@jhansche

We were able to track down the part of our code that was triggering this issue, but I still believe there is a race condition in .share(). Your original suspicion was that "one of the observers connect right between onNext and onComplete"; in our case, RefCount dispatches the dispose/cancel upstream to the original request but does not decrement the count quickly enough. The second subscriber therefore comes in as if refCount were 2, even though the upstream has already been terminated (and so the count should already have reached 0 and the ConnectableObservable should already have been disconnected).

In our case (Android), we had two separate Fragments being added approximately simultaneously. Fragment 1 makes a request for configuration (implemented with share() and intended to be shared across multiple requests for the same configs), and then shortly afterwards also disposes that request. That was a mistake, and it is what we fixed on our end: the short version is that we were using CompositeDisposable.clear() when we should have disposed only a single unrelated observable. Fragment 2 makes the same configuration request.

As a result of CompositeDisposable.clear(), Fragment 1's config request got disposed, and rather than Fragment 2's request continuing to operate thanks to the refCount, Fragment 2 only received OkHttp's InterruptedIOException from the canceled operation and never received a successful response.
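
To illustrate the teardown mistake in isolation (the class and stream names here are hypothetical, not our actual code):

import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;

class ConfigFragment {
    private final CompositeDisposable disposables = new CompositeDisposable();

    void onStart(Observable<String> sharedConfigRequest, Observable<String> unrelated) {
        disposables.add(sharedConfigRequest.subscribe(System.out::println));
        disposables.add(unrelated.subscribe(System.out::println));
    }

    void onSomeEvent() {
        // The mistake: clear() disposes EVERY held subscription, including the
        // shared config request, which cancels the upstream HTTP call that the
        // other Fragment is still counting on.
        disposables.clear();
        // The fix on our end: hold the unrelated subscription in its own
        // Disposable and dispose only that one here.
    }
}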

@akarnokd (Member, Author)

share() is generally tricky, even without this bug, because people expect it to stay alive a bit longer and to deliver onNext events in time, neither of which is guaranteed. You could try replay(1).refCount() or publish().refCount(1, TimeUnit.SECONDS) to give it some wiggle room until I find a way to fix this bug (my attempts thus far have broken other unit tests, so it may take a few days).
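
Sketched against the example from the issue (the upstream here is just a stand-in for the real source):

Observable<Integer> source = Observable.just(1);

// Workaround 1: cache the latest value so a late subscriber still receives it.
Observable<Integer> viaReplay = source.replay(1).refCount();

// Workaround 2: keep the connection alive for a grace period after the last
// subscriber disposes, shrinking the window for this race (RxJava 2.2+).
Observable<Integer> viaGracePeriod = source.publish().refCount(1, TimeUnit.SECONDS);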

@akarnokd
Copy link
Member Author

This is a tough one.

What happens is that the termination arrives just as a new observer subscribes, which bumps refCount to 2, while the new observer gets assigned to a fresh publishing instance, ready to be connected. The refCount termination handling, however, sees refCount=2 and will neither reset nor reconnect. If it did reset, that would clear the new observer and still result in a hang.

This could be resolved if a late observer were not assigned to a fresh publishing instance but instead received the terminal signal (thus letting refCount drop to zero and reset/reconnect). Unfortunately, that is a behavior change which, in addition, requires an API change to allow manual resetting for those who work with ConnectableObservable directly. I already planned to resolve the underlying issue in 3.x: #5628

I'll keep thinking about this problem. Until then, you could use the workarounds above, or use timeout().retry() to detect the hang and make it cancel and reconnect.
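
A rough sketch of that detect-and-retry workaround; the 5-second timeout and 3-attempt cap are arbitrary choices, not recommendations:

Observable<Integer> shared = Observable.just(1).publish().refCount();

Observable<Integer> guarded = shared
    .timeout(5, TimeUnit.SECONDS) // turns a hang into a TimeoutException
    .retry(3);                    // resubscribes, forcing a reconnection attempt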

@jhansche

I tried using .retry(), and the problem seemed to be that .share() kept the existing InterruptedIOException, so each retry just re-emitted the same error.

I don't think I tried .replay(1).refCount(), but that is worth testing, thanks! I thought I was avoiding replay for some reason, but now I can't think of any reason why I would need to avoid it...
