Skip to content

Commit

Permalink
feat(operators): higher-order lettables of reduce, min, max and defau…
Browse files Browse the repository at this point in the history
…ltIfEmpty added
  • Loading branch information
benlesh committed Jun 15, 2017
1 parent cd7e7dd commit 9974fc2
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 111 deletions.
41 changes: 3 additions & 38 deletions src/operator/defaultIfEmpty.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Operator } from '../Operator';

import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { defaultIfEmpty as higherOrder } from '../operators';

/* tslint:disable:max-line-length */
export function defaultIfEmpty<T>(this: Observable<T>, defaultValue?: T): Observable<T>;
Expand Down Expand Up @@ -38,40 +38,5 @@ export function defaultIfEmpty<T, R>(this: Observable<T>, defaultValue?: R): Obs
* @owner Observable
*/
export function defaultIfEmpty<T, R>(this: Observable<T>, defaultValue: R = null): Observable<T | R> {
return this.lift(new DefaultIfEmptyOperator(defaultValue));
}

class DefaultIfEmptyOperator<T, R> implements Operator<T, T | R> {

constructor(private defaultValue: R) {
}

call(subscriber: Subscriber<T | R>, source: any): any {
return source.subscribe(new DefaultIfEmptySubscriber(subscriber, this.defaultValue));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class DefaultIfEmptySubscriber<T, R> extends Subscriber<T> {
private isEmpty: boolean = true;

constructor(destination: Subscriber<T | R>, private defaultValue: R) {
super(destination);
}

protected _next(value: T): void {
this.isEmpty = false;
this.destination.next(value);
}

protected _complete(): void {
if (this.isEmpty) {
this.destination.next(this.defaultValue);
}
this.destination.complete();
}
return higherOrder<T, R>(defaultValue)(this);
}
7 changes: 2 additions & 5 deletions src/operator/max.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Observable } from '../Observable';
import { ReduceOperator } from './reduce';
import { max as higherOrderMax } from '../operators';

/**
* The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function),
Expand Down Expand Up @@ -33,8 +33,5 @@ import { ReduceOperator } from './reduce';
* @owner Observable
*/
export function max<T>(this: Observable<T>, comparer?: (x: T, y: T) => number): Observable<T> {
const max: (x: T, y: T) => T = (typeof comparer === 'function')
? (x, y) => comparer(x, y) > 0 ? x : y
: (x, y) => x > y ? x : y;
return this.lift(new ReduceOperator(max));
return higherOrderMax(comparer)(this);
}
7 changes: 2 additions & 5 deletions src/operator/min.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Observable } from '../Observable';
import { ReduceOperator } from './reduce';
import { min as higherOrderMin } from '../operators';

/**
* The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function),
Expand Down Expand Up @@ -33,8 +33,5 @@ import { ReduceOperator } from './reduce';
* @owner Observable
*/
export function min<T>(this: Observable<T>, comparer?: (x: T, y: T) => number): Observable<T> {
const min: (x: T, y: T) => T = (typeof comparer === 'function')
? (x, y) => comparer(x, y) < 0 ? x : y
: (x, y) => x < y ? x : y;
return this.lift(new ReduceOperator(min));
return higherOrderMin(comparer)(this);
}
66 changes: 3 additions & 63 deletions src/operator/reduce.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { reduce as higherOrderReduce } from '../operators';

/* tslint:disable:max-line-length */
export function reduce<T>(this: Observable<T>, accumulator: (acc: T, value: T, index: number) => T, seed?: T): Observable<T>;
Expand Down Expand Up @@ -53,73 +52,14 @@ export function reduce<T, R>(this: Observable<T>, accumulator: (acc: R, value: T
* @owner Observable
*/
export function reduce<T, R>(this: Observable<T>, accumulator: (acc: R, value: T, index?: number) => R, seed?: R): Observable<R> {
let hasSeed = false;
// providing a seed of `undefined` *should* be valid and trigger
// hasSeed! so don't use `seed !== undefined` checks!
// For this reason, we have to check it here at the original call site
// otherwise inside Operator/Subscriber we won't know if `undefined`
// means they didn't provide anything or if they literally provided `undefined`
if (arguments.length >= 2) {
hasSeed = true;
return higherOrderReduce(accumulator, seed)(this);
}

return this.lift(new ReduceOperator(accumulator, seed, hasSeed));
}

export class ReduceOperator<T, R> implements Operator<T, R> {
constructor(private accumulator: (acc: R, value: T, index?: number) => R, private seed?: R, private hasSeed: boolean = false) {}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new ReduceSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
export class ReduceSubscriber<T, R> extends Subscriber<T> {
private index: number = 0;
private acc: T | R;
private hasValue: boolean = false;

constructor(destination: Subscriber<R>,
private accumulator: (acc: R, value: T, index?: number) => R,
seed: R,
private hasSeed: boolean) {
super(destination);
this.acc = seed;

if (!this.hasSeed) {
this.index++;
}
}

protected _next(value: T) {
if (this.hasValue || (this.hasValue = this.hasSeed)) {
this._tryReduce(value);
} else {
this.acc = value;
this.hasValue = true;
}
}

private _tryReduce(value: T) {
let result: any;
try {
result = this.accumulator(<R>this.acc, value, this.index++);
} catch (err) {
this.destination.error(err);
return;
}
this.acc = result;
}

protected _complete() {
if (this.hasValue || this.hasSeed) {
this.destination.next(this.acc);
}
this.destination.complete();
}
return higherOrderReduce(accumulator)(this);
}
80 changes: 80 additions & 0 deletions src/operators/defaultIfEmpty.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { OperatorFunction } from '../interfaces';

/* tslint:disable:max-line-length */
export function defaultIfEmpty<T>(defaultValue?: T): OperatorFunction<T, T>;
export function defaultIfEmpty<T, R>(defaultValue?: R): OperatorFunction<T, T | R>;
/* tslint:enable:max-line-length */

/**
* Emits a given value if the source Observable completes without emitting any
* `next` value, otherwise mirrors the source Observable.
*
* <span class="informal">If the source Observable turns out to be empty, then
* this operator will emit a default value.</span>
*
* <img src="./img/defaultIfEmpty.png" width="100%">
*
* `defaultIfEmpty` emits the values emitted by the source Observable or a
* specified default value if the source Observable is empty (completes without
* having emitted any `next` value).
*
* @example <caption>If no clicks happen in 5 seconds, then emit "no clicks"</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000));
* var result = clicksBeforeFive.defaultIfEmpty('no clicks');
* result.subscribe(x => console.log(x));
*
* @see {@link empty}
* @see {@link last}
*
* @param {any} [defaultValue=null] The default value used if the source
* Observable is empty.
* @return {Observable} An Observable that emits either the specified
* `defaultValue` if the source Observable emits no items, or the values emitted
* by the source Observable.
* @method defaultIfEmpty
* @owner Observable
*/
export function defaultIfEmpty<T, R>(defaultValue: R = null): OperatorFunction<T, T | R> {
return function defaultIfEmptyOperatorFunction(source: Observable<T>) {
return source.lift(new DefaultIfEmptyOperator(defaultValue));
};
}

class DefaultIfEmptyOperator<T, R> implements Operator<T, T | R> {

constructor(private defaultValue: R) {
}

call(subscriber: Subscriber<T | R>, source: any): any {
return source.subscribe(new DefaultIfEmptySubscriber(subscriber, this.defaultValue));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class DefaultIfEmptySubscriber<T, R> extends Subscriber<T> {
private isEmpty: boolean = true;

constructor(destination: Subscriber<T | R>, private defaultValue: R) {
super(destination);
}

protected _next(value: T): void {
this.isEmpty = false;
this.destination.next(value);
}

protected _complete(): void {
if (this.isEmpty) {
this.destination.next(this.defaultValue);
}
this.destination.complete();
}
}
4 changes: 4 additions & 0 deletions src/operators/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
export { concatMap } from './concatMap';
export { defaultIfEmpty } from './defaultIfEmpty';
export { filter } from './filter';
export { map } from './map';
export { max } from './max';
export { mergeMap } from './mergeMap';
export { min } from './min';
export { reduce } from './reduce';
export { scan } from './scan';
export { switchMap } from './switchMap';
export { takeLast } from './takeLast';
41 changes: 41 additions & 0 deletions src/operators/max.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { reduce } from './reduce';
import { OperatorFunction } from '../interfaces';

/**
* The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function),
* and when source Observable completes it emits a single item: the item with the largest value.
*
* <img src="./img/max.png" width="100%">
*
* @example <caption>Get the maximal value of a series of numbers</caption>
* Rx.Observable.of(5, 4, 7, 2, 8)
* .max()
* .subscribe(x => console.log(x)); // -> 8
*
* @example <caption>Use a comparer function to get the maximal item</caption>
* interface Person {
* age: number,
* name: string
* }
* Observable.of<Person>({age: 7, name: 'Foo'},
* {age: 5, name: 'Bar'},
* {age: 9, name: 'Beer'})
* .max<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
* .subscribe((x: Person) => console.log(x.name)); // -> 'Beer'
* }
*
* @see {@link min}
*
* @param {Function} [comparer] - Optional comparer function that it will use instead of its default to compare the
* value of two items.
* @return {Observable} An Observable that emits item with the largest value.
* @method max
* @owner Observable
*/
export function max<T>(comparer?: (x: T, y: T) => number): OperatorFunction<T, T> {
const max: (x: T, y: T) => T = (typeof comparer === 'function')
? (x, y) => comparer(x, y) > 0 ? x : y
: (x, y) => x > y ? x : y;

return reduce(max);
}
40 changes: 40 additions & 0 deletions src/operators/min.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { reduce } from './reduce';
import { OperatorFunction } from '../interfaces';

/**
* The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function),
* and when source Observable completes it emits a single item: the item with the smallest value.
*
* <img src="./img/min.png" width="100%">
*
* @example <caption>Get the minimal value of a series of numbers</caption>
* Rx.Observable.of(5, 4, 7, 2, 8)
* .min()
* .subscribe(x => console.log(x)); // -> 2
*
* @example <caption>Use a comparer function to get the minimal item</caption>
* interface Person {
* age: number,
* name: string
* }
* Observable.of<Person>({age: 7, name: 'Foo'},
* {age: 5, name: 'Bar'},
* {age: 9, name: 'Beer'})
* .min<Person>( (a: Person, b: Person) => a.age < b.age ? -1 : 1)
* .subscribe((x: Person) => console.log(x.name)); // -> 'Bar'
* }
*
* @see {@link max}
*
* @param {Function} [comparer] - Optional comparer function that it will use instead of its default to compare the
* value of two items.
* @return {Observable<R>} An Observable that emits item with the smallest value.
* @method min
* @owner Observable
*/
export function min<T>(comparer?: (x: T, y: T) => number): OperatorFunction<T, T> {
const min: (x: T, y: T) => T = (typeof comparer === 'function')
? (x, y) => comparer(x, y) < 0 ? x : y
: (x, y) => x < y ? x : y;
return reduce(min);
}
Loading

0 comments on commit 9974fc2

Please sign in to comment.