Skip to content

Commit

Permalink
fix: returned operator functions from multicast operators share, `p…
Browse files Browse the repository at this point in the history
…ublish`, `publishReplay` are now referentially transparent. Meaning if you take the result of calling `publish()` and pass it to N observable `pipe` methods, it will behave the same in each case, rather than having a cumulative effect, which was a regression introduced sometime in 6.x. If you required this broken behavior, there is a work around posted [here](#6410 (comment)) (#6410)

* fix(publish,publishReplay): resolve sharing Subject

change publish operator to use factory
change publishReplay operator to not share ReplaySubject
fixes issue #5411

* test: rearrange tests

* test: add failing ref transparency tests

* fix(publishBehavior): make ref transparent

* fix(publishLast): make ref transparent

* test: add failing ref transparency tests

* fix(share): make ref transparent

* chore: add a comment

* test: change descriptions and add comments

* refactor: destructure options outside of op func

* chore: use consistent terminology in comments

* test: use consistent terminology

Co-authored-by: Eli Davidson <edavidson@broadfinancial.com>
  • Loading branch information
cartant and e-davidson authored May 21, 2021
1 parent 2fb22bf commit e2f2e51
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 94 deletions.
29 changes: 28 additions & 1 deletion spec/operators/publish-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators';
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';

/** @test {publish} */
describe('publish operator', () => {
Expand Down Expand Up @@ -337,4 +337,31 @@ describe('publish operator', () => {
expect(subscriptions).to.equal(1);
done();
});

it('should be referentially-transparent', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const expected1 = '-1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';
const expected2 = '-6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(
publish()
);

// The non-referentially-transparent publishing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
29 changes: 28 additions & 1 deletion spec/operators/publishBehavior-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { publishBehavior, mergeMapTo, tap, mergeMap, refCount, retry, repeat } from 'rxjs/operators';
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';

/** @test {publishBehavior} */
describe('publishBehavior operator', () => {
Expand Down Expand Up @@ -344,4 +344,31 @@ describe('publishBehavior operator', () => {
expect(results).to.deep.equal([]);
done();
});

it('should be referentially-transparent', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const expected1 = 'x1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';
const expected2 = 'x6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(
publishBehavior('x')
);

// The non-referentially-transparent publishing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
29 changes: 28 additions & 1 deletion spec/operators/publishLast-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { publishLast, mergeMapTo, tap, mergeMap, refCount, retry } from 'rxjs/operators';
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';

/** @test {publishLast} */
describe('publishLast operator', () => {
Expand Down Expand Up @@ -261,4 +261,31 @@ describe('publishLast operator', () => {
expect(subscriptions).to.equal(1);
done();
});

it('should be referentially-transparent', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const expected1 = '-----------(5|)';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';
const expected2 = '-----------(0|)';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(
publishLast()
);

// The non-referentially-transparent publishing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
29 changes: 28 additions & 1 deletion spec/operators/publishReplay-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs';
import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators';

/** @test {publishReplay} */
Expand Down Expand Up @@ -487,4 +487,31 @@ describe('publishReplay operator', () => {
expectObservable(published).toBe(expected, undefined, error);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should be referentially-transparent', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const expected1 = '-1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';
const expected2 = '-6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(
publishReplay(1)
);

// The non-referentially-transparent publishing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
26 changes: 25 additions & 1 deletion spec/operators/share-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/** @prettier */
import { expect } from 'chai';
import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError } from 'rxjs';
import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError, pipe } from 'rxjs';
import {
map,
mergeMap,
Expand Down Expand Up @@ -619,6 +619,30 @@ describe('share', () => {
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});
});

it('should be referentially-transparent', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = ' ^----------!';
const expected1 = ' -1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = ' ^----------!';
const expected2 = ' -6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(share({ resetOnRefCountZero }));

// The non-referentially-transparent sharing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const shared1 = source1.pipe(partialPipeLine);
const shared2 = source2.pipe(partialPipeLine);

expectObservable(shared1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(shared2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
});
});
});
}

Expand Down
26 changes: 25 additions & 1 deletion spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { Observable, Operator, Observer, of, from, defer } from 'rxjs';
import { Observable, Operator, Observer, of, from, defer, pipe } from 'rxjs';
import { observableMatcher } from '../helpers/observableMatcher';

/** @test {shareReplay} */
Expand Down Expand Up @@ -387,4 +387,28 @@ describe('shareReplay', () => {
} else {
console.warn(`No support for FinalizationRegistry in Node ${process.version}`);
}

it('should be referentially-transparent', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = ' ^----------!';
const expected1 = ' -1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = ' ^----------!';
const expected2 = ' -6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(shareReplay({ refCount: false }));

// The non-referentially-transparent sharing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const shared1 = source1.pipe(partialPipeLine);
const shared2 = source2.pipe(partialPipeLine);

expectObservable(shared1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(shared2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
});
});
});
2 changes: 1 addition & 1 deletion src/internal/operators/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ export function publish<T, O extends ObservableInput<any>>(selector: (shared: Ob
* Details: https://rxjs.dev/deprecations/multicasting
*/
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
return selector ? connect(selector) : multicast(new Subject<T>());
return selector ? (source) => connect(selector)(source) : (source) => multicast(new Subject<T>())(source);
}
6 changes: 4 additions & 2 deletions src/internal/operators/publishBehavior.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import { UnaryFunction } from '../types';
* Details: https://rxjs.dev/deprecations/multicasting
*/
export function publishBehavior<T>(initialValue: T): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
const subject = new BehaviorSubject<T>(initialValue);
// Note that this has *never* supported the selector function.
return (source) => new ConnectableObservable(source, () => subject);
return (source) => {
const subject = new BehaviorSubject<T>(initialValue);
return new ConnectableObservable(source, () => subject);
};
}
6 changes: 4 additions & 2 deletions src/internal/operators/publishLast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ import { UnaryFunction } from '../types';
* Details: https://rxjs.dev/deprecations/multicasting
*/
export function publishLast<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
const subject = new AsyncSubject<T>();
// Note that this has *never* supported a selector function like `publish` and `publishReplay`.
return (source) => new ConnectableObservable(source, () => subject);
return (source) => {
const subject = new AsyncSubject<T>();
return new ConnectableObservable(source, () => subject);
};
}
5 changes: 1 addition & 4 deletions src/internal/operators/publishReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,8 @@ export function publishReplay<T, R>(
if (selectorOrScheduler && !isFunction(selectorOrScheduler)) {
timestampProvider = selectorOrScheduler;
}

const selector = isFunction(selectorOrScheduler) ? selectorOrScheduler : undefined;
const subject = new ReplaySubject<T>(bufferSize, windowTime, timestampProvider);

// Note, we're passing `selector!` here, because at runtime, `undefined` is an acceptable argument
// but it makes our TypeScript signature for `multicast` unhappy (as it should, because it's gross).
return (source: Observable<T>) => multicast(subject, selector!)(source);
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, timestampProvider), selector!)(source);
}
Loading

0 comments on commit e2f2e51

Please sign in to comment.