Skip to content

Commit

Permalink
feat(operators): Use lift in the operators that don't currently use l…
Browse files Browse the repository at this point in the history
…ift.

BREAKING CHANGE:  Removes MulticastObservable subclass in favor of a MulticastOperator.
  • Loading branch information
trxcllnt committed Sep 15, 2016
1 parent cd953b1 commit 68af9ef
Show file tree
Hide file tree
Showing 19 changed files with 254 additions and 59 deletions.
138 changes: 130 additions & 8 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,16 +579,22 @@ describe('Observable.create', () => {

/** @test {Observable} */
describe('Observable.lift', () => {
it('should be overrideable in a custom Observable type that composes', (done: MochaDone) => {
class MyCustomObservable<T> extends Rx.Observable<T> {
lift<R>(operator: Rx.Operator<T, R>): Rx.Observable<R> {
const observable = new MyCustomObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = operator;
return observable;
}

class MyCustomObservable<T> extends Rx.Observable<T> {
static from<T>(source: any) {
const observable = new MyCustomObservable<T>();
observable.source = <Rx.Observable<T>> source;
return observable;
}
lift<R>(operator: Rx.Operator<T, R>): Rx.Observable<R> {
const observable = new MyCustomObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = operator;
return observable;
}
}

it('should be overrideable in a custom Observable type that composes', (done: MochaDone) => {
const result = new MyCustomObservable((observer: Rx.Observer<number>) => {
observer.next(1);
observer.next(2);
Expand All @@ -610,6 +616,122 @@ describe('Observable.lift', () => {
});
});

it('should compose through multicast and refCount', (done: MochaDone) => {
const result = new MyCustomObservable((observer: Rx.Observer<number>) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
.multicast(() => new Rx.Subject())
.refCount()
.map((x: number) => { return 10 * x; });

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [10, 20, 30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
}, (x) => {
done(new Error('should not be called'));
}, () => {
done();
});
});

it('should compose through multicast with selector function', (done: MochaDone) => {
const result = new MyCustomObservable((observer: Rx.Observer<number>) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
})
.multicast(() => new Rx.Subject(), (shared) => shared.map((x: number) => { return 10 * x; }));

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [10, 20, 30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
}, (x) => {
done(new Error('should not be called'));
}, () => {
done();
});
});

it('should compose through combineLatest', () => {
const e1 = cold('-a--b-----c-d-e-|');
const e2 = cold('--1--2-3-4---| ');
const expected = '--A-BC-D-EF-G-H-|';

const result = MyCustomObservable.from(e1).combineLatest(e2, (a: any, b: any) => String(a) + String(b));

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected, {
A: 'a1', B: 'b1', C: 'b2', D: 'b3', E: 'b4', F: 'c4', G: 'd4', H: 'e4'
});
});

it('should compose through concat', () => {
const e1 = cold('--a--b-|');
const e2 = cold( '--x---y--|');
const expected = '--a--b---x---y--|';

const result = MyCustomObservable.from(e1).concat(e2, rxTestScheduler);

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected);
});

it('should compose through merge', () => {
const e1 = cold('-a--b-| ');
const e2 = cold('--x--y-|');
const expected = '-ax-by-|';

const result = MyCustomObservable.from(e1).merge(e2, rxTestScheduler);

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected);
});

it('should compose through race', () => {
const e1 = cold('---a-----b-----c----|');
const e1subs = '^ !';
const e2 = cold('------x-----y-----z----|');
const e2subs = '^ !';
const expected = '---a-----b-----c----|';

const result = MyCustomObservable.from(e1).race(e2);

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should compose through zip', () => {
const e1 = cold('-a--b-----c-d-e-|');
const e2 = cold('--1--2-3-4---| ');
const expected = ('--A--B----C-D| ');

const result = MyCustomObservable.from(e1).zip(e2, (a: any, b: any) => String(a) + String(b));

expect(result instanceof MyCustomObservable).to.be.true;

expectObservable(result).toBe(expected, {
A: 'a1', B: 'b2', C: 'c3', D: 'd4'
});
});

it('should allow injecting behaviors into all subscribers in an operator ' +
'chain when overridden', (done: MochaDone) => {
// The custom Subscriber
Expand Down
22 changes: 21 additions & 1 deletion spec/observables/concat-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,24 @@ describe('Observable.concat', () => {
expect(vals).to.equal(r.shift());
}, null, done);
});
});

it('should use the scheduler even when one Observable is concat\'d', (done: MochaDone) => {
let e1Subscribed = false;
const e1 = Observable.defer(() => {
e1Subscribed = true;
return Observable.of('a');
});

Observable
.concat(e1, Rx.Scheduler.async)
.subscribe({
error: done,
complete: () => {
expect(e1Subscribed).to.be.true;
done();
}
});

expect(e1Subscribed).to.be.false;
});
});
20 changes: 20 additions & 0 deletions spec/observables/merge-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,24 @@ describe('Observable.merge(...observables, Scheduler, number)', () => {
const expected = '-d-a-e-b-f-c---x---y---z---|';
expectObservable(Observable.merge(e1, e2, e3, 2, rxTestScheduler)).toBe(expected);
});

it('should use the scheduler even when one Observable is merged', (done: MochaDone) => {
let e1Subscribed = false;
const e1 = Observable.defer(() => {
e1Subscribed = true;
return Observable.of('a');
});

Observable
.merge(e1, Rx.Scheduler.async)
.subscribe({
error: done,
complete: () => {
expect(e1Subscribed).to.be.true;
done();
}
});

expect(e1Subscribed).to.be.false;
});
});
4 changes: 2 additions & 2 deletions spec/operators/combineLatest-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ const Observable = Rx.Observable;
/** @test {combineLatest} */
describe('Observable.prototype.combineLatest', () => {
asDiagram('combineLatest')('should combine events from two cold observables', () => {
const e1 = hot('-a--b-----c-d-e-|');
const e2 = hot('--1--2-3-4---| ');
const e1 = cold('-a--b-----c-d-e-|');
const e2 = cold('--1--2-3-4---| ');
const expected = '--A-BC-D-EF-G-H-|';

const result = e1.combineLatest(e2, (a: any, b: any) => String(a) + String(b));
Expand Down
2 changes: 1 addition & 1 deletion spec/operators/finally-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,4 @@ describe('Observable.prototype.finally', () => {
rxTestScheduler.flush();
expect(executed).to.be.true;
});
});
});
7 changes: 5 additions & 2 deletions spec/operators/publish-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ describe('Observable.prototype.publish', () => {
published.connect();
});

it('should return a ConnectableObservable', () => {
it('should return a ConnectableObservable-ish', () => {
const source = Observable.of(1).publish();
expect(source instanceof Rx.ConnectableObservable).to.be.true;
expect(typeof (<any> source)._subscribe === 'function').to.be.true;
expect(typeof (<any> source).getSubject === 'function').to.be.true;
expect(typeof source.connect === 'function').to.be.true;
expect(typeof source.refCount === 'function').to.be.true;
});

it('should do nothing if connect is not called, despite subscriptions', () => {
Expand Down
9 changes: 6 additions & 3 deletions spec/operators/publishBehavior-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ describe('Observable.prototype.publishBehavior', () => {
published.connect();
});

it('should return a ConnectableObservable', () => {
it('should return a ConnectableObservable-ish', () => {
const source = Observable.of(1).publishBehavior(1);
expect(source instanceof Rx.ConnectableObservable).to.be.true;
expect(typeof (<any> source)._subscribe === 'function').to.be.true;
expect(typeof (<any> source).getSubject === 'function').to.be.true;
expect(typeof source.connect === 'function').to.be.true;
expect(typeof source.refCount === 'function').to.be.true;
});

it('should only emit default value if connect is not called, despite subscriptions', () => {
Expand Down Expand Up @@ -327,4 +330,4 @@ describe('Observable.prototype.publishBehavior', () => {
expect(results).to.deep.equal([]);
done();
});
});
});
10 changes: 6 additions & 4 deletions spec/operators/publishLast-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ describe('Observable.prototype.publishLast', () => {
published.connect();
});

it('should return a ConnectableObservable', () => {
it('should return a ConnectableObservable-ish', () => {
const source = Observable.of(1).publishLast();

expect(source instanceof Rx.ConnectableObservable).to.be.true;
expect(typeof (<any> source)._subscribe === 'function').to.be.true;
expect(typeof (<any> source).getSubject === 'function').to.be.true;
expect(typeof source.connect === 'function').to.be.true;
expect(typeof source.refCount === 'function').to.be.true;
});

it('should do nothing if connect is not called, despite subscriptions', () => {
Expand Down Expand Up @@ -249,4 +251,4 @@ describe('Observable.prototype.publishLast', () => {
expect(subscriptions).to.equal(1);
done();
});
});
});
7 changes: 5 additions & 2 deletions spec/operators/publishReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ describe('Observable.prototype.publishReplay', () => {
published.connect();
});

it('should return a ConnectableObservable', () => {
it('should return a ConnectableObservable-ish', () => {
const source = Observable.of(1).publishReplay();
expect(source instanceof Rx.ConnectableObservable).to.be.true;
expect(typeof (<any> source)._subscribe === 'function').to.be.true;
expect(typeof (<any> source).getSubject === 'function').to.be.true;
expect(typeof source.connect === 'function').to.be.true;
expect(typeof source.refCount === 'function').to.be.true;
});

it('should do nothing if connect is not called, despite subscriptions', () => {
Expand Down
14 changes: 14 additions & 0 deletions spec/operators/race-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ describe('Observable.prototype.race', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race cold and cold and accept an Array of Observable argument', () => {
const e1 = cold('---a-----b-----c----|');
const e1subs = '^ !';
const e2 = cold('------x-----y-----z----|');
const e2subs = '^ !';
const expected = '---a-----b-----c----|';

const result = e1.race([e2]);

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race hot and hot', () => {
const e1 = hot('---a-----b-----c----|');
const e1subs = '^ !';
Expand Down
1 change: 0 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ export {Subscriber} from './Subscriber';
export {AsyncSubject} from './AsyncSubject';
export {ReplaySubject} from './ReplaySubject';
export {BehaviorSubject} from './BehaviorSubject';
export {MulticastObservable} from './observable/MulticastObservable';
export {ConnectableObservable} from './observable/ConnectableObservable';
export {Notification} from './Notification';
export {EmptyError} from './util/EmptyError';
Expand Down
9 changes: 9 additions & 0 deletions src/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ export class ConnectableObservable<T> extends Observable<T> {
}
}

export const connectableObservableDescriptor: PropertyDescriptorMap = {
operator: { value: null },
_refCount: { value: 0, writable: true },
_subscribe: { value: (<any> ConnectableObservable.prototype)._subscribe },
getSubject: { value: (<any> ConnectableObservable.prototype).getSubject },
connect: { value: (<any> ConnectableObservable.prototype).connect },
refCount: { value: (<any> ConnectableObservable.prototype).refCount }
};

class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
constructor(destination: Subject<T>,
private connectable: ConnectableObservable<T>) {
Expand Down
21 changes: 0 additions & 21 deletions src/observable/MulticastObservable.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/operator/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export function combineLatest<T, R>(...observables: Array<ObservableInput<any> |

observables.unshift(this);

return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
return this.lift.call(new ArrayObservable(observables), new CombineLatestOperator(project));
}

/* tslint:disable:max-line-length */
Expand Down
6 changes: 5 additions & 1 deletion src/operator/concat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import { MergeAllOperator } from './mergeAll';
* @owner Observable
*/
export function concat<T, R>(...observables: Array<ObservableInput<any> | Scheduler>): Observable<R> {
return concatStatic<T, R>(this, ...observables);
return this.lift.call(concatStatic<T, R>(this, ...observables));
}

/* tslint:disable:max-line-length */
Expand Down Expand Up @@ -119,5 +119,9 @@ export function concatStatic<T, R>(...observables: Array<ObservableInput<any> |
scheduler = args.pop();
}

if (scheduler === null && observables.length === 1) {
return <Observable<R>>observables[0];
}

return new ArrayObservable(observables, scheduler).lift(new MergeAllOperator<R>(1));
}
Loading

0 comments on commit 68af9ef

Please sign in to comment.