Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates for thisArgs issue #992

Merged
merged 2 commits into from
Dec 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions spec/observable-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('Observable', function () {
var expected = [1,2,3];
var result = Observable.of(1,2,3).forEach(function (x) {
expect(x).toBe(expected.shift());
}, Promise)
}, null, Promise)
.then(done);

expect(typeof result.then).toBe('function');
Expand All @@ -41,7 +41,7 @@ describe('Observable', function () {
}, function (err) {
expect(err).toBe('bad');
done();
}, Promise);
}, null, Promise);
});

it('should allow Promise to be globally configured', function (done) {
Expand All @@ -60,6 +60,18 @@ describe('Observable', function () {
done();
});
});

it('should accept a thisArg argument', function (done) {
var expected = [1,2,3];
var thisArg = {};
var result = Observable.of(1,2,3).forEach(function (x) {
expect(this).toBe(thisArg);
expect(x).toBe(expected.shift());
}, thisArg, Promise)
.then(done);

expect(typeof result.then).toBe('function');
});
});

describe('subscribe', function () {
Expand Down
17 changes: 0 additions & 17 deletions spec/operators/skipWhile-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,6 @@ describe('Observable.prototype.skipWhile()', function () {
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should accept a thisArg', function () {
var source = hot('-1-^--2--3--4--5--6--|');
var sourceSubs = '^ !';
var expected = '---------4--5--6--|';

function Skiper() {
this.doSkip = function (v) { return +v < 4; };
}

var skiper = new Skiper();

expectObservable(
source.skipWhile(function (v) { return this.doSkip(v); }, skiper)
).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should handle Observable.empty', function () {
var source = cold('|');
var subs = '(^!)';
Expand Down
6 changes: 3 additions & 3 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export interface CoreOperators<T> {
filter?: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
finally?: (finallySelector: () => void) => Observable<T>;
first?: <R>(predicate?: (value: T, index: number, source: Observable<T>) => boolean,
resultSelector?: (value: T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable<T> | Observable<R>;
resultSelector?: (value: T, index: number) => R, defaultValue?: any) => Observable<T> | Observable<R>;
flatMap?: <R>(project: ((x: T, ix: number) => Observable<any>),
projectResult?: (x: T, y: any, ix: number, iy: number) => R,
concurrent?: number) => Observable<R>;
Expand All @@ -41,7 +41,7 @@ export interface CoreOperators<T> {
ignoreElements?: () => Observable<T>;
last?: <R>(predicate?: (value: T, index: number) => boolean,
resultSelector?: (value: T, index: number) => R,
thisArg?: any, defaultValue?: any) => Observable<T> | Observable<R>;
defaultValue?: any) => Observable<T> | Observable<R>;
every?: (predicate: (value: T, index: number) => boolean, thisArg?: any) => Observable<T>;
map?: <R>(project: (x: T, ix?: number) => R, thisArg?: any) => Observable<R>;
mapTo?: <R>(value: R) => Observable<R>;
Expand Down Expand Up @@ -69,7 +69,7 @@ export interface CoreOperators<T> {
single?: (predicate?: (value: T, index: number) => boolean) => Observable<T>;
skip?: (count: number) => Observable<T>;
skipUntil?: (notifier: Observable<any>) => Observable<T>;
skipWhile?: (predicate: (x: T, index: number) => boolean, thisArg?: any) => Observable<T>;
skipWhile?: (predicate: (x: T, index: number) => boolean) => Observable<T>;
startWith?: (x: T) => Observable<T>;
subscribeOn?: (scheduler: Scheduler, delay?: number) => Observable<T>;
switch?: () => Observable<T>;
Expand Down
29 changes: 24 additions & 5 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,12 @@ export class Observable<T> implements CoreOperators<T> {
/**
* @method forEach
* @param {Function} next a handler for each value emitted by the observable
* @param {PromiseConstructor} PromiseCtor? a constructor function used to instantiate the Promise
* @param {any} [thisArg] a `this` context for the `next` handler function
* @param {PromiseConstructor} [PromiseCtor] a constructor function used to instantiate the Promise
* @returns {Promise} a promise that either resolves on observable completion or
* rejects with the handled error
*/
forEach(next: (value: T) => void, PromiseCtor?: PromiseConstructor): Promise<void> {
forEach(next: (value: T) => void, thisArg: any, PromiseCtor?: PromiseConstructor): Promise<void> {
if (!PromiseCtor) {
if (root.Rx && root.Rx.config && root.Rx.config.Promise) {
PromiseCtor = root.Rx.config.Promise;
Expand All @@ -127,9 +128,27 @@ export class Observable<T> implements CoreOperators<T> {
throw new Error('no Promise impl found');
}

return new PromiseCtor<void>((resolve, reject) => {
this.subscribe(next, reject, resolve);
});
let nextHandler;

if (thisArg) {
nextHandler = function nextHandlerFn(value: any): void {
const { thisArg, next } = <any>nextHandlerFn;
return next.call(thisArg, value);
};
nextHandler.thisArg = thisArg;
nextHandler.next = next;
} else {
nextHandler = next;
}

const promiseCallback = function promiseCallbackFn(resolve, reject) {
const { source, nextHandler } = <any>promiseCallbackFn;
source.subscribe(nextHandler, reject, resolve);
};
(<any>promiseCallback).source = this;
(<any>promiseCallback).nextHandler = nextHandler;

return new PromiseCtor<void>(promiseCallback);
}

_subscribe(subscriber: Subscriber<any>): Subscription<T> | Function | void {
Expand Down
10 changes: 3 additions & 7 deletions src/operator/skipWhile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@ import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {bindCallback} from '../util/bindCallback';

export function skipWhile<T>(predicate: (x: T, index: number) => boolean, thisArg?: any): Observable<T> {
return this.lift(new SkipWhileOperator(predicate, thisArg));
export function skipWhile<T>(predicate: (x: T, index: number) => boolean): Observable<T> {
return this.lift(new SkipWhileOperator(predicate));
}

class SkipWhileOperator<T, R> implements Operator<T, R> {
private predicate: (x: T, index: number) => boolean;

constructor(predicate: (x: T, index: number) => boolean, thisArg?: any) {
this.predicate = <(x: T, index: number) => boolean>bindCallback(predicate, thisArg, 2);
constructor(private predicate: (x: T, index: number) => boolean) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
Expand Down