diff --git a/spec/operators/bufferTime-spec.ts b/spec/operators/bufferTime-spec.ts
index 7ebcd7546c..2f5b8010d9 100644
--- a/spec/operators/bufferTime-spec.ts
+++ b/spec/operators/bufferTime-spec.ts
@@ -18,7 +18,7 @@ describe('Observable.prototype.bufferTime', () => {
z: []
};
- const result = e1.bufferTime(t, null, rxTestScheduler);
+ const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(subs);
@@ -34,11 +34,47 @@ describe('Observable.prototype.bufferTime', () => {
z: []
};
- const result = e1.bufferTime(t, null, rxTestScheduler);
+ const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result).toBe(expected, values);
});
+ it('should emit buffers at intervals or when the buffer is full', () => {
+ const e1 = hot('---a---b---c---d---e---f---g-----|');
+ const subs = '^ !';
+ const t = time( '----------|');
+ const expected = '-------w-------x-------y---------(z|)';
+ const values = {
+ w: ['a', 'b'],
+ x: ['c', 'd'],
+ y: ['e', 'f'],
+ z: ['g']
+ };
+
+ const result = e1.bufferTime(t, null, 2, rxTestScheduler);
+
+ expectObservable(result).toBe(expected, values);
+ expectSubscriptions(e1.subscriptions).toBe(subs);
+ });
+
+ it('should emit buffers at intervals or when the buffer is full test 2', () => {
+ const e1 = hot('---a---b---c---d---e---f---g-----|');
+ const subs = '^ !';
+ const t = time( '----------|');
+ const expected = '----------w--------x---------y---(z|)';
+ const values = {
+ w: ['a', 'b'],
+ x: ['c', 'd', 'e'],
+ y: ['f', 'g'],
+ z: []
+ };
+
+ const result = e1.bufferTime(t, null, 3, rxTestScheduler);
+
+ expectObservable(result).toBe(expected, values);
+ expectSubscriptions(e1.subscriptions).toBe(subs);
+ });
+
it('should emit buffers that have been created at intervals and close after the specified delay', () => {
const e1 = hot('---a---b---c----d----e----f----g----h----i----(k|)');
// --------------------*--------------------*---- start interval
@@ -54,7 +90,28 @@ describe('Observable.prototype.bufferTime', () => {
z: ['i', 'k']
};
- const result = e1.bufferTime(t, interval, rxTestScheduler);
+ const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler);
+
+ expectObservable(result).toBe(expected, values);
+ });
+
+ it('should emit buffers that have been created at intervals and close after the specified delay ' +
+ 'or when the buffer is full', () => {
+ const e1 = hot('---a---b---c----d----e----f----g----h----i----(k|)');
+ // --------------------*--------------------*---- start interval
+ // ---------------------| timespans
+ // ---------------------|
+ // -----|
+ const t = time( '---------------------|');
+ const interval = time( '--------------------|');
+ const expected = '----------------x-------------------y---------(z|)';
+ const values = {
+ x: ['a', 'b', 'c', 'd'],
+ y: ['e', 'f', 'g', 'h'],
+ z: ['i', 'k']
+ };
+
+ const result = e1.bufferTime(t, interval, 4, rxTestScheduler);
expectObservable(result).toBe(expected, values);
});
@@ -81,7 +138,7 @@ describe('Observable.prototype.bufferTime', () => {
f: []
};
- const result = e1.bufferTime(t, interval, rxTestScheduler);
+ const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
@@ -107,7 +164,7 @@ describe('Observable.prototype.bufferTime', () => {
e: []
};
- const result = e1.bufferTime(t, interval, rxTestScheduler);
+ const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result).toBe(expected, values);
});
@@ -127,7 +184,7 @@ describe('Observable.prototype.bufferTime', () => {
a: ['2', '3', '4']
};
- const result = e1.bufferTime(t, interval, rxTestScheduler);
+ const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(subs);
@@ -150,7 +207,7 @@ describe('Observable.prototype.bufferTime', () => {
const result = e1
.mergeMap((x: any) => Observable.of(x))
- .bufferTime(t, interval, rxTestScheduler)
+ .bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler)
.mergeMap((x: any) => Observable.of(x));
expectObservable(result, unsub).toBe(expected, values);
@@ -164,7 +221,7 @@ describe('Observable.prototype.bufferTime', () => {
const values = { b: [] };
const t = time('----------|');
- const result = e1.bufferTime(t, null, rxTestScheduler);
+ const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
@@ -176,7 +233,7 @@ describe('Observable.prototype.bufferTime', () => {
const t = time( '----------|');
const expected = '----------a---------a---------a---------a----';
- const result = e1.bufferTime(t, null, rxTestScheduler);
+ const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result, unsub).toBe(expected, { a: [] });
});
@@ -186,7 +243,7 @@ describe('Observable.prototype.bufferTime', () => {
const expected = '#';
const t = time('----------|');
- const result = e1.bufferTime(t, null, rxTestScheduler);
+ const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result).toBe(expected, undefined, new Error('haha'));
});
@@ -200,7 +257,7 @@ describe('Observable.prototype.bufferTime', () => {
w: ['a', 'b']
};
- const result = e1.bufferTime(t, null, rxTestScheduler);
+ const result = e1.bufferTime(t, null, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
@@ -222,7 +279,7 @@ describe('Observable.prototype.bufferTime', () => {
y: ['e', 'f', 'g', 'h', 'i']
};
- const result = e1.bufferTime(t, interval, rxTestScheduler);
+ const result = e1.bufferTime(t, interval, Number.POSITIVE_INFINITY, rxTestScheduler);
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
diff --git a/src/operator/bufferTime.ts b/src/operator/bufferTime.ts
index d8531e58a7..cbd4646759 100644
--- a/src/operator/bufferTime.ts
+++ b/src/operator/bufferTime.ts
@@ -1,9 +1,11 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
+import {Subscription} from '../Subscription';
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {Action} from '../scheduler/Action';
import {async} from '../scheduler/async';
+import {isScheduler} from '../util/isScheduler';
/**
* Buffers the source Observable values for a specific time period.
@@ -18,7 +20,9 @@ import {async} from '../scheduler/async';
* resets the buffer every `bufferTimeSpan` milliseconds. If
* `bufferCreationInterval` is given, this operator opens the buffer every
* `bufferCreationInterval` milliseconds and closes (emits and resets) the
- * buffer every `bufferTimeSpan` milliseconds.
+ * buffer every `bufferTimeSpan` milliseconds. When the optional argument
+ * `maxBufferSize` is specified, the buffer will be closed either after
+ * `bufferTimeSpan` milliseconds or when it contains `maxBufferSize` elements.
*
* @example
Every second, emit an array of the recent click events
* var clicks = Rx.Observable.fromEvent(document, 'click');
@@ -39,35 +43,60 @@ import {async} from '../scheduler/async';
* @param {number} bufferTimeSpan The amount of time to fill each buffer array.
* @param {number} [bufferCreationInterval] The interval at which to start new
* buffers.
+ * @param {number} [maxBufferSize] The maximum buffer size.
* @param {Scheduler} [scheduler=async] The scheduler on which to schedule the
* intervals that determine buffer boundaries.
* @return {Observable} An observable of arrays of buffered values.
* @method bufferTime
* @owner Observable
*/
-export function bufferTime(bufferTimeSpan: number,
- bufferCreationInterval: number = null,
- scheduler: Scheduler = async): Observable {
- return this.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, scheduler));
+export function bufferTime(bufferTimeSpan: number): Observable {
+ let length: number = arguments.length;
+
+ let scheduler: Scheduler = async;
+ if (isScheduler(arguments[arguments.length - 1])) {
+ scheduler = arguments[arguments.length - 1];
+ length--;
+ }
+
+ let bufferCreationInterval: number = null;
+ if (length >= 2) {
+ bufferCreationInterval = arguments[1];
+ }
+
+ let maxBufferSize: number = Number.POSITIVE_INFINITY;
+ if (length >= 3) {
+ maxBufferSize = arguments[2];
+ }
+
+ return this.lift(new BufferTimeOperator(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
}
export interface BufferTimeSignature {
- (bufferTimeSpan: number, bufferCreationInterval?: number, scheduler?: Scheduler): Observable;
+ (bufferTimeSpan: number, scheduler?: Scheduler): Observable;
+ (bufferTimeSpan: number, bufferCreationInterval: number, scheduler?: Scheduler): Observable;
+ (bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: Scheduler): Observable;
}
class BufferTimeOperator implements Operator {
constructor(private bufferTimeSpan: number,
private bufferCreationInterval: number,
+ private maxBufferSize: number,
private scheduler: Scheduler) {
}
call(subscriber: Subscriber, source: any): any {
return source._subscribe(new BufferTimeSubscriber(
- subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.scheduler
+ subscriber, this.bufferTimeSpan, this.bufferCreationInterval, this.maxBufferSize, this.scheduler
));
}
}
+class Context {
+ buffer: T[] = [];
+ closeAction: Subscription;
+}
+
type CreationState = {
bufferTimeSpan: number;
bufferCreationInterval: number,
@@ -81,93 +110,121 @@ type CreationState = {
* @extends {Ignored}
*/
class BufferTimeSubscriber extends Subscriber {
- private buffers: Array = [];
+ private contexts: Array> = [];
+ private timespanOnly: boolean;
constructor(destination: Subscriber,
private bufferTimeSpan: number,
private bufferCreationInterval: number,
+ private maxBufferSize: number,
private scheduler: Scheduler) {
super(destination);
- const buffer = this.openBuffer();
- if (bufferCreationInterval !== null && bufferCreationInterval >= 0) {
- const closeState = { subscriber: this, buffer };
+ const context = this.openContext();
+ this.timespanOnly = bufferCreationInterval == null || bufferCreationInterval < 0;
+ if (this.timespanOnly) {
+ const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
+ this.add(context.closeAction = scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
+ } else {
+ const closeState = { subscriber: this, context };
const creationState: CreationState = { bufferTimeSpan, bufferCreationInterval, subscriber: this, scheduler };
- this.add(scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState));
+ this.add(context.closeAction = scheduler.schedule(dispatchBufferClose, bufferTimeSpan, closeState));
this.add(scheduler.schedule(dispatchBufferCreation, bufferCreationInterval, creationState));
- } else {
- const timeSpanOnlyState = { subscriber: this, buffer, bufferTimeSpan };
- this.add(scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
}
}
protected _next(value: T) {
- const buffers = this.buffers;
- const len = buffers.length;
+ const contexts = this.contexts;
+ const len = contexts.length;
+ let filledBufferContext: Context;
for (let i = 0; i < len; i++) {
- buffers[i].push(value);
+ const context = contexts[i];
+ const buffer = context.buffer;
+ buffer.push(value);
+ if (buffer.length == this.maxBufferSize) {
+ filledBufferContext = context;
+ }
+ }
+
+ if (filledBufferContext) {
+ this.onBufferFull(filledBufferContext);
}
}
protected _error(err: any) {
- this.buffers.length = 0;
+ this.contexts.length = 0;
super._error(err);
}
protected _complete() {
- const { buffers, destination } = this;
- while (buffers.length > 0) {
- destination.next(buffers.shift());
+ const { contexts, destination } = this;
+ while (contexts.length > 0) {
+ const context = contexts.shift();
+ destination.next(context.buffer);
}
super._complete();
}
protected _unsubscribe() {
- this.buffers = null;
+ this.contexts = null;
+ }
+
+ protected onBufferFull(context: Context) {
+ this.closeContext(context);
+ const closeAction = context.closeAction;
+ closeAction.unsubscribe();
+ this.remove(closeAction);
+
+ if (this.timespanOnly) {
+ context = this.openContext();
+ const bufferTimeSpan = this.bufferTimeSpan;
+ const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
+ this.add(context.closeAction = this.scheduler.schedule(dispatchBufferTimeSpanOnly, bufferTimeSpan, timeSpanOnlyState));
+ }
}
- openBuffer(): T[] {
- let buffer: T[] = [];
- this.buffers.push(buffer);
- return buffer;
+ openContext(): Context {
+ let context: Context = new Context();
+ this.contexts.push(context);
+ return context;
}
- closeBuffer(buffer: T[]) {
- this.destination.next(buffer);
- const buffers = this.buffers;
- buffers.splice(buffers.indexOf(buffer), 1);
+ closeContext(context: Context) {
+ this.destination.next(context.buffer);
+ const contexts = this.contexts;
+ contexts.splice(contexts.indexOf(context), 1);
}
}
function dispatchBufferTimeSpanOnly(state: any) {
const subscriber: BufferTimeSubscriber = state.subscriber;
- const prevBuffer = state.buffer;
- if (prevBuffer) {
- subscriber.closeBuffer(prevBuffer);
+ const prevContext = state.context;
+ if (prevContext) {
+ subscriber.closeContext(prevContext);
}
- state.buffer = subscriber.openBuffer();
+ state.context = subscriber.openContext();
if (!subscriber.isUnsubscribed) {
- (this).schedule(state, state.bufferTimeSpan);
+ state.context.closeAction = (this).schedule(state, state.bufferTimeSpan);
}
}
interface DispatchArg {
subscriber: BufferTimeSubscriber;
- buffer: Array;
+ context: Context;
}
function dispatchBufferCreation(state: CreationState) {
const { bufferCreationInterval, bufferTimeSpan, subscriber, scheduler } = state;
- const buffer = subscriber.openBuffer();
+ const context = subscriber.openContext();
const action = >>this;
if (!subscriber.isUnsubscribed) {
- action.add(scheduler.schedule>(dispatchBufferClose, bufferTimeSpan, { subscriber, buffer }));
+ subscriber.add(context.closeAction = scheduler.schedule>(dispatchBufferClose, bufferTimeSpan, { subscriber, context }));
action.schedule(state, bufferCreationInterval);
}
}
function dispatchBufferClose(arg: DispatchArg) {
- const { subscriber, buffer } = arg;
- subscriber.closeBuffer(buffer);
+ const { subscriber, context } = arg;
+ subscriber.closeContext(context);
}