Skip to content

Commit

Permalink
feat(dematerialize): add dematerialize operator
Browse files Browse the repository at this point in the history
- add dematerialize operator
- add test coverage for dematerialize operator

closes #475
  • Loading branch information
kwonoj authored and benlesh committed Oct 8, 2015
1 parent 414c1b7 commit 0a8b074
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 0 deletions.
84 changes: 84 additions & 0 deletions spec/operators/dematerialize-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/* globals describe, it, expect, expectObservable, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Notification = Rx.Notification;

describe('Observable.prototype.dematerialize()', function () {
it('should dematerialize a happy stream', function () {
var values = {
a: Notification.createNext('w'),
b: Notification.createNext('x'),
c: Notification.createNext('y'),
d: Notification.createComplete()
};

var e1 = hot('--a--b--c--d--|', values);
var expected = '--w--x--y--|';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize a sad stream', function () {
var values = {
a: Notification.createNext('w'),
b: Notification.createNext('x'),
c: Notification.createNext('y'),
d: Notification.createError('error')
};

var e1 = hot('--a--b--c--d--|', values);
var expected = '--w--x--y--#';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream does not completes', function () {
var e1 = hot('------');
var expected = '-';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream never completes', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream does not emit', function () {
var e1 = hot('----|');
var expected = '----|)';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize empty stream', function () {
var e1 = Observable.empty();
var expected = '|';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize stream throws', function () {
var error = 'error';
var e1 = hot('(x|)', {x: Notification.createError(error)});
var expected = '#';

expectObservable(e1.dematerialize()).toBe(expected, null, error);
});

it('should dematerialize and completes when stream compltes with complete notification', function () {
var e1 = hot('----(a|)', { a: Notification.createComplete() });
var expected = '----|';

expectObservable(e1.dematerialize()).toBe(expected);
});

it('should dematerialize and completes when stream emits complete notification', function () {
var e1 = hot('----a--|', { a: Notification.createComplete() });
var expected = '----|';

expectObservable(e1.dematerialize()).toBe(expected);
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface CoreOperators<T> {
concatMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
concatMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
count?: () => Observable<number>;
dematerialize?: () => Observable<any>;
debounce?: <R>(dueTime: number, scheduler?: Scheduler) => Observable<R>;
defaultIfEmpty?: <T, R>(defaultValue: R) => Observable<T>|Observable<R>;
delay?: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ observableProto.concatMapTo = concatMapTo;
import count from './operators/count';
observableProto.count = count;

import dematerialize from './operators/dematerialize';
observableProto.dematerialize = dematerialize;

import debounce from './operators/debounce';
observableProto.debounce = debounce;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ observableProto.concatMapTo = concatMapTo;
import count from './operators/count';
observableProto.count = count;

import dematerialize from './operators/dematerialize';
observableProto.dematerialize = dematerialize;

import debounce from './operators/debounce';
observableProto.debounce = debounce;

Expand Down
24 changes: 24 additions & 0 deletions src/operators/dematerialize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Notification from '../Notification';

export default function dematerialize<T>() {
return this.lift(new DeMaterializeOperator());
}

class DeMaterializeOperator<T extends Notification<any>, R> implements Operator<T, R> {
call(subscriber: Subscriber<R>) {
return new DeMaterializeSubscriber(subscriber);
}
}

class DeMaterializeSubscriber<T extends Notification<any>> extends Subscriber<T> {
constructor(destination: Subscriber<any>) {
super(destination);
}

_next(value: T) {
value.observe(this.destination);
}
}

0 comments on commit 0a8b074

Please sign in to comment.