This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

Why shareReplay() subscribes to source observable twice? #763

Closed
xwk opened this issue Jun 12, 2015 · 8 comments

@xwk

xwk commented Jun 12, 2015

Hi,

First, this problem is different from the other shareReplay() issue I reported in #758. I tested the code below with Rx version 2.5.3.

I expected the createJob observable in the code below to be subscribed to only once, since I made it shared by calling .shareReplay(). However, it was actually subscribed to twice.

var Rx = require('rx');

var createJob = function() {
  return Rx.Observable.create(function(subscriber) {

    console.log("subscribed to create job");
    var resolve = function() {
      console.log("resolve createJob");
      subscriber.onNext("job123");
      return subscriber.onCompleted();
    };
    return setTimeout(resolve, 10);
  });
};

var createJobEvtObservable = createJob()
.doOnNext(function(data) {
  return console.log("Side effect " + data);
})
.shareReplay();

var doJob = function(jobId) {
  console.log("start job " + jobId);
  return Rx.Observable.interval(300).take(3).map(function(i) {
    return "completed step " + i;
  });
};

var doJobEvtObservable = createJobEvtObservable.flatMap(doJob);

createJobEvtObservable.concat(doJobEvtObservable).subscribe(function(evt) {
  return console.log("evt=", evt);
});

The output is:

subscribed to create job
resolve createJob
Side effect job123
evt= job123
subscribed to create job
start job job123
resolve createJob
evt= completed step 0
evt= completed step 1
evt= completed step 2

You can see that 'subscribed to create job' is printed twice, despite shareReplay(). Even more strangely, "Side effect job123" is printed only once.

@paulpdaniels
Contributor

The problem is that shareReplay() is really replay().refCount(), and concat() is really enumerableOf(sources).merge(1).

If you look at the source of refCount you'll see that once the count dips to zero, the subscription disconnects, and once it goes up to one again, it reconnects. In concat() it waits for the previous observable to complete (i.e. count == 0) then it starts the new one (count == 1, reconnect!).

All this is a roundabout way of saying don't use shareReplay() here, use the replay()/connect() combo instead.
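
The counting rule described above can be sketched without RxJS. This is a simplified model, not the library's source: makeRefCounted and its connect callback are hypothetical names, and the only thing it reproduces is "connect on 0 -> 1, disconnect on 1 -> 0". The two back-to-back subscriptions stand in for what concat() does when the first source completes before the second is subscribed to.

```javascript
// Simplified model of the refCount() counting rule (NOT the RxJS source).
// `connect` is a hypothetical callback that starts the source and returns
// a function that tears the connection down again.
function makeRefCounted(connect) {
  var count = 0;
  var disconnect = null;
  return {
    subscribe: function () {
      count += 1;
      if (count === 1) {
        disconnect = connect(); // 0 -> 1: (re)connect to the source
      }
      var done = false;
      return function unsubscribe() {
        if (done) { return; }
        done = true;
        count -= 1;
        if (count === 0) {
          disconnect(); // 1 -> 0: drop the connection
        }
      };
    }
  };
}

var log = [];
var shared = makeRefCounted(function () {
  log.push('connect');
  return function () { log.push('disconnect'); };
});

// concat() is done with the first source before it subscribes to the next,
// so the two subscriptions never overlap and the count returns to zero.
var first = shared.subscribe();
first();
var second = shared.subscribe();
second();

console.log(log); // ['connect', 'disconnect', 'connect', 'disconnect']
```

If the second subscription were made before the first one was released, the count would never reach zero and the model would connect only once.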

@mattpodwysocki is refCount supposed to restart like that? I never noticed it before.

@xwk
Author

xwk commented Jun 12, 2015

@paulpdaniels, thanks for your feedback, but why is "Side effect job123" printed only once? If the source observable is reconnected, this message should be printed twice, shouldn't it?

@xwk
Author

xwk commented Jun 12, 2015

@paulpdaniels, I also followed your suggestion to use replay()/connect() instead of shareReplay(). The resulting code is shown below, and it does work. However, one thing still confuses me.

var Rx = require('rx');

var createJob = function() {
  return Rx.Observable.create(function(subscriber) {

    console.log("subscribed to create job");
    var resolve = function() {
      console.log("resolve createJob");
      subscriber.onNext("job123");
      return subscriber.onCompleted();
    };
    return setTimeout(resolve, 10);
  });
};

var createJobEvtObservable = createJob()
.doOnNext(function(data) {
  return console.log("Side effect " + data);
})
.replay();

var doJob = function(jobId) {
  console.log("start job " + jobId);
  return Rx.Observable.interval(300).take(3).map(function(i) {
    return "completed step " + i;
  });
};

var doJobEvtObservable = createJobEvtObservable.flatMap(doJob);

createJobEvtObservable.concat(doJobEvtObservable).subscribe(function(evt) {
  return console.log("evt=", evt);
});

createJobEvtObservable.connect();

The code above will output

subscribed to create job
resolve createJob
Side effect job123
evt= job123
start job job123
evt= completed step 0
evt= completed step 1
evt= completed step 2

However, if I remove the last line (the connect() call) from my code, nothing is output. By contrast, the example shown in https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/replay.md does NOT have any connect() call at all, but it still works. What confuses me is why the connect() call is necessary in my code but not in the example code in replay.md.

@paulpdaniels
Contributor

You certainly have found the perfect storm of issues :). You are right, it should be printing twice, but it looks like there is a bug in the perf/tap implementation. 'doOnNext' implicitly creates an 'anonymousobserver'; when the observable completes the first time, that observer stops receiving messages. Until I can push a fix, the solution is to use 'tap' with a non-completing observer. That will override the default behavior of creating a new observer.

@paulpdaniels
Contributor

@xwk Yes, it is a little confusing. The one in the sample is an overload that redirects to the multicast overload that takes a factory function returning a subject, which is then modified by the selector function.

See here

@mattpodwysocki
Member

@paulpdaniels yes, refCount is supposed to restart like that. If you try with a cold observable, it will start from scratch all over again when it hits zero and then adds another subscription.

@mattpodwysocki
Member

@xwk yes, the overload that takes a selector will auto-connect the observable for you, which is why you don't need to call connect().
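
The difference between the two overloads can be illustrated with a small plain-JavaScript model. It is only a sketch under stated assumptions, not the RxJS implementation: makeConnectable, replayModel and the synchronous source function are hypothetical names, and completion, errors and disposal are left out. It shows only that the no-selector form waits for an explicit connect(), while the selector form connects as part of subscribe.

```javascript
// Simplified model of replay() with and without a selector (NOT the RxJS source).
// `source` is a hypothetical function that takes an onNext callback and
// emits its values synchronously.
function makeConnectable(source) {
  var buffer = [];
  var observers = [];
  return {
    subscribe: function (onNext) {
      buffer.forEach(function (value) { onNext(value); }); // replay past values
      observers.push(onNext);
    },
    connect: function () {
      source(function (value) {
        buffer.push(value);
        observers.forEach(function (onNext) { onNext(value); });
      });
    }
  };
}

function replayModel(source, selector) {
  if (!selector) {
    return makeConnectable(source); // caller must call connect() explicitly
  }
  return {
    subscribe: function (onNext) {
      var connectable = makeConnectable(source);
      selector(connectable).subscribe(onNext);
      connectable.connect(); // the selector form connects on subscribe
    }
  };
}

function source(onNext) { onNext('job123'); }

var manual = [];
var plain = replayModel(source);
plain.subscribe(function (value) { manual.push(value); });
var beforeConnect = manual.slice(); // still empty: nothing is connected yet
plain.connect();                    // manual is now ['job123']

var auto = [];
replayModel(source, function (shared) { return shared; })
  .subscribe(function (value) { auto.push(value); }); // no connect() needed

console.log(beforeConnect, manual, auto);
```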

mattpodwysocki added a commit that referenced this issue Jul 20, 2015
@mattpodwysocki
Member

@xwk that indeed was a bug which is now fixed.

bouzuya pushed a commit to bouzuya/RxJS that referenced this issue Mar 23, 2016
Fix catch operator so that it no longer has a shared underlying Subscription, and instead resets the
subscription for each new observable replacing the caught error. This fixes a potential memory leak
if catch is used as an infinite retry, because subscriptions would be retained since the beginning,
and would increase each time a catch is performed.

Resolves issue Reactive-Extensions#763.