Skip to content

Commit

Permalink
fix(buffer): Ensure notifier is subscribed after source (#5654)
Browse files Browse the repository at this point in the history
- Resolves an issue where a multicast observable could not adequately be used to notify a buffer on itself
- Corrects a regression that was introduced by a bad merge back in 6.0. This was originally corrected in PR #2195.

fixes #1754
  • Loading branch information
benlesh authored Sep 1, 2020
1 parent c289688 commit c088b0e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
22 changes: 21 additions & 1 deletion spec/operators/buffer-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { buffer, mergeMap, take } from 'rxjs/operators';
import { EMPTY, NEVER, throwError, of } from 'rxjs';
import { EMPTY, NEVER, throwError, of, Subject } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
import { expect } from 'chai';

/** @test {buffer} */
describe('Observable.prototype.buffer', () => {
Expand Down Expand Up @@ -266,4 +267,23 @@ describe('Observable.prototype.buffer', () => {
expectSubscriptions(b.subscriptions).toBe(bsubs);
});
});

it('should emit properly with an observable using itself as a notifier', () => {
const results: any[] = [];
const subject = new Subject<number>();

const source = subject.pipe(
buffer(subject)
).subscribe({
next: value => results.push(value),
complete: () => results.push('complete')
});

subject.next(1);
expect(results).to.deep.equal([[1]]);
subject.next(2);
expect(results).to.deep.equal([[1], [2]]);
subject.complete();
expect(results).to.deep.equal([[1], [2], 'complete']);
});
});
8 changes: 5 additions & 3 deletions src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ class BufferOperator<T> implements Operator<T, T[]> {
}

call(subscriber: Subscriber<T[]>, source: any): any {
return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier));
const bufferSubscriber = new BufferSubscriber(subscriber);
subscriber.add(source.subscribe(bufferSubscriber));
subscriber.add(innerSubscribe(this.closingNotifier, new SimpleInnerSubscriber(bufferSubscriber)));
return subscriber;
}
}

Expand All @@ -68,9 +71,8 @@ class BufferOperator<T> implements Operator<T, T[]> {
class BufferSubscriber<T> extends SimpleOuterSubscriber<T, any> {
private buffer: T[] = [];

constructor(destination: Subscriber<T[]>, closingNotifier: Observable<any>) {
constructor(destination: Subscriber<T[]>) {
super(destination);
this.add(innerSubscribe(closingNotifier, new SimpleInnerSubscriber(this)));
}

protected _next(value: T) {
Expand Down

0 comments on commit c088b0e

Please sign in to comment.