diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt index 29aa02d69..de1801183 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt @@ -74,7 +74,6 @@ internal suspend fun runInner( outputQ.enqueue1(Some(s)) } .interruptWhen(done.map { it.isDefined() }) // must be AFTER enqueue to the sync queue, otherwise the process may hang to enq last item while being interrupted - .compile() .drain() }.swap().orNull() val e2 = lease.cancel().swap().orNull() @@ -104,7 +103,6 @@ internal suspend fun Stream>.runOuter( runInner(ctx, inner, done, outputQ, running, available, outerScope) } }.interruptWhen(done.map { it.isDefined() }) - .compile() .drain() } @@ -165,7 +163,6 @@ internal suspend fun signalResult(done: SignallingAtomic Stream>.parJoin( running.discrete() // Await everyone stop running .dropWhile { it > 0 } .take(1) - .compile() .drain() signalResult(done) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Pull.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Pull.kt index a6358eeb5..c56876a27 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Pull.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Pull.kt @@ -590,19 +590,6 @@ fun Pull.firstOrNull(f: (O) -> Boolean): Pull Pull.lastOrNull(): Pull { - fun go(prev: O?, s: Pull): Pull = - s.unconsOrNull().flatMap { uncons -> - when (uncons) { - null -> Pull.just(prev) - else -> go(uncons.head.lastOrNull() ?: prev, uncons.tail) - } - } - - return go(null, this) -} - /** Writes a single `true` value if all input matches the predicate, `false` otherwise. */ fun Pull.forall(p: (O) -> Boolean): Pull = unconsOrNull().flatMap { uncons -> diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ResourceTerminalOps.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ResourceOps.kt similarity index 53% rename from arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ResourceTerminalOps.kt rename to arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ResourceOps.kt index 72f15d7fd..82d36d7bd 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ResourceTerminalOps.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ResourceOps.kt @@ -2,12 +2,55 @@ package arrow.fx.coroutines.stream import arrow.fx.coroutines.Resource +/** Opens DSL to consume [Stream] as a [Resource]. */ +fun Stream.asResource(): ResourceOps = + ResourceOps(this) + /** - * DSL boundary to access terminal operators as resources + * DSL boundary to access terminal operators as a [Resource] + * Allows for consume a [Stream] as a [Resource], + * meaning the root scope of the [Stream] remains open until [Resource.use] returns. + * + * This allows for safe consumption of streaming resources in terminal operators, + * and inside the [Resource.use] combinator. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.stream.* + * import arrow.fx.coroutines.Atomic + * + * class Logger { + * private val state = Atomic.unsafe(emptyList()) + * + * suspend fun log(msg: String): Unit = + * state.update { it + msg } + * + * suspend fun dumpLog(): Unit = + * println(state.get()) + * } + * + * fun openFileWithName(name: String): String = + * "File($name)" + * + * //sampleStart + * suspend fun main(): Unit { + * val logger = Logger() + * + * Stream.bracket({ openFileWithName("a") }, { name -> logger.log("finalizing: $name") }) + * .append { Stream.bracket({ openFileWithName("b") }, { name -> logger.log("finalizing: $name") }) } + * .asResource() + * .lastOrError() + * .use { last -> logger.log("Using $last") } + * + * logger.dumpLog() // [finalizing: File(a), Using File(b), finalizing: File(b)] + * } + * //sampleEnd + * ``` * - * Terminal operators consume the stream - */ // TODO report inline results in Exception in thread "main" java.lang.VerifyError: Bad type on operand stack -/* inline */ class ResourceTerminalOps(private val s: Stream) { + * As you can see here, we can `use` the `last` streamed `File` before it gets closed by the Stream. + * Since we consumed the Stream as `asResource().lastOrError()`, this extends the last scope to the returned `Resource`, + * so we can safely `use` it and the `Stream` still properly closes all resources opened with `bracket`. + */ +class ResourceOps(private val s: Stream) { suspend fun toList(): Resource> = compiler(mutableListOf()) { acc, ch -> acc.apply { addAll(ch.toList()) } } diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt index 34719b074..7482f32b5 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt @@ -47,7 +47,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3) * .flatMap { i -> Stream(i, i, i) } - * .compile() * .toList() * .let(::println) // [1, 1, 1, 2, 2, 2, 3, 3, 3] * //sampleEnd @@ -68,7 +67,6 @@ inline fun StreamOf.fix(): Stream = * .append { Stream.raiseError(RuntimeException("Boom!")) } * .append { Stream(4, 5, 6) } * .attempt() - * .compile() * .toList() * .let(::println) // [Right(b=1), Right(b=2), Right(b=3), Left(a=java.lang.RuntimeException: Boom!)] * //sampleEnd @@ -109,7 +107,6 @@ inline fun StreamOf.fix(): Stream = * .buffer(4) * .effectTap { i -> buf.add("<$i") } * .take(10) - * .compile() * .toList() * .let(::println) // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] * @@ -143,7 +140,6 @@ inline fun StreamOf.fix(): Stream = * .bufferAll() * .effectTap { i -> buf.add("<$i") } * .take(4) - * .compile() * .toList() * .let(::println) // [0, 1, 2, 3] * @@ -170,7 +166,6 @@ inline fun StreamOf.fix(): Stream = * .effectTap { i -> buf.add(">$i") } * .bufferBy { it % 2 == 0 } * .effectTap { i -> buf.add("<$i") } - * .compile() * .toList() * .let(::println) // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] * @@ -220,7 +215,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(1..10) * .delete { it % 2 == 0 } - * .compile() * .toList() * .let(::println) // [1, 3, 4, 5, 6, 7, 8, 9, 10] * //sampleEnd @@ -249,7 +243,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..10) * .drop(5) - * .compile() * .toList() * .let(::println) // [5, 6, 7, 8, 9, 10] * //sampleEnd @@ -268,7 +261,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..5) * .tail() - * .compile() * .toList() * .let(::println) // [1, 2, 3, 4, 5] * //sampleEnd @@ -290,7 +282,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..10) * .dropLast(5) - * .compile() * .toList() * .let(::println) // [0, 1, 2, 3, 4] * //sampleEnd @@ -328,7 +319,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..10) * .dropWhile { it != 4 } - * .compile() * .toList() * .let(::println) // [4, 5, 6, 7, 8, 9, 10] * //sampleEnd @@ -347,7 +337,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..10) * .dropThrough { it != 4 } - * .compile() * .toList() * .let(::println) // [5, 6, 7, 8, 9, 10] * //sampleEnd @@ -373,7 +362,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..1000) * .take(5) - * .compile() * .toList() * .let(::println) // [0, 1, 2, 3, 4] * //sampleEnd @@ -383,23 +371,29 @@ inline fun StreamOf.fix(): Stream = asPull.take(n).void().stream() /** - * Emits the first element this stream. + * Emits the last `n` elements of the input. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* * * //sampleStart * suspend fun main(): Unit = - * Stream.range(0..1000) - * .first() - * .compile() + * Stream.empty() + * .takeLastOrNull(5) * .toList() - * .let(::println) // [0] + * .let(::println) // [null] * //sampleEnd * ``` */ - fun first(): Stream = - take(1) + fun takeLastOrNull(n: Int): Stream = + asPull + .takeLast(n) + .flatMap { chunkQueue -> + if (chunkQueue.isEmpty()) Pull.output1(null) + else chunkQueue.chunks.fold(Pull.done()) { acc, c -> + acc.flatMap { Pull.output(c) } + } + }.stream() /** * Emits the last `n` elements of the input. @@ -411,7 +405,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..1000) * .takeLast(5) - * .compile() * .toList() * .let(::println) // [996, 997, 998, 999, 1000] * //sampleEnd @@ -436,7 +429,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(null, 1, 2, null, 3) * .first { it != null } - * .compile() * .toList() * .let(::println) // [1] * //sampleEnd @@ -453,25 +445,6 @@ inline fun StreamOf.fix(): Stream = fun find(f: (O) -> Boolean): Stream = first(f) - /** - * Returns the last element of this stream, if non-empty. - * - * ```kotlin:ank:playground - * import arrow.fx.coroutines.stream.* - * - * //sampleStart - * suspend fun main(): Unit = - * Stream(1, 2, 3) - * .last() - * .compile() - * .toList() - * .let(::println) // [3] - * //sampleEnd - * ``` - */ - fun last(): Stream = - asPull.lastOrNull().flatMap(Pull.Companion::output1).stream() - /** * Emits `true` as soon as a matching element is received, else `false` if no input matches. * @@ -481,10 +454,10 @@ inline fun StreamOf.fix(): Stream = * //sampleStart * suspend fun main(): Unit { * Stream.range(0..10).exists { it == 4 } - * .compile().toList().let(::println) //[true] + * .toList().let(::println) //[true] * * Stream.range(0..10).exists { it == 11 } - * .compile().toList().let(::println) //[false] + * .toList().let(::println) //[false] * } * //sampleEnd * ``` @@ -512,7 +485,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..1000) * .takeWhile { it != 5 } - * .compile() * .toList() * .let(::println) // [0, 1, 2, 3, 4] * //sampleEnd @@ -531,7 +503,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..1000) * .takeThrough { it != 5 } - * .compile() * .toList() * .let(::println) // [0, 1, 2, 3, 4, 5] * //sampleEnd @@ -552,7 +523,6 @@ inline fun StreamOf.fix(): Stream = * Stream(1,2,3) * .repeat() * .take(8) - * .compile() * .toList() * .let(::println) // [1, 2, 3, 1, 2, 3, 1, 2] * //sampleEnd @@ -573,7 +543,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1,2,3) * .repeatN(3) - * .compile() * .toList() * .let(::println) // [1, 2, 3, 1, 2, 3, 1, 2, 3] * //sampleEnd @@ -593,7 +562,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3) * .effectMap { print(it) } - * .compile() * .drain() // 123 * //sampleEnd * ``` @@ -612,7 +580,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3) * .effectTap { print(it) } // 123 - * .compile() * .toList() * .let(::println) // [1, 2, 3] * //sampleEnd @@ -635,7 +602,6 @@ inline fun StreamOf.fix(): Stream = * sleep((i * 10).milliseconds) * Pair(i, acc + i) * } - * .compile() * .toList() * .let(::println) // [(1,1), (2,3), (3,5), (4,7)] * //sampleEnd @@ -667,7 +633,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream("Hello", "World!") * .map(String::length) - * .compile() * .toList().let(::println) // [5, 6] * //sampleEnd * ``` @@ -685,7 +650,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3).append { Stream(4, 5, 6) } * .mapChunks { ch -> ch.map { it + 1 } } - * .compile() * .toList().let(::println) // [2, 3, 4, 5, 6, 7] * //sampleEnd * ``` @@ -710,7 +674,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream("Hello", "World") * .mapAccumulate(0) { l, s -> Pair(l + s.length, s.first()) } - * .compile() * .toList() * .let(::println) //[(5,H), (10,W)] * //sampleEnd @@ -735,7 +698,6 @@ inline fun StreamOf.fix(): Stream = * Stream(1,2,3).append { Stream(4,5,6) } * .unchunk() * .chunks() - * .compile() * .toList() * .let(::println) //[Chunk(1), Chunk(2), Chunk(3), Chunk(4), Chunk(5), Chunk(6)] * //sampleEnd @@ -761,7 +723,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1).append { Stream(2, 3).append { Stream(4, 5, 6) } } * .chunks() - * .compile() * .toList() * .let(::println) //[Chunk(1), Chunk(2, 3), Chunk(4, 5, 6)] * //sampleEnd @@ -787,7 +748,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1).append { Stream(2, 3).append { Stream(4, 5, 6) } } * .chunkLimit(2) - * .compile() * .toList() * .let(::println) //[Chunk(1), Chunk(2, 3), Chunk(4, 5), Chunk(6)] * //sampleEnd @@ -818,7 +778,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, 2).append { Stream(3, 4).append { Stream(5, 6, 7) } } * .chunkMin(3) - * .compile() * .toList() * .let(::println) //[Chunk(1, 2, 3, 4), Chunk(5, 6, 7)] * //sampleEnd @@ -865,7 +824,6 @@ inline fun StreamOf.fix(): Stream = * .repeat() * .chunkN(2) * .take(6) - * .compile() * .toList() * .let(::println) //[Chunk(1, 2), Chunk(3, 1), Chunk(2, 3), Chunk(1, 2), Chunk(3, 1), Chunk(2)] * //sampleEnd @@ -891,7 +849,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.range(0..10) * .filter { it % 2 == 0 } - * .compile() * .toList() * .let(::println) //[0, 2, 4, 6, 8] * //sampleEnd @@ -917,7 +874,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, -1, 2, -2, 3, -3, 4, -4) * .filterWithPrevious { previous, current -> previous < current } - * .compile() * .toList() * .let(::println) //[1, 2, 3, 4] * //sampleEnd @@ -965,7 +921,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3, 4, 5) * .fold(0) { acc, b -> acc + b } - * .compile() * .toList() * .let(::println) //[15] * //sampleEnd @@ -987,7 +942,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, 1, 1, 1, 1) * .foldMap(Int.monoid()) { it + 1 } - * .compile() * .toList() * .let(::println) //[10] * //sampleEnd @@ -1008,7 +962,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1,2,3,4) * .effectScan(0) { acc,i -> acc + i } - * .compile() * .toList() * .let(::println) //[0, 1, 3, 6, 10] * //sampleEnd @@ -1044,7 +997,6 @@ inline fun StreamOf.fix(): Stream = * //sampleStart * suspend fun main(): Unit = * Stream(1, 2, 3).zip(Stream(4, 5, 6, 7)) - * .compile() * .toList() * .let(::println) //[(1,4), (2,5), (3,6)] * //sampleEnd @@ -1078,7 +1030,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3) * .zipWith(Stream(4, 5, 6, 7)) { acc, b -> acc + b } - * .compile() * .toList() * .let(::println) //[5, 7, 9] * //sampleEnd @@ -1102,7 +1053,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream("The", "quick", "brown", "fox") * .zipWithIndex() - * .compile() * .toList() * .let(::println) //[(The,0), (quick,1), (brown,2), (fox,3)] * //sampleEnd @@ -1131,7 +1081,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream("The", "quick", "brown", "fox") * .zipWithNext() - * .compile() * .toList() * .let(::println) //[(The,quick), (quick,brown), (brown,fox), (fox,null)] * //sampleEnd @@ -1170,7 +1119,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream("The", "quick", "brown", "fox") * .zipWithPrevious() - * .compile() * .toList() * .let(::println) //[(null,The), (The,quick), (quick,brown), (brown,fox)] * //sampleEnd @@ -1193,7 +1141,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream("The", "quick", "brown", "fox") * .zipWithPreviousAndNext() - * .compile() * .toList() * .let(::println) //[(null,The, quick), (The, quick,brown), (quick,brown,fox), (brown,fox,null)] * //sampleEnd @@ -1215,7 +1162,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream("uno", "dos", "tres", "cuatro") * .zipWithScan(0) { acc, b -> acc + b.length } - * .compile() * .toList() * .let(::println) //[(uno,0), (dos,3), (tres,6), (cuatro,10)] * //sampleEnd @@ -1240,7 +1186,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream("uno", "dos", "tres", "cuatro") * .zipWithScan1(0) { acc, b -> acc + b.length } - * .compile() * .toList() * .let(::println) //[(uno,3), (dos,6), (tres,10), (cuatro,16)] * //sampleEnd @@ -1266,7 +1211,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3, 4, 5) * .forall { it < 10 } - * .compile() * .toList() * .let(::println) //[true] * //sampleEnd @@ -1296,13 +1240,12 @@ inline fun StreamOf.fix(): Stream = * //sampleStart * suspend fun main(): Unit = * Stream(1, 2, 3, 4) - * .drain() - * .compile() + * .void() * .toList().let(::println) //[] * //sampleEnd * ``` */ - fun drain(): Stream = + fun void(): Stream = mapChunks { Chunk.empty() } /** @@ -1315,7 +1258,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1,2,3).append { Stream.raiseError(RuntimeException()).append { Stream(4, 5, 6) } } * .mask() - * .compile() * .toList().let(::println) //[1, 2, 3] * //sampleEnd * ``` @@ -1355,7 +1297,6 @@ inline fun StreamOf.fix(): Stream = * signalling.discrete() * .concurrently(data.effectMap { signalling.set(it) }) * .takeWhile { it < 9 } - * .compile() * .toList() * .let(::println) //[0, 1, 2, 3, 4, 5, 6, 7, 8] * } @@ -1382,7 +1323,6 @@ inline fun StreamOf.fix(): Stream = val r = Either.catch { other .interruptWhen { Either.catch { interrupt.get() } } - .compile() .drain() } @@ -1406,7 +1346,7 @@ inline fun StreamOf.fix(): Stream = * Starts this stream and cancels it as finalization of the returned stream. */ fun spawn(ctx: CoroutineContext = ComputationPool): Stream> = - supervise(ctx) { compile().drain() } + supervise(ctx) { drain() } /** * Run the supplied effectful action at the end of this stream, regardless of how the stream terminates. @@ -1486,7 +1426,6 @@ inline fun StreamOf.fix(): Stream = haltWhenTrue .takeWhile(Boolean::not) .interruptWhen { Right(interruptR.get()) } - .compile() .drain() }, { ex -> val r = when (ex) { @@ -1542,11 +1481,11 @@ inline fun StreamOf.fix(): Stream = effect { arrow.fx.coroutines.sleep(d) } /** - * Alias for `sleep(d).drain`. Often used in conjunction with `++` (i.e., `sleep_(..) ++ s`) as a more - * performant version of `sleep(..) >> s`. + * Alias for `sleep(d).void`. Often used in conjunction with [append] (i.e., `sleep_(..).append { s }`) as a more + * performant version of `sleep(..).flatMap { s }`. */ fun sleep_(d: Duration): Stream = - sleep(d).drain() + sleep(d).void() /** * Creates a single element stream that gets its value by evaluating the supplied effect. @@ -1560,13 +1499,11 @@ inline fun StreamOf.fix(): Stream = * //sampleEnd * suspend fun main(): Unit { * Stream.effect { 10 } - * .compile() * .toList() * .let(::println) // [10] * * Either.catch { * Stream.effect { throw RuntimeException() } - * .compile() * .toList() * }.let(::println) // Left(java.lang.RuntimeException) * } @@ -1588,7 +1525,6 @@ inline fun StreamOf.fix(): Stream = * //sampleEnd * suspend fun main(): Unit = * Stream.effect_ { println("Ran") } - * .compile() * .toList() * .let(::println) // [] * //sampleStart @@ -1613,7 +1549,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit { * Stream.constant(0) * .take(5) - * .compile() * .toList() * .let(::println) // [0, 0, 0, 0, 0] * } @@ -1632,7 +1567,6 @@ inline fun StreamOf.fix(): Stream = * //sampleEnd * suspend fun main(): Unit = * Stream.force { Stream(1, 2, 3) } - * .compile() * .toList() * .let(::println) // [1, 2, 3] * //sampleStart @@ -1658,7 +1592,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream.iterateEffect(0) { it + 1 } * .take(10) - * .compile() * .toList() * .let(::println) // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] * //sampleStart @@ -1684,7 +1617,6 @@ inline fun StreamOf.fix(): Stream = * //sampleEnd * suspend fun main(): Unit = * Stream.unfold(0) { i -> if (i < 5) Pair(i, i + 1) else null } - * .compile() * .toList() * .let(::println) //[0, 1, 2, 3, 4] * //sampleStart @@ -1724,7 +1656,6 @@ inline fun StreamOf.fix(): Stream = * if (i < 5) Pair(Chunk(i) { i }, i + 1) * else null * } - * .compile() * .toList() * .let(::println) //[1, 2, 2, 3, 3, 3, 4, 4, 4, 4] * //sampleEnd @@ -1762,7 +1693,6 @@ inline fun StreamOf.fix(): Stream = * //sampleStart * suspend fun main(): Unit = * Stream.just(1) - * .compile() * .toList() * .let(::println) //[0] * //sampleEnd @@ -1853,7 +1783,6 @@ inline fun StreamOf.fix(): Stream = * //sampleStart * suspend fun main(): Unit = * Stream.range(0..20 step 2) - * .compile() * .toList().let(::println) // [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] * //sampleEnd * ``` @@ -1874,7 +1803,6 @@ inline fun StreamOf.fix(): Stream = * //sampleStart * suspend fun main(): Unit = * Stream.range(0L..15 step 3) - * .compile() * .toList().let(::println) // [0, 3, 6, 9, 12, 15] * //sampleEnd * ``` @@ -1895,7 +1823,6 @@ inline fun StreamOf.fix(): Stream = * //sampleStart * suspend fun main(): Unit = * Stream.range('a'..'z' step 2) - * .compile() * .toList() * .let(::println) // [a, c, e, g, i, k, m, o, q, s, u, w, y] * //sampleEnd @@ -1924,7 +1851,7 @@ inline fun StreamOf.fix(): Stream = * //sampleStart * suspend fun main(): Unit = * timeOutOrNull(1.seconds) { - * Stream.never().compile().drain() + * Stream.never().drain() * }.let(::println) * //sampleEnd * ``` @@ -1941,7 +1868,6 @@ inline fun StreamOf.fix(): Stream = * //sampleStart * suspend fun main(): Unit = * Stream.emits(1, 2, 3) - * .compile() * .toList() * .let(::println) // [1, 2, 3] * //sampleEnd @@ -1989,7 +1915,6 @@ inline fun StreamOf.fix(): Stream = * suspend fun main(): Unit = * Stream(1,2,3) * .zipAll(Stream(4,5,6,7), 0,0) - * .compile() * .toList().let(::println) // [(1,4), (2,5), (3,6), (0,7)] * //sampleEnd *``` @@ -2009,7 +1934,6 @@ fun Stream.zipAll(that: Stream, pad1: O, pad2: B): Stream acc + b } - * .compile() * .toList().let(::println) // [5, 7, 9, 7] * //sampleEnd *``` @@ -2067,7 +1991,6 @@ fun Stream.zipAllWith( * //sampleStart * suspend fun main(): Unit = * Stream(1, 2, 3).onComplete { Stream(4, 5) } - * .compile() * .toList().let(::println) // [1, 2, 3, 4, 5] * //sampleEnd *``` @@ -2076,32 +1999,6 @@ fun Stream.onComplete(s2: () -> Stream): Stream = handleErrorWith { e -> s2.invoke().append { Stream.raiseError(e) } } .append(s2) -/** - * Returns the last element of this stream, if non-empty, otherwise the supplied `fallback` value. - * - * ```kotlin:ank:playground - * import arrow.fx.coroutines.stream.* - * - * //sampleStart - * suspend fun main(): Unit { - * Stream(1, 2, 3) - * .lastOr { 0 } - * .compile() - * .toList().let(::println) // [3] - * - * Stream.empty() - * .lastOr { 0 } - * .compile() - * .toList().let(::println) // [0] - * } - * //sampleEnd - *``` - */ -fun Stream.lastOr(fallback: () -> O): Stream = - asPull.lastOrNull().flatMap { - it?.let(Pull.Companion::output1) ?: Pull.output1(fallback.invoke()) - }.stream() - /** * Repartitions the input with the function `f`. On each step `f` is applied * to the input and all elements but the last of the resulting sequence @@ -2116,7 +2013,6 @@ fun Stream.lastOr(fallback: () -> O): Stream = * suspend fun main(): Unit = * Stream("Hel", "l", "o Wor", "ld") * .repartition(String.semigroup()) { s -> Chunk.iterable(s.split(" ")) } - * .compile() * .toList() * .let(::println) //[Hello, World] * //sampleEnd @@ -2154,7 +2050,6 @@ fun Stream.repartition(S: Semigroup, f: (O) -> Chunk): Stream = * suspend fun main(): Unit = * Stream(1, 2, null, 3, None) * .filterNotNull() - * .compile() * .toList() * .let(::println) //[1, 2, 3] * //sampleEnd @@ -2173,7 +2068,6 @@ fun Stream.filterNotNull(): Stream = * suspend fun main(): Unit = * Stream(1, 2, null, 3, null) * .terminateOnNull() - * .compile() * .toList() * .let(::println) //[1, 2] * //sampleEnd @@ -2192,7 +2086,6 @@ fun Stream.terminateOnNull(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3, 4) * .terminateOn { it == 3 } - * .compile() * .toList() * .let(::println) //[1, 2] * //sampleEnd @@ -2226,7 +2119,6 @@ fun Stream.terminateOn(terminator: (O) -> Boolean): Stream = * suspend fun main(): Unit = * Stream(Some(1), Some(2), None, Some(3), None) * .filterOption() - * .compile() * .toList() * .let(::println) //[1, 2, 3] * //sampleEnd @@ -2247,7 +2139,6 @@ fun Stream>.filterOption(): Stream = * suspend fun main(): Unit = * Stream(Some(1), Some(2), None, Some(3), None) * .terminateOnNone() - * .compile() * .toList() * .let(::println) //[1, 2] * //sampleEnd @@ -2276,12 +2167,6 @@ fun Stream>.terminateOnNone(): Stream = fun Stream.noneTerminate(): Stream> = map { Some(it) }.append { Stream.just(None) } -/** - * Turn [Stream] into [TerminalOps] to consume the stream. - */ -fun Stream.compile(): TerminalOps = - TerminalOps(this) - private val EmptyStream: Stream = Stream(Pull.done) fun emptyStream(): Stream = @@ -2297,7 +2182,6 @@ fun emptyStream(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3) * .interleave(Stream(4, 5, 6, 7)) - * .compile() * .toList() * .let(::println) //[1, 4, 2, 5, 3, 6] * //sampleEnd @@ -2316,7 +2200,6 @@ fun Stream.interleave(that: Stream): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3) * .interleaveAll(Stream(4, 5, 6, 7)) - * .compile() * .toList() * .let(::println) //[1, 4, 2, 5, 3, 6, 7] * //sampleEnd @@ -2339,7 +2222,6 @@ fun Stream.interleaveAll(that: Stream): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3, 4, 5) * .intersperse(0) - * .compile() * .toList() * .let(::println) //[1, 0, 2, 0, 3, 0, 4, 0, 5] * //sampleEnd @@ -2370,7 +2252,6 @@ fun Stream.intersperse(separator: O): Stream = * //sampleStart * suspend fun main(): Unit = * Stream(1, 2, 3).append { Stream(4, 5, 6) } - * .compile() * .toList().let(::println) // [1, 2, 3, 4, 5, 6] * //sampleEnd *``` @@ -2387,7 +2268,6 @@ fun Stream.append(s2: () -> Stream): Stream = * //sampleStart * suspend fun main(): Unit = * (Chunk(-1, 0) prependTo Stream(1, 2, 3)) - * .compile() * .toList().let(::println) // [-1, 0, 1, 2, 3] * //sampleEnd * ``` @@ -2404,7 +2284,6 @@ infix fun Chunk.prependTo(s: Stream): Stream = * //sampleStart * suspend fun main(): Unit = * Stream(1, 2, 3).cons(Chunk(-1, 0)) - * .compile() * .toList().let(::println) // [-1, 0, 1, 2, 3] * //sampleEnd * ``` @@ -2421,7 +2300,6 @@ fun Stream.cons(c: Chunk): Stream = * //sampleStart * suspend fun main(): Unit = * (0 prependTo Stream(1, 2, 3)) - * .compile() * .toList().let(::println) // [0, 1, 2, 3] * //sampleEnd * ``` @@ -2438,7 +2316,6 @@ infix fun O.prependTo(s: Stream): Stream = * //sampleStart * suspend fun main(): Unit = * Stream(1, 2, 3).cons1(0) - * .compile() * .toList() * . let(::println) // [0, 1, 2, 3] * //sampleEnd @@ -2458,7 +2335,6 @@ fun Stream.cons1(o: O): Stream = * Stream(1, 2, 3) * .append { Stream.raiseError(RuntimeException()) } * .handleErrorWith { _: Throwable -> Stream.just(0) } - * .compile() * .toList() * .let(::println) // [1, 2, 3, 0] * //sampleEnd @@ -2482,7 +2358,6 @@ fun Stream>.flatten(): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3, 4, 5) * .fold1 { a, b -> a + b } - * .compile() * .toList() * .let(::println) // [15] * //sampleEnd @@ -2519,7 +2394,6 @@ fun Stream.reduceSemigroup(S: Semigroup): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3, 4, 5) * .foldMonoid(Int.monoid()) - * .compile() * .toList() * .let(::println) // [15] * //sampleEnd @@ -2540,7 +2414,6 @@ fun Stream.foldMonoid(MO: Monoid): Stream = * suspend fun main(): Unit = * Stream(1,2,3,4) * .scan(0) { a, b -> a + b } - * .compile() * .toList() * .let(::println) // [0, 1, 3, 6, 10] * //sampleEnd @@ -2566,7 +2439,6 @@ fun Stream.scan(init: O2, f: (O2, O) -> O2): Stream = * suspend fun main(): Unit = * Stream(1, 2, 3, 4) * .scanMonoid(Int.monoid()) - * .compile() * .toList() * .let(::println) //[0, 1, 3, 6, 10] * //sampleEnd @@ -2586,7 +2458,6 @@ fun Stream.scanMonoid(MO: Monoid): Stream = * suspend fun main(): Unit = * Stream("a", "aa", "aaa", "aaaa") * .scanMap(Int.monoid()) { it.length } - * .compile() * .toList() * .let(::println) //[0, 1, 3, 6, 10] * //sampleEnd @@ -2616,7 +2487,6 @@ private fun Pull.scan_(init: O2, f: (O2, O) -> O2): Pull a + b } - * .compile() * .toList() * .let(::println) //[1, 3, 6, 10] * //sampleEnd @@ -2666,7 +2536,6 @@ fun Stream.scanChunks( * * suspend fun main(): Unit = * Stream.range(0..100).take(5) - * .compile() * .toList().let(::println) // [0, 1, 2, 3, 4] * //sampleEnd * ``` diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/TerminalOps.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/TerminalOps.kt index ecae69f27..83eafcfd7 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/TerminalOps.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/TerminalOps.kt @@ -1,60 +1,108 @@ package arrow.fx.coroutines.stream /** - * DSL boundary to access terminal operators - * - * Terminal operators consume the stream - */ // TODO report inline results in Exception in thread "main" java.lang.VerifyError: Bad type on operand stack -/* inline */ class TerminalOps(private val s: Stream) { - - suspend fun toList(): List = - compiler(mutableListOf()) { acc, ch -> acc.apply { addAll(ch.toList()) } } - - suspend fun toSet(): Set = - compiler(mutableSetOf()) { acc, ch -> acc.apply { addAll(ch.toList()) } } - - suspend fun drain(): Unit = - foldChunks(Unit) { _, _ -> Unit } - - /** - * Compiles this stream in to a value, - * returning `null` if the stream emitted no values and returning the - * last value emitted if values were emitted. - * - * When this method has returned, the stream has not begun execution -- this method simply - * compiles the stream down to the target effect type. - */ - suspend fun lastOrNull(): O? = - foldChunks(null) { acc, c -> c.lastOrNull() ?: acc } - - /** - * Compiles this stream in to a value, - * raising a `NoSuchElementException` if the stream emitted no values - * and returning the last value emitted otherwise. - * - * When this method has returned, the stream has not begun execution -- this method simply - * compiles the stream down to the target effect type. - */ - suspend fun lastOrError(): O = - lastOrNull() ?: throw NoSuchElementException() - - /** - * Compiles this stream in to a value by folding - * the output chunks together, starting with the provided `init` and combining the - * current value with each output chunk. - * - * When this method has returned, the stream has not begun execution -- this method simply - * compiles the stream down to the target effect type. - */ - suspend fun foldChunks(init: B, f: (B, Chunk) -> B): B = - compiler(init, f) - - val resource: ResourceTerminalOps = - ResourceTerminalOps(s) - - private suspend fun compiler(init: () -> B, foldChunk: (B, Chunk) -> B): B = - s.asPull.compiler(init.invoke(), foldChunk) - - private suspend fun compiler(init: B, foldChunk: (B, Chunk) -> B): B = - s.asPull.compiler(init, foldChunk) -} + * Runs all the effects of this [Stream] and collects all emitted values into a [List]. + * If the [Stream] doesn't emit any values it returns [emptyList]. + * + * This a terminal operator, meaning this functions `suspend`s until the [Stream] finishes. + * If any errors are raised while streaming, it's thrown from this `suspend` scope. + */ +suspend fun Stream.toList(): List = + compiler(mutableListOf()) { acc, ch -> acc.apply { addAll(ch.toList()) } } + +/** + * Runs all the effects of this [Stream] and collects all emitted values into a [Set]. + * If the [Stream] doesn't emit any values it returns [emptySet]. + * + * This a terminal operator, meaning this functions `suspend`s until the [Stream] finishes. + * If any errors are raised while streaming, it's thrown from this `suspend` scope. + */ +suspend fun Stream.toSet(): Set = + compiler(mutableSetOf()) { acc, ch -> acc.apply { addAll(ch.toList()) } } + +/** + * Runs all the effects of this [Stream] and ignores all emitted values. + * + * This a terminal operator, meaning this functions `suspend`s until the [Stream] finishes. + * If any errors are raised while streaming, it's thrown from this `suspend` scope. + */ +suspend fun Stream.drain(): Unit = + foldChunks(Unit) { _, _ -> Unit } + +/** + * Runs the first effect of this [Stream], and returns `null` if the stream emitted a value + * and returns the value if emitted. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.stream.* + * + * //sampleStart + * suspend fun main(): Unit = + * Stream.range(0..1000) + * .firstOrNull() + * .let(::println) // 0 + * //sampleEnd + * ``` + * + * This a terminal operator, meaning this functions `suspend`s until the [Stream] finishes. + * If any errors are raised while streaming, it's thrown from this `suspend` scope. + */ +suspend fun Stream.firstOrNull(): O? = + take(1).foldChunks(null) { acc, c -> + acc ?: c.firstOrNull() + } + +/** + * Runs the first effect of this [Stream], raising a [NoSuchElementException] if the stream emitted no values + * and returns the value if emitted. + * + * This a terminal operator, meaning this functions `suspend`s until the [Stream] finishes. + * If any errors are raised while streaming, it's thrown from this `suspend` scope. + */ +suspend fun Stream.firstOrError(): O? = + firstOrNull() ?: throw NoSuchElementException() + +/** + * Runs all the effects of this [Stream], and returns `null` if the stream emitted no values + * and returning the last value emitted if values were emitted. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.stream.* + * + * //sampleStart + * suspend fun main(): Unit = + * Stream(1, 2, 3) + * .lastOrNull() + * .let(::println) // 3 + * //sampleEnd + * ``` + * + * This a terminal operator, meaning this functions `suspend`s until the [Stream] finishes. + * If any errors are raised while streaming, it's thrown from this `suspend` scope. + */ +suspend fun Stream.lastOrNull(): O? = + foldChunks(null) { acc, c -> c.lastOrNull() ?: acc } + +/** + * Runs all the effects of this [Stream], raising a [NoSuchElementException] if the stream emitted no values + * and returning the last value emitted otherwise. + * + * This a terminal operator, meaning this functions `suspend`s until the [Stream] finishes. + * If any errors are raised while streaming, it's thrown from this `suspend` scope. + */ +suspend fun Stream.lastOrError(): O = + lastOrNull() ?: throw NoSuchElementException() + +/** + * Folds all the effects of this stream in to a value by folding + * the output chunks together, starting with the provided [init] and combining the + * current value with each output chunk using [f] + * + * This a terminal operator, meaning this functions `suspend`s until the [Stream] finishes. + * If any errors are raised while streaming, it's thrown from this `suspend` scope. + */ +suspend fun Stream.foldChunks(init: B, f: (B, Chunk) -> B): B = + compiler(init, f) + +private suspend fun Stream.compiler(init: B, foldChunk: (B, Chunk) -> B): B = + asPull.compiler(init, foldChunk) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index fadf3d68c..f9dcafc34 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -35,7 +35,6 @@ interface EmitterSyntax { * emit(2, 3, 4) * end() * } - * .compile() * .toList() * .let(::println) //[1, 2, 3, 4] * //sampleEnd @@ -105,7 +104,6 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. * * val result = getUsernames() * .effectTap { println(it) } - * .compile() * .toList() * //sampleEnd * println(result) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/CancelBoundary.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/CancelBoundary.kt index 5712436c5..8b8c9c030 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/CancelBoundary.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/CancelBoundary.kt @@ -7,7 +7,6 @@ class CancelBoundary : StringSpec({ suspend fun forever(): Unit { while (true) { - println("I am getting dizzy...") cancelBoundary() // cancellable computation loop } } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt index bdd5c959c..235e6b096 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt @@ -31,7 +31,6 @@ class BracketTest : StreamSpec(spec = { events.recordBracketEvents(1) .effectMap { events.get() shouldBe listOf(Acquired.one) } .flatMap { use } - .compile() .drain() }.fold({ e -> if (e is RuntimeException) Unit else throw e @@ -60,7 +59,6 @@ class BracketTest : StreamSpec(spec = { .flatMap { events.recordBracketEvents(2) } .effectMap { events.get() shouldBe listOf(Acquired(1), Acquired(2)) } .flatMap { use } - .compile() .drain() }.fold({ e -> if (e is RuntimeException) Unit else throw e @@ -88,7 +86,6 @@ class BracketTest : StreamSpec(spec = { .append { events.recordBracketEvents(2).flatMap { use2 } } - .compile() .drain() }.fold({ e -> if (e is RuntimeException) Unit else throw e @@ -118,7 +115,7 @@ class BracketTest : StreamSpec(spec = { val counter = Counter() val innermost: Stream = - if (finalizerFail) Stream.bracket(counter::increment) { counter.decrement(); throw e }.drain() + if (finalizerFail) Stream.bracket(counter::increment) { counter.decrement(); throw e }.void() else Stream.raiseError(e) val nested = s0.foldRight(innermost) { i, acc -> @@ -128,7 +125,6 @@ class BracketTest : StreamSpec(spec = { assertThrowable { nested - .compile() .drain() } shouldBe e @@ -150,7 +146,7 @@ class BracketTest : StreamSpec(spec = { val fiveLevels = bracketed.take(i).take(j).take(k).take(j).take(i) val all = earlyTermination.append { twoLevels.append { twoLevels2.append { threeLevels.append { fiveLevels } } } } - all.compile().drain() + all.drain() counter.count() shouldBe 0L } } @@ -166,7 +162,6 @@ class BracketTest : StreamSpec(spec = { buffer += "FlatMapped" Stream(s) } - .compile() .toList() buffer shouldBe listOf( @@ -186,7 +181,6 @@ class BracketTest : StreamSpec(spec = { Stream.bracket({ counter.increment() }) { counter.decrement() } .flatMap { Stream(i) } } - .compile() .toList() shouldBe (0..bracketsInSequence).toList() counter.count() shouldBe 0 @@ -195,7 +189,6 @@ class BracketTest : StreamSpec(spec = { "evaluating a bracketed stream multiple times is safe" { val s = suspend { Stream.bracket({ Unit }, { Unit }) - .compile() .drain() } @@ -211,7 +204,6 @@ class BracketTest : StreamSpec(spec = { Stream.bracket({ i }, { i -> o = o + i }) .flatMap { acc } } - .compile() .drain() o shouldBe (0 until 10).toList() @@ -227,7 +219,6 @@ class BracketTest : StreamSpec(spec = { } } .attempt() - .compile() .drain() o shouldBe (0 until 10).toList() @@ -239,7 +230,6 @@ class BracketTest : StreamSpec(spec = { Either.catch { Stream.bracket({ Unit }, { throw e }) .flatMap { s } - .compile() .toList() } shouldBe Either.Left(e) } @@ -255,7 +245,6 @@ class BracketTest : StreamSpec(spec = { throw e }).flatMap { Stream.never() } .interruptWhen { Right(sleep(50.milliseconds)) } - .compile() .drain() } shouldBe e @@ -270,7 +259,6 @@ class BracketTest : StreamSpec(spec = { assertThrowable { s1.zip(s2) - .compile() .toList() } shouldBe e } @@ -283,7 +271,6 @@ class BracketTest : StreamSpec(spec = { assertThrowable { s1.zip(s2) - .compile() .toList() } shouldBe e } @@ -297,7 +284,6 @@ class BracketTest : StreamSpec(spec = { .flatMap { Stream.raiseError(e) } .handleErrorWith { Stream(1) } .flatMap { events.recordBracketEvents(2) } - .compile() .drain() events.get() shouldBe listOf(Acquired.one, Released.one, Acquired.two, Released.two) @@ -319,7 +305,7 @@ class BracketTest : StreamSpec(spec = { } val s2 = s.fold(Stream.empty()) { acc, ss -> acc.append { ss } } - s2.append { s2.take(10) }.take(10).compile().drain() + s2.append { s2.take(10) }.take(10).drain() counter.count() shouldBe 0L ecs.all { it is ExitCase.Completed } shouldBe true @@ -337,7 +323,7 @@ class BracketTest : StreamSpec(spec = { }.flatMap { s.append { Stream.raiseError(e) } } } val s2 = s.fold(Stream.empty()) { acc, i -> acc.append { i } } - Either.catch { s2.compile().drain() } + Either.catch { s2.drain() } counter.count() shouldBe 0L ecs.all { it == ExitCase.Failure(e) } shouldBe true } @@ -356,7 +342,7 @@ class BracketTest : StreamSpec(spec = { val f = ForkAndForget { parTupledN( - { s.compile().drain() }, + { s.drain() }, { latch.complete(Unit) } ) } @@ -381,7 +367,6 @@ class BracketTest : StreamSpec(spec = { s .interruptAfter(50.milliseconds) - .compile() .drain() counter.count() shouldBe 0L diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt index ffd655755..20356d146 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt @@ -37,7 +37,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { } effect.get() shouldBe false - s.compile().drain() + s.drain() effect.get() shouldBe true } } @@ -48,7 +48,6 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { list.forEach { emit(it) } end() } - .compile() .toList() shouldBe list } } @@ -60,7 +59,6 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { end() } .chunks() - .compile() .toList() shouldBe listOf(Chunk(*list)) } } @@ -72,7 +70,6 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { end() } .chunks() - .compile() .toList() shouldBe listOf(Chunk.iterable(list)) } } @@ -85,7 +82,6 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { end() } .chunks() - .compile() .toList() shouldBe listOf(ch, ch2) } } @@ -96,7 +92,6 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { countToCallback(4, { it }, { emit(it) }) { end() } } } - .compile() .toList() shouldBe listOf(1, 2, 3, 4, 5) } @@ -114,7 +109,6 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { if (it == 1) ref.get() shouldBe false else Unit } - .compile() .drain() } @@ -122,7 +116,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { checkAll(Arb.throwable()) { e -> val s = Stream.cancellable { throw e - }.compile() + } assertThrowable { s.drain() @@ -134,7 +128,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { checkAll(Arb.throwable()) { e -> val s = Stream.cancellable { e.suspend() - }.compile() + } assertThrowable { s.drain() @@ -153,7 +147,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { val f = ForkAndForget { parTupledN( - { s.compile().drain() }, + { s.drain() }, { latch.complete(Unit) } ) } @@ -180,7 +174,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { emit(Unit) done.complete(i) CancelToken.unit - }.compile() + } .lastOrError() } @@ -209,7 +203,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { end() CancelToken { effect.set(true) } } - .compile() + .drain() effect.get() shouldBe false diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CancellationTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CancellationTest.kt index 491f9f484..880cf4178 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CancellationTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CancellationTest.kt @@ -68,7 +68,7 @@ private suspend fun assertCancellable(fa: (latch: Promise) -> Stream p.complete(ex) } ) @@ -88,7 +88,6 @@ private suspend fun Stream.assertCancellable(): Unit { fa = { Stream.effect { start.complete(Unit) } .flatMap { this@assertCancellable } - .compile() .drain() }, finalizer = { ex -> p.complete(ex) } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ConcurrentlyTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ConcurrentlyTest.kt index bccae1f53..604e2de82 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ConcurrentlyTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ConcurrentlyTest.kt @@ -19,10 +19,9 @@ class ConcurrentlyTest : StreamSpec(spec = { "concurrently" - { "when background stream terminates, overall stream continues" { checkAll(Arb.stream(Arb.int()), Arb.stream(Arb.int())) { s1, s2 -> - val expected = s1.compile().toList() + val expected = s1.toList() s1.delayBy(25.milliseconds) .concurrently(s2) - .compile() .toList() shouldBe expected } } @@ -32,7 +31,6 @@ class ConcurrentlyTest : StreamSpec(spec = { assertThrowable { s.delayBy(25.milliseconds) .concurrently(Stream.raiseError(e)) - .compile() .drain() } shouldBe e } @@ -47,7 +45,6 @@ class ConcurrentlyTest : StreamSpec(spec = { assertThrowable { fg.concurrently(bg) .onFinalize { semaphore.acquire() } // Hangs if bg doesn't go through terminate - .compile() .drain() } shouldBe e } @@ -62,7 +59,6 @@ class ConcurrentlyTest : StreamSpec(spec = { fg.concurrently(bg) .onFinalize { semaphore.acquire() } // Hangs if bg doesn't go through terminate - .compile() .drain() } } @@ -72,7 +68,6 @@ class ConcurrentlyTest : StreamSpec(spec = { assertThrowable { s.concurrently(Stream.raiseError(e)) .effectTap { never() } - .compile() .drain() } shouldBe e } @@ -98,7 +93,6 @@ class ConcurrentlyTest : StreamSpec(spec = { Stream.bracket({ Unit }, { finRef.update { it + "Outer" } }) .flatMap { s.concurrently(runner) } .interruptWhen { Either.catch { halt.get() } } - .compile() .toList() } @@ -112,7 +106,7 @@ class ConcurrentlyTest : StreamSpec(spec = { } else { // still the outer finalizer shall be run, but there is no failure in `s` finalizers shouldBe listOf("Outer") - r shouldBe Either.Right(s.compile().toList()) + r shouldBe Either.Right(s.toList()) } } } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/InterruptionTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/InterruptionTest.kt index 6013a918a..e38e51efc 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/InterruptionTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/InterruptionTest.kt @@ -33,7 +33,6 @@ class InterruptionTest : StreamSpec(spec = { .effectMap { guaranteeCase({ latch.complete(Unit); never() }) { ex -> exit.complete(ex) } }.interruptWhen { Right(latch.get().also { sleep(20.milliseconds) }) } - .compile() .toList() } @@ -47,7 +46,6 @@ class InterruptionTest : StreamSpec(spec = { checkAll(Arb.stream(Arb.int())) { s -> s.effectMap { never() } .interruptWhen { Right(sleep(20.milliseconds)) } - .compile() .toList() shouldBe emptyList() } } @@ -61,7 +59,6 @@ class InterruptionTest : StreamSpec(spec = { s.effectMap { semaphore.acquire() } .interruptWhen(interrupt) } - .compile() .toList() shouldBe emptyList() } } @@ -70,7 +67,6 @@ class InterruptionTest : StreamSpec(spec = { checkAll(Arb.int()) { i -> Stream.constant(i) .interruptWhen { Right(sleep(20.milliseconds)) } - .compile() .drain() // Finishes and gets interrupted } } @@ -80,7 +76,6 @@ class InterruptionTest : StreamSpec(spec = { Stream.constant(i) .interruptWhen { Right(sleep(20.milliseconds)) } .flatMap { Stream(1) } - .compile() .drain() } } @@ -91,7 +86,6 @@ class InterruptionTest : StreamSpec(spec = { loop(0) .interruptWhen { Right(sleep(20.milliseconds)) } - .compile() .drain() } @@ -101,7 +95,6 @@ class InterruptionTest : StreamSpec(spec = { loop() .interruptWhen { Right(sleep(20.milliseconds)) } - .compile() .drain() } @@ -111,29 +104,25 @@ class InterruptionTest : StreamSpec(spec = { loop() .interruptWhen { Right(sleep(20.milliseconds)) } - .compile() .drain() } "effect stream" - { Stream.effect { Unit }.repeat() .interruptWhen { Right(sleep(20.milliseconds)) } - .compile() .drain() } "Constant drained stream" - { Stream.constant(true) .interruptWhen { Right(sleep(20.milliseconds)) } - .compile() .drain() } "terminates when interruption stream is infinitely false" - { checkAll(Arb.stream(Arb.int())) { s -> - val expected = s.compile().toList() + val expected = s.toList() s.interruptWhen(Stream.constant(false)) - .compile() .toList() shouldBe expected } } @@ -151,7 +140,6 @@ class InterruptionTest : StreamSpec(spec = { i } else i }.interruptWhen(interrupt) - .compile() // as soon as we hit a value divisible by 7, we enable interruption then hang before emitting it, // so there should be no elements in the output that are divisible by 7 // this also checks that interruption works fine even if one or both streams are in a hung state @@ -163,7 +151,6 @@ class InterruptionTest : StreamSpec(spec = { checkAll(Arb.stream(Arb.int())) { s -> s.interruptWhen { Right(sleep(20.milliseconds)) } .flatMap { Stream.never() } - .compile() .toList() shouldBe emptyList() } } @@ -176,9 +163,7 @@ class InterruptionTest : StreamSpec(spec = { .append { s } .interruptWhen { sleep(20.milliseconds); Either.Left(e) } .flatMap { Stream.effect_ { semaphore.acquire() } } - } - .compile() - .toList() + }.toList() } shouldBe Either.Left(e) } } @@ -187,39 +172,36 @@ class InterruptionTest : StreamSpec(spec = { Stream.never() .interruptWhen { Right(sleep(20.milliseconds)) } .append { Stream(5) } - .compile() .toList() shouldBe listOf(5) } "hang in effectMap and then resume on append" - { checkAll(Arb.stream(Arb.int())) { s -> - val expected = s.compile().toList() + val expected = s.toList() s.interruptWhen { Right(sleep(20.milliseconds)) } .effectMap { never() } - .drain() + .void() .append { s } - .compile() .toList() shouldBe expected } } "effectMap + filterOption and then resume on append" - { checkAll(Arb.stream(Arb.int())) { s -> - val expected = s.compile().toList() + val expected = s.toList() s.interruptWhen { Right(sleep(20.milliseconds)) } .effectMap { never>() } .append { s.map { Some(it) } } .filterOption() - .compile() .toList() shouldBe expected } } "interruption works when flatMap is followed by filterOption" - { checkAll(Arb.stream(Arb.int())) { s -> - val expected = s.compile().toList() + val expected = s.toList() s.append { Stream(1) } .interruptWhen { Right(sleep(50.milliseconds)) } @@ -232,7 +214,6 @@ class InterruptionTest : StreamSpec(spec = { } } .filterOption() - .compile() .toList() shouldBe expected } } @@ -260,7 +241,6 @@ class InterruptionTest : StreamSpec(spec = { .flatMap { Stream(it) } .interruptWhen { Right(latch.get()) } .through(p) - .compile() .toList() .let { result -> result shouldBe listOfNotNull(result.firstOrNull()) + result.drop(1).filter { it != 0 } @@ -283,7 +263,6 @@ class InterruptionTest : StreamSpec(spec = { .stream() .interruptScope() .append { Stream(5) } - .compile() .toList() shouldBe listOf(5) } @@ -292,7 +271,6 @@ class InterruptionTest : StreamSpec(spec = { .interruptWhen { Right(sleep(20.milliseconds)) } .effectMap { never() } .append { Stream(5) } - .compile() .toList() shouldBe listOf(5) } @@ -302,7 +280,6 @@ class InterruptionTest : StreamSpec(spec = { timeOutOrNull(500.milliseconds) { Stream.effect { guarantee(latch::get) { latch.complete(Unit) } } .interruptAfter(50.milliseconds) - .compile() .drain() latch.get() @@ -312,7 +289,7 @@ class InterruptionTest : StreamSpec(spec = { "nested-interrupt" - { io.kotest.property.checkAll(500, Arb.stream(Arb.int())) { s -> - val expected = s.compile().toList() + val expected = s.toList() s.interruptWhen { Right(sleep(50.milliseconds)) } .map { None } @@ -324,7 +301,6 @@ class InterruptionTest : StreamSpec(spec = { is Some -> Stream(Some(it.t)) } }.filterOption() - .compile() .toList() shouldBe expected } } @@ -333,7 +309,6 @@ class InterruptionTest : StreamSpec(spec = { Stream.effect { never() } .interruptWhen { never() } .interruptWhen { Right(Unit) } - .compile() .toList() shouldBe emptyList() } @@ -343,7 +318,6 @@ class InterruptionTest : StreamSpec(spec = { .append { Stream(1).delayBy(20.milliseconds) } .interruptWhen { Right(Unit) } .append { Stream(2) } - .compile() .toList() shouldBe listOf(2) } @@ -355,8 +329,7 @@ class InterruptionTest : StreamSpec(spec = { guaranteeCase({ latch.complete(Unit) Stream.never() - .compile() - .resource + .asResource() .drain() .use { Unit } }, { ex -> stop.complete(ex) }) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ParJoinTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ParJoinTest.kt index 028801955..d36a51b43 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ParJoinTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ParJoinTest.kt @@ -22,19 +22,17 @@ class ParJoinTest : StreamSpec(spec = { checkAll(Arb.stream(Arb.int())) { s -> s.map { Stream.just(it) } .parJoin(1) - .compile() - .toList() shouldBe s.compile().toList() + .toList() shouldBe s.toList() } } "concurrency" - { checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, n0 -> val n = (n0 % 20) + 1 - val expected = s.compile().toList().toSet() + val expected = s.toList().toSet() s.map { Stream(it) } .parJoin(n) - .compile() .toSet() shouldBe expected } } @@ -42,10 +40,9 @@ class ParJoinTest : StreamSpec(spec = { "concurrent flattening" - { checkAll(Arb.stream(Arb.stream(Arb.int())), Arb.positiveInts()) { s, n0 -> val n = n0 % 20 + 1 - val expected = s.flatten().compile().toSet() + val expected = s.flatten().toSet() s.parJoin(n) - .compile() .toSet() shouldBe expected } } @@ -62,7 +59,6 @@ class ParJoinTest : StreamSpec(spec = { } s.parJoinUnbounded() - .compile() .drain() } @@ -97,7 +93,6 @@ class ParJoinTest : StreamSpec(spec = { val r = Either.catch { prg0 .parJoinUnbounded() - .compile() .drain() } val finalizers = finalizerRef.get() @@ -113,13 +108,12 @@ class ParJoinTest : StreamSpec(spec = { val full = Stream.constant(42) val hang = Stream.effect { never() }.repeat() - val hang2 = full.drain() + val hang2 = full.void() "Can take from non-hanging stream on left" - { Stream(full, hang) .parJoin(10) .take(2) - .compile() .toList() shouldBe listOf(42, 42) } @@ -127,7 +121,6 @@ class ParJoinTest : StreamSpec(spec = { Stream(hang2, full) .parJoin(10) .take(1) - .compile() .toList() shouldBe listOf(42) } @@ -135,7 +128,6 @@ class ParJoinTest : StreamSpec(spec = { Stream(hang, full, hang2) .parJoin(10) .take(1) - .compile() .toList() shouldBe listOf(42) } @@ -144,7 +136,6 @@ class ParJoinTest : StreamSpec(spec = { assertThrowable { Stream(s, Stream.raiseError(e)) .parJoinUnbounded() - .compile() .drain() } shouldBe e } @@ -156,7 +147,6 @@ class ParJoinTest : StreamSpec(spec = { Stream(Stream.raiseError(e)) .parJoinUnbounded() .append { s } - .compile() .toList() } shouldBe e } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/PullTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/PullTest.kt index 52b55ad8b..b86ccf25b 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/PullTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/PullTest.kt @@ -11,7 +11,6 @@ class PullTest : ArrowFxSpec(spec = { "pull can output chunks" { checkAll(Arb.chunk(Arb.int())) { ch -> Stream(Pull.output(ch)) - .compile() .toList() shouldBe ch.toList() } } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ScopeTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ScopeTest.kt index b5201cd7b..1aafb95e6 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ScopeTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ScopeTest.kt @@ -41,7 +41,6 @@ class ScopeTest : ArrowFxSpec(spec = { } } .attempt() - .compile() .drain() buffer shouldBe (0..10).toList() @@ -54,21 +53,18 @@ class ScopeTest : ArrowFxSpec(spec = { .flatMap { Stream.raiseError(RuntimeException()) } .handleErrorWith { Stream.empty() } .append { events.recordBracketEvents(2) } - .compile() .drain() events.get() shouldBe listOf(Acquired(1), Released(1), Acquired(2), Released(2)) } - // TODO Fix scoping "last scope extended, not all scopes - 1" { val st = Atomic(emptyList()) Stream.just("start") .onFinalize { st.record("first finalize") } .onFinalize { st.record("second finalize") } - .compile() - .resource + .asResource() .lastOrError() .use { st.record(it) } @@ -83,8 +79,7 @@ class ScopeTest : ArrowFxSpec(spec = { Stream.bracket({ "c" }, { st.record("third finalize") }) } } - .compile() - .resource + .asResource() .lastOrError() .use { st.record(it) } @@ -105,7 +100,6 @@ class ScopeTest : ArrowFxSpec(spec = { .scope() .repeat() .take(4) - .compile() .drain() c.get() shouldBe 0 @@ -121,7 +115,6 @@ class ScopeTest : ArrowFxSpec(spec = { } .scope() .append { Stream.effect { counter.count() } } - .compile() .toList() shouldBe listOf(1, 1, 0) } @@ -148,7 +141,6 @@ class ScopeTest : ArrowFxSpec(spec = { .foldMap(Stream.monoid(), Stream.Companion::resource) .effectTap { st.record("use") } .append { Stream.effect_ { st.record("done") } } - .compile() .drain() st.get() shouldBe listOf( @@ -190,7 +182,6 @@ class ScopeTest : ArrowFxSpec(spec = { .foldMap(Stream.monoid(), Stream.Companion::resourceWeak) .effectTap { st.record("use") } .append { Stream.effect_ { st.record("done") } } - .compile() .drain() st.get() shouldBe listOf( diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt index 1ed60a0a4..c1eaf7cea 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt @@ -33,19 +33,17 @@ class StreamTest : StreamSpec(spec = { "constructors" - { "empty() is empty" { Stream.empty() - .compile() .toList() shouldBe emptyList() } "Stream.unit" { Stream.unit - .compile() .toList() shouldBe listOf(Unit) } "never() should timeout" { timeOutOrNull(10.milliseconds) { - Stream.never().compile().toList() + Stream.never().toList() } shouldBe null } @@ -54,7 +52,6 @@ class StreamTest : StreamSpec(spec = { checkAll(Arb.throwable()) { e -> assertThrowable { Stream.raiseError(e) - .compile() .drain() } shouldBe e } @@ -65,7 +62,6 @@ class StreamTest : StreamSpec(spec = { assertThrowable { Stream.just(1) .append { Stream.raiseError(e) } - .compile() .drain() } shouldBe e } @@ -76,7 +72,6 @@ class StreamTest : StreamSpec(spec = { Stream.just(i) .append { Stream.raiseError(e) } .take(1) - .compile() .toList() shouldBe listOf(i) } } @@ -86,7 +81,6 @@ class StreamTest : StreamSpec(spec = { checkAll(Arb.chunk(Arb.int())) { ch -> Stream.chunk(ch) .chunks() - .compile() .toList() shouldBe listOf(ch) } } @@ -94,7 +88,6 @@ class StreamTest : StreamSpec(spec = { "effect" { checkAll(Arb.int()) { i -> Stream.effect { i } - .compile() .toList() shouldBe listOf(i) } } @@ -103,7 +96,6 @@ class StreamTest : StreamSpec(spec = { checkAll(Arb.int()) { i -> var effect: Int? = null Stream.effect_ { effect = i } - .compile() .drain() effect shouldBe i } @@ -112,7 +104,6 @@ class StreamTest : StreamSpec(spec = { "effectUnChunk" { checkAll(Arb.chunk(Arb.int())) { ch -> Stream.effectUnChunk { ch } - .compile() .toList() shouldBe ch.toList() } } @@ -122,7 +113,6 @@ class StreamTest : StreamSpec(spec = { Stream.constant(i, n) .take(n * 2) .chunks() - .compile() .toList() shouldBe if (n == 0) emptyList() else listOf(Chunk(n) { i }, Chunk(n) { i }) } } @@ -130,7 +120,6 @@ class StreamTest : StreamSpec(spec = { "iterable" { checkAll(Arb.set(Arb.int())) { s -> Stream.iterable(s) - .compile() .toSet() shouldBe s } } @@ -138,7 +127,6 @@ class StreamTest : StreamSpec(spec = { "iterate" { Stream.iterate(0, Int::inc) .take(100) - .compile() .toList() shouldBe (0..99).toList() } @@ -148,14 +136,12 @@ class StreamTest : StreamSpec(spec = { Stream.iterateEffect(0) { it.increment() } .take(100) - .compile() .toList() shouldBe (0..99).toList() } "range(IntRange).toList() - IntRange.toList()" { checkAll(Arb.intRange(min = -500, max = 500)) { range -> Stream.range(range) - .compile() .toList() shouldBe range.toList() } } @@ -163,7 +149,6 @@ class StreamTest : StreamSpec(spec = { "range(LongRange).toList() - LongRange.toList()" { checkAll(Arb.longRange(min = -500, max = 500)) { range -> Stream.range(range) - .compile() .toList() shouldBe range.toList() } } @@ -171,7 +156,6 @@ class StreamTest : StreamSpec(spec = { "range(CharRange).toList() - CharRange.toList()" { checkAll(Arb.charRange()) { range -> Stream.range(range) - .compile() .toList() shouldBe range.toList() } } @@ -181,7 +165,6 @@ class StreamTest : StreamSpec(spec = { if (f1 <= 13) Pair(Pair(f1, f2), Pair(f2, f1 + f2)) else null }.map { it.first } - .compile() .toList() shouldBe listOf(0, 1, 1, 2, 3, 5, 8, 13) } @@ -189,13 +172,11 @@ class StreamTest : StreamSpec(spec = { Stream.unfoldChunk(4L) { s -> if (s > 0) Pair(Chunk.longs(longArrayOf(s, s)), s - 1) else null - }.compile() - .toList() shouldBe listOf(4L, 4, 3, 3, 2, 2, 1, 1) + }.toList() shouldBe listOf(4L, 4, 3, 3, 2, 2, 1, 1) } "unfoldEffect" { Stream.unfoldEffect(10) { s -> if (s > 0) Pair(s, s - 1) else null } - .compile() .toList() shouldBe (10 downTo 1).toList() } @@ -204,14 +185,12 @@ class StreamTest : StreamSpec(spec = { if (s) Pair(Chunk.booleans(booleanArrayOf(s)), false) else null } - .compile() .toList() shouldBe listOf(true) } "emits - array.toList()" { checkAll(Arb.list(Arb.int())) { ints -> Stream.emits(*ints.toTypedArray()) - .compile() .toList() shouldBe ints } } @@ -219,7 +198,6 @@ class StreamTest : StreamSpec(spec = { "invoke - array.toList()" { checkAll(Arb.list(Arb.int())) { ints -> Stream.emits(*ints.toTypedArray()) - .compile() .toList() shouldBe ints } } @@ -229,7 +207,6 @@ class StreamTest : StreamSpec(spec = { val n = n0 % 20 Stream.random(seed) .take(n) - .compile() .toList() shouldBe Random(seed).run { List(max(n, 0)) { nextInt() } } } } @@ -238,20 +215,19 @@ class StreamTest : StreamSpec(spec = { "filterNotNull" { checkAll(Arb.stream(Arb.int().orNull())) { s -> s.filterNotNull() - .compile() - .toList() shouldBe s.compile().toList().filterNotNull() + .toList() shouldBe s.toList().filterNotNull() } } "append" { checkAll(Arb.stream(Arb.int()), Arb.stream(Arb.int())) { s1, s2 -> - s1.append { s2 }.compile().toList() shouldBe s1.compile().toList() + s2.compile().toList() + s1.append { s2 }.toList() shouldBe s1.toList() + s2.toList() } } "flatMap" { checkAll(Arb.stream(Arb.int()), Arb.stream(Arb.int())) { s1, s2 -> - s1.flatMap { s2 }.compile().toList() shouldBe s1.compile().toList().flatMap { s2.compile().toList() } + s1.flatMap { s2 }.toList() shouldBe s1.toList().flatMap { s2.toList() } } } @@ -262,27 +238,25 @@ class StreamTest : StreamSpec(spec = { s .take(n) - .compile() - .toList() shouldBe s.compile().toList().take(max(n, 0)) + .toList() shouldBe s.toList().take(max(n, 0)) } } "takeLast" { checkAll(Arb.stream(Arb.int()), Arb.int(-10..1000)) { s, n0 -> val n = n0 % 20 - s.takeLast(n).compile().toList() shouldBe s.compile().toList().takeLast(max(0, n)) + s.takeLast(n).toList() shouldBe s.toList().takeLast(max(0, n)) } } "takeWhile - identity" { checkAll(Arb.stream(Arb.int()), Arb.int(-10..1000)) { s, n0 -> val n = n0 % 20 - val l = s.compile().toList() + val l = s.toList() val set = l.take(max(0, n)).toSet() s .takeWhile(set::contains) - .compile() .toList() shouldBe l.takeWhile(set::contains) } } @@ -290,7 +264,7 @@ class StreamTest : StreamSpec(spec = { "takeThrough" { checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, n0 -> val n = n0 % 20 + 1 - val l = s.compile().toList() + val l = s.toList() val isEven = { i: Int -> i % n == 0 } val head = l.takeWhile(isEven) @@ -299,7 +273,6 @@ class StreamTest : StreamSpec(spec = { s .takeThrough(isEven) - .compile() .toList() shouldBe rhs } } @@ -310,46 +283,43 @@ class StreamTest : StreamSpec(spec = { checkAll(Arb.stream(Arb.int().orNull()), Arb.int()) { s, n -> s .drop(n) - .compile() - .toList() shouldBe s.compile().toList().drop(max(n, 0)) + .toList() shouldBe s.toList().drop(max(n, 0)) } } "last" { checkAll(Arb.stream(Arb.int()), Arb.int(-10..1000)) { s, n0 -> val n = n0 % 10 - s.dropLast(n).compile().toList() shouldBe s.compile().toList().dropLast(max(0, n)) + s.dropLast(n).toList() shouldBe s.toList().dropLast(max(0, n)) } } "tail" { checkAll(Arb.stream(Arb.int())) { s -> - s.tail().compile().toList() shouldBe s.compile().toList().drop(1) + s.tail().toList() shouldBe s.toList().drop(1) } } "dropWhile" { checkAll(Arb.stream(Arb.int()), Arb.int(-10..1000)) { s, n0 -> - val l = s.compile().toList() + val l = s.toList() val n = n0 % 10 val set = l.take(max(0, n)).toSet() s.dropWhile(set::contains) - .compile() .toList() shouldBe l.dropWhile(set::contains) } } "dropThrough" { checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, n -> - val l = s.compile().toList() + val l = s.toList() val set = l.take(n).toSet() val expected = l.dropWhile(set::contains).drop(1) s .dropThrough(set::contains) - .compile() .toList() shouldBe expected } } @@ -358,13 +328,13 @@ class StreamTest : StreamSpec(spec = { "chunk" - { "chunked" { val s = Stream(1, 2).append { Stream(3, 4) } - s.take(3).chunks().compile().toList() shouldBe listOf(Chunk(1, 2), Chunk(3)) + s.take(3).chunks().toList() shouldBe listOf(Chunk(1, 2), Chunk(3)) } "map identity" { checkAll(Arb.list(Arb.list(Arb.int()))) { lli -> val s = lli.foldMap(Stream.monoid()) { Stream.iterable(it) } - s.chunks().map { it.toList() }.compile().toList() shouldBe lli + s.chunks().map { it.toList() }.toList() shouldBe lli } } @@ -372,28 +342,28 @@ class StreamTest : StreamSpec(spec = { checkAll(Arb.list(Arb.list(Arb.int()))) { lli -> val s = if (lli.isEmpty()) Stream.empty() else lli.map { Stream.iterable(it) }.reduce { a, b -> a.append { b } } - s.chunks().flatMap { Stream.chunk(it) }.compile().toList() shouldBe lli.flatten() + s.chunks().flatMap { Stream.chunk(it) }.toList() shouldBe lli.flatten() } } "chunkLimit" { checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, n0 -> val n = n0 % 20 + 1 - val sizes = s.chunkLimit(n).compile().toList().map { it.size() } + val sizes = s.chunkLimit(n).toList().map { it.size() } sizes.all { it <= n } shouldBe true - sizes.combineAll(Int.monoid()) shouldBe s.compile().toList().size + sizes.combineAll(Int.monoid()) shouldBe s.toList().size } } "chunkMin" { checkAll(Arb.stream(Arb.int()), Arb.int()) { s, n0 -> val n = n0 % 20 + 1 - val chunked = s.chunkMin(n, true).compile().toList() - val chunkedSmaller = s.chunkMin(n, false).compile().toList() - val unchunked = s.compile().toList() - val smallerSet = s.take(n - 1).compile().toList() - val smallerN = s.take(n - 1).chunkMin(n, false).compile().toList() - val smallerY = s.take(n - 1).chunkMin(n, true).compile().toList() + val chunked = s.chunkMin(n, true).toList() + val chunkedSmaller = s.chunkMin(n, false).toList() + val unchunked = s.toList() + val smallerSet = s.take(n - 1).toList() + val smallerN = s.take(n - 1).chunkMin(n, false).toList() + val smallerY = s.take(n - 1).chunkMin(n, true).toList() // All but last list have n values chunked.dropLast(1).all { it.size() >= n } // Equivalent to last chunk with allowFewerTotal @@ -414,7 +384,6 @@ class StreamTest : StreamSpec(spec = { "repartition" { Stream("Hel", "l", "o Wor", "ld") .repartition(String.semigroup()) { s -> Chunk.iterable(s.split(" ")) } - .compile() .toList() shouldBe listOf("Hello", "World") } @@ -423,8 +392,7 @@ class StreamTest : StreamSpec(spec = { checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, n -> s .buffer(n) - .compile() - .toList() shouldBe s.compile().toList() + .toList() shouldBe s.toList() } } @@ -436,7 +404,6 @@ class StreamTest : StreamSpec(spec = { s2.effectMap { i -> counter += 1; i } .buffer(n) .take(n + 1) - .compile() .drain() counter shouldBe (n * 2) @@ -448,14 +415,13 @@ class StreamTest : StreamSpec(spec = { "identity" { checkAll(Arb.stream(Arb.int())) { s -> s.bufferAll() - .compile() - .toList() shouldBe s.compile().toList() + .toList() shouldBe s.toList() } } "buffers results of effectMap" { checkAll(Arb.stream(Arb.int())) { s -> - val size = s.compile().toList().size + val size = s.toList().size val expected = size * 2 var counter = 0 @@ -463,7 +429,6 @@ class StreamTest : StreamSpec(spec = { .effectMap { i -> counter += 1; i } .bufferAll() .take(size + 1) - .compile() .drain() counter shouldBe expected @@ -475,14 +440,13 @@ class StreamTest : StreamSpec(spec = { "identity" { checkAll(Arb.stream(Arb.int())) { s -> s.bufferBy { it >= 0 } - .compile() - .toList() shouldBe s.compile().toList() + .toList() shouldBe s.toList() } } "buffers results of effectMap" { checkAll(Arb.stream(Arb.int())) { s -> - val size = s.compile().toList().size + val size = s.toList().size val expected = size * 2 + 1 var counter = 0 @@ -496,7 +460,6 @@ class StreamTest : StreamSpec(spec = { s3.bufferBy { it >= 0 } .take(size + 2) - .compile() .drain() counter shouldBe expected @@ -509,7 +472,6 @@ class StreamTest : StreamSpec(spec = { checkAll(Arb.throwable(), Arb.int()) { e, i -> Stream.raiseError(e) .handleErrorWith { Stream.just(i) } - .compile() .toList() shouldBe listOf(i) } } @@ -519,7 +481,6 @@ class StreamTest : StreamSpec(spec = { val effect = SideEffect() Stream.just(i) .handleErrorWith { Stream.effect { effect.increment() } } - .compile() .toList() shouldBe listOf(i) effect.counter shouldBe 0 @@ -533,7 +494,6 @@ class StreamTest : StreamSpec(spec = { infinite .take(n) - .compile() .toList() shouldBe List(n) { i } } } @@ -541,21 +501,18 @@ class StreamTest : StreamSpec(spec = { "terminateOn" { Stream(1, 2, 3, 4) .terminateOn { it % 3 == 0 } - .compile() .toList() shouldBe listOf(1, 2) } "terminateOnNull" { Stream(1, 2, null, 4) .terminateOnNull() - .compile() .toList() shouldBe listOf(1, 2) } "terminateOnNone" { Stream(Some(1), Some(2), None, Some(4)) .terminateOnNone() - .compile() .toList() shouldBe listOf(1, 2) } }) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/concurrent/QueueTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/concurrent/QueueTest.kt index 6044f28ba..f42a92b53 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/concurrent/QueueTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/concurrent/QueueTest.kt @@ -7,10 +7,10 @@ import arrow.fx.coroutines.StreamSpec import arrow.fx.coroutines.milliseconds import arrow.fx.coroutines.stream.Stream import arrow.fx.coroutines.stream.append -import arrow.fx.coroutines.stream.compile import arrow.fx.coroutines.stream.noneTerminate import arrow.fx.coroutines.stream.parJoinUnbounded import arrow.fx.coroutines.stream.terminateOnNone +import arrow.fx.coroutines.stream.toList import arrow.fx.coroutines.timeOutOrNull import io.kotest.assertions.assertSoftly import io.kotest.matchers.ints.shouldBeLessThan @@ -26,7 +26,7 @@ class QueueTest : StreamSpec(spec = { val q = Queue.unbounded() q.tryOffer1(i) shouldBe true - q.dequeue().take(1).compile().toList() shouldBe listOf(i) + q.dequeue().take(1).toList() shouldBe listOf(i) } } @@ -49,35 +49,33 @@ class QueueTest : StreamSpec(spec = { "unbounded producer/consumer" { checkAll(Arb.stream(Arb.int())) { s -> - val expected = s.compile().toList() + val expected = s.toList() val n = expected.size val q = Queue.unbounded() Stream( q.dequeue(), - s.through(q.enqueue()).drain() + s.through(q.enqueue()).void() ).parJoinUnbounded() .take(n) - .compile() .toList() shouldBe expected } } "dequeueAvailable" { checkAll(Arb.stream(Arb.int())) { s -> - val expected = s.compile().toList() + val expected = s.toList() val q = Queue.unbounded>() val res = s.noneTerminate() .through(q.enqueue()) - .drain() + .void() .append { q.dequeueChunk(Int.MAX_VALUE) .terminateOnNone() .chunks() } - .compile() .toList() assertSoftly { @@ -90,18 +88,17 @@ class QueueTest : StreamSpec(spec = { "dequeueBatch unbounded" { checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, batchSize0 -> val batchSize = batchSize0 % 20 + 1 - val expected = s.compile().toList() + val expected = s.toList() val q = Queue.unbounded>() s.noneTerminate() .effectMap { q.enqueue1(it) } - .drain() + .void() .append { Stream.constant(batchSize) .through(q.dequeueBatch()) .terminateOnNone() } - .compile() .toList() shouldBe expected } } @@ -109,15 +106,14 @@ class QueueTest : StreamSpec(spec = { "Queue.sliding - accepts maxSize elements while sliding over capacity" { checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, maxSize0 -> val maxSize = maxSize0 % 20 + 1 - val expected = s.compile().toList().takeLast(maxSize) + val expected = s.toList().takeLast(maxSize) val q = Queue.sliding>(maxSize) s.noneTerminate() .effectMap { q.enqueue1(it) } - .drain() + .void() .append { q.dequeue().terminateOnNone() } - .compile() .toList() shouldBe expected } } @@ -126,17 +122,16 @@ class QueueTest : StreamSpec(spec = { checkAll(Arb.stream(Arb.int()), Arb.positiveInts(), Arb.positiveInts()) { s, maxSize0, batchSize0 -> val maxSize = maxSize0 % 20 + 1 val batchSize = batchSize0 % 20 + 1 - val expected = s.compile().toList().takeLast(maxSize) + val expected = s.toList().takeLast(maxSize) val q = Queue.sliding>(maxSize) s.noneTerminate() .effectMap { q.enqueue1(it) } - .drain().append { + .void().append { Stream.constant(batchSize) .through(q.dequeueBatch()) .terminateOnNone() } - .compile() .toList() shouldBe expected } } @@ -144,16 +139,15 @@ class QueueTest : StreamSpec(spec = { "Queue.dropping - accepts maxSize elements while dropping over capacity" { checkAll(Arb.stream(Arb.int()), Arb.positiveInts()) { s, maxSize0 -> val maxSize = maxSize0 % 20 + 1 - val expected = s.compile().toList().take(maxSize) + val expected = s.toList().take(maxSize) val q = Queue.dropping(maxSize) s.effectMap { q.enqueue1(it) } - .drain() + .void() .append { q.dequeue().take(expected.size) } - .compile() .toList() shouldBe expected } } @@ -162,16 +156,15 @@ class QueueTest : StreamSpec(spec = { checkAll(Arb.stream(Arb.int()), Arb.positiveInts(), Arb.positiveInts()) { s, maxSize0, batchSize0 -> val maxSize = maxSize0 % 20 + 1 val batchSize = batchSize0 % 20 + 1 - val expected = s.compile().toList().take(maxSize) + val expected = s.toList().take(maxSize) val q = Queue.dropping(maxSize) s.effectMap { q.enqueue1(it) } - .drain().append { + .void().append { Stream.constant(batchSize) .through(q.dequeueBatch()) .take(expected.size) } - .compile() .toList() shouldBe expected } } @@ -179,7 +172,7 @@ class QueueTest : StreamSpec(spec = { "dequeue releases subscriber on " - { "interrupt" { val q = Queue.unbounded() - q.dequeue().interruptAfter(100.milliseconds).compile().drain() + q.dequeue().interruptAfter(100.milliseconds).void() q.enqueue1(1) q.enqueue1(2) q.dequeue1() shouldBe 1