Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tap): Adds subscribe, unsubscribe, finalize handlers #6527

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ export declare function takeWhile<T, S extends T>(predicate: (value: T, index: n
export declare function takeWhile<T, S extends T>(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction<T, S>;
export declare function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction<T>;

export declare function tap<T>(observer?: Partial<Observer<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(observer?: Partial<TapObserver<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next: (value: T) => void): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction<T>;

Expand Down
2 changes: 1 addition & 1 deletion api_guard/dist/types/operators/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ export declare function takeWhile<T, S extends T>(predicate: (value: T, index: n
export declare function takeWhile<T, S extends T>(predicate: (value: T, index: number) => value is S, inclusive: false): OperatorFunction<T, S>;
export declare function takeWhile<T>(predicate: (value: T, index: number) => boolean, inclusive?: boolean): MonoTypeOperatorFunction<T>;

export declare function tap<T>(observer?: Partial<Observer<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(observer?: Partial<TapObserver<T>>): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next: (value: T) => void): MonoTypeOperatorFunction<T>;
export declare function tap<T>(next?: ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null): MonoTypeOperatorFunction<T>;

Expand Down
86 changes: 85 additions & 1 deletion spec/operators/tap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/** @prettier */
import { expect } from 'chai';
import { tap, mergeMap, take } from 'rxjs/operators';
import { Subject, of, throwError, Observer, EMPTY, Observable } from 'rxjs';
import { Subject, of, throwError, Observer, EMPTY, Observable, noop } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

Expand Down Expand Up @@ -310,4 +310,88 @@ describe('tap', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

describe('lifecycle handlers', () => {
it('should support an unsubscribe event that fires before finalize', () => {
const results: any[] = [];
const subject = new Subject<number>();

const subscription = subject
.pipe(
tap({
subscribe: () => results.push('subscribe'),
next: (value) => results.push(`next ${value}`),
error: (err) => results.push(`error: ${err.message}`),
complete: () => results.push('complete'),
unsubscribe: () => results.push('unsubscribe'),
finalize: () => results.push('finalize'),
})
)
.subscribe();

subject.next(1);
subject.next(2);
expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']);

subscription.unsubscribe();

expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'unsubscribe', 'finalize']);
});

it('should not call unsubscribe if source completes', () => {
const results: any[] = [];
const subject = new Subject<number>();

const subscription = subject
.pipe(
tap({
subscribe: () => results.push('subscribe'),
next: (value) => results.push(`next ${value}`),
error: (err) => results.push(`error: ${err.message}`),
complete: () => results.push('complete'),
unsubscribe: () => results.push('unsubscribe'),
finalize: () => results.push('finalize'),
})
)
.subscribe();

subject.next(1);
subject.next(2);
expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']);
subject.complete();
// should have no effect
subscription.unsubscribe();

expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'complete', 'finalize']);
});

it('should not call unsubscribe if source errors', () => {
const results: any[] = [];
const subject = new Subject<number>();

const subscription = subject
.pipe(
tap({
subscribe: () => results.push('subscribe'),
next: (value) => results.push(`next ${value}`),
error: (err) => results.push(`error: ${err.message}`),
complete: () => results.push('complete'),
unsubscribe: () => results.push('unsubscribe'),
finalize: () => results.push('finalize'),
})
)
.subscribe({
error: noop,
});

subject.next(1);
subject.next(2);
expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2']);
subject.error(new Error('bad'));
// should have no effect
subscription.unsubscribe();

expect(results).to.deep.equal(['subscribe', 'next 1', 'next 2', 'error: bad', 'finalize']);
});
});
});
26 changes: 22 additions & 4 deletions src/internal/operators/tap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { identity } from '../util/identity';

export function tap<T>(observer?: Partial<Observer<T>>): MonoTypeOperatorFunction<T>;
export interface TapObserver<T> extends Observer<T> {
subscribe: () => void;
unsubscribe: () => void;
finalize: () => void;
}

export function tap<T>(observer?: Partial<TapObserver<T>>): MonoTypeOperatorFunction<T>;
export function tap<T>(next: (value: T) => void): MonoTypeOperatorFunction<T>;
/** @deprecated Instead of passing separate callback arguments, use an observer argument. Signatures taking separate callback arguments will be removed in v8. Details: https://rxjs.dev/deprecations/subscribe-arguments */
export function tap<T>(
Expand Down Expand Up @@ -106,19 +112,23 @@ export function tap<T>(
* runs the specified Observer or callback(s) for each item.
*/
export function tap<T>(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
observerOrNext?: Partial<TapObserver<T>> | ((value: T) => void) | null,
error?: ((e: any) => void) | null,
complete?: (() => void) | null
): MonoTypeOperatorFunction<T> {
// We have to check to see not only if next is a function,
// but if error or complete were passed. This is because someone
// could technically call tap like `tap(null, fn)` or `tap(null, null, fn)`.
const tapObserver =
isFunction(observerOrNext) || error || complete ? { next: observerOrNext as (value: T) => void, error, complete } : observerOrNext;
isFunction(observerOrNext) || error || complete
? // tslint:disable-next-line: no-object-literal-type-assertion
({ next: observerOrNext as Exclude<typeof observerOrNext, Partial<TapObserver<T>>>, error, complete } as Partial<TapObserver<T>>)
: observerOrNext;

// TODO: Use `operate` function once this PR lands: https://github.com/ReactiveX/rxjs/pull/5742
return tapObserver
? operate((source, subscriber) => {
tapObserver.subscribe?.();
let isUnsub = true;
source.subscribe(
new OperatorSubscriber(
subscriber,
Expand All @@ -127,12 +137,20 @@ export function tap<T>(
subscriber.next(value);
},
() => {
isUnsub = false;
tapObserver.complete?.();
subscriber.complete();
},
(err) => {
isUnsub = false;
tapObserver.error?.(err);
subscriber.error(err);
},
() => {
if (isUnsub) {
tapObserver.unsubscribe?.();
}
tapObserver.finalize?.();
}
)
);
Expand Down