From 9a011c6e82f2cf4cf23a6739c7463d5951adfd2d Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Sun, 7 Feb 2021 21:37:37 -0600 Subject: [PATCH] fix: publish variants returning ConnectableObservable not properly utilizing lift - Adds tests to show fixed issues - Adds tests to show issues that simply can't be fixed, and support a case against removing operators that return ConnectableObservable, as well as possibly a case against lifting Subject. - Moves logic that patched lift for ConnectableObservable to the constructor so it is used by all multicast operators --- spec/Observable-spec.ts | 242 +++++++++++++++++- .../observable/ConnectableObservable.ts | 7 + src/internal/operators/multicast.ts | 13 +- 3 files changed, 249 insertions(+), 13 deletions(-) diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 61c425ab69..1ab85d27d2 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -2,7 +2,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { Observer, TeardownLogic } from '../src/internal/types'; import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs'; -import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap } from 'rxjs/operators'; +import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share} from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from './helpers/observableMatcher'; @@ -810,6 +810,246 @@ describe('Observable.lift', () => { ); }); + + it('should compose through publish and refCount', (done) => { + const result = new MyCustomObservable((observer) => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.complete(); + }).pipe( + publish(), + refCount(), + map((x) => 10 * x) + ); + + expect(result instanceof MyCustomObservable).to.be.true; + + const expected = [10, 20, 30]; + + result.subscribe( + function (x) { + expect(x).to.equal(expected.shift()); + }, + () => { + done(new Error('should not be called')); + }, + () => { + done(); + } + ); + }); + + + it('should compose through publishLast and refCount', (done) => { + const result = new MyCustomObservable((observer) => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.complete(); + }).pipe( + publishLast(), + refCount(), + map((x) => 10 * x) + ); + + expect(result instanceof MyCustomObservable).to.be.true; + + const expected = [30]; + + result.subscribe( + function (x) { + expect(x).to.equal(expected.shift()); + }, + () => { + done(new Error('should not be called')); + }, + () => { + done(); + } + ); + }); + + it('should compose through publishBehavior and refCount', (done) => { + const result = new MyCustomObservable((observer) => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.complete(); + }).pipe( + publishBehavior(0), + refCount(), + map((x) => 10 * x) + ); + + expect(result instanceof MyCustomObservable).to.be.true; + + const expected = [0, 10, 20, 30]; + + result.subscribe( + function (x) { + expect(x).to.equal(expected.shift()); + }, + () => { + done(new Error('should not be called')); + }, + () => { + done(); + } + ); + }); + + it('should composes Subjects in the simple case', () => { + const subject = new Subject(); + + const result = subject.pipe( + map((x) => 10 * x) + ) as any as Subject; // Yes, this is correct. (but you're advised not to do this) + + expect(result instanceof Subject).to.be.true; + + const emitted: any[] = []; + result.subscribe(value => emitted.push(value)); + + result.next(10); + result.next(20); + result.next(30); + + expect(emitted).to.deep.equal([100, 200, 300]); + }); + + /** + * Seriously, never do this. It's probably bad that we've allowed this. Fortunately, it's not + * a common practice, so maybe we can remove it? + */ + it('should demonstrate the horrors of sharing and lifting the Subject through', () => { + const subject = new Subject(); + + const shared = subject.pipe( + share() + ); + + const result1 = shared.pipe( + map(x => x * 10) + ) as any as Subject; // Yes, this is correct. + + const result2 = shared.pipe( + map(x => x - 10) + ) as any as Subject; // Yes, this is correct. + expect(result1 instanceof Subject).to.be.true; + + const emitted1: any[] = []; + result1.subscribe(value => emitted1.push(value)); + + const emitted2: any[] = []; + result2.subscribe(value => emitted2.push(value)); + + // THIS IS HORRIBLE DON'T DO THIS. + result1.next(10); + result2.next(20); // Yuck + result1.next(30); + + expect(emitted1).to.deep.equal([100, 200, 300]); + expect(emitted2).to.deep.equal([0, 10, 20]); + }); + + /** + * This section outlines one of the reasons that we need to get rid of operators that return + * Connectable observable. Likewise it also reveals a slight design flaw in `lift`. It + * probably should have never tried to compose through the Subject's observer methods. + * If you're a user and you're reading this... NEVER try to use this feature, it's likely + * to go away at some point. + * + * The problem is that you can have the Subject parts, or you can have the ConnectableObservable parts, + * but you can't have both. + */ + describe.skip('The lift through Connectable gaff', () => { + it('should compose through multicast and refCount, even if it is a Subject', () => { + const subject = new Subject(); + + const result = subject.pipe( + multicast(() => new Subject()), + refCount(), + map((x) => 10 * x) + ) as any as Subject; // Yes, this is correct. + + expect(result instanceof Subject).to.be.true; + + const emitted: any[] = []; + result.subscribe(value => emitted.push(value)); + + result.next(10); + result.next(20); + result.next(30); + + expect(emitted).to.deep.equal([100, 200, 300]); + }); + + it('should compose through publish and refCount, even if it is a Subject', () => { + const subject = new Subject(); + + const result = subject.pipe( + publish(), + refCount(), + map((x) => 10 * x) + ) as any as Subject; // Yes, this is correct. + + expect(result instanceof Subject).to.be.true; + + const emitted: any[] = []; + result.subscribe(value => emitted.push(value)); + + result.next(10); + result.next(20); + result.next(30); + + expect(emitted).to.deep.equal([100, 200, 300]); + }); + + + it('should compose through publishLast and refCount, even if it is a Subject', () => { + const subject = new Subject(); + + const result = subject.pipe( + publishLast(), + refCount(), + map((x) => 10 * x) + ) as any as Subject; // Yes, this is correct. + + expect(result instanceof Subject).to.be.true; + + const emitted: any[] = []; + result.subscribe(value => emitted.push(value)); + + result.next(10); + result.next(20); + result.next(30); + + expect(emitted).to.deep.equal([100, 200, 300]); + }); + + it('should compose through publishBehavior and refCount, even if it is a Subject', () => { + const subject = new Subject(); + + const result = subject.pipe( + publishBehavior(0), + refCount(), + map((x) => 10 * x) + ) as any as Subject; // Yes, this is correct. + + expect(result instanceof Subject).to.be.true; + + const emitted: any[] = []; + result.subscribe(value => emitted.push(value)); + + result.next(10); + result.next(20); + result.next(30); + + expect(emitted).to.deep.equal([0, 100, 200, 300]); + }); + }); + it('should compose through multicast with selector function', (done) => { const result = new MyCustomObservable((observer) => { observer.next(1); diff --git a/src/internal/observable/ConnectableObservable.ts b/src/internal/observable/ConnectableObservable.ts index b2a7f17980..2932ea5b2d 100644 --- a/src/internal/observable/ConnectableObservable.ts +++ b/src/internal/observable/ConnectableObservable.ts @@ -4,6 +4,7 @@ import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { refCount as higherOrderRefCount } from '../operators/refCount'; import { OperatorSubscriber } from '../operators/OperatorSubscriber'; +import { hasLift } from '../util/lift'; /** * @class ConnectableObservable @@ -28,6 +29,12 @@ export class ConnectableObservable extends Observable { */ constructor(public source: Observable, protected subjectFactory: () => Subject) { super(); + // If we have lift, monkey patch that here. This is done so custom observable + // types will compose through multicast. Otherwise the resulting observable would + // simply be an instance of `ConnectableObservable`. + if (hasLift(source)) { + this.lift = source.lift; + } } protected _subscribe(subscriber: Subscriber) { diff --git a/src/internal/operators/multicast.ts b/src/internal/operators/multicast.ts index 9befa73eef..9d38dd4d7a 100644 --- a/src/internal/operators/multicast.ts +++ b/src/internal/operators/multicast.ts @@ -81,16 +81,5 @@ export function multicast( }); } - return (source: Observable) => { - const connectable: any = new ConnectableObservable(source, subjectFactory); - // If we have lift, monkey patch that here. This is done so custom observable - // types will compose through multicast. Otherwise the resulting observable would - // simply be an instance of `ConnectableObservable`. - if (hasLift(source)) { - connectable.lift = source.lift; - } - connectable.source = source; - connectable.subjectFactory = subjectFactory; - return connectable; - }; + return (source: Observable) => new ConnectableObservable(source, subjectFactory); }