Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(operator): add timeout and timeoutWith #245

Merged
merged 1 commit into from
Sep 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions spec/operators/timeout-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.timeout', function () {
it('should timeout after a specified delay', function (done) {
Observable.never().timeout(100)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
expect(err.message).toBe('timeout');
done();
}, function () {
throw 'should not complete';
});
}, 2000);

it('should timeout after a delay and send the passed error', function (done) {
Observable.never().timeout(100, 'hello')
.subscribe(function () {
throw 'should not next';
}, function (err) {
expect(err).toBe('hello');
done();
}, function () {
throw 'should not complete';
})
});


it('should timeout at a specified Date', function (done) {
var date = new Date(Date.now() + 100);

Observable.never().timeout(date)
.subscribe(function (x) {
throw 'should not next';
}, function (err) {
expect(err.message).toBe('timeout');
done();
}, function () {
throw 'should not complete';
});
}, 2000);
});
23 changes: 23 additions & 0 deletions spec/operators/timeoutWith-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.timeoutWith', function () {
it('should timeout after a specified delay then subscribe to the passed observable', function (done) {
var expected = [1, 2, 3];
Observable.never().timeoutWith(100, Observable.of(1,2,3))
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
}, 2000);


it('should timeout at a specified date then subscribe to the passed observable', function (done) {
var expected = [1, 2, 3];
var date = new Date(Date.now() + 100);
Observable.never().timeoutWith(date, Observable.of(1,2,3))
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
}, 2000);
});
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,6 @@ export default class Observable<T> {
bufferCount: <T>(bufferSize: number, startBufferEvery: number) => Observable<T[]>;

finally: (ensure: () => void, thisArg?: any) => Observable<T>;
timeout: <T>(due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith: <T>(due: number|Date, withObservable: Observable<any>, scheduler?: Scheduler) => Observable<T>;
}
4 changes: 4 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,12 @@ observableProto.retryWhen = retryWhen;
observableProto.repeat = repeat;

import _finally from './operators/finally';
import timeout from './operators/timeout';
import timeoutWith from './operators/timeoutWith';

observableProto.finally = _finally;
observableProto.timeout = timeout;
observableProto.timeoutWith = timeoutWith;

import groupBy from './operators/groupBy';
import window from './operators/window';
Expand Down
39 changes: 39 additions & 0 deletions src/operators/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Scheduler from '../Scheduler';
import Subscription from '../Subscription';
import isDate from '../util/isDate';

export default function timeout(due: number|Date, errorToSend: any = null, scheduler: Scheduler = Scheduler.immediate) {
let waitFor = isDate(due) ? (+due - Date.now()) : <number>due;
return this.lift(new TimeoutOperator(waitFor, errorToSend, scheduler));
}

class TimeoutOperator<T, R> implements Operator<T, R> {
constructor(private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
}

call(observer: Observer<R>) {
return new TimeoutSubscriber(observer, this.waitFor, this.errorToSend, this.scheduler);
}
}

class TimeoutSubscriber<T> extends Subscriber<T> {
timeoutSubscription: Subscription<any>;

constructor(destination: Observer<T>, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
super(destination);
let delay = waitFor;
scheduler.schedule(delay, { subscriber: this }, dispatchTimeout);
}

sendTimeoutError() {
this.error(this.errorToSend || new Error('timeout'));
}
}

function dispatchTimeout<T>(state: { subscriber: TimeoutSubscriber<T> }) {
const subscriber = state.subscriber;
subscriber.sendTimeoutError();
}
41 changes: 41 additions & 0 deletions src/operators/timeoutWith.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Scheduler from '../Scheduler';
import Subscription from '../Subscription';
import Observable from '../Observable';
import isDate from '../util/isDate';

export default function timeoutWith(due: number|Date, withObservable: Observable<any>, scheduler: Scheduler = Scheduler.immediate) {
let waitFor = isDate(due) ? (+due - Date.now()) : <number>due;
return this.lift(new TimeoutWithOperator(waitFor, withObservable, scheduler));
}

class TimeoutWithOperator<T, R> implements Operator<T, R> {
constructor(private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) {
}

call(observer: Observer<R>) {
return new TimeoutWithSubscriber(observer, this.waitFor, this.withObservable, this.scheduler);
}
}

class TimeoutWithSubscriber<T> extends Subscriber<T> {
timeoutSubscription: Subscription<any>;

constructor(destination: Observer<T>, private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) {
super(destination);
let delay = waitFor;
scheduler.schedule(delay, { subscriber: this }, dispatchTimeout);
}

handleTimeout() {
const withObservable = this.withObservable;
this.add(withObservable.subscribe(this));
}
}

function dispatchTimeout<T>(state: { subscriber: TimeoutWithSubscriber<T> }) {
const subscriber = state.subscriber;
subscriber.handleTimeout();
}
3 changes: 3 additions & 0 deletions src/util/isDate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export default function isDate(value) {
return value instanceof Date && !isNaN(+value);
}