Skip to content

Commit

Permalink
Flow onEmpty (#1904)
Browse files Browse the repository at this point in the history
* Introduce Flow.onEmpty operator

Fixes #1890
  • Loading branch information
qwwdfsad authored Apr 7, 2020
1 parent 02b403d commit de29acd
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 5 deletions.
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEmpty (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand Down
39 changes: 35 additions & 4 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
}

/**
* Invokes the given [action] when the this flow starts to be collected.
* Invokes the given [action] when this flow starts to be collected.
*
* The receiver of the [action] is [FlowCollector] and thus `onStart` can emit additional elements.
* The receiver of the [action] is [FlowCollector], so `onStart` can emit additional elements.
* For example:
*
* ```
Expand All @@ -67,7 +67,7 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
* .collect { println(it) } // prints Begin, a, b, c
* ```
*/
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
Expand Down Expand Up @@ -129,7 +129,7 @@ public fun <T> Flow<T>.onStart(
* .collect { println(it) } // prints a, b, c, Done
* ```
*/
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
Expand All @@ -155,6 +155,37 @@ public fun <T> Flow<T>.onCompletion(
exception?.let { throw it }
}

/**
* Invokes the given [action] when this flow completes without emitting any elements.
* The receiver of the [action] is [FlowCollector], so `onEmpty` can emit additional elements.
* For example:
*
* ```
* emptyFlow<Int>().onEmpty {
* emit(1)
* emit(2)
* }.collect { println(it) } // prints 1, 2
* ```
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onEmpty(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow {
var isEmpty = true
collect {
isEmpty = false
emit(it)
}
if (isEmpty) {
val collector = SafeCollector(this, coroutineContext)
try {
collector.action()
} finally {
collector.releaseIntercepted()
}
}
}

private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
override suspend fun emit(value: Any?) {
throw e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,23 @@ class OnCompletionTest : TestBase() {
assertEquals(42, value)
finish(2)
}

@Test
fun testTransparencyViolation() = runTest {
val flow = emptyFlow<Int>().onCompletion {
expect(2)
coroutineScope {
launch {
try {
emit(1)
} catch (e: IllegalStateException) {
expect(3)
}
}
}
}
expect(1)
assertNull(flow.singleOrNull())
finish(4)
}
}
81 changes: 81 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/OnEmptyTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.operators

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

class OnEmptyTest : TestBase() {

@Test
fun testOnEmptyInvoked() = runTest {
val flow = emptyFlow<Int>().onEmpty { emit(1) }
assertEquals(1, flow.single())
}

@Test
fun testOnEmptyNotInvoked() = runTest {
val flow = flowOf(1).onEmpty { emit(2) }
assertEquals(1, flow.single())
}

@Test
fun testOnEmptyNotInvokedOnError() = runTest {
val flow = flow<Int> {
throw TestException()
}.onEmpty { expectUnreached() }
assertFailsWith<TestException>(flow)
}

@Test
fun testOnEmptyNotInvokedOnCancellation() = runTest {
val flow = flow<Int> {
expect(2)
hang { expect(4) }
}.onEmpty { expectUnreached() }

expect(1)
val job = flow.onEach { expectUnreached() }.launchIn(this)
yield()
expect(3)
job.cancelAndJoin()
finish(5)
}

@Test
fun testOnEmptyCancellation() = runTest {
val flow = emptyFlow<Int>().onEmpty {
expect(2)
hang { expect(4) }
emit(1)
}
expect(1)
val job = flow.onEach { expectUnreached() }.launchIn(this)
yield()
expect(3)
job.cancelAndJoin()
finish(5)
}

@Test
fun testTransparencyViolation() = runTest {
val flow = emptyFlow<Int>().onEmpty {
expect(2)
coroutineScope {
launch {
try {
emit(1)
} catch (e: IllegalStateException) {
expect(3)
}
}
}
}
expect(1)
assertNull(flow.singleOrNull())
finish(4)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,23 @@ class OnStartTest : TestBase() {
.onStart { emit("Begin") }
assertEquals(listOf("Begin", "a", "b", "c"), flow.toList())
}
}

@Test
fun testTransparencyViolation() = runTest {
val flow = emptyFlow<Int>().onStart {
expect(2)
coroutineScope {
launch {
try {
emit(1)
} catch (e: IllegalStateException) {
expect(3)
}
}
}
}
expect(1)
assertNull(flow.singleOrNull())
finish(4)
}
}

0 comments on commit de29acd

Please sign in to comment.