From 241787ed5b415f4ef6613b2c9de4dade89d7ae8c Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Mon, 12 Sep 2016 22:22:57 -0700 Subject: [PATCH] feat(operators): Use lift in the operators that don't currently use lift. #1829 --- spec/Observable-spec.ts | 138 ++++++++++++++++++++++-- spec/operators/combineLatest-spec.ts | 4 +- spec/operators/finally-spec.ts | 2 +- spec/operators/publish-spec.ts | 7 +- spec/operators/publishBehavior-spec.ts | 9 +- spec/operators/publishLast-spec.ts | 10 +- spec/operators/publishReplay-spec.ts | 7 +- src/Rx.ts | 1 - src/observable/ConnectableObservable.ts | 9 ++ src/observable/MulticastObservable.ts | 21 ---- src/operator/combineLatest.ts | 2 +- src/operator/concat.ts | 6 +- src/operator/merge.ts | 5 +- src/operator/multicast.ts | 30 +++++- src/operator/race.ts | 3 +- src/operator/zip.ts | 3 +- 16 files changed, 199 insertions(+), 58 deletions(-) delete mode 100644 src/observable/MulticastObservable.ts diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 8d96a0b0270..e5b2b405a20 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -579,16 +579,22 @@ describe('Observable.create', () => { /** @test {Observable} */ describe('Observable.lift', () => { - it('should be overrideable in a custom Observable type that composes', (done: MochaDone) => { - class MyCustomObservable extends Rx.Observable { - lift(operator: Rx.Operator): Rx.Observable { - const observable = new MyCustomObservable(); - (observable).source = this; - (observable).operator = operator; - return observable; - } + + class MyCustomObservable extends Rx.Observable { + static from(source: any) { + const observable = new MyCustomObservable(); + observable.source = > source; + return observable; } + lift(operator: Rx.Operator): Rx.Observable { + const observable = new MyCustomObservable(); + (observable).source = this; + (observable).operator = operator; + return observable; + } + } + it('should be overrideable in a custom Observable type that composes', (done: MochaDone) => { const result = new MyCustomObservable((observer: Rx.Observer) => { observer.next(1); observer.next(2); @@ -610,6 +616,122 @@ describe('Observable.lift', () => { }); }); + it('should compose through multicast and refCount', (done: MochaDone) => { + const result = new MyCustomObservable((observer: Rx.Observer) => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.complete(); + }) + .multicast(() => new Rx.Subject()) + .refCount() + .map((x: number) => { return 10 * x; }); + + expect(result instanceof MyCustomObservable).to.be.true; + + const expected = [10, 20, 30]; + + result.subscribe( + function (x) { + expect(x).to.equal(expected.shift()); + }, (x) => { + done(new Error('should not be called')); + }, () => { + done(); + }); + }); + + it('should compose through multicast with selector function', (done: MochaDone) => { + const result = new MyCustomObservable((observer: Rx.Observer) => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.complete(); + }) + .multicast(() => new Rx.Subject(), (shared) => shared.map((x: number) => { return 10 * x; })); + + expect(result instanceof MyCustomObservable).to.be.true; + + const expected = [10, 20, 30]; + + result.subscribe( + function (x) { + expect(x).to.equal(expected.shift()); + }, (x) => { + done(new Error('should not be called')); + }, () => { + done(); + }); + }); + + it('should compose through combineLatest', () => { + const e1 = cold('-a--b-----c-d-e-|'); + const e2 = cold('--1--2-3-4---| '); + const expected = '--A-BC-D-EF-G-H-|'; + + const result = MyCustomObservable.from(e1).combineLatest(e2, (a: any, b: any) => String(a) + String(b)); + + expect(result instanceof MyCustomObservable).to.be.true; + + expectObservable(result).toBe(expected, { + A: 'a1', B: 'b1', C: 'b2', D: 'b3', E: 'b4', F: 'c4', G: 'd4', H: 'e4' + }); + }); + + it('should compose through concat', () => { + const e1 = cold('--a--b-|'); + const e2 = cold( '--x---y--|'); + const expected = '--a--b---x---y--|'; + + const result = MyCustomObservable.from(e1).concat(e2, rxTestScheduler); + + expect(result instanceof MyCustomObservable).to.be.true; + + expectObservable(result).toBe(expected); + }); + + it('should compose through merge', () => { + const e1 = cold('-a--b-| '); + const e2 = cold('--x--y-|'); + const expected = '-ax-by-|'; + + const result = MyCustomObservable.from(e1).merge(e2, rxTestScheduler); + + expect(result instanceof MyCustomObservable).to.be.true; + + expectObservable(result).toBe(expected); + }); + + it('should compose through race', () => { + const e1 = cold('---a-----b-----c----|'); + const e1subs = '^ !'; + const e2 = cold('------x-----y-----z----|'); + const e2subs = '^ !'; + const expected = '---a-----b-----c----|'; + + const result = MyCustomObservable.from(e1).race(e2); + + expect(result instanceof MyCustomObservable).to.be.true; + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should compose through zip', () => { + const e1 = cold('-a--b-----c-d-e-|'); + const e2 = cold('--1--2-3-4---| '); + const expected = ('--A--B----C-D| '); + + const result = MyCustomObservable.from(e1).zip(e2, (a: any, b: any) => String(a) + String(b)); + + expect(result instanceof MyCustomObservable).to.be.true; + + expectObservable(result).toBe(expected, { + A: 'a1', B: 'b2', C: 'c3', D: 'd4' + }); + }); + it('should allow injecting behaviors into all subscribers in an operator ' + 'chain when overridden', (done: MochaDone) => { // The custom Subscriber diff --git a/spec/operators/combineLatest-spec.ts b/spec/operators/combineLatest-spec.ts index 32efce1e5ed..0aaefe11253 100644 --- a/spec/operators/combineLatest-spec.ts +++ b/spec/operators/combineLatest-spec.ts @@ -6,8 +6,8 @@ const Observable = Rx.Observable; /** @test {combineLatest} */ describe('Observable.prototype.combineLatest', () => { asDiagram('combineLatest')('should combine events from two cold observables', () => { - const e1 = hot('-a--b-----c-d-e-|'); - const e2 = hot('--1--2-3-4---| '); + const e1 = cold('-a--b-----c-d-e-|'); + const e2 = cold('--1--2-3-4---| '); const expected = '--A-BC-D-EF-G-H-|'; const result = e1.combineLatest(e2, (a: any, b: any) => String(a) + String(b)); diff --git a/spec/operators/finally-spec.ts b/spec/operators/finally-spec.ts index 6b16458c99b..60e1ebc8f85 100644 --- a/spec/operators/finally-spec.ts +++ b/spec/operators/finally-spec.ts @@ -160,4 +160,4 @@ describe('Observable.prototype.finally', () => { rxTestScheduler.flush(); expect(executed).to.be.true; }); -}); \ No newline at end of file +}); diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index 1a49ce4e2a7..47891178af6 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -18,9 +18,12 @@ describe('Observable.prototype.publish', () => { published.connect(); }); - it('should return a ConnectableObservable', () => { + it('should return a ConnectableObservable-ish', () => { const source = Observable.of(1).publish(); - expect(source instanceof Rx.ConnectableObservable).to.be.true; + expect(typeof ( source)._subscribe === 'function').to.be.true; + expect(typeof ( source).getSubject === 'function').to.be.true; + expect(typeof source.connect === 'function').to.be.true; + expect(typeof source.refCount === 'function').to.be.true; }); it('should do nothing if connect is not called, despite subscriptions', () => { diff --git a/spec/operators/publishBehavior-spec.ts b/spec/operators/publishBehavior-spec.ts index 329782ef637..9cdbd7c280f 100644 --- a/spec/operators/publishBehavior-spec.ts +++ b/spec/operators/publishBehavior-spec.ts @@ -18,9 +18,12 @@ describe('Observable.prototype.publishBehavior', () => { published.connect(); }); - it('should return a ConnectableObservable', () => { + it('should return a ConnectableObservable-ish', () => { const source = Observable.of(1).publishBehavior(1); - expect(source instanceof Rx.ConnectableObservable).to.be.true; + expect(typeof ( source)._subscribe === 'function').to.be.true; + expect(typeof ( source).getSubject === 'function').to.be.true; + expect(typeof source.connect === 'function').to.be.true; + expect(typeof source.refCount === 'function').to.be.true; }); it('should only emit default value if connect is not called, despite subscriptions', () => { @@ -327,4 +330,4 @@ describe('Observable.prototype.publishBehavior', () => { expect(results).to.deep.equal([]); done(); }); -}); \ No newline at end of file +}); diff --git a/spec/operators/publishLast-spec.ts b/spec/operators/publishLast-spec.ts index c956182dd2f..ce981fdc922 100644 --- a/spec/operators/publishLast-spec.ts +++ b/spec/operators/publishLast-spec.ts @@ -18,10 +18,12 @@ describe('Observable.prototype.publishLast', () => { published.connect(); }); - it('should return a ConnectableObservable', () => { + it('should return a ConnectableObservable-ish', () => { const source = Observable.of(1).publishLast(); - - expect(source instanceof Rx.ConnectableObservable).to.be.true; + expect(typeof ( source)._subscribe === 'function').to.be.true; + expect(typeof ( source).getSubject === 'function').to.be.true; + expect(typeof source.connect === 'function').to.be.true; + expect(typeof source.refCount === 'function').to.be.true; }); it('should do nothing if connect is not called, despite subscriptions', () => { @@ -249,4 +251,4 @@ describe('Observable.prototype.publishLast', () => { expect(subscriptions).to.equal(1); done(); }); -}); \ No newline at end of file +}); diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index f0df749a513..825d2cc661d 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -18,9 +18,12 @@ describe('Observable.prototype.publishReplay', () => { published.connect(); }); - it('should return a ConnectableObservable', () => { + it('should return a ConnectableObservable-ish', () => { const source = Observable.of(1).publishReplay(); - expect(source instanceof Rx.ConnectableObservable).to.be.true; + expect(typeof ( source)._subscribe === 'function').to.be.true; + expect(typeof ( source).getSubject === 'function').to.be.true; + expect(typeof source.connect === 'function').to.be.true; + expect(typeof source.refCount === 'function').to.be.true; }); it('should do nothing if connect is not called, despite subscriptions', () => { diff --git a/src/Rx.ts b/src/Rx.ts index 4978b8e7596..39d64953fb0 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -150,7 +150,6 @@ export {Subscriber} from './Subscriber'; export {AsyncSubject} from './AsyncSubject'; export {ReplaySubject} from './ReplaySubject'; export {BehaviorSubject} from './BehaviorSubject'; -export {MulticastObservable} from './observable/MulticastObservable'; export {ConnectableObservable} from './observable/ConnectableObservable'; export {Notification} from './Notification'; export {EmptyError} from './util/EmptyError'; diff --git a/src/observable/ConnectableObservable.ts b/src/observable/ConnectableObservable.ts index 2d00ae9a56c..6d160ae536b 100644 --- a/src/observable/ConnectableObservable.ts +++ b/src/observable/ConnectableObservable.ts @@ -51,6 +51,15 @@ export class ConnectableObservable extends Observable { } } +export const connectableObservableDescriptor: PropertyDescriptorMap = { + operator: { value: null }, + _refCount: { value: 0, writable: true }, + _subscribe: { value: ( ConnectableObservable.prototype)._subscribe }, + getSubject: { value: ( ConnectableObservable.prototype).getSubject }, + connect: { value: ( ConnectableObservable.prototype).connect }, + refCount: { value: ( ConnectableObservable.prototype).refCount } +}; + class ConnectableSubscriber extends SubjectSubscriber { constructor(destination: Subject, private connectable: ConnectableObservable) { diff --git a/src/observable/MulticastObservable.ts b/src/observable/MulticastObservable.ts deleted file mode 100644 index ba490bfa374..00000000000 --- a/src/observable/MulticastObservable.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { Subject } from '../Subject'; -import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { ConnectableObservable } from '../observable/ConnectableObservable'; - -export class MulticastObservable extends Observable { - constructor(protected source: Observable, - private subjectFactory: () => Subject, - private selector: (source: Observable) => Observable) { - super(); - } - - protected _subscribe(subscriber: Subscriber): Subscription { - const { selector, source } = this; - const connectable = new ConnectableObservable(source, this.subjectFactory); - const subscription = selector(connectable).subscribe(subscriber); - subscription.add(connectable.connect()); - return subscription; - } -} diff --git a/src/operator/combineLatest.ts b/src/operator/combineLatest.ts index fef440908f1..b0596d4c6c9 100644 --- a/src/operator/combineLatest.ts +++ b/src/operator/combineLatest.ts @@ -62,7 +62,7 @@ export function combineLatest(...observables: Array | observables.unshift(this); - return new ArrayObservable(observables).lift(new CombineLatestOperator(project)); + return this.lift.call(new ArrayObservable(observables), new CombineLatestOperator(project)); } /* tslint:disable:max-line-length */ diff --git a/src/operator/concat.ts b/src/operator/concat.ts index d4b9fdb695e..6e16f5f325d 100644 --- a/src/operator/concat.ts +++ b/src/operator/concat.ts @@ -45,7 +45,7 @@ import { MergeAllOperator } from './mergeAll'; * @owner Observable */ export function concat(...observables: Array | Scheduler>): Observable { - return concatStatic(this, ...observables); + return this.lift.call(concatStatic(this, ...observables)); } /* tslint:disable:max-line-length */ @@ -119,5 +119,9 @@ export function concatStatic(...observables: Array | scheduler = args.pop(); } + if (scheduler === null && observables.length === 1) { + return >observables[0]; + } + return new ArrayObservable(observables, scheduler).lift(new MergeAllOperator(1)); } diff --git a/src/operator/merge.ts b/src/operator/merge.ts index a034184f349..3987507f471 100644 --- a/src/operator/merge.ts +++ b/src/operator/merge.ts @@ -51,8 +51,7 @@ import { isScheduler } from '../util/isScheduler'; * @owner Observable */ export function merge(...observables: Array | Scheduler | number>): Observable { - observables.unshift(this); - return mergeStatic.apply(this, observables); + return this.lift.call(mergeStatic(this, ...observables)); } /* tslint:disable:max-line-length */ @@ -149,7 +148,7 @@ export function mergeStatic(...observables: Array | S concurrent = observables.pop(); } - if (observables.length === 1) { + if (scheduler === null && observables.length === 1) { return >observables[0]; } diff --git a/src/operator/multicast.ts b/src/operator/multicast.ts index 6d5686526f6..26df717d55d 100644 --- a/src/operator/multicast.ts +++ b/src/operator/multicast.ts @@ -1,7 +1,8 @@ import { Subject } from '../Subject'; +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { MulticastObservable } from '../observable/MulticastObservable'; -import { ConnectableObservable } from '../observable/ConnectableObservable'; +import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable'; /** * Returns an Observable that emits the results of invoking a specified selector on items @@ -33,9 +34,15 @@ export function multicast(subjectOrSubjectFactory: Subject | (() => Subjec }; } - return !selector ? - new ConnectableObservable(this, subjectFactory) : - new MulticastObservable(this, subjectFactory, selector); + if (typeof selector === 'function') { + return this.lift(new MulticastOperator(subjectFactory, selector)); + } + + const connectable: any = Object.create(this, connectableObservableDescriptor); + connectable.source = this; + connectable.subjectFactory = subjectFactory; + + return > connectable; } export type factoryOrValue = T | (() => T); @@ -45,3 +52,16 @@ export interface MulticastSignature { (subjectOrSubjectFactory: factoryOrValue>): ConnectableObservable; (SubjectFactory: () => Subject, selector?: selector): Observable; } + +export class MulticastOperator implements Operator { + constructor(private subjectFactory: () => Subject, + private selector: (source: Observable) => Observable) { + } + call(subscriber: Subscriber, self: any): any { + const { selector } = this; + const connectable = new ConnectableObservable(self.source, this.subjectFactory); + const subscription = selector(connectable).subscribe(subscriber); + subscription.add(connectable.connect()); + return subscription; + } +} diff --git a/src/operator/race.ts b/src/operator/race.ts index 32217ee4776..c189bae4d49 100644 --- a/src/operator/race.ts +++ b/src/operator/race.ts @@ -23,8 +23,7 @@ export function race(...observables: Array | Array>>observables[0]; } - observables.unshift(this); - return raceStatic.apply(this, observables); + return this.lift.call(raceStatic(this, ...observables)); } export interface RaceSignature { diff --git a/src/operator/zip.ts b/src/operator/zip.ts index a7c7861f824..e182c9f1513 100644 --- a/src/operator/zip.ts +++ b/src/operator/zip.ts @@ -16,8 +16,7 @@ import { $$iterator } from '../symbol/iterator'; * @owner Observable */ export function zipProto(...observables: Array | ((...values: Array) => R)>): Observable { - observables.unshift(this); - return zipStatic.apply(this, observables); + return this.lift.call(zipStatic(this, ...observables)); } /* tslint:disable:max-line-length */