-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
Producer.swift
92 lines (76 loc) · 2.96 KB
/
Producer.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
//
// Producer.swift
// RxSwift
//
// Created by Krunoslav Zaher on 2/20/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
/// Observable base class whose `subscribe` delegates the actual work to `run(_:cancel:)`.
///
/// `subscribe` creates a `SinkDisposer`, passes it to `run` as the cancel handle, stores the
/// sink/subscription pair that `run` returns inside it, and hands it back to the caller.
class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    /// Subscribes `observer` to this producer.
    ///
    /// When `CurrentThreadScheduler.isScheduleRequired` is `false` the sink is attached right away;
    /// otherwise the same work is handed to `CurrentThreadScheduler.instance.schedule` and the
    /// disposable returned by the scheduler is what the caller receives.
    ///
    /// - Parameter observer: Observer that will receive this producer's events.
    /// - Returns: Disposable that tears the subscription down.
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        guard CurrentThreadScheduler.isScheduleRequired else {
            return self.attachSink(to: observer)
        }

        return CurrentThreadScheduler.instance.schedule(()) { _ in
            self.attachSink(to: observer)
        }
    }

    /// Builds the sink and subscription for `observer`.
    ///
    /// The base implementation only calls `rxAbstractMethod()`; concrete producers are expected
    /// to override it.
    ///
    /// - Parameters:
    ///   - observer: Observer the sink should forward events to.
    ///   - cancel: Cancel handle for the subscription being created.
    /// - Returns: The sink and the subscription, each as a `Disposable`.
    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }

    /// Runs the producer against `observer` and wraps the result in a `SinkDisposer`.
    private func attachSink<Observer: ObserverType>(to observer: Observer) -> Disposable where Observer.Element == Element {
        // The returned disposable needs to release all references once it was disposed.
        let sinkDisposer = SinkDisposer()
        let (sink, subscription) = self.run(observer, cancel: sinkDisposer)
        sinkDisposer.setSinkAndSubscription(sink: sink, subscription: subscription)
        return sinkDisposer
    }
}
/// Disposable that holds a sink and a subscription and drops both references once disposed.
///
/// `state` is a bit set of `DisposeState` flags updated with `fetchOr`. `dispose()` and
/// `setSinkAndSubscription(sink:subscription:)` can therefore happen in either order: whichever
/// call finds the other's flag already present is the one that disposes the pair.
private final class SinkDisposer: Cancelable {
    /// Bit flags stored in `state`.
    private enum DisposeState: Int32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2

        /// Whether this flag's bit is present in `bits`.
        func isPresent(in bits: Int32) -> Bool {
            (bits & self.rawValue) != 0
        }
    }

    private let state = AtomicInt(0)
    private var sink: Disposable?
    private var subscription: Disposable?

    /// `true` once the `disposed` flag has been set on `state`.
    var isDisposed: Bool {
        isFlagSet(self.state, DisposeState.disposed.rawValue)
    }

    /// Stores the sink/subscription pair; may be called only once.
    ///
    /// If `dispose()` already ran, the pair is disposed and released immediately.
    /// Calling this a second time is a fatal error.
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        // Store the references before publishing the flag: `dispose()` expects both to be
        // present whenever it observes `sinkAndSubscriptionSet`.
        self.sink = sink
        self.subscription = subscription

        let flagsBefore = fetchOr(self.state, DisposeState.sinkAndSubscriptionSet.rawValue)

        guard !DisposeState.sinkAndSubscriptionSet.isPresent(in: flagsBefore) else {
            rxFatalError("Sink and subscription were already set")
        }

        // Not disposed yet: keep the pair until `dispose()` is called.
        guard DisposeState.disposed.isPresent(in: flagsBefore) else {
            return
        }

        self.tearDown(sink: sink, subscription: subscription)
    }

    /// Marks this disposer as disposed and, if the pair was already stored, disposes and releases it.
    ///
    /// Repeated calls after the first are no-ops.
    func dispose() {
        let flagsBefore = fetchOr(self.state, DisposeState.disposed.rawValue)

        // A previous call already set the `disposed` flag.
        if DisposeState.disposed.isPresent(in: flagsBefore) {
            return
        }

        // Nothing stored yet; `setSinkAndSubscription` will see the `disposed` flag and clean up.
        guard DisposeState.sinkAndSubscriptionSet.isPresent(in: flagsBefore) else {
            return
        }

        guard let storedSink = self.sink else {
            rxFatalError("Sink not set")
        }
        guard let storedSubscription = self.subscription else {
            rxFatalError("Subscription not set")
        }

        self.tearDown(sink: storedSink, subscription: storedSubscription)
    }

    /// Disposes the sink, then the subscription, then clears both stored references.
    private func tearDown(sink: Disposable, subscription: Disposable) {
        sink.dispose()
        subscription.dispose()

        self.sink = nil
        self.subscription = nil
    }
}