From 6d9f6ae63867efe9e2f52e17e9f7abae1d946da5 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 5 Aug 2015 17:34:19 -0700 Subject: [PATCH] feat(operator): add materialize. closes #132 --- spec/Notification-spec.js | 43 +++++++++++++++++++ spec/operators/materialize-spec.js | 43 +++++++++++++++++++ src/Notification.ts | 67 ++++++++++++++++++++++++++++++ src/Observable.ts | 7 ++++ src/Rx.ts | 7 +++- src/operators/materialize.ts | 36 ++++++++++++++++ 6 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 spec/Notification-spec.js create mode 100644 spec/operators/materialize-spec.js create mode 100644 src/Notification.ts create mode 100644 src/operators/materialize.ts diff --git a/spec/Notification-spec.js b/spec/Notification-spec.js new file mode 100644 index 0000000000..5dce55ceca --- /dev/null +++ b/spec/Notification-spec.js @@ -0,0 +1,43 @@ +/* globals describe, it, expect */ +var Rx = require('../dist/cjs/Rx'); + +var Notification = Rx.Notification; + +describe('Notification', function () { + it('should exist', function () { + expect(typeof Notification).toBe('function'); + }); + + describe('createNext', function () { + it('should return a Notification', function () { + var n = Notification.createNext('test'); + expect(n instanceof Notification).toBe(true); + expect(n.value).toBe('test'); + expect(n.kind).toBe('N'); + expect(typeof n.exception).toBe('undefined'); + expect(n.hasValue).toBe(true); + }); + }); + + describe('createError', function () { + it('should return a Notification', function () { + var n = Notification.createError('test'); + expect(n instanceof Notification).toBe(true); + expect(typeof n.value).toBe('undefined'); + expect(n.kind).toBe('E'); + expect(n.exception).toBe('test'); + expect(n.hasValue).toBe(false); + }); + }); + + describe('createComplete', function () { + it('should return a Notification', function () { + var n = Notification.createComplete(); + expect(n instanceof Notification).toBe(true); + expect(typeof n.value).toBe('undefined'); + expect(n.kind).toBe('C'); + expect(typeof n.exception).toBe('undefined'); + expect(n.hasValue).toBe(false); + }); + }); +}); \ No newline at end of file diff --git a/spec/operators/materialize-spec.js b/spec/operators/materialize-spec.js new file mode 100644 index 0000000000..65225c9aab --- /dev/null +++ b/spec/operators/materialize-spec.js @@ -0,0 +1,43 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var Notification = Rx.Notification; + +describe('Observable.prototype.materialize()', function () { + it('should materialize a happy stream', function () { + var expected = [ + Notification.createNext(1), + Notification.createNext(2), + Notification.createNext(3), + Notification.createComplete() + ]; + + Observable.of(1, 2, 3) + .materialize() + .subscribe(function (n) { + expect(n instanceof Notification).toBe(true); + expect(n).toEqual(expected.shift()); + }); + }); + + it('should materialize a sad stream', function () { + var expected = [ + Notification.createNext(1), + Notification.createNext(2), + Notification.createNext(3), + Notification.createError('booooo') + ]; + + Observable.of(1, 2, 3, 4) + .map(function (x) { + if (x === 4) { + throw 'booooo'; + } + return x; + }) + .materialize() + .subscribe(function (n) { + expect(n).toEqual(expected.shift()); + }); + }); +}); \ No newline at end of file diff --git a/src/Notification.ts b/src/Notification.ts new file mode 100644 index 0000000000..e4291d1738 --- /dev/null +++ b/src/Notification.ts @@ -0,0 +1,67 @@ +import Observer from './Observer'; +import Observable from './Observable'; +import noop from './util/noop'; + +export default class Notification { + hasValue: boolean; + + constructor(public kind: string, public value?: T, public exception?: any) { + this.hasValue = kind === 'N'; + } + + observe(observer: Observer): any { + switch (this.kind) { + case 'N': + return observer.next(this.value); + case 'E': + return observer.error(this.exception); + case 'C': + return observer.complete(); + } + } + + do(next: (value: T) => void, error?: (err: any) => void, complete?: () => void): any { + const kind = this.kind; + switch (kind) { + case 'N': + return next(this.value); + case 'E': + return error(this.exception); + case 'C': + return complete(); + } + } + + accept(nextOrObserver: Observer|((value: T) => void), error?: (err: any) => void, complete?: () => void) { + if (nextOrObserver && typeof (>nextOrObserver).next === 'function') { + return this.observe(>nextOrObserver); + } else { + return this.do(<(value: T) => void>nextOrObserver, error, complete); + } + } + + toObservable(): Observable { + const kind = this.kind; + const value = this.value; + switch (kind) { + case 'N': + return Observable.value(value); + case 'E': + return Observable.throw(value); + case 'C': + return Observable.empty(); + } + } + + static createNext(value: T): Notification { + return new Notification('N', value); + } + + static createError(err: any): Notification { + return new Notification('E', undefined, err); + } + + static createComplete(): Notification { + return new Notification('C'); + } +} \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 38d256de52..a93c684774 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -5,6 +5,10 @@ import Scheduler from './Scheduler'; import Subscriber from './Subscriber'; import Subscription from './Subscription'; import ConnectableObservable from './observables/ConnectableObservable'; +// HACK: the Babel part of the build doesn't like this reference. +// seems to put it in an infinite loop. +//import Notification from './Notification'; + import $$observer from './util/Symbol_observer'; @@ -131,6 +135,9 @@ export default class Observable { partition: (predicate: (x: T) => boolean) => Observable[]; toPromise: (PromiseCtor: PromiseConstructor) => Promise; defaultIfEmpty: (defaultValue: R) => Observable|Observable; + // HACK: this should be Observable>, but the build process didn't like it. :( + // this will be fixed when we can move everything to the TypeScript compiler I suspect. + materialize: () => Observable; observeOn: (scheduler: Scheduler, delay?: number) => Observable; subscribeOn: (scheduler: Scheduler, delay?: number) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index 6b12698a40..cc1b061ef6 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -3,6 +3,8 @@ import Scheduler from './Scheduler'; import Observable from './Observable'; import Subscriber from './Subscriber'; import Subscription from './Subscription'; +import Notification from './Notification'; + import ReplaySubject from './subjects/ReplaySubject'; import BehaviorSubject from './subjects/BehaviorSubject'; import ConnectableObservable from './observables/ConnectableObservable'; @@ -124,10 +126,12 @@ observableProto.subscribeOn = subscribeOn; import partition from './operators/partition'; import toPromise from './operators/toPromise'; import defaultIfEmpty from './operators/defaultIfEmpty'; +import materialize from './operators/materialize'; observableProto.partition = partition; observableProto.toPromise = toPromise; observableProto.defaultIfEmpty = defaultIfEmpty; +observableProto.materialize = materialize; import _catch from './operators/catch'; import retryWhen from './operators/retryWhen'; @@ -143,5 +147,6 @@ export default { Subscription, ReplaySubject, BehaviorSubject, - ConnectableObservable + ConnectableObservable, + Notification  }; \ No newline at end of file diff --git a/src/operators/materialize.ts b/src/operators/materialize.ts new file mode 100644 index 0000000000..847960f7ed --- /dev/null +++ b/src/operators/materialize.ts @@ -0,0 +1,36 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Notification from '../Notification'; + +export default function materialize() { + return this.lift(new MaterializeOperator()); +} + +export class MaterializeOperator extends Operator { + call(observer: Observer): Observer { + return new MaterializeSubscriber(observer); + } +} + +export class MaterializeSubscriber extends Subscriber { + constructor(destination: Observer) { + super(destination); + } + + _next(value:T) { + this.destination.next(Notification.createNext(value)); + } + + _error(err: any) { + const destination = this.destination; + destination.next(Notification.createError(err)); + destination.complete(); + } + + _complete() { + const destination = this.destination; + destination.next(Notification.createComplete()); + destination.complete(); + } +}