-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Await.kt
151 lines (135 loc) · 6.1 KB
/
Await.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactive
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import kotlinx.coroutines.suspendCancellableCoroutine
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.util.*
import kotlin.coroutines.*
/**
* Awaits for the first value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
*/
public suspend fun <T> Publisher<T>.awaitFirst(): T = awaitOne(Mode.FIRST)
/**
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*/
public suspend fun <T> Publisher<T>.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default)
/**
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*/
public suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT)
/**
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*/
public suspend fun <T> Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue()
/**
* Awaits for the last value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
*/
public suspend fun <T> Publisher<T>.awaitLast(): T = awaitOne(Mode.LAST)
/**
* Awaits for the single value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws IllegalArgumentException if publisher emits more than one value
*/
public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
// ------------------------ private ------------------------
private enum class Mode(val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
LAST("awaitLast"),
SINGLE("awaitSingle");
override fun toString(): String = s
}
private suspend fun <T> Publisher<T>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
private lateinit var subscription: Subscription
private var value: T? = null
private var seenValue = false
override fun onSubscribe(sub: Subscription) {
subscription = sub
cont.invokeOnCancellation { sub.cancel() }
sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE)
}
override fun onNext(t: T) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
seenValue = true
subscription.cancel()
cont.resume(t)
}
}
Mode.LAST, Mode.SINGLE -> {
if (mode == Mode.SINGLE && seenValue) {
subscription.cancel()
if (cont.isActive)
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
} else {
value = t
seenValue = true
}
}
}
}
@Suppress("UNCHECKED_CAST")
override fun onComplete() {
if (seenValue) {
if (cont.isActive) cont.resume(value as T)
return
}
when {
mode == Mode.FIRST_OR_DEFAULT -> {
cont.resume(default as T)
}
cont.isActive -> {
cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
}
}
}
override fun onError(e: Throwable) {
cont.resumeWithException(e)
}
})
}