Skip to content

Commit

Permalink
Merge pull request ReactiveX#3328 from jasonaden/fix_core_types_2
Browse files Browse the repository at this point in the history
refactor(types): adjust Subscription and Observable types to be easier to consume externally
  • Loading branch information
benlesh authored Feb 22, 2018
2 parents ddffecc + f359f51 commit 2f9efbf
Show file tree
Hide file tree
Showing 174 changed files with 439 additions and 419 deletions.
3 changes: 1 addition & 2 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import * as Rx from '../src/Rx';
import { Observer } from './../src/internal/Observer';
import { TeardownLogic } from '../src/internal/Subscription';
import { Observer, TeardownLogic } from '../src/internal/types';
import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing';
import { map } from '../src/internal/operators/map';
//tslint:disable-next-line
Expand Down
2 changes: 1 addition & 1 deletion spec/helpers/test-helper.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
declare const global: any;

import * as Rx from '../../src/Rx';
import { ObservableInput } from '../../src/internal/Observable';
import { ObservableInput } from '../../src/internal/types';
import { root } from '../../src/internal/util/root';
import { $$iterator } from '../../src/internal/symbol/iterator';
import $$symbolObservable from 'symbol-observable';
Expand Down
3 changes: 1 addition & 2 deletions src/MiscJSDoc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
* we need these bogus classes, which are not stripped away. This file on the
* other hand, is not included in the release bundle.
*/
import { TeardownLogic } from './internal/Subscription';
import { Observer, TeardownLogic } from './internal/types';
import { Observable } from './internal/Observable';
import './internal/observable/dom/MiscJSDoc';
import { Observer } from './internal/Observer';

/**
* We need this JSDoc comment for affecting ESDoc.
Expand Down
2 changes: 1 addition & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ import './add/operator/zipAll';

/* tslint:disable:no-unused-variable */
export {Operator} from './internal/Operator';
export {Observer} from './internal/Observer';
export {Observer} from './internal/types';
export {Subscription} from './internal/Subscription';
export {Subscriber} from './internal/Subscriber';
export {AsyncSubject} from './internal/AsyncSubject';
Expand Down
5 changes: 3 additions & 2 deletions src/internal/BehaviorSubject.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Subject } from './Subject';
import { Subscriber } from './Subscriber';
import { Subscription, ISubscription } from './Subscription';
import { Subscription } from './Subscription';
import { SubscriptionLike } from './types';
import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';

/**
Expand All @@ -18,7 +19,7 @@ export class BehaviorSubject<T> extends Subject<T> {

protected _subscribe(subscriber: Subscriber<T>): Subscription {
const subscription = super._subscribe(subscriber);
if (subscription && !(<ISubscription>subscription).closed) {
if (subscription && !(<SubscriptionLike>subscription).closed) {
subscriber.next(this._value);
}
return subscription;
Expand Down
2 changes: 1 addition & 1 deletion src/internal/Notification.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PartialObserver } from './Observer';
import { PartialObserver } from './types';
import { Observable } from './Observable';
import { empty } from './observable/empty';
import { of } from './observable/of';
Expand Down
13 changes: 7 additions & 6 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import { PartialObserver } from './Observer';
import { Operator } from './Operator';
import { Subscriber } from './Subscriber';
import { Subscription, AnonymousSubscription, TeardownLogic } from './Subscription';
import { Subscription } from './Subscription';
import { TeardownLogic } from './types';
import { root } from './util/root';
import { toSubscriber } from './util/toSubscriber';
import { IfObservable } from './observable/IfObservable';
import { observable as Symbol_observable } from '../internal/symbol/observable';
import { OperatorFunction, Subscribable } from '../internal/types';
import { OperatorFunction, PartialObserver, Subscribable } from '../internal/types';
import { pipeFromArray } from './util/pipe';

//TODO(davidd): refactor all references to these to use types instead
export { Subscribable, ObservableLike, SubscribableOrPromise, ObservableInput } from '../internal/types';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
* of RxJS.
Expand All @@ -20,9 +17,12 @@ export { Subscribable, ObservableLike, SubscribableOrPromise, ObservableInput }
*/
export class Observable<T> implements Subscribable<T> {

/** @internal */
public _isScalar: boolean = false;

/** @internal */
protected source: Observable<any>;
/** @internal */
protected operator: Operator<any, T>;

/**
Expand Down Expand Up @@ -243,6 +243,7 @@ export class Observable<T> implements Subscribable<T> {
});
}

/** @internal */
protected _subscribe(subscriber: Subscriber<any>): TeardownLogic {
return this.source.subscribe(subscriber);
}
Expand Down
30 changes: 1 addition & 29 deletions src/internal/Observer.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,4 @@
export interface NextObserver<T> {
closed?: boolean;
next: (value: T) => void;
error?: (err: any) => void;
complete?: () => void;
}

export interface ErrorObserver<T> {
closed?: boolean;
next?: (value: T) => void;
error: (err: any) => void;
complete?: () => void;
}

export interface CompletionObserver<T> {
closed?: boolean;
next?: (value: T) => void;
error?: (err: any) => void;
complete: () => void;
}

export type PartialObserver<T> = NextObserver<T> | ErrorObserver<T> | CompletionObserver<T>;

export interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
import { Observer } from './types';

export const empty: Observer<any> = {
closed: true,
Expand Down
2 changes: 1 addition & 1 deletion src/internal/Operator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Subscriber } from './Subscriber';
import { TeardownLogic } from './Subscription';
import { TeardownLogic } from './types';

export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
Expand Down
6 changes: 3 additions & 3 deletions src/internal/Subject.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Operator } from './Operator';
import { Observer } from './Observer';
import { Observable } from './Observable';
import { Subscriber } from './Subscriber';
import { ISubscription, Subscription, TeardownLogic } from './Subscription';
import { Subscription } from './Subscription';
import { Observer, SubscriptionLike, TeardownLogic } from './types';
import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
import { SubjectSubscription } from './SubjectSubscription';
import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';
Expand All @@ -19,7 +19,7 @@ export class SubjectSubscriber<T> extends Subscriber<T> {
/**
* @class Subject<T>
*/
export class Subject<T> extends Observable<T> implements ISubscription {
export class Subject<T> extends Observable<T> implements SubscriptionLike {

[rxSubscriberSymbol]() {
return new SubjectSubscriber(this);
Expand Down
2 changes: 1 addition & 1 deletion src/internal/SubjectSubscription.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Subject } from './Subject';
import { Observer } from './Observer';
import { Observer } from './types';
import { Subscription } from './Subscription';

/**
Expand Down
3 changes: 2 additions & 1 deletion src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { isFunction } from './util/isFunction';
import { Observer, PartialObserver, empty as emptyObserver } from './Observer';
import { empty as emptyObserver } from './Observer';
import { Observer, PartialObserver } from './types';
import { Subscription } from './Subscription';
import { rxSubscriber as rxSubscriberSymbol } from '../internal/symbol/rxSubscriber';

Expand Down
21 changes: 8 additions & 13 deletions src/internal/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,7 @@ import { isFunction } from './util/isFunction';
import { tryCatch } from './util/tryCatch';
import { errorObject } from './util/errorObject';
import { UnsubscriptionError } from './util/UnsubscriptionError';

export interface AnonymousSubscription {
unsubscribe(): void;
}

export type TeardownLogic = AnonymousSubscription | Function | void;

export interface ISubscription extends AnonymousSubscription {
unsubscribe(): void;
readonly closed: boolean;
}
import { SubscriptionLike, TeardownLogic } from './types';

/**
* Represents a disposable resource, such as the execution of an Observable. A
Expand All @@ -28,7 +18,7 @@ export interface ISubscription extends AnonymousSubscription {
*
* @class Subscription
*/
export class Subscription implements ISubscription {
export class Subscription implements SubscriptionLike {
public static EMPTY: Subscription = (function(empty: any) {
empty.closed = true;
return empty;
Expand All @@ -40,9 +30,12 @@ export class Subscription implements ISubscription {
*/
public closed: boolean = false;

/** @internal */
protected _parent: Subscription = null;
/** @internal */
protected _parents: Subscription[] = null;
private _subscriptions: ISubscription[] = null;
/** @internal */
private _subscriptions: SubscriptionLike[] = null;

/**
* @param {function(): void} [unsubscribe] A function describing how to
Expand All @@ -51,6 +44,7 @@ export class Subscription implements ISubscription {
constructor(unsubscribe?: () => void) {
if (unsubscribe) {
(<any> this)._unsubscribe = unsubscribe;

}
}

Expand Down Expand Up @@ -200,6 +194,7 @@ export class Subscription implements ISubscription {
}
}

/** @internal */
private _addParent(parent: Subscription) {
let { _parent, _parents } = this;
if (!_parent || _parent === parent) {
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/BoundCallbackObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { IScheduler } from '../Scheduler';
import { tryCatch } from '..//util/tryCatch';
import { errorObject } from '..//util/errorObject';
import { tryCatch } from '../util/tryCatch';
import { errorObject } from '../util/errorObject';
import { AsyncSubject } from '../../internal/AsyncSubject';

/**
Expand Down
4 changes: 2 additions & 2 deletions src/internal/observable/BoundNodeCallbackObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { IScheduler } from '../Scheduler';
import { Action } from '../scheduler/Action';
import { tryCatch } from '..//util/tryCatch';
import { errorObject } from '..//util/errorObject';
import { tryCatch } from '../util/tryCatch';
import { errorObject } from '../util/errorObject';
import { AsyncSubject } from '../../internal/AsyncSubject';

/**
Expand Down
4 changes: 3 additions & 1 deletion src/internal/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { Subject, SubjectSubscriber } from '../Subject';
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription, TeardownLogic } from '../Subscription';
import { Subscription } from '../Subscription';
import { TeardownLogic } from '../types';
import { refCount as higherOrderRefCount } from '../../internal/operators/refCount';

/**
Expand All @@ -13,6 +14,7 @@ export class ConnectableObservable<T> extends Observable<T> {
protected _subject: Subject<T>;
protected _refCount: number = 0;
protected _connection: Subscription;
/** @internal */
_isComplete = false;

constructor(protected source: Observable<T>,
Expand Down
6 changes: 3 additions & 3 deletions src/internal/observable/FromEventObservable.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Observable } from '../Observable';
import { tryCatch } from '..//util/tryCatch';
import { isFunction } from '..//util/isFunction';
import { errorObject } from '..//util/errorObject';
import { tryCatch } from '../util/tryCatch';
import { isFunction } from '../util/isFunction';
import { errorObject } from '../util/errorObject';
import { Subscription } from '../Subscription';
import { Subscriber } from '../Subscriber';

Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/FromEventPatternObservable.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { isFunction } from '..//util/isFunction';
import { isFunction } from '../util/isFunction';
import { Observable } from '../Observable';
import { Subscription } from '../Subscription';
import { Subscriber } from '../Subscriber';
Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/GenerateObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Action } from '../scheduler/Action';
import { Observable } from '../Observable' ;
import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { isScheduler } from '..//util/isScheduler';
import { isScheduler } from '../util/isScheduler';

const selfSelector = <T>(value: T) => value;

Expand Down
6 changes: 3 additions & 3 deletions src/internal/observable/IfObservable.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Observable, SubscribableOrPromise } from '../Observable';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { TeardownLogic } from '../Subscription';
import { SubscribableOrPromise, TeardownLogic } from '../types';

import { subscribeToResult } from '..//util/subscribeToResult';
import { subscribeToResult } from '../util/subscribeToResult';
import { OuterSubscriber } from '../OuterSubscriber';
/**
* We need this JSDoc comment for affecting ESDoc.
Expand Down
2 changes: 1 addition & 1 deletion src/internal/observable/SubscribeOnObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
import { asap } from '../scheduler/asap';
import { isNumeric } from '..//util/isNumeric';
import { isNumeric } from '../util/isNumeric';

export interface DispatchArg<T> {
source: Observable<T>;
Expand Down
22 changes: 11 additions & 11 deletions src/internal/observable/UsingObservable.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Observable, SubscribableOrPromise } from '../Observable';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { AnonymousSubscription, TeardownLogic } from '../Subscription';
import { SubscribableOrPromise, Unsubscribable, TeardownLogic } from '../types';

import { subscribeToResult } from '..//util/subscribeToResult';
import { subscribeToResult } from '../util/subscribeToResult';
import { OuterSubscriber } from '../OuterSubscriber';
/**
* We need this JSDoc comment for affecting ESDoc.
Expand Down Expand Up @@ -41,23 +41,23 @@ export class UsingObservable<T> extends Observable<T> {
* @name using
* @owner Observable
*/
static create<T>(resourceFactory: () => AnonymousSubscription | void,
observableFactory: (resource: AnonymousSubscription) => SubscribableOrPromise<T> | void): Observable<T> {
static create<T>(resourceFactory: () => Unsubscribable | void,
observableFactory: (resource: Unsubscribable) => SubscribableOrPromise<T> | void): Observable<T> {
return new UsingObservable<T>(resourceFactory, observableFactory);
}

constructor(private resourceFactory: () => AnonymousSubscription | void,
private observableFactory: (resource: AnonymousSubscription) => SubscribableOrPromise<T> | void) {
constructor(private resourceFactory: () => Unsubscribable | void,
private observableFactory: (resource: Unsubscribable) => SubscribableOrPromise<T> | void) {
super();
}

protected _subscribe(subscriber: Subscriber<T>): TeardownLogic {
const { resourceFactory, observableFactory } = this;

let resource: AnonymousSubscription;
let resource: Unsubscribable;

try {
resource = <AnonymousSubscription>resourceFactory();
resource = <Unsubscribable>resourceFactory();
return new UsingSubscriber(subscriber, resource, observableFactory);
} catch (err) {
subscriber.error(err);
Expand All @@ -67,8 +67,8 @@ export class UsingObservable<T> extends Observable<T> {

class UsingSubscriber<T> extends OuterSubscriber<T, T> {
constructor(destination: Subscriber<T>,
private resource: AnonymousSubscription,
private observableFactory: (resource: AnonymousSubscription) => SubscribableOrPromise<T> | void) {
private resource: Unsubscribable,
private observableFactory: (resource: Unsubscribable) => SubscribableOrPromise<T> | void) {
super(destination);
destination.add(resource);
this.tryUse();
Expand Down
9 changes: 5 additions & 4 deletions src/internal/observable/combineLatest.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { Observable, ObservableInput } from '../Observable';
import { Observable } from '../Observable';
import { ObservableInput } from '../types';
import { IScheduler } from '../Scheduler';
import { isScheduler } from '..//util/isScheduler';
import { isArray } from '..//util/isArray';
import { isScheduler } from '../util/isScheduler';
import { isArray } from '../util/isArray';
import { Subscriber } from '../Subscriber';
import { OuterSubscriber } from '../OuterSubscriber';
import { Operator } from '../Operator';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '..//util/subscribeToResult';
import { subscribeToResult } from '../util/subscribeToResult';
import { fromArray } from './fromArray';

const NONE = {};
Expand Down
Loading

0 comments on commit 2f9efbf

Please sign in to comment.