diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 39d0fec419..b47fc6b620 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -698,7 +698,7 @@ describe('Observable.lift', () => { } }; - NEVER.lift(myOperator) + (NEVER as any).lift(myOperator) .subscribe() .unsubscribe(); @@ -883,8 +883,8 @@ describe('Observable.lift', () => { class LogObservable extends Observable { lift(operator: Operator): Observable { const observable = new LogObservable(); - (observable).source = this; - (observable).operator = new LogOperator(operator); + observable.source = this; + observable.operator = new LogOperator(operator); return observable; } } diff --git a/spec/helpers/interop-helper.ts b/spec/helpers/interop-helper.ts index db5c11564b..8e776c34ac 100644 --- a/spec/helpers/interop-helper.ts +++ b/spec/helpers/interop-helper.ts @@ -11,7 +11,7 @@ export function asInteropObservable(observable: Observable): Observable return new Proxy(observable, { get(target: Observable, key: string | number | symbol) { if (key === 'lift') { - const { lift } = target; + const { lift } = target as any; return interopLift(lift); } if (key === 'subscribe') { diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 23c77472f9..48cf3653e7 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -7,8 +7,6 @@ import { Subscription } from './Subscription'; import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable } from './types'; import { canReportError } from './util/canReportError'; import { toSubscriber } from './util/toSubscriber'; -import { iif } from './observable/iif'; -import { throwError } from './observable/throwError'; import { observable as Symbol_observable } from './symbol/observable'; import { pipeFromArray } from './util/pipe'; import { config } from './config'; @@ -20,14 +18,11 @@ import { config } from './config'; * @class Observable */ export class Observable implements Subscribable { - /** Internal implementation detail, do not use directly. */ - public _isScalar: boolean = false; - /** @deprecated This is an internal implementation detail, do not use. */ - source: Observable | undefined; + protected source: Observable | undefined; /** @deprecated This is an internal implementation detail, do not use. */ - operator: Operator | undefined; + protected operator: Operator | undefined; /** * @constructor @@ -59,13 +54,16 @@ export class Observable implements Subscribable { }; /** - * Creates a new Observable, with this Observable as the source, and the passed + * Creates a new Observable, with this Observable instance as the source, and the passed * operator defined as the new observable's operator. * @method lift - * @param {Operator} operator the operator defining the operation to take on the observable - * @return {Observable} a new observable with the Operator applied + * @param operator the operator defining the operation to take on the observable + * @return a new observable with the Operator applied + * @deprecated This is an internal implementation detail, do not use directly. If you have implemented an operator + * using `lift`, it is recommended that you create an operator by simply returning `new Observable()` directly. + * See "Creating new operators from scratch" section here: https://rxjs.dev/guide/operators */ - lift(operator?: Operator): Observable { + protected lift(operator?: Operator): Observable { const observable = new Observable(); observable.source = this; observable.operator = operator; @@ -236,7 +234,7 @@ export class Observable implements Subscribable { } /** @deprecated This is an internal implementation detail, do not use. */ - _trySubscribe(sink: Subscriber): TeardownLogic { + protected _trySubscribe(sink: Subscriber): TeardownLogic { try { return this._subscribe(sink); } catch (err) { @@ -336,24 +334,11 @@ export class Observable implements Subscribable { } /** @internal This is an internal implementation detail, do not use. */ - _subscribe(subscriber: Subscriber): TeardownLogic { + protected _subscribe(subscriber: Subscriber): TeardownLogic { const { source } = this; return source && source.subscribe(subscriber); } - // `if` and `throw` are special snow flakes, the compiler sees them as reserved words. Deprecated in - // favor of iif and throwError functions. - /** - * @nocollapse - * @deprecated In favor of iif creation function: import { iif } from 'rxjs'; - */ - static if: typeof iif; - /** - * @nocollapse - * @deprecated In favor of throwError creation function: import { throwError } from 'rxjs'; - */ - static throw: typeof throwError; - /** * An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable * @method Symbol.observable diff --git a/src/internal/Operator.ts b/src/internal/Operator.ts index 93b65e6464..08aa5e438e 100644 --- a/src/internal/Operator.ts +++ b/src/internal/Operator.ts @@ -1,6 +1,9 @@ import { Subscriber } from './Subscriber'; import { TeardownLogic } from './types'; +/*** + * @deprecated Internal implementation detail, do not use. + */ export interface Operator { call(subscriber: Subscriber, source: any): TeardownLogic; } diff --git a/src/internal/observable/combineLatest.ts b/src/internal/observable/combineLatest.ts index c9c0e0a0aa..500c70a8bd 100644 --- a/src/internal/observable/combineLatest.ts +++ b/src/internal/observable/combineLatest.ts @@ -8,6 +8,7 @@ import { Operator } from '../Operator'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { fromArray } from './fromArray'; +import { lift } from '../util/lift'; const NONE = {}; @@ -233,7 +234,7 @@ export function combineLatest, R>( observables = observables[0] as any; } - return fromArray(observables, scheduler).lift(new CombineLatestOperator, R>(resultSelector)); + return lift(fromArray(observables, scheduler), new CombineLatestOperator, R>(resultSelector)); } export class CombineLatestOperator implements Operator { diff --git a/src/internal/observable/race.ts b/src/internal/observable/race.ts index 27929e52c7..70540181e5 100644 --- a/src/internal/observable/race.ts +++ b/src/internal/observable/race.ts @@ -9,6 +9,7 @@ import { TeardownLogic, ObservableInput, ObservedValueUnionFromArray } from '../ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; +import { lift } from '../util/lift'; export function race[]>(observables: A): Observable>; export function race[]>(...observables: A): Observable>; @@ -65,7 +66,7 @@ export function race(...observables: (ObservableInput | ObservableInput } } - return fromArray(observables, undefined).lift(new RaceOperator()); + return lift(fromArray(observables, undefined), new RaceOperator()); } export class RaceOperator implements Operator { diff --git a/src/internal/observable/zip.ts b/src/internal/observable/zip.ts index 327657ab50..938ac2fabb 100644 --- a/src/internal/observable/zip.ts +++ b/src/internal/observable/zip.ts @@ -9,6 +9,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { iterator as Symbol_iterator } from '../../internal/symbol/iterator'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated resultSelector is no longer supported, pipe to map instead */ @@ -85,7 +86,7 @@ export function zip, R>( if (typeof last === 'function') { resultSelector = observables.pop() as typeof resultSelector; } - return fromArray(observables, undefined).lift(new ZipOperator(resultSelector)); + return lift(fromArray(observables, undefined), new ZipOperator(resultSelector)); } export class ZipOperator implements Operator { diff --git a/src/internal/operators/audit.ts b/src/internal/operators/audit.ts index 7fbb5201be..704d4d5047 100644 --- a/src/internal/operators/audit.ts +++ b/src/internal/operators/audit.ts @@ -6,6 +6,7 @@ import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from ' import { OuterSubscriber } from '../OuterSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; +import { lift } from '../util/lift'; /** * Ignores source values for a duration determined by another Observable, then @@ -54,7 +55,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; */ export function audit(durationSelector: (value: T) => SubscribableOrPromise): MonoTypeOperatorFunction { return function auditOperatorFunction(source: Observable) { - return source.lift(new AuditOperator(durationSelector)); + return lift(source, new AuditOperator(durationSelector)); }; } diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 71172ea91e..4cf80c78c0 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -5,6 +5,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Buffers the source Observable values until `closingNotifier` emits. @@ -47,7 +48,7 @@ import { OperatorFunction } from '../types'; */ export function buffer(closingNotifier: Observable): OperatorFunction { return function bufferOperatorFunction(source: Observable) { - return source.lift(new BufferOperator(closingNotifier)); + return lift(source, new BufferOperator(closingNotifier)); }; } diff --git a/src/internal/operators/bufferCount.ts b/src/internal/operators/bufferCount.ts index 636429b102..c39d8e3ecf 100644 --- a/src/internal/operators/bufferCount.ts +++ b/src/internal/operators/bufferCount.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Buffers the source Observable values until the size hits the maximum @@ -59,7 +60,7 @@ import { OperatorFunction, TeardownLogic } from '../types'; */ export function bufferCount(bufferSize: number, startBufferEvery: number | null = null): OperatorFunction { return function bufferCountOperatorFunction(source: Observable) { - return source.lift(new BufferCountOperator(bufferSize, startBufferEvery)); + return lift(source, new BufferCountOperator(bufferSize, startBufferEvery)); }; } diff --git a/src/internal/operators/bufferTime.ts b/src/internal/operators/bufferTime.ts index 4bd97f2933..a53126edc3 100644 --- a/src/internal/operators/bufferTime.ts +++ b/src/internal/operators/bufferTime.ts @@ -5,6 +5,7 @@ import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { isScheduler } from '../util/isScheduler'; import { OperatorFunction, SchedulerAction, SchedulerLike } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function bufferTime(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction; @@ -88,7 +89,7 @@ export function bufferTime(bufferTimeSpan: number): OperatorFunction } return function bufferTimeOperatorFunction(source: Observable) { - return source.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler)); + return lift(source, new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler)); }; } diff --git a/src/internal/operators/bufferToggle.ts b/src/internal/operators/bufferToggle.ts index fd7eb9653d..1a8770f033 100644 --- a/src/internal/operators/bufferToggle.ts +++ b/src/internal/operators/bufferToggle.ts @@ -6,6 +6,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { OperatorFunction, SubscribableOrPromise } from '../types'; +import { lift } from '../util/lift'; /** * Buffers the source Observable values starting from an emission from @@ -57,7 +58,7 @@ export function bufferToggle( closingSelector: (value: O) => SubscribableOrPromise ): OperatorFunction { return function bufferToggleOperatorFunction(source: Observable) { - return source.lift(new BufferToggleOperator(openings, closingSelector)); + return lift(source, new BufferToggleOperator(openings, closingSelector)); }; } diff --git a/src/internal/operators/bufferWhen.ts b/src/internal/operators/bufferWhen.ts index 4f8b6184c5..9e61aeb66b 100644 --- a/src/internal/operators/bufferWhen.ts +++ b/src/internal/operators/bufferWhen.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Buffers the source Observable values, using a factory function of closing @@ -50,7 +51,7 @@ import { OperatorFunction } from '../types'; */ export function bufferWhen(closingSelector: () => Observable): OperatorFunction { return function (source: Observable) { - return source.lift(new BufferWhenOperator(closingSelector)); + return lift(source, new BufferWhenOperator(closingSelector)); }; } diff --git a/src/internal/operators/catchError.ts b/src/internal/operators/catchError.ts index 68d50fe771..56b6c83c4d 100644 --- a/src/internal/operators/catchError.ts +++ b/src/internal/operators/catchError.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function catchError>(selector: (err: any, caught: Observable) => O): OperatorFunction>; @@ -108,8 +109,9 @@ export function catchError>( ): OperatorFunction> { return function catchErrorOperatorFunction(source: Observable): Observable> { const operator = new CatchOperator(selector); - const caught = source.lift(operator); - return (operator.caught = caught as Observable); + const caught = lift(source, operator); + operator.caught = caught; + return caught; }; } diff --git a/src/internal/operators/combineAll.ts b/src/internal/operators/combineAll.ts index f696000341..93cca3f0f2 100644 --- a/src/internal/operators/combineAll.ts +++ b/src/internal/operators/combineAll.ts @@ -1,6 +1,7 @@ import { CombineLatestOperator } from '../observable/combineLatest'; import { Observable } from '../Observable'; import { OperatorFunction, ObservableInput } from '../types'; +import { lift } from '../util/lift'; export function combineAll(): OperatorFunction, T[]>; export function combineAll(): OperatorFunction; @@ -53,5 +54,5 @@ export function combineAll(project: (...values: Array) => R): OperatorFu * @name combineAll */ export function combineAll(project?: (...values: Array) => R): OperatorFunction { - return (source: Observable) => source.lift(new CombineLatestOperator(project)); + return (source: Observable) => lift(source, new CombineLatestOperator(project)); } diff --git a/src/internal/operators/combineLatestWith.ts b/src/internal/operators/combineLatestWith.ts index bcca48d2c2..e0f5d8f52a 100644 --- a/src/internal/operators/combineLatestWith.ts +++ b/src/internal/operators/combineLatestWith.ts @@ -4,6 +4,7 @@ import { CombineLatestOperator } from '../observable/combineLatest'; import { from } from '../observable/from'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, ObservedValueTupleFromArray, Cons } from '../types'; +import { lift, stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated use {@link combineLatestWith} */ @@ -53,10 +54,11 @@ export function combineLatest(...observables: Array | observables = (observables[0]).slice(); } - return (source: Observable) => source.lift.call( + return (source: Observable) => stankyLift( + source, from([source, ...observables]), new CombineLatestOperator(project) - ) as Observable; + ); } /** diff --git a/src/internal/operators/concat.ts b/src/internal/operators/concat.ts index dbd116245b..acf58b9576 100644 --- a/src/internal/operators/concat.ts +++ b/src/internal/operators/concat.ts @@ -1,6 +1,7 @@ import { concat as concatStatic } from '../observable/concat'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerLike } from '../types'; +import { stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated remove in v8. Use {@link concatWith} */ @@ -25,8 +26,8 @@ export function concat(...observables: Array | Schedu * @deprecated remove in v8. Use {@link concatWith} */ export function concat(...observables: Array | SchedulerLike | undefined>): OperatorFunction { - return (source: Observable) => source.lift.call( + return (source: Observable) => stankyLift( + source, concatStatic(source, ...(observables as any[])), - undefined - ) as Observable; + ); } diff --git a/src/internal/operators/concatWith.ts b/src/internal/operators/concatWith.ts index 5060057402..414203a09c 100644 --- a/src/internal/operators/concatWith.ts +++ b/src/internal/operators/concatWith.ts @@ -1,6 +1,7 @@ import { concat as concatStatic } from '../observable/concat'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, ObservedValueUnionFromArray } from '../types'; +import { stankyLift } from '../util/lift'; export function concatWith(): OperatorFunction; export function concatWith[]>(...otherSources: A): OperatorFunction | T>; @@ -44,8 +45,8 @@ export function concatWith[]>(...otherSources: * @param otherSources Other observable sources to subscribe to, in sequence, after the original source is complete. */ export function concatWith[]>(...otherSources: A): OperatorFunction | T> { - return (source: Observable) => source.lift.call( - concatStatic(source, ...otherSources), - undefined - ) as Observable | T>; + return (source: Observable) => stankyLift( + source, + concatStatic(source, ...otherSources) + ); } diff --git a/src/internal/operators/count.ts b/src/internal/operators/count.ts index 14e0f83b9e..b9b4a8587d 100644 --- a/src/internal/operators/count.ts +++ b/src/internal/operators/count.ts @@ -2,6 +2,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Observer, OperatorFunction } from '../types'; import { Subscriber } from '../Subscriber'; +import { lift } from '../util/lift'; /** * Counts the number of emissions on the source and emits that number when the * source completes. @@ -62,7 +63,7 @@ import { Subscriber } from '../Subscriber'; */ export function count(predicate?: (value: T, index: number, source: Observable) => boolean): OperatorFunction { - return (source: Observable) => source.lift(new CountOperator(predicate, source)); + return (source: Observable) => lift(source, new CountOperator(predicate, source)); } class CountOperator implements Operator { diff --git a/src/internal/operators/debounce.ts b/src/internal/operators/debounce.ts index 70f30c6391..3d699435b5 100644 --- a/src/internal/operators/debounce.ts +++ b/src/internal/operators/debounce.ts @@ -7,6 +7,7 @@ import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from ' import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; +import { lift } from '../util/lift'; /** * Emits a notification from the source Observable only after a particular time span @@ -67,7 +68,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @name debounce */ export function debounce(durationSelector: (value: T) => SubscribableOrPromise): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new DebounceOperator(durationSelector)); + return (source: Observable) => lift(source, new DebounceOperator(durationSelector)); } class DebounceOperator implements Operator { diff --git a/src/internal/operators/debounceTime.ts b/src/internal/operators/debounceTime.ts index 83b456b157..4e8b073aa1 100644 --- a/src/internal/operators/debounceTime.ts +++ b/src/internal/operators/debounceTime.ts @@ -4,6 +4,7 @@ import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { async } from '../scheduler/async'; import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits a notification from the source Observable only after a particular time span @@ -64,7 +65,7 @@ import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types * @name debounceTime */ export function debounceTime(dueTime: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new DebounceTimeOperator(dueTime, scheduler)); + return (source: Observable) => lift(source, new DebounceTimeOperator(dueTime, scheduler)); } class DebounceTimeOperator implements Operator { diff --git a/src/internal/operators/defaultIfEmpty.ts b/src/internal/operators/defaultIfEmpty.ts index b2770827aa..e2dcf8dcff 100644 --- a/src/internal/operators/defaultIfEmpty.ts +++ b/src/internal/operators/defaultIfEmpty.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function defaultIfEmpty(defaultValue?: R): OperatorFunction; @@ -43,7 +44,7 @@ export function defaultIfEmpty(defaultValue?: R): OperatorFunction(defaultValue: R | null = null): OperatorFunction { - return (source: Observable) => source.lift(new DefaultIfEmptyOperator(defaultValue)) as Observable; + return (source: Observable) => lift(source, new DefaultIfEmptyOperator(defaultValue)) as Observable; } class DefaultIfEmptyOperator implements Operator { diff --git a/src/internal/operators/delay.ts b/src/internal/operators/delay.ts index 7a232507a7..e1e620d865 100644 --- a/src/internal/operators/delay.ts +++ b/src/internal/operators/delay.ts @@ -9,6 +9,7 @@ import { SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Delays the emission of items from the source Observable by a given timeout or @@ -60,7 +61,7 @@ import { */ export function delay(delay: number | Date, scheduler: SchedulerLike = async): MonoTypeOperatorFunction { const delayFor = isValidDate(delay) ? +delay - scheduler.now() : Math.abs(delay); - return (source: Observable) => source.lift(new DelayOperator(delayFor, scheduler)); + return (source: Observable) => lift(source, new DelayOperator(delayFor, scheduler)); } class DelayOperator implements Operator { diff --git a/src/internal/operators/delayWhen.ts b/src/internal/operators/delayWhen.ts index 4174faa407..6caf7e6702 100644 --- a/src/internal/operators/delayWhen.ts +++ b/src/internal/operators/delayWhen.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated In future versions, empty notifiers will no longer re-emit the source value on the output observable. */ @@ -76,10 +77,9 @@ export function delayWhen(delayDurationSelector: (value: T, index: number) => subscriptionDelay?: Observable): MonoTypeOperatorFunction { if (subscriptionDelay) { return (source: Observable) => - new SubscriptionDelayObservable(source, subscriptionDelay) - .lift(new DelayWhenOperator(delayDurationSelector)); + lift(new SubscriptionDelayObservable(source, subscriptionDelay), new DelayWhenOperator(delayDurationSelector)); } - return (source: Observable) => source.lift(new DelayWhenOperator(delayDurationSelector)); + return (source: Observable) => lift(source, new DelayWhenOperator(delayDurationSelector)); } class DelayWhenOperator implements Operator { diff --git a/src/internal/operators/dematerialize.ts b/src/internal/operators/dematerialize.ts index 4e8252db82..93660eaad4 100644 --- a/src/internal/operators/dematerialize.ts +++ b/src/internal/operators/dematerialize.ts @@ -3,6 +3,7 @@ import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { observeNotification } from '../Notification'; import { OperatorFunction, ObservableNotification, ValueFromNotification } from '../types'; +import { lift } from '../util/lift'; /** * Converts an Observable of {@link ObservableNotification} objects into the emissions @@ -53,7 +54,7 @@ import { OperatorFunction, ObservableNotification, ValueFromNotification } from */ export function dematerialize>(): OperatorFunction> { return function dematerializeOperatorFunction(source: Observable) { - return source.lift(new DeMaterializeOperator()); + return lift(source, new DeMaterializeOperator()); }; } diff --git a/src/internal/operators/distinct.ts b/src/internal/operators/distinct.ts index 2c5990dce3..18cd0b6145 100644 --- a/src/internal/operators/distinct.ts +++ b/src/internal/operators/distinct.ts @@ -5,6 +5,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items. @@ -75,7 +76,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; */ export function distinct(keySelector?: (value: T) => K, flushes?: Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new DistinctOperator(keySelector, flushes)); + return (source: Observable) => lift(source, new DistinctOperator(keySelector, flushes)); } class DistinctOperator implements Operator { diff --git a/src/internal/operators/distinctUntilChanged.ts b/src/internal/operators/distinctUntilChanged.ts index 6b58910cc3..d07a7f517c 100644 --- a/src/internal/operators/distinctUntilChanged.ts +++ b/src/internal/operators/distinctUntilChanged.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function distinctUntilChanged(compare?: (x: T, y: T) => boolean): MonoTypeOperatorFunction; @@ -63,7 +64,7 @@ export function distinctUntilChanged(compare: (x: K, y: K) => boolean, key * @name distinctUntilChanged */ export function distinctUntilChanged(compare?: (x: K, y: K) => boolean, keySelector?: (x: T) => K): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new DistinctUntilChangedOperator(compare, keySelector)); + return (source: Observable) => lift(source, new DistinctUntilChangedOperator(compare, keySelector)); } class DistinctUntilChangedOperator implements Operator { diff --git a/src/internal/operators/every.ts b/src/internal/operators/every.ts index dd7ea07bdf..43d349460e 100644 --- a/src/internal/operators/every.ts +++ b/src/internal/operators/every.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Observer, OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that emits whether or not every item of the source satisfies the condition specified. @@ -30,7 +31,7 @@ import { Observer, OperatorFunction } from '../types'; */ export function every(predicate: (value: T, index: number, source: Observable) => boolean, thisArg?: any): OperatorFunction { - return (source: Observable) => source.lift(new EveryOperator(predicate, thisArg, source)); + return (source: Observable) => lift(source, new EveryOperator(predicate, thisArg, source)); } class EveryOperator implements Operator { diff --git a/src/internal/operators/exhaust.ts b/src/internal/operators/exhaust.ts index 643bf5241e..3ea6a6ce40 100644 --- a/src/internal/operators/exhaust.ts +++ b/src/internal/operators/exhaust.ts @@ -5,6 +5,7 @@ import { Subscription } from '../Subscription'; import { OuterSubscriber } from '../OuterSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; export function exhaust(): OperatorFunction, T>; export function exhaust(): OperatorFunction; @@ -53,7 +54,7 @@ export function exhaust(): OperatorFunction; * @name exhaust */ export function exhaust(): OperatorFunction { - return (source: Observable) => source.lift(new SwitchFirstOperator()); + return (source: Observable) => lift(source, new SwitchFirstOperator()); } class SwitchFirstOperator implements Operator { diff --git a/src/internal/operators/exhaustMap.ts b/src/internal/operators/exhaustMap.ts index 27f209c61d..86385dd5a0 100644 --- a/src/internal/operators/exhaustMap.ts +++ b/src/internal/operators/exhaustMap.ts @@ -8,6 +8,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function exhaustMap>(project: (value: T, index: number) => O): OperatorFunction>; @@ -74,7 +75,7 @@ export function exhaustMap>( ); } return (source: Observable) => - source.lift(new ExhaustMapOperator(project)); + lift(source, new ExhaustMapOperator(project)); } class ExhaustMapOperator implements Operator { diff --git a/src/internal/operators/expand.ts b/src/internal/operators/expand.ts index 13a953ee26..ab0baef1fb 100644 --- a/src/internal/operators/expand.ts +++ b/src/internal/operators/expand.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, OperatorFunction, ObservableInput, SchedulerLike } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function expand(project: (value: T, index: number) => ObservableInput, concurrent?: number, scheduler?: SchedulerLike): OperatorFunction; @@ -68,7 +69,7 @@ export function expand(project: (value: T, index: number) => ObservableInp scheduler?: SchedulerLike): OperatorFunction { concurrent = (concurrent || 0) < 1 ? Infinity : concurrent; - return (source: Observable) => source.lift(new ExpandOperator(project, concurrent, scheduler)); + return (source: Observable) => lift(source, new ExpandOperator(project, concurrent, scheduler)); } export class ExpandOperator implements Operator { diff --git a/src/internal/operators/filter.ts b/src/internal/operators/filter.ts index 1868f4922f..55a4d92681 100644 --- a/src/internal/operators/filter.ts +++ b/src/internal/operators/filter.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction, MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function filter(predicate: (value: T, index: number) => value is S, @@ -56,7 +57,7 @@ export function filter(predicate: (value: T, index: number) => boolean, export function filter(predicate: (value: T, index: number) => boolean, thisArg?: any): MonoTypeOperatorFunction { return function filterOperatorFunction(source: Observable): Observable { - return source.lift(new FilterOperator(predicate, thisArg)); + return lift(source, new FilterOperator(predicate, thisArg)); }; } diff --git a/src/internal/operators/finalize.ts b/src/internal/operators/finalize.ts index ebeadb79c2..ec8d978f66 100644 --- a/src/internal/operators/finalize.ts +++ b/src/internal/operators/finalize.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that mirrors the source Observable, but will call a specified function when @@ -59,7 +60,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name finally */ export function finalize(callback: () => void): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new FinallyOperator(callback)); + return (source: Observable) => lift(source, new FinallyOperator(callback)); } class FinallyOperator implements Operator { diff --git a/src/internal/operators/find.ts b/src/internal/operators/find.ts index 6ea7b2bcf4..e8724336de 100644 --- a/src/internal/operators/find.ts +++ b/src/internal/operators/find.ts @@ -2,6 +2,7 @@ import {Observable} from '../Observable'; import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; import {OperatorFunction} from '../types'; +import { lift } from '../util/lift'; export function find(predicate: (value: T, index: number, source: Observable) => value is S, thisArg?: any): OperatorFunction; @@ -50,7 +51,7 @@ export function find(predicate: (value: T, index: number, source: Observable< if (typeof predicate !== 'function') { throw new TypeError('predicate is not a function'); } - return (source: Observable) => source.lift(new FindValueOperator(predicate, source, false, thisArg)) as Observable; + return (source: Observable) => lift(source, new FindValueOperator(predicate, source, false, thisArg)) as Observable; } export class FindValueOperator implements Operator { diff --git a/src/internal/operators/findIndex.ts b/src/internal/operators/findIndex.ts index 99828453bf..d42d1e2129 100644 --- a/src/internal/operators/findIndex.ts +++ b/src/internal/operators/findIndex.ts @@ -1,6 +1,7 @@ import { Observable } from '../Observable'; import { FindValueOperator } from '../operators/find'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Emits only the index of the first value emitted by the source Observable that * meets some condition. @@ -42,5 +43,5 @@ import { OperatorFunction } from '../types'; */ export function findIndex(predicate: (value: T, index: number, source: Observable) => boolean, thisArg?: any): OperatorFunction { - return (source: Observable) => source.lift(new FindValueOperator(predicate, source, true, thisArg)) as Observable; + return (source: Observable) => lift(source, new FindValueOperator(predicate, source, true, thisArg)) as Observable; } diff --git a/src/internal/operators/groupBy.ts b/src/internal/operators/groupBy.ts index 86fcb82b07..c8b17f4ec5 100644 --- a/src/internal/operators/groupBy.ts +++ b/src/internal/operators/groupBy.ts @@ -4,6 +4,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subject } from '../Subject'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function groupBy(keySelector: (value: T) => value is K): OperatorFunction | GroupedObservable>>; @@ -109,7 +110,7 @@ export function groupBy(keySelector: (value: T) => K, durationSelector?: (grouped: GroupedObservable) => Observable, subjectSelector?: () => Subject): OperatorFunction> { return (source: Observable) => - source.lift(new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); + lift(source, new GroupByOperator(keySelector, elementSelector, durationSelector, subjectSelector)); } export interface RefCountSubscription { diff --git a/src/internal/operators/ignoreElements.ts b/src/internal/operators/ignoreElements.ts index 528ea4300a..16b06167c5 100644 --- a/src/internal/operators/ignoreElements.ts +++ b/src/internal/operators/ignoreElements.ts @@ -2,6 +2,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Ignores all items emitted by the source Observable and only passes calls of `complete` or `error`. @@ -37,7 +38,7 @@ import { OperatorFunction } from '../types'; */ export function ignoreElements(): OperatorFunction { return function ignoreElementsOperatorFunction(source: Observable) { - return source.lift(new IgnoreElementsOperator()); + return lift(source, new IgnoreElementsOperator()); }; } diff --git a/src/internal/operators/isEmpty.ts b/src/internal/operators/isEmpty.ts index 7c7e561d55..0f995c7128 100644 --- a/src/internal/operators/isEmpty.ts +++ b/src/internal/operators/isEmpty.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Emits `false` if the input Observable emits any values, or emits `true` if the @@ -68,7 +69,7 @@ import { OperatorFunction } from '../types'; */ export function isEmpty(): OperatorFunction { - return (source: Observable) => source.lift(new IsEmptyOperator()); + return (source: Observable) => lift(source, new IsEmptyOperator()); } class IsEmptyOperator implements Operator { diff --git a/src/internal/operators/map.ts b/src/internal/operators/map.ts index 92d6fe5db8..14ffa54c4c 100644 --- a/src/internal/operators/map.ts +++ b/src/internal/operators/map.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Applies a given `project` function to each value emitted by the source @@ -46,7 +47,7 @@ export function map(project: (value: T, index: number) => R, thisArg?: any if (typeof project !== 'function') { throw new TypeError('argument is not a function. Are you looking for `mapTo()`?'); } - return source.lift(new MapOperator(project, thisArg)); + return lift(source, new MapOperator(project, thisArg)); }; } diff --git a/src/internal/operators/mapTo.ts b/src/internal/operators/mapTo.ts index 6d34b29dae..5bc5edd880 100644 --- a/src/internal/operators/mapTo.ts +++ b/src/internal/operators/mapTo.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; export function mapTo(value: R): OperatorFunction; /** @deprecated remove in v8. Use mapTo(value: R): OperatorFunction signature instead **/ @@ -39,7 +40,7 @@ export function mapTo(value: R): OperatorFunction; * @name mapTo */ export function mapTo(value: R): OperatorFunction { - return (source: Observable) => source.lift(new MapToOperator(value)); + return (source: Observable) => lift(source, new MapToOperator(value)); } class MapToOperator implements Operator { diff --git a/src/internal/operators/materialize.ts b/src/internal/operators/materialize.ts index ec4fa3ee12..2d50edea3d 100644 --- a/src/internal/operators/materialize.ts +++ b/src/internal/operators/materialize.ts @@ -3,6 +3,7 @@ import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Notification } from '../Notification'; import { OperatorFunction, ObservableNotification } from '../types'; +import { lift } from '../util/lift'; /** * Represents all of the notifications from the source Observable as `next` @@ -60,7 +61,7 @@ import { OperatorFunction, ObservableNotification } from '../types'; */ export function materialize(): OperatorFunction & ObservableNotification> { return function materializeOperatorFunction(source: Observable) { - return source.lift(new MaterializeOperator()); + return lift(source, new MaterializeOperator()); }; } diff --git a/src/internal/operators/mergeMap.ts b/src/internal/operators/mergeMap.ts index 15e4cd402a..60ac8877a4 100644 --- a/src/internal/operators/mergeMap.ts +++ b/src/internal/operators/mergeMap.ts @@ -8,6 +8,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function mergeMap>(project: (value: T, index: number) => O, concurrent?: number): OperatorFunction>; @@ -86,7 +87,7 @@ export function mergeMap>( } else if (typeof resultSelector === 'number') { concurrent = resultSelector; } - return (source: Observable) => source.lift(new MergeMapOperator(project, concurrent)); + return (source: Observable) => lift(source, new MergeMapOperator(project, concurrent)); } export class MergeMapOperator implements Operator { diff --git a/src/internal/operators/mergeScan.ts b/src/internal/operators/mergeScan.ts index afd476b9c7..1aa1804d73 100644 --- a/src/internal/operators/mergeScan.ts +++ b/src/internal/operators/mergeScan.ts @@ -6,6 +6,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { ObservableInput, OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Applies an accumulator function over the source Observable where the @@ -48,7 +49,7 @@ import { ObservableInput, OperatorFunction } from '../types'; export function mergeScan(accumulator: (acc: R, value: T, index: number) => ObservableInput, seed: R, concurrent: number = Infinity): OperatorFunction { - return (source: Observable) => source.lift(new MergeScanOperator(accumulator, seed, concurrent)); + return (source: Observable) => lift(source, new MergeScanOperator(accumulator, seed, concurrent)); } export class MergeScanOperator implements Operator { diff --git a/src/internal/operators/mergeWith.ts b/src/internal/operators/mergeWith.ts index 6d34e89c91..5383c3b7a4 100644 --- a/src/internal/operators/mergeWith.ts +++ b/src/internal/operators/mergeWith.ts @@ -1,6 +1,7 @@ import { merge as mergeStatic } from '../observable/merge'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerLike, ObservedValueUnionFromArray } from '../types'; +import { stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ @@ -57,10 +58,10 @@ export function merge(...observables: Array | Schedul * @deprecated use {@link mergeWith} or static {@link merge} */ export function merge(...observables: Array | SchedulerLike | number | undefined>): OperatorFunction { - return (source: Observable) => source.lift.call( - mergeStatic(source, ...(observables as any[])), - undefined - ) as Observable; + return (source: Observable) => stankyLift( + source, + mergeStatic(source, ...(observables as any[])) + ); } export function mergeWith(): OperatorFunction; diff --git a/src/internal/operators/multicast.ts b/src/internal/operators/multicast.ts index 03d7b63b13..5134d425ef 100644 --- a/src/internal/operators/multicast.ts +++ b/src/internal/operators/multicast.ts @@ -4,6 +4,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable'; import { OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function multicast(subject: Subject): UnaryFunction, ConnectableObservable>; @@ -43,7 +44,7 @@ export function multicast(subjectOrSubjectFactory: Subject | (() => Sub } if (typeof selector === 'function') { - return source.lift(new MulticastOperator(subjectFactory, selector)); + return lift(source, new MulticastOperator(subjectFactory, selector)); } const connectable: any = Object.create(source, connectableObservableDescriptor); diff --git a/src/internal/operators/observeOn.ts b/src/internal/operators/observeOn.ts index 0f8875cd9a..683f9590b9 100644 --- a/src/internal/operators/observeOn.ts +++ b/src/internal/operators/observeOn.ts @@ -9,6 +9,7 @@ import { TeardownLogic, ObservableNotification, } from '../types'; +import { lift } from '../util/lift'; /** * @@ -64,7 +65,7 @@ import { */ export function observeOn(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction { return function observeOnOperatorFunction(source: Observable): Observable { - return source.lift(new ObserveOnOperator(scheduler, delay)); + return lift(source, new ObserveOnOperator(scheduler, delay)); }; } diff --git a/src/internal/operators/onErrorResumeNext.ts b/src/internal/operators/onErrorResumeNext.ts index b2b2027a1a..262e6544c7 100644 --- a/src/internal/operators/onErrorResumeNext.ts +++ b/src/internal/operators/onErrorResumeNext.ts @@ -8,6 +8,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function onErrorResumeNext(): OperatorFunction; @@ -95,7 +96,7 @@ export function onErrorResumeNext(...nextSources: Array>>nextSources[0]; } - return (source: Observable) => source.lift(new OnErrorResumeNextOperator(nextSources)); + return (source: Observable) => lift(source, new OnErrorResumeNextOperator(nextSources)); } /* tslint:disable:max-line-length */ @@ -119,7 +120,7 @@ export function onErrorResumeNextStatic(...nextSources: Array(nextSources)); + return lift(from(source), new OnErrorResumeNextOperator(nextSources)); } class OnErrorResumeNextOperator implements Operator { diff --git a/src/internal/operators/pairwise.ts b/src/internal/operators/pairwise.ts index eefef1c713..1f0abeb1a2 100644 --- a/src/internal/operators/pairwise.ts +++ b/src/internal/operators/pairwise.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Groups pairs of consecutive emissions together and emits them as an array of @@ -46,7 +47,7 @@ import { OperatorFunction } from '../types'; * @name pairwise */ export function pairwise(): OperatorFunction { - return (source: Observable) => source.lift(new PairwiseOperator()); + return (source: Observable) => lift(source, new PairwiseOperator()); } class PairwiseOperator implements Operator { diff --git a/src/internal/operators/raceWith.ts b/src/internal/operators/raceWith.ts index e6bd07a328..d25e399572 100644 --- a/src/internal/operators/raceWith.ts +++ b/src/internal/operators/raceWith.ts @@ -1,7 +1,9 @@ import { Observable } from '../Observable'; import { isArray } from '../util/isArray'; import { MonoTypeOperatorFunction, OperatorFunction, ObservableInput, ObservedValueUnionFromArray } from '../types'; -import { race as raceStatic } from '../observable/race'; +import { race as raceStatic, RaceOperator } from '../observable/race'; +import { fromArray } from '../observable/fromArray'; +import { lift, stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated Deprecated use {@link raceWith} */ @@ -21,19 +23,14 @@ export function race(...observables: Array | Array(...observables: (Observable | Observable[])[]): MonoTypeOperatorFunction { - return function raceOperatorFunction(source: Observable) { - // if the only argument is an array, it was most likely called with - // `pair([obs1, obs2, ...])` - if (observables.length === 1 && isArray(observables[0])) { - observables = observables[0] as Observable[]; - } +export function race(...observables: ObservableInput[]): OperatorFunction { + // if the only argument is an array, it was most likely called with + // `pair([obs1, obs2, ...])` + if (observables.length === 1 && isArray(observables[0])) { + observables = observables[0]; + } - return source.lift.call( - raceStatic(source, ...(observables as Observable[])), - undefined - ) as Observable; - }; + return raceWith(...observables) as any; } /** @@ -68,9 +65,13 @@ export function raceWith[]>( ...otherSources: A ): OperatorFunction> { return function raceWithOperatorFunction(source: Observable) { - return source.lift.call( - raceStatic(source, ...otherSources), - undefined - ) as Observable>; + if (otherSources.length === 0) { + return source; + } + + return stankyLift( + source, + raceStatic(source, ...otherSources) + ); }; } diff --git a/src/internal/operators/refCount.ts b/src/internal/operators/refCount.ts index 0a1fa786c6..b7baf6932c 100644 --- a/src/internal/operators/refCount.ts +++ b/src/internal/operators/refCount.ts @@ -4,6 +4,7 @@ import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { ConnectableObservable } from '../observable/ConnectableObservable'; import { Observable } from '../Observable'; +import { lift } from '../util/lift'; /** * Make a {@link ConnectableObservable} behave like a ordinary observable and automates the way @@ -60,7 +61,7 @@ import { Observable } from '../Observable'; */ export function refCount(): MonoTypeOperatorFunction { return function refCountOperatorFunction(source: ConnectableObservable): Observable { - return source.lift(new RefCountOperator()); + return lift(source, new RefCountOperator()); } as MonoTypeOperatorFunction; } diff --git a/src/internal/operators/repeat.ts b/src/internal/operators/repeat.ts index cb60b70511..8809b78f6e 100644 --- a/src/internal/operators/repeat.ts +++ b/src/internal/operators/repeat.ts @@ -3,6 +3,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { EMPTY } from '../observable/empty'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that will resubscribe to the source stream when the source stream completes, at most count times. @@ -64,9 +65,9 @@ export function repeat(count: number = -1): MonoTypeOperatorFunction { if (count === 0) { return EMPTY; } else if (count < 0) { - return source.lift(new RepeatOperator(-1, source)); + return lift(source, new RepeatOperator(-1, source)); } else { - return source.lift(new RepeatOperator(count - 1, source)); + return lift(source, new RepeatOperator(count - 1, source)); } }; } diff --git a/src/internal/operators/repeatWhen.ts b/src/internal/operators/repeatWhen.ts index 89d739c5d4..fa7530de1f 100644 --- a/src/internal/operators/repeatWhen.ts +++ b/src/internal/operators/repeatWhen.ts @@ -9,6 +9,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that mirrors the source Observable with the exception of a `complete`. If the source @@ -40,7 +41,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name repeatWhen */ export function repeatWhen(notifier: (notifications: Observable) => Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new RepeatWhenOperator(notifier)); + return (source: Observable) => lift(source, new RepeatWhenOperator(notifier)); } class RepeatWhenOperator implements Operator { diff --git a/src/internal/operators/retry.ts b/src/internal/operators/retry.ts index cb86d8b179..8997bb309f 100644 --- a/src/internal/operators/retry.ts +++ b/src/internal/operators/retry.ts @@ -3,6 +3,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; export interface RetryConfig { count: number; @@ -66,7 +67,7 @@ export function retry(configOrCount: number | RetryConfig = -1): MonoTypeOper count: configOrCount as number }; } - return (source: Observable) => source.lift(new RetryOperator(config.count, !!config.resetOnSuccess, source)); + return (source: Observable) => lift(source, new RetryOperator(config.count, !!config.resetOnSuccess, source)); } class RetryOperator implements Operator { diff --git a/src/internal/operators/retryWhen.ts b/src/internal/operators/retryWhen.ts index ac02bfd0a7..2a8bda0780 100644 --- a/src/internal/operators/retryWhen.ts +++ b/src/internal/operators/retryWhen.ts @@ -9,6 +9,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that mirrors the source Observable with the exception of an `error`. If the source Observable @@ -63,7 +64,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name retryWhen */ export function retryWhen(notifier: (errors: Observable) => Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new RetryWhenOperator(notifier, source)); + return (source: Observable) => lift(source, new RetryWhenOperator(notifier, source)); } class RetryWhenOperator implements Operator { diff --git a/src/internal/operators/sample.ts b/src/internal/operators/sample.ts index 0cda8903c6..86afa79b8b 100644 --- a/src/internal/operators/sample.ts +++ b/src/internal/operators/sample.ts @@ -6,6 +6,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits the most recently emitted value from the source Observable whenever @@ -47,7 +48,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name sample */ export function sample(notifier: Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SampleOperator(notifier)); + return (source: Observable) => lift(source, new SampleOperator(notifier)); } class SampleOperator implements Operator { diff --git a/src/internal/operators/sampleTime.ts b/src/internal/operators/sampleTime.ts index 317f718f15..b805d1a1b1 100644 --- a/src/internal/operators/sampleTime.ts +++ b/src/internal/operators/sampleTime.ts @@ -3,6 +3,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { async } from '../scheduler/async'; import { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits the most recently emitted value from the source Observable within @@ -46,7 +47,7 @@ import { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic * @name sampleTime */ export function sampleTime(period: number, scheduler: SchedulerLike = async): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SampleTimeOperator(period, scheduler)); + return (source: Observable) => lift(source, new SampleTimeOperator(period, scheduler)); } class SampleTimeOperator implements Operator { diff --git a/src/internal/operators/scan.ts b/src/internal/operators/scan.ts index 66434c9093..2916be207e 100644 --- a/src/internal/operators/scan.ts +++ b/src/internal/operators/scan.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { OperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function scan(accumulator: (acc: A|V, value: V, index: number) => A): OperatorFunction; @@ -63,7 +64,7 @@ export function scan(accumulator: (acc: V|A|S, value: V, index: number) } return function scanOperatorFunction(source: Observable) { - return source.lift(new ScanOperator(accumulator, seed, hasSeed)); + return lift(source, new ScanOperator(accumulator, seed, hasSeed)); }; } diff --git a/src/internal/operators/sequenceEqual.ts b/src/internal/operators/sequenceEqual.ts index a808bafb46..4ddb126041 100644 --- a/src/internal/operators/sequenceEqual.ts +++ b/src/internal/operators/sequenceEqual.ts @@ -4,6 +4,7 @@ import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; import { Observer, OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Compares all values of two observables in sequence using an optional comparator function @@ -63,7 +64,7 @@ import { Observer, OperatorFunction } from '../types'; */ export function sequenceEqual(compareTo: Observable, comparator?: (a: T, b: T) => boolean): OperatorFunction { - return (source: Observable) => source.lift(new SequenceEqualOperator(compareTo, comparator)); + return (source: Observable) => lift(source, new SequenceEqualOperator(compareTo, comparator)); } export class SequenceEqualOperator implements Operator { diff --git a/src/internal/operators/shareReplay.ts b/src/internal/operators/shareReplay.ts index c3bbe5d2f7..6a0244eb16 100644 --- a/src/internal/operators/shareReplay.ts +++ b/src/internal/operators/shareReplay.ts @@ -3,6 +3,7 @@ import { ReplaySubject } from '../ReplaySubject'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; import { Subscriber } from '../Subscriber'; +import { lift } from '../util/lift'; export interface ShareReplayConfig { bufferSize?: number; @@ -139,7 +140,7 @@ export function shareReplay( scheduler }; } - return (source: Observable) => source.lift(shareReplayOperator(config)); + return (source: Observable) => lift(source, shareReplayOperator(config)); } function shareReplayOperator({ diff --git a/src/internal/operators/single.ts b/src/internal/operators/single.ts index cf6c28cb81..c918640fc6 100644 --- a/src/internal/operators/single.ts +++ b/src/internal/operators/single.ts @@ -5,6 +5,7 @@ import { EmptyError } from '../util/EmptyError'; import { MonoTypeOperatorFunction } from '../types'; import { SequenceError } from '../util/SequenceError'; import { NotFoundError } from '../util/NotFoundError'; +import { lift } from '../util/lift'; const defaultPredicate = () => true; @@ -91,7 +92,7 @@ const defaultPredicate = () => true; export function single( predicate: (value: T, index: number, source: Observable) => boolean = defaultPredicate ): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(singleOperator(predicate)); + return (source: Observable) => lift(source, singleOperator(predicate)); } function singleOperator(predicate: (value: T, index: number, source: Observable) => boolean) { diff --git a/src/internal/operators/skip.ts b/src/internal/operators/skip.ts index 33876879f9..96827f86fa 100644 --- a/src/internal/operators/skip.ts +++ b/src/internal/operators/skip.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that skips the first `count` items emitted by the source Observable. @@ -13,7 +14,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name skip */ export function skip(count: number): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SkipOperator(count)); + return (source: Observable) => lift(source, new SkipOperator(count)); } class SkipOperator implements Operator { diff --git a/src/internal/operators/skipLast.ts b/src/internal/operators/skipLast.ts index f5045a88c1..1c4e8dd1ff 100644 --- a/src/internal/operators/skipLast.ts +++ b/src/internal/operators/skipLast.ts @@ -3,6 +3,7 @@ import { Subscriber } from '../Subscriber'; import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Skip the last `count` values emitted by the source Observable. @@ -42,7 +43,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name skipLast */ export function skipLast(count: number): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SkipLastOperator(count)); + return (source: Observable) => lift(source, new SkipLastOperator(count)); } class SkipLastOperator implements Operator { diff --git a/src/internal/operators/skipUntil.ts b/src/internal/operators/skipUntil.ts index 12912cffa0..60b2aa1b22 100644 --- a/src/internal/operators/skipUntil.ts +++ b/src/internal/operators/skipUntil.ts @@ -6,6 +6,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic, ObservableInput } from '../types'; import { Subscription } from '../Subscription'; +import { lift } from '../util/lift'; /** * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. @@ -46,7 +47,7 @@ import { Subscription } from '../Subscription'; * @name skipUntil */ export function skipUntil(notifier: Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SkipUntilOperator(notifier)); + return (source: Observable) => lift(source, new SkipUntilOperator(notifier)); } class SkipUntilOperator implements Operator { diff --git a/src/internal/operators/skipWhile.ts b/src/internal/operators/skipWhile.ts index 1991a72442..2dcb4090bf 100644 --- a/src/internal/operators/skipWhile.ts +++ b/src/internal/operators/skipWhile.ts @@ -2,6 +2,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Returns an Observable that skips all items emitted by the source Observable as long as a specified condition holds @@ -15,7 +16,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name skipWhile */ export function skipWhile(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new SkipWhileOperator(predicate)); + return (source: Observable) => lift(source, new SkipWhileOperator(predicate)); } class SkipWhileOperator implements Operator { diff --git a/src/internal/operators/subscribeOn.ts b/src/internal/operators/subscribeOn.ts index ad5e0e7d20..0b9df046c9 100644 --- a/src/internal/operators/subscribeOn.ts +++ b/src/internal/operators/subscribeOn.ts @@ -5,6 +5,7 @@ import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic, SchedulerAction import { asap as asapScheduler } from '../scheduler/asap'; import { Subscription } from '../Subscription'; import { isScheduler } from '../util/isScheduler'; +import { lift } from '../util/lift'; export interface DispatchArg { source: Observable; @@ -106,7 +107,7 @@ class SubscribeOnObservable extends Observable { */ export function subscribeOn(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction { return function subscribeOnOperatorFunction(source: Observable): Observable { - return source.lift(new SubscribeOnOperator(scheduler, delay)); + return lift(source, new SubscribeOnOperator(scheduler, delay)); }; } diff --git a/src/internal/operators/switchMap.ts b/src/internal/operators/switchMap.ts index 72c8f3ddbc..e05fa29149 100644 --- a/src/internal/operators/switchMap.ts +++ b/src/internal/operators/switchMap.ts @@ -8,6 +8,7 @@ import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; import { map } from './map'; import { from } from '../observable/from'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function switchMap>(project: (value: T, index: number) => O): OperatorFunction>; @@ -89,7 +90,7 @@ export function switchMap>( )) ); } - return (source: Observable) => source.lift(new SwitchMapOperator(project)); + return (source: Observable) => lift(source, new SwitchMapOperator(project)); } class SwitchMapOperator implements Operator { diff --git a/src/internal/operators/take.ts b/src/internal/operators/take.ts index c525b020fd..27e250e578 100644 --- a/src/internal/operators/take.ts +++ b/src/internal/operators/take.ts @@ -4,6 +4,7 @@ import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; import { EMPTY } from '../observable/empty'; +import { lift } from '../util/lift'; /** * Emits only the first `count` values emitted by the source Observable. @@ -57,7 +58,7 @@ export function take(count: number): MonoTypeOperatorFunction { throw new ArgumentOutOfRangeError; } - return (source: Observable) => (count === 0) ? EMPTY : source.lift(new TakeOperator(count)); + return (source: Observable) => (count === 0) ? EMPTY : lift(source, new TakeOperator(count)); } class TakeOperator implements Operator { diff --git a/src/internal/operators/takeLast.ts b/src/internal/operators/takeLast.ts index 36c77d0bb7..296bcdcb2e 100644 --- a/src/internal/operators/takeLast.ts +++ b/src/internal/operators/takeLast.ts @@ -4,6 +4,7 @@ import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; import { EMPTY } from '../observable/empty'; import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits only the last `count` values emitted by the source Observable. @@ -58,7 +59,7 @@ export function takeLast(count: number): MonoTypeOperatorFunction { if (count === 0) { return EMPTY; } else { - return source.lift(new TakeLastOperator(count)); + return lift(source, new TakeLastOperator(count)); } }; } diff --git a/src/internal/operators/takeUntil.ts b/src/internal/operators/takeUntil.ts index 3d9dd1afa8..629ca024ad 100644 --- a/src/internal/operators/takeUntil.ts +++ b/src/internal/operators/takeUntil.ts @@ -7,6 +7,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits the values emitted by the source Observable until a `notifier` @@ -48,7 +49,7 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types'; * @name takeUntil */ export function takeUntil(notifier: Observable): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new TakeUntilOperator(notifier)); + return (source: Observable) => lift(source, new TakeUntilOperator(notifier)); } class TakeUntilOperator implements Operator { diff --git a/src/internal/operators/takeWhile.ts b/src/internal/operators/takeWhile.ts index 05045ad50b..6e68916682 100644 --- a/src/internal/operators/takeWhile.ts +++ b/src/internal/operators/takeWhile.ts @@ -2,6 +2,7 @@ import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { OperatorFunction, MonoTypeOperatorFunction, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; export function takeWhile(predicate: (value: T, index: number) => value is S): OperatorFunction; export function takeWhile(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction; @@ -54,7 +55,7 @@ export function takeWhile( predicate: (value: T, index: number) => boolean, inclusive = false): MonoTypeOperatorFunction { return (source: Observable) => - source.lift(new TakeWhileOperator(predicate, inclusive)); + lift(source, new TakeWhileOperator(predicate, inclusive)); } class TakeWhileOperator implements Operator { diff --git a/src/internal/operators/tap.ts b/src/internal/operators/tap.ts index 67b400de61..c6d75cdb41 100644 --- a/src/internal/operators/tap.ts +++ b/src/internal/operators/tap.ts @@ -4,6 +4,7 @@ import { Observable } from '../Observable'; import { MonoTypeOperatorFunction, PartialObserver, TeardownLogic } from '../types'; import { noop } from '../util/noop'; import { isFunction } from '../util/isFunction'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated Use an observer instead of a complete callback */ @@ -68,7 +69,7 @@ export function tap(nextOrObserver?: PartialObserver | ((x: T) => void) | error?: ((e: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction { return function tapOperatorFunction(source: Observable): Observable { - return source.lift(new DoOperator(nextOrObserver, error, complete)); + return lift(source, new DoOperator(nextOrObserver, error, complete)); }; } diff --git a/src/internal/operators/throttle.ts b/src/internal/operators/throttle.ts index 250193a94b..d912fc99ab 100644 --- a/src/internal/operators/throttle.ts +++ b/src/internal/operators/throttle.ts @@ -8,6 +8,7 @@ import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; export interface ThrottleConfig { leading?: boolean; @@ -66,7 +67,7 @@ export const defaultThrottleConfig: ThrottleConfig = { */ export function throttle(durationSelector: (value: T) => SubscribableOrPromise, config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new ThrottleOperator(durationSelector, !!config.leading, !!config.trailing)); + return (source: Observable) => lift(source, new ThrottleOperator(durationSelector, !!config.leading, !!config.trailing)); } class ThrottleOperator implements Operator { diff --git a/src/internal/operators/throttleTime.ts b/src/internal/operators/throttleTime.ts index df5e94cfaa..4f10e8908b 100644 --- a/src/internal/operators/throttleTime.ts +++ b/src/internal/operators/throttleTime.ts @@ -5,6 +5,7 @@ import { async } from '../scheduler/async'; import { Observable } from '../Observable'; import { ThrottleConfig, defaultThrottleConfig } from './throttle'; import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /** * Emits a value from the source Observable, then ignores subsequent source @@ -87,7 +88,7 @@ import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types export function throttleTime(duration: number, scheduler: SchedulerLike = async, config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction { - return (source: Observable) => source.lift(new ThrottleTimeOperator(duration, scheduler, !!config.leading, !!config.trailing)); + return (source: Observable) => lift(source, new ThrottleTimeOperator(duration, scheduler, !!config.leading, !!config.trailing)); } class ThrottleTimeOperator implements Operator { diff --git a/src/internal/operators/throwIfEmpty.ts b/src/internal/operators/throwIfEmpty.ts index 7f39c4e93f..db2ddb0a83 100644 --- a/src/internal/operators/throwIfEmpty.ts +++ b/src/internal/operators/throwIfEmpty.ts @@ -3,6 +3,7 @@ import { Observable } from '../Observable'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { TeardownLogic, MonoTypeOperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * If the source observable completes without emitting a value, it will emit @@ -36,7 +37,7 @@ import { TeardownLogic, MonoTypeOperatorFunction } from '../types'; */ export function throwIfEmpty (errorFactory: (() => any) = defaultErrorFactory): MonoTypeOperatorFunction { return (source: Observable) => { - return source.lift(new ThrowIfEmptyOperator(errorFactory)); + return lift(source, new ThrowIfEmptyOperator(errorFactory)); }; } diff --git a/src/internal/operators/timeoutWith.ts b/src/internal/operators/timeoutWith.ts index 74cddec458..946d4feb5f 100644 --- a/src/internal/operators/timeoutWith.ts +++ b/src/internal/operators/timeoutWith.ts @@ -6,6 +6,7 @@ import { isValidDate } from '../util/isDate'; import { OuterSubscriber } from '../OuterSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function timeoutWith(due: number | Date, withObservable: ObservableInput, scheduler?: SchedulerLike): OperatorFunction; @@ -69,7 +70,7 @@ export function timeoutWith(due: number | Date, return (source: Observable) => { let absoluteTimeout = isValidDate(due); let waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(due); - return source.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler)); + return lift(source, new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler)); }; } diff --git a/src/internal/operators/window.ts b/src/internal/operators/window.ts index a1b6180b46..bb71085fe8 100644 --- a/src/internal/operators/window.ts +++ b/src/internal/operators/window.ts @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { Operator } from '../Operator'; +import { lift } from '../util/lift'; /** * Branch out the source Observable values as a nested Observable whenever @@ -51,7 +52,7 @@ import { Operator } from '../Operator'; */ export function window(windowBoundaries: Observable): OperatorFunction> { return function windowOperatorFunction(source: Observable) { - return source.lift(new WindowOperator(windowBoundaries)); + return lift(source, new WindowOperator(windowBoundaries)); }; } diff --git a/src/internal/operators/windowCount.ts b/src/internal/operators/windowCount.ts index ef384c7ee1..ff670ba0dd 100644 --- a/src/internal/operators/windowCount.ts +++ b/src/internal/operators/windowCount.ts @@ -3,6 +3,7 @@ import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subject } from '../Subject'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Branch out the source Observable values as a nested Observable with each @@ -69,7 +70,7 @@ import { OperatorFunction } from '../types'; export function windowCount(windowSize: number, startWindowEvery: number = 0): OperatorFunction> { return function windowCountOperatorFunction(source: Observable) { - return source.lift(new WindowCountOperator(windowSize, startWindowEvery)); + return lift(source, new WindowCountOperator(windowSize, startWindowEvery)); }; } diff --git a/src/internal/operators/windowTime.ts b/src/internal/operators/windowTime.ts index acf35ebba9..4fcb0f18c5 100644 --- a/src/internal/operators/windowTime.ts +++ b/src/internal/operators/windowTime.ts @@ -7,6 +7,7 @@ import { Subscription } from '../Subscription'; import { isNumeric } from '../util/isNumeric'; import { isScheduler } from '../util/isScheduler'; import { OperatorFunction, SchedulerLike, SchedulerAction } from '../types'; +import { lift } from '../util/lift'; export function windowTime(windowTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction>; @@ -118,7 +119,7 @@ export function windowTime(windowTimeSpan: number): OperatorFunction) { - return source.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)); + return lift(source, new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)); }; } diff --git a/src/internal/operators/windowToggle.ts b/src/internal/operators/windowToggle.ts index dd6d62d3f4..79fb1cde25 100644 --- a/src/internal/operators/windowToggle.ts +++ b/src/internal/operators/windowToggle.ts @@ -7,6 +7,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Branch out the source Observable values as a nested Observable starting from @@ -57,7 +58,7 @@ import { OperatorFunction } from '../types'; */ export function windowToggle(openings: Observable, closingSelector: (openValue: O) => Observable): OperatorFunction> { - return (source: Observable) => source.lift(new WindowToggleOperator(openings, closingSelector)); + return (source: Observable) => lift(source, new WindowToggleOperator(openings, closingSelector)); } class WindowToggleOperator implements Operator> { diff --git a/src/internal/operators/windowWhen.ts b/src/internal/operators/windowWhen.ts index b6164fa7e1..a479d4a471 100644 --- a/src/internal/operators/windowWhen.ts +++ b/src/internal/operators/windowWhen.ts @@ -7,6 +7,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { OperatorFunction } from '../types'; +import { lift } from '../util/lift'; /** * Branch out the source Observable values as a nested Observable using a @@ -54,7 +55,7 @@ import { OperatorFunction } from '../types'; */ export function windowWhen(closingSelector: () => Observable): OperatorFunction> { return function windowWhenOperatorFunction(source: Observable) { - return source.lift(new WindowOperator(closingSelector)); + return lift(source, new WindowOperator(closingSelector)); }; } diff --git a/src/internal/operators/withLatestFrom.ts b/src/internal/operators/withLatestFrom.ts index 55ff79b143..4413c873f2 100644 --- a/src/internal/operators/withLatestFrom.ts +++ b/src/internal/operators/withLatestFrom.ts @@ -5,6 +5,7 @@ import { OuterSubscriber } from '../OuterSubscriber'; import { InnerSubscriber } from '../InnerSubscriber'; import { subscribeToResult } from '../util/subscribeToResult'; import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types'; +import { lift } from '../util/lift'; /* tslint:disable:max-line-length */ export function withLatestFrom(project: (v1: T) => R): OperatorFunction; @@ -74,7 +75,7 @@ export function withLatestFrom(...args: Array | ((... project = args.pop(); } const observables = []>args; - return source.lift(new WithLatestFromOperator(observables, project)); + return lift(source, new WithLatestFromOperator(observables, project)); }; } diff --git a/src/internal/operators/zipAll.ts b/src/internal/operators/zipAll.ts index 163ae43a85..3fd9d33e94 100644 --- a/src/internal/operators/zipAll.ts +++ b/src/internal/operators/zipAll.ts @@ -1,6 +1,7 @@ import { ZipOperator } from '../observable/zip'; import { Observable } from '../Observable'; import { OperatorFunction, ObservableInput } from '../types'; +import { lift } from '../util/lift'; export function zipAll(): OperatorFunction, T[]>; export function zipAll(): OperatorFunction; @@ -8,5 +9,5 @@ export function zipAll(project: (...values: T[]) => R): OperatorFunction(project: (...values: Array) => R): OperatorFunction; export function zipAll(project?: (...values: Array) => R): OperatorFunction { - return (source: Observable) => source.lift(new ZipOperator(project)); + return (source: Observable) => lift(source, new ZipOperator(project)); } diff --git a/src/internal/operators/zipWith.ts b/src/internal/operators/zipWith.ts index 9f472731b6..6db36af106 100644 --- a/src/internal/operators/zipWith.ts +++ b/src/internal/operators/zipWith.ts @@ -1,6 +1,7 @@ import { zip as zipStatic } from '../observable/zip'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, ObservedValueTupleFromArray, Cons } from '../types'; +import { stankyLift } from '../util/lift'; /* tslint:disable:max-line-length */ /** @deprecated Deprecated use {@link zipWith} */ @@ -38,10 +39,10 @@ export function zip(array: Array>, project */ export function zip(...observables: Array | ((...values: Array) => R)>): OperatorFunction { return function zipOperatorFunction(source: Observable) { - return source.lift.call( - zipStatic(source, ...observables), - undefined - ) as Observable; + return stankyLift( + source, + zipStatic(source, ...observables) + ); }; } diff --git a/src/internal/util/lift.ts b/src/internal/util/lift.ts new file mode 100644 index 0000000000..e9fbf7c316 --- /dev/null +++ b/src/internal/util/lift.ts @@ -0,0 +1,54 @@ +/** @prettier */ +import { Observable } from '../Observable'; +import { Operator } from '../Operator'; + +/** + * A utility to lift observables. Will also error if an observable is passed that does not + * have the lift mechanism. + * + * We _must_ do this for version 7, because it is what allows subclassed observables to compose through + * the operators that use this. That will be going away in v8. + * + * See https://github.com/ReactiveX/rxjs/issues/5571 + * and https://github.com/ReactiveX/rxjs/issues/5431 + * + * @param source The source observable to lift + * @param operator The operator to lift it with. Note that the operator possibly being undefined here + * is related to issues around the "stanky" lifting of static creation functions as operators, see below. + */ +export function lift(source: Observable, operator?: Operator): Observable { + if (hasLift(source)) { + return source.lift(operator); + } + throw new TypeError('Unable to lift unknown Observable type'); +} + +// TODO: Figure out proper typing for what we're doing below at some point. +// For right now it's not that important, as it's internal implementation and not +// public typings on a public API. + +/** + * A utility used to lift observables in the case that we are trying to convert a static observable + * creation function to an operator that appropriately uses lift. Ultimately this is a smell + * related to `lift`, hence the name. + * + * We _must_ do this for version 7, because it is what allows subclassed observables to compose through + * the operators that use this. That will be going away in v8. + * + * See https://github.com/ReactiveX/rxjs/issues/5571 + * and https://github.com/ReactiveX/rxjs/issues/5431 + * + * @param source the original observable source for the operator + * @param liftedSource the actual composed source we want to lift + * @param operator the operator to lift it with (often undefined in this case) + */ +export function stankyLift(source: Observable, liftedSource: Observable, operator?: Operator): Observable { + if (hasLift(source)) { + return source.lift.call(liftedSource, operator); + } + throw new TypeError('Unable to lift unknown Observable type'); +} + +function hasLift(source: any): source is { lift: InstanceType['lift'] } { + return source && typeof source.lift === 'function'; +}