Skip to content

Commit

Permalink
feat(share): ShareConfig factory properties should support Observable…
Browse files Browse the repository at this point in the history
…Input (#7093)

* feat(share): ShareConfig factory properties should support ObservableInput

* test(share): add test that verifies Promise support
  • Loading branch information
jakovljevic-mladen authored Dec 15, 2022
1 parent dfd95db commit cc3995a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 16 deletions.
5 changes: 5 additions & 0 deletions spec-dtslint/operators/share-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,8 @@ it('should infer correctly', () => {
it('should enforce types', () => {
const o = of('foo', 'bar', 'baz').pipe(share('abc')); // $ExpectError
});

it('should support Promises', () => {
const factory = () => Promise.resolve();
of(1, 2, 3).pipe(share({ resetOnError: factory, resetOnComplete: factory, resetOnRefCountZero: factory })); // $ExpectType Observable<number>
});
31 changes: 15 additions & 16 deletions src/internal/operators/share.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Observable } from '../Observable';
import { innerFrom } from '../observable/innerFrom';
import { Subject } from '../Subject';
import { SafeSubscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SubjectLike } from '../types';
import { MonoTypeOperatorFunction, SubjectLike, ObservableInput } from '../types';
import { operate } from '../util/lift';

export interface ShareConfig<T> {
Expand All @@ -13,37 +12,37 @@ export interface ShareConfig<T> {
*/
connector?: () => SubjectLike<T>;
/**
* If true, the resulting observable will reset internal state on error from source and return to a "cold" state. This
* If `true`, the resulting observable will reset internal state on error from source and return to a "cold" state. This
* allows the resulting observable to be "retried" in the event of an error.
* If false, when an error comes from the source it will push the error into the connecting subject, and the subject
* If `false`, when an error comes from the source it will push the error into the connecting subject, and the subject
* will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent retries
* or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however
* {@link ReplaySubject} will also push its buffered values before pushing the error.
* It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained
* It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained
* control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
*/
resetOnError?: boolean | ((error: any) => Observable<any>);
resetOnError?: boolean | ((error: any) => ObservableInput<any>);
/**
* If true, the resulting observable will reset internal state on completion from source and return to a "cold" state. This
* If `true`, the resulting observable will reset internal state on completion from source and return to a "cold" state. This
* allows the resulting observable to be "repeated" after it is done.
* If false, when the source completes, it will push the completion through the connecting subject, and the subject
* If `false`, when the source completes, it will push the completion through the connecting subject, and the subject
* will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats
* or resubscriptions will resubscribe to that same subject.
* It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained
* It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained
* control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
*/
resetOnComplete?: boolean | (() => Observable<any>);
resetOnComplete?: boolean | (() => ObservableInput<any>);
/**
* If true, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the
* If `true`, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the
* internal state will be reset and the resulting observable will return to a "cold" state. This means that the next
* time the resulting observable is subscribed to, a new subject will be created and the source will be subscribed to
* again.
* If false, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject
* If `false`, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject
* will remain connected to the source, and new subscriptions to the result will be connected through that same subject.
* It is also possible to pass a notifier factory returning an observable instead which grants more fine-grained
* It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained
* control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
*/
resetOnRefCountZero?: boolean | (() => Observable<any>);
resetOnRefCountZero?: boolean | (() => ObservableInput<any>);
}

export function share<T>(): MonoTypeOperatorFunction<T>;
Expand Down Expand Up @@ -245,7 +244,7 @@ export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction

function handleReset<T extends unknown[] = never[]>(
reset: () => void,
on: boolean | ((...args: T) => Observable<any>),
on: boolean | ((...args: T) => ObservableInput<any>),
...args: T
): Subscription | undefined {
if (on === true) {
Expand All @@ -264,5 +263,5 @@ function handleReset<T extends unknown[] = never[]>(
},
});

return on(...args).subscribe(onSubscriber);
return innerFrom(on(...args)).subscribe(onSubscriber);
}

0 comments on commit cc3995a

Please sign in to comment.