Skip to content

Commit

Permalink
Merge branch 'master' into cache-adapter-operator
Browse files Browse the repository at this point in the history
  • Loading branch information
tklever authored Sep 27, 2016
2 parents 397b38a + e20cdb7 commit 8393bb9
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 14 deletions.
84 changes: 84 additions & 0 deletions spec/observables/interval-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import * as Rx from '../../dist/cjs/Rx';
declare const {hot, asDiagram, expectObservable, expectSubscriptions};
declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;
const asap = Rx.Scheduler.asap;
const queue = Rx.Scheduler.queue;
const animationFrame = Rx.Scheduler.animationFrame;

/** @test {interval} */
describe('Observable.interval', () => {
Expand Down Expand Up @@ -66,4 +69,85 @@ describe('Observable.interval', () => {
done(new Error('should not be called'));
});
});

it('should create an observable emitting periodically with the AsapScheduler', (done: MochaDone) => {
const sandbox = sinon.sandbox.create();
const fakeTimer = sandbox.useFakeTimers();
const interval = 10;
const events = [0, 1, 2, 3, 4, 5];
const source = Observable.interval(interval, asap).take(6);
source.subscribe({
next(x) {
expect(x).to.equal(events.shift());
},
error(e) {
sandbox.restore();
done(e);
},
complete() {
expect(asap.actions.length).to.equal(0);
expect(asap.scheduled).to.equal(undefined);
sandbox.restore();
done();
}
});
let i = -1, n = events.length;
while (++i < n) {
fakeTimer.tick(interval);
}
});

it('should create an observable emitting periodically with the QueueScheduler', (done: MochaDone) => {
const sandbox = sinon.sandbox.create();
const fakeTimer = sandbox.useFakeTimers();
const interval = 10;
const events = [0, 1, 2, 3, 4, 5];
const source = Observable.interval(interval, queue).take(6);
source.subscribe({
next(x) {
expect(x).to.equal(events.shift());
},
error(e) {
sandbox.restore();
done(e);
},
complete() {
expect(queue.actions.length).to.equal(0);
expect(queue.scheduled).to.equal(undefined);
sandbox.restore();
done();
}
});
let i = -1, n = events.length;
while (++i < n) {
fakeTimer.tick(interval);
}
});

it('should create an observable emitting periodically with the AnimationFrameScheduler', (done: MochaDone) => {
const sandbox = sinon.sandbox.create();
const fakeTimer = sandbox.useFakeTimers();
const interval = 10;
const events = [0, 1, 2, 3, 4, 5];
const source = Observable.interval(interval, animationFrame).take(6);
source.subscribe({
next(x) {
expect(x).to.equal(events.shift());
},
error(e) {
sandbox.restore();
done(e);
},
complete() {
expect(animationFrame.actions.length).to.equal(0);
expect(animationFrame.scheduled).to.equal(undefined);
sandbox.restore();
done();
}
});
let i = -1, n = events.length;
while (++i < n) {
fakeTimer.tick(interval);
}
});
});
16 changes: 16 additions & 0 deletions spec/operators/take-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Subject = Rx.Subject;
const Observable = Rx.Observable;

/** @test {take} */
Expand Down Expand Up @@ -135,4 +136,19 @@ describe('Observable.prototype.take', () => {

source.subscribe();
});

it('should complete when the source is reentrant', () => {
let completed = false;
const source = new Subject();
source.take(5).subscribe({
next() {
source.next();
},
complete() {
completed = true;
}
});
source.next();
expect(completed).to.be.true;
});
});
16 changes: 16 additions & 0 deletions spec/schedulers/AnimationFrameScheduler-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {expect} from 'chai';
import * as sinon from 'sinon';
import * as Rx from '../../dist/cjs/Rx';

const animationFrame = Rx.Scheduler.animationFrame;
Expand All @@ -9,6 +10,21 @@ describe('Scheduler.animationFrame', () => {
expect(animationFrame).exist;
});

it('should act like the async scheduler if delay > 0', () => {
let actionHappened = false;
const sandbox = sinon.sandbox.create();
const fakeTimer = sandbox.useFakeTimers();
animationFrame.schedule(() => {
actionHappened = true;
}, 50);
expect(actionHappened).to.be.false;
fakeTimer.tick(25);
expect(actionHappened).to.be.false;
fakeTimer.tick(25);
expect(actionHappened).to.be.true;
sandbox.restore();
});

it('should schedule an action to happen later', (done: MochaDone) => {
let actionHappened = false;
animationFrame.schedule(() => {
Expand Down
16 changes: 16 additions & 0 deletions spec/schedulers/AsapScheduler-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {expect} from 'chai';
import * as sinon from 'sinon';
import * as Rx from '../../dist/cjs/Rx';

const asap = Rx.Scheduler.asap;
Expand All @@ -9,6 +10,21 @@ describe('Scheduler.asap', () => {
expect(asap).exist;
});

it('should act like the async scheduler if delay > 0', () => {
let actionHappened = false;
const sandbox = sinon.sandbox.create();
const fakeTimer = sandbox.useFakeTimers();
asap.schedule(() => {
actionHappened = true;
}, 50);
expect(actionHappened).to.be.false;
fakeTimer.tick(25);
expect(actionHappened).to.be.false;
fakeTimer.tick(25);
expect(actionHappened).to.be.true;
sandbox.restore();
});

it('should schedule an action to happen later', (done: MochaDone) => {
let actionHappened = false;
asap.schedule(() => {
Expand Down
15 changes: 15 additions & 0 deletions spec/schedulers/QueueScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ const queue = Scheduler.queue;

/** @test {Scheduler} */
describe('Scheduler.queue', () => {
it('should act like the async scheduler if delay > 0', () => {
let actionHappened = false;
const sandbox = sinon.sandbox.create();
const fakeTimer = sandbox.useFakeTimers();
queue.schedule(() => {
actionHappened = true;
}, 50);
expect(actionHappened).to.be.false;
fakeTimer.tick(25);
expect(actionHappened).to.be.false;
fakeTimer.tick(25);
expect(actionHappened).to.be.true;
sandbox.restore();
});

it('should switch from synchronous to asynchronous at will', () => {
const sandbox = sinon.sandbox.create();
const fakeTimer = sandbox.useFakeTimers();
Expand Down
3 changes: 1 addition & 2 deletions src/operator/merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ export function mergeStatic<T, R>(...observables: (ObservableInput<any> | Schedu
* @see {@link mergeMapTo}
* @see {@link mergeScan}
*
* @param {Observable} input1 An input Observable to merge with others.
* @param {Observable} input2 An input Observable to merge with others.
* @param {...Observable} observables Input Observables to merge together.
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input
* Observables being subscribed to concurrently.
* @param {Scheduler} [scheduler=null] The Scheduler to use for managing
Expand Down
5 changes: 3 additions & 2 deletions src/operator/take.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ class TakeSubscriber<T> extends Subscriber<T> {

protected _next(value: T): void {
const total = this.total;
if (++this.count <= total) {
const count = ++this.count;
if (count <= total) {
this.destination.next(value);
if (this.count === total) {
if (count === total) {
this.destination.complete();
this.unsubscribe();
}
Expand Down
6 changes: 4 additions & 2 deletions src/scheduler/AnimationFrameAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ export class AnimationFrameAction<T> extends AsyncAction<T> {
));
}
protected recycleAsyncId(scheduler: AnimationFrameScheduler, id?: any, delay: number = 0): any {
// If delay exists and is greater than 0, recycle as an async action.
if (delay !== null && delay > 0) {
// If delay exists and is greater than 0, or if the delay is null (the
// action wasn't rescheduled) but was originally scheduled as an async
// action, then recycle as an async action.
if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) {
return super.recycleAsyncId(scheduler, id, delay);
}
// If the scheduler queue is empty, cancel the requested animation frame and
Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/AnimationFrameScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AsyncAction } from './AsyncAction';
import { AsyncScheduler } from './AsyncScheduler';

export class AnimationFrameScheduler extends AsyncScheduler {
public flush(): void {
public flush(action?: AsyncAction<any>): void {

this.active = true;
this.scheduled = undefined;
Expand All @@ -11,7 +11,7 @@ export class AnimationFrameScheduler extends AsyncScheduler {
let error: any;
let index: number = -1;
let count: number = actions.length;
let action: AsyncAction<any> = actions.shift();
action = action || actions.shift();

do {
if (error = action.execute(action.state, action.delay)) {
Expand Down
6 changes: 4 additions & 2 deletions src/scheduler/AsapAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ export class AsapAction<T> extends AsyncAction<T> {
));
}
protected recycleAsyncId(scheduler: AsapScheduler, id?: any, delay: number = 0): any {
// If delay exists and is greater than 0, recycle as an async action.
if (delay !== null && delay > 0) {
// If delay exists and is greater than 0, or if the delay is null (the
// action wasn't rescheduled) but was originally scheduled as an async
// action, then recycle as an async action.
if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) {
return super.recycleAsyncId(scheduler, id, delay);
}
// If the scheduler queue is empty, cancel the requested microtask and
Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/AsapScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AsyncAction } from './AsyncAction';
import { AsyncScheduler } from './AsyncScheduler';

export class AsapScheduler extends AsyncScheduler {
public flush(): void {
public flush(action?: AsyncAction<any>): void {

this.active = true;
this.scheduled = undefined;
Expand All @@ -11,7 +11,7 @@ export class AsapScheduler extends AsyncScheduler {
let error: any;
let index: number = -1;
let count: number = actions.length;
let action: AsyncAction<any> = actions.shift();
action = action || actions.shift();

do {
if (error = action.execute(action.state, action.delay)) {
Expand Down
6 changes: 4 additions & 2 deletions src/scheduler/QueueAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ export class QueueAction<T> extends AsyncAction<T> {
}

protected requestAsyncId(scheduler: QueueScheduler, id?: any, delay: number = 0): any {
// If delay is greater than 0, enqueue as an async action.
if (delay !== null && delay > 0) {
// If delay exists and is greater than 0, or if the delay is null (the
// action wasn't rescheduled) but was originally scheduled as an async
// action, then recycle as an async action.
if ((delay !== null && delay > 0) || (delay === null && this.delay > 0)) {
return super.requestAsyncId(scheduler, id, delay);
}
// Otherwise flush the scheduler starting with this action.
Expand Down

0 comments on commit 8393bb9

Please sign in to comment.