Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make share, publish, et al. operator calls referentially transparent #6410

Merged
merged 12 commits into from
May 21, 2021
29 changes: 28 additions & 1 deletion spec/operators/publish-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators';
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';

/** @test {publish} */
describe('publish operator', () => {
Expand Down Expand Up @@ -337,4 +337,31 @@ describe('publish operator', () => {
expect(subscriptions).to.equal(1);
done();
});

it('should be referentially-transparent', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const expected1 = '-1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';
const expected2 = '-6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(
publish()
);

// The non-referentially-transparent publishing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
29 changes: 28 additions & 1 deletion spec/operators/publishBehavior-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { publishBehavior, mergeMapTo, tap, mergeMap, refCount, retry, repeat } from 'rxjs/operators';
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';

/** @test {publishBehavior} */
describe('publishBehavior operator', () => {
Expand Down Expand Up @@ -344,4 +344,31 @@ describe('publishBehavior operator', () => {
expect(results).to.deep.equal([]);
done();
});

it('should be referentially-transparent', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const expected1 = 'x1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';
const expected2 = 'x6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(
publishBehavior('x')
);

// The non-referentially-transparent publishing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
29 changes: 28 additions & 1 deletion spec/operators/publishLast-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { publishLast, mergeMapTo, tap, mergeMap, refCount, retry } from 'rxjs/operators';
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';

/** @test {publishLast} */
describe('publishLast operator', () => {
Expand Down Expand Up @@ -261,4 +261,31 @@ describe('publishLast operator', () => {
expect(subscriptions).to.equal(1);
done();
});

it('should be referentially-transparent', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const expected1 = '-----------(5|)';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';
const expected2 = '-----------(0|)';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(
publishLast()
);

// The non-referentially-transparent publishing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
29 changes: 28 additions & 1 deletion spec/operators/publishReplay-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs';
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs';
import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators';

/** @test {publishReplay} */
Expand Down Expand Up @@ -487,4 +487,31 @@ describe('publishReplay operator', () => {
expectObservable(published).toBe(expected, undefined, error);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should be referentially-transparent', () => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = '^ !';
const expected1 = '-1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = '^ !';
const expected2 = '-6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(
publishReplay(1)
);

// The non-referentially-transparent publishing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;

expectObservable(published1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(published2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);

published1.connect();
published2.connect();
});
});
26 changes: 25 additions & 1 deletion spec/operators/share-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/** @prettier */
import { expect } from 'chai';
import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError } from 'rxjs';
import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError, pipe } from 'rxjs';
import {
map,
mergeMap,
Expand Down Expand Up @@ -619,6 +619,30 @@ describe('share', () => {
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});
});

it('should be referentially-transparent', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = ' ^----------!';
const expected1 = ' -1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = ' ^----------!';
const expected2 = ' -6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(share({ resetOnRefCountZero }));

// The non-referentially-transparent sharing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const shared1 = source1.pipe(partialPipeLine);
const shared2 = source2.pipe(partialPipeLine);

expectObservable(shared1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(shared2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
});
});
});
}

Expand Down
26 changes: 25 additions & 1 deletion spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { expect } from 'chai';
import * as sinon from 'sinon';
import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { Observable, Operator, Observer, of, from, defer } from 'rxjs';
import { Observable, Operator, Observer, of, from, defer, pipe } from 'rxjs';
import { observableMatcher } from '../helpers/observableMatcher';

/** @test {shareReplay} */
Expand Down Expand Up @@ -387,4 +387,28 @@ describe('shareReplay', () => {
} else {
console.warn(`No support for FinalizationRegistry in Node ${process.version}`);
}

it('should be referentially-transparent', () => {
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const source1 = cold('-1-2-3-4-5-|');
const source1Subs = ' ^----------!';
const expected1 = ' -1-2-3-4-5-|';
const source2 = cold('-6-7-8-9-0-|');
const source2Subs = ' ^----------!';
const expected2 = ' -6-7-8-9-0-|';

// Calls to the _operator_ must be referentially-transparent.
const partialPipeLine = pipe(shareReplay({ refCount: false }));

// The non-referentially-transparent sharing occurs within the _operator function_
// returned by the _operator_ and that happens when the complete pipeline is composed.
const shared1 = source1.pipe(partialPipeLine);
const shared2 = source2.pipe(partialPipeLine);

expectObservable(shared1).toBe(expected1);
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
expectObservable(shared2).toBe(expected2);
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
});
});
});
2 changes: 1 addition & 1 deletion src/internal/operators/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ export function publish<T, O extends ObservableInput<any>>(selector: (shared: Ob
* Details: https://rxjs.dev/deprecations/multicasting
*/
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
return selector ? connect(selector) : multicast(new Subject<T>());
return selector ? (source) => connect(selector)(source) : (source) => multicast(new Subject<T>())(source);
}
6 changes: 4 additions & 2 deletions src/internal/operators/publishBehavior.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import { UnaryFunction } from '../types';
* Details: https://rxjs.dev/deprecations/multicasting
*/
export function publishBehavior<T>(initialValue: T): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
const subject = new BehaviorSubject<T>(initialValue);
// Note that this has *never* supported the selector function.
return (source) => new ConnectableObservable(source, () => subject);
return (source) => {
const subject = new BehaviorSubject<T>(initialValue);
return new ConnectableObservable(source, () => subject);
};
}
6 changes: 4 additions & 2 deletions src/internal/operators/publishLast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ import { UnaryFunction } from '../types';
* Details: https://rxjs.dev/deprecations/multicasting
*/
export function publishLast<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
const subject = new AsyncSubject<T>();
// Note that this has *never* supported a selector function like `publish` and `publishReplay`.
return (source) => new ConnectableObservable(source, () => subject);
return (source) => {
const subject = new AsyncSubject<T>();
return new ConnectableObservable(source, () => subject);
};
}
5 changes: 1 addition & 4 deletions src/internal/operators/publishReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,8 @@ export function publishReplay<T, R>(
if (selectorOrScheduler && !isFunction(selectorOrScheduler)) {
timestampProvider = selectorOrScheduler;
}

const selector = isFunction(selectorOrScheduler) ? selectorOrScheduler : undefined;
const subject = new ReplaySubject<T>(bufferSize, windowTime, timestampProvider);

// Note, we're passing `selector!` here, because at runtime, `undefined` is an acceptable argument
// but it makes our TypeScript signature for `multicast` unhappy (as it should, because it's gross).
return (source: Observable<T>) => multicast(subject, selector!)(source);
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, timestampProvider), selector!)(source);
}
Loading