diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 9ce73c4f63..41a8dcddbc 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -3,6 +3,7 @@ import * as sinon from 'sinon'; import * as Rx from '../dist/cjs/Rx'; import {TeardownLogic} from '../dist/cjs/Subscription'; import marbleTestingSignature = require('./helpers/marble-testing'); // tslint:disable-line:no-require-imports +import { map } from '../dist/cjs/operators'; declare const { asDiagram, rxTestScheduler }; declare const cold: typeof marbleTestingSignature.cold; @@ -621,6 +622,34 @@ describe('Observable', () => { }); }); }); + + describe('pipe', () => { + it('should exist', () => { + const source = Observable.of('test'); + expect(source.pipe).to.be.a('function'); + }); + + it('should pipe multiple operations', (done) => { + Observable.of('test') + .pipe( + map((x: string) => x + x), + map((x: string) => x + '!!!') + ) + .subscribe( + x => { + expect(x).to.equal('testtest!!!'); + }, + null, + done + ); + }); + + it('should return the same observable if there are no arguments', () => { + const source = Observable.of('test'); + const result = source.pipe(); + expect(result).to.equal(source); + }); + }); }); /** @test {Observable} */ diff --git a/src/Observable.ts b/src/Observable.ts index 586c1dcca9..2745be84e7 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -7,6 +7,8 @@ import { toSubscriber } from './util/toSubscriber'; import { IfObservable } from './observable/IfObservable'; import { ErrorObservable } from './observable/ErrorObservable'; import { observable as Symbol_observable } from './symbol/observable'; +import { OperatorFunction } from './interfaces'; +import { compose } from './util/compose'; export interface Subscribable { subscribe(observerOrNext?: PartialObserver | ((value: T) => void), @@ -286,4 +288,43 @@ export class Observable implements Subscribable { [Symbol_observable]() { return this; } + + /* tslint:disable:max-line-length */ + pipe(): Observable + pipe(op1: OperatorFunction): Observable + pipe(op1: OperatorFunction, op2: OperatorFunction): Observable + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction): Observable + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction): Observable + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction): Observable + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction): Observable + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction): Observable + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction): Observable + pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction, op9: OperatorFunction): Observable + /* tslint:enable:max-line-length */ + + /** + * Used to stitch together functional operators into a chain. + * @method pipe + * @return {Observable} the Observable result of all of the operators having + * been called in the order they were passed in. + * + * @example + * + * import { map, filter, scan } from 'rxjs/operators'; + * + * Rx.Observable.interval(1000) + * .pipe( + * filter(x => x % 2 === 0), + * map(x => x + x), + * scan((acc, x) => acc + x) + * ) + * .subscribe(x => console.log(x)) + */ + pipe(...operations: OperatorFunction[]): Observable { + if (operations.length === 0) { + return this as any; + } + + return compose.apply(this, operations)(this); + } } diff --git a/src/util/compose.ts b/src/util/compose.ts index 017b74f4aa..94283b825b 100644 --- a/src/util/compose.ts +++ b/src/util/compose.ts @@ -1,6 +1,19 @@ import { noop } from './noop'; import { UnaryFunction } from '../interfaces'; +/* tslint:disable:max-line-length */ +export function compose(): UnaryFunction; +export function compose(op1: UnaryFunction): UnaryFunction; +export function compose(op1: UnaryFunction, op2: UnaryFunction): UnaryFunction; +export function compose(op1: UnaryFunction, op2: UnaryFunction, op3: UnaryFunction): UnaryFunction; +export function compose(op1: UnaryFunction, op2: UnaryFunction, op3: UnaryFunction, op4: UnaryFunction): UnaryFunction; +export function compose(op1: UnaryFunction, op2: UnaryFunction, op3: UnaryFunction, op4: UnaryFunction, op5: UnaryFunction): UnaryFunction; +export function compose(op1: UnaryFunction, op2: UnaryFunction, op3: UnaryFunction, op4: UnaryFunction, op5: UnaryFunction, op6: UnaryFunction): UnaryFunction; +export function compose(op1: UnaryFunction, op2: UnaryFunction, op3: UnaryFunction, op4: UnaryFunction, op5: UnaryFunction, op6: UnaryFunction, op7: UnaryFunction): UnaryFunction; +export function compose(op1: UnaryFunction, op2: UnaryFunction, op3: UnaryFunction, op4: UnaryFunction, op5: UnaryFunction, op6: UnaryFunction, op7: UnaryFunction, op8: UnaryFunction): UnaryFunction; +export function compose(op1: UnaryFunction, op2: UnaryFunction, op3: UnaryFunction, op4: UnaryFunction, op5: UnaryFunction, op6: UnaryFunction, op7: UnaryFunction, op8: UnaryFunction, op9: UnaryFunction): UnaryFunction; +/* tslint:enable:max-line-length */ + export function compose(...fns: Array>): UnaryFunction { if (!fns) { return noop as UnaryFunction;