Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: publish variants returning ConnectableObservable not properly utilizing lift #6003

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 241 additions & 1 deletion spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { Observer, TeardownLogic } from '../src/internal/types';
import { Observable, config, Subscription, noop, Subscriber, Operator, NEVER, Subject, of, throwError, empty } from 'rxjs';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap } from 'rxjs/operators';
import { map, multicast, refCount, filter, count, tap, combineLatest, concat, merge, race, zip, catchError, concatMap, switchMap, publish, publishLast, publishBehavior, share} from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';

Expand Down Expand Up @@ -810,6 +810,246 @@ describe('Observable.lift', () => {
);
});


it('should compose through publish and refCount', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
publish(),
refCount(),
map((x) => 10 * x)
);

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [10, 20, 30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
},
() => {
done(new Error('should not be called'));
},
() => {
done();
}
);
});


it('should compose through publishLast and refCount', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
publishLast(),
refCount(),
map((x) => 10 * x)
);

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
},
() => {
done(new Error('should not be called'));
},
() => {
done();
}
);
});

it('should compose through publishBehavior and refCount', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
publishBehavior(0),
refCount(),
map((x) => 10 * x)
);

expect(result instanceof MyCustomObservable).to.be.true;

const expected = [0, 10, 20, 30];

result.subscribe(
function (x) {
expect(x).to.equal(expected.shift());
},
() => {
done(new Error('should not be called'));
},
() => {
done();
}
);
});

it('should composes Subjects in the simple case', () => {
const subject = new Subject<number>();

const result = subject.pipe(
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct. (but you're advised not to do this)

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});

/**
* Seriously, never do this. It's probably bad that we've allowed this. Fortunately, it's not
* a common practice, so maybe we can remove it?
*/
Comment on lines +921 to +924
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OMG, don't give those naughty devs more ideas! 😬 🙈

it('should demonstrate the horrors of sharing and lifting the Subject through', () => {
const subject = new Subject<number>();

const shared = subject.pipe(
share()
);

const result1 = shared.pipe(
map(x => x * 10)
) as any as Subject<number>; // Yes, this is correct.

const result2 = shared.pipe(
map(x => x - 10)
) as any as Subject<number>; // Yes, this is correct.
expect(result1 instanceof Subject).to.be.true;

const emitted1: any[] = [];
result1.subscribe(value => emitted1.push(value));

const emitted2: any[] = [];
result2.subscribe(value => emitted2.push(value));

// THIS IS HORRIBLE DON'T DO THIS.
result1.next(10);
result2.next(20); // Yuck
result1.next(30);

expect(emitted1).to.deep.equal([100, 200, 300]);
expect(emitted2).to.deep.equal([0, 10, 20]);
});

/**
* This section outlines one of the reasons that we need to get rid of operators that return
* Connectable observable. Likewise it also reveals a slight design flaw in `lift`. It
* probably should have never tried to compose through the Subject's observer methods.
* If you're a user and you're reading this... NEVER try to use this feature, it's likely
* to go away at some point.
Comment on lines +960 to +961
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙈

*
* The problem is that you can have the Subject parts, or you can have the ConnectableObservable parts,
* but you can't have both.
*/
describe.skip('The lift through Connectable gaff', () => {
it('should compose through multicast and refCount, even if it is a Subject', () => {
const subject = new Subject<number>();

const result = subject.pipe(
multicast(() => new Subject<number>()),
refCount(),
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct.

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});

it('should compose through publish and refCount, even if it is a Subject', () => {
const subject = new Subject<number>();

const result = subject.pipe(
publish(),
refCount(),
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct.

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});


it('should compose through publishLast and refCount, even if it is a Subject', () => {
const subject = new Subject<number>();

const result = subject.pipe(
publishLast(),
refCount(),
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct.

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([100, 200, 300]);
});

it('should compose through publishBehavior and refCount, even if it is a Subject', () => {
const subject = new Subject<number>();

const result = subject.pipe(
publishBehavior(0),
refCount(),
map((x) => 10 * x)
) as any as Subject<number>; // Yes, this is correct.

expect(result instanceof Subject).to.be.true;

const emitted: any[] = [];
result.subscribe(value => emitted.push(value));

result.next(10);
result.next(20);
result.next(30);

expect(emitted).to.deep.equal([0, 100, 200, 300]);
});
});

it('should compose through multicast with selector function', (done) => {
const result = new MyCustomObservable<number>((observer) => {
observer.next(1);
Expand Down
7 changes: 7 additions & 0 deletions src/internal/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { refCount as higherOrderRefCount } from '../operators/refCount';
import { OperatorSubscriber } from '../operators/OperatorSubscriber';
import { hasLift } from '../util/lift';

/**
* @class ConnectableObservable<T>
Expand All @@ -28,6 +29,12 @@ export class ConnectableObservable<T> extends Observable<T> {
*/
constructor(public source: Observable<T>, protected subjectFactory: () => Subject<T>) {
super();
// If we have lift, monkey patch that here. This is done so custom observable
// types will compose through multicast. Otherwise the resulting observable would
// simply be an instance of `ConnectableObservable`.
if (hasLift(source)) {
this.lift = source.lift;
}
}

protected _subscribe(subscriber: Subscriber<T>) {
Expand Down
14 changes: 1 addition & 13 deletions src/internal/operators/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Subject } from '../Subject';
import { Observable } from '../Observable';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types';
import { hasLift } from '../util/lift';
import { isFunction } from '../util/isFunction';
import { connect } from './connect';

Expand Down Expand Up @@ -81,16 +80,5 @@ export function multicast<T, R>(
});
}

return (source: Observable<T>) => {
const connectable: any = new ConnectableObservable(source, subjectFactory);
// If we have lift, monkey patch that here. This is done so custom observable
// types will compose through multicast. Otherwise the resulting observable would
// simply be an instance of `ConnectableObservable`.
if (hasLift(source)) {
connectable.lift = source.lift;
}
connectable.source = source;
connectable.subjectFactory = subjectFactory;
return connectable;
};
return (source: Observable<T>) => new ConnectableObservable<any>(source, subjectFactory);
}