Skip to content

Commit

Permalink
feat(Subscription): add no longer returns unnecessary Subscription …
Browse files Browse the repository at this point in the history
…reference

- Updates a few locations where this feature was being used incorrectly.
- Eliminates confusion where people were trying to "chain" `add` calls.

BREAKING CHANGE: `add` no longer returns an unnecessary Subscription reference. This means that if you are calling `add` with a _function_ and not a `Subscription` (e.g. `subscription.add(() => { /* teardown */ })`), you will not be able to remove that teardown _function_ with `remove`. The fix for this is to wrap your function with a `Subscription` like so:  `const childSub = new Subscription(() => { /* teardown */ }); subscription.add(childSub);`. Then you will be able to remove it via `subscription.remove(childSub);`. Bear in mind that is it an edge case to need to manually remove a child subscription from a parent subscription. **All subscriptions that have been added to other subscriptions will remove themselves from the parent subscription(s) automatically when they are unsubscribed.**.
  • Loading branch information
benlesh committed Aug 18, 2020
1 parent b5b6450 commit 54eeda7
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 75 deletions.
11 changes: 7 additions & 4 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ export declare type InteropObservable<T> = {

export declare function interval(period?: number, scheduler?: SchedulerLike): Observable<number>;

export declare function isAbortError(value: any): value is AbortError;

export declare function isObservable<T>(obj: any): obj is Observable<T>;

export declare function lastValueFrom<T>(source: Observable<T>): Promise<T>;
Expand Down Expand Up @@ -368,8 +370,8 @@ export declare class Observable<T> implements Subscribable<T> {
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic);
protected _subscribe(subscriber: Subscriber<any>): TeardownLogic;
protected _trySubscribe(sink: Subscriber<T>): TeardownLogic;
forEach(next: (value: T) => void): Promise<void>;
forEach(next: (value: T) => void, promiseCtor: PromiseConstructorLike): Promise<void>;
forEach(nextHandler: (value: T) => void, signal?: AbortSignal): Promise<void>;
forEach(nextHandler: (value: T) => void, promiseCtor: PromiseConstructorLike): Promise<void>;
protected lift<R>(operator?: Operator<T, R>): Observable<R>;
pipe(): Observable<T>;
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
Expand All @@ -383,6 +385,7 @@ export declare class Observable<T> implements Subscribable<T> {
pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>): Observable<I>;
pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>, ...operations: OperatorFunction<any, any>[]): Observable<unknown>;
subscribe(observer?: PartialObserver<T>): Subscription;
subscribe(observer: PartialObserver<T> | null | undefined, signal: AbortSignal | null | undefined): Subscription;
subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription;
subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Subscription;
subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Subscription;
Expand Down Expand Up @@ -548,7 +551,7 @@ export declare class Subscription implements SubscriptionLike {
protected _parentOrParents: Subscription | Subscription[] | null;
closed: boolean;
constructor(unsubscribe?: () => void);
add(teardown: TeardownLogic): Subscription;
add(teardown: TeardownLogic): void;
remove(subscription: Subscription): void;
unsubscribe(): void;
static EMPTY: Subscription;
Expand Down Expand Up @@ -621,7 +624,7 @@ export declare class VirtualAction<T> extends AsyncAction<T> {
protected recycleAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay?: number): any;
protected requestAsyncId(scheduler: VirtualTimeScheduler, id?: any, delay?: number): any;
schedule(state?: T, delay?: number): Subscription;
static sortActions<T>(a: VirtualAction<T>, b: VirtualAction<T>): 1 | 0 | -1;
static sortActions<T>(a: VirtualAction<T>, b: VirtualAction<T>): 0 | 1 | -1;
}

export declare class VirtualTimeScheduler extends AsyncScheduler {
Expand Down
85 changes: 25 additions & 60 deletions spec/Subscription-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,85 +4,50 @@ import { Observable, UnsubscriptionError, Subscription, merge } from 'rxjs';
/** @test {Subscription} */
describe('Subscription', () => {
describe('Subscription.add()', () => {
it('Should return self if the self is passed', () => {
const sub = new Subscription();
const ret = sub.add(sub);

expect(ret).to.equal(sub);
});

it('Should return Subscription.EMPTY if it is passed', () => {
const sub = new Subscription();
const ret = sub.add(Subscription.EMPTY);

expect(ret).to.equal(Subscription.EMPTY);
});

it('Should return Subscription.EMPTY if it is called with `void` value', () => {
const sub = new Subscription();
const ret = sub.add(undefined);
expect(ret).to.equal(Subscription.EMPTY);
});

it('Should return a new Subscription created with teardown function if it is passed a function', () => {
const sub = new Subscription();

it('should unsubscribe child subscriptions', () => {
const main = new Subscription();

let isCalled = false;
const ret = sub.add(function() {
const child = new Subscription(() => {
isCalled = true;
});
ret.unsubscribe();
main.add(child);
main.unsubscribe();

expect(isCalled).to.equal(true);
});

it('Should wrap the AnonymousSubscription and return a subscription that unsubscribes and removes it when unsubbed', () => {
const sub: any = new Subscription();
let called = false;
const arg = {
unsubscribe: () => called = true,
};
const ret = sub.add(arg);

expect(called).to.equal(false);
expect(sub._subscriptions.length).to.equal(1);
ret.unsubscribe();
expect(called).to.equal(true);
expect(sub._subscriptions.length).to.equal(0);
});

it('Should return the passed one if passed a AnonymousSubscription having not function `unsubscribe` member', () => {
const sub = new Subscription();
const arg = {
isUnsubscribed: false,
unsubscribe: undefined as any,
};
const ret = sub.add(arg as any);

expect(ret).to.equal(arg);
});

it('Should return the passed one if the self has been unsubscribed', () => {
it('should unsubscribe child subscriptions if it has already been unsubscribed', () => {
const main = new Subscription();
main.unsubscribe();

const child = new Subscription();
const ret = main.add(child);
let isCalled = false;
const child = new Subscription(() => {
isCalled = true;
});
main.add(child);

expect(ret).to.equal(child);
expect(isCalled).to.equal(true);
});

it('Should unsubscribe the passed one if the self has been unsubscribed', () => {
it('should unsubscribe a teardown function that was passed', () => {
let isCalled = false;
const main = new Subscription();
main.add(() => {
isCalled = true;
});
main.unsubscribe();
expect(isCalled).to.be.true;
});

it('should unsubscribe a teardown function that was passed immediately if it has been unsubscribed', () => {
let isCalled = false;
const child = new Subscription(() => {
const main = new Subscription();
main.unsubscribe();
main.add(() => {
isCalled = true;
});
main.add(child);

expect(isCalled).to.equal(true);
expect(isCalled).to.be.true;
});
});

Expand Down
15 changes: 7 additions & 8 deletions src/internal/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ export class Subscription implements SubscriptionLike {
* `remove()` to remove the passed teardown logic from the inner subscriptions
* list.
*/
add(teardown: TeardownLogic): Subscription {
add(teardown: TeardownLogic): void {
let subscription = (<Subscription>teardown);

if (!teardown) {
return Subscription.EMPTY;
return;
}

switch (typeof teardown) {
Expand All @@ -153,10 +153,9 @@ export class Subscription implements SubscriptionLike {
case 'object':
if (subscription === this || subscription.closed || typeof subscription.unsubscribe !== 'function') {
// This also covers the case where `subscription` is `Subscription.EMPTY`, which is always in `closed` state.
return subscription;
return;
} else if (this.closed) {
subscription.unsubscribe();
return subscription;
} else if (!(subscription instanceof Subscription)) {
const tmp = subscription;
subscription = new Subscription();
Expand All @@ -170,14 +169,14 @@ export class Subscription implements SubscriptionLike {

// Add `this` as parent of `subscription` if that's not already the case.
let { _parentOrParents } = subscription;
if (_parentOrParents === null) {
if (_parentOrParents == null) {
// If we don't have a parent, then set `subscription._parents` to
// the `this`, which is the common case that we optimize for.
subscription._parentOrParents = this;
} else if (_parentOrParents instanceof Subscription) {
if (_parentOrParents === this) {
// The `subscription` already has `this` as a parent.
return subscription;
return;
}
// If there's already one parent, but not multiple, allocate an
// Array to store the rest of the parent Subscriptions.
Expand All @@ -187,7 +186,7 @@ export class Subscription implements SubscriptionLike {
_parentOrParents.push(this);
} else {
// The `subscription` already has `this` as a parent.
return subscription;
return;
}

// Optimize for the common case when adding the first subscription.
Expand All @@ -198,7 +197,7 @@ export class Subscription implements SubscriptionLike {
subscriptions.push(subscription);
}

return subscription;
return;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/internal/operators/exhaust.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class SwitchFirstSubscriber<T> extends SimpleOuterSubscriber<T, T> {

protected _next(value: T): void {
if (!this.innerSubscription) {
this.innerSubscription = this.add(innerSubscribe(value, new SimpleInnerSubscriber(this)));
this.add(this.innerSubscription = innerSubscribe(value, new SimpleInnerSubscriber(this)));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/subscribeOn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ export interface DispatchArg<T> {

class SubscribeOnObservable<T> extends Observable<T> {
/** @nocollapse */
static dispatch<T>(this: SchedulerAction<T>, arg: DispatchArg<T>): Subscription {
static dispatch<T>(this: SchedulerAction<T>, arg: DispatchArg<T>) {
const { source, subscriber } = arg;
return this.add(source.subscribe(subscriber));
this.add(source.subscribe(subscriber));
}

constructor(
Expand Down

0 comments on commit 54eeda7

Please sign in to comment.