Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Remove TerminalOps from Stream #260

Merged
merged 6 commits into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ internal suspend fun <O> runInner(
outputQ.enqueue1(Some(s))
}
.interruptWhen(done.map { it.isDefined() }) // must be AFTER enqueue to the sync queue, otherwise the process may hang to enq last item while being interrupted
.compile()
.drain()
}.swap().orNull()
val e2 = lease.cancel().swap().orNull()
Expand Down Expand Up @@ -104,7 +103,6 @@ internal suspend fun <O> Stream<Stream<O>>.runOuter(
runInner(ctx, inner, done, outputQ, running, available, outerScope)
}
}.interruptWhen(done.map { it.isDefined() })
.compile()
.drain()
}

Expand Down Expand Up @@ -165,7 +163,6 @@ internal suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable
* }
* }
* .parJoin(10, IOPool)
* .compile()
* .drain()
* //sampleEnd
* ```
Expand Down Expand Up @@ -196,7 +193,6 @@ fun <O> Stream<Stream<O>>.parJoin(
running.discrete() // Await everyone stop running
.dropWhile { it > 0 }
.take(1)
.compile()
.drain()

signalResult(done)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,19 +590,6 @@ fun <O> Pull<O, Unit>.firstOrNull(f: (O) -> Boolean): Pull<Nothing, PullUncons1<
}
}

/** Returns the last element of the input, if non-empty. */
fun <O> Pull<O, Unit>.lastOrNull(): Pull<Nothing, O?> {
fun go(prev: O?, s: Pull<O, Unit>): Pull<Nothing, O?> =
s.unconsOrNull().flatMap { uncons ->
when (uncons) {
null -> Pull.just(prev)
else -> go(uncons.head.lastOrNull() ?: prev, uncons.tail)
}
}

return go(null, this)
}

/** Writes a single `true` value if all input matches the predicate, `false` otherwise. */
fun <O> Pull<O, Unit>.forall(p: (O) -> Boolean): Pull<Nothing, Boolean> =
unconsOrNull().flatMap { uncons ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,55 @@ package arrow.fx.coroutines.stream

import arrow.fx.coroutines.Resource

/** Opens DSL to consume [Stream] as a [Resource]. */
fun <O> Stream<O>.asResource(): ResourceOps<O> =
ResourceOps(this)

/**
* DSL boundary to access terminal operators as resources
* DSL boundary to access terminal operators as a [Resource]
* Allows for consume a [Stream] as a [Resource],
* meaning the root scope of the [Stream] remains open until [Resource.use] returns.
*
* This allows for safe consumption of streaming resources in terminal operators,
* and inside the [Resource.use] combinator.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
* import arrow.fx.coroutines.Atomic
*
* class Logger {
* private val state = Atomic.unsafe(emptyList<String>())
*
* suspend fun log(msg: String): Unit =
* state.update { it + msg }
*
* suspend fun dumpLog(): Unit =
* println(state.get())
* }
*
* fun openFileWithName(name: String): String =
* "File($name)"
*
* //sampleStart
* suspend fun main(): Unit {
* val logger = Logger()
*
* Stream.bracket({ openFileWithName("a") }, { name -> logger.log("finalizing: $name") })
* .append { Stream.bracket({ openFileWithName("b") }, { name -> logger.log("finalizing: $name") }) }
* .asResource()
* .lastOrError()
* .use { last -> logger.log("Using $last") }
*
* logger.dumpLog() // [finalizing: File(a), Using File(b), finalizing: File(b)]
* }
* //sampleEnd
* ```
*
* Terminal operators consume the stream
*/ // TODO report inline results in Exception in thread "main" java.lang.VerifyError: Bad type on operand stack
/* inline */ class ResourceTerminalOps<O>(private val s: Stream<O>) {
* As you can see here, we can `use` the `last` streamed `File` before it gets closed by the Stream.
* Since we consumed the Stream as `asResource().lastOrError()`, this extends the last scope to the returned `Resource`,
* so we can safely `use` it and the `Stream` still properly closes all resources opened with `bracket`.
*/
class ResourceOps<O>(private val s: Stream<O>) {

suspend fun toList(): Resource<List<O>> =
compiler(mutableListOf()) { acc, ch -> acc.apply { addAll(ch.toList()) } }
Expand Down
Loading