Skip to content

Commit

Permalink
refactor: remove GroupBySubscriber (ReactiveX#6806)
Browse files Browse the repository at this point in the history
Adds a special case to `OperatorSubscriber` to support `groupBy`.

Related: ReactiveX#6805 ReactiveX#6803 ReactiveX#6804
  • Loading branch information
benlesh authored Feb 8, 2022
1 parent 211e6ff commit e43063a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 35 deletions.
17 changes: 12 additions & 5 deletions src/internal/operators/OperatorSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
* this handler are sent to the `destination` error handler.
* @param onFinalize Additional teardown logic here. This will only be called on teardown if the
* subscriber itself is not already closed. This is called after all other teardown logic is executed.
* @param shouldUnsubscribe An optional check to see if an unsubscribe call should truly unsubscribe.
* NOTE: This currently **ONLY** exists to support the strange behavior of {@link groupBy}, where unsubscription
* to the resulting observable does not actually disconnect from the source if there are active subscriptions
* to any grouped observable. (DO NOT EXPOSE OR USE EXTERNALLY!!!)
*/
constructor(
destination: Subscriber<any>,
onNext?: (value: T) => void,
onComplete?: () => void,
onError?: (err: any) => void,
private onFinalize?: () => void
private onFinalize?: () => void,
private shouldUnsubscribe?: () => boolean
) {
// It's important - for performance reasons - that all of this class's
// members are initialized and that they are always initialized in the same
Expand Down Expand Up @@ -97,9 +102,11 @@ export class OperatorSubscriber<T> extends Subscriber<T> {
}

unsubscribe() {
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
if (!this.shouldUnsubscribe || this.shouldUnsubscribe()) {
const { closed } = this;
super.unsubscribe();
// Execute additional teardown if we have any and we didn't already do so.
!closed && this.onFinalize?.();
}
}
}
47 changes: 17 additions & 30 deletions src/internal/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,20 @@ export function groupBy<T, K, R>(
// next call from the source.
const handleError = (err: any) => notify((consumer) => consumer.error(err));

// The number of actively subscribed groups
let activeGroups = 0;

// Whether or not teardown was attempted on this subscription.
let teardownAttempted = false;

// Capturing a reference to this, because we need a handle to it
// in `createGroupedObservable` below. This is what we use to
// subscribe to our source observable. This sometimes needs to be unsubscribed
// out-of-band with our `subscriber` which is the downstream subscriber, or destination,
// in cases where a user unsubscribes from the main resulting subscription, but
// still has groups from this subscription subscribed and would expect values from it
// Consider: `source.pipe(groupBy(fn), take(2))`.
const groupBySourceSubscriber = new GroupBySubscriber(
const groupBySourceSubscriber = new OperatorSubscriber(
subscriber,
(value: T) => {
// Because we have to notify all groups of any errors that occur in here,
Expand Down Expand Up @@ -234,7 +240,14 @@ export function groupBy<T, K, R>(
// When the source subscription is _finally_ torn down, release the subjects and keys
// in our groups Map, they may be quite large and we don't want to keep them around if we
// don't have to.
() => groups.clear()
() => groups.clear(),
() => {
teardownAttempted = true;
// We only kill our subscription to the source if we have
// no active groups. As stated above, consider this scenario:
// source$.pipe(groupBy(fn), take(2)).
return activeGroups === 0;
}
);

// Subscribe to the source
Expand All @@ -247,16 +260,14 @@ export function groupBy<T, K, R>(
*/
function createGroupedObservable(key: K, groupSubject: SubjectLike<any>) {
const result: any = new Observable<T>((groupSubscriber) => {
groupBySourceSubscriber.activeGroups++;
activeGroups++;
const innerSub = groupSubject.subscribe(groupSubscriber);
return () => {
innerSub.unsubscribe();
// We can kill the subscription to our source if we now have no more
// active groups subscribed, and a teardown was already attempted on
// the source.
--groupBySourceSubscriber.activeGroups === 0 &&
groupBySourceSubscriber.teardownAttempted &&
groupBySourceSubscriber.unsubscribe();
--activeGroups === 0 && teardownAttempted && groupBySourceSubscriber.unsubscribe();
};
});
result.key = key;
Expand All @@ -265,30 +276,6 @@ export function groupBy<T, K, R>(
});
}

/**
* This was created because groupBy is a bit unique, in that emitted groups that have
* subscriptions have to keep the subscription to the source alive until they
* are torn down.
*/
class GroupBySubscriber<T> extends OperatorSubscriber<T> {
/**
* The number of actively subscribed groups
*/
activeGroups = 0;
/**
* Whether or not teardown was attempted on this subscription.
*/
teardownAttempted = false;

unsubscribe() {
this.teardownAttempted = true;
// We only kill our subscription to the source if we have
// no active groups. As stated above, consider this scenario:
// source$.pipe(groupBy(fn), take(2)).
this.activeGroups === 0 && super.unsubscribe();
}
}

/**
* An observable of values that is the emitted by the result of a {@link groupBy} operator,
* contains a `key` property for the grouping.
Expand Down

0 comments on commit e43063a

Please sign in to comment.