Skip to content

Commit

Permalink
fix(bindNodeCallback): input function context can now be properly set…
Browse files Browse the repository at this point in the history
… via output function (#2320)

Set context of input function to context of output function, so that
context can be controlled at call time and underlying observable is
not available in input function
  • Loading branch information
mpodlasin authored and benlesh committed Feb 2, 2017
1 parent cb91c76 commit 3ec315d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
36 changes: 36 additions & 0 deletions spec/observables/bindNodeCallback-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ describe('Observable.bindNodeCallback', () => {
expect(results).to.deep.equal([42, 'done']);
});

it('should set context of callback to context of boundCallback', () => {
function callback(cb) {
cb(null, this.datum);
}
const boundCallback = Observable.bindNodeCallback(callback);
const results = [];

boundCallback.call({datum: 42})
.subscribe(
(x: number) => results.push(x),
null,
() => results.push('done')
);

expect(results).to.deep.equal([42, 'done']);
});

it('should emit one value chosen by a selector', () => {
function callback(datum, cb) {
cb(null, datum);
Expand Down Expand Up @@ -128,6 +145,25 @@ describe('Observable.bindNodeCallback', () => {
expect(results).to.deep.equal([42, 'done']);
});

it('should set context of callback to context of boundCallback', () => {
function callback(cb) {
cb(null, this.datum);
}
const boundCallback = Observable.bindNodeCallback(callback, null, rxTestScheduler);
const results = [];

boundCallback.call({datum: 42})
.subscribe(
(x: number) => results.push(x),
null,
() => results.push('done')
);

rxTestScheduler.flush();

expect(results).to.deep.equal([42, 'done']);
});

it('should error if callback throws', () => {
const expected = new Error('haha no callback for you');
function callback(datum, cb) {
Expand Down
21 changes: 14 additions & 7 deletions src/observable/BoundNodeCallbackObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@ export class BoundNodeCallbackObservable<T> extends Observable<T> {
* last parameter must be a callback function that `func` calls when it is
* done. The callback function is expected to follow Node.js conventions,
* where the first argument to the callback is an error, while remaining
* arguments are the callback result. The output of `bindNodeCallback` is a
* arguments are the callback result.
*
* The output of `bindNodeCallback` is a
* function that takes the same parameters as `func`, except the last one (the
* callback). When the output function is called with arguments, it will
* return an Observable where the results will be delivered to.
*
* As in {@link bindCallback}, context (`this` property) of input function will be set to context
* of returned function, when it is called.
*
* @example <caption>Read a file from the filesystem and get the data as an Observable</caption>
* import * as fs from 'fs';
* var readFileAsObservable = Rx.Observable.bindNodeCallback(fs.readFile);
Expand All @@ -69,14 +74,15 @@ export class BoundNodeCallbackObservable<T> extends Observable<T> {
static create<T>(func: Function,
selector: Function | void = undefined,
scheduler?: IScheduler): (...args: any[]) => Observable<T> {
return (...args: any[]): Observable<T> => {
return new BoundNodeCallbackObservable<T>(func, <any>selector, args, scheduler);
return function(this: any, ...args: any[]): Observable<T> {
return new BoundNodeCallbackObservable<T>(func, <any>selector, args, this, scheduler);
};
}

constructor(private callbackFunc: Function,
private selector: Function,
private args: any[],
private context: any,
public scheduler: IScheduler) {
super();
}
Expand Down Expand Up @@ -113,26 +119,27 @@ export class BoundNodeCallbackObservable<T> extends Observable<T> {
// use named function instance to avoid closure.
(<any>handler).source = this;

const result = tryCatch(callbackFunc).apply(this, args.concat(handler));
const result = tryCatch(callbackFunc).apply(this.context, args.concat(handler));
if (result === errorObject) {
subject.error(errorObject.e);
}
}
return subject.subscribe(subscriber);
} else {
return scheduler.schedule(dispatch, 0, { source: this, subscriber });
return scheduler.schedule(dispatch, 0, { source: this, subscriber, context: this.context });
}
}
}

interface DispatchState<T> {
source: BoundNodeCallbackObservable<T>;
subscriber: Subscriber<T>;
context: any;
}

function dispatch<T>(this: Action<DispatchState<T>>, state: DispatchState<T>) {
const self = (<Subscription> this);
const { source, subscriber } = state;
const { source, subscriber, context } = state;
// XXX: cast to `any` to access to the private field in `source`.
const { callbackFunc, args, scheduler } = source as any;
let subject = source.subject;
Expand Down Expand Up @@ -162,7 +169,7 @@ function dispatch<T>(this: Action<DispatchState<T>>, state: DispatchState<T>) {
// use named function to pass values in without closure
(<any>handler).source = source;

const result = tryCatch(callbackFunc).apply(this, args.concat(handler));
const result = tryCatch(callbackFunc).apply(context, args.concat(handler));
if (result === errorObject) {
subject.error(errorObject.e);
}
Expand Down

0 comments on commit 3ec315d

Please sign in to comment.