Skip to content

Commit

Permalink
fix(shareReplay): shareReplay unsubscribe routine is called upon unsu…
Browse files Browse the repository at this point in the history
…bscribe

Unsubscribe routine for shareReplay was ingonred. Now it is being called.
  • Loading branch information
liqwid committed Feb 28, 2018
1 parent 2f9efbf commit 8b6987b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
5 changes: 3 additions & 2 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,19 @@ describe('Observable.prototype.shareReplay', () => {
it('should replay results to subsequent subscriptions if source completes, bufferSize=2', () => {
const source = cold('-1-2-----3-| ');
const shared = source.shareReplay(2);
const sourceSubs = '^ ! ';
const sourceSubs1 = '^ ! ';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-----3-| ';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' (12)-3-| ';
const subscriber3 = hot(' (c|) ').mergeMapTo(shared);
const expected3 = ' (23|)';
const sourceSubs2 = ' (^!)';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(source.subscriptions).toBe([sourceSubs1, sourceSubs2]);
});

it('should completely restart for subsequent subscriptions if source errors, bufferSize=2', () => {
Expand Down
14 changes: 9 additions & 5 deletions src/internal/operators/shareReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import { Subscriber } from '../Subscriber';
* @owner Observable
*/
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler ): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(shareReplayOperator(bufferSize, windowTime, scheduler));
return function shareReplayFunction(source: Observable<T>) {
return source.lift(shareReplayOperator(bufferSize, windowTime, scheduler));
};
}

function shareReplayOperator<T>(bufferSize?: number, windowTime?: number, scheduler?: IScheduler) {
Expand Down Expand Up @@ -38,14 +40,16 @@ function shareReplayOperator<T>(bufferSize?: number, windowTime?: number, schedu
});
}

const innerSub = subject.subscribe(this);

return () => {
const _unsubscribe = this.unsubscribe;
this.unsubscribe = function() {
refCount--;
innerSub.unsubscribe();
if (subscription && refCount === 0 && isComplete) {
subscription.unsubscribe();
}

_unsubscribe.apply(this, arguments);
};

subject.subscribe(this);
};
}

0 comments on commit 8b6987b

Please sign in to comment.