Skip to content

Commit

Permalink
fix(bindNodeCallback): ensure underlying function is not called twice…
Browse files Browse the repository at this point in the history
… during subscription (#5780)

- Fixes an issue where `bindNodeCallback` observables would call the underlying function twice if it has not called back yet, and more than one subscription occurred.
- Reduces the size of the implementation
- Adds more comments to the file.
  • Loading branch information
benlesh authored Sep 30, 2020
1 parent 0ace906 commit 74aa4b2
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 96 deletions.
25 changes: 25 additions & 0 deletions spec/observables/bindNodeCallback-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,29 @@ describe('bindNodeCallback', () => {

expect(receivedError).to.equal('kaboom');
});

it('should not call the function if subscribed twice in a row before it resolves', () => {
let executeCallback: any;
let calls = 0;
function myFunc(callback: (error: any, result: any) => void) {
calls++;
if (calls > 1) {
throw new Error('too many calls to myFunc');
}
executeCallback = callback;
}

const source$ = bindNodeCallback(myFunc)();

let result1: any;
let result2: any;
source$.subscribe(value => result1 = value);
source$.subscribe(value => result2 = value);

expect(calls).to.equal(1);
executeCallback(null, 'test');
expect(result1).to.equal('test');
expect(result2).to.equal('test');
expect(calls).to.equal(1);
})
});
173 changes: 77 additions & 96 deletions src/internal/observable/bindNodeCallback.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/** @prettier */
import { AsyncSubject } from '../AsyncSubject';
import { Observable } from '../Observable';
import { observeOn } from '../operators/observeOn';
import { SchedulerLike } from '../types';
import { isScheduler } from '../util/isScheduler';
import { mapOneOrManyArgs } from '../util/mapOneOrManyArgs';

/* tslint:disable:max-line-length */
/** @deprecated resultSelector is deprecated, pipe to map instead */
export function bindNodeCallback(
callbackFunc: Function,
Expand Down Expand Up @@ -146,7 +147,7 @@ export function bindNodeCallback<A1, A2, A3, A4, A5, R1>(
export function bindNodeCallback<A1, A2, A3, A4, A5>(
callbackFunc: (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5, callback: (err: any) => any) => any,
scheduler?: SchedulerLike
): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable<void>; /* tslint:enable:max-line-length */
): (arg1: A1, arg2: A2, arg3: A3, arg4: A4, arg5: A5) => Observable<void>;

export function bindNodeCallback(callbackFunc: Function, scheduler?: SchedulerLike): (...args: any[]) => Observable<any[]>;
/**
Expand Down Expand Up @@ -258,114 +259,94 @@ export function bindNodeCallback<T>(
callbackFunc: Function,
resultSelector?: Function | SchedulerLike,
scheduler?: SchedulerLike
): (...args: any[]) => Observable<T> {
): (...args: any[]) => Observable<any> {
if (resultSelector) {
if (isScheduler(resultSelector)) {
scheduler = resultSelector;
} else {
// DEPRECATED PATH
return (...args: any[]) => bindNodeCallback(callbackFunc, scheduler)(...args).pipe(mapOneOrManyArgs(resultSelector as any));
return function (this: any, ...args: any[]) {
return bindNodeCallback(callbackFunc, scheduler)
.apply(this, args)
.pipe(mapOneOrManyArgs(resultSelector as any));
};
}
}

if (scheduler) {
return function (this: any, ...args: any[]) {
return bindNodeCallback(callbackFunc).apply(this, args).pipe(observeOn(scheduler!));
};
}

// We're using AsyncSubject, because it emits when it completes,
// and it will play the value to all late-arriving subscribers.
const subject = new AsyncSubject<any>();

return function (this: any, ...args: any[]): Observable<T> {
let results: any;
let hasResults = false;
let hasError = false;
let error: any;
return new Observable<T>((subscriber) => {
if (!scheduler) {
let isCurrentlyAsync = false;
let hasCompletedSynchronously = false;
if (hasResults) {
subscriber.next(results);
subscriber.complete();
} else if (hasError) {
subscriber.error(error);
} else {
const handler = (...innerArgs: any[]) => {
const err = innerArgs.shift();
if (err != null) {
hasError = true;
error = err;
subscriber.error(err);
} else {
hasResults = true;
results = innerArgs.length <= 1 ? innerArgs[0] : innerArgs;
subscriber.next(results);
if (isCurrentlyAsync) {
subscriber.complete();
} else {
hasCompletedSynchronously = true;
}
}
};
// If this is true, then we haven't called our function yet.
let uninitialized = true;
return new Observable((subscriber) => {
const subs = subject.subscribe(subscriber);
if (uninitialized) {
uninitialized = false;
// We're going to execute the bound function
// This bit is to signal that we are hitting the callback asychronously.
// Because we don't have any anti-"Zalgo" gaurantees with whatever
// function we are handed, we use this bit to figure out whether or not
// we are getting hit in a callback synchronously during our call.
let isAsync = false;

try {
callbackFunc.apply(this, [...args, handler]);
} catch (err) {
hasError = true;
error = err;
subscriber.error(err);
}
isCurrentlyAsync = true;
// This is used to signal that the callback completed synchronously.
let isComplete = false;

if (hasCompletedSynchronously && !hasError) {
subscriber.complete();
}
}
return;
} else {
const scheduleNext = (value: any[]) => {
hasResults = true;
results = value.length <= 1 ? value[0] : value;
subscriber.add(
scheduler!.schedule(() => {
subscriber.next(results);
subscriber.add(
scheduler!.schedule(() => {
subscriber.complete();
})
);
})
);
};
// Call our function that has a callback. If at any time during this
// call, an error is thrown, it will be caught by the Observable
// subscription process and sent to the consumer.

const scheduleError = (err: any) => {
hasError = true;
error = err;
subscriber.add(
scheduler!.schedule(() => {
subscriber.error(error);
})
);
};
callbackFunc.apply(
// Pass the appropriate `this` context.
this,
[
// Pass the arguments.
...args,
// And our callback handler.
(err: any, ...rest: any[]) => {
if (err != null) {
subject.error(err);
} else {
// If we have one argument after the error, notify the consumer
// of it as a single value, otherwise, if there's more than one, pass
// them as an array. Note that if there are no arguments, `undefined`
// will be emitted.
subject.next(1 < rest.length ? rest : rest[0]);
// Flip this flag, so we know we can complete it in the synchronous
// case below.
isComplete = true;
// If we're not asynchronous, we need to defer the `complete` call
// until after the call to the function is over. This is because an
// error could be thrown in the function after it calls our callback,
// and if that is the case, if we complete here, we are unable to notify
// the consumer than an error occured.
if (isAsync) {
subject.complete();
}
}
},
]
);
// If we flipped `isComplete` during the call, we resolved synchronously,
// notify complete, because we skipped it in the callback to wait
// to make sure there were no errors during the call.
if (isComplete) {
subject.complete();
}

return scheduler.schedule(() => {
if (hasResults) {
scheduleNext(results);
} else if (hasError) {
scheduleError(error);
} else {
try {
callbackFunc.apply(this, [
...args,
(...innerArgs: any[]) => {
const err = innerArgs.shift();
if (err != null) {
scheduleError(err);
} else {
scheduleNext(innerArgs);
}
},
]);
} catch (err) {
scheduleError(err);
return;
}
}
});
// We're no longer synchronous. If the callback is called at this point
// we can notify complete on the spot.
isAsync = true;
}
return subs;
});
};
}

0 comments on commit 74aa4b2

Please sign in to comment.