Proposal for ConnectableObservable#connectOnSubscribe #3238

Closed
Airblader opened this issue Jan 19, 2018 · 14 comments

Comments

@Airblader
Contributor

ConnectableObservable currently supports two standard ways to manage subscriptions to the source:

  1. connect, which immediately subscribes to the source and never unsubscribes (unless the source completes)
  2. refCount, which subscribes to the source when the number of subscriptions goes up from 0, and unsubscribes from it when the number of subscriptions hits 0 again.

Something I ran into a couple times now is a mixture of the two: subscribe to the source on the first subscription, but then stay connected even if all subscriptions go away. Perhaps I'm also just missing something and this can be trivially done already in a natural way (?), but if not, I'd love to see it. :-)
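
To make the difference concrete, here is a rough sketch of the two policies as plain state machines. It is a toy model under loud assumptions: no RxJS is involved, no values flow, and every name in it (`makeSource`, `refCountPolicy`, `connectOnSubscribePolicy`, `run`) is hypothetical. It only tracks when the source would be connected.

```javascript
// Toy model only: tracks whether the source is connected. NOT RxJS.
function makeSource() {
  return { connected: false, connects: 0 };
}

// Policy 2 from the list above: connect on 0 -> 1, disconnect on 1 -> 0.
function refCountPolicy(source) {
  let count = 0;
  return {
    subscribe() {
      if (count++ === 0) { source.connected = true; source.connects++; }
      return () => { if (--count === 0) source.connected = false; };
    },
  };
}

// Proposed mixture: connect on the first subscription, then stay connected.
function connectOnSubscribePolicy(source) {
  return {
    subscribe() {
      if (!source.connected) { source.connected = true; source.connects++; }
      return () => {}; // unsubscribing leaves the source connected
    },
  };
}

function run(policyFactory) {
  const source = makeSource();
  const policy = policyFactory(source);
  const log = [source.connected];   // before any subscriber
  const unsubA = policy.subscribe();
  log.push(source.connected);       // after the first subscription
  const unsubB = policy.subscribe();
  unsubA();
  unsubB();
  log.push(source.connected);       // after all subscribers have left
  policy.subscribe();               // a late subscriber arrives
  return { log, connects: source.connects };
}

const refCountResult = run(refCountPolicy);
const proposedResult = run(connectOnSubscribePolicy);
console.log(JSON.stringify(refCountResult)); // {"log":[false,true,false],"connects":2}
console.log(JSON.stringify(proposedResult)); // {"log":[false,true,true],"connects":1}
```

In this model, refCount reconnects for the late subscriber (two connects), while the proposed policy connects once and keeps that connection.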

@cartant
Collaborator

cartant commented Jan 19, 2018

You might find this comment of Paul Taylor's interesting. In fact, that entire thread is interesting.

@Airblader
Contributor Author

Thanks, that is interesting. Looks like that issue has been closed without the proposal ever having been opened back up or implemented, though.

@cartant
Collaborator

cartant commented Jan 19, 2018

Yep, but the idea expressed in the comment's snippet is useful. I used it to create an operator that waits a specified amount of time before unsubscribing: https://github.com/cartant/rxjs-etc/blob/master/source/let/refCountAuditTime.ts

You could also look at the implementation of shareReplay - in this repo - as it does what you seem to want: it never unsubscribes. Some parts of that implementation can be removed, too. See this thread: #3127

@Airblader
Contributor Author

Airblader commented Jan 19, 2018

I can solve my use case even more simply using operator composition:

const connectOnSubscribe = source$ => Observable.defer(() => Observable.of(source$.connect())
  .concatMapTo(source$));

Unless you think there's something wrong with that (seeing how much more complicated the other ones seem to be) I guess I can be happy with this.

@cartant
Collaborator

cartant commented Jan 19, 2018

I don't think that will do what you want, as connect returns a Subscription. And you should ensure that connect is called after a subscription is made to the ConnectableObservable - otherwise you will miss any immediate emissions.

I think this should do what you are looking for:

const connectOnSubscribe = connectable$ => Rx.Observable.create(observer => {
  const subscription = connectable$.subscribe(observer);
  connectable$.connect();
  return subscription;
});

However, I'm not at all sure that it will behave itself if there are multiple subscriptions. What happens when connect is called more than once is not something I've ever considered.
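
The ordering point (subscribe first, then connect) can be illustrated with a small toy model. This is only a sketch: `ToyConnectable` and `syncSource` are hypothetical stand-ins that mimic the subscribe/connect split, not RxJS classes, and the source here is assumed to emit synchronously on connect.

```javascript
// Hypothetical stand-in for a connectable observable. NOT the RxJS class.
class ToyConnectable {
  constructor(producer) {
    this.producer = producer; // (emit) => void, may emit synchronously
    this.observers = [];
    this.connected = false;
  }
  subscribe(next) {
    this.observers.push(next);
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter(o => o !== next);
      },
    };
  }
  connect() {
    if (!this.connected) {
      this.connected = true;
      // Values reach only the observers registered at emission time.
      this.producer(value => this.observers.forEach(o => o(value)));
    }
  }
}

// A source that emits 1, 2, 3 synchronously as soon as it is connected.
const syncSource = emit => [1, 2, 3].forEach(v => emit(v));

// Connect first, subscribe second: the values are already gone.
const late = [];
const a = new ToyConnectable(syncSource);
a.connect();
a.subscribe(v => late.push(v));

// Subscribe first, connect second: every value is delivered.
const early = [];
const b = new ToyConnectable(syncSource);
b.subscribe(v => early.push(v));
b.connect();

console.log(JSON.stringify(late));  // []
console.log(JSON.stringify(early)); // [1,2,3]
```

With an asynchronous source both orders would behave the same in this model, which is why the difference is easy to overlook.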

@Airblader
Contributor Author

Thanks! In my case there are no sync emissions so it wasn't an issue, but I'll switch to your solution anyway just to be cleaner.

As far as connect goes, when I looked at the implementation it seemed that it's an idempotent call. Perhaps this is just an implementation detail, though, and not guaranteed by the contract. Perhaps it'd be a separate issue to clarify this?

@cartant
Collaborator

cartant commented Jan 20, 2018

Yep, you're right, connect can be called as many times as you like with no ill effects. As for whether it's guaranteed, I don't know. I've not been able to find it mentioned in any docs (v4 or v5) and there don't appear to be any tests that focus explicitly on multiple connect calls. If you are interested, the commit that introduced the if statement in connect is this one: d1412bc
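
For illustration, the kind of guard being discussed might look roughly like the sketch below. It is a toy model that assumes a single `if` check around the connection, as described above. `GuardedConnectable` and `sourceSubscriptions` are hypothetical names, and nothing here is copied from the RxJS source.

```javascript
// Hypothetical toy class, NOT the RxJS ConnectableObservable.
class GuardedConnectable {
  constructor() {
    this.connection = null;
    this.sourceSubscriptions = 0; // how often the source was really subscribed
  }
  connect() {
    // The guard: only subscribe to the source if there is no live connection.
    if (!this.connection) {
      this.sourceSubscriptions++;
      this.connection = {
        unsubscribe: () => { this.connection = null; },
      };
    }
    return this.connection;
  }
}

const connectable = new GuardedConnectable();
const first = connectable.connect();
const second = connectable.connect();
const third = connectable.connect();

console.log(connectable.sourceSubscriptions);      // 1
console.log(first === second && second === third); // true

// After an explicit disconnect, the next connect() subscribes again.
first.unsubscribe();
connectable.connect();
console.log(connectable.sourceSubscriptions);      // 2
```

In this model, repeated calls are harmless only while a connection is live. What happens after the source completes or errors is a separate question that the sketch does not cover.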

@cartant
Collaborator

cartant commented Jan 20, 2018

FYI, whilst looking into something else, I found a thread that's somewhat related to the can-connect-be-called-multiple-times question. See this comment and onwards. And, again, the entire thread is pretty interesting - if that's your thing.

@Airblader
Contributor Author

Yes, I absolutely appreciate the insight, thank you!

While that thread, if I understand correctly, is mostly about connect behavior when the source has completed or errored (which isn't an issue in my specific case, as my sources never will), I do wonder whether "connect() is idempotent while the source is alive" is still as safe an assumption as I thought at first.

@Airblader
Contributor Author

Airblader commented Jan 20, 2018

I guess one could avoid the question of calling connect() multiple times by using

const connectOnSubscribe = source$ => {
  let connection = null;
  return Rx.Observable.create(observer => {
    const subscription = source$.subscribe(observer);
    if (!connection) {
      connection = source$.connect();
    }

    return subscription;
  });
};

That way it'd be on the safe side either way.


Edit: Updated as per @cartant's hint.

@cartant
Collaborator

cartant commented Jan 20, 2018

Yep, but I'd encourage subscribing before connecting. There's no reason not to and it's more general, as it'll work with sources that emit synchronously, too.

@Airblader
Contributor Author

Same mistake twice in a row (facepalm). You're right of course, thanks again!

@Airblader
Contributor Author

I've realized that I'd actually need the inner subscription to end on errors so the entire thing is retryable. As I understand it, this means that what I need is effectively the new behavior of shareReplay. I'm going to close this in favor of the discussion at #3127.

@lock

lock bot commented Jun 6, 2018

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 6, 2018