Skip to content

Commit

Permalink
fix(node): Subscriber no longer trampled if from another copy of rxjs
Browse files Browse the repository at this point in the history
Only fixes v6 and above, fix coming for v5 (stable) shortly

related ReactiveX#3475
  • Loading branch information
benlesh authored and cartant committed Mar 27, 2018
1 parent 4d92e2b commit e43f032
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
2 changes: 1 addition & 1 deletion node-tests/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var id = setTimeout(function () {
}, 200);

of1(0).pipe(
mergeMap1(function () { return of(x); }),
mergeMap1(function (x) { return of(x); }),
mergeMap(function () { return from1(Promise.resolve(1)); })
).subscribe({
next: function (value) { actual.push(value); },
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 12 additions & 5 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
break;
}
if (typeof destinationOrNext === 'object') {
if (destinationOrNext instanceof Subscriber) {
this.syncErrorThrowable = destinationOrNext.syncErrorThrowable;
this.destination = (<Subscriber<any>> destinationOrNext);
(<any> this.destination).add(this);
// HACK(benlesh): For situations where Node has multiple copies of rxjs in
// node_modules, we cannot rely on `instanceof` checks
if (isTrustedSubscriber(destinationOrNext)) {
const trustedSubscriber = destinationOrNext[rxSubscriberSymbol]() as Subscriber<any>;
this.syncErrorThrowable = trustedSubscriber.syncErrorThrowable;
this.destination = trustedSubscriber;
trustedSubscriber.add(this);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
Expand Down Expand Up @@ -299,4 +302,8 @@ class SafeSubscriber<T> extends Subscriber<T> {
this._parentSubscriber = null;
_parentSubscriber.unsubscribe();
}
}
}

function isTrustedSubscriber(obj: any) {
return obj instanceof Subscriber || ('syncErrorThrowable' in obj && obj[rxSubscriberSymbol]);
}

0 comments on commit e43f032

Please sign in to comment.