Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce basic Flow<T>.chunked operator #4127

Merged
merged 3 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun cancellable (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final synthetic fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutine
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/cache(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/cache|cache@kotlinx.coroutines.flow.Flow<0:0>(){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/cancellable(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/cancellable|cancellable@kotlinx.coroutines.flow.Flow<0:0>(){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/catch(kotlin.coroutines/SuspendFunction2<kotlinx.coroutines.flow/FlowCollector<#A>, kotlin/Throwable, kotlin/Unit>): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/catch|catch@kotlinx.coroutines.flow.Flow<0:0>(kotlin.coroutines.SuspendFunction2<kotlinx.coroutines.flow.FlowCollector<0:0>,kotlin.Throwable,kotlin.Unit>){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/chunked(kotlin/Int): kotlinx.coroutines.flow/Flow<kotlin.collections/List<#A>> // kotlinx.coroutines.flow/chunked|chunked@kotlinx.coroutines.flow.Flow<0:0>(kotlin.Int){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/concatWith(#A): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/concatWith|concatWith@kotlinx.coroutines.flow.Flow<0:0>(0:0){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/concatWith(kotlinx.coroutines.flow/Flow<#A>): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/concatWith|concatWith@kotlinx.coroutines.flow.Flow<0:0>(kotlinx.coroutines.flow.Flow<0:0>){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.flow/Flow<#A>).kotlinx.coroutines.flow/conflate(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/conflate|conflate@kotlinx.coroutines.flow.Flow<0:0>(){0§<kotlin.Any?>}[0]
Expand Down
35 changes: 35 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,38 @@ public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value:
emit(accumulator as T)
}
}

/**
* Splits the given flow into a flow of non-overlapping lists each not exceeding the given [size] but never empty.
* The last emitted list may have fewer elements than the given size.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please name the parameter maxSize then

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my original intent, but stdlib disagrees on that: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/chunked.html

A nice thing to consider tho

*
* Example of usage:
* ```
* flowOf("a", "b", "c", "d", "e")
* .chunked(2) // ["a", "b"], ["c", "d"], ["e"]
* .map { it.joinToString(separator = "") }
* .collect {
* println(it) // Prints "ab", "cd", e"
* }
* ```
*
* @throws IllegalArgumentException if [size] is not positive.
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {
require(size >= 1) { "Expected positive chunk size, but got $size" }
return flow {
var result: ArrayList<T>? = null // Do not preallocate anything
collect { value ->
// Allocate if needed
val acc = result ?: ArrayList<T>(size).also { result = it }
acc.add(value)
if (acc.size == size) {
emit(acc)
// Cleanup, but don't allocate -- it might've been the case this is the last element
result = null
}
}
result?.let { emit(it) }
}
}
89 changes: 89 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package kotlinx.coroutines.flow.operators

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.testing.*
import kotlin.test.*

@OptIn(ExperimentalCoroutinesApi::class)
class ChunkedTest : TestBase() {

@Test
fun testChunked() = runTest {
doTest(flowOf(1, 2, 3, 4, 5), 2, listOf(listOf(1, 2), listOf(3, 4), listOf(5)))
doTest(flowOf(1, 2, 3, 4, 5), 3, listOf(listOf(1, 2, 3), listOf(4, 5)))
doTest(flowOf(1, 2, 3, 4), 2, listOf(listOf(1, 2), listOf(3, 4)))
doTest(flowOf(1), 3, listOf(listOf(1)))
}

private suspend fun <T> doTest(flow: Flow<T>, chunkSize: Int, expected: List<List<T>>) {
assertEquals(expected, flow.chunked(chunkSize).toList())
assertEquals(flow.toList().chunked(chunkSize), flow.chunked(chunkSize).toList())
}

@Test
fun testEmpty() = runTest {
doTest(emptyFlow<Int>(), 1, emptyList())
doTest(emptyFlow<Int>(), 2, emptyList())
}

@Test
fun testChunkedCancelled() = runTest {
val result = flow {
expect(1)
emit(1)
emit(2)
expect(2)
}.chunked(1).buffer().take(1).toList()
assertEquals(listOf(listOf(1)), result)
finish(3)
}

@Test
fun testChunkedCancelledWithSuspension() = runTest {
val result = flow {
expect(1)
emit(1)
yield()
expectUnreached()
emit(2)
}.chunked(1).buffer().take(1).toList()
assertEquals(listOf(listOf(1)), result)
finish(2)
}

@Test
fun testChunkedDoesNotIgnoreCancellation() = runTest {
expect(1)
val result = flow {
coroutineScope {
launch {
hang { expect(2) }
}
yield()
emit(1)
emit(2)
}
}.chunked(1).take(1).toList()
assertEquals(listOf(listOf(1)), result)
finish(3)
}

@Test
fun testIae() {
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(-1) }
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(0) }
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(Int.MIN_VALUE) }
assertFailsWith<IllegalArgumentException> { emptyFlow<Int>().chunked(Int.MIN_VALUE + 1) }
}

@Test
fun testSample() = runTest {
val result = flowOf("a", "b", "c", "d", "e")
.chunked(2)
.map { it.joinToString(separator = "") }
.toList()
assertEquals(listOf("ab", "cd", "e"), result)
}
}