diff --git a/spec/observable-spec.js b/spec/observable-spec.js index 8a0af17495..b9ac8b841f 100644 --- a/spec/observable-spec.js +++ b/spec/observable-spec.js @@ -27,7 +27,7 @@ describe('Observable', function () { var expected = [1,2,3]; var result = Observable.of(1,2,3).forEach(function (x) { expect(x).toBe(expected.shift()); - }, Promise) + }, null, Promise) .then(done); expect(typeof result.then).toBe('function'); @@ -41,7 +41,7 @@ describe('Observable', function () { }, function (err) { expect(err).toBe('bad'); done(); - }, Promise); + }, null, Promise); }); it('should allow Promise to be globally configured', function (done) { @@ -60,6 +60,18 @@ describe('Observable', function () { done(); }); }); + + it('should accept a thisArg argument', function (done) { + var expected = [1,2,3]; + var thisArg = {}; + var result = Observable.of(1,2,3).forEach(function (x) { + expect(this).toBe(thisArg); + expect(x).toBe(expected.shift()); + }, thisArg, Promise) + .then(done); + + expect(typeof result.then).toBe('function'); + }); }); describe('subscribe', function () { diff --git a/spec/operators/skipWhile-spec.js b/spec/operators/skipWhile-spec.js index 83aea05bfe..9888d5104f 100644 --- a/spec/operators/skipWhile-spec.js +++ b/spec/operators/skipWhile-spec.js @@ -160,23 +160,6 @@ describe('Observable.prototype.skipWhile()', function () { expectSubscriptions(source.subscriptions).toBe(sourceSubs); }); - it('should accept a thisArg', function () { - var source = hot('-1-^--2--3--4--5--6--|'); - var sourceSubs = '^ !'; - var expected = '---------4--5--6--|'; - - function Skiper() { - this.doSkip = function (v) { return +v < 4; }; - } - - var skiper = new Skiper(); - - expectObservable( - source.skipWhile(function (v) { return this.doSkip(v); }, skiper) - ).toBe(expected); - expectSubscriptions(source.subscriptions).toBe(sourceSubs); - }); - it('should handle Observable.empty', function () { var source = cold('|'); var subs = '(^!)'; diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 433f5ad6fc..60e595de1a 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -30,7 +30,7 @@ export interface CoreOperators { filter?: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable; finally?: (finallySelector: () => void) => Observable; first?: (predicate?: (value: T, index: number, source: Observable) => boolean, - resultSelector?: (value: T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable | Observable; + resultSelector?: (value: T, index: number) => R, defaultValue?: any) => Observable | Observable; flatMap?: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable; @@ -41,7 +41,7 @@ export interface CoreOperators { ignoreElements?: () => Observable; last?: (predicate?: (value: T, index: number) => boolean, resultSelector?: (value: T, index: number) => R, - thisArg?: any, defaultValue?: any) => Observable | Observable; + defaultValue?: any) => Observable | Observable; every?: (predicate: (value: T, index: number) => boolean, thisArg?: any) => Observable; map?: (project: (x: T, ix?: number) => R, thisArg?: any) => Observable; mapTo?: (value: R) => Observable; @@ -69,7 +69,7 @@ export interface CoreOperators { single?: (predicate?: (value: T, index: number) => boolean) => Observable; skip?: (count: number) => Observable; skipUntil?: (notifier: Observable) => Observable; - skipWhile?: (predicate: (x: T, index: number) => boolean, thisArg?: any) => Observable; + skipWhile?: (predicate: (x: T, index: number) => boolean) => Observable; startWith?: (x: T) => Observable; subscribeOn?: (scheduler: Scheduler, delay?: number) => Observable; switch?: () => Observable; diff --git a/src/Observable.ts b/src/Observable.ts index 1dfb5d1c69..2a2cd95cd0 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -110,11 +110,12 @@ export class Observable implements CoreOperators { /** * @method forEach * @param {Function} next a handler for each value emitted by the observable - * @param {PromiseConstructor} PromiseCtor? a constructor function used to instantiate the Promise + * @param {any} [thisArg] a `this` context for the `next` handler function + * @param {PromiseConstructor} [PromiseCtor] a constructor function used to instantiate the Promise * @returns {Promise} a promise that either resolves on observable completion or * rejects with the handled error */ - forEach(next: (value: T) => void, PromiseCtor?: PromiseConstructor): Promise { + forEach(next: (value: T) => void, thisArg: any, PromiseCtor?: PromiseConstructor): Promise { if (!PromiseCtor) { if (root.Rx && root.Rx.config && root.Rx.config.Promise) { PromiseCtor = root.Rx.config.Promise; @@ -127,9 +128,27 @@ export class Observable implements CoreOperators { throw new Error('no Promise impl found'); } - return new PromiseCtor((resolve, reject) => { - this.subscribe(next, reject, resolve); - }); + let nextHandler; + + if (thisArg) { + nextHandler = function nextHandlerFn(value: any): void { + const { thisArg, next } = nextHandlerFn; + return next.call(thisArg, value); + }; + nextHandler.thisArg = thisArg; + nextHandler.next = next; + } else { + nextHandler = next; + } + + const promiseCallback = function promiseCallbackFn(resolve, reject) { + const { source, nextHandler } = promiseCallbackFn; + source.subscribe(nextHandler, reject, resolve); + }; + (promiseCallback).source = this; + (promiseCallback).nextHandler = nextHandler; + + return new PromiseCtor(promiseCallback); } _subscribe(subscriber: Subscriber): Subscription | Function | void { diff --git a/src/operator/skipWhile.ts b/src/operator/skipWhile.ts index 46365645da..f0dc49d720 100644 --- a/src/operator/skipWhile.ts +++ b/src/operator/skipWhile.ts @@ -3,17 +3,13 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; -import {bindCallback} from '../util/bindCallback'; -export function skipWhile(predicate: (x: T, index: number) => boolean, thisArg?: any): Observable { - return this.lift(new SkipWhileOperator(predicate, thisArg)); +export function skipWhile(predicate: (x: T, index: number) => boolean): Observable { + return this.lift(new SkipWhileOperator(predicate)); } class SkipWhileOperator implements Operator { - private predicate: (x: T, index: number) => boolean; - - constructor(predicate: (x: T, index: number) => boolean, thisArg?: any) { - this.predicate = <(x: T, index: number) => boolean>bindCallback(predicate, thisArg, 2); + constructor(private predicate: (x: T, index: number) => boolean) { } call(subscriber: Subscriber): Subscriber {