Skip to content

Commit

Permalink
Merge pull request #1911 from DomiR/master
Browse files Browse the repository at this point in the history
feat(operator): Add repeatWhen operator
  • Loading branch information
jayphelps authored Sep 1, 2016
2 parents 843d135 + c288d88 commit a61c8b2
Show file tree
Hide file tree
Showing 6 changed files with 468 additions and 1 deletion.
7 changes: 6 additions & 1 deletion doc/decision-tree-widget/tree.yml
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,12 @@ children:
children:
- label: I want to re-subscribe
children:
- label: repeat
- label: immediately
children:
- label: repeat
- label: when another Observable emits
children:
- label: repeatWhen
- label: I want to start a new Observable
children:
- label: concat
Expand Down
1 change: 1 addition & 0 deletions doc/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ There are operators for different purposes, and they may be categorized as: crea
- [`never`](../class/es6/Observable.js~Observable.html#static-method-never)
- [`of`](../class/es6/Observable.js~Observable.html#static-method-of)
- `repeat`
- `repeatWhen`
- [`range`](../class/es6/Observable.js~Observable.html#static-method-range)
- [`throw`](../class/es6/Observable.js~Observable.html#static-method-throw)
- [`timer`](../class/es6/Observable.js~Observable.html#static-method-timer)
Expand Down
323 changes: 323 additions & 0 deletions spec/operators/repeatWhen-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,323 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Observable = Rx.Observable;

/** @test {repeatWhen} */
describe('Observable.prototype.repeatWhen', () => {
asDiagram('repeatWhen')('should handle a source with eventual complete using a hot notifier', () => {
const source = cold('-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ !'];
const notifier = hot('-------------r------------r-|');
const expected = '-1--2---------1--2---------1|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should handle a source with eventual complete using a hot notifier that raises error', () => {
const source = cold( '-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ ! '];
const notifier = hot('-----------r-------r---------#');
const expected = '-1--2-------1--2----1--2-----#';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should retry when notified via returned notifier on complete', (done: MochaDone) => {
let retried = false;
const expected = [1, 2, 1, 2];
let i = 0;
Observable.of(1, 2)
.map((n: number) => {
return n;
})
.repeatWhen((notifications: any) => notifications.map((x: any) => {
if (retried) {
throw new Error('done');
}
retried = true;
return x;
}))
.subscribe((x: any) => {
expect(x).to.equal(expected[i++]);
},
(err: any) => {
expect(err).to.be.an('error', 'done');
done();
});
});

it('should retry when notified and complete on returned completion', (done: MochaDone) => {
const expected = [1, 2, 1, 2];
Observable.of(1, 2)
.map((n: number) => {
return n;
})
.repeatWhen((notifications: any) => Observable.empty())
.subscribe((n: number) => {
expect(n).to.equal(expected.shift());
}, (err: any) => {
done(new Error('should not be called'));
}, () => {
done();
});
});

it('should apply an empty notifier on an empty source', () => {
const source = cold( '|');
const subs = '(^!)';
const notifier = cold('|');
const expected = '|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should apply a never notifier on an empty source', () => {
const source = cold( '|');
const subs = '(^!)';
const notifier = cold('-');
const expected = '-';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should apply an empty notifier on a never source', () => {
const source = cold( '-');
const unsub = ' !';
const subs = '^ !';
const notifier = cold('|');
const expected = '-';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should apply a never notifier on a never source', () => {
const source = cold( '-');
const unsub = ' !';
const subs = '^ !';
const notifier = cold('-');
const expected = '-';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should return an empty observable given a just-throw source and empty notifier', () => {
const source = cold( '#');
const notifier = cold('|');
const expected = '#';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
});

it('should return a error observable given a just-throw source and never notifier', () => {
const source = cold( '#');
const notifier = cold('-');
const expected = '#';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
});

xit('should hide errors using a never notifier on a source with eventual error', () => {
const source = cold( '--a--b--c--#');
const subs = '^ !';
const notifier = cold( '-');
const expected = '--a--b--c---------------------------------';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

xit('should propagate error thrown from notifierSelector function', () => {
const source = cold('--a--b--c--|');
const subs = '^ !';
const expected = '--a--b--c--#';

const result = source.repeatWhen(<any>(() => { throw 'bad!'; }));

expectObservable(result).toBe(expected, undefined, 'bad!');
expectSubscriptions(source.subscriptions).toBe(subs);
});

xit('should replace error with complete using an empty notifier on a source ' +
'with eventual error', () => {
const source = cold( '--a--b--c--#');
const subs = '^ !';
const notifier = cold( '|');
const expected = '--a--b--c--|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should mirror a basic cold source with complete, given a never notifier', () => {
const source = cold( '--a--b--c--|');
const subs = '^ !';
const notifier = cold( '|');
const expected = '--a--b--c--|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should mirror a basic cold source with no termination, given a never notifier', () => {
const source = cold( '--a--b--c---');
const subs = '^ ';
const notifier = cold( '|');
const expected = '--a--b--c---';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should mirror a basic hot source with complete, given a never notifier', () => {
const source = hot('-a-^--b--c--|');
const subs = '^ !';
const notifier = cold( '|');
const expected = '---b--c--|';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

xit('should handle a hot source that raises error but eventually completes', () => {
const source = hot('-1--2--3----4--5---|');
const ssubs = ['^ ! ',
' ^ !'];
const notifier = hot('--------------r--------r---r--r--r---|');
const nsubs = ' ^ !';
const expected = '-1--2--- -5---|';

const result = source
.map((x: string) => {
if (x === '3') {
throw 'error';
}
return x;
}).repeatWhen(() => notifier);

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(ssubs);
expectSubscriptions(notifier.subscriptions).toBe(nsubs);
});

it('should tear down resources when result is unsubscribed early', () => {
const source = cold( '-1--2--|');
const unsub = ' ! ';
const subs = ['^ ! ',
' ^ ! ',
' ^ ! '];
const notifier = hot('---------r-------r---------#');
const nsubs = ' ^ ! ';
const expected = '-1--2-----1--2----1-- ';

const result = source.repeatWhen((notifications: any) => notifier);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(notifier.subscriptions).toBe(nsubs);
});

it('should not break unsubscription chains when unsubscribed explicitly', () => {
const source = cold( '-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ ! '];
const notifier = hot('---------r-------r-------r-#');
const nsubs = ' ^ ! ';
const expected = '-1--2-----1--2----1-- ';
const unsub = ' ! ';

const result = source
.mergeMap((x: string) => Observable.of(x))
.repeatWhen((notifications: any) => notifier)
.mergeMap((x: string) => Observable.of(x));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(notifier.subscriptions).toBe(nsubs);
});

it('should handle a source with eventual error using a dynamic notifier ' +
'selector which eventually throws', () => {
const source = cold('-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-1--2---1--2---1--2--#';

let invoked = 0;
const result = source.repeatWhen((notifications: any) =>
notifications.map((err: any) => {
if (++invoked === 3) {
throw 'error';
} else {
return 'x';
}
}));

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should handle a source with eventual error using a dynamic notifier ' +
'selector which eventually completes', () => {
const source = cold('-1--2--|');
const subs = ['^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-1--2---1--2---1--2--|';

let invoked = 0;
const result = source.repeatWhen((notifications: any) => notifications
.map(() => 'x')
.takeUntil(
notifications.flatMap(() => {
if (++invoked < 3) {
return Observable.empty();
} else {
return Observable.of('stop!');
}
})
));

expectObservable(result).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ import './add/operator/publishLast';
import './add/operator/race';
import './add/operator/reduce';
import './add/operator/repeat';
import './add/operator/repeatWhen';
import './add/operator/retry';
import './add/operator/retryWhen';
import './add/operator/sample';
Expand Down
11 changes: 11 additions & 0 deletions src/add/operator/repeatWhen.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

import { Observable } from '../../Observable';
import { repeatWhen, RepeatWhenSignature } from '../../operator/repeatWhen';

Observable.prototype.repeatWhen = repeatWhen;

declare module '../../Observable' {
interface Observable<T> {
repeatWhen: RepeatWhenSignature<T>;
}
}
Loading

0 comments on commit a61c8b2

Please sign in to comment.