Skip to content

Commit d520395

Browse files
Add opt-in dropping behavior to ObservableSource.asFlow
1 parent 3975ed4 commit d520395

File tree

2 files changed

+23
-3
lines changed

2 files changed

+23
-3
lines changed

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,17 +87,19 @@ public fun <T : Any> ReceiveChannel<T>.asObservable(context: CoroutineContext):
8787
* A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
8888
* resulting flow to specify a user-defined value and to control what happens when data is produced faster
8989
* than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details.
90+
*
91+
* @param dropWhenFull Set to true to drop items when buffer is full instead of blocking the [ObservableSource].
9092
*/
91-
@Suppress("NOTHING_TO_INLINE")
93+
@Suppress("BlockingMethodInNonBlockingContext")
9294
@ExperimentalCoroutinesApi
93-
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
95+
public fun <T: Any> ObservableSource<T>.asFlow(dropWhenFull: Boolean = false): Flow<T> = callbackFlow {
9496

9597
var disposable: Disposable? = null
9698

9799
val observer = object : Observer<T> {
98100
override fun onComplete() { close() }
99101
override fun onSubscribe(d: Disposable) { disposable = d }
100-
override fun onNext(t: T) { sendBlocking(t) }
102+
override fun onNext(t: T) { if(!offer(t) && !dropWhenFull) runBlocking { send(t) } }
101103
override fun onError(e: Throwable) { close(e) }
102104
}
103105
subscribe(observer)

reactive/kotlinx-coroutines-rx2/test/ObservableAsFlowTest.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,13 @@ class ObservableAsFlowTest : TestBase() {
143143
assertEquals(10_000, count)
144144
}
145145

146+
@Test
147+
fun testLongRangeDroppingWhenFull() = runTest {
148+
val source = Observable.range(1, 10_000)
149+
val count = source.asFlow(dropWhenFull = true).buffer(10).count()
150+
assertEquals(11, count)
151+
}
152+
146153
@Test
147154
fun testProduce() = runTest {
148155
val source = Observable.range(0, 10)
@@ -154,6 +161,17 @@ class ObservableAsFlowTest : TestBase() {
154161
check(listOf(0, 9), flow.conflate().produceIn(this))
155162
}
156163

164+
@Test
165+
fun testProduceDroppingWhenFull() = runTest {
166+
val source = Observable.range(0, 10)
167+
val flow = source.asFlow(dropWhenFull = true)
168+
check((0..9).toList(), flow.produceIn(this))
169+
check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
170+
check((0..2).toList(), flow.buffer(2).produceIn(this))
171+
check(listOf(0), flow.buffer(0).produceIn(this))
172+
check(listOf(0, 9), flow.conflate().produceIn(this))
173+
}
174+
157175
private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
158176
val result = ArrayList<Int>(10)
159177
channel.consumeEach { result.add(it) }

0 commit comments

Comments
 (0)