diff --git a/spec/operators/dematerialize-spec.js b/spec/operators/dematerialize-spec.js new file mode 100644 index 0000000000..029c1a2c8a --- /dev/null +++ b/spec/operators/dematerialize-spec.js @@ -0,0 +1,84 @@ +/* globals describe, it, expect, expectObservable, hot */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var Notification = Rx.Notification; + +describe('Observable.prototype.dematerialize()', function () { + it('should dematerialize a happy stream', function () { + var values = { + a: Notification.createNext('w'), + b: Notification.createNext('x'), + c: Notification.createNext('y'), + d: Notification.createComplete() + }; + + var e1 = hot('--a--b--c--d--|', values); + var expected = '--w--x--y--|'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize a sad stream', function () { + var values = { + a: Notification.createNext('w'), + b: Notification.createNext('x'), + c: Notification.createNext('y'), + d: Notification.createError('error') + }; + + var e1 = hot('--a--b--c--d--|', values); + var expected = '--w--x--y--#'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize stream does not completes', function () { + var e1 = hot('------'); + var expected = '-'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize stream never completes', function () { + var e1 = Observable.never(); + var expected = '-'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize stream does not emit', function () { + var e1 = hot('----|'); + var expected = '----|)'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize empty stream', function () { + var e1 = Observable.empty(); + var expected = '|'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize stream throws', function () { + var error = 'error'; + var e1 = hot('(x|)', {x: Notification.createError(error)}); + var expected = '#'; + + expectObservable(e1.dematerialize()).toBe(expected, null, error); + }); + + it('should dematerialize and completes when stream compltes with complete notification', function () { + var e1 = hot('----(a|)', { a: Notification.createComplete() }); + var expected = '----|'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); + + it('should dematerialize and completes when stream emits complete notification', function () { + var e1 = hot('----a--|', { a: Notification.createComplete() }); + var expected = '----|'; + + expectObservable(e1.dematerialize()).toBe(expected); + }); +}); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index db2feaeb28..7e8918b6ec 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -18,6 +18,7 @@ export interface CoreOperators { concatMap?: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; concatMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; count?: () => Observable; + dematerialize?: () => Observable; debounce?: (dueTime: number, scheduler?: Scheduler) => Observable; defaultIfEmpty?: (defaultValue: R) => Observable|Observable; delay?: (delay: number, scheduler?: Scheduler) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 1489f0713b..5dc59a6785 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -108,6 +108,9 @@ observableProto.concatMapTo = concatMapTo; import count from './operators/count'; observableProto.count = count; +import dematerialize from './operators/dematerialize'; +observableProto.dematerialize = dematerialize; + import debounce from './operators/debounce'; observableProto.debounce = debounce; diff --git a/src/Rx.ts b/src/Rx.ts index 32db411b1d..75e4f0d74a 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -97,6 +97,9 @@ observableProto.concatMapTo = concatMapTo; import count from './operators/count'; observableProto.count = count; +import dematerialize from './operators/dematerialize'; +observableProto.dematerialize = dematerialize; + import debounce from './operators/debounce'; observableProto.debounce = debounce; diff --git a/src/operators/dematerialize.ts b/src/operators/dematerialize.ts new file mode 100644 index 0000000000..85122e3b09 --- /dev/null +++ b/src/operators/dematerialize.ts @@ -0,0 +1,24 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Notification from '../Notification'; + +export default function dematerialize() { + return this.lift(new DeMaterializeOperator()); +} + +class DeMaterializeOperator, R> implements Operator { + call(subscriber: Subscriber) { + return new DeMaterializeSubscriber(subscriber); + } +} + +class DeMaterializeSubscriber> extends Subscriber { + constructor(destination: Subscriber) { + super(destination); + } + + _next(value: T) { + value.observe(this.destination); + } +}