Skip to content

Commit

Permalink
fix(publish,publishReplay) resolve sharing Subject
Browse files Browse the repository at this point in the history
change publish operator to use factory
change publishReplay operator to not share ReplaySubject
fixes issue ReactiveX#5411
  • Loading branch information
e-davidson committed Jul 10, 2020
1 parent 381aedb commit 16fe755
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
26 changes: 25 additions & 1 deletion spec/operators/publish-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators';
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';

/** @test {publish} */
describe('publish operator', () => {
Expand Down Expand Up @@ -337,4 +337,28 @@ describe('publish operator', () => {
expect(subscriptions).to.equal(1);
done();
});

it('should subscribe to its own source when using a shared pipeline', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';

const sharedPipeLine = pipe(
publish()
);

const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable<any>;
const expected1 = '-1-2-3-4-5-|';
const expected2 = '-6-7-8-9-0-|';

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
26 changes: 25 additions & 1 deletion spec/operators/publishReplay-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs';
import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators';

/** @test {publishReplay} */
Expand Down Expand Up @@ -488,4 +488,28 @@ describe('publishReplay operator', () => {
expectObservable(published).toBe(expected, undefined, error);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should subscribe to its own source when using a shared pipeline', () => {
const source1 = cold('-1-2-3-4-5-|');
const source2 = cold('-6-7-8-9-0-|');
const expected1 = '-1-2-3-4-5-|';
const expected2 = '-6-7-8-9-0-|';
const source1Subs = '^ !';
const source2Subs = '^ !';

const sharedPipeLine = pipe(
publishReplay(1)
);

const published1 = source1.pipe(sharedPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(sharedPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
4 changes: 2 additions & 2 deletions src/internal/operators/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ export function publish<T>(selector: MonoTypeOperatorFunction<T>): MonoTypeOpera
*/
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
return selector ?
multicast(() => new Subject<T>(), selector) :
multicast(new Subject<T>());
(source: Observable<T>) => multicast(() => new Subject<T>(), selector)(source) :
(source: Observable<T>) => multicast(new Subject<T>())(source);
}
3 changes: 1 addition & 2 deletions src/internal/operators/publishReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ export function publishReplay<T, R>(bufferSize?: number,
}

const selector = typeof selectorOrScheduler === 'function' ? selectorOrScheduler : undefined;
const subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);

return (source: Observable<T>) => multicast(() => subject, selector!)(source) as ConnectableObservable<R>;
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, scheduler), selector!)(source) as ConnectableObservable<R>;
}

0 comments on commit 16fe755

Please sign in to comment.