Skip to content

Commit

Permalink
fix(delayWhen): no longer emits if duration selector is empty (#5769)
Browse files Browse the repository at this point in the history
- Resolves an issue where a duration selector that never emitted a value would cause the value to be emitted, even though there wasn't a "notification".
- `delayWhen` is now based on `mergeMap`. I started off with trying to use `mergeInternals`, but quickly realized it was doing the same thing as `mergeMap`, and I could just use that directly.

I'm not sure if this size optimization is "taking it too far", but I feel that `delayWhen` isn't exactly "hot path" code, so this optimization is fine, given the bundle savings it should yield.

BREAKING CHANGE: `delayWhen` will no longer emit if the duration selector simply completes without a value. Notifiers must notify with a value, not a completion.

fixes #3665
  • Loading branch information
benlesh authored Oct 20, 2020
1 parent 1cc7a5d commit 0872341
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 86 deletions.
25 changes: 7 additions & 18 deletions spec/operators/delayWhen-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ describe('delayWhen operator', () => {
expectSubscriptions(selector.subscriptions).toBe(selectorSubs);
});

it('should delay by selector completes if selector does not emits', () => {
it('should delay, but not emit if the selector never emits a notification', () => {
const e1 = hot('--a--b--|');
const expected = '------a--(b|)';
const expected = '-----------|';
const subs = '^ !';
const selector = cold( '----|');
const selectorSubs = [' ^ !',
' ^ !'];
const selector = cold( '------|');
const selectorSubs = [' ^ !',
' ^ !'];

const result = e1.pipe(delayWhen((x: any) => selector));

Expand All @@ -106,9 +106,9 @@ describe('delayWhen operator', () => {
expectSubscriptions(selector.subscriptions).toBe(selectorSubs);
});

it('should emit if the selector completes synchronously', () => {
it('should not emit for async source and sync empty selector', () => {
const e1 = hot('a--|');
const expected = 'a--|';
const expected = '---|';
const subs = '^ !';

const result = e1.pipe(delayWhen((x: any) => EMPTY));
Expand All @@ -117,17 +117,6 @@ describe('delayWhen operator', () => {
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should emit if the source completes synchronously and the selector completes synchronously', () => {
const e1 = hot('(a|)');
const expected = '(a|)';
const subs = '(^!)';

const result = e1.pipe(delayWhen((x: any) => EMPTY));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should not emit if selector never emits', () => {
const e1 = hot('--a--b--|');
const expected = '-';
Expand Down
71 changes: 3 additions & 68 deletions src/internal/operators/delayWhen.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
/** @prettier */
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction } from '../types';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { concat } from '../observable/concat';
import { take } from './take';
import { ignoreElements } from './ignoreElements';
import { mapTo } from './mapTo';
import { mergeMap } from './mergeMap';

/* tslint:disable:max-line-length */
/** @deprecated In future versions, empty notifiers will no longer re-emit the source value on the output observable. */
export function delayWhen<T>(
delayDurationSelector: (value: T, index: number) => Observable<never>,
Expand All @@ -21,7 +20,6 @@ export function delayWhen<T>(
export function delayWhen<T>(
delayDurationSelector: (value: T, index: number) => Observable<any>
): MonoTypeOperatorFunction<T>;
/* tslint:disable:max-line-length */

/**
* Delays the emission of items from the source Observable by a given time span
Expand Down Expand Up @@ -91,68 +89,5 @@ export function delayWhen<T>(
concat(subscriptionDelay.pipe(take(1), ignoreElements()), source.pipe(delayWhen(delayDurationSelector)));
}

return operate((source, subscriber) => {
// An index to give to the projection function.
let index = 0;
// Whether or not the source has completed.
let isComplete = false;
// Tracks the number of actively delayed values we have.
let active = 0;

/**
* Checks to see if we can complete the result and completes it, if so.
*/
const checkComplete = () => isComplete && !active && subscriber.complete();

source.subscribe(
new OperatorSubscriber(
subscriber,
(value: T) => {
// Closed bit to guard reentrancy and
// synchronous next/complete (which both make the same calls right now)
let closed = false;

/**
* Notifies the consumer of the value.
*/
const notify = () => {
// Notify the consumer.
subscriber.next(value);

// Ensure our inner subscription is cleaned up
// as soon as possible. Once the first `next` fires,
// we have no more use for this subscription.
durationSubscriber?.unsubscribe();

if (!closed) {
active--;
closed = true;
checkComplete();
}
};

// We have to capture our duration subscriber so we can unsubscribe from
// it on the first next notification it gives us.
const durationSubscriber = new OperatorSubscriber(
subscriber,
notify,
// Errors are sent to consumer.
undefined,
// TODO(benlesh): I'm inclined to say this is _incorrect_ behavior.
// A completion should not be a notification. Note the deprecation above
notify
);

active++;
delayDurationSelector(value, index++).subscribe(durationSubscriber);
},
// Errors are passed through to consumer.
undefined,
() => {
isComplete = true;
checkComplete();
}
)
);
});
return mergeMap((value, index) => delayDurationSelector(value, index).pipe(take(1), mapTo(value)));
}

0 comments on commit 0872341

Please sign in to comment.