Skip to content

Commit

Permalink
Merge branch 'master' into fix_tutorial_basics
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh authored Oct 4, 2016
2 parents e58a4ab + 1ba0358 commit 197e2ac
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 7 deletions.
19 changes: 19 additions & 0 deletions spec/operators/bufferTime-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,23 @@ describe('Observable.prototype.bufferTime', () => {
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should not have errors when take follows and maxBufferSize is provided', () => {
const tick = 10;
const bufferTime = 50;
const expected = '-----a----b----c----d----(e|)';
const values = {
a: [0, 1, 2, 3],
b: [4, 5, 6, 7, 8],
c: [9, 10, 11, 12, 13],
d: [14, 15, 16, 17, 18],
e: [19, 20, 21, 22, 23]
};

const source = Rx.Observable.interval(tick, rxTestScheduler)
.bufferTime(bufferTime, null, 10, rxTestScheduler)
.take(5);

expectObservable(source).toBe(expected, values);
});
});
2 changes: 1 addition & 1 deletion spec/operators/timeout-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const Observable = Rx.Observable;

/** @test {timeout} */
describe('Observable.prototype.timeout', () => {
const defaultTimeoutError = new Error('timeout');
const defaultTimeoutError = new Rx.TimeoutError();

asDiagram('timeout(50)')('should timeout after a specified timeout period', () => {
const e1 = cold('-------a--b--|');
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ export {Notification} from './Notification';
export {EmptyError} from './util/EmptyError';
export {ArgumentOutOfRangeError} from './util/ArgumentOutOfRangeError';
export {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';
export {TimeoutError} from './util/TimeoutError';
export {UnsubscriptionError} from './util/UnsubscriptionError';
export {TimeInterval} from './operator/timeInterval';
export {Timestamp} from './operator/timestamp';
Expand Down
2 changes: 1 addition & 1 deletion src/operator/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class BufferTimeSubscriber<T> extends Subscriber<T> {
closeAction.unsubscribe();
this.remove(closeAction);

if (this.timespanOnly) {
if (!this.closed && this.timespanOnly) {
context = this.openContext();
const bufferTimeSpan = this.bufferTimeSpan;
const timeSpanOnlyState = { subscriber: this, context, bufferTimeSpan };
Expand Down
11 changes: 6 additions & 5 deletions src/operator/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Subscriber } from '../Subscriber';
import { Scheduler } from '../Scheduler';
import { Observable } from '../Observable';
import { TeardownLogic } from '../Subscription';
import { TimeoutError } from '../util/TimeoutError';

/**
* @param due
Expand Down Expand Up @@ -80,25 +81,25 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
this._previousIndex = currentIndex;
}

protected _next(value: T) {
protected _next(value: T): void {
this.destination.next(value);

if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
}

protected _error(err: any) {
protected _error(err: any): void {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete() {
protected _complete(): void {
this.destination.complete();
this._hasCompleted = true;
}

notifyTimeout() {
this.error(this.errorToSend || new Error('timeout'));
notifyTimeout(): void {
this.error(this.errorToSend || new TimeoutError());
}
}
15 changes: 15 additions & 0 deletions src/util/TimeoutError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* An error thrown when duetime elapses.
*
* @see {@link timeout}
*
* @class TimeoutError
*/
export class TimeoutError extends Error {
constructor() {
const err: any = super('Timeout has occurred');
(<any> this).name = err.name = 'TimeoutError';
(<any> this).stack = err.stack;
(<any> this).message = err.message;
}
}

0 comments on commit 197e2ac

Please sign in to comment.