Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chunked and windowed operators #1558

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun catch (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun chunked (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun chunked (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -984,6 +986,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun transformLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZ)Lkotlinx/coroutines/flow/Flow;
public static final fun windowed (Lkotlinx/coroutines/flow/Flow;IIZLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}
Expand Down
100 changes: 100 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.jvm.*
import kotlin.math.*

/**
 * Splits this flow into consecutive lists of at most [size] elements each.
 * Every emitted list is a copy made with `toList()`, so it can be stored safely.
 * Only the final list may contain fewer than [size] elements.
 *
 * @param size the number of elements to take in each list; must be positive and may be greater
 * than the number of elements in this flow.
 */
@FlowPreview
public fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {
    return chunked(size) { chunk -> chunk.toList() }
}

/**
 * Splits this flow into consecutive lists of at most [size] elements each and emits the result
 * of applying [transform] to each of them.
 *
 * The list handed to [transform] is ephemeral: it is valid only for the duration of that call.
 * Do not store it or let it escape unless you take a snapshot of it first.
 * The last list may have fewer elements than the given [size].
 *
 * This is more efficient than using `flow.chunked(n).map { ... }`.
 *
 * @param size the number of elements to take in each list; must be positive and may be greater
 * than the number of elements in this flow.
 * @param transform the suspending function applied to each chunk.
 * @throws IllegalArgumentException if [size] is not positive.
 */
@FlowPreview
public fun <T, R> Flow<T>.chunked(size: Int, transform: suspend (List<T>) -> R): Flow<R> {
    require(size > 0) { "Size should be greater than 0, but was $size" }
    // A chunk is a window that advances by its own length and keeps the trailing partial window.
    return windowed(size = size, step = size, partialWindows = true, transform = transform)
}

/**
 * Slides a window of the given [size] along this flow, advancing it by [step] elements each time,
 * and emits a snapshot list of every window.
 *
 * Several of the last lists may have fewer elements than the given [size].
 *
 * Both [size] and [step] must be positive and may be greater than the number of elements in this flow.
 *
 * @param size the number of elements to take in each window.
 * @param step the number of elements to move the window forward by on each step.
 * @param partialWindows controls whether partial windows at the end, if any, are kept.
 */
@FlowPreview
public fun <T> Flow<T>.windowed(size: Int, step: Int, partialWindows: Boolean): Flow<List<T>> {
    return windowed(size, step, partialWindows) { window -> window.toList() }
}

/**
* Returns a flow of results of applying the given [transform] function to
* each list representing a view over the window of the given [size]
* sliding along this flow with the given [step].
*
* Note that the list passed to the [transform] function is ephemeral and is valid only inside that function.
* You should not store it or allow it to escape in some way, unless you made a snapshot of it.
* Several last lists may have fewer elements than the given [size].
*
* This is more efficient than using `flow.windowed(...).map { ... }`.
*
* Both [size] and [step] must be positive and can be greater than the number of elements in this flow.
* @param size the number of elements to take in each window
* @param step the number of elements to move the window forward by on each step.
* @param partialWindows controls whether or not to keep partial windows at the end, if any.
* @param transform the suspending function applied to each window; its result is what the returned flow emits.
* @throws IllegalArgumentException if [size] or [step] is not positive.
*/

@OptIn(ExperimentalStdlibApi::class)
@FlowPreview
public fun <T, R> Flow<T>.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List<T>) -> R): Flow<R> {
// Validated before the flow builder, so invalid arguments fail when the operator is applied, not on collection.
require(size > 0 && step > 0) { "Size and step should be greater than 0, but was size: $size, step: $step" }

return flow {
// The current window. The same deque instance is passed to every transform call.
// NOTE(review): `size` is used as the initial capacity; presumably this allocates eagerly, so a very
// large size could be costly even for a short flow. Confirm and consider capping the capacity.
val buffer = ArrayDeque<T>(size)
// Number of elements removed from the front of the window after each full window is emitted.
val toDrop = min(step, size)
// Number of upstream elements ignored between windows; non-zero only when step exceeds size.
val toSkip = max(step - size, 0)
// Elements ignored since the last emitted window; starts at toSkip so the very first element is buffered.
var skipped = toSkip

collect { value ->
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick observation. If the upstream flow throws an exception, you could end up with a partial buffer here.
what are your thoughts of adding this before the collect

catch { e ->
  if(partialWindows){
    emit(transform(buffer))
  }
  throw e
}.collect { value -> 

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fear, that we can loose exception this way or otherwise obscure it.

What if emitting partial buffer gets cancelled on some suspension point, before we rethrow exception? Or even, what if exception will be thrown from emitting our partial buffer, before we manage to rethrow original exception? Perhaps we could fix this by using a finally block somewhere. But, I think, it would be simpler, to just let exception propagate right away, and loose unfortunate partial buffer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loosing partial buffer is somewhat analogous to coroutineScope cancelling all its children immediately, on detecting exception anywhere. It is just more consistent this way.

// Buffer the element only once the gap between windows has been fully skipped; otherwise count it as skipped.
if (toSkip == skipped) buffer.addLast(value)
else skipped++

if (buffer.size == size) {
// Full window: emit its transformed result, slide forward by dropping from the front, restart the gap count.
emit(transform(buffer))
repeat(toDrop) { buffer.removeFirst() }
skipped = 0
}
}

// Upstream completed normally: if requested, emit the remaining partial windows,
// shrinking the buffer by up to toDrop elements each time until it is empty.
while (partialWindows && buffer.isNotEmpty()) {
emit(transform(buffer))
repeat(min(toDrop, buffer.size)) { buffer.removeFirst() }
}
}
}
59 changes: 59 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package kotlinx.coroutines.flow.operators

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlin.test.Test
import kotlin.test.assertEquals

/**
 * Tests for the `chunked` flow operator: number of emitted chunks, argument validation,
 * empty upstream, and upstream cancellation when the transform throws.
 */
class ChunkedTest : TestBase() {

    // Four-element source (1, 2, 3, 4) shared by the counting and validation tests.
    private val source = flow {
        for (value in 1..4) emit(value)
    }

    @Test
    fun `Chunks correct number of emissions with possible partial window at the end`() = runTest {
        // chunk size -> expected number of chunks for a 4-element source
        val expectations = listOf(2 to 2, 3 to 2, 5 to 1)
        for ((chunkSize, expectedChunks) in expectations) {
            assertEquals(expectedChunks, source.chunked(chunkSize).count())
        }
    }

    @Test
    fun `Throws IllegalArgumentException for chunk of size less than 1`() {
        for (invalidSize in listOf(0, -1)) {
            assertFailsWith<IllegalArgumentException> { source.chunked(invalidSize) }
        }
    }

    @Test
    fun `No emissions with empty flow`() = runTest {
        val chunks = flowOf<Int>().chunked(2)
        assertEquals(0, chunks.count())
    }

    @Test
    fun testErrorCancelsUpstream() = runTest {
        val gate = Channel<Unit>()
        val upstream = flow {
            coroutineScope {
                // Started atomically; it hangs after signalling, so expect(3) only runs on cancellation.
                launch(start = CoroutineStart.ATOMIC) {
                    gate.send(Unit)
                    hang { expect(3) }
                }
                emit(1)
                expect(1)
                // Second element completes the chunk, so the failing transform runs inside this emit.
                emit(2)
                expectUnreached()
            }
        }
        val recovered = upstream
            .chunked<Int, Int>(2) { _ ->
                expect(2)
                gate.receive()
                throw TestException()
            }
            .catch { emit(42) }

        assertEquals(42, recovered.single())
        finish(4)
    }
}
82 changes: 82 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package kotlinx.coroutines.flow.operators

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlin.test.Test
import kotlin.test.assertEquals

/**
 * Tests for the `windowed` flow operator: argument validation, empty upstream,
 * overlapping and spaced-apart windows (with and without partial windows),
 * and upstream cancellation when the transform throws.
 */
class WindowedTest : TestBase() {

    // Four-element source (1, 2, 3, 4) shared by most tests.
    private val source = flow {
        for (value in 1..4) emit(value)
    }

    @Test
    fun `Throws IllegalArgumentException for window of size or step less than 1`() {
        // (size, step) pairs where at least one argument is not positive
        val invalidArguments = listOf(0 to 1, -1 to 2, 2 to 0, 5 to -2)
        for ((size, step) in invalidArguments) {
            assertFailsWith<IllegalArgumentException> { source.windowed(size, step, false) }
        }
    }

    @Test
    fun `No emissions with empty flow`() = runTest {
        val windows = flowOf<Int>().windowed(2, 2, false)
        assertEquals(0, windows.count())
    }

    @Test
    fun `Emits correct sum with overlapping non partial windows`() = runTest {
        // Windows: [1, 2, 3] and [2, 3, 4] -> 6 + 9
        val windowSums = source.windowed(3, 1, false) { it.sum() }
        assertEquals(15, windowSums.sum())
    }

    @Test
    fun `Emits correct sum with overlapping partial windows`() = runTest {
        // Windows: [1, 2, 3] and the partial [3, 4] -> 6 + 7
        val windowSums = source.windowed(3, 2, true) { it.sum() }
        assertEquals(13, windowSums.sum())
    }

    @Test
    fun `Emits correct number of overlapping windows for long sequence of overlapping partial windows`() = runTest {
        // One full window of 100 elements followed by 99 shrinking partial windows.
        val windows = (1..100).asFlow().windowed(100, 1, true)
        assertEquals(100, windows.count())
    }

    @Test
    fun `Emits correct sum with partial windows set apart`() = runTest {
        // Windows: [1, 2], element 3 is skipped, then the partial [4] -> 3 + 4
        val windowSums = source.windowed(2, 3, true) { it.sum() }
        assertEquals(7, windowSums.sum())
    }

    @Test
    fun testErrorCancelsUpstream() = runTest {
        val gate = Channel<Unit>()
        val upstream = flow {
            coroutineScope {
                // Started atomically; it hangs after signalling, so expect(3) only runs on cancellation.
                launch(start = CoroutineStart.ATOMIC) {
                    gate.send(Unit)
                    hang { expect(3) }
                }
                emit(1)
                expect(1)
                // Second element completes the window, so the failing transform runs inside this emit.
                emit(2)
                expectUnreached()
            }
        }
        val recovered = upstream
            .windowed<Int, Int>(2, 3, false) { _ ->
                expect(2)
                gate.receive()
                throw TestException()
            }
            .catch { emit(42) }

        assertEquals(42, recovered.single())
        finish(4)
    }
}