Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chunked and windowed operators #1558

Closed

Conversation

circusmagnus
Copy link

Attempt to solve issue with lack of chunked operator with size limit: #1290

New functions:
fun <T, R> Flow<T>.chunked(size: Int, transform: suspend (List<T>) -> R): Flow<R>

fun <T, R> Flow<T>.windowed(
    size: Int, 
    step: Int, 
    partialWindows: Boolean, 
    transform: suspend (List<T>) -> R
): Flow<R>

Might be worth considering whether it is worth to accept transform parameter instead of just using
flow.chunked(2).map { ... }


private fun <T> makeWindow(step: Int, buffer: MutableList<T>): List<T> {
val window = ArrayList(buffer)
repeat(step) { buffer.removeAt(0) }
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if our buffer was ArrayDeque or similar entity which can be both iterated and has good performance, when removing elements at the head. Is it possible to expect ArrayDeque class, just as we expect ArrayList?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vote yes. The poor performance and heap pressure that the current implementation will apply is very unsavory. You could also leave a reference to https://youtrack.jetbrains.com/issue/KT-21327 in a comment next to it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a List implementation called RingBuffer, which would suit us perfectly. It is however private inside kotlin collections. About 100 lines long.

I could also use ArrayQueue from within coroutines lib, but it is not possible to iterate over its content without removing elements (so - buffer would have to be cleared and partially filled again after each windowed emission).

What is kotlin`s team advice in such circumstances?

  • Stay with underperformant current version?
  • Drop windowed operator until we get proper public collection available?
  • Just write our own RingBuffer without unneeded parts (we likely do not need toArray() function)
  • Use existing ArrayQueue?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I have decided to write our own RingBuffer impl (it is almost identical to the one from kotlin collections) and use it in windowed operator.

kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt Outdated Show resolved Hide resolved
kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt Outdated Show resolved Hide resolved

private fun <T> makeWindow(step: Int, buffer: MutableList<T>): List<T> {
val window = ArrayList(buffer)
repeat(step) { buffer.removeAt(0) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vote yes. The poor performance and heap pressure that the current implementation will apply is very unsavory. You could also leave a reference to https://youtrack.jetbrains.com/issue/KT-21327 in a comment next to it.

import kotlinx.coroutines.flow.flow


fun <T, R> Flow<T>.chunked(size: Int, transform: suspend (List<T>) -> R): Flow<R> = nonOverlappingWindowed(size, size, true, transform)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this default to an identity transform?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, done.

} else skipped++

if (window.size == size) {
emit(transform(window.toList()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation provides a copy of the current window, this force a new list on each iteration.

Sequence and Iterable pass an ephemeral list to transform's argument, I propose to apply the same criteria.

A not ephemeral list should be provided only if the transformer is missing, so flow(...).windowed(10).toList() is a valid statement.

Will be always possible to emulate the current behaviour with: flow(...).windowed(...).map(...).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@zach-klippenstein
Copy link
Contributor

Might be worth considering whether it is worth to accept transform parameter instead of just using flow.chunked(2).map { ... }

I would vote for removing the transform parameter in the interest of limiting each operator to a single concern. If this operator is being used because some downstream API actually needs lists, or will be immediately chained with a different transform like flatMap, then the transform parameter is unnecessary.

@glasser
Copy link
Contributor

glasser commented Feb 29, 2020

Any chance this will get merged any time soon? Or for that matter — does this look basically good enough that copying it into my app isn't a bad idea?

@qwwdfsad qwwdfsad force-pushed the develop branch 3 times, most recently from 4a49830 to aff8202 Compare March 10, 2020 17:27
Copy link
Contributor

@elizarov elizarov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other general comments:

  • public modified shall be explicitly added to function.
  • New functions shall be marked with @FlowPreview.
  • API dump shall be updated (use ./gradlew apiDump)

kotlinx-coroutines-core/common/src/internal/RingBuffer.kt Outdated Show resolved Hide resolved
kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt Outdated Show resolved Hide resolved
@circusmagnus circusmagnus force-pushed the flow-chunked-operators branch from 9ff3052 to c74bfcf Compare March 18, 2020 23:35
@circusmagnus circusmagnus requested a review from elizarov March 18, 2020 23:49
@circusmagnus circusmagnus requested a review from elizarov March 31, 2020 21:00
val toSkip = max(step - size, 0)
var skipped = toSkip

collect { value ->
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick observation. If the upstream flow throws an exception, you could end up with a partial buffer here.
what are your thoughts of adding this before the collect

catch { e ->
  if(partialWindows){
    emit(transform(buffer))
  }
  throw e
}.collect { value -> 

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fear, that we can loose exception this way or otherwise obscure it.

What if emitting partial buffer gets cancelled on some suspension point, before we rethrow exception? Or even, what if exception will be thrown from emitting our partial buffer, before we manage to rethrow original exception? Perhaps we could fix this by using a finally block somewhere. But, I think, it would be simpler, to just let exception propagate right away, and loose unfortunate partial buffer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loosing partial buffer is somewhat analogous to coroutineScope cancelling all its children immediately, on detecting exception anywhere. It is just more consistent this way.

@circusmagnus
Copy link
Author

@elizarov What are the perspectives for this PR?

@elizarov
Copy link
Contributor

We don't plan to have just a simple windowed and chunked operators from stdlib for Kotlin Flow, because asynchrony adds a whole another dimension of time period measurement. However, all time-based operators require a common, coherent design. We want to design a minimal number of basic composed primitives as opposed to many tailor-fit operators.
We'll be undertaking this design later.

For now, you are welcome to have your own implementations of the corresponding operators in your code.

Having said that, I'm closing this PR for now.

@elizarov elizarov closed this Jul 16, 2020
@glasser
Copy link
Contributor

glasser commented Jul 16, 2020

I'm confused about how this is time-based: isn't it based on chunk lengths?

@circusmagnus
Copy link
Author

Sure thing. Wise decision not to chase reactive libraries for operator count.

@elizarov
Copy link
Contributor

I'm confused about how this is time-based: isn't it based on chunk lengths?

@glasser This implementation, just like windowed and chunked from stdlib is not time-based, but size-based only. However, there are use-cases that call for time-based operators (see #1302). We don't want separate operators for that. We want to have a consistent chunking/windowing framework with flexible time- and size-based cutoffs, to minimize the number of concepts and operators users will have to learn.

@wafisher
Copy link

Just confirming that if and when this code is done, it would replace this solution?

@mwspellman
Copy link

Hey, just wanted to confirm if this is under the Apache 2.0 License? being the same as the kotlin libraries (https://kotlinlang.org/docs/contribute.html). Would like to use it, but wanted to clarify the licensing situation.

Thank you! I have found this very useful for my use case.

@circusmagnus
Copy link
Author

@mwspellman As far as I am concerned (as a PR author), you can use this code under Apache 2.0 license.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants