diff --git a/src/operators/bufferToggle.ts b/src/operators/bufferToggle.ts index 5ea0857a11..49e59cfdd2 100644 --- a/src/operators/bufferToggle.ts +++ b/src/operators/bufferToggle.ts @@ -33,8 +33,13 @@ class BufferToggleOperator implements Operator { } } +interface BufferContext { + buffer: T[]; + subscription: Subscription; +} + class BufferToggleSubscriber extends Subscriber { - private buffers: Array = []; + private contexts: Array> = []; private closingNotification: Subscription; constructor(destination: Subscriber, @@ -45,53 +50,58 @@ class BufferToggleSubscriber extends Subscriber { } _next(value: T) { - const buffers = this.buffers; - const len = buffers.length; + const contexts = this.contexts; + const len = contexts.length; for (let i = 0; i < len; i++) { - buffers[i].push(value); + contexts[i].buffer.push(value); } } _error(err: any) { - this.buffers = null; + this.contexts = null; this.destination.error(err); } _complete() { - const buffers = this.buffers; - while (buffers.length > 0) { - this.destination.next(buffers.shift()); + const contexts = this.contexts; + while (contexts.length > 0) { + const context = contexts.shift(); + this.destination.next(context.buffer); + context.subscription.unsubscribe(); + context.buffer = null; } this.destination.complete(); } openBuffer(value: O) { const closingSelector = this.closingSelector; - const buffers = this.buffers; + const contexts = this.contexts; let closingNotifier = tryCatch(closingSelector)(value); if (closingNotifier === errorObject) { const err = closingNotifier.e; - this.buffers = null; + this.contexts = null; this.destination.error(err); } else { - let buffer = []; let context = { - buffer, + buffer: [], subscription: new Subscription() }; - buffers.push(buffer); + contexts.push(context); const subscriber = new BufferClosingNotifierSubscriber(this, context); const subscription = closingNotifier._subscribe(subscriber); this.add(context.subscription.add(subscription)); } } - closeBuffer(context: { subscription: any, buffer: T[] }) { + closeBuffer(context: BufferContext) { + const contexts = this.contexts; + if (contexts === null) { + return; + } const { buffer, subscription } = context; - const buffers = this.buffers; this.destination.next(buffer); - buffers.splice(buffers.indexOf(buffer), 1); + contexts.splice(contexts.indexOf(context), 1); this.remove(subscription); subscription.unsubscribe(); } @@ -111,7 +121,7 @@ class BufferClosingNotifierSubscriber extends Subscriber { } _complete() { - // noop + this.parent.closeBuffer(this.context); } }