diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt index 87d4ed844d..535536efd4 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Chunk.kt @@ -71,30 +71,31 @@ public fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean): * @param partialWindows controls whether or not to keep partial windows in the end if any. */ +@OptIn(ExperimentalStdlibApi::class) @FlowPreview public fun Flow.windowed(size: Int, step: Int, partialWindows: Boolean, transform: suspend (List) -> R): Flow { require(size > 0 && step > 0) { "Size and step should be greater than 0, but was size: $size, step: $step" } return flow { - val buffer = RingBuffer(size) + val buffer = ArrayDeque(size) val toDrop = min(step, size) val toSkip = max(step - size, 0) var skipped = toSkip collect { value -> - if(toSkip == skipped) buffer.add(value) + if (toSkip == skipped) buffer.addLast(value) else skipped++ - if (buffer.isFull()) { + if (buffer.size == size) { emit(transform(buffer)) - buffer.removeFirst(toDrop) + repeat(toDrop) { buffer.removeFirst() } skipped = 0 } } while (partialWindows && buffer.isNotEmpty()) { emit(transform(buffer)) - buffer.removeFirst(min(toDrop, buffer.size)) + repeat(min(toDrop, buffer.size)) { buffer.removeFirst() } } } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt index ca2c02297f..173c50caf2 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/WindowedTest.kt @@ -45,7 +45,7 @@ class WindowedTest : TestBase() { @Test fun `Emits correct number of overlapping windows for long sequence of overlapping partial windows`() = runTest { val elements = generateSequence(1) { it + 1 }.take(100) - val flow = elements.asFlow().windowed(100, 1, true) { } + val flow = elements.asFlow().windowed(100, 1, true) assertEquals(100, flow.count()) }