Skip to content

Commit

Permalink
Merge pull request cx-org#106 from cx-org/fix-zip
Browse files Browse the repository at this point in the history
Publishers.Zip should wait unapplied values before finish
  • Loading branch information
luoxiu authored Feb 24, 2021
2 parents 973cc9b + 8515049 commit d351c15
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 23 deletions.
38 changes: 38 additions & 0 deletions Sources/CombineX/Internal/Extensions/Completion+extensions.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
extension Subscribers.Completion {

func mapError<NewFailure: Error>(_ transform: (Failure) -> NewFailure) -> Subscribers.Completion<NewFailure> {
switch self {
case .finished:
return .finished
case .failure(let error):
return .failure(transform(error))
}
}

var isFinished: Bool {
switch self {
case .finished:
return true
case .failure:
return false
}
}

var isFailure: Bool {
switch self {
case .finished:
return false
case .failure:
return true
}
}

var error: Failure? {
switch self {
case .finished:
return nil
case .failure(let error):
return error
}
}
}
11 changes: 0 additions & 11 deletions Sources/CombineX/Internal/Extensions/Completion+mapError.swift

This file was deleted.

53 changes: 41 additions & 12 deletions Sources/CombineX/Publishers/B/Zip.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,19 @@ extension Publishers.Zip {
func cancel() {
self.lock.lock()
self.isCompleted = true
let (childA, childB) = self.release()
let (childA, childB) = (self.childA, self.childB)
self.release()
self.lock.unlock()

childA?.cancel()
childB?.cancel()
}

private func release() -> (Child<A.Output>?, Child<B.Output>?) {
defer {
self.bufferA = CircularBuffer()
self.bufferB = CircularBuffer()

self.childA = nil
self.childB = nil
}
return (self.childA, self.childB)
private func release() {
self.bufferA = CircularBuffer()
self.bufferB = CircularBuffer()
self.childA = nil
self.childB = nil
}

func childReceive(_ value: Any, from source: Source) -> Subscribers.Demand {
Expand Down Expand Up @@ -170,6 +167,7 @@ extension Publishers.Zip {
let childB = self.childB
self.lock.unlock()

// TODO: we don't want more if we need complete
if more > 0 {
switch source {
case .a:
Expand All @@ -178,22 +176,53 @@ extension Publishers.Zip {
childA?.request(more)
}
}
self.completeIfNeeded(completion: .finished)
return more
default:
self.lock.unlock()
return .none
}
}

func childReceive(completion: Subscribers.Completion<A.Failure>, from source: Source) {
func childReceive(completion: Subscribers.Completion<Failure>, from source: Source) {
self.lock.lock()
if self.isCompleted {
self.lock.unlock()
return
}
switch source {
case .a:
let childA = self.childA
self.childA = nil
self.lock.unlock()
childA?.cancel()
case .b:
let childB = self.childB
self.childB = nil
self.lock.unlock()
childB?.cancel()
}

self.completeIfNeeded(completion: completion)
}

func completeIfNeeded(completion: Subscribers.Completion<Failure>) {
self.lock.lock()
if self.isCompleted {
self.lock.unlock()
return
}
guard completion.isFailure ||
(self.childA == nil && bufferA.isEmpty) ||
(self.childB == nil && bufferB.isEmpty) ||
(self.childA == nil && self.childB == nil) else {
self.lock.unlock()
return
}

self.isCompleted = true
let (childA, childB) = self.release()
let (childA, childB) = (self.childA, self.childB)
self.release()
self.lock.unlock()

childA?.cancel()
Expand Down
18 changes: 18 additions & 0 deletions Tests/CombineXTests/Publishers/ZipSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ class ZipSpec: QuickSpec {
expect(sub.eventsWithoutSubscription) == [.value(6), .value(22), .completion(.failure(.e0))]
}

it("should wait unapplied values before finish") {
let subject0 = PassthroughSubject<String, TestError>()
let subject1 = PassthroughSubject<String, TestError>()

let pub = subject0.zip(subject1, +)
let sub = pub.subscribeTracingSubscriber(initialDemand: .unlimited)

subject0.send("0")
subject0.send("1")
subject1.send("a")

subject0.send(completion: .finished)
subject1.send("b")
subject1.send("c")

expect(sub.eventsWithoutSubscription) == [.value("0a"), .value("1b"), .completion(.finished)]
}

// MARK: 1.5 should send as many as demands
it("should send as many as demands") {
let subject0 = PassthroughSubject<String, TestError>()
Expand Down

0 comments on commit d351c15

Please sign in to comment.