Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow size- and time-based chunked #2378

Open
wants to merge 28 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
74d3d60
Merge pull request #1 from Kotlin/master
circusmagnus Mar 18, 2020
c0bf01b
Merge pull request #2 from Kotlin/develop
circusmagnus Mar 18, 2020
ccb76c8
Merge pull request #3 from Kotlin/master
circusmagnus Nov 3, 2020
d52fd69
Merge pull request #4 from Kotlin/master
circusmagnus Nov 6, 2020
6fb01b9
Add time- and size-based chunking operators
circusmagnus Nov 10, 2020
43bfcfb
Remove unused operators
circusmagnus Nov 10, 2020
c378678
Add visibility modifiers and clarify tests
circusmagnus Nov 10, 2020
cfbd8ea
Merge pull request #5 from Kotlin/master
circusmagnus Dec 23, 2020
1c98a45
Merge remote-tracking branch 'origin/master' into flow-time-based-chu…
circusmagnus Dec 23, 2020
5237f92
Chunk with interval and size only
circusmagnus Dec 23, 2020
8b8b28e
Chunk with interval and size only - part 2
circusmagnus Jan 8, 2021
c2a4eac
Merge pull request #6 from Kotlin/master
circusmagnus Mar 29, 2021
5c5c088
Add time- and size-based chunking operators
circusmagnus Nov 10, 2020
e04a106
Remove unused operators
circusmagnus Nov 10, 2020
942b163
Add visibility modifiers and clarify tests
circusmagnus Nov 10, 2020
a12429e
Chunk with interval and size only
circusmagnus Dec 23, 2020
da1a57c
Chunk with interval and size only - part 2
circusmagnus Jan 8, 2021
632d540
Prepare Chunking Methods
circusmagnus Mar 29, 2021
c3244ff
Add a bunch of tests
circusmagnus Mar 31, 2021
5b5c3bd
Test Time based chunking
circusmagnus Apr 2, 2021
2b9e5d1
Add docs and last tests
circusmagnus Apr 12, 2021
9cb86f9
Add test for error propagation in Natural Chunking
circusmagnus Apr 14, 2021
d996a9b
Enable for JDK 1.6
circusmagnus Apr 14, 2021
b16e9b0
Merge remote-tracking branch 'origin/flow-time-based-chunked' into fl…
circusmagnus Apr 14, 2021
3aaf7bd
Merge pull request #7 from Kotlin/develop
circusmagnus Apr 14, 2021
e795cc2
Merge remote-tracking branch 'origin/develop' into flow-time-based-ch…
circusmagnus Apr 14, 2021
3fb6939
Adjust for changes in Channel API
circusmagnus Apr 15, 2021
7431426
New Api dump
circusmagnus Apr 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Chunked.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlin.math.*
import kotlin.time.*

private const val NO_MAXIMUM = -1

public fun <T> Flow<T>.chunked(maxSize: Int, minSize: Int = 1): Flow<List<T>> {
require(minSize in 0 until maxSize)
return flow {
val accumulator = mutableListOf<T>()
collect { value ->
accumulator.add(value)
if (accumulator.size == maxSize) emit(accumulator.drain())
}
if (accumulator.size >= minSize) emit(accumulator)
}
}

@ExperimentalTime
public fun <T> Flow<T>.chunked(
chunkDuration: Duration,
minSize: Int = 1,
maxSize: Int = NO_MAXIMUM
): Flow<List<T>> = chunked(chunkDuration.toDelayMillis(), minSize, maxSize)

public fun <T> Flow<T>.chunked(
chunkDurationMs: Long,
minSize: Int = 1,
maxSize: Int = NO_MAXIMUM
): Flow<List<T>> {
require(chunkDurationMs > 0)
require(minSize >= 0)
require(maxSize == NO_MAXIMUM || maxSize >= minSize)

return if (minSize == 0 && maxSize == NO_MAXIMUM) chunkFixedTimeWindows(chunkDurationMs)
else if (minSize == 0) chunkContinousWindows(chunkDurationMs, maxSize)
else chunkFloatingWindows(chunkDurationMs, minSize, maxSize)
}

private fun <T> Flow<T>.chunkFixedTimeWindows(durationMs: Long): Flow<List<T>> = scopedFlow { downstream ->
val upstream = produce(capacity = Channel.CHANNEL_DEFAULT_CAPACITY) {
val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) }
launch {
for (tick in ticker) send(Signal.TimeIsUp)
}
collect { value -> send(Signal.NewElement(value)) }
ticker.close()
}
val accumulator = mutableListOf<T>()

for (signal in upstream) {
when (signal) {
is Signal.NewElement -> accumulator.add(signal.value)
is Signal.TimeIsUp -> downstream.emit(accumulator.drain())
}
}
if (accumulator.isNotEmpty()) downstream.emit(accumulator)
}

private fun <T> Flow<T>.chunkContinousWindows(durationMs: Long, maxSize: Int): Flow<List<T>> =
scopedFlow { downstream ->
val inbox: ReceiveChannel<T> = this@chunkContinousWindows.produceIn(this)
val ticker = Ticker(durationMs, this).apply { send(Ticker.Message.Start) }
val accumulator = mutableListOf<T>()

whileSelect {
inbox.onReceiveOrClosed.invoke { valueOrClosed ->
val isOpen = !valueOrClosed.isClosed
if (isOpen) {
accumulator.add(valueOrClosed.value)
if(accumulator.size == maxSize){
ticker.send(Ticker.Message.Reset)
downstream.emit(accumulator.drain())
ticker.send(Ticker.Message.Start)
}
}
isOpen
}
ticker.onReceive.invoke {
downstream.emit(accumulator.drain())
true
}
}

ticker.close()
if (accumulator.isNotEmpty()) downstream.emit(accumulator)
}

private fun <T> Flow<T>.chunkFloatingWindows(
durationMs: Long,
minSize: Int,
maxSize: Int,
): Flow<List<T>> {

return scopedFlow { downstream ->
val upstream: ReceiveChannel<T> = this@chunkFloatingWindows.produceIn(this)
val ticker = Ticker(durationMs, this)
val accumulator = mutableListOf<T>()

whileSelect {
upstream.onReceiveOrClosed.invoke { valueOrClosed ->
val isOpen = valueOrClosed.isClosed.not()
if (isOpen) {
if (accumulator.isEmpty()) ticker.send(Ticker.Message.Start)
accumulator.add(valueOrClosed.value)
if (accumulator.size == maxSize) {
ticker.send(Ticker.Message.Reset)
downstream.emit(accumulator.drain())
}
}
isOpen
}
ticker.onReceive.invoke {
if (accumulator.size >= minSize) downstream.emit(accumulator.drain())
true
}
}

ticker.close()
if (accumulator.size >= minSize) downstream.emit(accumulator)
}
}

private class Ticker(
private val intervalMs: Long,
scope: CoroutineScope,
private val inbox: Channel<Message> = Channel(),
private val ticks: Channel<Unit> = Channel()
) : SendChannel<Ticker.Message> by inbox, ReceiveChannel<Unit> by ticks {

init {
scope.processMessages()
}

private fun CoroutineScope.processMessages() = launch {
var ticker = setupTicks()
for (message in inbox) {
when (message) {
Message.Start -> ticker.start()
Message.Reset -> {
ticker.cancel()
ticker = setupTicks()
}
}
}
ticker.cancel()
ticks.cancel()
}

private fun CoroutineScope.setupTicks() = launch(start = CoroutineStart.LAZY) {
while (true) {
delay(intervalMs)
ticks.send(Unit)
}
}

sealed class Message {
object Start : Message()
object Reset : Message()
}
}

private sealed class Signal<out T> {
class NewElement<out T>(val value: T) : Signal<T>()
object TimeIsUp : Signal<Nothing>()
}

private fun <T> MutableList<T>.drain() = toList().also { this.clear() }
175 changes: 175 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/ChunkedTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.test.*
import kotlin.time.*

@ExperimentalTime
class ChunkedTest : TestBase() {

@Test
fun testEmptyFlowChunking() = runTest {
val emptyFlow = emptyFlow<Int>()
val result = measureTimedValue {
emptyFlow.chunked(10.seconds).toList()
}

assertTrue { result.value.isEmpty() }
assertTrue { result.duration.inSeconds < 1 }
}

@ExperimentalTime
@Test
fun testSingleFastElementChunking() = runTest {
val fastFlow = flow { emit(1) }

val result = measureTimedValue {
fastFlow.chunked(10.seconds).toList()
}

assertTrue { result.value.size == 1 && result.value.first().contains(1) }
assertTrue { result.duration.inSeconds < 1 }
}

@ExperimentalTime
@Test
fun testMultipleFastElementsChunking() = runTest {
val fastFlow = flow {
for(i in 1..1000) emit(1)
}

val result = measureTimedValue {
fastFlow.chunked(10.seconds).toList()
}

assertTrue { result.value.size == 1 && result.value.first().size == 1000 }
assertTrue { result.duration.inSeconds < 1 }
}

@Test
fun testFixedTimeWindowChunkingWithZeroMinimumSize() = withVirtualTime {
val intervalFlow = flow {
delay(1500)
emit(1)
delay(1500)
emit(2)
delay(1500)
emit(3)
}
val chunks = intervalFlow.chunked(2.seconds, minSize = 0).toList()

assertEquals (3, chunks.size)
assertTrue { chunks.all { it.size == 1 } }

finish(1)
}

@Test
fun testDefaultChunkingWithFloatingWindows() = withVirtualTime {
val intervalFlow = flow {
delay(1500)
emit(1)
delay(1500)
emit(2)
delay(1500)
emit(3)
}
val chunks = intervalFlow.chunked(2.seconds).toList()

assertEquals (2, chunks.size)
assertTrue { chunks.first().size == 2 && chunks[1].size == 1 }

finish(1)
}

@Test
fun testRespectingMinValue() = withVirtualTime {
val intervalFlow = flow {
delay(1500)
emit(1)
delay(1500)
emit(2)
delay(1500)
emit(3)
}
val chunks = intervalFlow.chunked(2.seconds, minSize = 3).toList()

assertTrue { chunks.size == 1 }
assertTrue { chunks.first().size == 3 }

finish(1)
}

@Test
fun testRespectingMaxValueWithContinousWindows() = withVirtualTime {
val intervalFlow = flow {
delay(1500)
emit(1)
emit(2)
emit(3)
emit(4)
delay(1500)
emit(5)
delay(1500)
emit(6)
}
val chunks = intervalFlow.chunked(2.seconds, minSize = 0, maxSize = 3).toList()

assertEquals(3, chunks.size)
assertEquals(3, chunks.first().size)
assertEquals(2, chunks[1].size)
assertTrue { chunks[1].containsAll(listOf(4, 5)) }

finish(1)
}

@Test
fun testRespectingMaxValueAndResetingTickerWithNonContinousWindows() = withVirtualTime {
val intervalFlow = flow {
delay(1500)
emit(1)
emit(2)
emit(3)
delay(1500)
emit(4)
emit(5)
delay(1500)
emit(6)
}
val chunks = intervalFlow.chunked(2.seconds, maxSize = 3).toList()

assertEquals(2, chunks.size)
assertEquals(3, chunks.first().size)
assertEquals(3, chunks[1].size)
assertTrue { chunks[1].containsAll(listOf(4, 5, 6)) }

finish(1)
}

@Test
fun testSizeBasedChunking() = runTest {
val flow = flow {
for (i in 1..10) emit(i)
}

val chunks = flow.chunked(maxSize = 3).toList()

assertEquals(4, chunks.size)
}

@Test
fun testSizeBasedChunkingWithMinSize() = runTest {
val flow = flow {
for (i in 1..10) emit(i)
}

val chunks = flow.chunked(maxSize = 3, minSize = 2).toList()

assertEquals(3, chunks.size)
}

}