Skip to content

Lock introduced in eraseToStream can lead to missing sequence elements #19

@cwalo

Description

@cwalo

Description

I stumbled on this issue after observing some missing values when using eraseToStream(). It seems that a side effect of introducing the lock is that values can be dropped if the sequence produces elements while the initializer is holding the lock. Assuming I'm holding everything correctly in the example below, the current implementation will drop the first few values pretty consistently. This is not the case when using the AsyncPublisher produced by publisher.values or when manually calling continuation.yield().

I took a stab at this version which seems to be behave as expected and keeps things within the concurrency domain. That said, I'm not certain of the implications compared to the current implementation.
Leveraging the initializer with a bufferingPolicy:

init<S: AsyncSequence>(_ sequence: S) where S.Element == Element {
    self.init { continuation in
        Task {
            do {
                for try await value in sequence {
                    continuation.yield(value)
                }
            } catch {
                continuation.finish()
            }
        }
    }
}

Checklist

  • If possible, I've reproduced the issue using the main branch of this package.
  • This issue hasn't been addressed in an existing GitHub issue or discussion.

Expected behavior

All sequence elements are received.

Actual behavior

Sequence elements are sometimes missing.

Steps to reproduce

class Observer<T> {
    init(_ publisher: some Publisher<T, Never>, usesEraseStream: Bool) {
        let stream: any AsyncSequence = usesEraseStream ? publisher.values.eraseToStream() : publisher.values
        
        Task {
            print("Receive Task started")
            for try await value in stream {
                print("receive: \(value)")
            }
        }
    }
}

let subject = PassthroughSubject<Int, Never>()
let observer = Observer<Int>(subject, usesEraseStream: true)

Task {
    print("Send Task started")
    for i in 0..<100 {
        subject.send(i)
    }
}

// expected - receive: 0 ... receive: 99
// result - receive: 5 ... receive: 99

swift-concurrency-extras version information

1.1.0

Destination operating system

macOS 14.1.2

Xcode version information

15.0.1

Swift Compiler version information

swift-driver version: 1.87.1 Apple Swift version 5.9 (swiftlang-5.9.0.128.108 clang-1500.0.40.1)
Target: arm64-apple-macosx14.0

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions