Skip to content

Commit

Permalink
fix(delay): Now properly handles Date and negative numbers
Browse files Browse the repository at this point in the history
- Resolves an issue where passing a Date would cause every value to be scheduled incorrectly, instead of delaying all values until the absolute date -- as documented.
- Resolves an issue where negative numbers were treated as though they were positive numbers
- Reduces operator sizes
- Adds comments and formatting
- Updates tests to reflect better behavior
- Adds test for Dates in the past.
  • Loading branch information
benlesh committed Sep 11, 2020
1 parent 6416935 commit 7f2b1e4
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 110 deletions.
75 changes: 52 additions & 23 deletions spec/operators/delay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { expect } from 'chai';
import { observableMatcher } from '../helpers/observableMatcher';

/** @test {delay} */
describe('delay operator', () => {
describe('delay', () => {
let testScheduler: TestScheduler;

beforeEach(() => {
Expand All @@ -27,12 +27,26 @@ describe('delay operator', () => {
});
});

it('should delay by absolute time period', () => {
it('should not delay at all if the delay number is negative', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --a--b--| ');
const t = 3; // ---|
const expected = '-----a--(b|)';
const subs = ' ^-------! ';
const e1 = hot('---a--b--|');
const t = -1; // --|
const expected = '---a--b--|';
const subs = '^--------!';

const result = e1.pipe(delay(t, testScheduler));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should delay by absolute time period', () => {
testScheduler.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --a--a---a----a----a------------b---b---b---b--|');
const t = time(' --------------------|');
const expected = '--------------------(aaaaa)-----b---b---b---b--|';
const subs = ' ^----------------------------------------------!';

const absoluteDelay = new Date(testScheduler.now() + t);
const result = e1.pipe(delay(absoluteDelay, testScheduler));
Expand All @@ -42,12 +56,27 @@ describe('delay operator', () => {
});
});

it('should delay by absolute time period after subscription', () => {
it('should not delay at all if the absolute time is in the past', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' ---^--a--b--| ');
const t = 3; // ---|
const expected = ' ------a--(b|)';
const subs = ' ^--------! ';
const e1 = hot(' --a--a---a----a----a------------b---b---b---b--|');
const t = -10000;
const expected = '--a--a---a----a----a------------b---b---b---b--|';
const subs = ' ^----------------------------------------------!';

const absoluteDelay = new Date(testScheduler.now() + t);
const result = e1.pipe(delay(absoluteDelay, testScheduler));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});
});

it('should delay by absolute time period after source ends', () => {
testScheduler.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' ---^--a-----a---a-----a---|');
const t = time(' ------------------------------|');
const expected = ' ------------------------------(aaaa|)';
const subs = ' ^----------------------! ';

const absoluteDelay = new Date(testScheduler.now() + t);
const result = e1.pipe(delay(absoluteDelay, testScheduler));
Expand All @@ -71,12 +100,12 @@ describe('delay operator', () => {
});
});

it('should raise error when source raises error', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --a--b--#');
const t = 3; // ---|
const expected = '-----a--#';
const subs = ' ^-------!';
it('should raise error when source raises error before absolute delay fires', () => {
testScheduler.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --a--a---a-----#');
const t = time(' --------------------|')
const expected = '---------------#';
const subs = ' ^--------------!';

const absoluteDelay = new Date(testScheduler.now() + t);
const result = e1.pipe(delay(absoluteDelay, testScheduler));
Expand All @@ -86,12 +115,12 @@ describe('delay operator', () => {
});
});

it('should raise error when source raises error after subscription', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' ---^---a---b---#');
const t = 3; // ---|
const expected = ' -------a---b#';
const e1Sub = ' ^-----------!';
it('should raise error when source raises error after absolute delay fires', () => {
testScheduler.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' ---^---a--a---a---a--------b---b---b--#');
const t = time(' -----------------|');
const expected = ' -----------------(aaaa)-b---b---b--#';
const e1Sub = ' ^----------------------------------!';

const absoluteDelay = new Date(testScheduler.now() + t);
const result = e1.pipe(delay(absoluteDelay, testScheduler));
Expand Down
179 changes: 92 additions & 87 deletions src/internal/operators/delay.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { async } from '../scheduler/async';
/** @prettier */
import { asyncScheduler } from '../scheduler/async';
import { isValidDate } from '../util/isDate';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import {
MonoTypeOperatorFunction,
SchedulerAction,
SchedulerLike,
TeardownLogic
} from '../types';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { lift } from '../util/lift';
import { Subscription } from '../Subscription';

/**
* Delays the emission of items from the source Observable by a given timeout or
Expand Down Expand Up @@ -59,96 +55,105 @@ import { lift } from '../util/lift';
* @return {Observable} An Observable that delays the emissions of the source
* Observable by the specified timeout or Date.
*/
export function delay<T>(delay: number | Date, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T> {
const delayFor = isValidDate(delay) ? +delay - scheduler.now() : Math.abs(delay);
return (source: Observable<T>) => lift(source, new DelayOperator(delayFor, scheduler));
}
export function delay<T>(delay: number | Date, scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => {
return lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;

class DelayOperator<T> implements Operator<T, T> {
constructor(private delay: number, private scheduler: SchedulerLike) {}
// This is a bit of code golf, but we're getting whether or not to use the absolute time
// scenario, as well as the actual delay value to schedule with in one line here.
let isAbsoluteTime: boolean;
const delayFor = (isAbsoluteTime = isValidDate(delay)) ? +delay - scheduler.now() : delay;

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new DelaySubscriber(subscriber, this.delay, this.scheduler));
}
}
if (delayFor < 0) {
// If the delay is negative, or the absolute time is in the past, this
// is just a passthrough, and we don't need to do any more work than just connect it.
// Note that if the delay is actually 0, we want to do all of the work below, because
// the user might be trying to leverage the "0 behavior" of a given scheduler.
return source.subscribe(subscriber);
}

interface DelayState<T> {
source: DelaySubscriber<T>;
destination: Subscriber<T>;
scheduler: SchedulerLike;
}
// A flag for whether the source has completed. Used to
// check whether or not to complete when a scheduled delay fires
// after the source completed.
let isComplete = false;
// The next handler for the given scenario (absolute time or relative time)
let _next: (value: T) => void;
// Called when the source completes to see whether or not to complete right
// then and there. This is just a little different for each scenario.
let checkComplete: () => Boolean;
// This is captured so we can try to unsub from the source as soon as possible.
let delaySubscriber: DelaySubscriber<T>;

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class DelaySubscriber<T> extends Subscriber<T> {
private queue: Array<DelayMessage<T>> = [];
private active: boolean = false;
if (isAbsoluteTime) {
// ABSOLUTE TIME SCENARIO
// In this scenario, we buffer all of the values until the absolute time
// expires. Then emit the buffered values. After that we just allow the
// values through normally.
let buffer: T[] | null = [];

private static dispatch<T>(this: SchedulerAction<DelayState<T>>, state: DelayState<T>): void {
const source = state.source;
const queue = source.queue;
const scheduler = state.scheduler;
const destination = state.destination;
// Schedule to emit the buffered values at the absolute time.
subscriber.add(
scheduler.schedule(() => {
// Emit the buffered values and null out the buffer
// buffer truthiness is checked later to see if we're still buffering
for (const value of buffer!) {
subscriber.next(value);
}
buffer = null;
if (isComplete) {
subscriber.complete();
}
}, delayFor)
);

while (queue.length > 0 && queue[0].time - scheduler.now() <= 0) {
destination.next(queue.shift()!.value);
}
_next = (value) => {
if (buffer) {
// The absolute time hasn't arrived yet. Buffer the value.
buffer.push(value);
} else {
// There's no buffer, because the absolute time has passed.
subscriber.next(value);
}
};

if (queue.length > 0) {
const delay = Math.max(0, queue[0].time - scheduler.now());
this.schedule(state, delay);
} else if (source.isStopped) {
source.destination.complete();
source.active = false;
} else {
this.unsubscribe();
source.active = false;
}
}
checkComplete = () => !buffer?.length;
} else {
// Relative time
let innerSubs: Subscription | null = null;

constructor(protected destination: Subscriber<T>, private delay: number, private scheduler: SchedulerLike) {
super(destination);
}
_next = (value) => {
subscriber.add(
(innerSubs = scheduler.schedule(() => {
innerSubs = null;
subscriber.next(value);
if (isComplete) {
subscriber.complete();
}
}, delayFor))
);
};

private _schedule(scheduler: SchedulerLike): void {
this.active = true;
const { destination } = this;
// TODO: The cast below seems like an issue with typings for SchedulerLike to me.
destination.add(
scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch as any, this.delay, {
source: this,
destination,
scheduler,
} as DelayState<T>)
);
}
checkComplete = () => !innerSubs;
}

protected _next(value: T) {
const scheduler = this.scheduler;
const message = new DelayMessage(scheduler.now() + this.delay, value);
this.queue.push(message);
if (this.active === false) {
this._schedule(scheduler);
}
}
delaySubscriber = new DelaySubscriber(subscriber, _next, () => {
isComplete = true;
if (checkComplete()) {
subscriber.complete();
}
delaySubscriber?.unsubscribe();
});

protected _error(err: any) {
this.queue.length = 0;
this.destination.error(err);
this.unsubscribe();
}

protected _complete() {
if (this.queue.length === 0) {
this.destination.complete();
}
this.unsubscribe();
}
return source.subscribe(delaySubscriber);
});
};
}

class DelayMessage<T> {
constructor(public readonly time: number, public readonly value: T) {}
// TODO: combine with other implementations

class DelaySubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<T>, protected _next: (value: T) => void, protected _complete: () => void) {
super(destination);
}
}

0 comments on commit 7f2b1e4

Please sign in to comment.