Skip to content

Commit

Permalink
fix(groupBy): ensures durationSelector subscriptions are cleaned
Browse files Browse the repository at this point in the history
The subscriptions to the durationSelector would pile up in the
internal subscription list of the GroupBySubscriber. By maintaining
a map entry per group we can discard subscriptions of a single
group.

Fixes ReactiveX#2661
  • Loading branch information
hermanbanken committed Jun 13, 2017
1 parent 1a03cc2 commit b853f54
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion src/operator/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class GroupByOperator<T, K, R> implements Operator<T, GroupedObservable<K, R>> {
*/
class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscription {
private groups: Map<K, Subject<T|R>> = null;
private subscriptions: Map<K, Subscription> = null;
public attemptedToUnsubscribe: boolean = false;
public count: number = 0;

Expand Down Expand Up @@ -147,6 +148,18 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr

let group = groups.get(key);

let subscriptions = this.subscriptions;

if (!subscriptions) {
subscriptions = this.subscriptions = typeof key === 'string' ? new FastMap() : new Map();
}

let groupSubscription = subscriptions.get(key) || new Subscription();

if (!subscriptions.get(key)) {
subscriptions.set(key, groupSubscription);
}

let element: R;
if (this.elementSelector) {
try {
Expand All @@ -171,7 +184,9 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
this.error(err);
return;
}
this.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));

groupSubscription.add(duration.subscribe(new GroupDurationSubscriber(key, group, this)));
this.add(groupSubscription);
}
}

Expand Down Expand Up @@ -205,6 +220,12 @@ class GroupBySubscriber<T, K, R> extends Subscriber<T> implements RefCountSubscr
}

removeGroup(key: K): void {
let groupSubscription = this.subscriptions.get(key);
if (groupSubscription) {
this.remove(groupSubscription);
groupSubscription.unsubscribe();
this.subscriptions.delete(key);
}
this.groups.delete(key);
}

Expand Down

0 comments on commit b853f54

Please sign in to comment.