diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 1e786a2116..6afe466b1f 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -995,6 +995,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final synthetic fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index b61fac350b..98df6dab72 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -473,6 +473,7 @@ final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutine final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/cache(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/cache|cache@kotlinx.coroutines.flow.Flow<0:0>(){0§}[0] final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/cancellable(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/cancellable|cancellable@kotlinx.coroutines.flow.Flow<0:0>(){0§}[0] final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/catch(kotlin.coroutines/SuspendFunction2, kotlin/Throwable, kotlin/Unit>): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/catch|catch@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction2,kotlin.Throwable,kotlin.Unit>){0§}[0] +final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/chunked(kotlin/Int): kotlinx.coroutines.flow/Flow> // kotlinx.coroutines.flow/chunked|chunked@kotlinx.coroutines.flow.Flow<0:0>(kotlin.Int){0§}[0] final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/concatWith(#A): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/concatWith|concatWith@kotlinx.coroutines.flow.Flow<0:0>(0:0){0§}[0] final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/concatWith(kotlinx.coroutines.flow/Flow<#A>): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/concatWith|concatWith@kotlinx.coroutines.flow.Flow<0:0>(kotlinx.coroutines.flow.Flow<0:0>){0§}[0] final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/conflate(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/conflate|conflate@kotlinx.coroutines.flow.Flow<0:0>(){0§}[0] diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index 2f43cfacc7..f3c9be1c7e 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -129,3 +129,38 @@ public fun Flow.runningReduce(operation: suspend (accumulator: T, value: emit(accumulator as T) } } + +/** + * Splits the given flow into a flow of non-overlapping lists each not exceeding the given [size] but never empty. + * The last emitted list may have fewer elements than the given size. + * + * Example of usage: + * ``` + * flowOf("a", "b", "c", "d", "e") + * .chunked(2) // ["a", "b"], ["c", "d"], ["e"] + * .map { it.joinToString(separator = "") } + * .collect { + * println(it) // Prints "ab", "cd", e" + * } + * ``` + * + * @throws IllegalArgumentException if [size] is not positive. + */ +@ExperimentalCoroutinesApi +public fun Flow.chunked(size: Int): Flow> { + require(size >= 1) { "Expected positive chunk size, but got $size" } + return flow { + var result: ArrayList? = null // Do not preallocate anything + collect { value -> + // Allocate if needed + val acc = result ?: ArrayList(size).also { result = it } + acc.add(value) + if (acc.size == size) { + emit(acc) + // Cleanup, but don't allocate -- it might've been the case this is the last element + result = null + } + } + result?.let { emit(it) } + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt new file mode 100644 index 0000000000..d9c6104cd9 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -0,0 +1,89 @@ +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.testing.* +import kotlin.test.* + +@OptIn(ExperimentalCoroutinesApi::class) +class ChunkedTest : TestBase() { + + @Test + fun testChunked() = runTest { + doTest(flowOf(1, 2, 3, 4, 5), 2, listOf(listOf(1, 2), listOf(3, 4), listOf(5))) + doTest(flowOf(1, 2, 3, 4, 5), 3, listOf(listOf(1, 2, 3), listOf(4, 5))) + doTest(flowOf(1, 2, 3, 4), 2, listOf(listOf(1, 2), listOf(3, 4))) + doTest(flowOf(1), 3, listOf(listOf(1))) + } + + private suspend fun doTest(flow: Flow, chunkSize: Int, expected: List>) { + assertEquals(expected, flow.chunked(chunkSize).toList()) + assertEquals(flow.toList().chunked(chunkSize), flow.chunked(chunkSize).toList()) + } + + @Test + fun testEmpty() = runTest { + doTest(emptyFlow(), 1, emptyList()) + doTest(emptyFlow(), 2, emptyList()) + } + + @Test + fun testChunkedCancelled() = runTest { + val result = flow { + expect(1) + emit(1) + emit(2) + expect(2) + }.chunked(1).buffer().take(1).toList() + assertEquals(listOf(listOf(1)), result) + finish(3) + } + + @Test + fun testChunkedCancelledWithSuspension() = runTest { + val result = flow { + expect(1) + emit(1) + yield() + expectUnreached() + emit(2) + }.chunked(1).buffer().take(1).toList() + assertEquals(listOf(listOf(1)), result) + finish(2) + } + + @Test + fun testChunkedDoesNotIgnoreCancellation() = runTest { + expect(1) + val result = flow { + coroutineScope { + launch { + hang { expect(2) } + } + yield() + emit(1) + emit(2) + } + }.chunked(1).take(1).toList() + assertEquals(listOf(listOf(1)), result) + finish(3) + } + + @Test + fun testIae() { + assertFailsWith { emptyFlow().chunked(-1) } + assertFailsWith { emptyFlow().chunked(0) } + assertFailsWith { emptyFlow().chunked(Int.MIN_VALUE) } + assertFailsWith { emptyFlow().chunked(Int.MIN_VALUE + 1) } + } + + @Test + fun testSample() = runTest { + val result = flowOf("a", "b", "c", "d", "e") + .chunked(2) + .map { it.joinToString(separator = "") } + .toList() + assertEquals(listOf("ab", "cd", "e"), result) + } +}