Skip to content

Commit

Permalink
feat(endWith): add new operator endWith
Browse files Browse the repository at this point in the history
endWith operator returns an Observable that emits given items last after items emitted by the source
Observable
  • Loading branch information
natmegs committed May 10, 2018
1 parent dc66731 commit 1b1baaa
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 0 deletions.
158 changes: 158 additions & 0 deletions spec/operators/endWith-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import { of } from 'rxjs';
import { endWith, mergeMap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';

declare function asDiagram(arg: string): Function;

declare const rxTestScheduler: TestScheduler;

/** @test {endWith} */
describe('endWith operator', () => {
const defaultStartValue = 'x';

asDiagram('endWith(s)')('should append to a cold Observable', () => {
const e1 = cold('---a--b--c--|');
const e1subs = '^ !';
const expected = '---a--b--c--(s|)';

expectObservable(e1.pipe(endWith('s'))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should end an observable with given value', () => {
const e1 = hot('--a--|');
const e1subs = '^ !';
const expected = '--a--(x|)';

expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not end with given value if source does not complete', () => {
const e1 = hot('----a-');
const e1subs = '^ ';
const expected = '----a-';

expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not end with given value if source never emits and does not completes', () => {
const e1 = cold('-');
const e1subs = '^';
const expected = '-';

expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should end with given value if source does not emit but does complete', () => {
const e1 = hot('---|');
const e1subs = '^ !';
const expected = '---(x|)';

expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit given value and complete immediately if source is empty', () => {
const e1 = cold('|');
const e1subs = '(^!)';
const expected = '(x|)';

expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should end with given value and source both if source emits single value', () => {
const e1 = cold('(a|)');
const e1subs = '(^!)';
const expected = '(ax|)';

expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should end with given values when given more than one value', () => {
const e1 = hot('-----a--|');
const e1subs = '^ !';
const expected = '-----a--(yz|)';

expectObservable(e1.pipe(endWith('y', 'z'))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raise error and not end with given value if source raises error', () => {
const e1 = hot('--#');
const e1subs = '^ !';
const expected = '--#';

expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected, defaultStartValue);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raise error immediately and not end with given value if source throws error immediately', () => {
const e1 = cold('#');
const e1subs = '(^!)';
const expected = '#';

expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected, defaultStartValue);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow unsubscribing explicitly and early', () => {
const e1 = hot('---a--b----c--d--|');
const unsub = ' ! ';
const e1subs = '^ ! ';
const expected = '---a--b---';

const result = e1.pipe(endWith('s', rxTestScheduler));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
const e1 = hot('---a--b----c--d--|');
const e1subs = '^ ! ';
const expected = '---a--b--- ';
const unsub = ' ! ';

const result = e1.pipe(
mergeMap((x: string) => of(x)),
endWith('s', rxTestScheduler),
mergeMap((x: string) => of(x))
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should end with empty if given value is not specified', () => {
const e1 = hot('-a-|');
const e1subs = '^ !';
const expected = '-a-|';

expectObservable(e1.pipe(endWith(rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should accept scheduler as last argument with single value', () => {
const e1 = hot('--a--|');
const e1subs = '^ !';
const expected = '--a--(x|)';

expectObservable(e1.pipe(endWith(defaultStartValue, rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should accept scheduler as last argument with multiple value', () => {
const e1 = hot('-----a--|');
const e1subs = '^ !';
const expected = '-----a--(yz|)';

expectObservable(e1.pipe(endWith('y', 'z', rxTestScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
50 changes: 50 additions & 0 deletions src/internal/operators/endWith.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { Observable } from '../Observable';
import { fromArray } from '../observable/fromArray';
import { scalar } from '../observable/scalar';
import { empty } from '../observable/empty';
import { concat as concatStatic } from '../observable/concat';
import { isScheduler } from '../util/isScheduler';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';

/* tslint:disable:max-line-length */
export function endWith<T>(scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function endWith<T>(v1: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function endWith<T>(v1: T, v2: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function endWith<T>(v1: T, v2: T, v3: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function endWith<T>(v1: T, v2: T, v3: T, v4: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function endWith<T>(v1: T, v2: T, v3: T, v4: T, v5: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function endWith<T>(v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function endWith<T>(...array: Array<T | SchedulerLike>): MonoTypeOperatorFunction<T>;
/* tslint:enable:max-line-length */

/**
* Returns an Observable that emits the items you specify as arguments after it finishes emitting
* items emitted by the source Observable.
*
* @param {...T} values - Items you want the modified Observable to emit last.
* @param {Scheduler} [scheduler] - A {@link IScheduler} to use for scheduling
* the emissions of the `next` notifications.
* @return {Observable} An Observable that emits the items emitted by the source Observable
* and then emits the items in the specified Iterable.
* @method endWith
* @owner Observable
*/
export function endWith<T>(...array: Array<T | SchedulerLike>): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => {
let scheduler = <SchedulerLike>array[array.length - 1];
if (isScheduler(scheduler)) {
array.pop();
} else {
scheduler = null;
}

const len = array.length;
if (len === 1 && !scheduler) {
return concatStatic(source, scalar(array[0] as T));
} else if (len > 0) {
return concatStatic(source, fromArray(array as T[], scheduler));
} else {
return concatStatic<T>(source, empty(scheduler) as any);
}
};
}
1 change: 1 addition & 0 deletions src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export { distinct } from '../internal/operators/distinct';
export { distinctUntilChanged } from '../internal/operators/distinctUntilChanged';
export { distinctUntilKeyChanged } from '../internal/operators/distinctUntilKeyChanged';
export { elementAt } from '../internal/operators/elementAt';
export { endWith } from '../internal/operators/endWith';
export { every } from '../internal/operators/every';
export { exhaust } from '../internal/operators/exhaust';
export { exhaustMap } from '../internal/operators/exhaustMap';
Expand Down

0 comments on commit 1b1baaa

Please sign in to comment.