-
Notifications
You must be signed in to change notification settings - Fork 31
/
SignalProducer.swift
133 lines (110 loc) · 5.45 KB
/
SignalProducer.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//
// SignalProducer.swift
// Rex
//
// Created by Neil Pankey on 5/9/15.
// Copyright (c) 2015 Neil Pankey. All rights reserved.
//
import ReactiveCocoa
extension SignalProducerType {
/// Buckets each received value into a group based on the key returned
/// from `grouping`. Termination events on the original signal are
/// also forwarded to each producer group.
public func groupBy<Key: Hashable>(grouping: Value -> Key) -> SignalProducer<(Key, SignalProducer<Value, Error>), Error> {
return SignalProducer<(Key, SignalProducer<Value, Error>), Error> { observer, disposable in
var groups: [Key: Signal<Value, Error>.Observer] = [:]
let lock = NSRecursiveLock()
lock.name = "me.neilpa.rex.groupBy"
self.start(Observer(next: { value in
let key = grouping(value)
lock.lock()
var group = groups[key]
if group == nil {
let (producer, sink) = SignalProducer<Value, Error>.buffer(Int.max)
observer.sendNext(key, producer)
groups[key] = sink
group = sink
}
lock.unlock()
group!.sendNext(value)
}, failed: { error in
observer.sendFailed(error)
groups.values.forEach { $0.sendFailed(error) }
}, completed: { _ in
observer.sendCompleted()
groups.values.forEach { $0.sendCompleted() }
}, interrupted: { _ in
observer.sendInterrupted()
groups.values.forEach { $0.sendInterrupted() }
}))
}
}
/// Applies `transform` to values from self with non-`nil` results unwrapped and
/// forwared on the returned producer.
public func filterMap<U>(transform: Value -> U?) -> SignalProducer<U, Error> {
return lift { $0.filterMap(transform) }
}
/// Returns a producer that drops `Error` sending `replacement` terminal event
/// instead, defaulting to `Completed`.
public func ignoreError(replacement replacement: Event<Value, NoError> = .Completed) -> SignalProducer<Value, NoError> {
precondition(replacement.isTerminating)
return lift { $0.ignoreError(replacement: replacement) }
}
/// Forwards events from self until `interval`. Then if producer isn't completed yet,
/// terminates with `event` on `scheduler`.
///
/// If the interval is 0, the timeout will be scheduled immediately. The producer
/// must complete synchronously (or on a faster scheduler) to avoid the timeout.
public func timeoutAfter(interval: NSTimeInterval, withEvent event: Event<Value, Error>, onScheduler scheduler: DateSchedulerType) -> SignalProducer<Value, Error> {
return lift { $0.timeoutAfter(interval, withEvent: event, onScheduler: scheduler) }
}
/// Enforces that at least `interval` time passes before forwarding a value. If a
/// new value arrives, the previous one is dropped and the `interval` delay starts
/// again. Error events are immediately forwarded, even if there's a queued value.
///
/// This operator is useful for scenarios like type-to-search where you want to
/// wait for a "lull" in typing before kicking off a search request.
public func debounce(interval: NSTimeInterval, onScheduler scheduler: DateSchedulerType) -> SignalProducer<Value, Error> {
return lift { $0.debounce(interval, onScheduler: scheduler) }
}
/// Forwards a value and then mutes the producer by dropping all subsequent values
/// for `interval` seconds. Once time elapses the next new value will be forwarded
/// and repeat the muting process. Error events are immediately forwarded even while
/// the producer is muted.
///
/// This operator could be used to coalesce multiple notifications in a short time
/// frame by only showing the first one.
public func muteFor(interval: NSTimeInterval, clock: DateSchedulerType) -> SignalProducer<Value, Error> {
return lift { $0.muteFor(interval, clock: clock) }
}
/// Delays the start of the producer by `interval` on the provided scheduler.
public func deferred(interval: NSTimeInterval, onScheduler scheduler: DateSchedulerType) -> SignalProducer<Value, Error> {
return SignalProducer.empty
.delay(interval, onScheduler: scheduler)
.concat(self.producer)
}
/// Delays retrying on failure by `interval` up to `count` attempts.
public func deferredRetry(interval: NSTimeInterval, onScheduler scheduler: DateSchedulerType, count: Int = .max) -> SignalProducer<Value, Error> {
precondition(count >= 0)
if count == 0 {
return producer
}
var retries = count
return flatMapError { error in
// The final attempt shouldn't defer the error if it fails
var producer = SignalProducer<Value, Error>(error: error)
if retries > 0 {
producer = producer.deferred(interval, onScheduler: scheduler)
}
retries -= 1
return producer
}
.retry(count)
}
}
extension SignalProducerType where Value: SequenceType {
/// Returns a producer that flattens sequences of elements. The inverse of `collect`.
public func uncollect() -> SignalProducer<Value.Generator.Element, Error> {
return lift { $0.uncollect() }
}
}