Skip to content

[WIP] Sendable support #850

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Sources/Action.swift
Original file line number Diff line number Diff line change
@@ -116,7 +116,8 @@ public final class Action<Input, Output, Error: Swift.Error> {
let isEnabled = MutableProperty(actionState.value.isEnabled)
self.isEnabled = Property(capturing: isEnabled)

func modifyActionState<Result>(_ action: (inout ActionState<State.Value>) throws -> Result) rethrows -> Result {
@Sendable
func modifyActionState<Result>(_ action: @Sendable (inout ActionState<State.Value>) throws -> Result) rethrows -> Result {
return try actionState.begin { storage in
let oldState = storage.value
defer {
10 changes: 6 additions & 4 deletions Sources/Atomic.swift
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ import MachO
/// A simple, generic lock-free finite state machine.
///
/// - warning: `deinitialize` must be called to dispose of the consumed memory.
internal struct UnsafeAtomicState<State: RawRepresentable> where State.RawValue == Int32 {
internal struct UnsafeAtomicState<State: RawRepresentable>: Sendable where State.RawValue == Int32 {
internal typealias Transition = (expected: State, next: State)
#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
private let value: UnsafeMutablePointer<Int32>
@@ -29,6 +29,7 @@ internal struct UnsafeAtomicState<State: RawRepresentable> where State.RawValue
}

/// Deinitialize the finite state machine.
@Sendable
internal func deinitialize() {
value.deinitialize(count: 1)
value.deallocate()
@@ -104,7 +105,8 @@ internal struct UnsafeAtomicState<State: RawRepresentable> where State.RawValue

/// `Lock` exposes `os_unfair_lock` on supported platforms, with pthread mutex as the
/// fallback.
internal class Lock: LockProtocol {
// TODO: unckecked? subclass?
internal class Lock: LockProtocol, @unchecked Sendable {
#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
@available(iOS 10.0, *)
@available(macOS 10.12, *)
@@ -212,7 +214,7 @@ internal class Lock: LockProtocol {
func `try`() -> Bool { fatalError() }
}

internal protocol LockProtocol {
internal protocol LockProtocol: Sendable {
static func make() -> Self

func lock()
@@ -229,7 +231,7 @@ internal struct NoLock: LockProtocol {
}

/// An atomic variable.
public final class Atomic<Value> {
public final class Atomic<Value>: @unchecked Sendable {
private let lock: Lock
private var _value: Value

4 changes: 3 additions & 1 deletion Sources/Bag.swift
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
public struct Bag<Element> {
/// A uniquely identifying token for removing a value that was inserted into a
/// Bag.
public struct Token {
public struct Token: Sendable {
fileprivate let value: UInt64
}

@@ -97,3 +97,5 @@ extension Bag: RandomAccessCollection {
}
}
}

extension Bag: Sendable where Element: Sendable {}
23 changes: 12 additions & 11 deletions Sources/Disposable.swift
Original file line number Diff line number Diff line change
@@ -8,14 +8,15 @@

/// Represents something that can be “disposed”, usually associated with freeing
/// resources or canceling work.
public protocol Disposable: AnyObject {
public protocol Disposable: AnyObject, Sendable {
/// Whether this disposable has been disposed already.
var isDisposed: Bool { get }

/// Disposing of the resources represented by `self`. If `self` has already
/// been disposed of, it does nothing.
///
/// - note: Implementations must issue a memory barrier.
@Sendable
func dispose()
}

@@ -58,22 +59,22 @@ internal final class _SimpleDisposable: Disposable {
/// A disposable that has already been disposed.
internal final class NopDisposable: Disposable {
static let shared = NopDisposable()
var isDisposed = true
let isDisposed = true
func dispose() {}
private init() {}
}

/// A type-erased disposable that forwards operations to an underlying disposable.
public final class AnyDisposable: Disposable {
private final class ActionDisposable: Disposable {
private final class ActionDisposable: Disposable, @unchecked Sendable {
let state: UnsafeAtomicState<DisposableState>
var action: (() -> Void)?
var action: (@Sendable () -> Void)?

var isDisposed: Bool {
return state.is(.disposed)
}

init(_ action: (() -> Void)?) {
init(_ action: (@Sendable () -> Void)?) {
self.state = UnsafeAtomicState(.active)
self.action = action
}
@@ -100,7 +101,7 @@ public final class AnyDisposable: Disposable {
///
/// - parameters:
/// - action: A closure to run when calling `dispose()`.
public init(_ action: @escaping () -> Void) {
public init(_ action: @escaping @Sendable () -> Void) {
base = ActionDisposable(action)
}

@@ -123,7 +124,7 @@ public final class AnyDisposable: Disposable {
}

/// A disposable that will dispose of any number of other disposables.
public final class CompositeDisposable: Disposable {
public final class CompositeDisposable: Disposable, @unchecked Sendable {
private let disposables: Atomic<Bag<Disposable>?>
private var state: UnsafeAtomicState<DisposableState>

@@ -203,7 +204,7 @@ public final class CompositeDisposable: Disposable {
/// composite has been disposed of, `disposable` has been disposed of, or
/// `disposable` is `nil`.
@discardableResult
public func add(_ action: @escaping () -> Void) -> Disposable? {
public func add(_ action: @escaping @Sendable () -> Void) -> Disposable? {
return add(AnyDisposable(action))
}

@@ -246,7 +247,7 @@ public final class CompositeDisposable: Disposable {
/// - returns: An instance of `DisposableHandle` that can be used to opaquely
/// remove the disposable later (if desired).
@discardableResult
public static func += (lhs: CompositeDisposable, rhs: @escaping () -> Void) -> Disposable? {
public static func += (lhs: CompositeDisposable, rhs: @escaping @Sendable () -> Void) -> Disposable? {
return lhs.add(rhs)
}
}
@@ -325,7 +326,7 @@ extension ScopedDisposable where Inner == CompositeDisposable {
/// - returns: An instance of `DisposableHandle` that can be used to opaquely
/// remove the disposable later (if desired).
@discardableResult
public static func += (lhs: ScopedDisposable<CompositeDisposable>, rhs: @escaping () -> Void) -> Disposable? {
public static func += (lhs: ScopedDisposable<CompositeDisposable>, rhs: @escaping @Sendable () -> Void) -> Disposable? {
return lhs.inner.add(rhs)
}
}
@@ -334,7 +335,7 @@ extension ScopedDisposable where Inner == CompositeDisposable {
/// wrapped disposable to be replaced.
public final class SerialDisposable: Disposable {
private let _inner: Atomic<Disposable?>
private var state: UnsafeAtomicState<DisposableState>
private let state: UnsafeAtomicState<DisposableState>

public var isDisposed: Bool {
return state.is(.disposed)
20 changes: 10 additions & 10 deletions Sources/Event.swift
Original file line number Diff line number Diff line change
@@ -206,27 +206,27 @@ extension Signal.Event: EventProtocol {
// This operator performs side effect upon interruption.

extension Signal.Event {
internal typealias Transformation<U, E: Swift.Error> = (ReactiveSwift.Observer<U, E>, Lifetime) -> ReactiveSwift.Observer<Value, Error>
internal typealias Transformation<U, E: Swift.Error> = (any ReactiveSwift.Observer<U, E>, Lifetime) -> (any ReactiveSwift.Observer<Value, Error>)

internal static func filter(_ isIncluded: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
internal static func filter(_ isIncluded: @escaping @Sendable (Value) -> Bool) -> Transformation<Value, Error> {
return { downstream, _ in
Operators.Filter(downstream: downstream, predicate: isIncluded)
}
}

internal static func compactMap<U>(_ transform: @escaping (Value) -> U?) -> Transformation<U, Error> {
internal static func compactMap<U>(_ transform: @escaping @Sendable (Value) -> U?) -> Transformation<U, Error> {
return { downstream, _ in
Operators.CompactMap(downstream: downstream, transform: transform)
}
}

internal static func map<U>(_ transform: @escaping (Value) -> U) -> Transformation<U, Error> {
internal static func map<U>(_ transform: @escaping @Sendable (Value) -> U) -> Transformation<U, Error> {
return { downstream, _ in
Operators.Map(downstream: downstream, transform: transform)
}
}

internal static func mapError<E>(_ transform: @escaping (Error) -> E) -> Transformation<Value, E> {
internal static func mapError<E>(_ transform: @escaping @Sendable (Error) -> E) -> Transformation<Value, E> {
return { downstream, _ in
Operators.MapError(downstream: downstream, transform: transform)
}
@@ -244,13 +244,13 @@ extension Signal.Event {
}
}

internal static func attemptMap<U>(_ transform: @escaping (Value) -> Result<U, Error>) -> Transformation<U, Error> {
internal static func attemptMap<U>(_ transform: @escaping @Sendable (Value) -> Result<U, Error>) -> Transformation<U, Error> {
return { downstream, _ in
Operators.AttemptMap(downstream: downstream, transform: transform)
}
}

internal static func attempt(_ action: @escaping (Value) -> Result<(), Error>) -> Transformation<Value, Error> {
internal static func attempt(_ action: @escaping @Sendable (Value) -> Result<(), Error>) -> Transformation<Value, Error> {
return attemptMap { value -> Result<Value, Error> in
return action(value).map { _ in value }
}
@@ -285,13 +285,13 @@ extension Signal.Event {
}
}

internal static func take(while shouldContinue: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
internal static func take(while shouldContinue: @escaping @Sendable (Value) -> Bool) -> Transformation<Value, Error> {
return { downstream, _ in
Operators.TakeWhile(downstream: downstream, shouldContinue: shouldContinue)
}
}

internal static func take(until shouldContinue: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
internal static func take(until shouldContinue: @escaping @Sendable (Value) -> Bool) -> Transformation<Value, Error> {
return { downstream, _ in
Operators.TakeUntil(downstream: downstream, shouldContinue: shouldContinue)
}
@@ -397,7 +397,7 @@ extension Signal.Event {
return scan(into: initialResult) { $0 = nextPartialResult($0, $1) }
}

internal static func scanMap<State, U>(into initialState: State, _ next: @escaping (inout State, Value) -> U) -> Transformation<U, Error> {
internal static func scanMap<State, U>(into initialState: State, _ next: @escaping @Sendable (inout State, Value) -> U) -> Transformation<U, Error> {
return { downstream, _ in
Operators.ScanMap(downstream: downstream, initial: initialState, next: next)
}
33 changes: 20 additions & 13 deletions Sources/EventLogger.swift
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@ public func defaultEventLog(identifier: String, event: String, fileName: String,

/// A type that represents an event logging function.
/// Signature is:
/// - identifier
/// - identifier
/// - event
/// - fileName
/// - functionName
@@ -52,22 +52,22 @@ fileprivate struct LogContext<Event: LoggingEventProtocol> {
let functionName: String
let lineNumber: Int
let logger: EventLogger
func log<T>(_ event: Event) -> ((T) -> Void)? {

func log<T>(_ event: Event) -> (@Sendable (T) -> Void)? {
return event.logIfNeeded(events: self.events) { event in
self.logger(self.identifier, event, self.fileName, self.functionName, self.lineNumber)
}
}
func log(_ event: Event) -> (() -> Void)? {

func log(_ event: Event) -> (@Sendable () -> Void)? {
return event.logIfNeededNoArg(events: self.events) { event in
self.logger(self.identifier, event, self.fileName, self.functionName, self.lineNumber)
}
}
}

extension Signal {
/// Logs all events that the receiver sends. By default, it will print to
/// Logs all events that the receiver sends. By default, it will print to
/// the standard output.
///
/// - parameters:
@@ -80,14 +80,21 @@ extension Signal {
/// - logger: Logger that logs the events.
///
/// - returns: Signal that, when observed, logs the fired events.
public func logEvents(identifier: String = "", events: Set<LoggingEvent.Signal> = Set(LoggingEvent.Signal.allCases), fileName: String = #file, functionName: String = #function, lineNumber: Int = #line, logger: @escaping EventLogger = defaultEventLog) -> Signal<Value, Error> {
public func logEvents(
identifier: String = "",
events: Set<LoggingEvent.Signal> = Set(LoggingEvent.Signal.allCases),
fileName: String = #file,
functionName: String = #function,
lineNumber: Int = #line,
logger: @escaping EventLogger = defaultEventLog
) -> Signal<Value, Error> {
let logContext = LogContext(events: events,
identifier: identifier,
fileName: fileName,
functionName: functionName,
lineNumber: lineNumber,
logger: logger)

return self.on(
failed: logContext.log(.failed),
completed: logContext.log(.completed),
@@ -100,7 +107,7 @@ extension Signal {
}

extension SignalProducer {
/// Logs all events that the receiver sends. By default, it will print to
/// Logs all events that the receiver sends. By default, it will print to
/// the standard output.
///
/// - parameters:
@@ -149,14 +156,14 @@ private extension LoggingEventProtocol {
// Due to differences in the type checker, this method cannot
// overload the generic `logIfNeeded`, or otherwise it would lead to
// infinite recursion with Swift 4.0.x.
func logIfNeededNoArg(events: Set<Self>, logger: @escaping (String) -> Void) -> (() -> Void)? {
func logIfNeededNoArg(events: Set<Self>, logger: @escaping (String) -> Void) -> (@Sendable () -> Void)? {
return (self.logIfNeeded(events: events, logger: logger) as ((()) -> Void)?)
.map { closure in
{ closure(()) }
{ @Sendable in closure(()) }
}
}
func logIfNeeded<T>(events: Set<Self>, logger: @escaping (String) -> Void) -> ((T) -> Void)? {

func logIfNeeded<T>(events: Set<Self>, logger: @escaping (String) -> Void) -> (@Sendable (T) -> Void)? {
guard events.contains(self) else {
return nil
}
Loading