Skip to content

Commit

Permalink
fixes stack overflow possibility with merge operators (#2616)
Browse files Browse the repository at this point in the history
* fixes stackoverflows in MergeLimitedSink

fix subscribing immediately in merge operators can produce values immediately which can re-enter and cause stack overflows

* fix possible fail to dispose and re-lock

* fixes stackoverflows in MergeLimitedSink

fix subscribing immediately in merge operators can produce values immediately which can re-enter and cause stack overflows

* fix possible fail to dispose and re-lock
  • Loading branch information
geoffmacd authored Feb 15, 2025
1 parent 68647db commit baf816f
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions RxSwift/Observables/Merge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,7 @@ private final class MergeLimitedSinkIter<SourceElement, SourceSequence: Observab
self.parent.dispose()
case .completed:
self.parent.group.remove(for: self.disposeKey)
if let next = self.parent.queue.dequeue() {
self.parent.subscribe(next, group: self.parent.group)
}
else {
self.parent.activeCount -= 1

if self.parent.stopped && self.parent.activeCount == 0 {
self.parent.forwardOn(.completed)
self.parent.dispose()
}
}
self.parent.dequeueNextAndSubscribe()
}
}
}
Expand Down Expand Up @@ -236,7 +226,8 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
return self.group
}

func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) {
@discardableResult
func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) -> Disposable {
let subscription = SingleAssignmentDisposable()

let key = group.insert(subscription)
Expand All @@ -247,6 +238,28 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
let disposable = innerSource.asObservable().subscribe(observer)
subscription.setDisposable(disposable)
}
return subscription
}

func dequeueNextAndSubscribe() {
if let next = queue.dequeue() {
// subscribing immediately can produce values immediately which can re-enter and cause stack overflows
let disposable = CurrentThreadScheduler.instance.schedule(()) { _ in
// lock again
self.lock.performLocked {
self.subscribe(next, group: self.group)
}
}
_ = group.insert(disposable)
}
else {
activeCount -= 1

if stopped && activeCount == 0 {
forwardOn(.completed)
dispose()
}
}
}

func performMap(_ element: SourceElement) throws -> SourceSequence {
Expand Down

0 comments on commit baf816f

Please sign in to comment.