Skip to content

Commit

Permalink
Use ArrayDeque from stdlib instead of custom RingBuffer
Browse files Browse the repository at this point in the history
Minor fix in test
  • Loading branch information
circusmagnus committed Mar 18, 2020
1 parent 6155c22 commit c74bfcf
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
11 changes: 6 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,31 @@ public fun <T> Flow<T>.windowed(size: Int, step: Int, partialWindows: Boolean):
* @param partialWindows controls whether or not to keep partial windows in the end if any.
*/

@OptIn(ExperimentalStdlibApi::class)
@FlowPreview
public fun <T, R> Flow<T>.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List<T>) -> R): Flow<R> {
require(size > 0 && step > 0) { "Size and step should be greater than 0, but was size: $size, step: $step" }

return flow {
val buffer = RingBuffer<T>(size)
val buffer = ArrayDeque<T>(size)
val toDrop = min(step, size)
val toSkip = max(step - size, 0)
var skipped = toSkip

collect { value ->
if(toSkip == skipped) buffer.add(value)
if (toSkip == skipped) buffer.addLast(value)
else skipped++

if (buffer.isFull()) {
if (buffer.size == size) {
emit(transform(buffer))
buffer.removeFirst(toDrop)
repeat(toDrop) { buffer.removeFirst() }
skipped = 0
}
}

while (partialWindows && buffer.isNotEmpty()) {
emit(transform(buffer))
buffer.removeFirst(min(toDrop, buffer.size))
repeat(min(toDrop, buffer.size)) { buffer.removeFirst() }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class WindowedTest : TestBase() {
@Test
fun `Emits correct number of overlapping windows for long sequence of overlapping partial windows`() = runTest {
val elements = generateSequence(1) { it + 1 }.take(100)
val flow = elements.asFlow().windowed(100, 1, true) { }
val flow = elements.asFlow().windowed(100, 1, true)
assertEquals(100, flow.count())
}

Expand Down

0 comments on commit c74bfcf

Please sign in to comment.