diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index a6e5fd513e..0d4c906051 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -866,6 +866,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; + public static final fun chunked (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -984,6 +986,8 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun transformLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; + public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZ)Lkotlinx/coroutines/flow/Flow; + public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt new file mode 100644 index 0000000000..cfc2121d58 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -0,0 +1,100 @@ +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.jvm.* +import kotlin.math.* + +/** + * Returns a flow of lists each not exceeding the given [size]. + * The last list in the resulting flow may have less elements than the given [size]. + * + * @param size the number of elements to take in each list, must be positive and can be greater than the number of elements in this flow. + */ + +@FlowPreview +public fun Flow.chunked(size: Int): Flow> = chunked(size) { it.toList() } + +/** + * Chunks a flow of elements into flow of lists, each not exceeding the given [size] + * and applies the given [transform] function to an each. + * + * Note that the list passed to the [transform] function is ephemeral and is valid only inside that function. + * You should not store it or allow it to escape in some way, unless you made a snapshot of it. + * The last list may have less elements than the given [size]. + * + * This is more efficient, than using flow.chunked(n).map { ... } + * + * @param size the number of elements to take in each list, must be positive and can be greater than the number of elements in this flow. + */ + +@FlowPreview +public fun Flow.chunked(size: Int, transform: suspend (List) -> R): Flow { + require(size > 0) { "Size should be greater than 0, but was $size" } + return windowed(size, size, true, transform) +} + +/** + * Returns a flow of snapshots of the window of the given [size] + * sliding along this flow with the given [step], where each + * snapshot is a list. + * + * Several last lists may have less elements than the given [size]. + * + * Both [size] and [step] must be positive and can be greater than the number of elements in this flow. + * @param size the number of elements to take in each window + * @param step the number of elements to move the window forward by on an each step + * @param partialWindows controls whether or not to keep partial windows in the end if any. + */ + +@FlowPreview +public fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean): Flow> = + windowed(size, step, partialWindows) { it.toList() } + +/** + * Returns a flow of results of applying the given [transform] function to + * an each list representing a view over the window of the given [size] + * sliding along this collection with the given [step]. + * + * Note that the list passed to the [transform] function is ephemeral and is valid only inside that function. + * You should not store it or allow it to escape in some way, unless you made a snapshot of it. + * Several last lists may have less elements than the given [size]. + * + * This is more efficient, than using flow.windowed(...).map { ... } + * + * Both [size] and [step] must be positive and can be greater than the number of elements in this collection. + * @param size the number of elements to take in each window + * @param step the number of elements to move the window forward by on an each step. + * @param partialWindows controls whether or not to keep partial windows in the end if any. + */ + +@OptIn(ExperimentalStdlibApi::class) +@FlowPreview +public fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List) -> R): Flow { + require(size > 0 && step > 0) { "Size and step should be greater than 0, but was size: $size, step: $step" } + + return flow { + val buffer = ArrayDeque(size) + val toDrop = min(step, size) + val toSkip = max(step - size, 0) + var skipped = toSkip + + collect { value -> + if (toSkip == skipped) buffer.addLast(value) + else skipped++ + + if (buffer.size == size) { + emit(transform(buffer)) + repeat(toDrop) { buffer.removeFirst() } + skipped = 0 + } + } + + while (partialWindows && buffer.isNotEmpty()) { + emit(transform(buffer)) + repeat(min(toDrop, buffer.size)) { buffer.removeFirst() } + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt new file mode 100644 index 0000000000..f3169d0d3f --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt @@ -0,0 +1,59 @@ +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.* +import kotlin.test.Test +import kotlin.test.assertEquals + +class ChunkedTest : TestBase() { + + private val flow = flow { + emit(1) + emit(2) + emit(3) + emit(4) + } + + @Test + fun `Chunks correct number of emissions with possible partial window at the end`() = runTest { + assertEquals(2, flow.chunked(2).count()) + assertEquals(2, flow.chunked(3).count()) + assertEquals(1, flow.chunked(5).count()) + } + + @Test + fun `Throws IllegalArgumentException for chunk of size less than 1`() { + assertFailsWith { flow.chunked(0) } + assertFailsWith { flow.chunked(-1) } + } + + @Test + fun `No emissions with empty flow`() = runTest { + assertEquals(0, flowOf().chunked(2).count()) + } + + @Test + fun testErrorCancelsUpstream() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch(start = CoroutineStart.ATOMIC) { + latch.send(Unit) + hang { expect(3) } + } + emit(1) + expect(1) + emit(2) + expectUnreached() + } + }.chunked(2) { chunk -> + expect(2) // 2 + latch.receive() + throw TestException() + }.catch { emit(42) } + + assertEquals(42, flow.single()) + finish(4) + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt new file mode 100644 index 0000000000..173c50caf2 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt @@ -0,0 +1,82 @@ +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.* +import kotlin.test.Test +import kotlin.test.assertEquals + +class WindowedTest : TestBase() { + + private val flow = flow { + emit(1) + emit(2) + emit(3) + emit(4) + } + + @Test + fun `Throws IllegalArgumentException for window of size or step less than 1`() { + assertFailsWith { flow.windowed(0, 1, false) } + assertFailsWith { flow.windowed(-1, 2, false) } + assertFailsWith { flow.windowed(2, 0, false) } + assertFailsWith { flow.windowed(5, -2, false) } + } + + @Test + fun `No emissions with empty flow`() = runTest { + assertEquals(0, flowOf().windowed(2, 2, false).count()) + } + + @Test + fun `Emits correct sum with overlapping non partial windows`() = runTest { + assertEquals(15, flow.windowed(3, 1, false) { window -> + window.sum() + }.sum()) + } + + @Test + fun `Emits correct sum with overlapping partial windows`() = runTest { + assertEquals(13, flow.windowed(3, 2, true) { window -> + window.sum() + }.sum()) + } + + @Test + fun `Emits correct number of overlapping windows for long sequence of overlapping partial windows`() = runTest { + val elements = generateSequence(1) { it + 1 }.take(100) + val flow = elements.asFlow().windowed(100, 1, true) + assertEquals(100, flow.count()) + } + + @Test + fun `Emits correct sum with partial windows set apart`() = runTest { + assertEquals(7, flow.windowed(2, 3, true) { window -> + window.sum() + }.sum()) + } + + @Test + fun testErrorCancelsUpstream() = runTest { + val latch = Channel() + val flow = flow { + coroutineScope { + launch(start = CoroutineStart.ATOMIC) { + latch.send(Unit) + hang { expect(3) } + } + emit(1) + expect(1) + emit(2) + expectUnreached() + } + }.windowed(2, 3, false) { window -> + expect(2) // 2 + latch.receive() + throw TestException() + }.catch { emit(42) } + + assertEquals(42, flow.single()) + finish(4) + } +} \ No newline at end of file