From 144b626c3905640b4adeb2b97e722912eff1b264 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Sun, 5 Jul 2020 11:26:15 -0500 Subject: [PATCH] refactor(Observable): Update property and method types (#5572) - Removes `_isScalar` as it was unused - makes `lift` `protected`. This is an internal implementation detail. - makes `source` `protected`, this is an internal implementation detail. - Refactors operators to use new utility functions that do the lift or provide a reasonable error if the observable someone is trying to use with the operator does not have a lift method. Adds documentation. BREAKING CHANGE: `lift` no longer exposed. It was _NEVER_ documented that end users of the library should be creating operators using `lift`. Lift has a [variety of issues](https://github.com/ReactiveX/rxjs/issues/5431) and was always an internal implementation detail of rxjs that might have been used by a few power users in the early days when it had the most value. The value of `lift`, originally, was that subclassed `Observable`s would compose through all operators that implemented lift. The reality is that feature is not widely known, used, or supported, and it was never documented as it was very experimental when it was first added. Until the end of v7, `lift` will remain on Observable. Standard JavaScript users will notice no difference. However, TypeScript users might see complaints about `lift` not being a member of observable. To workaround this issue there are two things you can do: 1. Rewrite your operators as [outlined in the documentation](https://rxjs.dev/guide/operators), such that they return `new Observable`. or 2. cast your observable as `any` and access `lift` that way. Method 1 is recommended if you do not want things to break when we move to version 8. --- spec/Observable-spec.ts | 6 +-- spec/helpers/interop-helper.ts | 2 +- src/internal/Observable.ts | 37 ++++--------- src/internal/Operator.ts | 3 ++ src/internal/observable/combineLatest.ts | 3 +- src/internal/observable/race.ts | 3 +- src/internal/observable/zip.ts | 3 +- src/internal/operators/audit.ts | 3 +- src/internal/operators/buffer.ts | 3 +- src/internal/operators/bufferCount.ts | 3 +- src/internal/operators/bufferTime.ts | 3 +- src/internal/operators/bufferToggle.ts | 3 +- src/internal/operators/bufferWhen.ts | 3 +- src/internal/operators/catchError.ts | 6 ++- src/internal/operators/combineAll.ts | 3 +- src/internal/operators/combineLatestWith.ts | 6 ++- src/internal/operators/concat.ts | 7 +-- src/internal/operators/concatWith.ts | 9 ++-- src/internal/operators/count.ts | 3 +- src/internal/operators/debounce.ts | 3 +- src/internal/operators/debounceTime.ts | 3 +- src/internal/operators/defaultIfEmpty.ts | 3 +- src/internal/operators/delay.ts | 3 +- src/internal/operators/delayWhen.ts | 6 +-- src/internal/operators/dematerialize.ts | 3 +- src/internal/operators/distinct.ts | 3 +- .../operators/distinctUntilChanged.ts | 3 +- src/internal/operators/every.ts | 3 +- src/internal/operators/exhaust.ts | 3 +- src/internal/operators/exhaustMap.ts | 3 +- src/internal/operators/expand.ts | 3 +- src/internal/operators/filter.ts | 3 +- src/internal/operators/finalize.ts | 3 +- src/internal/operators/find.ts | 3 +- src/internal/operators/findIndex.ts | 3 +- src/internal/operators/groupBy.ts | 3 +- src/internal/operators/ignoreElements.ts | 3 +- src/internal/operators/isEmpty.ts | 3 +- src/internal/operators/map.ts | 3 +- src/internal/operators/mapTo.ts | 3 +- src/internal/operators/materialize.ts | 3 +- src/internal/operators/mergeMap.ts | 3 +- src/internal/operators/mergeScan.ts | 3 +- src/internal/operators/mergeWith.ts | 9 ++-- src/internal/operators/multicast.ts | 3 +- src/internal/operators/observeOn.ts | 3 +- src/internal/operators/onErrorResumeNext.ts | 5 +- src/internal/operators/pairwise.ts | 3 +- src/internal/operators/raceWith.ts | 35 ++++++------ src/internal/operators/refCount.ts | 3 +- src/internal/operators/repeat.ts | 5 +- src/internal/operators/repeatWhen.ts | 3 +- src/internal/operators/retry.ts | 3 +- src/internal/operators/retryWhen.ts | 3 +- src/internal/operators/sample.ts | 3 +- src/internal/operators/sampleTime.ts | 3 +- src/internal/operators/scan.ts | 3 +- src/internal/operators/sequenceEqual.ts | 3 +- src/internal/operators/shareReplay.ts | 3 +- src/internal/operators/single.ts | 3 +- src/internal/operators/skip.ts | 3 +- src/internal/operators/skipLast.ts | 3 +- src/internal/operators/skipUntil.ts | 3 +- src/internal/operators/skipWhile.ts | 3 +- src/internal/operators/subscribeOn.ts | 3 +- src/internal/operators/switchMap.ts | 3 +- src/internal/operators/take.ts | 3 +- src/internal/operators/takeLast.ts | 3 +- src/internal/operators/takeUntil.ts | 3 +- src/internal/operators/takeWhile.ts | 3 +- src/internal/operators/tap.ts | 3 +- src/internal/operators/throttle.ts | 3 +- src/internal/operators/throttleTime.ts | 3 +- src/internal/operators/throwIfEmpty.ts | 3 +- src/internal/operators/timeoutWith.ts | 3 +- src/internal/operators/window.ts | 3 +- src/internal/operators/windowCount.ts | 3 +- src/internal/operators/windowTime.ts | 3 +- src/internal/operators/windowToggle.ts | 3 +- src/internal/operators/windowWhen.ts | 3 +- src/internal/operators/withLatestFrom.ts | 3 +- src/internal/operators/zipAll.ts | 3 +- src/internal/operators/zipWith.ts | 9 ++-- src/internal/util/lift.ts | 54 +++++++++++++++++++ 84 files changed, 264 insertions(+), 142 deletions(-) create mode 100644 src/internal/util/lift.ts diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 39d0fec419..b47fc6b620 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -698,7 +698,7 @@ describe('Observable.lift', () => { } }; - NEVER.lift(myOperator) + (NEVER as any).lift(myOperator) .subscribe() .unsubscribe(); @@ -883,8 +883,8 @@ describe('Observable.lift', () => { class LogObservable extends Observable { lift(operator: Operator): Observable { const observable = new LogObservable(); - (observable).source = this; - (observable).operator = new LogOperator(operator); + observable.source = this; + observable.operator = new LogOperator(operator); return observable; } } diff --git a/spec/helpers/interop-helper.ts b/spec/helpers/interop-helper.ts index db5c11564b..8e776c34ac 100644 --- a/spec/helpers/interop-helper.ts +++ b/spec/helpers/interop-helper.ts @@ -11,7 +11,7 @@ export function asInteropObservable(observable: Observable): Observable return new Proxy(observable, { get(target: Observable, key: string | number | symbol) { if (key === 'lift') { - const { lift } = target; + const { lift } = target as any; return interopLift(lift); } if (key === 'subscribe') { diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 23c77472f9..48cf3653e7 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -7,8 +7,6 @@ import { Subscription } from './Subscription'; import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable } from './types'; import { canReportError } from './util/canReportError'; import { toSubscriber } from './util/toSubscriber'; -import { iif } from './observable/iif'; -import { throwError } from './observable/throwError'; import { observable as Symbol_observable } from './symbol/observable'; import { pipeFromArray } from './util/pipe'; import { config } from './config'; @@ -20,14 +18,11 @@ import { config } from './config'; * @class Observable */ export class Observable implements Subscribable { - /** Internal implementation detail, do not use directly. */ - public _isScalar: boolean = false; - /** @deprecated This is an internal implementation detail, do not use. */ - source: Observable | undefined; + protected source: Observable | undefined; /** @deprecated This is an internal implementation detail, do not use. */ - operator: Operator | undefined; + protected operator: Operator | undefined; /** * @constructor @@ -59,13 +54,16 @@ export class Observable implements Subscribable { }; /** - * Creates a new Observable, with this Observable as the source, and the passed + * Creates a new Observable, with this Observable instance as the source, and the passed * operator defined as the new observable's operator. * @method lift - * @param {Operator} operator the operator defining the operation to take on the observable - * @return {Observable} a new observable with the Operator applied + * @param operator the operator defining the operation to take on the observable + * @return a new observable with the Operator applied + * @deprecated This is an internal implementation detail, do not use directly. If you have implemented an operator + * using `lift`, it is recommended that you create an operator by simply returning `new Observable()` directly. + * See "Creating new operators from scratch" section here: https://rxjs.dev/guide/operators */ - lift(operator?: Operator): Observable { + protected lift(operator?: Operator): Observable { const observable = new Observable(); observable.source = this; observable.operator = operator; @@ -236,7 +234,7 @@ export class Observable implements Subscribable { } /** @deprecated This is an internal implementation detail, do not use. */ - _trySubscribe(sink: Subscriber): TeardownLogic { + protected _trySubscribe(sink: Subscriber): TeardownLogic { try { return this._subscribe(sink); } catch (err) { @@ -336,24 +334,11 @@ export class Observable implements Subscribable { } /** @internal This is an internal implementation detail, do not use. */ - _subscribe(subscriber: Subscriber): TeardownLogic { + protected _subscribe(subscriber: Subscriber): TeardownLogic { const { source } = this; return source && source.subscribe(subscriber); } - // `if` and `throw` are special snow flakes, the compiler sees them as reserved words. Deprecated in - // favor of iif and throwError functions. - /** - * @nocollapse - * @deprecated In favor of iif creation function: import { iif } from 'rxjs'; - */ - static if: typeof iif; - /** - * @nocollapse - * @deprecated In favor of throwError creation function: import { throwError } from 'rxjs'; - */ - static throw: typeof throwError; - /** * An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable * @method Symbol.observable diff --git a/src/internal/Operator.ts b/src/internal/Operator.ts index 93b65e6464..08aa5e438e 100644 --- a/src/internal/Operator.ts +++ b/src/internal/Operator.ts @@ -1,6 +1,9 @@ import { Subscriber } from './Subscriber'; import { TeardownLogic } from './types'; +/*** + * @deprecated Internal implementation detail, do not use. + */ export interface Operator { call(subscriber: Subscriber, source: any): TeardownLogic; } diff --git a/src/internal/observable/combineLatest.ts b/src/internal/observable/combineLatest.ts index c9c0e0a0aa..500c70a8bd 100644 --- a/src/internal/observable/combineLatest.ts +++ b/src/internal/observable/combineLatest.ts @@ -8,6 +8,7 @@ import { Operator } from '../Operator'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { fromArray } from './fromArray'; +import { lift } from '../util/lift'; const NONE = {}; @@ -233,7 +234,7 @@ export function combineLatest, R>( observables = observables[0] as any; } - return fromArray(observables, scheduler).lift(new CombineLatestOperator, R>(resultSelector)); + return lift(fromArray(observables, scheduler), new CombineLatestOperator, R>(resultSelector)); } export class CombineLatestOperator implements Operator { diff --git a/src/internal/observable/race.ts b/src/internal/observable/race.ts index 27929e52c7..70540181e5 100644 --- a/src/internal/observable/race.ts +++ b/src/internal/observable/race.ts @@ -9,6 +9,7 @@ import { TeardownLogic, ObservableInput, ObservedValueUnionFromArray } from '../ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; +import { lift } from '../util/lift'; export function race[]>(observables: A): Observable>; export function race[]>(...observables: A): Observable>; @@ -65,7 +66,7 @@ export function race(...observables: (ObservableInput | ObservableInput } } - return fromArray(observables, undefined).lift(new RaceOperator()); + return lift(fromArray(observables, undefined), new RaceOperator()); } export class RaceOperator implements Operator { diff --git a/src/internal/observable/zip.ts b/src/internal/observable/zip.ts index 327657ab50..938ac2fabb 100644 --- a/src/internal/observable/zip.ts +++ b/src/internal/observable/zip.ts @@ -9,6 +9,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { iterator as Symbol_iterator } from '../../internal/symbol/iterator'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated resultSelector is no longer supported, pipe to map instead */ @@ -85,7 +86,7 @@ export function zip, R>( if (typeof last === 'function') { resultSelector = observables.pop() as typeof resultSelector; } - return fromArray(observables, undefined).lift(new ZipOperator(resultSelector)); + return lift(fromArray(observables, undefined), new ZipOperator(resultSelector)); } export class ZipOperator implements Operator { diff --git a/src/internal/operators/audit.ts b/src/internal/operators/audit.ts index 7fbb5201be..704d4d5047 100644 --- a/src/internal/operators/audit.ts +++ b/src/internal/operators/audit.ts @@ -6,6 +6,7 @@ import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from ' import { OuterSubscriber } from '../OuterSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; +import { lift } from '../util/lift'; /** * Ignores source values for a duration determined by another Observable, then @@ -54,7 +55,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; */ export function audit(durationSelector: (value: T) => SubscribableOrPromise): MonoTypeOperatorFunction { return function auditOperatorFunction(source: Observable) { - return source.lift(new AuditOperator(durationSelector)); + return lift(source, new AuditOperator(durationSelector)); }; } diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 71172ea91e..4cf80c78c0 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -5,6 +5,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Buffers the source Observable values until `closingNotifier` emits. @@ -47,7 +48,7 @@ import { OperatorFunction } from '../types'; */ export function buffer(closingNotifier: Observable): OperatorFunction { return function bufferOperatorFunction(source: Observable) { - return source.lift(new BufferOperator(closingNotifier)); + return lift(source, new BufferOperator(closingNotifier)); }; } diff --git a/src/internal/operators/bufferCount.ts b/src/internal/operators/bufferCount.ts index 636429b102..c39d8e3ecf 100644 --- a/src/internal/operators/bufferCount.ts +++ b/src/internal/operators/bufferCount.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Buffers the source Observable values until the size hits the maximum @@ -59,7 +60,7 @@ import { OperatorFunction, TeardownLogic } from '../types'; */ export function bufferCount(bufferSize: number, startBufferEvery: number | null = null): OperatorFunction { return function bufferCountOperatorFunction(source: Observable) { - return source.lift(new BufferCountOperator(bufferSize, startBufferEvery)); + return lift(source, new BufferCountOperator(bufferSize, startBufferEvery)); }; } diff --git a/src/internal/operators/bufferTime.ts b/src/internal/operators/bufferTime.ts index 4bd97f2933..a53126edc3 100644 --- a/src/internal/operators/bufferTime.ts +++ b/src/internal/operators/bufferTime.ts @@ -5,6 +5,7 @@ import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { isScheduler } from '../util/isScheduler'; import { OperatorFunction, SchedulerAction, SchedulerLike } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function bufferTime(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction; @@ -88,7 +89,7 @@ export function bufferTime(bufferTimeSpan: number): OperatorFunction } return function bufferTimeOperatorFunction(source: Observable) { - return source.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler)); + return lift(source, new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler)); }; } diff --git a/src/internal/operators/bufferToggle.ts b/src/internal/operators/bufferToggle.ts index fd7eb9653d..1a8770f033 100644 --- a/src/internal/operators/bufferToggle.ts +++ b/src/internal/operators/bufferToggle.ts @@ -6,6 +6,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { OperatorFunction, SubscribableOrPromise } from '../types'; +import { lift } from '../util/lift'; /** * Buffers the source Observable values starting from an emission from @@ -57,7 +58,7 @@ export function bufferToggle( closingSelector: (value: O) => SubscribableOrPromise ): OperatorFunction { return function bufferToggleOperatorFunction(source: Observable) { - return source.lift(new BufferToggleOperator(openings, closingSelector)); + return lift(source, new BufferToggleOperator(openings, closingSelector)); }; } diff --git a/src/internal/operators/bufferWhen.ts b/src/internal/operators/bufferWhen.ts index 4f8b6184c5..9e61aeb66b 100644 --- a/src/internal/operators/bufferWhen.ts +++ b/src/internal/operators/bufferWhen.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Buffers the source Observable values, using a factory function of closing @@ -50,7 +51,7 @@ import { OperatorFunction } from '../types'; */ export function bufferWhen(closingSelector: () => Observable): OperatorFunction { return function (source: Observable) { - return source.lift(new BufferWhenOperator(closingSelector)); + return lift(source, new BufferWhenOperator(closingSelector)); }; } diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 68d50fe771..56b6c83c4d 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function catchError>(selector: (err: any, caught: Observable) => O): OperatorFunction>; @@ -108,8 +109,9 @@ export function catchError>( ): OperatorFunction> { return function catchErrorOperatorFunction(source: Observable): Observable> { const operator = new CatchOperator(selector); - const caught = source.lift(operator); - return (operator.caught = caught as Observable); + const caught = lift(source, operator); + operator.caught = caught; + return caught; }; } diff --git a/src/internal/operators/combineAll.ts b/src/internal/operators/combineAll.ts index f696000341..93cca3f0f2 100644 --- a/src/internal/operators/combineAll.ts +++ b/src/internal/operators/combineAll.ts @@ -1,6 +1,7 @@ import { CombineLatestOperator } from '../observable/combineLatest'; import { Observable } from '../Observable'; import { OperatorFunction, ObservableInput } from '../types'; +import { lift } from '../util/lift'; export function combineAll(): OperatorFunction, T[]>; export function combineAll(): OperatorFunction; @@ -53,5 +54,5 @@ export function combineAll(project: (...values: Array) => R): OperatorFu * @name combineAll */ export function combineAll(project?: (...values: Array) => R): OperatorFunction { - return (source: Observable) => source.lift(new CombineLatestOperator(project)); + return (source: Observable) => lift(source, new CombineLatestOperator(project)); } diff --git a/src/internal/operators/combineLatestWith.ts b/src/internal/operators/combineLatestWith.ts index bcca48d2c2..e0f5d8f52a 100644 --- a/src/internal/operators/combineLatestWith.ts +++ b/src/internal/operators/combineLatestWith.ts @@ -4,6 +4,7 @@ import { CombineLatestOperator } from '../observable/combineLatest'; import { from } from '../observable/from'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, ObservedValueTupleFromArray, Cons } from '../types'; +import { lift, stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated use {@link combineLatestWith} */ @@ -53,10 +54,11 @@ export function combineLatest(...observables: Array | observables = (observables[0]).slice(); } - return (source: Observable) => source.lift.call( + return (source: Observable) => stankyLift( + source, from([source, ...observables]), new CombineLatestOperator(project) - ) as Observable; + ); } /** diff --git a/src/internal/operators/concat.ts b/src/internal/operators/concat.ts index dbd116245b..acf58b9576 100644 --- a/src/internal/operators/concat.ts +++ b/src/internal/operators/concat.ts @@ -1,6 +1,7 @@ import { concat as concatStatic } from '../observable/concat'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerLike } from '../types'; +import { stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated remove in v8. Use {@link concatWith} */ @@ -25,8 +26,8 @@ export function concat(...observables: Array | Schedu * @deprecated remove in v8. Use {@link concatWith} */ export function concat(...observables: Array | SchedulerLike | undefined>): OperatorFunction { - return (source: Observable) => source.lift.call( + return (source: Observable) => stankyLift( + source, concatStatic(source, ...(observables as any[])), - undefined - ) as Observable; + ); } diff --git a/src/internal/operators/concatWith.ts b/src/internal/operators/concatWith.ts index 5060057402..414203a09c 100644 --- a/src/internal/operators/concatWith.ts +++ b/src/internal/operators/concatWith.ts @@ -1,6 +1,7 @@ import { concat as concatStatic } from '../observable/concat'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, ObservedValueUnionFromArray } from '../types'; +import { stankyLift } from '../util/lift'; export function concatWith(): OperatorFunction; export function concatWith[]>(...otherSources: A): OperatorFunction | T>; @@ -44,8 +45,8 @@ export function concatWith[]>(...otherSources: * @param otherSources Other observable sources to subscribe to, in sequence, after the original source is complete. */ export function concatWith[]>(...otherSources: A): OperatorFunction | T> { - return (source: Observable) => source.lift.call( - concatStatic(source, ...otherSources), - undefined - ) as Observable | T>; + return (source: Observable) => stankyLift( + source, + concatStatic(source, ...otherSources) + ); } diff --git a/src/internal/operators/count.ts b/src/internal/operators/count.ts index 14e0f83b9e..b9b4a8587d 100644 --- a/src/internal/operators/count.ts +++ b/src/internal/operators/count.ts @@ -2,6 +2,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Observer, OperatorFunction } from '../types'; import { Subscriber } from '../Subscriber'; +import { lift } from '../util/lift'; /** * Counts the number of emissions on the source and emits that number when the * source completes. @@ -62,7 +63,7 @@ import { Subscriber } from '../Subscriber'; */ export function count(predicate?: (value: T, index: number, source: Observable) => boolean): OperatorFunction { - return (source: Observable) => source.lift(new CountOperator(predicate, source)); + return (source: Observable) => lift(source, new CountOperator(predicate, source)); } class CountOperator implements Operator { diff --git a/src/internal/operators/debounce.ts b/src/internal/operators/debounce.ts index 70f30c6391..3d699435b5 100644 --- a/src/internal/operators/debounce.ts +++ b/src/internal/operators/debounce.ts @@ -7,6 +7,7 @@ import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from ' import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; +import { lift } from '../util/lift'; /** * Emits a notification from the source Observable only after a particular time span @@ -67,7 +68,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @name debounce */ export function debounce(durationSelector: (value: T) => SubscribableOrPromise): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new DebounceOperator(durationSelector)); + return (source: Observable) => lift(source, new DebounceOperator(durationSelector)); } class DebounceOperator implements Operator { diff --git a/src/internal/operators/debounceTime.ts b/src/internal/operators/debounceTime.ts index 83b456b157..4e8b073aa1 100644 --- a/src/internal/operators/debounceTime.ts +++ b/src/internal/operators/debounceTime.ts @@ -4,6 +4,7 @@ import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { async } from '../scheduler/async'; import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits a notification from the source Observable only after a particular time span @@ -64,7 +65,7 @@ import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types * @name debounceTime */ export function debounceTime(dueTime: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new DebounceTimeOperator(dueTime, scheduler)); + return (source: Observable) => lift(source, new DebounceTimeOperator(dueTime, scheduler)); } class DebounceTimeOperator implements Operator { diff --git a/src/internal/operators/defaultIfEmpty.ts b/src/internal/operators/defaultIfEmpty.ts index b2770827aa..e2dcf8dcff 100644 --- a/src/internal/operators/defaultIfEmpty.ts +++ b/src/internal/operators/defaultIfEmpty.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function defaultIfEmpty(defaultValue?: R): OperatorFunction; @@ -43,7 +44,7 @@ export function defaultIfEmpty(defaultValue?: R): OperatorFunction(defaultValue: R | null = null): OperatorFunction { - return (source: Observable) => source.lift(new DefaultIfEmptyOperator(defaultValue)) as Observable; + return (source: Observable) => lift(source, new DefaultIfEmptyOperator(defaultValue)) as Observable; } class DefaultIfEmptyOperator implements Operator { diff --git a/src/internal/operators/delay.ts b/src/internal/operators/delay.ts index 7a232507a7..e1e620d865 100644 --- a/src/internal/operators/delay.ts +++ b/src/internal/operators/delay.ts @@ -9,6 +9,7 @@ import { SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Delays the emission of items from the source Observable by a given timeout or @@ -60,7 +61,7 @@ import { */ export function delay(delay: number | Date, scheduler: SchedulerLike = async): MonoTypeOperatorFunction { const delayFor = isValidDate(delay) ? +delay - scheduler.now() : Math.abs(delay); - return (source: Observable) => source.lift(new DelayOperator(delayFor, scheduler)); + return (source: Observable) => lift(source, new DelayOperator(delayFor, scheduler)); } class DelayOperator implements Operator { diff --git a/src/internal/operators/delayWhen.ts b/src/internal/operators/delayWhen.ts index 4174faa407..6caf7e6702 100644 --- a/src/internal/operators/delayWhen.ts +++ b/src/internal/operators/delayWhen.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated In future versions, empty notifiers will no longer re-emit the source value on the output observable. */ @@ -76,10 +77,9 @@ export function delayWhen(delayDurationSelector: (value: T, index: number) => subscriptionDelay?: Observable): MonoTypeOperatorFunction { if (subscriptionDelay) { return (source: Observable) => - new SubscriptionDelayObservable(source, subscriptionDelay) - .lift(new DelayWhenOperator(delayDurationSelector)); + lift(new SubscriptionDelayObservable(source, subscriptionDelay), new DelayWhenOperator(delayDurationSelector)); } - return (source: Observable) => source.lift(new DelayWhenOperator(delayDurationSelector)); + return (source: Observable) => lift(source, new DelayWhenOperator(delayDurationSelector)); } class DelayWhenOperator implements Operator { diff --git a/src/internal/operators/dematerialize.ts b/src/internal/operators/dematerialize.ts index 4e8252db82..93660eaad4 100644 --- a/src/internal/operators/dematerialize.ts +++ b/src/internal/operators/dematerialize.ts @@ -3,6 +3,7 @@ import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { observeNotification } from '../Notification'; import { OperatorFunction, ObservableNotification, ValueFromNotification } from '../types'; +import { lift } from '../util/lift'; /** * Converts an Observable of {@link ObservableNotification} objects into the emissions @@ -53,7 +54,7 @@ import { OperatorFunction, ObservableNotification, ValueFromNotification } from */ export function dematerialize>(): OperatorFunction> { return function dematerializeOperatorFunction(source: Observable) { - return source.lift(new DeMaterializeOperator()); + return lift(source, new DeMaterializeOperator()); }; } diff --git a/src/internal/operators/distinct.ts b/src/internal/operators/distinct.ts index 2c5990dce3..18cd0b6145 100644 --- a/src/internal/operators/distinct.ts +++ b/src/internal/operators/distinct.ts @@ -5,6 +5,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. @@ -75,7 +76,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; */ export function distinct(keySelector?: (value: T) => K, flushes?: Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new DistinctOperator(keySelector, flushes)); + return (source: Observable) => lift(source, new DistinctOperator(keySelector, flushes)); } class DistinctOperator implements Operator { diff --git a/src/internal/operators/distinctUntilChanged.ts b/src/internal/operators/distinctUntilChanged.ts index 6b58910cc3..d07a7f517c 100644 --- a/src/internal/operators/distinctUntilChanged.ts +++ b/src/internal/operators/distinctUntilChanged.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function distinctUntilChanged(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction; @@ -63,7 +64,7 @@ export function distinctUntilChanged(compare: (x: K, y: K) => boolean, key * @name distinctUntilChanged */ export function distinctUntilChanged(compare?: (x: K, y: K) => boolean, keySelector?: (x: T) => K): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new DistinctUntilChangedOperator(compare, keySelector)); + return (source: Observable) => lift(source, new DistinctUntilChangedOperator(compare, keySelector)); } class DistinctUntilChangedOperator implements Operator { diff --git a/src/internal/operators/every.ts b/src/internal/operators/every.ts index dd7ea07bdf..43d349460e 100644 --- a/src/internal/operators/every.ts +++ b/src/internal/operators/every.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Observer, OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that emits whether or not every item of the source satisfies the condition specified. @@ -30,7 +31,7 @@ import { Observer, OperatorFunction } from '../types'; */ export function every(predicate: (value: T, index: number, source: Observable) => boolean, thisArg?: any): OperatorFunction { - return (source: Observable) => source.lift(new EveryOperator(predicate, thisArg, source)); + return (source: Observable) => lift(source, new EveryOperator(predicate, thisArg, source)); } class EveryOperator implements Operator { diff --git a/src/internal/operators/exhaust.ts b/src/internal/operators/exhaust.ts index 643bf5241e..3ea6a6ce40 100644 --- a/src/internal/operators/exhaust.ts +++ b/src/internal/operators/exhaust.ts @@ -5,6 +5,7 @@ import { Subscription } from '../Subscription'; import { OuterSubscriber } from '../OuterSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; export function exhaust(): OperatorFunction, T>; export function exhaust(): OperatorFunction; @@ -53,7 +54,7 @@ export function exhaust(): OperatorFunction; * @name exhaust */ export function exhaust(): OperatorFunction { - return (source: Observable) => source.lift(new SwitchFirstOperator()); + return (source: Observable) => lift(source, new SwitchFirstOperator()); } class SwitchFirstOperator implements Operator { diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 27f209c61d..86385dd5a0 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -8,6 +8,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function exhaustMap>(project: (value: T, index: number) => O): OperatorFunction>; @@ -74,7 +75,7 @@ export function exhaustMap>( ); } return (source: Observable) => - source.lift(new ExhaustMapOperator(project)); + lift(source, new ExhaustMapOperator(project)); } class ExhaustMapOperator implements Operator { diff --git a/src/internal/operators/expand.ts b/src/internal/operators/expand.ts index 13a953ee26..ab0baef1fb 100644 --- a/src/internal/operators/expand.ts +++ b/src/internal/operators/expand.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, OperatorFunction, ObservableInput, SchedulerLike } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function expand(project: (value: T, index: number) => ObservableInput, concurrent?: number, scheduler?: SchedulerLike): OperatorFunction; @@ -68,7 +69,7 @@ export function expand(project: (value: T, index: number) => ObservableInp scheduler?: SchedulerLike): OperatorFunction { concurrent = (concurrent || 0) < 1 ? Infinity : concurrent; - return (source: Observable) => source.lift(new ExpandOperator(project, concurrent, scheduler)); + return (source: Observable) => lift(source, new ExpandOperator(project, concurrent, scheduler)); } export class ExpandOperator implements Operator { diff --git a/src/internal/operators/filter.ts b/src/internal/operators/filter.ts index 1868f4922f..55a4d92681 100644 --- a/src/internal/operators/filter.ts +++ b/src/internal/operators/filter.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction, MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function filter(predicate: (value: T, index: number) => value is S, @@ -56,7 +57,7 @@ export function filter(predicate: (value: T, index: number) => boolean, export function filter(predicate: (value: T, index: number) => boolean, thisArg?: any): MonoTypeOperatorFunction { return function filterOperatorFunction(source: Observable): Observable { - return source.lift(new FilterOperator(predicate, thisArg)); + return lift(source, new FilterOperator(predicate, thisArg)); }; } diff --git a/src/internal/operators/finalize.ts b/src/internal/operators/finalize.ts index ebeadb79c2..ec8d978f66 100644 --- a/src/internal/operators/finalize.ts +++ b/src/internal/operators/finalize.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that mirrors the source Observable, but will call a specified function when @@ -59,7 +60,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name finally */ export function finalize(callback: () => void): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new FinallyOperator(callback)); + return (source: Observable) => lift(source, new FinallyOperator(callback)); } class FinallyOperator implements Operator { diff --git a/src/internal/operators/find.ts b/src/internal/operators/find.ts index 6ea7b2bcf4..e8724336de 100644 --- a/src/internal/operators/find.ts +++ b/src/internal/operators/find.ts @@ -2,6 +2,7 @@ import {Observable} from '../Observable'; import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {OperatorFunction} from '../types'; +import { lift } from '../util/lift'; export function find(predicate: (value: T, index: number, source: Observable) => value is S, thisArg?: any): OperatorFunction; @@ -50,7 +51,7 @@ export function find(predicate: (value: T, index: number, source: Observable< if (typeof predicate !== 'function') { throw new TypeError('predicate is not a function'); } - return (source: Observable) => source.lift(new FindValueOperator(predicate, source, false, thisArg)) as Observable; + return (source: Observable) => lift(source, new FindValueOperator(predicate, source, false, thisArg)) as Observable; } export class FindValueOperator implements Operator { diff --git a/src/internal/operators/findIndex.ts b/src/internal/operators/findIndex.ts index 99828453bf..d42d1e2129 100644 --- a/src/internal/operators/findIndex.ts +++ b/src/internal/operators/findIndex.ts @@ -1,6 +1,7 @@ import { Observable } from '../Observable'; import { FindValueOperator } from '../operators/find'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Emits only the index of the first value emitted by the source Observable that * meets some condition. @@ -42,5 +43,5 @@ import { OperatorFunction } from '../types'; */ export function findIndex(predicate: (value: T, index: number, source: Observable) => boolean, thisArg?: any): OperatorFunction { - return (source: Observable) => source.lift(new FindValueOperator(predicate, source, true, thisArg)) as Observable; + return (source: Observable) => lift(source, new FindValueOperator(predicate, source, true, thisArg)) as Observable; } diff --git a/src/internal/operators/groupBy.ts b/src/internal/operators/groupBy.ts index 86fcb82b07..c8b17f4ec5 100644 --- a/src/internal/operators/groupBy.ts +++ b/src/internal/operators/groupBy.ts @@ -4,6 +4,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subject } from '../Subject'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function groupBy(keySelector: (value: T) => value is K): OperatorFunction | GroupedObservable>>; @@ -109,7 +110,7 @@ export function groupBy(keySelector: (value: T) => K, durationSelector?: (grouped: GroupedObservable) => Observable, subjectSelector?: () => Subject): OperatorFunction> { return (source: Observable) => - source.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); + lift(source, new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); } export interface RefCountSubscription { diff --git a/src/internal/operators/ignoreElements.ts b/src/internal/operators/ignoreElements.ts index 528ea4300a..16b06167c5 100644 --- a/src/internal/operators/ignoreElements.ts +++ b/src/internal/operators/ignoreElements.ts @@ -2,6 +2,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Ignores all items emitted by the source Observable and only passes calls of `complete` or `error`. @@ -37,7 +38,7 @@ import { OperatorFunction } from '../types'; */ export function ignoreElements(): OperatorFunction { return function ignoreElementsOperatorFunction(source: Observable) { - return source.lift(new IgnoreElementsOperator()); + return lift(source, new IgnoreElementsOperator()); }; } diff --git a/src/internal/operators/isEmpty.ts b/src/internal/operators/isEmpty.ts index 7c7e561d55..0f995c7128 100644 --- a/src/internal/operators/isEmpty.ts +++ b/src/internal/operators/isEmpty.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Emits `false` if the input Observable emits any values, or emits `true` if the @@ -68,7 +69,7 @@ import { OperatorFunction } from '../types'; */ export function isEmpty(): OperatorFunction { - return (source: Observable) => source.lift(new IsEmptyOperator()); + return (source: Observable) => lift(source, new IsEmptyOperator()); } class IsEmptyOperator implements Operator { diff --git a/src/internal/operators/map.ts b/src/internal/operators/map.ts index 92d6fe5db8..14ffa54c4c 100644 --- a/src/internal/operators/map.ts +++ b/src/internal/operators/map.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Applies a given `project` function to each value emitted by the source @@ -46,7 +47,7 @@ export function map(project: (value: T, index: number) => R, thisArg?: any if (typeof project !== 'function') { throw new TypeError('argument is not a function. Are you looking for `mapTo()`?'); } - return source.lift(new MapOperator(project, thisArg)); + return lift(source, new MapOperator(project, thisArg)); }; } diff --git a/src/internal/operators/mapTo.ts b/src/internal/operators/mapTo.ts index 6d34b29dae..5bc5edd880 100644 --- a/src/internal/operators/mapTo.ts +++ b/src/internal/operators/mapTo.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; export function mapTo(value: R): OperatorFunction; /** @deprecated remove in v8. Use mapTo(value: R): OperatorFunction signature instead **/ @@ -39,7 +40,7 @@ export function mapTo(value: R): OperatorFunction; * @name mapTo */ export function mapTo(value: R): OperatorFunction { - return (source: Observable) => source.lift(new MapToOperator(value)); + return (source: Observable) => lift(source, new MapToOperator(value)); } class MapToOperator implements Operator { diff --git a/src/internal/operators/materialize.ts b/src/internal/operators/materialize.ts index ec4fa3ee12..2d50edea3d 100644 --- a/src/internal/operators/materialize.ts +++ b/src/internal/operators/materialize.ts @@ -3,6 +3,7 @@ import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Notification } from '../Notification'; import { OperatorFunction, ObservableNotification } from '../types'; +import { lift } from '../util/lift'; /** * Represents all of the notifications from the source Observable as `next` @@ -60,7 +61,7 @@ import { OperatorFunction, ObservableNotification } from '../types'; */ export function materialize(): OperatorFunction & ObservableNotification> { return function materializeOperatorFunction(source: Observable) { - return source.lift(new MaterializeOperator()); + return lift(source, new MaterializeOperator()); }; } diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 15e4cd402a..60ac8877a4 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -8,6 +8,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function mergeMap>(project: (value: T, index: number) => O, concurrent?: number): OperatorFunction>; @@ -86,7 +87,7 @@ export function mergeMap>( } else if (typeof resultSelector === 'number') { concurrent = resultSelector; } - return (source: Observable) => source.lift(new MergeMapOperator(project, concurrent)); + return (source: Observable) => lift(source, new MergeMapOperator(project, concurrent)); } export class MergeMapOperator implements Operator { diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index afd476b9c7..1aa1804d73 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -6,6 +6,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { ObservableInput, OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Applies an accumulator function over the source Observable where the @@ -48,7 +49,7 @@ import { ObservableInput, OperatorFunction } from '../types'; export function mergeScan(accumulator: (acc: R, value: T, index: number) => ObservableInput, seed: R, concurrent: number = Infinity): OperatorFunction { - return (source: Observable) => source.lift(new MergeScanOperator(accumulator, seed, concurrent)); + return (source: Observable) => lift(source, new MergeScanOperator(accumulator, seed, concurrent)); } export class MergeScanOperator implements Operator { diff --git a/src/internal/operators/mergeWith.ts b/src/internal/operators/mergeWith.ts index 6d34e89c91..5383c3b7a4 100644 --- a/src/internal/operators/mergeWith.ts +++ b/src/internal/operators/mergeWith.ts @@ -1,6 +1,7 @@ import { merge as mergeStatic } from '../observable/merge'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerLike, ObservedValueUnionFromArray } from '../types'; +import { stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ @@ -57,10 +58,10 @@ export function merge(...observables: Array | Schedul * @deprecated use {@link mergeWith} or static {@link merge} */ export function merge(...observables: Array | SchedulerLike | number | undefined>): OperatorFunction { - return (source: Observable) => source.lift.call( - mergeStatic(source, ...(observables as any[])), - undefined - ) as Observable; + return (source: Observable) => stankyLift( + source, + mergeStatic(source, ...(observables as any[])) + ); } export function mergeWith(): OperatorFunction; diff --git a/src/internal/operators/multicast.ts b/src/internal/operators/multicast.ts index 03d7b63b13..5134d425ef 100644 --- a/src/internal/operators/multicast.ts +++ b/src/internal/operators/multicast.ts @@ -4,6 +4,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable'; import { OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function multicast(subject: Subject): UnaryFunction, ConnectableObservable>; @@ -43,7 +44,7 @@ export function multicast(subjectOrSubjectFactory: Subject | (() => Sub } if (typeof selector === 'function') { - return source.lift(new MulticastOperator(subjectFactory, selector)); + return lift(source, new MulticastOperator(subjectFactory, selector)); } const connectable: any = Object.create(source, connectableObservableDescriptor); diff --git a/src/internal/operators/observeOn.ts b/src/internal/operators/observeOn.ts index 0f8875cd9a..683f9590b9 100644 --- a/src/internal/operators/observeOn.ts +++ b/src/internal/operators/observeOn.ts @@ -9,6 +9,7 @@ import { TeardownLogic, ObservableNotification, } from '../types'; +import { lift } from '../util/lift'; /** * @@ -64,7 +65,7 @@ import { */ export function observeOn(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction { return function observeOnOperatorFunction(source: Observable): Observable { - return source.lift(new ObserveOnOperator(scheduler, delay)); + return lift(source, new ObserveOnOperator(scheduler, delay)); }; } diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index b2b2027a1a..262e6544c7 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -8,6 +8,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function onErrorResumeNext(): OperatorFunction; @@ -95,7 +96,7 @@ export function onErrorResumeNext(...nextSources: Array>>nextSources[0]; } - return (source: Observable) => source.lift(new OnErrorResumeNextOperator(nextSources)); + return (source: Observable) => lift(source, new OnErrorResumeNextOperator(nextSources)); } /* tslint:disable:max-line-length */ @@ -119,7 +120,7 @@ export function onErrorResumeNextStatic(...nextSources: Array(nextSources)); + return lift(from(source), new OnErrorResumeNextOperator(nextSources)); } class OnErrorResumeNextOperator implements Operator { diff --git a/src/internal/operators/pairwise.ts b/src/internal/operators/pairwise.ts index eefef1c713..1f0abeb1a2 100644 --- a/src/internal/operators/pairwise.ts +++ b/src/internal/operators/pairwise.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Groups pairs of consecutive emissions together and emits them as an array of @@ -46,7 +47,7 @@ import { OperatorFunction } from '../types'; * @name pairwise */ export function pairwise(): OperatorFunction { - return (source: Observable) => source.lift(new PairwiseOperator()); + return (source: Observable) => lift(source, new PairwiseOperator()); } class PairwiseOperator implements Operator { diff --git a/src/internal/operators/raceWith.ts b/src/internal/operators/raceWith.ts index e6bd07a328..d25e399572 100644 --- a/src/internal/operators/raceWith.ts +++ b/src/internal/operators/raceWith.ts @@ -1,7 +1,9 @@ import { Observable } from '../Observable'; import { isArray } from '../util/isArray'; import { MonoTypeOperatorFunction, OperatorFunction, ObservableInput, ObservedValueUnionFromArray } from '../types'; -import { race as raceStatic } from '../observable/race'; +import { race as raceStatic, RaceOperator } from '../observable/race'; +import { fromArray } from '../observable/fromArray'; +import { lift, stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated Deprecated use {@link raceWith} */ @@ -21,19 +23,14 @@ export function race(...observables: Array | Array(...observables: (Observable | Observable[])[]): MonoTypeOperatorFunction { - return function raceOperatorFunction(source: Observable) { - // if the only argument is an array, it was most likely called with - // `pair([obs1, obs2, ...])` - if (observables.length === 1 && isArray(observables[0])) { - observables = observables[0] as Observable[]; - } +export function race(...observables: ObservableInput[]): OperatorFunction { + // if the only argument is an array, it was most likely called with + // `pair([obs1, obs2, ...])` + if (observables.length === 1 && isArray(observables[0])) { + observables = observables[0]; + } - return source.lift.call( - raceStatic(source, ...(observables as Observable[])), - undefined - ) as Observable; - }; + return raceWith(...observables) as any; } /** @@ -68,9 +65,13 @@ export function raceWith[]>( ...otherSources: A ): OperatorFunction> { return function raceWithOperatorFunction(source: Observable) { - return source.lift.call( - raceStatic(source, ...otherSources), - undefined - ) as Observable>; + if (otherSources.length === 0) { + return source; + } + + return stankyLift( + source, + raceStatic(source, ...otherSources) + ); }; } diff --git a/src/internal/operators/refCount.ts b/src/internal/operators/refCount.ts index 0a1fa786c6..b7baf6932c 100644 --- a/src/internal/operators/refCount.ts +++ b/src/internal/operators/refCount.ts @@ -4,6 +4,7 @@ import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { ConnectableObservable } from '../observable/ConnectableObservable'; import { Observable } from '../Observable'; +import { lift } from '../util/lift'; /** * Make a {@link ConnectableObservable} behave like a ordinary observable and automates the way @@ -60,7 +61,7 @@ import { Observable } from '../Observable'; */ export function refCount(): MonoTypeOperatorFunction { return function refCountOperatorFunction(source: ConnectableObservable): Observable { - return source.lift(new RefCountOperator()); + return lift(source, new RefCountOperator()); } as MonoTypeOperatorFunction; } diff --git a/src/internal/operators/repeat.ts b/src/internal/operators/repeat.ts index cb60b70511..8809b78f6e 100644 --- a/src/internal/operators/repeat.ts +++ b/src/internal/operators/repeat.ts @@ -3,6 +3,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { EMPTY } from '../observable/empty'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that will resubscribe to the source stream when the source stream completes, at most count times. @@ -64,9 +65,9 @@ export function repeat(count: number = -1): MonoTypeOperatorFunction { if (count === 0) { return EMPTY; } else if (count < 0) { - return source.lift(new RepeatOperator(-1, source)); + return lift(source, new RepeatOperator(-1, source)); } else { - return source.lift(new RepeatOperator(count - 1, source)); + return lift(source, new RepeatOperator(count - 1, source)); } }; } diff --git a/src/internal/operators/repeatWhen.ts b/src/internal/operators/repeatWhen.ts index 89d739c5d4..fa7530de1f 100644 --- a/src/internal/operators/repeatWhen.ts +++ b/src/internal/operators/repeatWhen.ts @@ -9,6 +9,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that mirrors the source Observable with the exception of a `complete`. If the source @@ -40,7 +41,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name repeatWhen */ export function repeatWhen(notifier: (notifications: Observable) => Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new RepeatWhenOperator(notifier)); + return (source: Observable) => lift(source, new RepeatWhenOperator(notifier)); } class RepeatWhenOperator implements Operator { diff --git a/src/internal/operators/retry.ts b/src/internal/operators/retry.ts index cb86d8b179..8997bb309f 100644 --- a/src/internal/operators/retry.ts +++ b/src/internal/operators/retry.ts @@ -3,6 +3,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; export interface RetryConfig { count: number; @@ -66,7 +67,7 @@ export function retry(configOrCount: number | RetryConfig = -1): MonoTypeOper count: configOrCount as number }; } - return (source: Observable) => source.lift(new RetryOperator(config.count, !!config.resetOnSuccess, source)); + return (source: Observable) => lift(source, new RetryOperator(config.count, !!config.resetOnSuccess, source)); } class RetryOperator implements Operator { diff --git a/src/internal/operators/retryWhen.ts b/src/internal/operators/retryWhen.ts index ac02bfd0a7..2a8bda0780 100644 --- a/src/internal/operators/retryWhen.ts +++ b/src/internal/operators/retryWhen.ts @@ -9,6 +9,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable @@ -63,7 +64,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name retryWhen */ export function retryWhen(notifier: (errors: Observable) => Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new RetryWhenOperator(notifier, source)); + return (source: Observable) => lift(source, new RetryWhenOperator(notifier, source)); } class RetryWhenOperator implements Operator { diff --git a/src/internal/operators/sample.ts b/src/internal/operators/sample.ts index 0cda8903c6..86afa79b8b 100644 --- a/src/internal/operators/sample.ts +++ b/src/internal/operators/sample.ts @@ -6,6 +6,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits the most recently emitted value from the source Observable whenever @@ -47,7 +48,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name sample */ export function sample(notifier: Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SampleOperator(notifier)); + return (source: Observable) => lift(source, new SampleOperator(notifier)); } class SampleOperator implements Operator { diff --git a/src/internal/operators/sampleTime.ts b/src/internal/operators/sampleTime.ts index 317f718f15..b805d1a1b1 100644 --- a/src/internal/operators/sampleTime.ts +++ b/src/internal/operators/sampleTime.ts @@ -3,6 +3,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { async } from '../scheduler/async'; import { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits the most recently emitted value from the source Observable within @@ -46,7 +47,7 @@ import { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic * @name sampleTime */ export function sampleTime(period: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SampleTimeOperator(period, scheduler)); + return (source: Observable) => lift(source, new SampleTimeOperator(period, scheduler)); } class SampleTimeOperator implements Operator { diff --git a/src/internal/operators/scan.ts b/src/internal/operators/scan.ts index 66434c9093..2916be207e 100644 --- a/src/internal/operators/scan.ts +++ b/src/internal/operators/scan.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { OperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function scan(accumulator: (acc: A|V, value: V, index: number) => A): OperatorFunction; @@ -63,7 +64,7 @@ export function scan(accumulator: (acc: V|A|S, value: V, index: number) } return function scanOperatorFunction(source: Observable) { - return source.lift(new ScanOperator(accumulator, seed, hasSeed)); + return lift(source, new ScanOperator(accumulator, seed, hasSeed)); }; } diff --git a/src/internal/operators/sequenceEqual.ts b/src/internal/operators/sequenceEqual.ts index a808bafb46..4ddb126041 100644 --- a/src/internal/operators/sequenceEqual.ts +++ b/src/internal/operators/sequenceEqual.ts @@ -4,6 +4,7 @@ import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { Observer, OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Compares all values of two observables in sequence using an optional comparator function @@ -63,7 +64,7 @@ import { Observer, OperatorFunction } from '../types'; */ export function sequenceEqual(compareTo: Observable, comparator?: (a: T, b: T) => boolean): OperatorFunction { - return (source: Observable) => source.lift(new SequenceEqualOperator(compareTo, comparator)); + return (source: Observable) => lift(source, new SequenceEqualOperator(compareTo, comparator)); } export class SequenceEqualOperator implements Operator { diff --git a/src/internal/operators/shareReplay.ts b/src/internal/operators/shareReplay.ts index c3bbe5d2f7..6a0244eb16 100644 --- a/src/internal/operators/shareReplay.ts +++ b/src/internal/operators/shareReplay.ts @@ -3,6 +3,7 @@ import { ReplaySubject } from '../ReplaySubject'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; import { Subscriber } from '../Subscriber'; +import { lift } from '../util/lift'; export interface ShareReplayConfig { bufferSize?: number; @@ -139,7 +140,7 @@ export function shareReplay( scheduler }; } - return (source: Observable) => source.lift(shareReplayOperator(config)); + return (source: Observable) => lift(source, shareReplayOperator(config)); } function shareReplayOperator({ diff --git a/src/internal/operators/single.ts b/src/internal/operators/single.ts index cf6c28cb81..c918640fc6 100644 --- a/src/internal/operators/single.ts +++ b/src/internal/operators/single.ts @@ -5,6 +5,7 @@ import { EmptyError } from '../util/EmptyError'; import { MonoTypeOperatorFunction } from '../types'; import { SequenceError } from '../util/SequenceError'; import { NotFoundError } from '../util/NotFoundError'; +import { lift } from '../util/lift'; const defaultPredicate = () => true; @@ -91,7 +92,7 @@ const defaultPredicate = () => true; export function single( predicate: (value: T, index: number, source: Observable) => boolean = defaultPredicate ): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(singleOperator(predicate)); + return (source: Observable) => lift(source, singleOperator(predicate)); } function singleOperator(predicate: (value: T, index: number, source: Observable) => boolean) { diff --git a/src/internal/operators/skip.ts b/src/internal/operators/skip.ts index 33876879f9..96827f86fa 100644 --- a/src/internal/operators/skip.ts +++ b/src/internal/operators/skip.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that skips the first `count` items emitted by the source Observable. @@ -13,7 +14,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name skip */ export function skip(count: number): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SkipOperator(count)); + return (source: Observable) => lift(source, new SkipOperator(count)); } class SkipOperator implements Operator { diff --git a/src/internal/operators/skipLast.ts b/src/internal/operators/skipLast.ts index f5045a88c1..1c4e8dd1ff 100644 --- a/src/internal/operators/skipLast.ts +++ b/src/internal/operators/skipLast.ts @@ -3,6 +3,7 @@ import { Subscriber } from '../Subscriber'; import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Skip the last `count` values emitted by the source Observable. @@ -42,7 +43,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name skipLast */ export function skipLast(count: number): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SkipLastOperator(count)); + return (source: Observable) => lift(source, new SkipLastOperator(count)); } class SkipLastOperator implements Operator { diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index 12912cffa0..60b2aa1b22 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -6,6 +6,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic, ObservableInput } from '../types'; import { Subscription } from '../Subscription'; +import { lift } from '../util/lift'; /** * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. @@ -46,7 +47,7 @@ import { Subscription } from '../Subscription'; * @name skipUntil */ export function skipUntil(notifier: Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SkipUntilOperator(notifier)); + return (source: Observable) => lift(source, new SkipUntilOperator(notifier)); } class SkipUntilOperator implements Operator { diff --git a/src/internal/operators/skipWhile.ts b/src/internal/operators/skipWhile.ts index 1991a72442..2dcb4090bf 100644 --- a/src/internal/operators/skipWhile.ts +++ b/src/internal/operators/skipWhile.ts @@ -2,6 +2,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds @@ -15,7 +16,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name skipWhile */ export function skipWhile(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SkipWhileOperator(predicate)); + return (source: Observable) => lift(source, new SkipWhileOperator(predicate)); } class SkipWhileOperator implements Operator { diff --git a/src/internal/operators/subscribeOn.ts b/src/internal/operators/subscribeOn.ts index ad5e0e7d20..0b9df046c9 100644 --- a/src/internal/operators/subscribeOn.ts +++ b/src/internal/operators/subscribeOn.ts @@ -5,6 +5,7 @@ import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic, SchedulerAction import { asap as asapScheduler } from '../scheduler/asap'; import { Subscription } from '../Subscription'; import { isScheduler } from '../util/isScheduler'; +import { lift } from '../util/lift'; export interface DispatchArg { source: Observable; @@ -106,7 +107,7 @@ class SubscribeOnObservable extends Observable { */ export function subscribeOn(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction { return function subscribeOnOperatorFunction(source: Observable): Observable { - return source.lift(new SubscribeOnOperator(scheduler, delay)); + return lift(source, new SubscribeOnOperator(scheduler, delay)); }; } diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index 72c8f3ddbc..e05fa29149 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -8,6 +8,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function switchMap>(project: (value: T, index: number) => O): OperatorFunction>; @@ -89,7 +90,7 @@ export function switchMap>( )) ); } - return (source: Observable) => source.lift(new SwitchMapOperator(project)); + return (source: Observable) => lift(source, new SwitchMapOperator(project)); } class SwitchMapOperator implements Operator { diff --git a/src/internal/operators/take.ts b/src/internal/operators/take.ts index c525b020fd..27e250e578 100644 --- a/src/internal/operators/take.ts +++ b/src/internal/operators/take.ts @@ -4,6 +4,7 @@ import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { EMPTY } from '../observable/empty'; +import { lift } from '../util/lift'; /** * Emits only the first `count` values emitted by the source Observable. @@ -57,7 +58,7 @@ export function take(count: number): MonoTypeOperatorFunction { throw new ArgumentOutOfRangeError; } - return (source: Observable) => (count === 0) ? EMPTY : source.lift(new TakeOperator(count)); + return (source: Observable) => (count === 0) ? EMPTY : lift(source, new TakeOperator(count)); } class TakeOperator implements Operator { diff --git a/src/internal/operators/takeLast.ts b/src/internal/operators/takeLast.ts index 36c77d0bb7..296bcdcb2e 100644 --- a/src/internal/operators/takeLast.ts +++ b/src/internal/operators/takeLast.ts @@ -4,6 +4,7 @@ import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; import { EMPTY } from '../observable/empty'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits only the last `count` values emitted by the source Observable. @@ -58,7 +59,7 @@ export function takeLast(count: number): MonoTypeOperatorFunction { if (count === 0) { return EMPTY; } else { - return source.lift(new TakeLastOperator(count)); + return lift(source, new TakeLastOperator(count)); } }; } diff --git a/src/internal/operators/takeUntil.ts b/src/internal/operators/takeUntil.ts index 3d9dd1afa8..629ca024ad 100644 --- a/src/internal/operators/takeUntil.ts +++ b/src/internal/operators/takeUntil.ts @@ -7,6 +7,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits the values emitted by the source Observable until a `notifier` @@ -48,7 +49,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name takeUntil */ export function takeUntil(notifier: Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new TakeUntilOperator(notifier)); + return (source: Observable) => lift(source, new TakeUntilOperator(notifier)); } class TakeUntilOperator implements Operator { diff --git a/src/internal/operators/takeWhile.ts b/src/internal/operators/takeWhile.ts index 05045ad50b..6e68916682 100644 --- a/src/internal/operators/takeWhile.ts +++ b/src/internal/operators/takeWhile.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { OperatorFunction, MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; export function takeWhile(predicate: (value: T, index: number) => value is S): OperatorFunction; export function takeWhile(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction; @@ -54,7 +55,7 @@ export function takeWhile( predicate: (value: T, index: number) => boolean, inclusive = false): MonoTypeOperatorFunction { return (source: Observable) => - source.lift(new TakeWhileOperator(predicate, inclusive)); + lift(source, new TakeWhileOperator(predicate, inclusive)); } class TakeWhileOperator implements Operator { diff --git a/src/internal/operators/tap.ts b/src/internal/operators/tap.ts index 67b400de61..c6d75cdb41 100644 --- a/src/internal/operators/tap.ts +++ b/src/internal/operators/tap.ts @@ -4,6 +4,7 @@ import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, PartialObserver, TeardownLogic } from '../types'; import { noop } from '../util/noop'; import { isFunction } from '../util/isFunction'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated Use an observer instead of a complete callback */ @@ -68,7 +69,7 @@ export function tap(nextOrObserver?: PartialObserver | ((x: T) => void) | error?: ((e: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction { return function tapOperatorFunction(source: Observable): Observable { - return source.lift(new DoOperator(nextOrObserver, error, complete)); + return lift(source, new DoOperator(nextOrObserver, error, complete)); }; } diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index 250193a94b..d912fc99ab 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -8,6 +8,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; export interface ThrottleConfig { leading?: boolean; @@ -66,7 +67,7 @@ export const defaultThrottleConfig: ThrottleConfig = { */ export function throttle(durationSelector: (value: T) => SubscribableOrPromise, config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new ThrottleOperator(durationSelector, !!config.leading, !!config.trailing)); + return (source: Observable) => lift(source, new ThrottleOperator(durationSelector, !!config.leading, !!config.trailing)); } class ThrottleOperator implements Operator { diff --git a/src/internal/operators/throttleTime.ts b/src/internal/operators/throttleTime.ts index df5e94cfaa..4f10e8908b 100644 --- a/src/internal/operators/throttleTime.ts +++ b/src/internal/operators/throttleTime.ts @@ -5,6 +5,7 @@ import { async } from '../scheduler/async'; import { Observable } from '../Observable'; import { ThrottleConfig, defaultThrottleConfig } from './throttle'; import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits a value from the source Observable, then ignores subsequent source @@ -87,7 +88,7 @@ import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types export function throttleTime(duration: number, scheduler: SchedulerLike = async, config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new ThrottleTimeOperator(duration, scheduler, !!config.leading, !!config.trailing)); + return (source: Observable) => lift(source, new ThrottleTimeOperator(duration, scheduler, !!config.leading, !!config.trailing)); } class ThrottleTimeOperator implements Operator { diff --git a/src/internal/operators/throwIfEmpty.ts b/src/internal/operators/throwIfEmpty.ts index 7f39c4e93f..db2ddb0a83 100644 --- a/src/internal/operators/throwIfEmpty.ts +++ b/src/internal/operators/throwIfEmpty.ts @@ -3,6 +3,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { TeardownLogic, MonoTypeOperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * If the source observable completes without emitting a value, it will emit @@ -36,7 +37,7 @@ import { TeardownLogic, MonoTypeOperatorFunction } from '../types'; */ export function throwIfEmpty (errorFactory: (() => any) = defaultErrorFactory): MonoTypeOperatorFunction { return (source: Observable) => { - return source.lift(new ThrowIfEmptyOperator(errorFactory)); + return lift(source, new ThrowIfEmptyOperator(errorFactory)); }; } diff --git a/src/internal/operators/timeoutWith.ts b/src/internal/operators/timeoutWith.ts index 74cddec458..946d4feb5f 100644 --- a/src/internal/operators/timeoutWith.ts +++ b/src/internal/operators/timeoutWith.ts @@ -6,6 +6,7 @@ import { isValidDate } from '../util/isDate'; import { OuterSubscriber } from '../OuterSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function timeoutWith(due: number | Date, withObservable: ObservableInput, scheduler?: SchedulerLike): OperatorFunction; @@ -69,7 +70,7 @@ export function timeoutWith(due: number | Date, return (source: Observable) => { let absoluteTimeout = isValidDate(due); let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(due); - return source.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler)); + return lift(source, new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler)); }; } diff --git a/src/internal/operators/window.ts b/src/internal/operators/window.ts index a1b6180b46..bb71085fe8 100644 --- a/src/internal/operators/window.ts +++ b/src/internal/operators/window.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { Operator } from '../Operator'; +import { lift } from '../util/lift'; /** * Branch out the source Observable values as a nested Observable whenever @@ -51,7 +52,7 @@ import { Operator } from '../Operator'; */ export function window(windowBoundaries: Observable): OperatorFunction> { return function windowOperatorFunction(source: Observable) { - return source.lift(new WindowOperator(windowBoundaries)); + return lift(source, new WindowOperator(windowBoundaries)); }; } diff --git a/src/internal/operators/windowCount.ts b/src/internal/operators/windowCount.ts index ef384c7ee1..ff670ba0dd 100644 --- a/src/internal/operators/windowCount.ts +++ b/src/internal/operators/windowCount.ts @@ -3,6 +3,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Branch out the source Observable values as a nested Observable with each @@ -69,7 +70,7 @@ import { OperatorFunction } from '../types'; export function windowCount(windowSize: number, startWindowEvery: number = 0): OperatorFunction> { return function windowCountOperatorFunction(source: Observable) { - return source.lift(new WindowCountOperator(windowSize, startWindowEvery)); + return lift(source, new WindowCountOperator(windowSize, startWindowEvery)); }; } diff --git a/src/internal/operators/windowTime.ts b/src/internal/operators/windowTime.ts index acf35ebba9..4fcb0f18c5 100644 --- a/src/internal/operators/windowTime.ts +++ b/src/internal/operators/windowTime.ts @@ -7,6 +7,7 @@ import { Subscription } from '../Subscription'; import { isNumeric } from '../util/isNumeric'; import { isScheduler } from '../util/isScheduler'; import { OperatorFunction, SchedulerLike, SchedulerAction } from '../types'; +import { lift } from '../util/lift'; export function windowTime(windowTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction>; @@ -118,7 +119,7 @@ export function windowTime(windowTimeSpan: number): OperatorFunction) { - return source.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)); + return lift(source, new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)); }; } diff --git a/src/internal/operators/windowToggle.ts b/src/internal/operators/windowToggle.ts index dd6d62d3f4..79fb1cde25 100644 --- a/src/internal/operators/windowToggle.ts +++ b/src/internal/operators/windowToggle.ts @@ -7,6 +7,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Branch out the source Observable values as a nested Observable starting from @@ -57,7 +58,7 @@ import { OperatorFunction } from '../types'; */ export function windowToggle(openings: Observable, closingSelector: (openValue: O) => Observable): OperatorFunction> { - return (source: Observable) => source.lift(new WindowToggleOperator(openings, closingSelector)); + return (source: Observable) => lift(source, new WindowToggleOperator(openings, closingSelector)); } class WindowToggleOperator implements Operator> { diff --git a/src/internal/operators/windowWhen.ts b/src/internal/operators/windowWhen.ts index b6164fa7e1..a479d4a471 100644 --- a/src/internal/operators/windowWhen.ts +++ b/src/internal/operators/windowWhen.ts @@ -7,6 +7,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Branch out the source Observable values as a nested Observable using a @@ -54,7 +55,7 @@ import { OperatorFunction } from '../types'; */ export function windowWhen(closingSelector: () => Observable): OperatorFunction> { return function windowWhenOperatorFunction(source: Observable) { - return source.lift(new WindowOperator(closingSelector)); + return lift(source, new WindowOperator(closingSelector)); }; } diff --git a/src/internal/operators/withLatestFrom.ts b/src/internal/operators/withLatestFrom.ts index 55ff79b143..4413c873f2 100644 --- a/src/internal/operators/withLatestFrom.ts +++ b/src/internal/operators/withLatestFrom.ts @@ -5,6 +5,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function withLatestFrom(project: (v1: T) => R): OperatorFunction; @@ -74,7 +75,7 @@ export function withLatestFrom(...args: Array | ((... project = args.pop(); } const observables = []>args; - return source.lift(new WithLatestFromOperator(observables, project)); + return lift(source, new WithLatestFromOperator(observables, project)); }; } diff --git a/src/internal/operators/zipAll.ts b/src/internal/operators/zipAll.ts index 163ae43a85..3fd9d33e94 100644 --- a/src/internal/operators/zipAll.ts +++ b/src/internal/operators/zipAll.ts @@ -1,6 +1,7 @@ import { ZipOperator } from '../observable/zip'; import { Observable } from '../Observable'; import { OperatorFunction, ObservableInput } from '../types'; +import { lift } from '../util/lift'; export function zipAll(): OperatorFunction, T[]>; export function zipAll(): OperatorFunction; @@ -8,5 +9,5 @@ export function zipAll(project: (...values: T[]) => R): OperatorFunction(project: (...values: Array) => R): OperatorFunction; export function zipAll(project?: (...values: Array) => R): OperatorFunction { - return (source: Observable) => source.lift(new ZipOperator(project)); + return (source: Observable) => lift(source, new ZipOperator(project)); } diff --git a/src/internal/operators/zipWith.ts b/src/internal/operators/zipWith.ts index 9f472731b6..6db36af106 100644 --- a/src/internal/operators/zipWith.ts +++ b/src/internal/operators/zipWith.ts @@ -1,6 +1,7 @@ import { zip as zipStatic } from '../observable/zip'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, ObservedValueTupleFromArray, Cons } from '../types'; +import { stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated Deprecated use {@link zipWith} */ @@ -38,10 +39,10 @@ export function zip(array: Array>, project */ export function zip(...observables: Array | ((...values: Array) => R)>): OperatorFunction { return function zipOperatorFunction(source: Observable) { - return source.lift.call( - zipStatic(source, ...observables), - undefined - ) as Observable; + return stankyLift( + source, + zipStatic(source, ...observables) + ); }; } diff --git a/src/internal/util/lift.ts b/src/internal/util/lift.ts new file mode 100644 index 0000000000..e9fbf7c316 --- /dev/null +++ b/src/internal/util/lift.ts @@ -0,0 +1,54 @@ +/** @prettier */ +import { Observable } from '../Observable'; +import { Operator } from '../Operator'; + +/** + * A utility to lift observables. Will also error if an observable is passed that does not + * have the lift mechanism. + * + * We _must_ do this for version 7, because it is what allows subclassed observables to compose through + * the operators that use this. That will be going away in v8. + * + * See https://github.com/ReactiveX/rxjs/issues/5571 + * and https://github.com/ReactiveX/rxjs/issues/5431 + * + * @param source The source observable to lift + * @param operator The operator to lift it with. Note that the operator possibly being undefined here + * is related to issues around the "stanky" lifting of static creation functions as operators, see below. + */ +export function lift(source: Observable, operator?: Operator): Observable { + if (hasLift(source)) { + return source.lift(operator); + } + throw new TypeError('Unable to lift unknown Observable type'); +} + +// TODO: Figure out proper typing for what we're doing below at some point. +// For right now it's not that important, as it's internal implementation and not +// public typings on a public API. + +/** + * A utility used to lift observables in the case that we are trying to convert a static observable + * creation function to an operator that appropriately uses lift. Ultimately this is a smell + * related to `lift`, hence the name. + * + * We _must_ do this for version 7, because it is what allows subclassed observables to compose through + * the operators that use this. That will be going away in v8. + * + * See https://github.com/ReactiveX/rxjs/issues/5571 + * and https://github.com/ReactiveX/rxjs/issues/5431 + * + * @param source the original observable source for the operator + * @param liftedSource the actual composed source we want to lift + * @param operator the operator to lift it with (often undefined in this case) + */ +export function stankyLift(source: Observable, liftedSource: Observable, operator?: Operator): Observable { + if (hasLift(source)) { + return source.lift.call(liftedSource, operator); + } + throw new TypeError('Unable to lift unknown Observable type'); +} + +function hasLift(source: any): source is { lift: InstanceType['lift'] } { + return source && typeof source.lift === 'function'; +}