Shared observable can not be retried #202

Closed
frederikaalund opened this issue Sep 18, 2017 · 10 comments

@frederikaalund
Contributor

Example program:

#!/usr/bin/env python3.6
from rx import Observable

source = Observable.interval(100).take(3).concat(Observable.throw(Exception('BOOM'))).share()

source.retry(3).subscribe(lambda v: print(f'on_next: {v}'),
                          lambda e: print(f'on_error: {e}'),
                          lambda:   print('on_completed'))

input("Press enter to exit...\n")

I expect to see the following output

...
on_next: 0
on_next: 1
on_next: 2
on_next: 0
on_next: 1
on_next: 2
on_next: 0
on_next: 1
on_next: 2
on_error: BOOM
...

Instead, I get the following output

...
on_next: 0
on_next: 1
on_next: 2
on_error: BOOM
...

Python 3.6.1
RxPy: 1.5.9

@thomasnield
Contributor

That is interesting, and looks like a problem. I'll take a look when I get a chance if Dag doesn't beat me.

@dbrattli
Collaborator

I think the behavior is correct, since you are publishing the result of source. A re-subscribe (retry) will therefore not help, because the published subject has already errored; the next subscribe will just fail. If you remove share(), you should get what you want.
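
To make the point above concrete, here is a minimal, library-free sketch of why resubscribing does not help once the shared subject has failed. `MiniSubject` and `retry` are hypothetical stand-ins written for this illustration only; they are a simplified model and not RxPY's actual implementation. The assumption modeled is the one described above: a subject that has errored hands the stored error to every later subscriber.

```python
class MiniSubject:
    """Toy stand-in for a subject (hypothetical, not the RxPY class)."""

    def __init__(self):
        self.observers = []
        self.error = None

    def subscribe(self, on_next, on_error):
        # Assumption: a subject that already failed replays its error at once.
        if self.error is not None:
            on_error(self.error)
            return
        self.observers.append((on_next, on_error))

    def on_next(self, value):
        if self.error is None:
            for next_cb, _ in list(self.observers):
                next_cb(value)

    def on_error(self, error):
        if self.error is not None:
            return
        self.error = error
        observers, self.observers = self.observers, []
        for _, error_cb in observers:
            error_cb(error)


def retry(subject, attempts, on_next, on_error):
    """Resubscribe to the *same* subject until `attempts` subscriptions failed."""
    remaining = [attempts]

    def handle_error(error):
        remaining[0] -= 1
        if remaining[0] > 0:
            subject.subscribe(on_next, handle_error)  # hits the stored error
        else:
            on_error(error)

    subject.subscribe(on_next, handle_error)


log = []
subject = MiniSubject()
retry(subject, 3,
      lambda v: log.append(f'on_next: {v}'),
      lambda e: log.append(f'on_error: {e}'))

# The single upstream subscription emits three values and then fails.
for value in range(3):
    subject.on_next(value)
subject.on_error(Exception('BOOM'))

print('\n'.join(log))
```

In this model every retry attempt is consumed immediately by the stored error, so the values are emitted only once before the error is reported, which mirrors the output in the report.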

@dbrattli
Collaborator

A similar discussion: dotnet/reactive#238

@frederikaalund
Contributor Author

frederikaalund commented Sep 19, 2017

Thanks for looking into this. I think the behaviour is different in rxjs. See ReactiveX/rxjs/issues/678. Maybe there is no consensus between the various implementations?

Edit: Here is an example of the expected behaviour with RxJS 5.3.4 (inspect the output in the browser console).

@thomasnield
Contributor

I missed the error part. I agree with Dag.

@dbrattli
Collaborator

The problem is that you are trying to retry a hot observable. The question is really to what lengths we should go to make things like retry() work in case it happens to be a cold observable behind the multicast. But it feels wrong to re-open the upstream subscription in case of an error, since you are using share/publish/multicast to make sure there is only a single subscription upstream. If this single upstream subscription fails with an error, then yes you have a problem.

My initial thought would be to try and solve this scenario in a different way by using Observable.catch_exception() with an enumerable that is a factory of brand new observables (instead of retrying the same old one).

@dbrattli
Collaborator

dbrattli commented Sep 20, 2017

Something like this to keep the source cold. I btw. need to fix this so it works with generators directly so you don't have to use the Enumerable wrapper.

#!/usr/bin/env python3.6
from rx import Observable
from rx.internal.enumerable import Enumerable

source = Observable.interval(100).take(3)

def factory():
    for x in range(3):
        yield source.concat(Observable.throw(Exception('BOOM'))).share()

Observable.catch_exception(Enumerable(factory())).subscribe(
    lambda v: print('on_next: ', v),
    lambda e: print('on_error: ', e),
    lambda: print('on_completed'))

input("Press enter to exit...\n")
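
For readers without RxPY 1.x at hand, the same idea can be sketched in plain Python: instead of retrying one already-failed observable, walk through an iterable of brand-new sources and move to the next one on error. `make_source` and `catch_from` are hypothetical helpers invented for this sketch, and the model is synchronous only; it is not how `catch_exception` is implemented in the library.

```python
def make_source():
    """Return a fresh cold source that emits 0, 1, 2 and then fails."""
    def subscribe(on_next, on_error, on_completed):
        for value in range(3):
            on_next(value)
        on_error(Exception('BOOM'))
    return subscribe


def factory(attempts):
    """Yield a brand-new source for every attempt."""
    for _ in range(attempts):
        yield make_source()


def catch_from(sources, on_next, on_error, on_completed):
    """Subscribe to each source in turn; continue with the next on error.

    Synchronous toy model: each source is assumed to finish (or fail)
    before its subscribe call returns.
    """
    last_error = None
    for source in sources:
        outcome = {}
        source(on_next,
               lambda e: outcome.update(error=e),
               lambda: outcome.update(completed=True))
        if 'error' not in outcome:
            on_completed()
            return
        last_error = outcome['error']
    # Every source failed (or there were none): report the last error, if any.
    if last_error is not None:
        on_error(last_error)
    else:
        on_completed()


log = []
catch_from(factory(3),
           lambda v: log.append(f'on_next: {v}'),
           lambda e: log.append(f'on_error: {e}'),
           lambda: log.append('on_completed'))

print('\n'.join(log))
```

Because each attempt gets its own source, the three values are emitted once per attempt and only the final failure reaches the subscriber.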

@frederikaalund
Contributor Author

Thanks to both of you for taking your time to look at this issue.

> The problem is that you are trying to retry a hot observable. The question is really to what lengths we should go to make things like retry() work in case it happens to be a cold observable behind the multicast. But it feels wrong to re-open the upstream subscription in case of an error, since you are using share/publish/multicast to make sure there is only a single subscription upstream. If this single upstream subscription fails with an error, then yes you have a problem.

That makes sense. It would be nice to have multicast special-case this scenario so that .share().retry() works. However, I fully understand that you don't want to support this (and future) special-case scenarios, since that could become a heavy maintenance burden.

> Something like this to keep the source cold.

Thanks for the code example; I'm going to use that. I tried to use catch_exception myself but couldn't get it to work. Nice trick with the range-based factory.

> I btw. need to fix this so it works with generators directly so you don't have to use the Enumerable wrapper.

Yeah, that would simplify things for the user.

Thanks again.

@dbrattli
Collaborator

@frederikaalund Note that I have now updated my code example to put the .share() last in the chain as you had in your example.

@lock

lock bot commented Jan 23, 2019

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jan 23, 2019