Skip to content

Commit

Permalink
feat(operator): add materialize. closes #132
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Aug 13, 2015
1 parent c80688b commit 6d9f6ae
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 1 deletion.
43 changes: 43 additions & 0 deletions spec/Notification-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* globals describe, it, expect */
var Rx = require('../dist/cjs/Rx');

var Notification = Rx.Notification;

describe('Notification', function () {
it('should exist', function () {
expect(typeof Notification).toBe('function');
});

describe('createNext', function () {
it('should return a Notification', function () {
var n = Notification.createNext('test');
expect(n instanceof Notification).toBe(true);
expect(n.value).toBe('test');
expect(n.kind).toBe('N');
expect(typeof n.exception).toBe('undefined');
expect(n.hasValue).toBe(true);
});
});

describe('createError', function () {
it('should return a Notification', function () {
var n = Notification.createError('test');
expect(n instanceof Notification).toBe(true);
expect(typeof n.value).toBe('undefined');
expect(n.kind).toBe('E');
expect(n.exception).toBe('test');
expect(n.hasValue).toBe(false);
});
});

describe('createComplete', function () {
it('should return a Notification', function () {
var n = Notification.createComplete();
expect(n instanceof Notification).toBe(true);
expect(typeof n.value).toBe('undefined');
expect(n.kind).toBe('C');
expect(typeof n.exception).toBe('undefined');
expect(n.hasValue).toBe(false);
});
});
});
43 changes: 43 additions & 0 deletions spec/operators/materialize-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Notification = Rx.Notification;

describe('Observable.prototype.materialize()', function () {
it('should materialize a happy stream', function () {
var expected = [
Notification.createNext(1),
Notification.createNext(2),
Notification.createNext(3),
Notification.createComplete()
];

Observable.of(1, 2, 3)
.materialize()
.subscribe(function (n) {
expect(n instanceof Notification).toBe(true);
expect(n).toEqual(expected.shift());
});
});

it('should materialize a sad stream', function () {
var expected = [
Notification.createNext(1),
Notification.createNext(2),
Notification.createNext(3),
Notification.createError('booooo')
];

Observable.of(1, 2, 3, 4)
.map(function (x) {
if (x === 4) {
throw 'booooo';
}
return x;
})
.materialize()
.subscribe(function (n) {
expect(n).toEqual(expected.shift());
});
});
});
67 changes: 67 additions & 0 deletions src/Notification.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import Observer from './Observer';
import Observable from './Observable';
import noop from './util/noop';

export default class Notification<T> {
hasValue: boolean;

constructor(public kind: string, public value?: T, public exception?: any) {
this.hasValue = kind === 'N';
}

observe(observer: Observer<T>): any {
switch (this.kind) {
case 'N':
return observer.next(this.value);
case 'E':
return observer.error(this.exception);
case 'C':
return observer.complete();
}
}

do(next: (value: T) => void, error?: (err: any) => void, complete?: () => void): any {
const kind = this.kind;
switch (kind) {
case 'N':
return next(this.value);
case 'E':
return error(this.exception);
case 'C':
return complete();
}
}

accept(nextOrObserver: Observer<T>|((value: T) => void), error?: (err: any) => void, complete?: () => void) {
if (nextOrObserver && typeof (<Observer<T>>nextOrObserver).next === 'function') {
return this.observe(<Observer<T>>nextOrObserver);
} else {
return this.do(<(value: T) => void>nextOrObserver, error, complete);
}
}

toObservable(): Observable<T> {
const kind = this.kind;
const value = this.value;
switch (kind) {
case 'N':
return Observable.value(value);
case 'E':
return Observable.throw(value);
case 'C':
return Observable.empty();
}
}

static createNext<T>(value: T): Notification<T> {
return new Notification('N', value);
}

static createError<T>(err: any): Notification<T> {
return new Notification('E', undefined, err);
}

static createComplete(): Notification<any> {
return new Notification('C');
}
}
7 changes: 7 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import Scheduler from './Scheduler';
import Subscriber from './Subscriber';
import Subscription from './Subscription';
import ConnectableObservable from './observables/ConnectableObservable';
// HACK: the Babel part of the build doesn't like this reference.
// seems to put it in an infinite loop.
//import Notification from './Notification';


import $$observer from './util/Symbol_observer';

Expand Down Expand Up @@ -131,6 +135,9 @@ export default class Observable<T> {
partition: (predicate: (x: T) => boolean) => Observable<T>[];
toPromise: (PromiseCtor: PromiseConstructor) => Promise<T>;
defaultIfEmpty: <T, R>(defaultValue: R) => Observable<T>|Observable<R>;
// HACK: this should be Observable<Notification<T>>, but the build process didn't like it. :(
// this will be fixed when we can move everything to the TypeScript compiler I suspect.
materialize: () => Observable<any>;

observeOn: (scheduler: Scheduler, delay?: number) => Observable<T>;
subscribeOn: (scheduler: Scheduler, delay?: number) => Observable<T>;
Expand Down
7 changes: 6 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import Scheduler from './Scheduler';
import Observable from './Observable';
import Subscriber from './Subscriber';
import Subscription from './Subscription';
import Notification from './Notification';

import ReplaySubject from './subjects/ReplaySubject';
import BehaviorSubject from './subjects/BehaviorSubject';
import ConnectableObservable from './observables/ConnectableObservable';
Expand Down Expand Up @@ -124,10 +126,12 @@ observableProto.subscribeOn = subscribeOn;
import partition from './operators/partition';
import toPromise from './operators/toPromise';
import defaultIfEmpty from './operators/defaultIfEmpty';
import materialize from './operators/materialize';

observableProto.partition = partition;
observableProto.toPromise = toPromise;
observableProto.defaultIfEmpty = defaultIfEmpty;
observableProto.materialize = materialize;

import _catch from './operators/catch';
import retryWhen from './operators/retryWhen';
Expand All @@ -143,5 +147,6 @@ export default {
Subscription,
ReplaySubject,
BehaviorSubject,
ConnectableObservable
ConnectableObservable,
Notification 
};
36 changes: 36 additions & 0 deletions src/operators/materialize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Notification from '../Notification';

export default function materialize<T>() {
return this.lift(new MaterializeOperator());
}

export class MaterializeOperator<T, R> extends Operator<T, R> {
call(observer: Observer<T>): Observer<T> {
return new MaterializeSubscriber(observer);
}
}

export class MaterializeSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<T>) {
super(destination);
}

_next(value:T) {
this.destination.next(Notification.createNext(value));
}

_error(err: any) {
const destination = this.destination;
destination.next(Notification.createError(err));
destination.complete();
}

_complete() {
const destination = this.destination;
destination.next(Notification.createComplete());
destination.complete();
}
}

0 comments on commit 6d9f6ae

Please sign in to comment.