Skip to content

Commit

Permalink
fix(buffer): cleanup notifier subscription when unsubscribed
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj authored and benlesh committed Oct 26, 2015
1 parent 69aa51d commit 1b30aa9
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
21 changes: 19 additions & 2 deletions spec/operators/buffer-spec.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* globals describe, it, expect, expectObservable, hot */
/* globals describe, it, expect, expectObservable, expectSubscriptions, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.buffer', function () {
describe('Observable.prototype.buffer()', function () {
it('should work with empty and empty selector', function () {
var a = Observable.empty();
var b = Observable.empty();
Expand Down Expand Up @@ -167,4 +167,21 @@ describe('Observable.prototype.buffer', function () {
expectObservable(a.buffer(b)).toBe(expected, expectedValues, new Error('too bad'));
expectSubscriptions(a.subscriptions).toBe(subs);
});

it('should unsubscribe notifier when source unsubscribed', function () {
var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
var unsub = ' ! ';
var subs = '^ ! ';
var b = hot('--------^--a-------b---cd| ');
var bsubs = '^ ! ';
var expected = '---a-------b--- ';
var expectedValues = {
a: ['3'],
b: ['4', '5']
};

expectObservable(a.buffer(b), unsub).toBe(expected, expectedValues);
expectSubscriptions(a.subscriptions).toBe(subs);
expectSubscriptions(b.subscriptions).toBe(bsubs);
});
});
10 changes: 8 additions & 2 deletions src/operators/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ class BufferOperator<T, R> implements Operator<T, R> {
}

class BufferSubscriber<T> extends Subscriber<T> {
buffer: T[] = [];
private buffer: T[] = [];
private notifierSubscriber: BufferClosingNotifierSubscriber<any> = null;

constructor(destination: Subscriber<T>, closingNotifier: Observable<any>) {
super(destination);
this.add(closingNotifier._subscribe(new BufferClosingNotifierSubscriber(this)));
this.notifierSubscriber = new BufferClosingNotifierSubscriber(this);
this.add(closingNotifier._subscribe(this.notifierSubscriber));
}

_next(value: T) {
Expand All @@ -53,6 +55,10 @@ class BufferSubscriber<T> extends Subscriber<T> {
const buffer = this.buffer;
this.buffer = [];
this.destination.next(buffer);

if (this.isUnsubscribed) {
this.notifierSubscriber.unsubscribe();
}
}
}

Expand Down

0 comments on commit 1b30aa9

Please sign in to comment.