-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(groupBy): unsubscribe & cleanup GroupDurationSubscriber #2662
Conversation
Duration selectors where not disposed when the GroupDurationSubscriber's completed
…e group The Groups are disposed by the GroupDurationSelector, however the GroupDurationSubscriber can be subscribed to a different observable than the group itself. To prevent any unwanted subscriptions to accumulate over time we need to explicitly unsubscribe after the first event in GroupDurationSubscriber closes the group. Fixes ReactiveX#2660
The subscriptions to the durationSelector would pile up in the internal subscription list of the GroupBySubscriber. By removing the GroupDurationSubscriber explicitly from the GroupBySubscriber we prevent potential OOM exceptions. Fixes ReactiveX#2661
src/operator/groupBy.ts
Outdated
@@ -240,6 +240,8 @@ class GroupDurationSubscriber<K, T> extends Subscriber<T> { | |||
group.error(err); | |||
} | |||
this.parent.removeGroup(this.key); | |||
this.unsubscribe(); | |||
this.parent.remove(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldn't it be better GroupBySubscriber::removeGroup
manages subscription as well when it's specified to remove?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kwonoj if anything, the GroupDurationSubscriber
should be a pass-through to the group
Subject. If we did that, the base Subscriber _error
and _complete
implementations would call unsubscribe
, and the Subscription's unsubscribe would automatically call parent.remove()
…o underlying group Subject
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hermanbanken I've submitted hermanbanken#1 to your branch with my proposed changes. If you accept the PR there, it should reflect in this PR here.
src/operator/groupBy.ts
Outdated
@@ -240,6 +240,8 @@ class GroupDurationSubscriber<K, T> extends Subscriber<T> { | |||
group.error(err); | |||
} | |||
this.parent.removeGroup(this.key); | |||
this.unsubscribe(); | |||
this.parent.remove(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kwonoj if anything, the GroupDurationSubscriber
should be a pass-through to the group
Subject. If we did that, the base Subscriber _error
and _complete
implementations would call unsubscribe
, and the Subscription's unsubscribe would automatically call parent.remove()
refactor(groupBy): Refactor GroupDurationSubscriber to pass-through to underlying group Subject
With the change of @trxcllnt we no longer need to do It took me a while to understand why his change was enough. Let me refrase the explanation, maybe it will help anyone understand if it is not clear already: During unsubscribe the Previously Rx.Observable.range(0, 10000)
.concat(Rx.Observable.never())
.groupBy(x => x, x => x, group => Rx.Observable.empty())
.subscribe() would result in 10000 |
@crunchie84 and I have retested our application against these changes. We can happily confirm that it fixes both #2660 and #2661. Thanks for your work on this PR! 👍 |
spec/operators/groupBy-spec.ts
Outdated
let unsubs = []; | ||
for (let i = 0; i < gr.length; i++) { | ||
if (gr[i] !== '-' && gr[i] !== '|') { | ||
unsubs.push(empty.slice(0, i) + '^' + durationMarble.slice(1, -1) + '!'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is arguably less readable than just using the strings:
const unsubs = [
'^------!',
'-^------!',
'--^------!,
/* etc */
];
It's more busy work, sure, but tests are about readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes true. How about this [incoming].
'-^--!', | ||
'---^--!', | ||
'------------^-!', | ||
]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
haha... so much cleaner than the for loop! 😄
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs. |
The
groupBy
operator has adurationSelector
argument that we can use to specify additional closings for the produced groups. However, the durationSelector produces Observables that were never unsubscribed. (#2660)Furthermore even if the durationSelector Observable was auto-unsubscribed (due to for example a
take(1)
in that Observable) its subscription would not be removed from theGroupBySubscriber
, which causes memory usage to grow and eventually causes a OOM exception in @crunchie84's application. (#2661)This PR solves both problems by explicitly unsubscribing & removing the subscription from the GroupBySubscriber. I also added a test for the unsubscribing part, but could not easily create a test in the same style as the other groupBy-tests without poking into the subscription.