Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce basic Flow<T>.chunked operator #4127

Merged
merged 3 commits into from
May 27, 2024
Merged

Introduce basic Flow<T>.chunked operator #4127

merged 3 commits into from
May 27, 2024

Conversation

qwwdfsad
Copy link
Member

Fixes #1290

@qwwdfsad qwwdfsad requested a review from dkhalanskyjb May 10, 2024 13:37
@qwwdfsad
Copy link
Member Author

qwwdfsad commented May 10, 2024

A few things about this PR:

  • It's a long overdue we introduce chunked operator, and whatever we do, chunked(n) is likely to be here as the shorthand syntax
  • It also might bootstrap the discussion and break our analysis paralysis
  • transform version (chunked(n, transform)) is not added even though the stdlib has one -- we have a strong preference for proper separation of concerns. The documentation sample provides a clear substitute
  • Meanwhile, I'll experiment with duration overload, but it's less straightforward

API shape brain dump (eventually I'll move it towards a better-structured issue, but not there yet):

There exists a whole bunch of possibilities, use cases and things full-blown chunked family might/should address

  • Kotlin's windowed -- the same, but buffers are overlapped
  • Reactive windowed -- the same as chunked, but it yields Flow<Flow<T>>. Operator to chunk a single Flow<T> into a Flow<Flow<T>> #2989
  • Time-based chunking:
    • The most straightforward and common -- keep the buffer open for the given duration, then emit it if it's not empty. Start the new buffer
    • A whole variety of supplemental things -- with time skips, predicate-based buffering
  • Batching: group while you can (or the limit exceeded), emit it and wait the next element.

The problem with time is that it requires the actual concurrency, while the size-based version does not. I'm still exploring the possibility of something like chunkedTransform { /* decide whether it's time-based or not */ }, but not much of ideas for now.

@dkhalanskyjb dkhalanskyjb self-requested a review May 10, 2024 14:34
Co-authored-by: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com>

/**
* Splits the given flow into a flow of non-overlapping lists each not exceeding the given [size] but never empty.
* The last emitted list may have fewer elements than the given size.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please name the parameter maxSize then

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my original intent, but stdlib disagrees on that: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/chunked.html

A nice thing to consider tho

@qwwdfsad qwwdfsad merged commit c9c735a into develop May 27, 2024
1 check passed
@qwwdfsad qwwdfsad deleted the chunked-initial branch May 27, 2024 12:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants