Skip to content

Commit

Permalink
fix(Observer): fix typing to allow observation via partial observable…
Browse files Browse the repository at this point in the history
…s with PartialObservable<T>

- Observer<T> is the "full" observer interface
- PartialObserver<T> is any object with at least one method of the Observer interface
- Updates subscribe signature and Notification signatures to use PartialObserver instead of Observer

Related #1260
  • Loading branch information
benlesh committed Feb 2, 2016
1 parent 7c9547a commit 7b6da90
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 41 deletions.
22 changes: 11 additions & 11 deletions src/Notification.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Observer} from './Observer';
import {PartialObserver} from './Observer';
import {Observable} from './Observable';

export class Notification<T> {
Expand All @@ -8,32 +8,32 @@ export class Notification<T> {
this.hasValue = kind === 'N';
}

observe(observer: Observer<T>): any {
observe(observer: PartialObserver<T>): any {
switch (this.kind) {
case 'N':
return observer.next(this.value);
return observer.next && observer.next(this.value);
case 'E':
return observer.error(this.exception);
return observer.error && observer.error(this.exception);
case 'C':
return observer.complete();
return observer.complete && observer.complete();
}
}

do(next: (value: T) => void, error?: (err: any) => void, complete?: () => void): any {
const kind = this.kind;
switch (kind) {
case 'N':
return next(this.value);
return next && next(this.value);
case 'E':
return error(this.exception);
return error && error(this.exception);
case 'C':
return complete();
return complete && complete();
}
}

accept(nextOrObserver: Observer<T> | ((value: T) => void), error?: (err: any) => void, complete?: () => void) {
if (nextOrObserver && typeof (<Observer<T>>nextOrObserver).next === 'function') {
return this.observe(<Observer<T>>nextOrObserver);
accept(nextOrObserver: PartialObserver<T> | ((value: T) => void), error?: (err: any) => void, complete?: () => void) {
if (nextOrObserver && typeof (<PartialObserver<T>>nextOrObserver).next === 'function') {
return this.observe(<PartialObserver<T>>nextOrObserver);
} else {
return this.do(<(value: T) => void>nextOrObserver, error, complete);
}
Expand Down
6 changes: 3 additions & 3 deletions src/Observable.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Observer} from './Observer';
import {PartialObserver} from './Observer';
import {Operator} from './Operator';
import {Scheduler} from './Scheduler';
import {Subscriber} from './Subscriber';
Expand Down Expand Up @@ -98,7 +98,7 @@ export class Observable<T> implements CoreOperators<T> {

/**
* @method subscribe
* @param {Observer|Function} observerOrNext (optional) either an observer defining all functions to be called,
* @param {PartialObserver|Function} observerOrNext (optional) either an observer defining all functions to be called,
* or the first of three possible handlers, which is the handler for each value emitted from the observable.
* @param {Function} error (optional) a handler for a terminal event resulting from an error. If no error handler is provided,
* the error will be thrown as unhandled
Expand All @@ -107,7 +107,7 @@ export class Observable<T> implements CoreOperators<T> {
* @description registers handlers for handling emitted values, error and completions from the observable, and
* executes the observable's subscriber function, which will take action to set up the underlying data stream
*/
subscribe(observerOrNext?: Observer<T> | ((value: T) => void),
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {

Expand Down
31 changes: 27 additions & 4 deletions src/Observer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
export interface NextObserver<T> {
isUnsubscribed?: boolean;
next: (value: T) => void;
error?: (err: any) => void;
complete?: () => void;
}

export interface ErrorObserver<T> {
isUnsubscribed?: boolean;
next?: (value: T) => void;
error: (err: any) => void;
complete?: () => void;
}

export interface CompletionObserver<T> {
isUnsubscribed?: boolean;
next?: (value: T) => void;
error?: (err: any) => void;
complete: () => void;
}

export type PartialObserver<T> = NextObserver<T> | ErrorObserver<T> | CompletionObserver<T>;

export interface Observer<T> {
isUnsubscribed: boolean;
next(value: T): void;
error(error: any): void;
complete(): void;
isUnsubscribed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

export const empty: Observer<any> = {
Expand Down
18 changes: 9 additions & 9 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {isFunction} from './util/isFunction';
import {Observer} from './Observer';
import {Observer, PartialObserver} from './Observer';
import {Subscription} from './Subscription';
import {rxSubscriber} from './symbol/rxSubscriber';
import {empty as emptyObserver} from './Observer';
Expand All @@ -19,9 +19,9 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
public syncErrorThrowable: boolean = false;

protected isStopped: boolean = false;
protected destination: Observer<any>;
protected destination: PartialObserver<any>;

constructor(destinationOrNext?: Observer<any> | ((value: T) => void),
constructor(destinationOrNext?: PartialObserver<any> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
super();
Expand All @@ -37,10 +37,10 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
}
if (typeof destinationOrNext === 'object') {
if (destinationOrNext instanceof Subscriber) {
this.destination = (<Observer<any>> destinationOrNext);
this.destination = (<Subscriber<any>> destinationOrNext);
} else {
this.syncErrorThrowable = true;
this.destination = new SafeSubscriber<T>(this, <Observer<any>> destinationOrNext);
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>> destinationOrNext);
}
break;
}
Expand Down Expand Up @@ -103,7 +103,7 @@ class SafeSubscriber<T> extends Subscriber<T> {
private _context: any;

constructor(private _parent: Subscriber<T>,
observerOrNext?: Observer<T> | ((value: T) => void),
observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (e?: any) => void,
complete?: () => void) {
super();
Expand All @@ -115,9 +115,9 @@ class SafeSubscriber<T> extends Subscriber<T> {
next = (<((value: T) => void)> observerOrNext);
} else if (observerOrNext) {
context = observerOrNext;
next = (<Observer<T>> observerOrNext).next;
error = (<Observer<T>> observerOrNext).error;
complete = (<Observer<T>> observerOrNext).complete;
next = (<PartialObserver<T>> observerOrNext).next;
error = (<PartialObserver<T>> observerOrNext).error;
complete = (<PartialObserver<T>> observerOrNext).complete;
}

this._context = context;
Expand Down
4 changes: 2 additions & 2 deletions src/operator/mergeMapTo.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {PartialObserver} from '../Observer';
import {Subscriber} from '../Subscriber';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
Expand Down Expand Up @@ -54,7 +54,7 @@ export class MergeMapToSubscriber<T, R, R2> extends OuterSubscriber<T, R> {
}

private _innerSub(ish: any,
destination: Observer<R>,
destination: PartialObserver<R>,
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => R2,
value: T,
index: number): void {
Expand Down
4 changes: 2 additions & 2 deletions src/operator/observeOn.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {PartialObserver} from '../Observer';
import {Subscriber} from '../Subscriber';
import {Notification} from '../Notification';

Expand Down Expand Up @@ -50,6 +50,6 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {

class ObserveOnMessage {
constructor(public notification: Notification<any>,
public destination: Observer<any>) {
public destination: PartialObserver<any>) {
}
}
4 changes: 2 additions & 2 deletions src/operator/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Observable} from '../Observable';
import {ArrayObservable} from '../observable/ArrayObservable';
import {isArray} from '../util/isArray';
import {Operator} from '../Operator';
import {Observer} from '../Observer';
import {PartialObserver} from '../Observer';
import {Subscriber} from '../Subscriber';
import {OuterSubscriber} from '../OuterSubscriber';
import {InnerSubscriber} from '../InnerSubscriber';
Expand Down Expand Up @@ -199,7 +199,7 @@ class ZipBufferIterator<T, R> extends OuterSubscriber<T, R> implements LookAhead
buffer: T[] = [];
isComplete = false;

constructor(destination: Observer<T>,
constructor(destination: PartialObserver<T>,
private parent: ZipSubscriber<T, R>,
private observable: Observable<T>,
private index: number) {
Expand Down
16 changes: 8 additions & 8 deletions src/util/toSubscriber.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import {Observer} from '../Observer';
import {PartialObserver} from '../Observer';
import {Subscriber} from '../Subscriber';
import {rxSubscriber} from '../symbol/rxSubscriber';

export function toSubscriber<T>(
next?: Observer<T> | ((value: T) => void),
nextOrObserver?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscriber<T> {

if (next && typeof next === 'object') {
if (next instanceof Subscriber) {
return (<Subscriber<T>> next);
} else if (typeof next[rxSubscriber] === 'function') {
return next[rxSubscriber]();
if (nextOrObserver && typeof nextOrObserver === 'object') {
if (nextOrObserver instanceof Subscriber) {
return (<Subscriber<T>> nextOrObserver);
} else if (typeof nextOrObserver[rxSubscriber] === 'function') {
return nextOrObserver[rxSubscriber]();
}
}

return new Subscriber(next, error, complete);
return new Subscriber(nextOrObserver, error, complete);
}

0 comments on commit 7b6da90

Please sign in to comment.