Skip to content

Commit

Permalink
feat(delayWhen): add index to the selector function (#2473)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj authored and benlesh committed Jul 26, 2018
1 parent d8231e2 commit 0979d31
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
22 changes: 21 additions & 1 deletion spec/operators/delayWhen-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { of, EMPTY } from 'rxjs';
import { delayWhen } from 'rxjs/operators';
import { delayWhen, tap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { expect } from 'chai';
Expand Down Expand Up @@ -249,4 +249,24 @@ describe('delayWhen operator', () => {
expect(next).to.be.true;
expect(complete).to.be.true;
});

it('should call predicate with indices starting at 0', () => {
const e1 = hot('--a--b--c--|');
const expected = '--a--b--c--|';
const selector = cold('(x|)');

let indices: number[] = [];
const predicate = (value: string, index: number) => {
indices.push(index);
return selector;
};

const result = e1.pipe(delayWhen(predicate));

expectObservable(result.pipe(
tap(null, null, () => {
expect(indices).to.deep.equal([0, 1, 2]);
})
)).toBe(expected);
});
});
16 changes: 9 additions & 7 deletions src/internal/operators/delayWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types';

/* tslint:disable:max-line-length */
/** @deprecated In future versions, empty notifiers will no longer re-emit the source value on the output observable. */
export function delayWhen<T>(delayDurationSelector: (value: T) => Observable<never>, subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T) => Observable<any>, subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<never>, subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>, subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T>;
/* tslint:disable:max-line-length */

/**
Expand Down Expand Up @@ -51,7 +51,7 @@ export function delayWhen<T>(delayDurationSelector: (value: T) => Observable<any
* @see {@link debounce}
* @see {@link delay}
*
* @param {function(value: T): Observable} delayDurationSelector A function that
* @param {function(value: T, index: number): Observable} delayDurationSelector A function that
* returns an Observable for each value emitted by the source Observable, which
* is then used to delay the emission of that item on the output Observable
* until the Observable returned from this function emits a value.
Expand All @@ -63,7 +63,7 @@ export function delayWhen<T>(delayDurationSelector: (value: T) => Observable<any
* @method delayWhen
* @owner Observable
*/
export function delayWhen<T>(delayDurationSelector: (value: T) => Observable<any>,
export function delayWhen<T>(delayDurationSelector: (value: T, index: number) => Observable<any>,
subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T> {
if (subscriptionDelay) {
return (source: Observable<T>) =>
Expand All @@ -74,7 +74,7 @@ export function delayWhen<T>(delayDurationSelector: (value: T) => Observable<any
}

class DelayWhenOperator<T> implements Operator<T, T> {
constructor(private delayDurationSelector: (value: T) => Observable<any>) {
constructor(private delayDurationSelector: (value: T, index: number) => Observable<any>) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
Expand All @@ -90,9 +90,10 @@ class DelayWhenOperator<T> implements Operator<T, T> {
class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
private completed: boolean = false;
private delayNotifierSubscriptions: Array<Subscription> = [];
private index: number = 0;

constructor(destination: Subscriber<T>,
private delayDurationSelector: (value: T) => Observable<any>) {
private delayDurationSelector: (value: T, index: number) => Observable<any>) {
super(destination);
}

Expand All @@ -117,8 +118,9 @@ class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
}

protected _next(value: T): void {
const index = this.index++;
try {
const delayNotifier = this.delayDurationSelector(value);
const delayNotifier = this.delayDurationSelector(value, index);
if (delayNotifier) {
this.tryDelay(delayNotifier, value);
}
Expand Down

0 comments on commit 0979d31

Please sign in to comment.