Skip to content

Commit

Permalink
feat(operator): add publishBehavior operator and spec
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros authored and benlesh committed Aug 28, 2015
1 parent 322218a commit 249ab8d
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 0 deletions.
92 changes: 92 additions & 0 deletions spec/operators/publishBehavior-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.publishBehavior()', function () {
it('should return a ConnectableObservable', function () {
var source = Observable.value(1).publishBehavior(1);
expect(source instanceof Rx.ConnectableObservable).toBe(true);
});

it('should multicast one observable to multiple observers', function (done) {
var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = new Observable(function (observer) {
subscriptions++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
});

var connectable = source.publishBehavior(0);

connectable.subscribe(function (x) {
results1.push(x);
});

expect(results1).toEqual([0]);

connectable.connect();

expect(results2).toEqual([]);

connectable.subscribe(function (x) {
results2.push(x);
});

expect(results1).toEqual([0, 1, 2, 3, 4]);
expect(results2).toEqual([4]);
expect(subscriptions).toBe(1);
done();
});

it('should emit default value to observer after completed', function (done) {
var results = [];

var source = new Observable(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
observer.complete();
});

var connectable = source.publishBehavior(0);

connectable.connect();

connectable.subscribe(function (x) {
results.push(x);
});

expect(results).toEqual([0]);
done();
});

it('should allow you to reconnect by subscribing again', function (done) {
var expected = [0, 1, 2, 3, 4];
var i = 0;

var source = Observable.of(1, 2, 3, 4).publishBehavior(0);

source.subscribe(
function (x) {
expect(x).toBe(expected[i++]);
},
null,
function () {
i = 0;

source.subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);

source.connect();
});

source.connect();
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ export default class Observable<T> {
subscribeOn: (scheduler: Scheduler, delay?: number) => Observable<T>;

publish: () => ConnectableObservable<T>;
publishBehavior: (value: any) => ConnectableObservable<T>;
multicast: (subjectFactory: () => Subject<T>) => ConnectableObservable<T>;

catch: (selector: (err: any, source: Observable<T>, caught: Observable<any>) => Observable<any>) => Observable<T>;
Expand Down
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ observableProto.zip = zipProto;
observableProto.zipAll = zipAll;

import publish from './operators/publish';
import publishBehavior from './operators/publishBehavior';
import multicast from './operators/multicast';

observableProto.publish = publish;
observableProto.publishBehavior = publishBehavior;
observableProto.multicast = multicast;

import observeOn from './operators/observeOn';
Expand Down
6 changes: 6 additions & 0 deletions src/operators/publishBehavior.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import BehaviorSubject from '../subjects/BehaviorSubject';
import multicast from './multicast';

export default function publishBehavior(value: any) {
return multicast.call(this, () => new BehaviorSubject(value));
}

0 comments on commit 249ab8d

Please sign in to comment.