Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shareReplay): adds shareReplay variant of publishReplay #2443

Merged
merged 1 commit into from
May 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion spec/helpers/marble-testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ import {Observable} from '../../dist/cjs/Observable';
import {SubscriptionLog} from '../../dist/cjs/testing/SubscriptionLog';
import {ColdObservable} from '../../dist/cjs/testing/ColdObservable';
import {HotObservable} from '../../dist/cjs/testing/HotObservable';
import {observableToBeFn, subscriptionLogsToBeFn} from '../../dist/cjs/testing/TestScheduler';
import {TestScheduler, observableToBeFn, subscriptionLogsToBeFn} from '../../dist/cjs/testing/TestScheduler';

declare const global: any;

export const rxTestScheduler: TestScheduler = global.rxTestScheduler;

export function hot(marbles: string, values?: any, error?: any): HotObservable<any> {
if (!global.rxTestScheduler) {
throw 'tried to use hot() in async test';
Expand Down
167 changes: 167 additions & 0 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const { asDiagram };
declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;

const Observable = Rx.Observable;

/** @test {shareReplay} */
describe('Observable.prototype.shareReplay', () => {
it('should mirror a simple source Observable', () => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = '^ !';
const published = source.shareReplay();
const expected = '--1-2---3-4--5-|';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should do nothing if result is not subscribed', () => {
let subscribed = false;
const source = new Observable(() => {
subscribed = true;
});
source.shareReplay();
expect(subscribed).to.be.false;
});

it('should multicast the same values to multiple observers, bufferSize=1', () => {
const source = cold('-1-2-3----4-|'); const shared = source.shareReplay(1);
const sourceSubs = '^ !';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-3----4-|';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' 23----4-|';
const subscriber3 = hot(' c| ').mergeMapTo(shared);
const expected3 = ' 3-4-|';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast the same values to multiple observers, bufferSize=2', () => {
const source = cold('-1-2-----3------4-|'); const shared = source.shareReplay(2);
const sourceSubs = '^ !';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-----3------4-|';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' (12)-3------4-|';
const subscriber3 = hot(' c| ').mergeMapTo(shared);
const expected3 = ' (23)-4-|';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast an error from the source to multiple observers', () => {
const source = cold('-1-2-3----4-#'); const shared = source.shareReplay(1);
const sourceSubs = '^ !';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-3----4-#';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' 23----4-#';
const subscriber3 = hot(' c| ').mergeMapTo(shared);
const expected3 = ' 3-4-#';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast an empty source', () => {
const source = cold('|');
const sourceSubs = '(^!)';
const shared = source.shareReplay(1);
const expected = '|';

expectObservable(shared).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast a never source', () => {
const source = cold('-');
const sourceSubs = '^';

const shared = source.shareReplay(1);
const expected = '-';

expectObservable(shared).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast a throw source', () => {
const source = cold('#');
const sourceSubs = '(^!)';
const shared = source.shareReplay(1);
const expected = '#';

expectObservable(shared).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should replay results to subsequent subscriptions if source completes, bufferSize=2', () => {
const source = cold('-1-2-----3-| ');
const shared = source.shareReplay(2);
const sourceSubs = '^ ! ';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-----3-| ';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' (12)-3-| ';
const subscriber3 = hot(' (c|) ').mergeMapTo(shared);
const expected3 = ' (23|)';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should completely restart for subsequent subscriptions if source errors, bufferSize=2', () => {
const source = cold('-1-2-----3-# ');
const shared = source.shareReplay(2);
const sourceSubs1 = '^ ! ';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-----3-# ';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' (12)-3-# ';
const subscriber3 = hot(' (c|) ').mergeMapTo(shared);
const expected3 = ' -1-2-----3-#';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't expected3 be (23)#?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore me. It's correct. (because the source errored, it didn't complete, so we create a new ReplaySubject for the next multicast)

const sourceSubs2 = ' ^ !';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe([sourceSubs1, sourceSubs2]);
});

it('should be retryable, bufferSize=2', () => {
const subs = [];
const source = cold('-1-2-----3-# ');
const shared = source.shareReplay(2).retry(1);
subs.push( '^ ! ');
subs.push( ' ^ ! ');
subs.push( ' ^ !');
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-----3--1-2-----3-# ';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' (12)-3--1-2-----3-# ';
const subscriber3 = hot(' (c|) ').mergeMapTo(shared);
const expected3 = ' (12)-3--1-2-----3-#';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ import './add/operator/sampleTime';
import './add/operator/scan';
import './add/operator/sequenceEqual';
import './add/operator/share';
import './add/operator/shareReplay';
import './add/operator/single';
import './add/operator/skip';
import './add/operator/skipUntil';
Expand Down
11 changes: 11 additions & 0 deletions src/add/operator/shareReplay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

import { Observable } from '../../Observable';
import { shareReplay } from '../../operator/shareReplay';

Observable.prototype.shareReplay = shareReplay;

declare module '../../Observable' {
interface Observable<T> {
shareReplay: typeof shareReplay;
}
}
24 changes: 15 additions & 9 deletions src/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export class ConnectableObservable<T> extends Observable<T> {
protected _subject: Subject<T>;
protected _refCount: number = 0;
protected _connection: Subscription;
_isComplete = false;

constructor(protected source: Observable<T>,
protected subjectFactory: () => Subject<T>) {
Expand All @@ -33,6 +34,7 @@ export class ConnectableObservable<T> extends Observable<T> {
connect(): Subscription {
let connection = this._connection;
if (!connection) {
this._isComplete = false;
connection = this._connection = new Subscription();
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
Expand All @@ -51,15 +53,18 @@ export class ConnectableObservable<T> extends Observable<T> {
}
}

const connectableProto = <any>ConnectableObservable.prototype;

export const connectableObservableDescriptor: PropertyDescriptorMap = {
operator: { value: null },
_refCount: { value: 0, writable: true },
_subject: { value: null, writable: true },
_connection: { value: null, writable: true },
_subscribe: { value: (<any> ConnectableObservable.prototype)._subscribe },
getSubject: { value: (<any> ConnectableObservable.prototype).getSubject },
connect: { value: (<any> ConnectableObservable.prototype).connect },
refCount: { value: (<any> ConnectableObservable.prototype).refCount }
_subscribe: { value: connectableProto._subscribe },
_isComplete: { value: connectableProto._isComplete, writable: true },
getSubject: { value: connectableProto.getSubject },
connect: { value: connectableProto.connect },
refCount: { value: connectableProto.refCount }
};

class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
Expand All @@ -72,17 +77,18 @@ class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
super._error(err);
}
protected _complete(): void {
this.connectable._isComplete = true;
this._unsubscribe();
super._complete();
}
protected _unsubscribe() {
const { connectable } = this;
const connectable = <any>this.connectable;
if (connectable) {
this.connectable = null;
const connection = (<any> connectable)._connection;
(<any> connectable)._refCount = 0;
(<any> connectable)._subject = null;
(<any> connectable)._connection = null;
const connection = connectable._connection;
connectable._refCount = 0;
connectable._subject = null;
connectable._connection = null;
if (connection) {
connection.unsubscribe();
}
Expand Down
26 changes: 26 additions & 0 deletions src/operator/shareReplay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Observable } from '../Observable';
import { multicast } from './multicast';
import { ReplaySubject } from '../ReplaySubject';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { IScheduler } from '../Scheduler';

/**
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(
this: Observable<T>,
bufferSize?: number,
windowTime?: number,
scheduler?: IScheduler
): Observable<T> {
let subject: ReplaySubject<T>;
const connectable = multicast.call(this, function shareReplaySubjectFactory(this: ConnectableObservable<T>) {
if (this._isComplete) {
return subject;
} else {
return (subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler));
}
});
return connectable.refCount();
};