Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(Observable): Update property and method types #5572

Merged
merged 1 commit into from
Jul 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ describe('Observable.lift', () => {
}
};

NEVER.lift(myOperator)
(NEVER as any).lift(myOperator)
.subscribe()
.unsubscribe();

Expand Down Expand Up @@ -883,8 +883,8 @@ describe('Observable.lift', () => {
class LogObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new LogObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = new LogOperator(operator);
observable.source = this;
observable.operator = new LogOperator(operator);
return observable;
}
}
Expand Down
2 changes: 1 addition & 1 deletion spec/helpers/interop-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export function asInteropObservable<T>(observable: Observable<T>): Observable<T>
return new Proxy(observable, {
get(target: Observable<T>, key: string | number | symbol) {
if (key === 'lift') {
const { lift } = target;
const { lift } = target as any;
return interopLift(lift);
}
if (key === 'subscribe') {
Expand Down
37 changes: 11 additions & 26 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import { Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable } from './types';
import { canReportError } from './util/canReportError';
import { toSubscriber } from './util/toSubscriber';
import { iif } from './observable/iif';
import { throwError } from './observable/throwError';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';
Expand All @@ -20,14 +18,11 @@ import { config } from './config';
* @class Observable<T>
*/
export class Observable<T> implements Subscribable<T> {
/** Internal implementation detail, do not use directly. */
public _isScalar: boolean = false;

/** @deprecated This is an internal implementation detail, do not use. */
source: Observable<any> | undefined;
protected source: Observable<any> | undefined;

/** @deprecated This is an internal implementation detail, do not use. */
operator: Operator<any, T> | undefined;
protected operator: Operator<any, T> | undefined;

/**
* @constructor
Expand Down Expand Up @@ -59,13 +54,16 @@ export class Observable<T> implements Subscribable<T> {
};

/**
* Creates a new Observable, with this Observable as the source, and the passed
* Creates a new Observable, with this Observable instance as the source, and the passed
* operator defined as the new observable's operator.
* @method lift
* @param {Operator} operator the operator defining the operation to take on the observable
* @return {Observable} a new observable with the Operator applied
* @param operator the operator defining the operation to take on the observable
* @return a new observable with the Operator applied
* @deprecated This is an internal implementation detail, do not use directly. If you have implemented an operator
* using `lift`, it is recommended that you create an operator by simply returning `new Observable()` directly.
* See "Creating new operators from scratch" section here: https://rxjs.dev/guide/operators
*/
lift<R>(operator?: Operator<T, R>): Observable<R> {
protected lift<R>(operator?: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
Expand Down Expand Up @@ -236,7 +234,7 @@ export class Observable<T> implements Subscribable<T> {
}

/** @deprecated This is an internal implementation detail, do not use. */
_trySubscribe(sink: Subscriber<T>): TeardownLogic {
protected _trySubscribe(sink: Subscriber<T>): TeardownLogic {
try {
return this._subscribe(sink);
} catch (err) {
Expand Down Expand Up @@ -336,24 +334,11 @@ export class Observable<T> implements Subscribable<T> {
}

/** @internal This is an internal implementation detail, do not use. */
_subscribe(subscriber: Subscriber<any>): TeardownLogic {
protected _subscribe(subscriber: Subscriber<any>): TeardownLogic {
const { source } = this;
return source && source.subscribe(subscriber);
}

// `if` and `throw` are special snow flakes, the compiler sees them as reserved words. Deprecated in
// favor of iif and throwError functions.
/**
* @nocollapse
* @deprecated In favor of iif creation function: import { iif } from 'rxjs';
*/
static if: typeof iif;
/**
* @nocollapse
* @deprecated In favor of throwError creation function: import { throwError } from 'rxjs';
*/
static throw: typeof throwError;

/**
* An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable
* @method Symbol.observable
Expand Down
3 changes: 3 additions & 0 deletions src/internal/Operator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Subscriber } from './Subscriber';
import { TeardownLogic } from './types';

/***
* @deprecated Internal implementation detail, do not use.
*/
export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
3 changes: 2 additions & 1 deletion src/internal/observable/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Operator } from '../Operator';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { fromArray } from './fromArray';
import { lift } from '../util/lift';

const NONE = {};

Expand Down Expand Up @@ -233,7 +234,7 @@ export function combineLatest<O extends ObservableInput<any>, R>(
observables = observables[0] as any;
}

return fromArray(observables, scheduler).lift(new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector));
return lift(fromArray(observables, scheduler), new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector));
}

export class CombineLatestOperator<T, R> implements Operator<T, R> {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/observable/race.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { TeardownLogic, ObservableInput, ObservedValueUnionFromArray } from '../
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { lift } from '../util/lift';

export function race<A extends ObservableInput<any>[]>(observables: A): Observable<ObservedValueUnionFromArray<A>>;
export function race<A extends ObservableInput<any>[]>(...observables: A): Observable<ObservedValueUnionFromArray<A>>;
Expand Down Expand Up @@ -65,7 +66,7 @@ export function race<T>(...observables: (ObservableInput<T> | ObservableInput<T>
}
}

return fromArray(observables, undefined).lift(new RaceOperator<T>());
return lift(fromArray(observables, undefined), new RaceOperator<T>());
}

export class RaceOperator<T> implements Operator<T, T> {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/observable/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { iterator as Symbol_iterator } from '../../internal/symbol/iterator';
import { lift } from '../util/lift';

/* tslint:disable:max-line-length */
/** @deprecated resultSelector is no longer supported, pipe to map instead */
Expand Down Expand Up @@ -85,7 +86,7 @@ export function zip<O extends ObservableInput<any>, R>(
if (typeof last === 'function') {
resultSelector = observables.pop() as typeof resultSelector;
}
return fromArray(observables, undefined).lift(new ZipOperator(resultSelector));
return lift(fromArray(observables, undefined), new ZipOperator(resultSelector));
}

export class ZipOperator<T, R> implements Operator<T, R> {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/audit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '

import { OuterSubscriber } from '../OuterSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { lift } from '../util/lift';

/**
* Ignores source values for a duration determined by another Observable, then
Expand Down Expand Up @@ -54,7 +55,7 @@ import { subscribeToResult } from '../util/subscribeToResult';
*/
export function audit<T>(durationSelector: (value: T) => SubscribableOrPromise<any>): MonoTypeOperatorFunction<T> {
return function auditOperatorFunction(source: Observable<T>) {
return source.lift(new AuditOperator(durationSelector));
return lift(source, new AuditOperator(durationSelector));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { OperatorFunction } from '../types';
import { lift } from '../util/lift';

/**
* Buffers the source Observable values until `closingNotifier` emits.
Expand Down Expand Up @@ -47,7 +48,7 @@ import { OperatorFunction } from '../types';
*/
export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]> {
return function bufferOperatorFunction(source: Observable<T>) {
return source.lift(new BufferOperator<T>(closingNotifier));
return lift(source, new BufferOperator<T>(closingNotifier));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { OperatorFunction, TeardownLogic } from '../types';
import { lift } from '../util/lift';

/**
* Buffers the source Observable values until the size hits the maximum
Expand Down Expand Up @@ -59,7 +60,7 @@ import { OperatorFunction, TeardownLogic } from '../types';
*/
export function bufferCount<T>(bufferSize: number, startBufferEvery: number | null = null): OperatorFunction<T, T[]> {
return function bufferCountOperatorFunction(source: Observable<T>) {
return source.lift(new BufferCountOperator<T>(bufferSize, startBufferEvery));
return lift(source, new BufferCountOperator<T>(bufferSize, startBufferEvery));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { isScheduler } from '../util/isScheduler';
import { OperatorFunction, SchedulerAction, SchedulerLike } from '../types';
import { lift } from '../util/lift';

/* tslint:disable:max-line-length */
export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
Expand Down Expand Up @@ -88,7 +89,7 @@ export function bufferTime<T>(bufferTimeSpan: number): OperatorFunction<T, T[]>
}

return function bufferTimeOperatorFunction(source: Observable<T>) {
return source.lift(new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
return lift(source, new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { subscribeToResult } from '../util/subscribeToResult';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { OperatorFunction, SubscribableOrPromise } from '../types';
import { lift } from '../util/lift';

/**
* Buffers the source Observable values starting from an emission from
Expand Down Expand Up @@ -57,7 +58,7 @@ export function bufferToggle<T, O>(
closingSelector: (value: O) => SubscribableOrPromise<any>
): OperatorFunction<T, T[]> {
return function bufferToggleOperatorFunction(source: Observable<T>) {
return source.lift(new BufferToggleOperator<T, O>(openings, closingSelector));
return lift(source, new BufferToggleOperator<T, O>(openings, closingSelector));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { OperatorFunction } from '../types';
import { lift } from '../util/lift';

/**
* Buffers the source Observable values, using a factory function of closing
Expand Down Expand Up @@ -50,7 +51,7 @@ import { OperatorFunction } from '../types';
*/
export function bufferWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, T[]> {
return function (source: Observable<T>) {
return source.lift(new BufferWhenOperator(closingSelector));
return lift(source, new BufferWhenOperator(closingSelector));
};
}

Expand Down
6 changes: 4 additions & 2 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { lift } from '../util/lift';

/* tslint:disable:max-line-length */
export function catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
Expand Down Expand Up @@ -108,8 +109,9 @@ export function catchError<T, O extends ObservableInput<any>>(
): OperatorFunction<T, T | ObservedValueOf<O>> {
return function catchErrorOperatorFunction(source: Observable<T>): Observable<T | ObservedValueOf<O>> {
const operator = new CatchOperator(selector);
const caught = source.lift(operator);
return (operator.caught = caught as Observable<T>);
const caught = lift(source, operator);
operator.caught = caught;
return caught;
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/combineAll.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CombineLatestOperator } from '../observable/combineLatest';
import { Observable } from '../Observable';
import { OperatorFunction, ObservableInput } from '../types';
import { lift } from '../util/lift';

export function combineAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
export function combineAll<T>(): OperatorFunction<any, T[]>;
Expand Down Expand Up @@ -53,5 +54,5 @@ export function combineAll<R>(project: (...values: Array<any>) => R): OperatorFu
* @name combineAll
*/
export function combineAll<T, R>(project?: (...values: Array<any>) => R): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift(new CombineLatestOperator(project));
return (source: Observable<T>) => lift(source, new CombineLatestOperator(project));
}
6 changes: 4 additions & 2 deletions src/internal/operators/combineLatestWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { CombineLatestOperator } from '../observable/combineLatest';
import { from } from '../observable/from';
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, ObservedValueTupleFromArray, Cons } from '../types';
import { lift, stankyLift } from '../util/lift';

/* tslint:disable:max-line-length */
/** @deprecated use {@link combineLatestWith} */
Expand Down Expand Up @@ -53,10 +54,11 @@ export function combineLatest<T, R>(...observables: Array<ObservableInput<any> |
observables = (<any>observables[0]).slice();
}

return (source: Observable<T>) => source.lift.call(
return (source: Observable<T>) => stankyLift(
source,
from([source, ...observables]),
new CombineLatestOperator(project)
) as Observable<R>;
);
}

/**
Expand Down
7 changes: 4 additions & 3 deletions src/internal/operators/concat.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { concat as concatStatic } from '../observable/concat';
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { stankyLift } from '../util/lift';

/* tslint:disable:max-line-length */
/** @deprecated remove in v8. Use {@link concatWith} */
Expand All @@ -25,8 +26,8 @@ export function concat<T, R>(...observables: Array<ObservableInput<any> | Schedu
* @deprecated remove in v8. Use {@link concatWith}
*/
export function concat<T, R>(...observables: Array<ObservableInput<any> | SchedulerLike | undefined>): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift.call(
return (source: Observable<T>) => stankyLift(
source,
concatStatic(source, ...(observables as any[])),
undefined
) as Observable<R>;
);
}
9 changes: 5 additions & 4 deletions src/internal/operators/concatWith.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { concat as concatStatic } from '../observable/concat';
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, ObservedValueUnionFromArray } from '../types';
import { stankyLift } from '../util/lift';

export function concatWith<T>(): OperatorFunction<T, T>;
export function concatWith<T, A extends ObservableInput<any>[]>(...otherSources: A): OperatorFunction<T, ObservedValueUnionFromArray<A> | T>;
Expand Down Expand Up @@ -44,8 +45,8 @@ export function concatWith<T, A extends ObservableInput<any>[]>(...otherSources:
* @param otherSources Other observable sources to subscribe to, in sequence, after the original source is complete.
*/
export function concatWith<T, A extends ObservableInput<any>[]>(...otherSources: A): OperatorFunction<T, ObservedValueUnionFromArray<A> | T> {
return (source: Observable<T>) => source.lift.call(
concatStatic(source, ...otherSources),
undefined
) as Observable<ObservedValueUnionFromArray<A> | T>;
return (source: Observable<T>) => stankyLift(
source,
concatStatic(source, ...otherSources)
);
}
3 changes: 2 additions & 1 deletion src/internal/operators/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Observer, OperatorFunction } from '../types';
import { Subscriber } from '../Subscriber';
import { lift } from '../util/lift';
/**
* Counts the number of emissions on the source and emits that number when the
* source completes.
Expand Down Expand Up @@ -62,7 +63,7 @@ import { Subscriber } from '../Subscriber';
*/

export function count<T>(predicate?: (value: T, index: number, source: Observable<T>) => boolean): OperatorFunction<T, number> {
return (source: Observable<T>) => source.lift(new CountOperator(predicate, source));
return (source: Observable<T>) => lift(source, new CountOperator(predicate, source));
}

class CountOperator<T> implements Operator<T, number> {
Expand Down
Loading