From 255c9d424a69d4fce8aa69160cd60d14fa749fb5 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 29 Aug 2015 11:07:54 +0300 Subject: [PATCH 1/2] feat(operator): add publishReplay operator and spec --- spec/operators/publishReplay-spec.js | 102 +++++++++++++++++++++++++++ src/Observable.ts | 1 + src/Rx.ts | 2 + src/operators/publishReplay.ts | 11 +++ 4 files changed, 116 insertions(+) create mode 100644 spec/operators/publishReplay-spec.js create mode 100644 src/operators/publishReplay.ts diff --git a/spec/operators/publishReplay-spec.js b/spec/operators/publishReplay-spec.js new file mode 100644 index 0000000000..d44b0516cd --- /dev/null +++ b/spec/operators/publishReplay-spec.js @@ -0,0 +1,102 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.publishReplay()', function () { + it('should return a ConnectableObservable', function () { + var source = Observable.value(1).publishReplay(); + expect(source instanceof Rx.ConnectableObservable).toBe(true); + }); + + it('should multicast one observable to multiple observers', function (done) { + var results1 = []; + var results2 = []; + var subscriptions = 0; + + var source = new Observable(function (observer) { + subscriptions++; + observer.next(1); + observer.next(2); + observer.next(3); + observer.next(4); + observer.complete(); + }); + + var connectable = source.publishReplay(); + + connectable.subscribe(function (x) { + results1.push(x); + }); + connectable.subscribe(function (x) { + results2.push(x); + }); + + expect(results1).toEqual([]); + expect(results2).toEqual([]); + + connectable.connect(); + + expect(results1).toEqual([1, 2, 3, 4]); + expect(results2).toEqual([1, 2, 3, 4]); + expect(subscriptions).toBe(1); + done(); + }); + + it('should replay as many events as specified by the bufferSize', function (done) { + var results1 = []; + var results2 = []; + var subscriptions = 0; + + var source = new Observable(function (observer) { + subscriptions++; + observer.next(1); + observer.next(2); + observer.next(3); + observer.next(4); + }); + + var connectable = source.publishReplay(2); + + connectable.subscribe(function (x) { + results1.push(x); + }); + + expect(results1).toEqual([]); + expect(results2).toEqual([]); + + connectable.connect(); + + connectable.subscribe(function (x) { + results2.push(x); + }); + + expect(results1).toEqual([1, 2, 3, 4]); + expect(results2).toEqual([3, 4]); + expect(subscriptions).toBe(1); + done(); + }); + + it('should allow you to reconnect by subscribing again', function (done) { + var expected = [1, 2, 3, 4]; + var i = 0; + + var source = Observable.of(1, 2, 3, 4).publishReplay(); + + source.subscribe( + function (x) { + expect(x).toBe(expected[i++]); + }, + null, + function () { + i = 0; + + source.subscribe(function (x) { + expect(x).toBe(expected[i++]); + }, null, done); + + source.connect(); + }); + + source.connect(); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 1f53cebeb2..3512ffa9c4 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -155,6 +155,7 @@ export default class Observable { publish: () => ConnectableObservable; publishBehavior: (value: any) => ConnectableObservable; + publishReplay: (bufferSize: number, windowTime: number, scheduler?: Scheduler) => ConnectableObservable; multicast: (subjectFactory: () => Subject) => ConnectableObservable; catch: (selector: (err: any, source: Observable, caught: Observable) => Observable) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index 7b18d6d2d8..dac6fbc786 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -127,10 +127,12 @@ observableProto.zipAll = zipAll; import publish from './operators/publish'; import publishBehavior from './operators/publishBehavior'; +import publishReplay from './operators/publishReplay'; import multicast from './operators/multicast'; observableProto.publish = publish; observableProto.publishBehavior = publishBehavior; +observableProto.publishReplay = publishReplay; observableProto.multicast = multicast; import observeOn from './operators/observeOn'; diff --git a/src/operators/publishReplay.ts b/src/operators/publishReplay.ts new file mode 100644 index 0000000000..7b329ec254 --- /dev/null +++ b/src/operators/publishReplay.ts @@ -0,0 +1,11 @@ +import ReplaySubject from '../subjects/ReplaySubject'; +import Scheduler from '../Scheduler'; +import multicast from './multicast'; + +export default function publishReplay(bufferSize: number = Number.POSITIVE_INFINITY, + windowTime: number = Number.POSITIVE_INFINITY, + scheduler?: Scheduler) { + return multicast.call(this, + () => new ReplaySubject(bufferSize, windowTime, scheduler) + ); +} \ No newline at end of file From 905efe60c10388548de3ddfcf47c133627c5d920 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Sat, 29 Aug 2015 12:41:39 +0300 Subject: [PATCH 2/2] Correct tests for publishBehavior and publishReplay --- spec/operators/publishBehavior-spec.js | 5 ++--- spec/operators/publishReplay-spec.js | 22 ++++++++++++++++++++++ spec/subjects/behavior-subject-spec.js | 18 ++++++++++++++---- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/spec/operators/publishBehavior-spec.js b/spec/operators/publishBehavior-spec.js index f2a42d0ec1..e963112d44 100644 --- a/spec/operators/publishBehavior-spec.js +++ b/spec/operators/publishBehavior-spec.js @@ -43,7 +43,7 @@ describe('Observable.prototype.publishBehavior()', function () { done(); }); - it('should emit default value to observer after completed', function (done) { + it('should not emit next events to observer after completed', function (done) { var results = []; var source = new Observable(function (observer) { @@ -55,14 +55,13 @@ describe('Observable.prototype.publishBehavior()', function () { }); var connectable = source.publishBehavior(0); - connectable.connect(); connectable.subscribe(function (x) { results.push(x); }); - expect(results).toEqual([0]); + expect(results).toEqual([]); done(); }); diff --git a/spec/operators/publishReplay-spec.js b/spec/operators/publishReplay-spec.js index d44b0516cd..eaa1cc0aa8 100644 --- a/spec/operators/publishReplay-spec.js +++ b/spec/operators/publishReplay-spec.js @@ -76,6 +76,28 @@ describe('Observable.prototype.publishReplay()', function () { done(); }); + it('should emit replayed events to observer after completed', function (done) { + var results = []; + + var source = new Observable(function (observer) { + observer.next(1); + observer.next(2); + observer.next(3); + observer.next(4); + observer.complete(); + }); + + var connectable = source.publishReplay(2); + connectable.connect(); + + connectable.subscribe(function (x) { + results.push(x); + }); + + expect(results).toEqual([3, 4]); + done(); + }); + it('should allow you to reconnect by subscribing again', function (done) { var expected = [1, 2, 3, 4]; var i = 0; diff --git a/spec/subjects/behavior-subject-spec.js b/spec/subjects/behavior-subject-spec.js index 9e44eeeda6..2cce561e66 100644 --- a/spec/subjects/behavior-subject-spec.js +++ b/spec/subjects/behavior-subject-spec.js @@ -1,4 +1,4 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, fail */ var Rx = require('../../dist/cjs/Rx'); var BehaviorSubject = Rx.BehaviorSubject; @@ -10,23 +10,33 @@ describe('BehaviorSubject', function() { expect(subject instanceof Rx.Subject).toBe(true); done(); }); - + it('should start with an initialization value', function(done) { var subject = new BehaviorSubject('foo'); var expected = ['foo', 'bar']; var i = 0; - + subject.subscribe(function(x) { expect(x).toBe(expected[i++]); }, null, function(){ done(); }); - + // HACK Rx.Scheduler.nextTick.schedule(0, null, function(){ subject.next('bar'); subject.complete(); }); }); + + it('should not replay last value after it completes', function(done) { + var subject = new BehaviorSubject('foo'); + subject.next('bar'); + subject.next('baz'); + subject.complete(); + subject.subscribe(function(x) { + fail('subscribeOnNext has been called'); + }, null, done); + }); }); \ No newline at end of file